package com.winhc.phoenix.example.service.impl;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.winhc.phoenix.example.service.HbaseQueryService;
import com.winhc.phoenix.example.util.HbaseResultUtils;
import lombok.AllArgsConstructor;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.filter.PageFilter;
import org.springframework.stereotype.Service;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * @Author: XuJiakai
 * @Date: 2020/9/16 18:55
 * @Description:
 */
@Slf4j
@Service
@AllArgsConstructor
public class HbaseQueryServiceImpl implements HbaseQueryService {
    private Connection connection;

    @SneakyThrows
    @Override
    public List scan(String tableName, String rowPrefix, Long size) {
        try (Table table = connection.getTable(TableName.valueOf(tableName.toUpperCase()))) {
            Scan scan = new Scan();
            scan.setRowPrefixFilter(rowPrefix.getBytes());
            PageFilter pageFilter = new PageFilter(size);
            /*SingleColumnValueFilter filter = new SingleColumnValueFilter("F".getBytes(), "START_DATE".getBytes(),
                    CompareOperator.GREATER_OR_EQUAL, time.getBytes());
            SingleColumnValueFilter filter2 = new SingleColumnValueFilter("F".getBytes(), "START_DATE".getBytes(),
                    CompareOperator.LESS_OR_EQUAL, calculateTheClosestNextRowKeyForPrefix(time.getBytes()));*/
            /*FilterList filterList = new FilterList();
            filterList.addFilter(Arrays.asList(filter, filter2));*/
            scan.setFilter(pageFilter);
            ResultScanner scanner = table.getScanner(scan);
            List list = new ArrayList<>();
            for (Result result : scanner) {
                // each row of data
                Map columnMap = HbaseResultUtils.parseResult(result);
                list.add(columnMap);
            }
            return list;
        } catch (IOException e) {
            log.error(e.getMessage(), e);
            throw e;
        }
    }

    @SneakyThrows
    @Override
    public Object get(String tableName, String rowkey) {
        try (Table table = connection.getTable(TableName.valueOf(tableName.toUpperCase()))) {
            Get get = new Get(rowkey.getBytes());
            Result result = table.get(get);
            // a single row of data
            return HbaseResultUtils.parseResult(result);
        } catch (IOException e) {
            log.error(e.getMessage(), e);
            throw e;
        }
    }

    // Jackson conversion utility
    private static final ObjectMapper objectMapper = new ObjectMapper()
            .configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);

    @Override
    public void asyncScan(String tableName, String rowPrefix) {
        try (Table table = connection.getTable(TableName.valueOf(tableName.toUpperCase()))) {
            Scan scan = new Scan();
            scan.addFamily("F".getBytes());
            scan.setRowPrefixFilter(rowPrefix.getBytes());
            scan.setCaching(1);
            ResultScanner scanner = table.getScanner(scan);
            for (Result result : scanner) {
                // each row of data
                Map columnMap = HbaseResultUtils.parseResult(result);
                log.info(objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(columnMap));
            }
        } catch (IOException e) {
            log.error(e.getMessage(), e);
        }
    }

    private byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) {
        // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually.
        // Search for the place where the trailing 0xFFs start
        int offset = rowKeyPrefix.length;
        while (offset > 0) {
            if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
                break;
            }
            offset--;
        }
        if (offset == 0) {
            // We got an 0xFFFF... (only FFs) stopRow value which is
            // the last possible prefix before the end of the table.
            // So set it to stop at the 'end of the table'
            return HConstants.EMPTY_END_ROW;
        }
        // Copy the right length of the original
        byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
        // And increment the last one
        newStopRow[newStopRow.length - 1]++;
        return newStopRow;
    }
}