瀏覽代碼

add: hbase操作工具

许家凯 4 年之前
父節點
當前提交
26b4052d48

+ 108 - 0
src/main/java/com/winhc/phoenix/example/framework/es/EsFastScan.java

@@ -0,0 +1,108 @@
+package com.winhc.phoenix.example.framework.es;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequest;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.elasticsearch.search.Scroll;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * @author: XuJiakai
+ * 2020/11/16 19:23
+ */
+@Slf4j
+public class EsFastScan {
+    private final Scroll scroll = new Scroll(TimeValue.timeValueMinutes(8L));
+
+    private RestHighLevelClient client;
+    private Consumer<SearchHit[]> func;
+    private String index;
+    private String type;
+    private int batchSize = 1000;
+    private int thread_num = 1;
+
+    public EsFastScan(RestHighLevelClient client, Consumer<SearchHit[]> func, String index, String type) {
+        this.client = client;
+        this.func = func;
+        this.index = index;
+        this.type = type;
+    }
+
+
+    public EsFastScan batchSize(int batchSize) {
+        this.batchSize = batchSize;
+        return this;
+    }
+
+
+    @SneakyThrows
+    public void scan() {
+        int poolSize = thread_num;
+        ArrayBlockingQueue<Runnable> objects = new ArrayBlockingQueue<>(poolSize * 2);
+        ThreadPoolExecutor executorService = new ThreadPoolExecutor(
+                poolSize, poolSize,
+                0L, TimeUnit.MILLISECONDS,
+                objects,
+                new ThreadFactoryBuilder().setNameFormat("ScanEs-pool-").build(),
+                (r, executor) -> {
+                    try {
+                        executor.getQueue().put(r);
+                    } catch (InterruptedException e) {
+                        throw new RejectedExecutionException("interrupted", e);
+                    }
+                }
+        );
+
+
+        SearchRequest searchRequest = new SearchRequest(index).types(type).scroll(scroll);
+
+        searchRequest.source(new SearchSourceBuilder().size(batchSize));
+        SearchResponse searchResponse = client.search(searchRequest);
+
+        long totalHits = searchResponse.getHits().getTotalHits();
+        int n = searchResponse.getHits().getHits().length;
+        if (n != 0) {
+            executorService.submit(() -> {
+                log.info("执行一批:{}", n);
+                func.accept(searchResponse.getHits().getHits());
+            });
+        } else {
+            log.warn("数据为空!");
+        }
+
+        do {
+            int num = scanData(executorService, searchResponse.getScrollId());
+            if (num == 0) {
+                break;
+            }
+        } while (true);
+        log.info("es scan is successful,total hits: {}", totalHits);
+    }
+
+    @SneakyThrows
+    private int scanData(ThreadPoolExecutor executorService, String scrollId) {
+        SearchScrollRequest scroll = new SearchScrollRequest(scrollId).scroll(this.scroll);
+        SearchResponse searchResponse = client.searchScroll(scroll);
+        int num = searchResponse.getHits().getHits().length;
+        if (num == 0) {
+            return num;
+        }
+        executorService.submit(() -> {
+            log.info("执行一批:{}", num);
+            func.accept(searchResponse.getHits().getHits());
+        });
+        return num;
+    }
+}

+ 182 - 0
src/main/java/com/winhc/phoenix/example/framework/hbase/HbaseFastScan.java

