package com.highdatas.mdm.service.impl;
|
|
import com.highdatas.mdm.service.HBaseService;
|
import lombok.extern.slf4j.Slf4j;
|
import org.apache.commons.lang3.StringUtils;
|
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
import org.apache.hadoop.hbase.TableName;
|
import org.apache.hadoop.hbase.client.HBaseAdmin;
|
import org.apache.hadoop.hbase.client.Result;
|
import org.apache.hadoop.hbase.client.Scan;
|
import org.apache.hadoop.hbase.filter.*;
|
import org.apache.hadoop.hbase.util.Bytes;
|
import org.springframework.beans.factory.annotation.Autowired;
|
import org.springframework.data.hadoop.hbase.HbaseTemplate;
|
import org.springframework.stereotype.Service;
|
|
import java.util.List;
|
import java.util.stream.Collectors;
|
|
/**
|
* @author kimi
|
* @description
|
* @date 2020-03-31 16:03
|
*/
|
|
@Service
|
@Slf4j
|
public class HBaseServiceImpl implements HBaseService {
|
@Autowired
|
HbaseTemplate hbaseTemplate;
|
|
@Override
|
public List<Result> getRowKeyAndColumn(String tableName, String startRowkey, String stopRowkey, String column, String qualifier) {
|
FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
|
if (StringUtils.isNotBlank(column)) {
|
log.debug("{}", column);
|
filterList.addFilter(new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(column))));
|
}
|
if (StringUtils.isNotBlank(qualifier)) {
|
log.debug("{}", qualifier);
|
filterList.addFilter(new QualifierFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(qualifier))));
|
}
|
Scan scan = new Scan();
|
if (filterList.getFilters().size() > 0) {
|
scan.setFilter(filterList);
|
}
|
scan.setStartRow(Bytes.toBytes(startRowkey));
|
scan.setStopRow(Bytes.toBytes(stopRowkey));
|
|
return hbaseTemplate.find(tableName, scan, (rowMapper, rowNum) -> rowMapper);
|
}
|
|
@Override
|
public List<Result> getListRowkeyData(String tableName, List<String> rowKeys, String familyColumn, String column) {
|
return rowKeys.stream().map(rk -> {
|
if (StringUtils.isNotBlank(familyColumn)) {
|
if (StringUtils.isNotBlank(column)) {
|
return hbaseTemplate.get(tableName, rk, familyColumn, column, (rowMapper, rowNum) -> rowMapper);
|
} else {
|
return hbaseTemplate.get(tableName, rk, familyColumn, (rowMapper, rowNum) -> rowMapper);
|
}
|
}
|
return hbaseTemplate.get(tableName, rk, (rowMapper, rowNum) -> rowMapper);
|
}).collect(Collectors.toList());
|
}
|
|
@Override
|
public void createTable(String tableName) {
|
try {
|
Configuration conf = hbaseTemplate.getConfiguration();
|
//得到HBase的一个客户端
|
HBaseAdmin client = new HBaseAdmin(conf);
|
//创建表的描述符
|
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
|
//创建列族
|
HColumnDescriptor h1 = new HColumnDescriptor("info");
|
HColumnDescriptor h2 = new HColumnDescriptor("prop");
|
//把列族加入表的描述符中
|
htd.addFamily(h1);
|
htd.addFamily(h2);
|
client.createTable(htd);
|
client.close();
|
}
|
catch (Exception e) {
|
e.printStackTrace();
|
}
|
}
|
}
|