HbaseQueryServiceImpl.java 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package com.winhc.phoenix.example.service.impl;
  2. import com.fasterxml.jackson.databind.DeserializationFeature;
  3. import com.fasterxml.jackson.databind.ObjectMapper;
  4. import com.winhc.phoenix.example.service.HbaseQueryService;
  5. import com.winhc.phoenix.example.util.HbaseResultUtils;
  6. import lombok.AllArgsConstructor;
  7. import lombok.SneakyThrows;
  8. import lombok.extern.slf4j.Slf4j;
  9. import org.apache.hadoop.hbase.HConstants;
  10. import org.apache.hadoop.hbase.TableName;
  11. import org.apache.hadoop.hbase.client.*;
  12. import org.apache.hadoop.hbase.filter.PageFilter;
  13. import org.springframework.stereotype.Service;
  14. import java.io.IOException;
  15. import java.util.ArrayList;
  16. import java.util.Arrays;
  17. import java.util.List;
  18. import java.util.Map;
  19. /**
  20. * @Author: XuJiakai
  21. * @Date: 2020/9/16 18:55
  22. * @Description:
  23. */
  24. @Slf4j
  25. @Service
  26. @AllArgsConstructor
  27. public class HbaseQueryServiceImpl implements HbaseQueryService {
  28. private Connection connection;
  29. @SneakyThrows
  30. @Override
  31. public List<Object> scan(String tableName, String rowPrefix, Long size) {
  32. try (Table table = connection.getTable(TableName.valueOf(tableName.toUpperCase()))) {
  33. Scan scan = new Scan();
  34. scan.setRowPrefixFilter(rowPrefix.getBytes());
  35. PageFilter pageFilter = new PageFilter(size);
  36. /*SingleColumnValueFilter filter = new SingleColumnValueFilter("F".getBytes(), "START_DATE".getBytes(), CompareOperator.GREATER_OR_EQUAL, time.getBytes());
  37. SingleColumnValueFilter filter2 = new SingleColumnValueFilter("F".getBytes(), "START_DATE".getBytes(), CompareOperator.LESS_OR_EQUAL, calculateTheClosestNextRowKeyForPrefix(time.getBytes()));
  38. */
  39. /*FilterList filterList = new FilterList();
  40. filterList.addFilter(Arrays.asList(filter, filter2));*/
  41. scan.setFilter(pageFilter);
  42. ResultScanner scanner = table.getScanner(scan);
  43. List<Object> list = new ArrayList<>();
  44. for (Result result : scanner) {
  45. //每一行数据
  46. Map<String, Object> columnMap = HbaseResultUtils.parseResult(result);
  47. list.add(columnMap);
  48. }
  49. return list;
  50. } catch (IOException e) {
  51. throw e;
  52. }
  53. }
  54. @SneakyThrows
  55. @Override
  56. public Object get(String tableName, String rowkey) {
  57. try (Table table = connection.getTable(TableName.valueOf(tableName.toUpperCase()))) {
  58. Get get = new Get(rowkey.getBytes());
  59. Result result = table.get(get);
  60. //每一行数据
  61. return HbaseResultUtils.parseResult(result);
  62. } catch (IOException e) {
  63. log.error(e.getMessage(), e);
  64. throw e;
  65. }
  66. }
  67. // jackson转换工具
  68. private static final ObjectMapper objectMapper = new ObjectMapper()
  69. .configure(DeserializationFeature.ACCEPT_SINGLE_VALUE_AS_ARRAY, true);
  70. @Override
  71. public void asyncScan(String tableName, String rowPrefix) {
  72. try (Table table = connection.getTable(TableName.valueOf(tableName.toUpperCase()))) {
  73. Scan scan = new Scan();
  74. scan.addFamily("F".getBytes());
  75. scan.setRowPrefixFilter(rowPrefix.getBytes());
  76. scan.setCaching(1);
  77. ResultScanner scanner = table.getScanner(scan);
  78. for (Result result : scanner) {
  79. //每一行数据
  80. Map<String, Object> columnMap = HbaseResultUtils.parseResult(result);
  81. log.info(objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(columnMap));
  82. }
  83. } catch (IOException e) {
  84. log.error(e.getMessage(), e);
  85. }
  86. }
  87. private byte[] calculateTheClosestNextRowKeyForPrefix(byte[] rowKeyPrefix) {
  88. // Essentially we are treating it like an 'unsigned very very long' and doing +1 manually.
  89. // Search for the place where the trailing 0xFFs start
  90. int offset = rowKeyPrefix.length;
  91. while (offset > 0) {
  92. if (rowKeyPrefix[offset - 1] != (byte) 0xFF) {
  93. break;
  94. }
  95. offset--;
  96. }
  97. if (offset == 0) {
  98. // We got an 0xFFFF... (only FFs) stopRow value which is
  99. // the last possible prefix before the end of the table.
  100. // So set it to stop at the 'end of the table'
  101. return HConstants.EMPTY_END_ROW;
  102. }
  103. // Copy the right length of the original
  104. byte[] newStopRow = Arrays.copyOfRange(rowKeyPrefix, 0, offset);
  105. // And increment the last one
  106. newStopRow[newStopRow.length - 1]++;
  107. return newStopRow;
  108. }
  109. }