@@ -0,0 +1,182 @@
+package com.winhc.phoenix.example.framework.hbase;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.winhc.phoenix.example.util.HbaseResultUtils;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.Bytes;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+/**
+ * @author: XuJiakai
+ * 2020/11/16 11:35
+ */
+@Slf4j
+public class HbaseFastScan {
+    private int thread_num = 1;
+    private int batchSize = 500;
+
+    private Connection connection;
+    private TableName tableName;
+
+    private Consumer<List<Map<String, Object>>> func;
+
+    public HbaseFastScan(Connection connection, String tableName, Consumer<List<Map<String, Object>>> func) {
+        this.connection = connection;
+        this.tableName = TableName.valueOf(tableName);
+        this.func = func;
+    }
+
+    public HbaseFastScan thread_num(int thread_num) {
+        this.thread_num = thread_num;
+        return this;
+    }
+
+    public HbaseFastScan batchSize(int batchSize) {
+        this.batchSize = batchSize;
+        return this;
+    }
+
+    private class Task extends Thread {
+        private byte[] startKey;
+        private byte[] endKey;
+        private String regionName;
+        private String tn;
+
+        public Task(RegionInfo regionInfo) {
+            this.regionName = regionInfo.getRegionNameAsString();
+            this.startKey = regionInfo.getStartKey();
+            this.endKey = regionInfo.getEndKey();
+            this.tn = regionName + "-" + getKeyAsString(startKey) + "_" + getKeyAsString(endKey);
+            log.info("{}:{}  {}", regionName, getKeyAsString(startKey), getKeyAsString(endKey));
+            setName(tn);
+        }
+
+        @Override
+        public void run() {
+            log.info("start。。。");
+            List<Map<String, Object>> list = new ArrayList<>();
+            try (Table table = connection.getTable(tableName)) {
+                Scan scan = new Scan();
+                if (startKey.length != 0) {
+                    scan.withStartRow(startKey);
+                }
+                if (endKey.length != 0) {
+                    scan.withStopRow(endKey);
+                }
+                scan.setCaching(batchSize);
+
+                ResultScanner scanner = table.getScanner(scan);
+                for (Result result : scanner) {
+                    Map<String, Object> columnMap = HbaseResultUtils.parseResult(result);
+                    list.add(columnMap);
+                    if (list.size() >= batchSize) {
+                        func.accept(list);
+                        list.clear();
+                        log.info("current rowkey: {}", columnMap.getOrDefault("rowkey", null));
+                    }
+                }
+                if (!list.isEmpty()) {
+                    func.accept(list);
+                    list.clear();
+                }
+                log.info("{} 执行完成!", tn);
+            } catch (Exception e) {
+                log.error(e.getMessage(), e);
+            }
+        }
+
+
+    }
+
+    @SneakyThrows
+    public void scan() {
+        try (RegionLocator regionLocator = connection.getRegionLocator(tableName);) {
+
+            List<HRegionLocation> allRegionLocations = regionLocator.getAllRegionLocations();
+
+            int poolSize = 10;
+            ArrayBlockingQueue<Runnable> objects = new ArrayBlockingQueue<>(poolSize);
+            ThreadPoolExecutor executorService = new ThreadPoolExecutor(
+                    poolSize, poolSize,
+                    0L, TimeUnit.MILLISECONDS,
+                    objects, // 未处理的任务的等待队列
+                    new ThreadFactoryBuilder().setNameFormat("ScanHbase-pool-").build(),
+                    (r, executor) -> {
+                        try {
+                            executor.getQueue().put(r);
+                        } catch (InterruptedException e) {
+                            throw new RejectedExecutionException("interrupted", e);
+                        }
+                    }
+            );
+
+            for (int i = 0; i < allRegionLocations.size(); i++) {
+                HRegionLocation hRegionLocation = allRegionLocations.get(i);
+                log.info("region host: {}", hRegionLocation.getHostname());
+                RegionInfo regionInfo = hRegionLocation.getRegion();
+
+                String regionName = regionInfo.getRegionNameAsString();
+                byte[] startKey = regionInfo.getStartKey();
+                byte[] endKey = regionInfo.getEndKey();
+                String tn = regionName + "-" + getKeyAsString(startKey) + "_" + getKeyAsString(endKey);
+                Scan scan = new Scan();
+                if (startKey.length != 0) {
+                    scan.withStartRow(startKey);
+                }
+                if (endKey.length != 0) {
+                    scan.withStopRow(endKey);
+                }
+                new Task(regionInfo).start();
+
+//            executorService.submit(new Task(regionInfo));
+          /*  executorService.submit(() -> {
+                log.info("start。。。");
+                List<Map<String, Object>> list = new ArrayList<>();
+                try (Table table = connection.getTable(tableName)) {
+
+                    scan.setCaching(batchSize);
+
+                    ResultScanner scanner = table.getScanner(scan);
+                    for (Result result : scanner) {
+                        Map<String, Object> columnMap = HbaseResultUtils.parseResult(result);
+                        list.add(columnMap);
+                        if (list.size() >= batchSize) {
+                            func.accept(list);
+                            list.clear();
+                            log.info("current rowkey: {}", columnMap.getOrDefault("rowkey", null));
+                        }
+                    }
+                    if (!list.isEmpty()) {
+                        func.accept(list);
+                        list.clear();
+                    }
+                    log.info("{} 执行完成!", tn);
+                } catch (Exception e) {
+                    log.error(e.getMessage(), e);
+                }
+            });*/
+
+            }
+        }
+    }
+
+    private static String getKeyAsString(byte[] b) {
+        if (b.length == 0) {
+            return "0";
+        } else {
+            return Bytes.toString(b);
+        }
+    }
+}

+ 1 - 1
src/main/java/com/winhc/phoenix/example/job/DeleteHbaseByMongoJob.java

