Browse Source

全维度样例输出

xufei 1 year ago
parent
commit
76b613511b

+ 24 - 0
src/main/java/com/winhc/task/bean/Summary.java

@@ -0,0 +1,24 @@
+package com.winhc.task.bean;
+
+import lombok.*;
+
+/**
+ * Simple mutable bean pairing a {@code rowkey} with a {@code tn} value.
+ * Accessors, builder and constructors are generated by Lombok.
+ *
+ * @Author: π
+ */
+@Getter
+@Setter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class Summary {
+    private String rowkey;
+    private String tn;
+
+    /**
+     * Renders the bean as {@code Summary{rowkey='...', tn='...'}}; null fields print as {@code null}.
+     */
+    @Override
+    public String toString() {
+        return String.format("Summary{rowkey='%s', tn='%s'}", rowkey, tn);
+    }
+}

File diff suppressed because it is too large
+ 1017 - 0
src/main/java/com/winhc/task/common/ArgsCompanyJob.java


File diff suppressed because it is too large
+ 78 - 3
src/main/java/com/winhc/task/common/Constant.java


+ 108 - 0
src/main/java/com/winhc/task/common/MetaInfo.java

@@ -0,0 +1,108 @@
+package com.winhc.task.common;
+
+import com.alibaba.fastjson.JSONObject;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.Map;
+import java.util.function.Function;
+
+/**
+ * Groups the per-store descriptors (company, Elasticsearch, HBase, Hologres) that belong together.
+ * All nested types are Lombok {@code @Data} beans with a no-args constructor plus explicit convenience constructors.
+ *
+ * @author ZhangJi
+ * @since 2021-10-22 11:13
+ */
+@Data
+@NoArgsConstructor
+public class MetaInfo {
+    private CompanyInfo company;
+    private ElasticSearchInfo es;
+    private HbaseInfo hbase;
+    private HologresInfo holo;
+
+    /**
+     * Creates a fully populated descriptor; any argument may be null.
+     */
+    public MetaInfo(CompanyInfo company, ElasticSearchInfo es, HbaseInfo hbase, HologresInfo holo) {
+        this.company = company;
+        this.es = es;
+        this.hbase = hbase;
+        this.holo = holo;
+    }
+
+    /**
+     * Company identifier and display name.
+     */
+    @Data
+    @NoArgsConstructor
+    public static class CompanyInfo {
+        private String companyId;
+        private String companyName;
+
+        public CompanyInfo(String companyId, String companyName) {
+            this.companyId = companyId;
+            this.companyName = companyName;
+        }
+    }
+
+    /**
+     * Elasticsearch index/type pair with an integer version that defaults to 0.
+     */
+    @Data
+    @NoArgsConstructor
+    public static class ElasticSearchInfo {
+        private String index;
+        private String type;
+
+        private int version;
+
+        /**
+         * Same as the three-argument constructor with {@code version = 0}.
+         */
+        public ElasticSearchInfo(String index, String type) {
+            this(index, type, 0);
+        }
+
+        public ElasticSearchInfo(String index, String type, int version) {
+            this.index = index;
+            this.type = type;
+            this.version = version;
+        }
+    }
+
+    /**
+     * HBase table and column family, with an optional JSON transformation ({@code handle}).
+     */
+    @Data
+    @NoArgsConstructor
+    public static class HbaseInfo {
+        private String table;
+        private String cf;
+        private Function<JSONObject, JSONObject> handle;
+
+        /**
+         * Creates a descriptor without a {@code handle} function (it stays null).
+         */
+        public HbaseInfo(String table, String cf) {
+            this(table, cf, null);
+        }
+
+        public HbaseInfo(String table, String cf, Function<JSONObject, JSONObject> handle) {
+            this.table = table;
+            this.cf = cf;
+            this.handle = handle;
+        }
+    }
+
+    /**
+     * Hologres table descriptor.
+     * NOTE(review): {@code shema} looks like a misspelling of "schema"; the name is kept because
+     * the Lombok-generated accessors ({@code getShema}/{@code setShema}) derive from it.
+     */
+    @Data
+    @NoArgsConstructor
+    public static class HologresInfo {
+        private String table;
+        private String shema;
+
+        public HologresInfo(String table, String shema) {
+            this.table = table;
+            this.shema = shema;
+        }
+    }
+
+    /**
+     * A field name together with a String-to-String map stored as {@code value_alias}.
+     */
+    @Data
+    @NoArgsConstructor
+    public static class GroupByInfo {
+        private String field;
+        private Map<String, String> value_alias;
+
+        public GroupByInfo(String field, Map<String, String> value_alias) {
+            this.field = field;
+            this.value_alias = value_alias;
+        }
+    }
+
+}

+ 16 - 0
src/main/java/com/winhc/task/common/SummaryArgs.java

@@ -1348,6 +1348,22 @@ public class SummaryArgs {
             return new ArrayList<>(set);
             return new ArrayList<>(set);
         }
         }
         ));
         ));
+        a.put("company_profile", new SummaryArgs(
+                "ng_rt_company_profile"
+                , Collections.singletonList(new ArgsInfo(
+                "company_id"
+                , ""
+                , "deleted"))
+                , Arrays.asList(new ValueAlias(
+                        ""
+                        , "0"
+                        , "company_profile_del_0"),
+                new ValueAlias(
+                        ""
+                        , "1"
+                        , "company_profile_del_1"))
+                , (j) -> Collections.singletonList(j.getString("company_id"))
+        ));
         return a;
         return a;
     }
     }
 
 

+ 41 - 1
src/main/java/com/winhc/task/configuration/ElasticSearchConfiguration.java

@@ -12,6 +12,7 @@ import org.elasticsearch.client.RestHighLevelClient;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.beans.factory.annotation.Value;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Primary;
 
 
 import java.util.stream.Stream;
 import java.util.stream.Stream;
 
 
@@ -29,6 +30,13 @@ public class ElasticSearchConfiguration {
     @Value("${es.host}")
     @Value("${es.host}")
     private String host;
     private String host;
 
 
+    @Value("${es.username1}")
+    private String username1;
+    @Value("${es.password1}")
+    private String password1;
+    @Value("${es.host1}")
+    private String host1;
+
     @Value("${es.schema:http}")
     @Value("${es.schema:http}")
     String schema;
     String schema;
     @Value(value = "${es.connect-timeout:100000}")
     @Value(value = "${es.connect-timeout:100000}")
@@ -58,7 +66,8 @@ public class ElasticSearchConfiguration {
     }
     }
 
 
 
 
-    @Bean
+    @Bean("v6")
+    @Primary
     public RestHighLevelClient getClient() {
     public RestHighLevelClient getClient() {
         HttpHost[] httpHosts = Stream.of(host.split(",")).map(host -> {
         HttpHost[] httpHosts = Stream.of(host.split(",")).map(host -> {
             String[] split = host.split(":");
             String[] split = host.split(":");
@@ -88,4 +97,35 @@ public class ElasticSearchConfiguration {
                 }).build()
                 }).build()
         );
         );
     }
     }
+
+    @Bean("v5")
+    public RestHighLevelClient getOldClient() {
+        HttpHost[] httpHosts = Stream.of(host1.split(",")).map(host -> {
+            String[] split = host.split(":");
+            return new HttpHost(split[0], 9200, schema);
+        }).toArray(HttpHost[]::new);
+
+        // 阿里云Elasticsearch集群需要basic auth验证。
+        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+        //访问用户名和密码为您创建阿里云Elasticsearch实例时设置的用户名和密码,也是Kibana控制台的登录用户名和密码。
+        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username1, password1));
+
+
+        return new RestHighLevelClient(RestClient
+                .builder(httpHosts)
+                .setMaxRetryTimeoutMillis(60000 * 3)
+                .setRequestConfigCallback(builder -> {
+                    builder.setConnectTimeout(1000 * 3);
+                    builder.setSocketTimeout(60000 * 3);
+                    builder.setConnectionRequestTimeout(0);
+                    return builder;
+                })
+                .setHttpClientConfigCallback(httpAsyncClientBuilder -> {
+                    httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+                    httpAsyncClientBuilder.setMaxConnTotal(100 * 3);
+                    httpAsyncClientBuilder.setMaxConnPerRoute(100 * 3);
+                    return httpAsyncClientBuilder;
+                }).build()
+        );
+    }
 }
 }

