Bläddra i källkod

全维度样例输出

xufei 2 år sedan
förälder
incheckning
0ed9c61a69

Filskillnaden har hållts tillbaka eftersom den är för stor
+ 2 - 1
src/main/java/com/winhc/task/common/Constant.java


+ 65 - 0
src/main/java/com/winhc/task/common/SummaryArgs.java

@@ -1000,6 +1000,9 @@ public class SummaryArgs {
                 , new ArgsInfo("defendant"
                         , "defendant"
                         , "deleted")
+                , new ArgsInfo("third"
+                        , "third"
+                        , "deleted")
         )
                 , Arrays.asList(new ValueAlias("plaintiff"
                         , "0"
@@ -1013,11 +1016,18 @@ public class SummaryArgs {
                 , new ValueAlias("defendant"
                         , "1"
                         , "wenshu_detail_v2_del_1_defendant")
+                , new ValueAlias("third"
+                        , "0"
+                        , "wenshu_detail_v2_del_0_third")
+                , new ValueAlias("third"
+                        , "1"
+                        , "wenshu_detail_v2_del_1_third")
         )
                 , (j) -> {
             Set<String> set = new HashSet<>();
             set.addAll(parseJsonArray(j.getString("plaintiff_info"), "$.litigant_id"));
             set.addAll(parseJsonArray(j.getString("defendant_info"), "$.litigant_id"));
+            set.addAll(parseJsonArray(j.getString("third_party"), "$.litigant_id"));
             return new ArrayList<>(set);
         }
         ));
@@ -1364,6 +1374,61 @@ public class SummaryArgs {
                         , "company_profile_del_1"))
                 , (j) -> Collections.singletonList(j.getString("company_id"))
         ));
+        a.put("company_ipr_pledge", new SummaryArgs(
+                "ng_rt_company_ipr_pledge"
+                , Arrays.asList(new ArgsInfo(
+                "related"
+                , "related"
+                , "deleted"), new ArgsInfo(
+                "pledgor"
+                , "pledgor"
+                , "deleted"), new ArgsInfo(
+                "pledgee"
+                , "pledgee"
+                , "deleted"))
+                , Arrays.asList(new ValueAlias("related"
+                        , "0"
+                        , "company_ipr_pledge_del_0_related")
+                , new ValueAlias("related"
+                        , "1"
+                        , "company_ipr_pledge_del_1_related")
+                , new ValueAlias("pledgor"
+                        , "0"
+                        , "company_ipr_pledge_del_0_pledgor")
+                , new ValueAlias("pledgor"
+                        , "1"
+                        , "company_ipr_pledge_del_1_pledgor")
+                , new ValueAlias("pledgee"
+                        , "0"
+                        , "company_ipr_pledge_del_0_pledgee")
+                , new ValueAlias("pledgee"
+                        , "1"
+                        , "company_ipr_pledge_del_1_pledgee")
+        )
+                , (j) -> {
+            Set<String> set = new HashSet<>();
+            set.add(j.getString("related_company_id"));
+            set.addAll(parseJsonArray(j.getString("pledgor_info"), "$.pledgor_id"));
+            set.addAll(parseJsonArray(j.getString("pledgee_info"), "$.pledgee_id"));
+            return new ArrayList<>(set);
+        }
+        ));
+        a.put("company_certificate", new SummaryArgs(
+                "ng_rt_company_certificate"
+                , Collections.singletonList(new ArgsInfo(
+                "company_id"
+                , ""
+                , "deleted"))
+                , Arrays.asList(new ValueAlias(
+                        ""
+                        , "0"
+                        , "company_certificate_del_0"),
+                new ValueAlias(
+                        ""
+                        , "1"
+                        , "company_certificate_del_1"))
+                , (j) -> Collections.singletonList(j.getString("company_id"))
+        ));
         return a;
     }
 

+ 46 - 0
src/main/java/com/winhc/task/listener/NoModelDataListener.java

@@ -0,0 +1,46 @@
+package com.winhc.task.listener;
+
+import com.alibaba.excel.context.AnalysisContext;
+import com.alibaba.excel.event.AnalysisEventListener;
+import com.alibaba.excel.util.ListUtils;
+import com.alibaba.fastjson.JSON;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+@Slf4j
+public class NoModelDataListener extends AnalysisEventListener<Map<Integer, String>> {
+    /**
+     * 每隔5条存储数据库,实际使用中可以100条,然后清理list ,方便内存回收
+     */
+    private List<Map<Integer, String>> cachedDataList;
+
+    public NoModelDataListener(List<Map<Integer, String>> cachedDataList) {
+        this.cachedDataList = cachedDataList;
+    }
+
+    @Override
+    public void invoke(Map<Integer, String> data, AnalysisContext context) {
+//        log.info("解析到一条数据:{}", JSON.toJSONString(data));
+        cachedDataList.add(data);
+    }
+
+    @Override
+    public void doAfterAllAnalysed(AnalysisContext context) {
+        log.info("所有数据解析完成!");
+    }
+
+    /**
+     * 加上存储数据库
+     */
+    private void saveData() {
+        log.info("{}条数据,开始存储数据库!", cachedDataList.size());
+        log.info("存储数据库成功!");
+    }
+
+    public List<Map<Integer, String>> getCachedDataList() {
+        return cachedDataList;
+    }
+}

+ 12 - 0
src/main/resources/templates/merge_sql.ftl

@@ -0,0 +1,12 @@
+
+DROP TABLE if EXISTS tmp_xf_${tn}_merge_sj;
+CREATE TABLE tmp_xf_${tn}_merge_sj as
+SELECT * FROM (
+SELECT *,ROW_NUMBER () OVER (PARTITION by rowkey order by ds desc,update_time desc) num
+FROM (
+SELECT * FROM ads_${tn}_v9 WHERE ds > 0
+UNION ALL
+SELECT * FROM inc_ads_${tn}_v9 WHERE ds > 0
+)
+) WHERE num = 1
+;

+ 9 - 0
src/main/resources/templates/merge_sql_mapping_v1.ftl

@@ -0,0 +1,9 @@
+
+DROP TABLE if EXISTS tmp_xf_${tn}_sj_export;
+CREATE TABLE tmp_xf_${tn}_sj_export as
+SELECT ${cols} FROM (
+SELECT * FROM tmp_xf_${tn}_merge_sj
+) a JOIN (
+SELECT rowkey FROM ads_all_rowkey_tn_sj WHERE tn = '${tn}' GROUP by rowkey
+) b on a.rowkey = b.rowkey
+;

+ 9 - 0
src/main/resources/templates/merge_sql_mapping_v2.ftl

@@ -0,0 +1,9 @@
+
+DROP TABLE if EXISTS tmp_xf_${tn}_sj_export;
+CREATE TABLE tmp_xf_${tn}_sj_export as
+SELECT ${cols} FROM (
+SELECT * FROM tmp_xf_${tn}_merge_sj
+) a JOIN (
+SELECT rowkey FROM ads_all_rowkey_tn_sj WHERE tn = 'company_mortgage_info' GROUP by rowkey
+) b on a.main_id = b.rowkey
+;

+ 4 - 0
src/main/resources/templates/sj_mapping_v1.ftl2

@@ -0,0 +1,4 @@
+SELECT  unnest(${keyno}) AS id
+        ,rowkey
+FROM    ng_rt_${tn}
+WHERE   deleted <> '9'

+ 15 - 0
src/main/resources/templates/sj_mapping_v2.ftl2

@@ -0,0 +1,15 @@
+
+INSERT INTO tmp_xf_all_rowkey_tn
+SELECT  a.rowkey
+        ,'${tn}' AS tn
+FROM    (
+        ${table_view}
+        ) a
+JOIN    (
+            SELECT  company_id as id
+            FROM    tmp_xf_company_id_sj_view
+            ${human_pid_view}
+        ) b
+ON      a.id = b.id
+WHERE   LENGTH(a.id) = 32
+GROUP BY rowkey

+ 542 - 0
src/test/java/com/winhc/task/CompanyHumanJob.java

