|
@@ -9,6 +9,7 @@ import org.apache.hadoop.hbase.TableName;
|
|
|
import org.apache.hadoop.hbase.client.*;
|
|
|
import org.apache.hadoop.hbase.util.Bytes;
|
|
|
|
|
|
+import java.io.IOException;
|
|
|
import java.util.ArrayList;
|
|
|
import java.util.List;
|
|
|
import java.util.Map;
|
|
@@ -17,6 +18,7 @@ import java.util.concurrent.RejectedExecutionException;
|
|
|
import java.util.concurrent.ThreadPoolExecutor;
|
|
|
import java.util.concurrent.TimeUnit;
|
|
|
import java.util.function.Consumer;
|
|
|
+import java.util.function.Supplier;
|
|
|
|
|
|
/**
|
|
|
* @author: XuJiakai
|
|
@@ -27,13 +29,14 @@ public class HbaseFastScan {
|
|
|
private int thread_num = 1;
|
|
|
private int batchSize = 500;
|
|
|
|
|
|
- private Connection connection;
|
|
|
+ // private Connection connection;
|
|
|
private TableName tableName;
|
|
|
|
|
|
- private Consumer<List<Map<String, Object>>> func;
|
|
|
+ private Consumer<List<Map<String, String>>> func;
|
|
|
+ private Supplier<Connection> supplier;
|
|
|
|
|
|
- public HbaseFastScan(Connection connection, String tableName, Consumer<List<Map<String, Object>>> func) {
|
|
|
- this.connection = connection;
|
|
|
+ public HbaseFastScan(Supplier<Connection> supplier, String tableName, Consumer<List<Map<String, String>>> func) {
|
|
|
+ this.supplier = supplier;
|
|
|
this.tableName = TableName.valueOf(tableName);
|
|
|
this.func = func;
|
|
|
}
|
|
@@ -53,20 +56,22 @@ public class HbaseFastScan {
|
|
|
private byte[] endKey;
|
|
|
private String regionName;
|
|
|
private String tn;
|
|
|
+ private Connection connection;
|
|
|
|
|
|
- public Task(RegionInfo regionInfo) {
|
|
|
+ public Task(Supplier<Connection> supplier, RegionInfo regionInfo) {
|
|
|
this.regionName = regionInfo.getRegionNameAsString();
|
|
|
this.startKey = regionInfo.getStartKey();
|
|
|
this.endKey = regionInfo.getEndKey();
|
|
|
this.tn = regionName + "-" + getKeyAsString(startKey) + "_" + getKeyAsString(endKey);
|
|
|
log.info("{}:{} {}", regionName, getKeyAsString(startKey), getKeyAsString(endKey));
|
|
|
setName(tn);
|
|
|
+ this.connection = supplier.get();
|
|
|
}
|
|
|
|
|
|
@Override
|
|
|
public void run() {
|
|
|
log.info("start。。。");
|
|
|
- List<Map<String, Object>> list = new ArrayList<>();
|
|
|
+ List<Map<String, String>> list = new ArrayList<>();
|
|
|
try (Table table = connection.getTable(tableName)) {
|
|
|
Scan scan = new Scan();
|
|
|
if (startKey.length != 0) {
|
|
@@ -79,7 +84,7 @@ public class HbaseFastScan {
|
|
|
|
|
|
ResultScanner scanner = table.getScanner(scan);
|
|
|
for (Result result : scanner) {
|
|
|
- Map<String, Object> columnMap = HbaseResultUtils.parseResult(result);
|
|
|
+ Map<String, String> columnMap = HbaseResultUtils.parseResult(result);
|
|
|
list.add(columnMap);
|
|
|
if (list.size() >= batchSize) {
|
|
|
func.accept(list);
|
|
@@ -94,19 +99,27 @@ public class HbaseFastScan {
|
|
|
log.info("{} 执行完成!", tn);
|
|
|
} catch (Exception e) {
|
|
|
log.error(e.getMessage(), e);
|
|
|
+ } finally {
|
|
|
+
|
|
|
+ try {
|
|
|
+ connection.close();
|
|
|
+ } catch (IOException e) {
|
|
|
+ log.error(e.getMessage(), e);
|
|
|
+ connection = null;
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-
|
|
|
}
|
|
|
|
|
|
@SneakyThrows
|
|
|
public void scan() {
|
|
|
+ Connection connection = supplier.get();
|
|
|
try (RegionLocator regionLocator = connection.getRegionLocator(tableName);) {
|
|
|
|
|
|
List<HRegionLocation> allRegionLocations = regionLocator.getAllRegionLocations();
|
|
|
|
|
|
- int poolSize = 10;
|
|
|
+ int poolSize = thread_num;
|
|
|
ArrayBlockingQueue<Runnable> objects = new ArrayBlockingQueue<>(poolSize);
|
|
|
ThreadPoolExecutor executorService = new ThreadPoolExecutor(
|
|
|
poolSize, poolSize,
|
|
@@ -127,10 +140,8 @@ public class HbaseFastScan {
|
|
|
log.info("region host: {}", hRegionLocation.getHostname());
|
|
|
RegionInfo regionInfo = hRegionLocation.getRegion();
|
|
|
|
|
|
- String regionName = regionInfo.getRegionNameAsString();
|
|
|
byte[] startKey = regionInfo.getStartKey();
|
|
|
byte[] endKey = regionInfo.getEndKey();
|
|
|
- String tn = regionName + "-" + getKeyAsString(startKey) + "_" + getKeyAsString(endKey);
|
|
|
Scan scan = new Scan();
|
|
|
if (startKey.length != 0) {
|
|
|
scan.withStartRow(startKey);
|
|
@@ -138,37 +149,10 @@ public class HbaseFastScan {
|
|
|
if (endKey.length != 0) {
|
|
|
scan.withStopRow(endKey);
|
|
|
}
|
|
|
- new Task(regionInfo).start();
|
|
|
-
|
|
|
-// executorService.submit(new Task(regionInfo));
|
|
|
- /* executorService.submit(() -> {
|
|
|
- log.info("start。。。");
|
|
|
- List<Map<String, Object>> list = new ArrayList<>();
|
|
|
- try (Table table = connection.getTable(tableName)) {
|
|
|
-
|
|
|
- scan.setCaching(batchSize);
|
|
|
-
|
|
|
- ResultScanner scanner = table.getScanner(scan);
|
|
|
- for (Result result : scanner) {
|
|
|
- Map<String, Object> columnMap = HbaseResultUtils.parseResult(result);
|
|
|
- list.add(columnMap);
|
|
|
- if (list.size() >= batchSize) {
|
|
|
- func.accept(list);
|
|
|
- list.clear();
|
|
|
- log.info("current rowkey: {}", columnMap.getOrDefault("rowkey", null));
|
|
|
- }
|
|
|
- }
|
|
|
- if (!list.isEmpty()) {
|
|
|
- func.accept(list);
|
|
|
- list.clear();
|
|
|
- }
|
|
|
- log.info("{} 执行完成!", tn);
|
|
|
- } catch (Exception e) {
|
|
|
- log.error(e.getMessage(), e);
|
|
|
- }
|
|
|
- });*/
|
|
|
-
|
|
|
+ executorService.submit(new Task(supplier, regionInfo));
|
|
|
}
|
|
|
+ } finally {
|
|
|
+ connection.close();
|
|
|
}
|
|
|
}
|
|
|
|