+ 6 - 0
src/main/java/com/winhc/task/dao/impl/SearchDaoImpl.java

@@ -20,6 +20,8 @@ import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
 import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
 import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
 import org.elasticsearch.search.rescore.RescoreBuilder;
 import org.elasticsearch.search.rescore.RescoreBuilder;
 import org.elasticsearch.search.sort.SortBuilder;
 import org.elasticsearch.search.sort.SortBuilder;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Repository;
 import org.springframework.stereotype.Repository;
 
 
 import java.util.HashMap;
 import java.util.HashMap;
@@ -35,6 +37,10 @@ import java.util.List;
 public class SearchDaoImpl implements SearchDao {
 public class SearchDaoImpl implements SearchDao {
     private RestHighLevelClient restHighLevelClient;
     private RestHighLevelClient restHighLevelClient;
 
 
+//    @Qualifier("v6")
+//    @Autowired
+//    RestHighLevelClient getClient;
+
 
 
     private ObjectMapper mapper;
     private ObjectMapper mapper;
     private static final TypeReference<HashMap<String, Object>> typeRef
     private static final TypeReference<HashMap<String, Object>> typeRef

+ 185 - 0
src/main/java/com/winhc/task/job/EsScanSummaryJob.java

@@ -0,0 +1,185 @@
+package com.winhc.task.job;
+
+import cn.hutool.core.io.FileUtil;
+import cn.hutool.core.io.IORuntimeException;
+import cn.hutool.core.util.CharsetUtil;
+import com.alibaba.fastjson.JSONObject;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.InsertManyOptions;
+import com.mongodb.client.result.InsertManyResult;
+import com.winhc.task.common.ArgsCompanyJob;
+import com.winhc.task.common.Constant;
+import com.winhc.task.framework.es.EsFastScan;
+import com.winhc.task.util.QueryBuilderUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.bson.BsonValue;
+import org.bson.Document;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.index.query.WrapperQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+
+@Slf4j
+@Component
+@AllArgsConstructor
+public class EsScanSummaryJob {
+    private RestHighLevelClient restHighLevelClient;
+    private final MongoTemplate mongoTemplate;
+
+//    @Autowired
+//    @Qualifier(value = "v6")
+//    RestHighLevelClient getClient;
+
+    public void start() throws IOException {
+        String dsl0 = "{\n" +
+                "  \"query\": {\n" +
+                "    \"terms\": {\n" +
+                "      \"cname.show.keyword\": [\n" +
+                "        \"沈阳国际软件园产业发展有限公司\",\n" +
+                "        \"三橡股份有限公司\",\n" +
+                "        \"沈阳飞行船数码喷印设备有限公司\",\n" +
+                "        \"沈阳中之杰流体控制系统有限公司\",\n" +
+                "        \"沈阳中北通磁科技股份有限公司\",\n" +
+                "        \"沈阳沈大内窥镜有限公司\",\n" +
+                "        \"沈阳化工集团有限公司\",\n" +
+                "        \"华晨宝马汽车有限公司\",\n" +
+                "        \"特变电工沈阳变压器集团有限公司\",\n" +
+                "        \"沈阳电缆厂销售处\"\n" +
+                "      ]\n" +
+                "    }\n" +
+                "  }\n" +
+                "}";
+        String dsl1 = "{\n" +
+                "  \"query\": {\n" +
+                "    \"terms\": {\n" +
+                "      \"_id\": [\n" +
+                "        &ids " +
+                "      ]\n" +
+                "    }\n" +
+                "  }\n" +
+                "}";
+        scan(dsl0, dsl1);
+    }
+
+    public void scan(String dsl0, String dsl1) throws IOException {
+
+        SearchRequest searchRequest = new SearchRequest("winhc_index_rt_company").types("company");
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.size(10000);
+        if (StringUtils.isNotEmpty(dsl0)) {
+            WrapperQueryBuilder query = QueryBuilderUtils.getQuery(JSONObject.parseObject(dsl0));
+            searchSourceBuilder.query(query);
+        }
+        searchRequest.source(searchSourceBuilder);
+
+        SearchResponse searchResponse = restHighLevelClient.search(searchRequest);
+
+        List<String> ids = Arrays.stream(searchResponse.getHits().getHits()).map(SearchHit::getId).distinct().collect(Collectors.toList());
+        System.out.println(ids);
+        //补充维度缺失
+        List<String> id_add = Arrays.asList(
+                "91cf436e1959f7d8ccf3baf275f8ba17",
+                "575c80b44ab6a8d9ef8bcc101241c074",
+                "1aefccaa52fb392de544c641c7977e2b",
+                "3b316c0d15f765450f315d217f2dcaa5",
+                "baaf16ec0963bcecf62286fa1a38d5f5",
+                "b70ee50a247536457489c20c8a45d8c6",
+                "b5c2e4002f2caea6c2200b2d61c496f9",
+                "210a3c670ed4036ee8d89d0e33de52c4",
+                "4a80cef767b7979ec0f19acb2e5f1edb",
+                "843eb83e1b238c6a34cbf02c44d31b32",
+                "190f4302a77d2c5fec225d19c173f76e",
+                "dcd5b075ccef4385883ef8e8688a44c1",
+                "57453fd00b48456e33bef2105058a499",
+                "be9b3fb64b79d27eec53c282bdc86c11",
+                "6820fae3c815fb899214617b54efadf7",
+                "2cc25a8e43426abc893429b6b6f9cd22");//询价机构
+
+        //v9子表维度
+        List<String> id_add2 = Arrays.asList(
+                "f638994f9b45aed70b18f057d5215b97",
+                "b41f46dad10f3e311aeb5101fb5e1a99",
+                "98c5a5bb0405681672a8f1b06c30eaf2",
+                "c40d568887c2c24c1446c747847de3ab",
+                "79dba12967e83e080c3493ad4826e5dd",
+                "8ac84008ff8e0682fd5b9d385e9abcf1",
+                "bda6c23ecfeceff5d867a7fe922068bb",
+                "63ff6b100d7e1729fb1e18123ed88831",
+                "7649322a219335dfdfc3eea26c1d7dcf",
+                "285e6cbb96c57776c80b067c25aca51c",
+                "f5f04a699e0f488e46f676f0c99e14e6",
+                "9bf46052cab0fd62a81827ebe63a7be2",
+                "54038eefa74235b6031653fac59cd50f",
+                "609f846039bf87d3af3797e78476b372",
+                "af01810f76ba395a798cb9dafdebfeb2");
+
+        //v8 维度
+        List<String> id_add3 = Arrays.asList(
+                "0d21340a46b1a749e17b8fb260e1b6b6",
+                "ca19f7ef107045af8d66e4302261204f",
+                "aa0b8980af16ebbccfa39f2ca97891cc",
+                "653acbb94796852c1382f49ee445d25e",
+                "64eac2f662d5826c475a0cce1120e85d",
+                "c76b33a126a0a8a27cdc428b5395b1ec",
+                "dae0b5bea9342644e8acdbcca3f69a60",
+                "8e2298078c52221f5837909933732f70",
+                "5cb3e29512491ab8f2b16df5aa9c1d0e");
+        //v8子表维度
+        List<String> id_add4 = Arrays.asList(
+                "c1979818791446c47ae15885564bfcef", "0088f0bbc061e2d57ad301822ab92a44",
+                "b14b2e11bd959af3a9f022e8c7eb3ec5", "00329ba5d9ff6c7549496a9291791801",
+                "5a286c96967bc3516e8c8f055df53da4", "05ec800bff8f224b85329a86817149a4",
+                "73a1a35da3c1165eaa3ae385854405cb", "0000c8416a8f9b8e51b9d0bf8a7a5b95",
+                "dc4273e21f9d7b520ce01cb80b02f626", "03272efaf00d239fa240b03ee19a7ea7"
+        );
+
+        ids.addAll(id_add);
+        ids.addAll(id_add2);
+        ids.addAll(id_add3);
+        ids.addAll(id_add4);
+//        全部企业id
+        String all_ids = ids.stream().distinct().collect(Collectors.joining("\n"));
+        System.out.println(ids.stream().distinct().collect(Collectors.joining("\n")));
+        //path指定路径下的文件如不存在,则创建
+        try {
+            FileUtil.writeString(all_ids, Constant.all_ids_path, CharsetUtil.UTF_8);
+        } catch (IORuntimeException e) {
+            //抛出一个运行时异常(直接停止掉程序)
+            throw new RuntimeException("运行时异常", e);
+        }
+
+        String r_ids = ids.stream().distinct().collect(Collectors.joining("\",\"", "\"", "\""));
+        String dsl = dsl1.replace("&ids", r_ids);
+
+        Consumer<SearchHit[]> func = list -> {
+            List<String> li = Arrays.stream(list).map(d -> {
+                String index = d.getIndex();
+                return index.substring(0, index.lastIndexOf("_")).replace("ng_rt_summary_", "");
+            }).distinct().collect(Collectors.toList());
+
+            List<String> tns = ArgsCompanyJob.JOB_ARGS.keySet()
+                    .stream()
+                    .filter(Constant.tns_out::contains)
+                    .distinct()
+                    .collect(Collectors.toList());
+            String re = tns.stream().filter(m -> !li.contains(m)).collect(Collectors.joining("\n"));
+            System.out.println(re);
+        };
+        new EsFastScan(restHighLevelClient, func, "ng_rt_summary_company", "_doc", dsl, null).scan();
+    }
+
+}

+ 100 - 0
src/main/java/com/winhc/task/util/BaseUtils.java

@@ -8,11 +8,17 @@ import lombok.val;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.codec.digest.DigestUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.collections.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
+
 import java.text.DecimalFormat;
 import java.text.DecimalFormat;
 import java.text.ParseException;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.text.SimpleDateFormat;
 import java.util.*;
 import java.util.*;
+import java.util.concurrent.ForkJoinPool;
 import java.util.function.Function;
 import java.util.function.Function;
 import java.util.regex.Pattern;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.Collectors;
@@ -51,6 +57,78 @@ public class BaseUtils {
     public static String dim2tn(String dim) {
     public static String dim2tn(String dim) {
         return "NG_RT_" + dim;
         return "NG_RT_" + dim;
     }
     }
+    public static String equity_info_rowkey(JSONArray pledgee_info, JSONArray pledgor_info) {
+        return getKey2Str("pledgee", pledgee_info) + " " + getKey2Str("pledgor", pledgor_info);
+    }
+
+    /**
+     * Collects the string stored under {@code key} in every element of {@code value}, removes duplicates,
+     * sorts the values in natural order and joins them with {@code "、"}.
+     * <p>
+     * Every element must be a {@link JSONObject}. A missing {@code key} contributes {@code null}; sorting a
+     * null together with other values throws {@link NullPointerException}.
+     *
+     * @param key   entry name to read from each element
+     * @param value array of JSON objects; may be null or empty
+     * @return the joined values, or {@code ""} when {@code value} is null or empty
+     */
+    public static String getKey2Str(String key, JSONArray value) {
+        if (value == null || value.isEmpty()) {
+            return "";
+        }
+        Set<String> unique = new HashSet<>();
+        for (Object element : value) {
+            unique.add(((JSONObject) element).getString(key));
+        }
+        return unique.stream().sorted().collect(Collectors.joining("、"));
+    }
+
+    /**
+     * Derives a rowkey from the record's {@code start_date} and the names found in its
+     * {@code defendant_info} and {@code plaintiff_info} arrays.
+     * <p>
+     * The non-empty names of both arrays are de-duplicated, sorted and concatenated without a separator.
+     * The key is {@code md5(cleanup(concatws("", start_date, names)))}.
+     *
+     * @param companyId not used by this method
+     * @param j         record to read from
+     * @return the rowkey, or null when {@code splitDate(start_date)} is empty or no name is present
+     */
+    public static String getCompanyCourtOpenAnnouncementRowkey(String companyId, JSONObject j) {
+        String startDate = splitDate(j.getString("start_date"));
+        if (StringUtils.isEmpty(startDate)) {
+            return null;
+        }
+
+        JSONArray defendants = j.getJSONArray("defendant_info");
+        if (defendants == null) {
+            defendants = new JSONArray();
+        }
+        JSONArray plaintiffs = j.getJSONArray("plaintiff_info");
+        if (plaintiffs == null) {
+            plaintiffs = new JSONArray();
+        }
+
+        // Defendants first, then plaintiffs; the order is irrelevant after sorting.
+        String litigants = StreamEnhance
+                .append(defendants.toJavaList(JSONObject.class).stream(), plaintiffs.toJavaList(JSONObject.class).stream())
+                .map(party -> party.getString("name"))
+                .filter(StringUtils::isNotEmpty)
+                .distinct()
+                .sorted()
+                .collect(Collectors.joining());
+        return StringUtils.isEmpty(litigants) ? null : md5(cleanup(concatws("", startDate, litigants)));
+    }
+
+    public static String get_text_from_json(String json, String name) {
+        if (StringUtils.isEmpty(json)) return "";
+        try {
+            List<JSONObject> list = JSON.parseArray(json, JSONObject.class);
+            if (list.isEmpty()) return "";
+            return list.stream().map(d -> d.getString(name))
+                    .filter(StringUtils::isNotBlank)
+                    .sorted(Comparator.naturalOrder())
+                    .collect(Collectors.joining(""));
+        } catch (Exception e) {
+            return "";
+        }
+    }
+    // Any character that is not in U+4E00..U+9FA5, a digit, an ASCII letter or one of the listed parentheses.
+    private static final Pattern first_p = Pattern.compile("[^\\u4e00-\\u9fa50-9a-zA-Z()()]");
+    // A parenthesised group at the very end of the input that contains no further opening parenthesis.
+    private static final Pattern second_p = Pattern.compile("[((][^((]+[))]$");
+    // Any character that is not in U+4E00..U+9FA5, a digit or an ASCII letter.
+    private static final Pattern third_p = Pattern.compile("[^\\u4e00-\\u9fa50-9a-zA-Z]");
+    // An angle-bracket tag such as <b> or </div>.
+    private static final Pattern html_p = Pattern.compile("<[^>]+>");
+    // Exactly four digits, dash, two digits, dash, two digits (no range check on month or day).
+    private static final Pattern date_format_p = Pattern.compile("^\\d{4}-\\d{2}-\\d{2}$");
+    // The characters 年 (year) or 月 (month).
+    private static final Pattern replace_char_p = Pattern.compile("[年月]");
+
+    /**
+     * Normalises a text value by removing, in this order: angle-bracket tags, every character that is
+     * not CJK (U+4E00..U+9FA5), a digit, an ASCII letter or a parenthesis, a trailing parenthesised
+     * group, and finally any remaining character that is not CJK, a digit or an ASCII letter.
+     *
+     * @param val text to clean; may be null
+     * @return the cleaned text, or {@code ""} when {@code val} is null or empty
+     */
+    public static String cleanupChange(String val) {
+        // Use the commons-lang3 StringUtils already imported by this file, not the ODPS SDK's utility class.
+        if (StringUtils.isEmpty(val)) {
+            return "";
+        }
+        val = html_p.matcher(val).replaceAll("");
+        val = first_p.matcher(val).replaceAll("");
+        val = second_p.matcher(val).replaceAll("");
+        return third_p.matcher(val).replaceAll("");
+    }
+
+    public static String substr(String str, Integer num) {
+        if (StringUtils.isEmpty(str)) {
+            return "";
+        } else {
+            return str.substring(num);
+        }
+    }
 
 
     public static String formatDate(String date) {
     public static String formatDate(String date) {
         if (StringUtils.isEmpty(date)) return null;
         if (StringUtils.isEmpty(date)) return null;
@@ -86,6 +164,25 @@ public class BaseUtils {
         return TableName.valueOf("NG_RT_" + dim.toUpperCase(Locale.ROOT));
         return TableName.valueOf("NG_RT_" + dim.toUpperCase(Locale.ROOT));
     }
     }
 
 
+    /**
+     * Converts an HBase {@link Result} into a JSON object keyed by lower-cased column qualifier.
+     * <p>
+     * The row key is stored under {@code "rowkey"} first, so a cell whose lower-cased qualifier is
+     * {@code rowkey} overwrites it. Blank cell values are stored as null.
+     *
+     * @param r result to convert; may be null
+     * @return the JSON object, or null when {@code r} is null or empty
+     */
+    public static JSONObject toJSONObjectLowerCase(Result r) {
+        if (r == null || r.isEmpty()) {
+            return null;
+        }
+        JSONObject row = new JSONObject();
+        row.put("rowkey", getROWString(r));
+        for (Cell cell : r.listCells()) {
+            String qualifier = Bytes.toString(CellUtil.cloneQualifier(cell));
+            String text = Bytes.toString(CellUtil.cloneValue(cell));
+            if (StringUtils.isBlank(text)) {
+                text = null;
+            }
+            row.put(qualifier.toLowerCase(Locale.ENGLISH), text);
+        }
+        return row;
+    }
+
+    public static String getROWString(Result r) {
+        return Bytes.toString(r.getRow());
+    }
+
     public static JSONObject transFields(JSONObject j, List<String> exportFields, Function<JSONObject, JSONObject> handles) {
     public static JSONObject transFields(JSONObject j, List<String> exportFields, Function<JSONObject, JSONObject> handles) {
         JSONObject tmp = new JSONObject(j);
         JSONObject tmp = new JSONObject(j);
         if (handles != null) {
         if (handles != null) {
@@ -336,6 +433,9 @@ public class BaseUtils {
         }
         }
     }
     }
 
 
+    // Shared fork-join pool sized at three threads per available processor, with a minimum of 16.
+    // NOTE(review): the field is public and not final, so any caller can replace it; consider making it final.
+    public static ForkJoinPool COMMON_POOL = new ForkJoinPool(Math.max(Runtime.getRuntime().availableProcessors() * 3, 16));
+
+
     public static void main(String[] args) throws ParseException {
     public static void main(String[] args) throws ParseException {
         String u1 = "www.baijinggame.cn,baijinggame.com,www.baijinggame.com,yuxianweb.cn,www.yuxianweb.cn";
         String u1 = "www.baijinggame.cn,baijinggame.com,www.baijinggame.com,yuxianweb.cn,www.yuxianweb.cn";
         String u2 = "https://网络.中国,www.yuxianweb.com,baijinggame.cn,yuxianweb.cn,www.yuxianweb.cn";
         String u2 = "https://网络.中国,www.yuxianweb.com,baijinggame.cn,yuxianweb.cn,www.yuxianweb.cn";

+ 80 - 0
src/main/java/com/winhc/task/util/HoloUtils.java

@@ -7,13 +7,21 @@ import com.alibaba.hologres.client.model.WriteMode;
 import lombok.AllArgsConstructor;
 import lombok.AllArgsConstructor;
 import lombok.SneakyThrows;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Configuration;
 
 
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.sql.ResultSet;
+import java.sql.ResultSetMetaData;
+import java.sql.SQLException;
 import java.sql.Statement;
 import java.sql.Statement;
+import java.util.*;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 
 /**
 /**
  * @Author: π
  * @Author: π
@@ -67,4 +75,76 @@ public class HoloUtils {
         }).get(1000 * 60 * 30, TimeUnit.MILLISECONDS);
         }).get(1000 * 60 * 30, TimeUnit.MILLISECONDS);
     }
     }
 
 
+    @SneakyThrows
+    public static <T> List<T> exec(HoloClient holoClient, String sql, Class<T> typeClazz) {
+        List<T> list = new ArrayList<>();
+        Field[] declaredFields = typeClazz.getDeclaredFields();
+        Method[] declaredMethods = typeClazz.getDeclaredMethods();
+        Set<String> methodSet = Arrays.stream(declaredMethods).map(Method::getName).collect(Collectors.toSet());
+        try {
+            ResultSet resultSet1 = holoClient.sql(conn -> {
+                ResultSet resultSet;
+                try (Statement stat = conn.createStatement()) {
+                    resultSet = stat.executeQuery(sql);
+                }
+                return resultSet;
+            }).get();
+            Set<String> fieldSet = getFieldSet(resultSet1, typeClazz);
+
+            while (resultSet1.next()) {
+                T instance = typeClazz.newInstance();
+                for (Field field : declaredFields) {
+                    Class<?> type = field.getType();
+                    String fieldSetName = "set" + StringUtils.capitalize(field.getName());
+                    if (!methodSet.contains(fieldSetName) || !fieldSet.contains(field.getName())) {
+                        continue;
+                    }
+                    Method fieldSetMet = typeClazz.getMethod(fieldSetName, field
+                            .getType());
+                    switch (type.getSimpleName()) {
+                        case "String":
+                            fieldSetMet.invoke(instance, resultSet1.getString(field.getName()));
+                            break;
+                        case "Integer":
+                            fieldSetMet.invoke(instance, resultSet1.getInt(field.getName()));
+                            break;
+                        default:
+                            throw new RuntimeException();
+                    }
+                }
+                list.add(instance);
+            }
+            return list;
+        } catch (HoloClientException | InterruptedException | ExecutionException | SQLException e) {
+            log.error(e.getMessage(), e);
+            log.error("holo exec sql error: {}", e.getMessage());
+            throw new RuntimeException("holo exec sql error: " + e.getMessage(), e);
+        }
+    }
+    private static final Map<Class<?>, Set<String>> map = new HashMap<>();
+
+    private static Set<String> getFieldSet(ResultSet resultSet, Class<?> typeClazz) {
+        if (!map.containsKey(typeClazz)) {
+            synchronized (HoloUtils.class) {
+                if (!map.containsKey(typeClazz)) {
+                    try {
+                        ResultSetMetaData metaData = resultSet.getMetaData();
+                        int columnCount = metaData.getColumnCount();
+                        HashSet<String> set = new HashSet<>();
+                        for (int i = 1; i <= columnCount; i++) {
+                            String columnName = metaData.getColumnName(i);
+                            set.add(columnName);
+                        }
+                        map.put(typeClazz, set);
+                    } catch (Exception e) {
+                        log.error(e.getMessage(), e);
+                        e.printStackTrace();
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        }
+        return map.get(typeClazz);
+    }
+
 }
 }

+ 32 - 0
src/main/java/com/winhc/task/util/OdpsUtils.java

@@ -0,0 +1,32 @@
+package com.winhc.task.util;
+
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Holds the process-wide {@link Odps} client for the {@code winhc_ng} project.
+ * <p>
+ * Credentials are taken from the environment variables {@code WINHC_ODPS_ACCESS_ID} and
+ * {@code WINHC_ODPS_ACCESS_KEY}. When a variable is missing or empty the built-in value is used so that
+ * existing deployments keep working.
+ *
+ * @Author: π
+ */
+@Slf4j
+public class OdpsUtils {
+    private static final String ACCESS_ID_ENV = "WINHC_ODPS_ACCESS_ID";
+    private static final String ACCESS_KEY_ENV = "WINHC_ODPS_ACCESS_KEY";
+
+    // SECURITY: these credentials are committed to source control and must be treated as leaked.
+    // TODO rotate the key pair, supply the new one through the environment variables above and
+    // delete both literals.
+    private static final String username = "LTAI4G4n7pAW8tUbJVkkZQPD";
+    private static final String password = "uNJOBskzcDqHq1TYG3m2rebR4c1009";
+
+    private static final Odps odps;
+
+    static {
+        Account account = new AliyunAccount(
+                fromEnvironment(ACCESS_ID_ENV, username),
+                fromEnvironment(ACCESS_KEY_ENV, password));
+        odps = new Odps(account);
+        String odpsUrl = "http://service.odps.aliyun.com/api";
+        odps.setEndpoint(odpsUrl);
+        odps.setDefaultProject("winhc_ng");
+    }
+
+    /**
+     * Returns the value of the environment variable {@code name}, or {@code fallback} when it is unset or empty.
+     */
+    private static String fromEnvironment(String name, String fallback) {
+        String configured = System.getenv(name);
+        if (configured == null || configured.isEmpty()) {
+            log.warn("{} is not set; falling back to the credential embedded in the source", name);
+            return fallback;
+        }
+        return configured;
+    }
+
+    /**
+     * Returns the shared client created when this class was loaded.
+     */
+    public static Odps getOdps(){
+        return odps;
+    }
+
+
+
+}

+ 20 - 0
src/main/java/com/winhc/task/util/PreUDF.java

@@ -0,0 +1,20 @@
+package com.winhc.task.util;
+
+import org.apache.commons.lang3.StringUtils;
+
+/**
+ * Static helpers that choose between alternative rowkey values.
+ *
+ * @author ZhangJi
+ * @since 2021-10-22 15:26
+ */
+public class PreUDF {
+    /**
+     * Prefers the user-defined rowkey and falls back to the new one.
+     *
+     * @return {@code use_user_defined_rowkey} unless it is null or empty, in which case {@code new_rowkey}
+     */
+    public static String replace_rowkey(String use_user_defined_rowkey, String new_rowkey) {
+        return StringUtils.isEmpty(use_user_defined_rowkey) ? new_rowkey : use_user_defined_rowkey;
+    }
+
+    /**
+     * Picks the holder's rowkey part by holder type.
+     *
+     * @return {@code holder_id} when {@code holder_type} is 2, otherwise {@code holder_name}
+     */
+    public static String company_holder_rowkey(long holder_type, String holder_id, String holder_name) {
+        return holder_type == 2 ? holder_id : holder_name;
+    }
+
+}

+ 55 - 0
src/main/java/com/winhc/task/util/StreamEnhance.java

@@ -0,0 +1,55 @@
+package com.winhc.task.util;
+
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+/**
+ * Small static helpers around {@link Stream}: running a stream in parallel on a given executor,
+ * appending elements, and de-duplicating by a derived key.
+ *
+ * @author ZhangJi
+ * @since 2021-10-29 15:56
+ */
+public class StreamEnhance {
+    private static final Logger LOG = LoggerFactory.getLogger(StreamEnhance.class);
+
+    /**
+     * Submits {@code stream.parallel().forEach(consumer)} to {@code es} and blocks until it finishes.
+     * <p>
+     * Failures are logged, not rethrown. If the waiting thread is interrupted, its interrupt flag is
+     * restored before returning.
+     * NOTE(review): presumably {@code es} is meant to be a ForkJoinPool so the parallel work runs inside
+     * it; confirm against callers.
+     *
+     * @param stream   elements to process
+     * @param consumer action applied to every element, possibly concurrently
+     * @param es       executor the terminal operation is submitted to
+     */
+    public static <T> void parForEach(Stream<T> stream, Consumer<T> consumer, ExecutorService es) {
+        try {
+            es.submit(() -> stream.parallel().forEach(consumer)).get();
+        } catch (InterruptedException e) {
+            // Swallowing the interrupt would hide it from the caller's thread.
+            Thread.currentThread().interrupt();
+            LOG.error("parallel stream failed", e);
+        } catch (ExecutionException e) {
+            LOG.error("parallel stream failed", e);
+        }
+    }
+
+    /**
+     * Returns {@code source} followed by the single element {@code first}.
+     */
+    public static <T> Stream<T> append(Stream<T> source, T first) {
+        return Stream.concat(source, Stream.of(first));
+    }
+
+    /**
+     * Returns {@code source} followed by all of {@code elements}.
+     */
+    public static <T> Stream<T> append(Stream<T> source, Collection<T> elements) {
+        return Stream.concat(source, elements.stream());
+    }
+
+    /**
+     * Returns {@code source} followed by {@code other}.
+     */
+    public static <T> Stream<T> append(Stream<T> source, Stream<T> other) {
+        return Stream.concat(source, other);
+    }
+
+    /**
+     * Creates a stateful predicate that accepts an element only the first time its key is seen.
+     * <p>
+     * Keys are tracked in a {@link ConcurrentHashMap}, which rejects null keys, so {@code keyExtractor}
+     * must not return null.
+     *
+     * @param keyExtractor derives the de-duplication key from an element
+     */
+    public static <T> Predicate<T> distinctByKey(
+            Function<? super T, ?> keyExtractor) {
+
+        Map<Object, Boolean> seen = new ConcurrentHashMap<>();
+        return t -> seen.putIfAbsent(keyExtractor.apply(t), Boolean.TRUE) == null;
+    }
+
+    /**
+     * Filters {@code source} down to the first element seen for every key produced by {@code keyExtractor}.
+     */
+    public static <T> Stream<T> distinctBy(Stream<T> source, Function<? super T, ?> keyExtractor) {
+        return source.filter(distinctByKey(keyExtractor));
+    }
+
+}

+ 9 - 3
src/main/resources/application.yml

@@ -19,12 +19,15 @@ spring:
 
 
   data:
   data:
     mongodb:
     mongodb:
-      uri: mongodb://itslaw:itslaw_168@dds-uf6ff5dfd9aef3641601-pub.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642555-pub.mongodb.rds.aliyuncs.com:3717/itslaw?replicaSet=mgset-6501997
+      uri: mongodb://itslaw:itslaw_168@dds-uf6ff5dfd9aef3641601-pub.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642555-pub.mongodb.rds.aliyuncs.com:3717/itslaw
 es:
 es:
   username: elastic
   username: elastic
   password: elastic_168
   password: elastic_168
   host: es-cn-oew22t8bw002iferu.public.elasticsearch.aliyuncs.com #new
   host: es-cn-oew22t8bw002iferu.public.elasticsearch.aliyuncs.com #new
-  #host: es-cn-0pp0r32zf000ipovd.public.elasticsearch.aliyuncs.com
+
+  username1: elastic
+  password1: elastic_168
+  host1: es-cn-zxu362ii6000oj8y3.public.elasticsearch.aliyuncs.com #old
 
 
 hbase:
 hbase:
   config:
   config:
@@ -45,7 +48,7 @@ spring:
   profiles: prod
   profiles: prod
   data:
   data:
     mongodb:
     mongodb:
-      uri: mongodb://itslaw:itslaw_168@dds-uf6ff5dfd9aef3641.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642.mongodb.rds.aliyuncs.com:3717/itslaw?replicaSet=mgset-6501997
+      uri: mongodb://itslaw:itslaw_168@dds-uf6ff5dfd9aef3641.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642.mongodb.rds.aliyuncs.com:3717/itslaw
 
 
   kafka:
   kafka:
     bootstrap-servers: 192.168.4.237:9092,192.168.4.235:9092,192.168.4.236:9092
     bootstrap-servers: 192.168.4.237:9092,192.168.4.235:9092,192.168.4.236:9092
@@ -79,6 +82,9 @@ es:
   password: elastic_168
   password: elastic_168
   host: es-cn-oew22t8bw002iferu.elasticsearch.aliyuncs.com #es-cn-0pp0r32zf000ipovd.elasticsearch.aliyuncs.com
   host: es-cn-oew22t8bw002iferu.elasticsearch.aliyuncs.com #es-cn-0pp0r32zf000ipovd.elasticsearch.aliyuncs.com
 
 
+  username1: elastic
+  password1: elastic_168
+  host1: es-cn-zxu362ii6000oj8y3.elasticsearch.aliyuncs.com #old
 hbase:
 hbase:
   config:
   config:
     hbase.zookeeper.quorum: hb-uf6m8e1nu4ivp06m5-master1-001.hbase.rds.aliyuncs.com:2181,hb-uf6m8e1nu4ivp06m5-master2-001.hbase.rds.aliyuncs.com:2181,hb-uf6m8e1nu4ivp06m5-master3-001.hbase.rds.aliyuncs.com:2181
     hbase.zookeeper.quorum: hb-uf6m8e1nu4ivp06m5-master1-001.hbase.rds.aliyuncs.com:2181,hb-uf6m8e1nu4ivp06m5-master2-001.hbase.rds.aliyuncs.com:2181,hb-uf6m8e1nu4ivp06m5-master3-001.hbase.rds.aliyuncs.com:2181

+ 413 - 0
src/test/java/com/winhc/task/SynMongoCompanySummary.java

@@ -0,0 +1,413 @@
+package com.winhc.task;
+
+import cn.hutool.core.io.FileUtil;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.hologres.client.HoloClient;
+import com.alibaba.hologres.client.exception.HoloClientException;
+import com.aliyun.odps.Column;
+import com.winhc.task.bean.Summary;
+import com.winhc.task.common.ArgsCompanyJob;
+import com.winhc.task.common.Constant;
+import com.winhc.task.common.SummaryArgs;
+import com.winhc.task.job.EsScanSummaryJob;
+import com.winhc.task.util.*;
+import lombok.val;
+import org.apache.calcite.avatica.proto.Common;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.filter.PageFilter;
+import org.elasticsearch.action.search.SearchRequest;
+import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.index.query.WrapperQueryBuilder;
+import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
+import org.junit.jupiter.api.Test;
+import org.postgresql.model.TableSchema;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.test.context.SpringBootTest;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.*;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+import static com.winhc.task.common.Constant.hbase_scan_v8;
+import static com.winhc.task.common.Constant.tns_out;
+
+
+@SpringBootTest
+public class SynMongoCompanySummary {
+    @Autowired
+    EsScanSummaryJob esScanSummaryJob;
+
+    @Autowired
+    Connection connection;
+
+    @Autowired
+    @Qualifier(value = "v6")
+    RestHighLevelClient getClient;
+
+    @Autowired
+    @Qualifier(value = "v5")
+    RestHighLevelClient getOldClient;
+
+
+    /**
+     * Runs only the first stage: delegates to {@code esScanSummaryJob.start()}.
+     *
+     * @throws IOException propagated from the job
+     */
+    @Test
+    public void start() throws IOException {
+        this.esScanSummaryJob.start();
+    }
+
+    /**
+     * Runs the whole export pipeline in order; each stage is also runnable as its own test.
+     * Any exception thrown by a stage aborts the remaining stages.
+     */
+    @Test
+    public void start2() throws Exception {
+        // Stage 1: compute all ids.
+        esScanSummaryJob.start();
+
+        // Stage 2 (v9): holo -> holo, aggregate rowkeys into the result table.
+        // NOTE(review): the original note labelled this step "v8 rowkey" and the next one
+        // "v9 rowkey"; the callee names (rowkey2HoloV9 / calc_v8) suggest the labels were swapped.
+        holo();
+
+        // Stage 3 (v8): es -> holo, aggregate rowkeys into the result table.
+        es2HoloV8();
+
+        // Stage 4 (v8 & v9): holo -> hbase -> excel, export the collected rows to Excel.
+        holoAndHbase2Excel();
+
+        // Stage 5 (v8 & v9): hbase -> excel, prefix-scan the child tables and export to Excel.
+        scanHbase2Excel();
+    }
+
+    /**
+     * Empties the Hologres result table ({@code Constant.holo_res_tab}) and refills it by calling
+     * {@link #rowkey2HoloV9(String, String)} for every key of {@code SummaryArgs.SUMMARY_ARGS}
+     * that is also listed in {@code tns_out}.
+     *
+     * <p>A failure for one table is reported and the loop moves on to the next table, so a
+     * partial result table is possible.
+     */
+    @Test
+    public void holo() throws Exception {
+        // keySet() is already duplicate-free, so only the tns_out membership filter is needed.
+        List<String> tns = SummaryArgs.SUMMARY_ARGS.keySet()
+                .stream()
+                .filter(tns_out::contains)
+                .collect(Collectors.toList());
+        String resTable = Constant.holo_res_tab;
+
+        // Clear the result table; close the client even when the truncate fails.
+        HoloClient holoClient = HoloUtils.init();
+        try {
+            HoloUtils.exexSql(holoClient, "truncate table " + resTable);
+        } finally {
+            holoClient.close();
+        }
+
+        for (String tn : tns) {
+            try {
+                rowkey2HoloV9(tn, resTable);
+            } catch (Exception e) {
+                // Best effort per table: say which table failed, then continue with the rest.
+                System.err.println("rowkey2HoloV9 failed for tn=" + tn);
+                e.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Inserts into {@code resTable} one {@code (rowkey, tn)} row per distinct rowkey of table
+     * {@code ng_rt_<tn>} whose filter fields overlap ({@code &&}) the id array built from
+     * {@code Constant.all_ids}. The filter fields come from {@code SummaryArgs.SUMMARY_ARGS.get(tn)}
+     * and are OR-ed together.
+     *
+     * <p>The statement is assembled by plain concatenation rather than regex placeholder
+     * replacement, so characters such as {@code $} or {@code \} in an id or field name cannot be
+     * misread as replacement syntax.
+     *
+     * @param tn       table-name suffix; must be a key of {@code SummaryArgs.SUMMARY_ARGS}
+     * @param resTable target table of the INSERT
+     * @throws HoloClientException if executing the statement fails
+     */
+    public void rowkey2HoloV9(String tn, String resTable) throws HoloClientException {
+
+        SummaryArgs s = SummaryArgs.SUMMARY_ARGS.get(tn);
+
+        // Quote every id as a SQL string literal; double embedded quotes so they stay literals.
+        String ids = Constant.all_ids.stream().distinct()
+                .map(id -> id.replace("'", "''"))
+                .collect(Collectors.joining("','", "'", "'"));
+        String condition = s.getArgsInfo().stream()
+                .map(x -> x.getFilterField() + " && " + "ARRAY[" + ids + "]")
+                .collect(Collectors.joining(" or "));
+
+        String tableName = "ng_rt_" + tn;
+        String calc_sql = "INSERT INTO " + resTable + "\n" +
+                "SELECT\n" +
+                "    rowkey, '" + tn + "' as tn\n" +
+                "FROM\n" +
+                "    " + tableName + "\n" +
+                "WHERE\n" +
+                "    " + condition + "\n" +
+                "GROUP BY rowkey";
+        System.out.println(calc_sql);
+
+        HoloClient holoClient = HoloUtils.init();
+        try {
+            HoloUtils.exexSql(holoClient, calc_sql);
+        } finally {
+            // Release the client even when the statement fails.
+            holoClient.close();
+        }
+
+    }
+
+    /**
+     * Prefix-scans every table listed in {@code Constant.hbase_scan_v9} and {@code hbase_scan_v8}
+     * once per id, then writes one Excel file per table through {@code saveExcel}.
+     *
+     * <p>The id set is {@code Constant.all_ids} plus the rowkeys stored in the result table under
+     * {@code tn = 'company_annual_report'}. Each scan is capped by a {@code PageFilter(100)}.
+     */
+    @Test
+    public void scanHbase2Excel() throws Exception {
+        // Work on a copy: appending to Constant.all_ids itself would leak the annual-report
+        // rowkeys into every other user of that shared list.
+        List<String> ids = new ArrayList<>(Constant.all_ids);
+
+        // Add the annual-report rowkeys.
+        String tmpSql = "SELECT * FROM " + Constant.holo_res_tab + " where tn ='company_annual_report'";
+        HoloClient holoClient = HoloUtils.init();
+        try {
+            List<Summary> pList = HoloUtils.exec(holoClient, tmpSql, Summary.class);
+            pList.stream().map(Summary::getRowkey).distinct().forEach(ids::add);
+        } finally {
+            holoClient.close();
+        }
+
+        val futures = new ArrayList<CompletableFuture<List<JSONObject>>>();
+        Stream.concat(Constant.hbase_scan_v9.stream(), hbase_scan_v8.stream()).forEach(tn -> {
+            String tableName = hbase_scan_v8.contains(tn) ? "NG_" + tn.toUpperCase(Locale.ROOT) : "NG_RT_" + tn.toUpperCase(Locale.ROOT);
+            ids.forEach(rowkey -> futures.add(CompletableFuture.supplyAsync(
+                    () -> scanByPrefix(tableName, tn, rowkey), BaseUtils.COMMON_POOL)));
+        });
+
+        // get() rethrows the first scan failure; past this line every future completed normally.
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+
+        Map<String, List<JSONObject>> dataList = futures.stream()
+                .flatMap(done -> done.join().stream())
+                .collect(Collectors.groupingBy(o -> o.getString("tn")));
+
+        dataList.forEach(this::saveExcel);
+    }
+
+    /**
+     * Scans {@code tableName} for rows whose key starts with {@code rowkey} and returns them as
+     * JSON objects tagged with {@code "tn"}.
+     */
+    private List<JSONObject> scanByPrefix(String tableName, String tn, String rowkey) {
+        Scan scan = new Scan();
+        // Explicit charset: the no-arg getBytes() depends on the platform default.
+        scan.setRowPrefixFilter(rowkey.getBytes(StandardCharsets.UTF_8));
+        scan.setFilter(new PageFilter(100));
+        // The scanner holds server-side resources, so close it together with the table.
+        try (val table = connection.getTable(TableName.valueOf(tableName));
+             ResultScanner scanner = table.getScanner(scan)) {
+            return StreamSupport.stream(scanner.spliterator(), false)
+                    .filter(result -> result != null && !result.isEmpty())
+                    .map(BaseUtils::toJSONObjectLowerCase)
+                    .map(j -> j.fluentPut("tn", tn))
+                    .collect(Collectors.toList());
+        } catch (Exception e) {
+            throw new RuntimeException("fetchHbase error" + e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Calls {@link #calc_v8(String)} for each of the fixed v8 table names below. A failure for
+     * one table is printed and the remaining tables are still processed.
+     */
+    @Test
+    public void es2HoloV8() throws Exception {
+        final String[] tables = {
+                "company_ipr_pledge",
+                "company_bid",
+                "company_employment",
+                "company_certificate",
+                "company_customs_credit",
+                "company_bond",
+                "company_tele_license",
+                "company_weibo",
+                "company_wechat"
+        };
+        for (String table : tables) {
+            try {
+                calc_v8(table);
+            } catch (Exception failure) {
+                failure.printStackTrace();
+            }
+        }
+    }
+
+    /**
+     * Searches index {@code winhc_index_<tn>} for documents where any column listed in
+     * {@code Constant.es_filters.get(tn)} matches one of {@code Constant.all_ids}, and writes
+     * each distinct hit id to {@code Constant.holo_res_tab} as a {@code (tn, rowkey)} row.
+     *
+     * <p>Only the first 10000 hits of the single search request are read; anything beyond that
+     * is not written.
+     *
+     * @param tn table-name suffix; must be a key of {@code Constant.es_filters}
+     */
+    public void calc_v8(String tn) throws Exception {
+
+        // One "terms" clause per filter column, joined with commas so that "should" receives a
+        // flat list of clauses. Serializing the whole list would nest a second array inside
+        // the "should" brackets.
+        String shouldClauses = Constant.es_filters.get(tn).stream()
+                .map(col -> new JSONObject()
+                        .fluentPut("terms", new JSONObject()
+                                .fluentPut(col, JSONArray.parseArray(JSON.toJSONString(Constant.all_ids)))))
+                .map(clause -> clause.toJSONString())
+                .collect(Collectors.joining(","));
+
+        // Concatenate instead of replaceAll: ids containing '$' or '\' must stay literal.
+        String dsl = "{\n" +
+                "  \"query\": {\n" +
+                "    \"bool\": {\n" +
+                "      \"should\": [\n" +
+                "        " + shouldClauses + "\n" +
+                "      ],\n" +
+                "      \"minimum_should_match\": 1\n" +
+                "    }\n" +
+                "  }\n" +
+                "}";
+
+        SearchRequest searchRequest = new SearchRequest("winhc_index_" + tn).types("_doc");
+        SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+        searchSourceBuilder.size(10000);
+        WrapperQueryBuilder query = QueryBuilderUtils.getQuery(JSONObject.parseObject(dsl));
+        searchSourceBuilder.query(query);
+        searchRequest.source(searchSourceBuilder);
+        SearchResponse searchResponse = getClient.search(searchRequest);
+
+        HoloClient holoClient = HoloUtils.init();
+        try {
+            TableSchema tableSchema = holoClient.getTableSchema(Constant.holo_res_tab);
+            List<com.alibaba.hologres.client.Put> rePut = Arrays.stream(searchResponse.getHits().getHits())
+                    .map(SearchHit::getId).distinct()
+                    .map(rowkey -> {
+                        com.alibaba.hologres.client.Put put = new com.alibaba.hologres.client.Put(tableSchema);
+                        put.setObject("tn", tn);
+                        put.setObject("rowkey", rowkey);
+                        return put;
+                    })
+                    .collect(Collectors.toList());
+
+            holoClient.put(rePut);
+        } finally {
+            // Release the client even when the schema lookup or the write fails.
+            holoClient.close();
+        }
+    }
+
+
+    /**
+     * Reads every {@code (rowkey, tn)} row from {@code Constant.holo_res_tab}, groups the rows by
+     * {@code tn}, and fetches each group concurrently: {@code wenshu_detail_v2} through
+     * {@code fetchES}, every other table through {@code fetchHbase}. Both fetchers write the
+     * Excel file for their table themselves.
+     *
+     * @throws Exception if the Hologres query fails or any fetch task completes exceptionally
+     */
+    @Test
+    public void holoAndHbase2Excel() throws Exception {
+        String tmpSql = "SELECT * FROM " + Constant.holo_res_tab + "";
+        List<Summary> pList;
+        HoloClient holoClient = HoloUtils.init();
+        try {
+            pList = HoloUtils.exec(holoClient, tmpSql, Summary.class);
+        } finally {
+            // Release the client even when the query fails.
+            holoClient.close();
+        }
+
+        Map<String, List<Summary>> gList = pList.stream().collect(Collectors.groupingBy(Summary::getTn));
+        val futures = new ArrayList<CompletableFuture<List<JSONObject>>>();
+        gList.forEach((tn, keys) -> futures.add(
+                "wenshu_detail_v2".equals(tn) ? fetchES(tn, keys) : fetchHbase(tn, keys)));
+
+        // get() surfaces the first task failure as an ExecutionException, so no separate
+        // per-future check is needed afterwards.
+        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
+    }
+
+    private CompletableFuture<List<JSONObject>> fetchHbase(String tn, List<Summary> keys) {
+        String tableName = Constant.v8_list.contains(tn) ? "NG_" + tn.toUpperCase(Locale.ROOT) : "NG_RT_" + tn.toUpperCase(Locale.ROOT);
+        return CompletableFuture.supplyAsync(() -> {
+            List<JSONObject> re = new ArrayList<>();
+            val gs = keys.stream().map(Summary::getRowkey).
+                    distinct().map(vk -> vk.getBytes(StandardCharsets.UTF_8))
+                    .map(Get::new).collect(Collectors.toList());
+            try (val table = connection.getTable(TableName.valueOf(tableName))) {
+                val rs = table.get(gs);
+                if (rs != null) {
+                    re = Stream.of(rs)
+                            .filter(result -> result != null && !result.isEmpty())
+                            .map(BaseUtils::toJSONObjectLowerCase)
+                            .collect(Collectors.toList());
+                }
+            } catch (Exception e) {
+                throw new RuntimeException("fetchHbase error" + e.getMessage(), e);
+            }
+            saveExcel(tn, re);
+            return re;
+        }, BaseUtils.COMMON_POOL);
+    }
+
+    /**
+     * Asynchronously (on {@code BaseUtils.COMMON_POOL}) looks up the rowkeys of {@code keys} by
+     * {@code _id} in index {@code wenshu_detail2}, writes the hits to Excel via
+     * {@code saveExcel}, and completes with them. Each returned object is the hit's source map
+     * with the document id added under {@code "rowkey"}.
+     *
+     * <p>Only the first 10000 hits of the single search request are read. A search failure is
+     * rethrown as a {@link RuntimeException} (matching {@code fetchHbase}) instead of silently
+     * exporting an empty workbook.
+     */
+    private CompletableFuture<List<JSONObject>> fetchES(String tn, List<Summary> keys) {
+        return CompletableFuture.supplyAsync(() -> {
+            List<String> ids = keys.stream().map(Summary::getRowkey).collect(Collectors.toList());
+            // Let the JSON serializer quote and escape the ids; hand-joining them breaks on
+            // quotes, and replaceAll would misread '$' or '\' in an id.
+            String dsl = "{\n" +
+                    "  \"query\": {\n" +
+                    "    \"terms\": {\n" +
+                    "      \"_id\": " + JSON.toJSONString(ids) + "\n" +
+                    "    }\n" +
+                    "  }\n" +
+                    "}";
+
+            SearchRequest searchRequest = new SearchRequest("wenshu_detail2").types("wenshu_detail_type");
+            SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
+            searchSourceBuilder.size(10000);
+            WrapperQueryBuilder query = QueryBuilderUtils.getQuery(JSONObject.parseObject(dsl));
+            searchSourceBuilder.query(query);
+            searchRequest.source(searchSourceBuilder);
+
+            List<JSONObject> re;
+            try {
+                SearchResponse searchResponse = getOldClient.search(searchRequest);
+                re = Arrays.stream(searchResponse.getHits().getHits()).map(hit -> {
+                    Map<String, Object> sourceAsMap = hit.getSourceAsMap();
+                    sourceAsMap.put("rowkey", hit.getId());
+                    return new JSONObject(sourceAsMap);
+                }).collect(Collectors.toList());
+            } catch (IOException e) {
+                throw new RuntimeException("fetchES error" + e.getMessage(), e);
+            }
+            saveExcel(tn, re);
+            return re;
+        }, BaseUtils.COMMON_POOL);
+    }
+
+    /**
+     * Writes {@code re} to {@code D:\tmp\test4\<tn>.xlsx} (sheet {@code sheet1}).
+     *
+     * <p>The columns are the column names of the ODPS table {@code inc_ads_<tn>} (when
+     * {@code Constant.v8_list} contains {@code tn}) or {@code inc_ads_<tn>_v9} (otherwise), minus
+     * any names listed for {@code tn} in {@code Constant.tn_cols_filters}. Each cell is
+     * {@code getString(column)} of the row object, so a missing key yields an empty cell value.
+     *
+     * <p>NOTE(review): the output directory is a hard-coded Windows path that {@code compare}
+     * also reads; it will not work unchanged on other machines.
+     *
+     * @param tn table-name suffix, also used as the file name
+     * @param re rows to export
+     */
+    private void saveExcel(String tn, List<JSONObject> re) {
+        String tableName = Constant.v8_list.contains(tn) ? "inc_ads_" + tn : "inc_ads_" + tn + "_v9";
+
+        // Columns to leave out for this table; none when the table has no entry.
+        List<String> configured = Constant.tn_cols_filters.get(tn);
+        List<String> excluded = configured == null ? Collections.<String>emptyList() : configured;
+
+        List<String> cols = OdpsUtils.getOdps()
+                .tables()
+                .get(tableName)
+                .getSchema()
+                .getColumns()
+                .stream()
+                .map(Column::getName)
+                .distinct()
+                .filter(x -> !excluded.contains(x))
+                .collect(Collectors.toList());
+
+        List<List<String>> head = cols.stream()
+                .map(Collections::singletonList)
+                .collect(Collectors.toList());
+
+        List<List<String>> dataList = re.stream()
+                .map(d -> cols.stream().map(d::getString).collect(Collectors.toList()))
+                .collect(Collectors.toList());
+
+        EasyExcelUtil easyExcelUtil = new EasyExcelUtil();
+
+        String path = "D:\\tmp\\test4\\" + tn + ".xlsx";
+        easyExcelUtil.init(path, "sheet1", head);
+        try {
+            easyExcelUtil.doExportExcel(dataList);
+        } finally {
+            // Close the stream even when the export fails, so the file handle is not leaked.
+            easyExcelUtil.finish();
+        }
+    }
+
+    /**
+     * Prints three things: the table names that have an {@code .xlsx} file under
+     * {@code D:\tmp\test4}, how often each name occurs in {@code tns_out}, and the
+     * {@code tns_out} names that have no file (one per line).
+     */
+    @Test
+    public void compare() throws Exception {
+        FileFilter xlsxOnly = candidate -> candidate.getName().endsWith(".xlsx");
+        List<File> exported = FileUtil.loopFiles(new File("D:\\tmp\\test4"), xlsxOnly);
+
+        List<String> out_tns = new ArrayList<>();
+        for (File workbook : exported) {
+            out_tns.add(workbook.getName().replace(".xlsx", ""));
+        }
+        System.out.println(out_tns);
+
+        Map<String, Long> occurrences = tns_out.stream()
+                .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
+        System.out.println(occurrences);
+
+        String loss_tns = tns_out.stream()
+                .filter(expected -> !out_tns.contains(expected))
+                .collect(Collectors.joining("\n"));
+        System.out.println(loss_tns);
+
+    }
+
+}