@@ -0,0 +1,542 @@
+package com.winhc.task;
+
+import com.alibaba.excel.EasyExcel;
+import com.alibaba.fastjson.JSONObject;
+import com.aliyun.odps.Column;
+import com.winhc.task.bean.Summary;
+import com.winhc.task.common.Constant;
+import com.winhc.task.listener.NoModelDataListener;
+import com.winhc.task.util.BaseUtils;
+import com.winhc.task.util.EasyExcelUtil;
+import com.winhc.task.util.OdpsUtils;
+import com.winhc.task.util.QueryBuilderUtils;
+import lombok.val;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Get;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.index.query.WrapperQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static com.winhc.task.util.BaseUtils.cleanup;
+import static com.winhc.task.util.BaseUtils.toDBC;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2023/5/30 9:15
+ */
+@SpringBootTest
+public class CompanyHumanJob {
+
+    @Autowired
+    @Qualifier(value = "v6")
+    RestHighLevelClient getClient;
+
+    @Autowired
+    @Qualifier(value = "v5")
+    RestHighLevelClient getOldClient;
+
+    @Test
+    public void start1() throws Exception {
+
+        String fileName = "C:\\Users\\batmr\\Desktop\\yhc\\人企测试样本(3144条)(1).xlsx";
+        // 这里 只要,然后读取第一个sheet
+        List<Map<Integer, String>> dataList = new ArrayList<>();
+        EasyExcel.read(fileName, new NoModelDataListener(dataList)).sheet().doRead();
+        System.out.println(dataList.size());
+
+        val futures = new ArrayList<CompletableFuture<JSONObject>>();
+
+        dataList.stream().map(m -> {
+            JSONObject j = new JSONObject();
+            String company_name = m.get(0);
+            String human_name = m.get(1);
+            j.fluentPut("company_name", company_name)
+                    .fluentPut("human_name", human_name);
+            return j;
+        }).forEach(d -> {
+            futures.add(fetchEs(d));
+        });
+
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+        List<JSONObject> re = futures.stream().map(x -> {
+            try {
+                return x.get();
+            } catch (Exception e) {
+                throw new RuntimeException("error 1111 :" + e.getMessage(), e);
+            }
+        }).filter(Objects::nonNull).collect(Collectors.toList());
+
+        val futures2 = new ArrayList<CompletableFuture<JSONObject>>();
+
+        re.forEach(t -> {
+            futures2.add(fetchOldEs(t));
+        });
+
+        CompletableFuture.allOf(futures2.toArray(new CompletableFuture[0])).get();
+        List<JSONObject> re2 = futures2.stream().map(x -> {
+            try {
+                return x.get();
+            } catch (Exception e) {
+                throw new RuntimeException("error 1111 :" + e.getMessage(), e);
+            }
+        }).filter(Objects::nonNull).collect(Collectors.toList());
+
+        saveExcel(re2);
+    }
+
+    @Test
+    public void start2() throws Exception {
+
+        String fileName = "C:\\Users\\batmr\\Desktop\\yhc\\人企测试样本_company_id.xlsx";
+        // 这里 只要,然后读取第一个sheet
+        List<Map<Integer, String>> dataList = new ArrayList<>();
+        EasyExcel.read(fileName, new NoModelDataListener(dataList)).sheet().doRead();
+        System.out.println(dataList.size());
+
+        //判断人员类型
+        val futures = new ArrayList<CompletableFuture<List<JSONObject>>>();
+        dataList.stream().map(m -> {
+            JSONObject j = new JSONObject();
+            String company_name = m.get(0);
+            String human_name = m.get(1);
+            String company_id = m.get(2);
+            String human_pid = m.get(3);
+            j.fluentPut("company_name", company_name)
+                    .fluentPut("human_name", human_name)
+                    .fluentPut("company_id", company_id)
+                    .fluentPut("human_pid", human_pid)
+            ;
+            return j;
+        }).forEach(d -> {
+            try {
+                futures.add(fetchAllCompany(d));
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+        });
+
+
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+        List<JSONObject> re = futures.stream().flatMap(x -> {
+            try {
+                return x.get().stream();
+            } catch (Exception e) {
+                throw new RuntimeException("error 1111 :" + e.getMessage(), e);
+            }
+        }).filter(Objects::nonNull).collect(Collectors.toList());
+
+        val futures2 = new ArrayList<CompletableFuture<JSONObject>>();
+
+        re.forEach(t -> {
+            futures2.add(fetchEsById(t));
+        });
+
+        CompletableFuture.allOf(futures2.toArray(new CompletableFuture[0])).get();
+        List<JSONObject> re2 = futures2.stream().map(x -> {
+            try {
+                return x.get();
+            } catch (Exception e) {
+                throw new RuntimeException("error 1111 :" + e.getMessage(), e);
+            }
+        }).filter(Objects::nonNull).collect(Collectors.toList());
+
+        saveExcelV2(re2);
+    }
+
+    private CompletableFuture<JSONObject> fetchEsById(JSONObject data) {
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                String sub_company_id = data.getString("sub_company_id");
+                String sub_company_name = queryCompanyNameByCompanyId(sub_company_id);
+                data.fluentPut("sub_company_name", sub_company_name);
+                return data;
+            } catch (Exception e) {
+                e.printStackTrace();
+                data.fluentPut("sub_company_name", "");
+                return data;
+            }
+        }, BaseUtils.COMMON_POOL);
+
+    }
+
+    private CompletableFuture<List<JSONObject>> fetchAllCompany(JSONObject data) throws IOException {
+
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                String human_pid = data.getString("human_pid");
+                if (StringUtils.isBlank(human_pid)) return new ArrayList<>();
+                String tmp_data = JSONObject.toJSONString(data);
+                Map<String, Set<String>> map = queryAllCompany(human_pid);
+                List<JSONObject> re = map.entrySet().stream().map(m -> {
+                    String company_id = m.getKey();
+                    Set<String> value = m.getValue();
+                    JSONObject tj = JSONObject.parseObject(tmp_data);
+                    String labels = String.join("|", value);
+                    tj.fluentPut("label", labels);
+                    tj.fluentPut("sub_company_id", company_id);
+                    return tj;
+                }).collect(Collectors.toList());
+
+                return re;
+            } catch (Exception e) {
+                e.printStackTrace();
+                return new ArrayList<>();
+            }
+        }, BaseUtils.COMMON_POOL);
+    }
+
+    private CompletableFuture<JSONObject> fetchEs(JSONObject data) {
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                String company_name = data.getString("company_name");
+                String human_name = data.getString("human_name");
+                if ("3M中国有限公司".equals(company_name)) {
+                    company_name = toDBC(company_name);
+                }
+                String company_id = queryCompanyId(company_name, human_name);
+                data.fluentPut("company_id", company_id);
+                return data;
+            } catch (Exception e) {
+                e.printStackTrace();
+                data.fluentPut("company_id", "");
+                return data;
+            }
+        }, BaseUtils.COMMON_POOL);
+
+    }
+
+    private CompletableFuture<JSONObject> fetchOldEs(JSONObject data) {
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                String company_id = data.getString("company_id");
+                if (StringUtils.isBlank(company_id)) return data;
+                String human_name = data.getString("human_name");
+                String human_pid = queryPid(company_id, human_name);
+                data.fluentPut("human_pid", human_pid);
+                return data;
+            } catch (Exception e) {
+                e.printStackTrace();
+                data.fluentPut("human_pid", "");
+                return data;
+            }
+        }, BaseUtils.COMMON_POOL);
+
+    }
+
+    private void saveExcel(List<JSONObject> re) {
+        List<List<String>> dataList = re.stream().map(d -> {
+            List<String> dataRow = new ArrayList<>();
+            dataRow.add(d.getString("company_name"));
+            dataRow.add(d.getString("human_name"));
+            dataRow.add(d.getString("company_id"));
+            dataRow.add(d.getString("human_pid"));
+            return dataRow;
+        }).collect(Collectors.toList());
+
+        EasyExcelUtil easyExcelUtil = new EasyExcelUtil();
+
+        String fileName = "C:\\Users\\batmr\\Desktop\\yhc\\人企测试样本_company_id.xlsx";
+
+        List<List<String>> head = Stream.of("企业名称", "法定代表人", "company_id", "human_pid")
+                .map(Collections::singletonList)
+                .collect(Collectors.toList());
+        easyExcelUtil.init(fileName, "sheet1", head);
+        easyExcelUtil.doExportExcel(dataList);
+        //关闭流
+        easyExcelUtil.finish();
+    }
+
+    private void saveExcelV2(List<JSONObject> re) {
+        List<List<String>> dataList = re.stream().map(d -> {
+            List<String> dataRow = new ArrayList<>();
+            dataRow.add(d.getString("company_name"));
+            dataRow.add(d.getString("human_name"));
+            dataRow.add(d.getString("company_id"));
+            dataRow.add(d.getString("human_pid"));
+            dataRow.add(d.getString("sub_company_id"));
+            dataRow.add(d.getString("sub_company_name"));
+            dataRow.add(d.getString("label"));
+            return dataRow;
+        }).collect(Collectors.toList());
+
+        EasyExcelUtil easyExcelUtil = new EasyExcelUtil();
+
+        String fileName = "C:\\Users\\batmr\\Desktop\\yhc\\人企测试样本_label.xlsx";
+
+        List<List<String>> head = Stream.of("企业名称", "法定代表人", "company_id", "human_pid", "sub_company_id", "sub_company_name", "label")
+                .map(Collections::singletonList)
+                .collect(Collectors.toList());
+        easyExcelUtil.init(fileName, "sheet1", head);
+        easyExcelUtil.doExportExcel(dataList);
+        //关闭流
+        easyExcelUtil.finish();
+    }
+
+    private Map<String, Set<String>> queryAllCompany(String pid) throws IOException {
+        if (StringUtils.isBlank(pid)) {
+            return new HashMap<>();
+        }
+        String dsl = "{\n" +
+                "  \"query\": {\n" +
+                "    \"bool\": {\n" +
+                "      \"must\": [\n" +
+                "        {\n" +
+                "          \"terms\": {\n" +
+                "            \"deleted\": [\n" +
+                "              \"0\",\n" +
+                "              \"1\"\n" +
+                "            ]\n" +
+                "          }\n" +
+                "        },\n" +
+                "        {\n" +
+                "          \"bool\": {\n" +
+                "            \"should\": [\n" +
+                "              {\n" +
+                "                \"term\": {\n" +
+                "                  \"legal_entities.id\": {\n" +
+                "                    \"value\": \"&condition\"\n" +
+                "                  }\n" +
+                "                }\n" +
+                "              },\n" +
+                "              {\n" +
+                "                \"term\": {\n" +
+                "                  \"holder_id\": {\n" +
+                "                    \"value\": \"&condition\"\n" +
+                "                  }\n" +
+                "                }\n" +
+                "              },\n" +
+                "              {\n" +
+                "                \"term\": {\n" +
+                "                  \"hid\": {\n" +
+                "                    \"value\": \"&condition\"\n" +
+                "                  }\n" +
+                "                }\n" +
+                "              }\n" +
+                "            ],\n" +
+                "            \"minimum_should_match\": 1\n" +
+                "          }\n" +
+                "        }\n" +
+                "      ]\n" +
+                "    }\n" +
+                "  }\n" +
+                "}";
+
+        String ds1 = dsl.replaceAll("&condition", pid);
+        SearchRequest searchRequest = new SearchRequest("winhc_index_rt_company", "winhc_index_rt_company_holder", "winhc_index_rt_company_staff");
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.size(10000);
+        if (StringUtils.isNotEmpty(ds1)) {
+            WrapperQueryBuilder query = QueryBuilderUtils.getQuery(JSONObject.parseObject(ds1));
+            searchSourceBuilder.query(query);
+        }
+        searchRequest.source(searchSourceBuilder);
+        SearchResponse searchResponse = getClient.search(searchRequest);
+        SearchHit[] hits = searchResponse.getHits().getHits();
+
+        Map<String, Set<String>> data_tmp = new HashMap<>();
+
+        Arrays.stream(hits).forEach(m -> {
+            String rowkey = m.getId();
+            String index = m.getIndex();
+            Map<String, Object> map = m.getSourceAsMap();
+            String deleted = map.getOrDefault("deleted", "").toString();
+            Set<String> set = new HashSet<>();
+
+            String company_id;
+            if (index.contains("company_holder")) {
+                if (deleted.equals("0")) {
+                    set.add("股东");
+                } else {
+                    set.add("历史股东");
+                }
+                company_id = map.getOrDefault("company_id", "").toString();
+
+            } else if (index.contains("company_staff")) {
+                if (deleted.equals("0")) {
+                    set.add("高管");
+                } else {
+                    set.add("历史高管");
+                }
+                company_id = map.getOrDefault("company_id", "").toString();
+
+            } else {
+                if (deleted.equals("0")) {
+                    set.add("法人");
+                } else {
+                    set.add("历史法人");
+                }
+                company_id = rowkey;
+            }
+            if (data_tmp.containsKey(company_id)) {
+                set.addAll(data_tmp.get(company_id));
+            }
+            data_tmp.put(company_id, set);
+        });
+
+        return data_tmp;
+    }
+
+    private String queryCompanyId(String name, String legalName) throws IOException {
+
+        String dsl = "{\n" +
+                "  \"query\": {\n" +
+                "    \"bool\": {\n" +
+                "      \"must\": [\n" +
+                "        {\n" +
+                "          \"term\": {\n" +
+                "            \"deleted\": {\n" +
+                "              \"value\": \"0\"\n" +
+                "            }\n" +
+                "          }\n" +
+                "        },\n" +
+                "        {\n" +
+                "          \"bool\": {\n" +
+                "            \"should\": [\n" +
+                "              {\n" +
+                "                \"term\": {\n" +
+                "                  \"cname.show.keyword\": {\n" +
+                "                    \"value\": \"&condition\"\n" +
+                "                  }\n" +
+                "                }\n" +
+                "              },\n" +
+                "              {\n" +
+                "                \"term\": {\n" +
+                "                  \"history_name.show.keyword\": {\n" +
+                "                    \"value\": \"&condition\"\n" +
+                "                  }\n" +
+                "                }\n" +
+                "              }\n" +
+                "            ],\n" +
+                "            \"minimum_should_match\": 1\n" +
+                "          }\n" +
+                "        }\n" +
+                "      ]\n" +
+                "    }\n" +
+                "  }\n" +
+                "}";
+
+        String ds1 = dsl.replaceAll("&condition", name);
+        SearchRequest searchRequest = new SearchRequest("winhc_index_rt_company").types("company");
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.size(10);
+        if (StringUtils.isNotEmpty(ds1)) {
+            WrapperQueryBuilder query = QueryBuilderUtils.getQuery(JSONObject.parseObject(ds1));
+            searchSourceBuilder.query(query);
+        }
+        searchRequest.source(searchSourceBuilder);
+        SearchResponse searchResponse = getClient.search(searchRequest);
+        SearchHit[] hits = searchResponse.getHits().getHits();
+        if (hits == null || hits.length == 0) {
+            return "";
+        } else if (hits.length == 1) {
+            return hits[0].getId();
+        } else {
+            List<String> collect = Arrays.stream(hits).map(m -> {
+                String id = m.getId();
+                Map<String, Object> map = m.getSourceAsMap();
+                Object legal_entities = map.getOrDefault("legal_entities", "");
+                if (legal_entities == null) {
+                    return null;
+                }
+                if (legal_entities.toString().contains(legalName)) {
+                    return id;
+                }
+                return null;
+            }).filter(Objects::nonNull).collect(Collectors.toList());
+            if (collect.isEmpty()) return "";
+            return collect.get(0);
+        }
+    }
+
+    private String queryPid(String companyId, String human_name) throws IOException {
+        String dsl = "{\n" +
+                "  \"query\": {\n" +
+                "    \"bool\": {\n" +
+                "      \"must\": [\n" +
+                "        {\n" +
+                "          \"term\": {\n" +
+                "            \"company_id\": {\n" +
+                "              \"value\": \"&company_id\"\n" +
+                "            }\n" +
+                "          }\n" +
+                "        },{\n" +
+                "          \"term\": {\n" +
+                "            \"human_name_cleanup\": {\n" +
+                "              \"value\": \"&human_name\"\n" +
+                "            }\n" +
+                "          }\n" +
+                "        }\n" +
+                "      ]\n" +
+                "    }\n" +
+                "  }\n" +
+                "}";
+
+        String ds1 = dsl.replaceAll("&company_id", companyId).replaceAll("&human_name", cleanup(human_name));
+        SearchRequest searchRequest = new SearchRequest("winhc_company_human_pid_mapping_v9").types("human");
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.size(1);
+        if (StringUtils.isNotEmpty(ds1)) {
+            WrapperQueryBuilder query = QueryBuilderUtils.getQuery(JSONObject.parseObject(ds1));
+            searchSourceBuilder.query(query);
+        }
+        searchRequest.source(searchSourceBuilder);
+        SearchResponse searchResponse = getOldClient.search(searchRequest);
+        List<String> collect = Arrays.stream(searchResponse.getHits().getHits())
+                .map(x -> {
+                    Map<String, Object> sourceAsMap = x.getSourceAsMap();
+                    String human_pid = sourceAsMap.getOrDefault("human_pid", "").toString();
+                    return human_pid;
+                }).collect(Collectors.toList());
+        if (collect.isEmpty()) return "";
+        return collect.get(0);
+    }
+
+    private String queryCompanyNameByCompanyId(String companyId) throws IOException {
+        String dsl = "{\n" +
+                "  \"query\": {\n" +
+                "    \"term\": {\n" +
+                "      \"_id\": {\n" +
+                "        \"value\": \"&company_id\"\n" +
+                "      }\n" +
+                "    }\n" +
+                "  }\n" +
+                "}";
+
+        String ds1 = dsl.replaceAll("&company_id", companyId);
+        SearchRequest searchRequest = new SearchRequest("winhc_index_rt_company").types("company");
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.size(1);
+        if (StringUtils.isNotEmpty(ds1)) {
+            WrapperQueryBuilder query = QueryBuilderUtils.getQuery(JSONObject.parseObject(ds1));
+            searchSourceBuilder.query(query);
+        }
+        searchRequest.source(searchSourceBuilder);
+        SearchResponse searchResponse = getClient.search(searchRequest);
+        List<String> collect = Arrays.stream(searchResponse.getHits().getHits())
+                .map(x -> {
+                    Map<String, Object> sourceAsMap = x.getSourceAsMap();
+                    String str = JSONObject.toJSONString(sourceAsMap);
+                    JSONObject c = JSONObject.parseObject(str);
+                    String show_name = c.getJSONObject("cname").getString("show");
+                    return show_name;
+                }).collect(Collectors.toList());
+        if (collect.isEmpty()) return "";
+        return collect.get(0);
+    }
+}