@@ -29,7 +29,7 @@ public class DeleteHbaseByMongoJob {
         Consumer<List<Document>> func = list -> {
             List<String> cids = list.stream().map(d -> d.getString("id")).collect(Collectors.toList());
             System.out.println(cids);
-//            hbaseOperationService.deleteByRowkey("COMPANY_DYNAMIC", cids);
+            hbaseOperationService.deleteByRowkey("COMPANY_DYNAMIC", cids);
         };
 
         MongoDbFastScan mongoDbFastScan = new MongoDbFastScan("xjk_tmp_del_dyn", func, db);

+ 31 - 0
src/main/java/com/winhc/phoenix/example/job/HbaseScanJob.java

@@ -0,0 +1,31 @@
+package com.winhc.phoenix.example.job;
+
+import com.winhc.phoenix.example.framework.hbase.HbaseFastScan;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.hbase.client.Connection;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+import java.util.Map;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Spring component that scans the COMPANY_TAX_CONTRAVENTION HBase table and prints the
+ * row keys of every batch to standard output.
+ *
+ * @author: XuJiakai
+ * 2020/11/16 14:33
+ */
+@Slf4j
+@Component
+@AllArgsConstructor
+public class HbaseScanJob {
+    private final Connection connection;
+
+    /** Starts the table scan, printing one list of row keys per batch. */
+    public void start() {
+        Consumer<List<Map<String, Object>>> printer = this::printRowkeys;
+        HbaseFastScan scanner = new HbaseFastScan(connection, "COMPANY_TAX_CONTRAVENTION", printer);
+        scanner.scan();
+    }
+
+    /** Collects the "rowkey" entry of every row, as a String, and prints the resulting list. */
+    private void printRowkeys(List<Map<String, Object>> rows) {
+        List<String> rowkeys = rows.stream()
+                .map(row -> row.get("rowkey"))
+                .map(String.class::cast)
+                .collect(Collectors.toList());
+        System.out.println(rowkeys);
+    }
+}

+ 36 - 0
src/main/java/com/winhc/phoenix/example/service/HbaseOperationService.java

@@ -9,8 +9,44 @@ import java.util.List;
  */
 public interface HbaseOperationService {
 
+    /**
+     * Deletes a batch of rows.
+     *
+     * @param tableName name of the table to delete from
+     * @param ids       row keys of the rows to delete
+     * @return presumably true on success; the implementation is not shown here (TODO confirm)
+     */
     boolean deleteByRowkey(String tableName, List<String> ids);
 
+    /**
+     * Deletes a single row.
+     *
+     * @param tableName name of the table to delete from
+     * @param id        row key of the row to delete
+     * @return presumably true on success; the implementation is not shown here (TODO confirm)
+     */
     boolean deleteByRowkey(String tableName, String id);
 
+    /**
+     * Deletes one column from each row of a batch.
+     *
+     * @param tableName name of the table to delete from
+     * @param ids       row keys of the rows to modify
+     * @param family    column family of the column to delete
+     * @param qualifier column qualifier of the column to delete
+     * @return true once the delete has been issued; HbaseOperationServiceImpl always returns true
+     */
+    boolean deleteColsByRowkey(String tableName, List<String> ids, String family, String qualifier);
+
+    /**
+     * Deletes one column from a single row.
+     *
+     * @param tableName name of the table to delete from
+     * @param ids       row key of the single row to modify (one key, despite the plural name)
+     * @param family    column family of the column to delete
+     * @param qualifier column qualifier of the column to delete
+     * @return true once the delete has been issued; HbaseOperationServiceImpl always returns true
+     */
+    boolean deleteColsByRowkey(String tableName, String ids, String family, String qualifier);
+
 }

+ 28 - 0
src/main/java/com/winhc/phoenix/example/service/impl/HbaseOperationServiceImpl.java

@@ -47,4 +47,32 @@ public class HbaseOperationServiceImpl implements HbaseOperationService {
         }
         return true;
     }
+
+    /**
+     * Deletes one column (all of its versions) from each of the given rows in one batched call.
+     *
+     * <p>Row keys, family and qualifier are encoded as UTF-8 explicitly so the bytes sent to
+     * HBase do not depend on the JVM's default charset.
+     *
+     * @param tableName name of the table to delete from
+     * @param ids       row keys of the rows to modify
+     * @param family    column family of the column to delete
+     * @param qualifier column qualifier of the column to delete
+     * @return always true; failures are rethrown as exceptions
+     */
+    @SneakyThrows
+    @Override
+    public boolean deleteColsByRowkey(String tableName, List<String> ids, String family, String qualifier) {
+        final byte[] f = family.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        final byte[] c = qualifier.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        // Build each Delete in map(): peek() is a debugging hook and must not carry required side effects.
+        List<Delete> deletes = ids.stream()
+                .map(id -> new Delete(id.getBytes(java.nio.charset.StandardCharsets.UTF_8)).addColumns(f, c))
+                .collect(Collectors.toList());
+        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
+            log.info("deletes size:{}", deletes.size());
+            table.delete(deletes);
+        }
+        return true;
+    }
+
+    /**
+     * Deletes one column (all of its versions) from a single row.
+     *
+     * <p>Row key, family and qualifier are encoded as UTF-8 explicitly so the bytes sent to
+     * HBase do not depend on the JVM's default charset.
+     *
+     * @param tableName name of the table to delete from
+     * @param id        row key of the row to modify
+     * @param family    column family of the column to delete
+     * @param qualifier column qualifier of the column to delete
+     * @return always true; failures are rethrown as exceptions
+     */
+    @SneakyThrows
+    @Override
+    public boolean deleteColsByRowkey(String tableName, String id, String family, String qualifier) {
+        try (Table table = connection.getTable(TableName.valueOf(tableName))) {
+            Delete delete = new Delete(id.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            delete.addColumns(
+                    family.getBytes(java.nio.charset.StandardCharsets.UTF_8),
+                    qualifier.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+            table.delete(delete);
+        }
+        return true;
+    }
 }