123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124 |
- package com.winhc.phoenix.example.service.impl;
- import com.fasterxml.jackson.databind.DeserializationFeature;
- import com.fasterxml.jackson.databind.ObjectMapper;
- import com.winhc.phoenix.example.service.HbaseQueryService;
- import com.winhc.phoenix.example.util.HbaseResultUtils;
- import lombok.AllArgsConstructor;
- import lombok.SneakyThrows;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.hadoop.hbase.HConstants;
- import org.apache.hadoop.hbase.TableName;
- import org.apache.hadoop.hbase.client.*;
- import org.apache.hadoop.hbase.filter.PageFilter;
- import org.springframework.stereotype.Service;
- import java.io.IOException;
- import java.util.ArrayList;
- import java.util.Arrays;
- import java.util.List;
- import java.util.Map;
- /**
- * @Author: XuJiakai
- * @Date: 2020/9/16 18:55
- * @Description:
- */
- @Slf4j
- @Service
- @AllArgsConstructor
- public class HbaseQueryServiceImpl implements HbaseQueryService {
- private Connection connection;
- @SneakyThrows
- @Override
- public List<Object> scan(String tableName, String rowPrefix, Long size) {
- try (Table table = connection.getTable(TableName.valueOf(tableName.toUpperCase()))) {
- Scan scan = new Scan();
- scan.setRowPrefixFilter(rowPrefix.getBytes());
- PageFilter pageFilter = new PageFilter(size);
- /*SingleColumnValueFilter filter = new SingleColumnValueFilter("F".getBytes(), "START_DATE".getBytes(), CompareOperator.GREATER_OR_EQUAL, time.getBytes());
- SingleColumnValueFilter filter2 = new SingleColumnValueFilter("F".getBytes(), "START_DATE".getBytes(), CompareOperator.LESS_OR_EQUAL, calculateTheClosestNextRowKeyForPrefix(time.getBytes()));
- */
- /*FilterList filterList = new FilterList();
- filterList.addFilter(Arrays.asList(filter, filter2));*/
- scan.setFilter(pageFilter);
- ResultScanner scanner = table.getScanner(scan);
- List<Object> list = new ArrayList<>();
- for (Result result : scanner) {
- //每一行数据
- Map<String, Object> columnMap = HbaseResultUtils.parseResult(result);
- list.add(columnMap);
- }
- return list;
- } catch (IOException e) {
- throw e;
- }
- }
- @SneakyThrows
- @Override
- public Object get(String tableName, String rowkey) {
- try (Table table = connection.getTable(TableName.valueOf(tableName.toUpperCase()))) {
- Get get = new Get(rowkey.getBytes());
- Result result = table.get(get);
- //每一行数据
- return HbaseResultUtils.parseResult(result);
- } catch (IOException e) {
- log.error(e.getMessage(), e);
- throw e;
- }
- }
- // jackson转换工具
- private static final ObjectMapper objectMapper = new ObjectMapper()
- .configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
- @Override
- public void asyncScan(String tableName, String rowPrefix) {
- try (Table table = connection.getTable(TableName.valueOf(tableName.toUpperCase()))) {
- Scan scan = new Scan();
- scan.addFamily("F".getBytes());
- scan.setRowPrefixFilter(rowPrefix.getBytes());
- scan.setCaching(1);
- ResultScanner scanner = table.getScanner(scan);
- for (Result result : scanner) {
- //每一行数据
- Map<String, Object> columnMap = HbaseResultUtils.parseResult(result);
- log.info(objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(columnMap));
- }
- } catch (IOException e) {
- log.error(e.getMessage(), e);
- }
- }
- private byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) {
- // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually.
- // Search for the place where the trailing 0xFFs start
- int offset = rowKeyPrefix.length;
- while (offset > 0) {
- if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
- break;
- }
- offset--;
- }
- if (offset == 0) {
- // We got an 0xFFFF... (only FFs) stopRow value which is
- // the last possible prefix before the end of the table.
- // So set it to stop at the 'end of the table'
- return HConstants.EMPTY_END_ROW;
- }
- // Copy the right length of the original
- byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
- // And increment the last one
- newStopRow[newStopRow.length - 1]++;
- return newStopRow;
- }
- }
|