+ 1 - 1
src/test/java/com/winhc/task/DataWorksSummaryJob.java

@@ -35,7 +35,7 @@ public class DataWorksSummaryJob {
 
         //确定重跑集合(集合为空跑全量)
 
-        List<String> add = Arrays.asList("company_equity_pledge_holder");
+        List<String> add = Arrays.asList("zxr_evaluate_results");
         //List<String> add = Arrays.asList("company_copyright_reg","company_copyright_works");
         //List<String> add = Arrays.asList("company_patent","company_copyright_reg","company_copyright_works");
 

+ 0 - 56
src/test/java/com/winhc/task/FreMarkTest.java

@@ -1,56 +0,0 @@
-package com.winhc.task;
-
-import com.winhc.task.util.FreeMarkUtil;
-import org.junit.jupiter.api.Test;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.boot.test.context.SpringBootTest;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-@SpringBootTest
-public class FreMarkTest {
-    @Autowired
-    FreeMarkUtil freeMarkUtil;
-
-    @Test
-    public void start() throws IOException {
-        Map<String, String> m = new HashMap<>();
-        m.put("targetTableName", "test_tmp_xf_sum_v9");
-        m.put("tn", "zxr_evaluate_results");
-        m.put("keyno", "keyno");
-        m.put("groupField", "deleted");
-        m.put("condition", "in ('0','1')");
-        String s = freeMarkUtil.genStr("agg_sample.ftl", m);
-        System.out.println(s);
-    }
-
-    @Test
-    public void start2() throws IOException {
-        List<String> list = Arrays.asList("bankruptcy_judgment_document", "bankruptcy_open_announcement", "bankruptcy_open_case"
-                , "company_court_open_announcement", /*"company_court_announcement",*/ "company_send_announcement"
-                , "company_court_register", "litigation_mediation", "restrictions_on_exit", "auction_tracking"/*,"property_rights_transaction"*/);
-        list.forEach(tn->{
-            Map<String, String> m = new HashMap<>();
-            m.put("tn", tn);
-            String s = freeMarkUtil.genStr("truncate_table.ftl", m);
-            System.out.println(s);
-        });
-    }
-
-    @Test
-    public void start3() throws IOException {
-        List<String> list = Arrays.asList("bankruptcy_judgment_document", "bankruptcy_open_announcement", "bankruptcy_open_case"
-                , "company_court_open_announcement", /*"company_court_announcement",*/ "company_send_announcement"
-                , "company_court_register", "litigation_mediation", "restrictions_on_exit", "auction_tracking"/*,"property_rights_transaction"*/);
-        list.forEach(tn->{
-            Map<String, String> m = new HashMap<>();
-            m.put("tn", tn);
-            String s = freeMarkUtil.genStr("test1.ftl", m);
-            System.out.println(s);
-        });
-    }
-}

+ 210 - 0
src/test/java/com/winhc/task/SJTaskJob.java

@@ -0,0 +1,210 @@
+package com.winhc.task;
+
+import cn.hutool.core.lang.Tuple;
+import com.alibaba.hologres.client.HoloClient;
+import com.alibaba.hologres.client.exception.HoloClientException;
+import com.aliyun.odps.Column;
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.Table;
+import com.aliyun.odps.TableSchema;
+import com.winhc.task.bean.JobArgs;
+import com.winhc.task.common.Constant;
+import com.winhc.task.common.SummaryArgs;
+import com.winhc.task.util.FreeMarkUtil;
+import com.winhc.task.util.HoloUtils;
+import com.winhc.task.util.OdpsUtils;
+import lombok.SneakyThrows;
+import org.apache.calcite.avatica.proto.Common;
+import org.apache.commons.lang3.StringUtils;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+@SpringBootTest
+public class SJTaskJob {
+    @Autowired
+    FreeMarkUtil freeMarkUtil;
+
+    @Test
+    public void start() throws IOException {
+        Map<String, String> m = new HashMap<>();
+        m.put("targetTableName", "test_tmp_xf_sum_v9");
+        m.put("tn", "zxr_evaluate_results");
+        m.put("keyno", "keyno");
+        m.put("groupField", "deleted");
+        m.put("condition", "in ('0','1')");
+        String s = freeMarkUtil.genStr("agg_sample.ftl", m);
+        System.out.println(s);
+    }
+
+    @Test
+    public void start2() throws IOException {
+        List<String> list = Arrays.asList("bankruptcy_judgment_document", "bankruptcy_open_announcement", "bankruptcy_open_case"
+                , "company_court_open_announcement", /*"company_court_announcement",*/ "company_send_announcement"
+                , "company_court_register", "litigation_mediation", "restrictions_on_exit", "auction_tracking"/*,"property_rights_transaction"*/);
+        list.forEach(tn -> {
+            Map<String, String> m = new HashMap<>();
+            m.put("tn", tn);
+            String s = freeMarkUtil.genStr("truncate_table.ftl", m);
+            System.out.println(s);
+        });
+    }
+
+    @Test
+    public void start3() throws IOException {
+        List<String> list = Arrays.asList("bankruptcy_judgment_document", "bankruptcy_open_announcement", "bankruptcy_open_case"
+                , "company_court_open_announcement", /*"company_court_announcement",*/ "company_send_announcement"
+                , "company_court_register", "litigation_mediation", "restrictions_on_exit", "auction_tracking"/*,"property_rights_transaction"*/);
+        list.forEach(tn -> {
+            Map<String, String> m = new HashMap<>();
+            m.put("tn", tn);
+            String s = freeMarkUtil.genStr("test1.ftl", m);
+            System.out.println(s);
+        });
+    }
+
+    public static List<String> tns = Arrays.asList(/*"company",*/
+            "company_holder",
+            "company_staff",
+            "company_change",
+            "company_dishonest_info",
+            "company_zxr",
+            "company_zxr_restrict",
+            "company_zxr_final_case",
+            "company_send_announcement",
+            "wenshu_detail_v2",
+            "company_court_open_announcement",
+            "company_court_register",
+            "company_court_announcement",
+            "company_judicial_assistance",
+            "company_punishment_info",
+            "company_abnormal_info",
+            "company_equity_info",
+            "company_mortgage_info",
+            "company_mortgage_pawn",
+            "company_mortgage_change",
+            "company_mortgage_people",
+            "company_env_punishment",
+            "company_tax_contravention",
+            "company_own_tax",
+            "company_illegal_info",
+            "company_equity_pledge",
+            "company_ipr_pledge",
+            "company_certificate",
+            "company_license",
+            "company_tax");
+
+    @Test
+    public void calcOdpsMerge() throws IOException {
+        tns.forEach(tn -> {
+            Map<String, String> m = new HashMap<>();
+            m.put("tn", tn);
+            String s = freeMarkUtil.genStr("merge_sql.ftl", m);
+            System.out.println(s);
+        });
+    }
+
+    @SneakyThrows
+    @Test
+    public void calcHolo() throws Exception {
+        Map<String, SummaryArgs> a = SummaryArgs.SUMMARY_ARGS;
+        List<String> pid_tns = Arrays.asList("company_dishonest_info",
+                "company_zxr",
+                "company_zxr_restrict",
+                "company_zxr_final_case");
+        List<Tuple> sql_list = tns.stream().filter(a::containsKey).map(tn -> {
+            SummaryArgs args = a.get(tn);
+            String p_sql = args.getArgsInfo().stream().map(x -> {
+                Map<String, String> m = new HashMap<>();
+                m.put("tn", tn);
+                m.put("keyno", x.getFilterField());
+                return freeMarkUtil.genStr("sj_mapping_v1.ftl2", m);
+            }).collect(Collectors.joining("\nUNION ALL\n"));
+
+            Map<String, String> m = new HashMap<>();
+            m.put("table_view", p_sql);
+            m.put("human_pid_view", "");
+            if (pid_tns.contains(tn)) {
+                m.put("human_pid_view", "UNION ALL\n" +
+                        "            SELECT  human_pid as id\n" +
+                        "            FROM    tmp_xf_human_pid_sj_view");
+            }
+            m.put("tn", tn);
+            String calc_sql = freeMarkUtil.genStr("sj_mapping_v2.ftl2", m);
+            return new Tuple(tn, calc_sql);
+        }).collect(Collectors.toList());
+
+        calcHolo(sql_list);
+    }
+
+    private void calcHolo(List<Tuple> collect) throws HoloClientException {
+        String target_table = "tmp_xf_all_rowkey_tn";
+        for (int i = 0; i < collect.size(); i++) {
+            HoloClient holoClient = HoloUtils.init();
+            String s0 = "";
+            //初始化清空table
+            if (i == 0) {
+                s0 = "truncate table " + target_table;
+            }
+            String tn = collect.get(i).get(0).toString();
+            String s1 = "analyze " + target_table;
+            String s2 = "analyze ng_rt_" + tn;
+            Stream.of(s0, s1, s2, collect.get(i).get(1)).filter(StringUtils::isNotBlank).forEach(x -> HoloUtils.exexSql(holoClient, x));
+            holoClient.close();
+        }
+    }
+
+    @Test
+    public void calcOdpsExport() throws IOException {
+        List<String> subTns = Arrays.asList("company_mortgage_pawn", "company_mortgage_change", "company_mortgage_people");
+        tns.forEach(tn -> {
+            Map<String, String> m = new HashMap<>();
+            m.put("tn", tn);
+            String cols = getCols(tn);
+            m.put("cols", cols);
+            String s;
+            if (subTns.contains(tn)) {
+                s = freeMarkUtil.genStr("merge_sql_mapping_v2.ftl", m);
+            } else {
+                s = freeMarkUtil.genStr("merge_sql_mapping_v1.ftl", m);
+            }
+            System.out.println(s);
+        });
+    }
+
+    // 获取表结构
+    public static String getCols(String tn) {
+        String tableName = "ads_" + tn + "_v9";
+        // 初始化odps
+        Odps odps = OdpsUtils.getOdps();
+        // 获取表
+        Table table = odps.tables().get(tableName);
+
+        // 获取表结构
+        TableSchema tableSchema = table.getSchema();
+
+        //获取列名列注释
+        List<Column> columns = tableSchema.getColumns();
+
+        List<String> cols = new ArrayList<>();
+
+        //存在过滤字段跳过
+        List<String> filter_cols = Constant.tn_cols_filters.getOrDefault(tn, new ArrayList<>());
+        //列字段
+        for (Column column : columns) {
+            // 判断如果ds存在,则跳过
+            if (filter_cols.contains(column.getName()) || "ds".equalsIgnoreCase(column.getName())) {
+                continue;
+            }
+
+            cols.add(column.getName());
+        }
+
+        return cols.stream().collect(Collectors.joining(",","a.",""));
+    }
+}

+ 604 - 0
src/test/java/com/winhc/task/SynAllTableSummary.java

@@ -0,0 +1,604 @@
+package com.winhc.task;
+
+import cn.hutool.core.io.FileUtil;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.hologres.client.HoloClient;
+import com.alibaba.hologres.client.exception.HoloClientException;
+import com.aliyun.odps.Column;
+import com.winhc.task.bean.Summary;
+import com.winhc.task.common.ArgsCompanyJob;
+import com.winhc.task.common.Constant;
+import com.winhc.task.common.SummaryArgs;
+import com.winhc.task.job.EsScanSummaryJob;
+import com.winhc.task.util.*;
+import lombok.val;
+import org.apache.calcite.avatica.proto.Common;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.index.query.WrapperQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.junit.jupiter.api.Test;
+import org.postgresql.model.TableSchema;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static com.winhc.task.common.Constant.hbase_scan_v8;
+import static com.winhc.task.common.Constant.tns_out;
+
+
+@SpringBootTest
+public class SynAllTableSummary {
+    @Autowired
+    EsScanSummaryJob esScanSummaryJob;
+
+    @Autowired
+    Connection connection;
+
+    @Autowired
+    @Qualifier(value = "v6")
+    RestHighLevelClient getClient;
+
+    @Autowired
+    @Qualifier(value = "v5")
+    RestHighLevelClient getOldClient;
+
+
+    @Test
+    public void start() throws IOException {
+        esScanSummaryJob.start();
+    }
+
+    @Test
+    public void start2() throws Exception {
+
+        //计算所有ids
+        esScanSummaryJob.start();
+        //v9 holo->holo 聚合v8 rowkey
+        holo();
+        //v8 es->holo 聚合v9 rowkey
+        es2HoloV8();
+        //v8&v9 holo->hbase->excel 数据导出excel
+        holoAndHbase2Excel();
+        //v8&v9 hbase->excel 子表scan  数据导出excel
+        scanHbase2Excel();
+    }
+
+    @Test
+    public void holo() throws Exception {
+        List<String> tns = SummaryArgs.SUMMARY_ARGS.keySet()
+                .stream()
+                .filter(tns_out::contains)
+                .distinct()
+                .collect(Collectors.toList());
+        String resTable = Constant.holo_res_tab;
+        //清空表
+        HoloClient holoClient = HoloUtils.init();
+        HoloUtils.exexSql(holoClient, "truncate table " + resTable);
+        holoClient.close();
+        tns.forEach(tn -> {
+            try {
+                rowkey2HoloV9(tn, resTable);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+
+    }
+
+    @Test
+    public void oneTnTable() throws Exception {
+        //清空表
+        HoloClient holoClient = HoloUtils.init();
+        HoloUtils.exexSql(holoClient, "truncate table " + Constant.holo_res_tab);
+        holoClient.close();
+        rowkey2HoloV9("company_holder_sponsor", Constant.holo_res_tab);
+        holoAndHbase2Excel();
+    }
+
+    @Test
+    public void oneTnEs2HoloV8() throws Exception {
+        //清空表
+        HoloClient holoClient = HoloUtils.init();
+        HoloUtils.exexSql(holoClient, "truncate table " + Constant.holo_res_tab);
+        holoClient.close();
+        calc_v8("company_ipr_pledge");
+        holoAndHbase2Excel();
+    }
+
+    public void rowkey2HoloV9(String tn, String resTable) throws HoloClientException {
+
+        SummaryArgs s = SummaryArgs.SUMMARY_ARGS.get(tn);
+        String sql = "INSERT INTO " + resTable + "\n" +
+                "SELECT\n" +
+                "    rowkey, '&tn' as tn\n" +
+                "FROM\n" +
+                "    &tableName\n" +
+                "WHERE\n" +
+                "    &condition\n" +
+                "GROUP BY rowkey";
+
+        String ids = Constant.all_ids.stream().distinct()
+                .collect(Collectors.joining("','", "'", "'"));
+        String condition = s.getArgsInfo().stream()
+                .map(x -> x.getFilterField() + " && " + "ARRAY[" + ids + "]")
+                .collect(Collectors.joining(" or "));
+
+        HoloClient holoClient = HoloUtils.init();
+        String tableName = "ng_rt_" + tn;
+        String calc_sql = sql.replaceAll("&condition", condition)
+                .replaceAll("&tn", tn)
+                .replaceAll("&tableName", tableName);
+        System.out.println(calc_sql);
+        HoloUtils.exexSql(holoClient, calc_sql);
+        holoClient.close();
+
+    }
+
+    @Test
+    public void scanHbase2Excel() throws Exception {
+        List<String> ids = Constant.all_ids;
+        HoloClient holoClient = HoloUtils.init();
+        //补充年报rowkey
+        String tmpSql = "SELECT * FROM " + Constant.holo_res_tab + " where tn ='company_annual_report'";
+        List<Summary> pList = HoloUtils.exec(holoClient, tmpSql, Summary.class);
+        List<String> id_addr = pList.stream().map(Summary::getRowkey).distinct().collect(Collectors.toList());
+        ids.addAll(id_addr);
+        holoClient.close();
+
+        val futures = new ArrayList<CompletableFuture<List<JSONObject>>>();
+        Stream.concat(Constant.hbase_scan_v9.stream(), hbase_scan_v8.stream()).forEach(tn -> {
+            ids.forEach(rowkey -> {
+                CompletableFuture<List<JSONObject>> rew = CompletableFuture.supplyAsync(() -> {
+                    List<JSONObject> re;
+                    String tableName = hbase_scan_v8.contains(tn) ? "NG_" + tn.toUpperCase(Locale.ROOT) : "NG_RT_" + tn.toUpperCase(Locale.ROOT);
+                    try (val table = connection.getTable(TableName.valueOf(tableName))) {
+                        Scan scan = new Scan();
+                        scan.setRowPrefixFilter(rowkey.getBytes());
+                        PageFilter pageFilter = new PageFilter(100);
+                        scan.setFilter(pageFilter);
+                        ResultScanner scanner = table.getScanner(scan);
+                        re = StreamSupport.stream(scanner.spliterator(), false)
+                                .filter(result -> result != null && !result.isEmpty())
+                                .map(BaseUtils::toJSONObjectLowerCase)
+                                .map(j -> j.fluentPut("tn", tn))
+                                .collect(Collectors.toList());
+
+                    } catch (Exception e) {
+                        throw new RuntimeException("fetchHbase error" + e.getMessage(), e);
+                    }
+                    return re;
+                }, BaseUtils.COMMON_POOL);
+                futures.add(rew);
+            });
+        });
+
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+        List<JSONObject> resList = futures.stream().flatMap(x -> {
+            try {
+                return x.get().stream();
+            } catch (Exception e) {
+                throw new RuntimeException(" 222, fetch data error" + e.getMessage(), e);
+            }
+        }).filter(Objects::nonNull).collect(Collectors.toList());
+
+        Map<String, List<JSONObject>> dataList = resList.stream()
+                .collect(Collectors.groupingBy(o -> o.getString("tn")));
+
+        dataList.forEach(this::saveExcel);
+    }
+
+    @Test
+    public void es2HoloV8() throws Exception {
+        List<String> v8_tns = Arrays.asList(
+                "company_ipr_pledge",
+                "company_bid",
+                "company_employment",
+                "company_certificate",
+                "company_customs_credit",
+                "company_bond",
+                "company_tele_license",
+                "company_weibo",
+                "company_wechat");
+        v8_tns.forEach(tn -> {
+            try {
+                calc_v8(tn);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+    }
+
+
+    public void calc_v8(String tn) throws Exception {
+
+        String dsl = "{\n" +
+                "  \"query\": {\n" +
+                "    \"bool\": {\n" +
+                "      \"should\": [\n" +
+                "        &condition\n" +
+                "      ],\n" +
+                "      \"minimum_should_match\": 1\n" +
+                "    }\n" +
+                "  }\n" +
+                "}";
+
+        List<JSONObject> collect = Constant.es_filters.get(tn).stream()
+                .map(col -> new JSONObject()
+                        .fluentPut("terms", new JSONObject()
+                                .fluentPut(col, JSONArray.parseArray(JSON.toJSONString(Constant.all_ids))))).collect(Collectors.toList());
+        String ds1 = dsl.replaceAll("&condition", JSONObject.toJSONString(collect));
+
+        SearchRequest searchRequest = new SearchRequest("winhc_index_" + tn).types("_doc");
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.size(10000);
+        if (StringUtils.isNotEmpty(ds1)) {
+            WrapperQueryBuilder query = QueryBuilderUtils.getQuery(JSONObject.parseObject(ds1));
+            searchSourceBuilder.query(query);
+        }
+        searchRequest.source(searchSourceBuilder);
+        SearchResponse searchResponse = getClient.search(searchRequest);
+        HoloClient holoClient = HoloUtils.init();
+        TableSchema tableSchema = holoClient.getTableSchema(Constant.holo_res_tab);
+        List<com.alibaba.hologres.client.Put> rePut = Arrays.stream(searchResponse.getHits().getHits())
+                .map(SearchHit::getId).distinct()
+                .map(rowkey -> {
+                    com.alibaba.hologres.client.Put put = new com.alibaba.hologres.client.Put(tableSchema);
+                    put.setObject("tn", tn);
+                    put.setObject("rowkey", rowkey);
+                    return put;
+                })
+                .collect(Collectors.toList());
+
+        holoClient.put(rePut);
+        holoClient.close();
+    }
+
+
+    @Test
+    public void tmp_export() throws Exception {
+
+        List<Summary> company_zxr_restrict = Stream.of("612af42066c82c55e4acd3b1704c4a9d",
+                "9ff47e218565b15dfbbb1abf073d4c74",
+                "8ecf2b9b143e1898614014a60c0e1d66",
+                "a70f0b8d2cae8acea3a2e6a6ad379121",
+                "2da7ac6f94e1b1f7851fdb8571c76622",
+                "1655bca86ec04baa42602930f15155e0",
+                "0f39074fce607d59d8e3c6ca5bbb2def",
+                "5b1a8719ac81efc77759a3afc899ebbe",
+                "d8c7aaca494787c2b8c3bba55728aea2",
+                "0f89e4e32c91204c934d3e6e3561c159",
+                "ac6b8267d9c1af4ca7bbf5196a831af6",
+                "635b78efaf35b06e9153de7b78a0e912",
+                "6789589cb652cda0691815f5f2c5c594",
+                "e406a04cfa7720eabef74dd3c0824636",
+                "7344c4c0b8ec98a2ee8aeb6e05a20bf8",
+                "b31e260be2bee4c24e22c7250b18590f",
+                "285bce8f3445cd65113f9d8f05465323",
+                "35421836922edea2c85cfc70a991b46f",
+                "2727bf887088ee84a5a84676fecbdb6e",
+                "789f5b27c018947acf7a65fbb0c93a4d")
+                .distinct()
+                .map(x -> Summary.builder().rowkey(x).tn("company_zxr_restrict").build())
+                .collect(Collectors.toList());
+
+        List<Summary> company_zxr = Stream.of("5addcaae164360fdad8e0a788999f6e0",
+                "db8b9d7c151e52a9f45a29232252e381",
+                "0287b6fa104f6b10db054e20ab906e54",
+                "85cc34393ac69c0b5ffb8803d53791e4",
+                "6c65ff8c9967ca176ca5c3768fdec93a",
+                "7c5c90004cc5c76191fdf64fa589c304",
+                "8f7ba2d0ea2fe2506b8532c5d6671fb5",
+                "f0af5e1ac1c3f43ac5a9a71335829f65",
+                "22e4a2c1bb6743c8e584a0bbcde6fbf8",
+                "b416dd4d0e136883ac0c1b4b896d4e0a",
+                "5434c3aeb9bd3cee01822f1f3ad23997",
+                "9e6781198696c882e4ebf47e22e26ad1",
+                "1ccc8cdd745925f078aabfe7137885db",
+                "001f796b7ff33cf836a89a42458a9064",
+                "1377994c283f2ddda5a333fbab0b0d58",
+                "c7eb68466cc73900b0e2a9f9c4641b39",
+                "138513080f4d55f70dbbed8be1ff362e",
+                "9ea6db8e5899e9bebec882331331a629",
+                "62ee94a37fcb6e7fb6530ec28d3e011b",
+                "4e593f437521101eb7d221e0a8d2da08",
+                "f47c3609e361794ff8e7e1d77320d9c6")
+                .distinct()
+                .map(x -> Summary.builder().rowkey(x).tn("company_zxr").build())
+                .collect(Collectors.toList());
+
+        List<Summary> company_zxr_final_case = Stream.of("81c6c69e09330203064f9417030ec312",
+                "b71702a12dabfed3b51f3e324de1893c",
+                "5dc1bb9bcd2f58160a8b032f0d7d9798",
+                "7a807b1e6fde0968da89875c11eaf585",
+                "ad760e66acfe8f74710b6454e0ab4a88",
+                "c244007ec51d4428620a23a5233abf80",
+                "b810bb460052f3c29ce00b866026b34a",
+                "a8c01fc6bc6c0c94c59fef330c095636",
+                "126db7f60b058dc94234a1c89eb3d110",
+                "82a4a4a00e842a36189bccb9802b4066",
+                "1ad5e057a582b2b9a707e537f1149da3",
+                "0456134174f3788ee268e2caabd0a242",
+                "deaa1503ba2f98603a357ea473e8d975",
+                "3cfc0a7ef488c6f1023ba59fd1c29552",
+                "60a1474d92dc7d8bb2f0c9329d97b38f",
+                "ff4ca4403de1c04150f575cc79de3ef0",
+                "9d70d130812f5686e02e6fd4b6271de0",
+                "a66cbd21e17a1b11c2c4d8e56ebb6abd",
+                "69abe29235d200389929b0e5eeadb5fa",
+                "6f38d2ff9274ade00ee4068eb188632e")
+                .distinct()
+                .map(x -> Summary.builder().rowkey(x).tn("company_zxr_final_case").build())
+                .collect(Collectors.toList());
+
+        List<Summary> company_dishonest_info = Stream.of("58a5eaf8585971519bd0377e7965c773",
+                "5e102df1abe16b346e90c85612706838",
+                "5e102df1abe16b346e90c85612706838",
+                "b3c88843c01b4438c2a2e6e8f5eec08e",
+                "693c92cf94204f6ee548fb7f09bfaab5",
+                "e004386362884ea829ff838bb4ccebdf",
+                "e004386362884ea829ff838bb4ccebdf",
+                "abea2c980390ec12c0d18cc1d4f11588",
+                "ed7bc780b7b64533f8182b0605222f9e",
+                "a70f0b8d2cae8acea3a2e6a6ad379121",
+                "c576adae7f7a24bdf82b615de7ff1224",
+                "1bb6debee375f1757bc391955ec00274",
+                "9b146ed825de3a4370a481875b660970",
+                "ece4ec017beacb26726f4f9095e110a3",
+                "f331ce1b35c7148ea3b3556070cbb5a6",
+                "b3c88843c01b4438c2a2e6e8f5eec08e",
+                "8040c3ea644d865dfc7a0a7b4e42aae5",
+                "2b8508d708c83ad4aeeb2da3db8e6584",
+                "2b8508d708c83ad4aeeb2da3db8e6584",
+                "78b75e1536da4fc39881276a09f9a07f",
+                "f331ce1b35c7148ea3b3556070cbb5a6")
+                .distinct()
+                .map(x -> Summary.builder().rowkey(x).tn("company_dishonest_info").build())
+                .collect(Collectors.toList());
+
+        Map<String, List<Summary>> gList = Stream.concat(Stream.concat(Stream.concat(company_zxr_restrict.stream(), company_zxr.stream()), company_zxr_final_case.stream()), company_dishonest_info.stream())
+                .collect(Collectors.groupingBy(Summary::getTn));
+
+//        Map<String, List<Summary>> gList = pList.stream().collect(Collectors.groupingBy(Summary::getTn));
+        val futures = new ArrayList<CompletableFuture<List<JSONObject>>>();
+        gList.forEach((tn, keys) -> {
+            if (tn.equals("wenshu_detail_v2")) {
+                futures.add(fetchES(tn, keys));
+            } else if (tn.equals("company_equity_pledge")) {
+                futures.add(fetchESDemo(tn, keys));
+            } else {
+                futures.add(fetchHbase(tn, keys));
+            }
+        });
+
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+        futures.forEach(x -> {
+            try {
+                x.get();
+            } catch (Exception e) {
+                throw new RuntimeException("error 1111 :" + e.getMessage(), e);
+            }
+        });
+
+
+    }
+
+    @Test
+    public void holoAndHbase2Excel() throws Exception {
+        HoloClient holoClient = HoloUtils.init();
+        String tmpSql = "SELECT * FROM " + Constant.holo_res_tab + "";
+        List<Summary> pList = HoloUtils.exec(holoClient, tmpSql, Summary.class);
+        holoClient.close();
+
+        Map<String, List<Summary>> gList = pList.stream().collect(Collectors.groupingBy(Summary::getTn));
+        val futures = new ArrayList<CompletableFuture<List<JSONObject>>>();
+        gList.forEach((tn, keys) -> {
+            if (tn.equals("wenshu_detail_v2")) {
+                futures.add(fetchES(tn, keys));
+            } else if (tn.equals("company_equity_pledge")) {
+                futures.add(fetchESDemo(tn, keys));
+            } else {
+                futures.add(fetchHbase(tn, keys));
+            }
+        });
+
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+        futures.forEach(x -> {
+            try {
+                x.get();
+            } catch (Exception e) {
+                throw new RuntimeException("error 1111 :" + e.getMessage(), e);
+            }
+        });
+
+
+    }
+
+    private CompletableFuture<List<JSONObject>> fetchHbase(String tn, List<Summary> keys) {
+        String tableName = Constant.v8_list.contains(tn) ? "NG_" + tn.toUpperCase(Locale.ROOT) : "NG_RT_" + tn.toUpperCase(Locale.ROOT);
+        return CompletableFuture.supplyAsync(() -> {
+            List<JSONObject> re = new ArrayList<>();
+            val gs = keys.stream().map(Summary::getRowkey).
+                    distinct().map(vk -> vk.getBytes(StandardCharsets.UTF_8))
+                    .map(Get::new).collect(Collectors.toList());
+            try (val table = connection.getTable(TableName.valueOf(tableName))) {
+                val rs = table.get(gs);
+                if (rs != null) {
+                    re = Stream.of(rs)
+                            .filter(result -> result != null && !result.isEmpty())
+                            .map(BaseUtils::toJSONObjectLowerCase)
+                            .collect(Collectors.toList());
+                }
+            } catch (Exception e) {
+                throw new RuntimeException("fetchHbase error" + e.getMessage(), e);
+            }
+            saveExcel(tn, re);
+            return re;
+        }, BaseUtils.COMMON_POOL);
+    }
+
+    private CompletableFuture<List<JSONObject>> fetchES(String tn, List<Summary> keys) {
+        return CompletableFuture.supplyAsync(() -> {
+            List<JSONObject> re = new ArrayList<>();
+            String dsl = "{\n" +
+                    "  \"query\": {\n" +
+                    "    \"terms\": {\n" +
+                    "      \"_id\": [\n" +
+                    "        \"&ids\"\n" +
+                    "      ]\n" +
+                    "    }\n" +
+                    "  }\n" +
+                    "}";
+
+            String ds1 = dsl.replaceAll("&ids", keys.stream().map(Summary::getRowkey)
+                    .collect(Collectors.joining("\",\"", "", "")));
+            SearchRequest searchRequest = new SearchRequest("wenshu_detail2").types("wenshu_detail_type");
+            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+            searchSourceBuilder.size(10000);
+            if (StringUtils.isNotEmpty(ds1)) {
+                WrapperQueryBuilder query = QueryBuilderUtils.getQuery(JSONObject.parseObject(ds1));
+                searchSourceBuilder.query(query);
+            }
+            searchRequest.source(searchSourceBuilder);
+            try {
+                SearchResponse searchResponse = getOldClient.search(searchRequest);
+                re = Arrays.stream(searchResponse.getHits().getHits()).map(x -> {
+                    Map<String, Object> sourceAsMap = x.getSourceAsMap();
+                    String id = x.getId();
+                    sourceAsMap.put("rowkey", id);
+                    JSONObject re1 = new JSONObject(sourceAsMap);
+                    return re1;
+                }).collect(Collectors.toList());
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            saveExcel(tn, re);
+            return re;
+        }, BaseUtils.COMMON_POOL);
+    }
+
+    private CompletableFuture<List<JSONObject>> fetchESDemo(String tn, List<Summary> keys) {
+        return CompletableFuture.supplyAsync(() -> {
+            List<JSONObject> re = new ArrayList<>();
+            String dsl = "{\n" +
+                    "  \"query\": {\n" +
+                    "    \"terms\": {\n" +
+                    "      \"_id\": [\n" +
+                    "        \"&ids\"\n" +
+                    "      ]\n" +
+                    "    }\n" +
+                    "  }\n" +
+                    "}";
+
+            String ds1 = dsl.replaceAll("&ids", keys.stream().map(Summary::getRowkey)
+                    .collect(Collectors.joining("\",\"", "", "")));
+            SearchRequest searchRequest = new SearchRequest("winhc_index_rt_company_equity_pledge").types("_doc");
+            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+            searchSourceBuilder.size(10000);
+            if (StringUtils.isNotEmpty(ds1)) {
+                WrapperQueryBuilder query = QueryBuilderUtils.getQuery(JSONObject.parseObject(ds1));
+                searchSourceBuilder.query(query);
+            }
+            searchRequest.source(searchSourceBuilder);
+            try {
+                SearchResponse searchResponse = getClient.search(searchRequest);
+                re = Arrays.stream(searchResponse.getHits().getHits()).map(x -> {
+                    Map<String, Object> sourceAsMap = x.getSourceAsMap();
+                    String id = x.getId();
+                    sourceAsMap.put("rowkey", id);
+                    JSONObject re1 = new JSONObject(sourceAsMap);
+                    return re1;
+                }).collect(Collectors.toList());
+            } catch (IOException e) {
+                e.printStackTrace();
+            }
+            saveExcel(tn, re);
+            return re;
+        }, BaseUtils.COMMON_POOL);
+    }
+
+    private void saveExcel(String tn, List<JSONObject> re) {
+        String tableName = Constant.v8_list.contains(tn) ? "inc_ads_" + tn : "inc_ads_" + tn + "_v9";
+        List<String> filter_list = new ArrayList<>();
+        if (Constant.tn_cols_filters.containsKey(tn)) {
+            filter_list = Constant.tn_cols_filters.get(tn);
+        }
+        final List<String> finalFilter_list = filter_list;
+        List<String> cols = OdpsUtils.getOdps()
+                .tables()
+                .get(tableName)
+                .getSchema()
+                .getColumns()
+                .stream()
+                .map(Column::getName)
+                .distinct()
+                .filter(x -> !finalFilter_list.contains(x))
+                .collect(Collectors.toList());
+
+        List<List<String>> head = cols.stream()
+                .map(Collections::singletonList)
+                .collect(Collectors.toList());
+
+        List<List<String>> dataList = re.stream().map(d -> {
+            List<String> dataRow = new ArrayList<>();
+            cols.forEach(c -> {
+                dataRow.add(d.getString(c));
+            });
+            return dataRow;
+        }).collect(Collectors.toList());
+
+        EasyExcelUtil easyExcelUtil = new EasyExcelUtil();
+
+        String path = "D:\\tmp\\test5\\" + tn + ".xlsx";
+        easyExcelUtil.init(path, "sheet1", head);
+        easyExcelUtil.doExportExcel(dataList);
+        //关闭流
+        easyExcelUtil.finish();
+    }
+
+    @Test
+    public void compare() throws Exception {
+        List<File> fileList = FileUtil.loopFiles(new File("D:\\tmp\\test4"), new FileFilter() {
+            @Override
+            public boolean accept(File pathname) {
+                if (pathname.getName().endsWith(".xlsx")) {
+                    return true;
+                }
+                return false;
+            }
+        });
+        List<String> out_tns = fileList.stream().map(d -> d.getName().replace(".xlsx", ""))
+                .collect(Collectors.toList());
+        System.out.println(out_tns);
+//        List<String> tmp_tns = tns_out.stream().distinct().collect(Collectors.toList());
+//        System.out.println(tmp_tns);
+
+        Map<String, Long> collect = tns_out.stream().collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
+        System.out.println(collect);
+
+
+        String loss_tns = tns_out.stream().filter(x -> !out_tns.contains(x)).collect(Collectors.joining("\n"));
+        System.out.println(loss_tns);
+
+    }
+
+}

+ 0 - 413
src/test/java/com/winhc/task/SynMongoCompanySummary.java

@@ -1,413 +0,0 @@
-package com.winhc.task;
-
-import cn.hutool.core.io.FileUtil;
-import com.alibaba.fastjson.JSON;
-import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONObject;
-import com.alibaba.hologres.client.HoloClient;
-import com.alibaba.hologres.client.exception.HoloClientException;
-import com.aliyun.odps.Column;
-import com.winhc.task.bean.Summary;
-import com.winhc.task.common.ArgsCompanyJob;
-import com.winhc.task.common.Constant;
-import com.winhc.task.common.SummaryArgs;
-import com.winhc.task.job.EsScanSummaryJob;
-import com.winhc.task.util.*;
-import lombok.val;
-import org.apache.calcite.avatica.proto.Common;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.*;
-import org.apache.hadoop.hbase.filter.PageFilter;
-import org.elasticsearch.action.search.SearchRequest;
-import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.client.RestHighLevelClient;
-import org.elasticsearch.index.query.WrapperQueryBuilder;
-import org.elasticsearch.search.SearchHit;
-import org.elasticsearch.search.builder.SearchSourceBuilder;
-import org.junit.jupiter.api.Test;
-import org.postgresql.model.TableSchema;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.boot.test.context.SpringBootTest;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.nio.charset.StandardCharsets;
-import java.util.*;
-import java.util.concurrent.CompletableFuture;
-import java.util.function.Function;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-import java.util.stream.StreamSupport;
-
-import static com.winhc.task.common.Constant.hbase_scan_v8;
-import static com.winhc.task.common.Constant.tns_out;
-
-
-@SpringBootTest
-public class SynMongoCompanySummary {
-    @Autowired
-    EsScanSummaryJob esScanSummaryJob;
-
-    @Autowired
-    Connection connection;
-
-    @Autowired
-    @Qualifier(value = "v6")
-    RestHighLevelClient getClient;
-
-    @Autowired
-    @Qualifier(value = "v5")
-    RestHighLevelClient getOldClient;
-
-
-    @Test
-    public void start() throws IOException {
-        esScanSummaryJob.start();
-    }
-
-    @Test
-    public void start2() throws Exception {
-
-        //计算所有ids
-        esScanSummaryJob.start();
-        //v9 holo->holo 聚合v8 rowkey
-        holo();
-        //v8 es->holo 聚合v9 rowkey
-        es2HoloV8();
-        //v8&v9 holo->hbase->excel 数据导出excel
-        holoAndHbase2Excel();
-        //v8&v9 hbase->excel 子表scan  数据导出excel
-        scanHbase2Excel();
-    }
-
-    @Test
-    public void holo() throws Exception {
-        List<String> tns = SummaryArgs.SUMMARY_ARGS.keySet()
-                .stream()
-                .filter(tns_out::contains)
-                .distinct()
-                .collect(Collectors.toList());
-        String resTable = Constant.holo_res_tab;
-        //清空表
-        HoloClient holoClient = HoloUtils.init();
-        HoloUtils.exexSql(holoClient, "truncate table " + resTable);
-        holoClient.close();
-        tns.forEach(tn -> {
-            try {
-                rowkey2HoloV9(tn, resTable);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        });
-
-    }
-
-    public void rowkey2HoloV9(String tn, String resTable) throws HoloClientException {
-
-        SummaryArgs s = SummaryArgs.SUMMARY_ARGS.get(tn);
-        String sql = "INSERT INTO " + resTable + "\n" +
-                "SELECT\n" +
-                "    rowkey, '&tn' as tn\n" +
-                "FROM\n" +
-                "    &tableName\n" +
-                "WHERE\n" +
-                "    &condition\n" +
-                "GROUP BY rowkey";
-
-        String ids = Constant.all_ids.stream().distinct()
-                .collect(Collectors.joining("','", "'", "'"));
-        String condition = s.getArgsInfo().stream()
-                .map(x -> x.getFilterField() + " && " + "ARRAY[" + ids + "]")
-                .collect(Collectors.joining(" or "));
-
-        HoloClient holoClient = HoloUtils.init();
-        String tableName = "ng_rt_" + tn;
-        String calc_sql = sql.replaceAll("&condition", condition)
-                .replaceAll("&tn", tn)
-                .replaceAll("&tableName", tableName);
-        System.out.println(calc_sql);
-        HoloUtils.exexSql(holoClient, calc_sql);
-        holoClient.close();
-
-    }
-
-    @Test
-    public void scanHbase2Excel() throws Exception {
-        List<String> ids = Constant.all_ids;
-        HoloClient holoClient = HoloUtils.init();
-        //补充年报rowkey
-        String tmpSql = "SELECT * FROM " + Constant.holo_res_tab + " where tn ='company_annual_report'";
-        List<Summary> pList = HoloUtils.exec(holoClient, tmpSql, Summary.class);
-        List<String> id_addr = pList.stream().map(Summary::getRowkey).distinct().collect(Collectors.toList());
-        ids.addAll(id_addr);
-        holoClient.close();
-
-        val futures = new ArrayList<CompletableFuture<List<JSONObject>>>();
-        Stream.concat(Constant.hbase_scan_v9.stream(), hbase_scan_v8.stream()).forEach(tn -> {
-            ids.forEach(rowkey -> {
-                CompletableFuture<List<JSONObject>> rew = CompletableFuture.supplyAsync(() -> {
-                    List<JSONObject> re;
-                    String tableName = hbase_scan_v8.contains(tn) ? "NG_" + tn.toUpperCase(Locale.ROOT) : "NG_RT_" + tn.toUpperCase(Locale.ROOT);
-                    try (val table = connection.getTable(TableName.valueOf(tableName))) {
-                        Scan scan = new Scan();
-                        scan.setRowPrefixFilter(rowkey.getBytes());
-                        PageFilter pageFilter = new PageFilter(100);
-                        scan.setFilter(pageFilter);
-                        ResultScanner scanner = table.getScanner(scan);
-                        re = StreamSupport.stream(scanner.spliterator(), false)
-                                .filter(result -> result != null && !result.isEmpty())
-                                .map(BaseUtils::toJSONObjectLowerCase)
-                                .map(j -> j.fluentPut("tn", tn))
-                                .collect(Collectors.toList());
-
-                    } catch (Exception e) {
-                        throw new RuntimeException("fetchHbase error" + e.getMessage(), e);
-                    }
-                    return re;
-                }, BaseUtils.COMMON_POOL);
-                futures.add(rew);
-            });
-        });
-
-        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
-        List<JSONObject> resList = futures.stream().flatMap(x -> {
-            try {
-                return x.get().stream();
-            } catch (Exception e) {
-                throw new RuntimeException(" 222, fetch data error" + e.getMessage(), e);
-            }
-        }).filter(Objects::nonNull).collect(Collectors.toList());
-
-        Map<String, List<JSONObject>> dataList = resList.stream()
-                .collect(Collectors.groupingBy(o -> o.getString("tn")));
-
-        dataList.forEach(this::saveExcel);
-    }
-
-    @Test
-    public void es2HoloV8() throws Exception {
-        List<String> v8_tns = Arrays.asList(
-                "company_ipr_pledge",
-                "company_bid",
-                "company_employment",
-                "company_certificate",
-                "company_customs_credit",
-                "company_bond",
-                "company_tele_license",
-                "company_weibo",
-                "company_wechat");
-        v8_tns.forEach(tn -> {
-            try {
-                calc_v8(tn);
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-        });
-    }
-
-    public void calc_v8(String tn) throws Exception {
-
-        String dsl = "{\n" +
-                "  \"query\": {\n" +
-                "    \"bool\": {\n" +
-                "      \"should\": [\n" +
-                "        &condition\n" +
-                "      ],\n" +
-                "      \"minimum_should_match\": 1\n" +
-                "    }\n" +
-                "  }\n" +
-                "}";
-
-        List<JSONObject> collect = Constant.es_filters.get(tn).stream()
-                .map(col -> new JSONObject()
-                        .fluentPut("terms", new JSONObject()
-                                .fluentPut(col, JSONArray.parseArray(JSON.toJSONString(Constant.all_ids))))).collect(Collectors.toList());
-        String ds1 = dsl.replaceAll("&condition", JSONObject.toJSONString(collect));
-
-        SearchRequest searchRequest = new SearchRequest("winhc_index_" + tn).types("_doc");
-        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
-        searchSourceBuilder.size(10000);
-        if (StringUtils.isNotEmpty(ds1)) {
-            WrapperQueryBuilder query = QueryBuilderUtils.getQuery(JSONObject.parseObject(ds1));
-            searchSourceBuilder.query(query);
-        }
-        searchRequest.source(searchSourceBuilder);
-        SearchResponse searchResponse = getClient.search(searchRequest);
-        HoloClient holoClient = HoloUtils.init();
-        TableSchema tableSchema = holoClient.getTableSchema(Constant.holo_res_tab);
-        List<com.alibaba.hologres.client.Put> rePut = Arrays.stream(searchResponse.getHits().getHits())
-                .map(SearchHit::getId).distinct()
-                .map(rowkey -> {
-                    com.alibaba.hologres.client.Put put = new com.alibaba.hologres.client.Put(tableSchema);
-                    put.setObject("tn", tn);
-                    put.setObject("rowkey", rowkey);
-                    return put;
-                })
-                .collect(Collectors.toList());
-
-        holoClient.put(rePut);
-        holoClient.close();
-    }
-
-
-    @Test
-    public void holoAndHbase2Excel() throws Exception {
-        HoloClient holoClient = HoloUtils.init();
-        String tmpSql = "SELECT * FROM " + Constant.holo_res_tab + "";
-        List<Summary> pList = HoloUtils.exec(holoClient, tmpSql, Summary.class);
-        holoClient.close();
-
-        Map<String, List<Summary>> gList = pList.stream().collect(Collectors.groupingBy(Summary::getTn));
-        val futures = new ArrayList<CompletableFuture<List<JSONObject>>>();
-        gList.forEach((tn, keys) -> {
-            if (!tn.equals("wenshu_detail_v2")) {
-                futures.add(fetchHbase(tn, keys));
-            } else {
-                futures.add(fetchES(tn, keys));
-            }
-        });
-
-        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
-        futures.forEach(x -> {
-            try {
-                x.get();
-            } catch (Exception e) {
-                throw new RuntimeException("error 1111 :" + e.getMessage(), e);
-            }
-        });
-
-
-    }
-
-    private CompletableFuture<List<JSONObject>> fetchHbase(String tn, List<Summary> keys) {
-        String tableName = Constant.v8_list.contains(tn) ? "NG_" + tn.toUpperCase(Locale.ROOT) : "NG_RT_" + tn.toUpperCase(Locale.ROOT);
-        return CompletableFuture.supplyAsync(() -> {
-            List<JSONObject> re = new ArrayList<>();
-            val gs = keys.stream().map(Summary::getRowkey).
-                    distinct().map(vk -> vk.getBytes(StandardCharsets.UTF_8))
-                    .map(Get::new).collect(Collectors.toList());
-            try (val table = connection.getTable(TableName.valueOf(tableName))) {
-                val rs = table.get(gs);
-                if (rs != null) {
-                    re = Stream.of(rs)
-                            .filter(result -> result != null && !result.isEmpty())
-                            .map(BaseUtils::toJSONObjectLowerCase)
-                            .collect(Collectors.toList());
-                }
-            } catch (Exception e) {
-                throw new RuntimeException("fetchHbase error" + e.getMessage(), e);
-            }
-            saveExcel(tn, re);
-            return re;
-        }, BaseUtils.COMMON_POOL);
-    }
-
-    private CompletableFuture<List<JSONObject>> fetchES(String tn, List<Summary> keys) {
-        return CompletableFuture.supplyAsync(() -> {
-            List<JSONObject> re = new ArrayList<>();
-            String dsl = "{\n" +
-                    "  \"query\": {\n" +
-                    "    \"terms\": {\n" +
-                    "      \"_id\": [\n" +
-                    "        \"&ids\"\n" +
-                    "      ]\n" +
-                    "    }\n" +
-                    "  }\n" +
-                    "}";
-
-            String ds1 = dsl.replaceAll("&ids", keys.stream().map(Summary::getRowkey)
-                    .collect(Collectors.joining("\",\"", "", "")));
-            SearchRequest searchRequest = new SearchRequest("wenshu_detail2").types("wenshu_detail_type");
-            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
-            searchSourceBuilder.size(10000);
-            if (StringUtils.isNotEmpty(ds1)) {
-                WrapperQueryBuilder query = QueryBuilderUtils.getQuery(JSONObject.parseObject(ds1));
-                searchSourceBuilder.query(query);
-            }
-            searchRequest.source(searchSourceBuilder);
-            try {
-                SearchResponse searchResponse = getOldClient.search(searchRequest);
-                re = Arrays.stream(searchResponse.getHits().getHits()).map(x -> {
-                    Map<String, Object> sourceAsMap = x.getSourceAsMap();
-                    String id = x.getId();
-                    sourceAsMap.put("rowkey", id);
-                    JSONObject re1 = new JSONObject(sourceAsMap);
-                    return re1;
-                }).collect(Collectors.toList());
-            } catch (IOException e) {
-                e.printStackTrace();
-            }
-            saveExcel(tn, re);
-            return re;
-        }, BaseUtils.COMMON_POOL);
-    }
-
-    private void saveExcel(String tn, List<JSONObject> re) {
-        String tableName = Constant.v8_list.contains(tn) ? "inc_ads_" + tn : "inc_ads_" + tn + "_v9";
-        List<String> filter_list = new ArrayList<>();
-        if (Constant.tn_cols_filters.containsKey(tn)) {
-            filter_list = Constant.tn_cols_filters.get(tn);
-        }
-        final List<String> finalFilter_list = filter_list;
-        List<String> cols = OdpsUtils.getOdps()
-                .tables()
-                .get(tableName)
-                .getSchema()
-                .getColumns()
-                .stream()
-                .map(Column::getName)
-                .distinct()
-                .filter(x -> !finalFilter_list.contains(x))
-                .collect(Collectors.toList());
-
-        List<List<String>> head = cols.stream()
-                .map(Collections::singletonList)
-                .collect(Collectors.toList());
-
-        List<List<String>> dataList = re.stream().map(d -> {
-            List<String> dataRow = new ArrayList<>();
-            cols.forEach(c -> {
-                dataRow.add(d.getString(c));
-            });
-            return dataRow;
-        }).collect(Collectors.toList());
-
-        EasyExcelUtil easyExcelUtil = new EasyExcelUtil();
-
-        String path = "D:\\tmp\\test4\\" + tn + ".xlsx";
-        easyExcelUtil.init(path, "sheet1", head);
-        easyExcelUtil.doExportExcel(dataList);
-        //关闭流
-        easyExcelUtil.finish();
-    }
-
-    @Test
-    public void compare() throws Exception {
-        List<File> fileList = FileUtil.loopFiles(new File("D:\\tmp\\test4"), new FileFilter() {
-            @Override
-            public boolean accept(File pathname) {
-                if (pathname.getName().endsWith(".xlsx")) {
-                    return true;
-                }
-                return false;
-            }
-        });
-        List<String> out_tns = fileList.stream().map(d -> d.getName().replace(".xlsx", ""))
-                .collect(Collectors.toList());
-        System.out.println(out_tns);
-//        List<String> tmp_tns = tns_out.stream().distinct().collect(Collectors.toList());
-//        System.out.println(tmp_tns);
-
-        Map<String, Long> collect = tns_out.stream().collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
-        System.out.println(collect);
-
-
-        String loss_tns = tns_out.stream().filter(x -> !out_tns.contains(x)).collect(Collectors.joining("\n"));
-        System.out.println(loss_tns);
-
-    }
-
-}