xufei 2 سال پیش
والد
کامیت
af84d41736

+ 70 - 0
src/main/java/com/winhc/task/common/SummaryArgs.java

@@ -1261,6 +1261,76 @@ public class SummaryArgs {
                         , "company_public_announcement_del_1"))
                 , (j) -> Collections.singletonList(j.getString("company_id"))
         ));
+        a.put("company_tax", new SummaryArgs(
+                "ng_rt_company_tax"
+                , Collections.singletonList(new ArgsInfo(
+                "company_id"
+                , ""
+                , "deleted"))
+                , Arrays.asList(new ValueAlias(
+                        ""
+                        , "0"
+                        , "company_tax_del_0"),
+                new ValueAlias(
+                        ""
+                        , "1"
+                        , "company_tax_del_1"))
+                , (j) -> Collections.singletonList(j.getString("company_id"))
+        ));
+        a.put("company_patent", new SummaryArgs(
+                "ng_rt_company_patent"
+                , Arrays.asList(new ArgsInfo("applicant_name"
+                , ""
+                , "deleted")
+        )
+                , Arrays.asList(new ValueAlias(""
+                        , "0"
+                        , "company_patent_del_0")
+                , new ValueAlias(""
+                        , "1"
+                        , "company_patent_del_1")
+        )
+                , (j) -> {
+            Set<String> set = new HashSet<>(parseJsonArray(j.getString("applicant_name_info"), "$.keyno"));
+            return new ArrayList<>(set);
+        }
+        ));
+        a.put("company_copyright_reg", new SummaryArgs(
+                "ng_rt_company_copyright_reg"
+                , Arrays.asList(new ArgsInfo("author_nationality"
+                , ""
+                , "deleted")
+        )
+                , Arrays.asList(new ValueAlias(""
+                        , "0"
+                        , "company_copyright_reg_del_0")
+                , new ValueAlias(""
+                        , "1"
+                        , "company_copyright_reg_del_1")
+        )
+                , (j) -> {
+            Set<String> set = new HashSet<>(parseJsonArray(j.getString("author_nationality_info"), "$.keyno"));
+            return new ArrayList<>(set);
+        }
+        ));
+        a.put("company_copyright_works", new SummaryArgs(
+                "ng_rt_company_copyright_works"
+                , Arrays.asList(new ArgsInfo("author"
+                , ""
+                , "deleted")
+        )
+                , Arrays.asList(new ValueAlias(""
+                        , "0"
+                        , "company_copyright_works_del_0")
+                , new ValueAlias(""
+                        , "1"
+                        , "company_copyright_works_del_1")
+        )
+                , (j) -> {
+            Set<String> set = new HashSet<>(parseJsonArray(j.getString("author_info"), "$.keyno"));
+            return new ArrayList<>(set);
+        }
+        ));
         return a;
     }
 

+ 4 - 3
src/main/java/com/winhc/task/job/CalcSummaryJob.java

@@ -133,7 +133,7 @@ public class CalcSummaryJob {
     private List<Tuple> initSql(List<String> list, JobArgs jobArgs, ArrayList<String> tnList) {
         return SummaryArgs.SUMMARY_ARGS.entrySet()
                 .stream()
-                .filter(x -> (list.isEmpty() || list.contains(x.getKey())))
+                .filter(x -> (list.contains(x.getKey())))
                 .map(x -> {
                     String tn = x.getKey();
                     SummaryArgs args = x.getValue();
@@ -208,7 +208,7 @@ public class CalcSummaryJob {
     private void tranIndex(JobArgs jobArgs, String tn, String type) throws IOException {
         String c_index = Stream.of(jobArgs.getTargetIndexPrefix(), type, tn, jobArgs.getTargetIndexSuffix())
                 .filter(StringUtils::isNotBlank).collect(Collectors.joining("_"));
-        //esIndexJobs.deletedIndex(c_index);
+        esIndexJobs.deletedIndex(c_index);
         esIndexJobs.createIndex(c_index);
     }
 
@@ -230,7 +230,8 @@ public class CalcSummaryJob {
         removeList.stream().filter(x -> !x.getIndex().contains(targetIndexSuf))
                 .map(Alias::getIndex).distinct().forEach(i -> {
             try {
-                esIndexJobs.deletedIndex(i);
+                System.out.println("wait deleted index : \n " + i);
+                //esIndexJobs.deletedIndex(i);
             } catch (Exception e) {
                 e.printStackTrace();
                 log.error("deleted index error {}", i);

+ 40 - 37
src/main/java/com/winhc/task/util/SchemaInit.java

@@ -139,48 +139,51 @@ public class SchemaInit {
     }
 
     public static void main(String[] args) throws IOException {
-//        List<DataWorksFlowJob> list = new SchemaInit().getJobs();
-//        System.out.println(list);
-//        List<String> tns = Arrays.asList("zxr_evaluate_results", "company_zxr");
-//        String project = "winhc_ng";
-//        String flow = "syn_summary";
-//        String taskName = "summary_v9";
-//        String nodeId = "700005089066";
-//        String holoTable = "test_tmp_xf_sum_v9";
-//        String targetIndexPre = "ng_rt_summary";
-//        String targetIndexSuf = DateUtils.getYesterday_ymd();
-//        JobArgs jobArgs = JobArgs.builder()
-//                .project(project)
-//                .flow(flow)
-//                .taskName(taskName)
-//                .nodeId(nodeId)
-//                .holoTable(holoTable)
-//                .targetIndexPrefix(targetIndexPre)
-//                .targetIndexSuffix(targetIndexSuf)
-//                .build();
-//        System.out.println(new SchemaInit().getJobs(tns, jobArgs));
-
+        List<DataWorksFlowJob> list = new SchemaInit().getJobs();
+        System.out.println(list);
+        List<String> tns = Arrays.asList("zxr_evaluate_results", "company_zxr");
         String project = "winhc_ng";
-        String flow = "kafka_data_re_send";
-        String taskName = "kafka_data_re_send_task";
-        String topic = "test001";
-        String beginDateTime = "20220126";
-        String endDateTime = "20220129";
-        String bizdate = "20220127";
-        List<String> nodes = Arrays.asList("700005182395", "700005182396", "700005182398");
-        List<String> tns2 = Arrays.asList("zxr_evaluate_results", "company_zxr");
-        SynKafKaJobArgs jobArgs2 = SynKafKaJobArgs.builder()
+        String flow = "syn_summary";
+        String taskName = "summary_v9";
+        String nodeId = "700005089066";
+        String holoTable = "test_tmp_xf_sum_v9";
+        String targetIndexPre = "ng_rt_summary";
+        String targetIndexSuf = DateUtils.getYesterday_ymd();
+        JobArgs jobArgs = JobArgs.builder()
                 .project(project)
                 .flow(flow)
                 .taskName(taskName)
-                .nodeIds(nodes)
-                .beginDateTime(beginDateTime)
-                .endDateTime(endDateTime)
-                .bizdate(bizdate)
-                .topic(topic)
+                .nodeId(nodeId)
+                .holoTable(holoTable)
+                .targetIndexPrefix(targetIndexPre)
+                .targetIndexSuffix(targetIndexSuf)
                 .build();
-        List<DataWorksFlowJob> r = new SchemaInit().getJobs2(tns2, jobArgs2);
-        System.out.println(JSONObject.toJSONString(r, SerializerFeature.DisableCircularReferenceDetect));
+        List<DataWorksFlowJob> jobs = new SchemaInit().getJobs(tns, jobArgs);
+        Map<String, String> param = jobs.get(0).getTask().get(0).toNodeParam("20220722");
+        System.out.println(param);
+        System.out.println(new SchemaInit().getJobs(tns, jobArgs));
+
+//        String project = "winhc_ng";
+//        String flow = "kafka_data_re_send";
+//        String taskName = "kafka_data_re_send_task";
+//        String topic = "test001";
+//        String beginDateTime = "20220126";
+//        String endDateTime = "20220129";
+//        String bizdate = "20220127";
+//        List<String> nodes = Arrays.asList("700005182395", "700005182396", "700005182398");
+//        List<String> tns2 = Arrays.asList("zxr_evaluate_results", "company_zxr");
+//        SynKafKaJobArgs jobArgs2 = SynKafKaJobArgs.builder()
+//                .project(project)
+//                .flow(flow)
+//                .taskName(taskName)
+//                .nodeIds(nodes)
+//                .beginDateTime(beginDateTime)
+//                .endDateTime(endDateTime)
+//                .bizdate(bizdate)
+//                .topic(topic)
+//                .build();
+//        List<DataWorksFlowJob> r = new SchemaInit().getJobs2(tns2, jobArgs2);
+//        System.out.println(JSONObject.toJSONString(r, SerializerFeature.DisableCircularReferenceDetect));
 
     }
 }

+ 18 - 3
src/test/java/com/winhc/task/DataWorksSummaryJob.java

@@ -14,6 +14,8 @@ import java.io.IOException;
 import java.util.*;
 import java.util.stream.Collectors;
 
+import static org.junit.Assert.assertEquals;
+
 
 @SpringBootTest
 @Slf4j
@@ -27,14 +29,15 @@ public class DataWorksSummaryJob {
 
         //是否同步数据
         Boolean synData = true;
-        //synData = false;
         //过滤集合(集合为空跑全量)
         //List<String> filter = Arrays.asList("company_holder", "company_staff", "company_change");
         List<String> filter = new ArrayList<>();
 
         //确定重跑集合(集合为空跑全量)
 
-        List<String> add = Arrays.asList("company_punishment_info");
+        List<String> add = Arrays.asList("company_patent");
+        //List<String> add = Arrays.asList("company_copyright_reg","company_copyright_works");
+        //List<String> add = Arrays.asList("company_patent","company_copyright_reg","company_copyright_works");
 
         List<String> list = SummaryArgs.SUMMARY_ARGS.keySet().stream()
                 .filter(x -> {
@@ -43,6 +46,7 @@ public class DataWorksSummaryJob {
                     return b1 && b2;
                 }).distinct().collect(Collectors.toList());
 
+        assertEquals(add.size(),list.size());
         String project = "winhc_ng";
         String flow = "syn_summary";
         String taskName = "summary_v9";
@@ -65,9 +69,20 @@ public class DataWorksSummaryJob {
     }
 
     @Test
-    public void start2() throws HoloClientException, IOException {
+    public void start2() {
         List<String> list = new ArrayList<>(SummaryArgs.SUMMARY_ARGS.keySet());
         list.forEach(System.out::println);
     }
 
+    @Test
+    public void start3() throws HoloClientException, IOException {
+        List<String> list = Arrays.asList("company_dishonest_info", "bankruptcy_open_case", "company_punishment_info",
+                "company_liquidating_info", "company_mortgage_info", "company_abnormal_info",
+                "company_judicial_assistance", "company_zxr", "company_zxr_restrict"
+                , "company_zxr_final_case", "general_taxpayer");
+        String source_table = "tmp_xf_sum_agg_v9";
+        String result_table = "test_tmp_xf_sum_result_v9";
+        calcSummaryJob.start2(list, source_table, result_table);
+    }
+
 }

+ 69 - 0
src/test/java/com/winhc/task/EsTest.java

@@ -1,9 +1,17 @@
 package com.winhc.task;
 
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.JSONPath;
 import org.junit.jupiter.api.Test;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
+
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
 
 @SpringBootTest
 public class EsTest {
@@ -19,4 +27,65 @@ public class EsTest {
     public void start2() throws IOException {
         esIndexJobs.start2();
     }
+
+//    static List<String> l = Arrays.asList("company_env_punishment", "company_own_tax", "company_land_announcement", "company_land_publicity",
+//            "company_equity_pledge", "company_tax_contravention", "company_punishment_info_creditchina", "company_license_creditchina"
+//            , "company_land_transfer", "company_equity_pledge_holder", "company_land_mortgage");
+
+    static List<String> l = Arrays.asList("company_license_entpub","company_public_announcement");
+
+    @Test
+    public void start3() {
+        l.forEach(x -> {
+            try {
+                createNewIndex(x);
+            } catch (IOException ioException) {
+                ioException.printStackTrace();
+            }
+        });
+    }
+
+    private void createNewIndex(String tn) throws IOException {
+        String res = esIndexJobs.action("GET", "winhc_index_" + tn, null);
+        System.out.println(res);
+        ArrayList<JSONObject> e1 = (ArrayList) JSONPath.eval(JSON.parse(res), "$..mappings");
+        JSONArray s1 = (JSONArray) JSONPath.eval(JSON.parse(res), "$..settings.index.number_of_shards");
+        JSONArray s2 = (JSONArray) JSONPath.eval(JSON.parse(res), "$..settings.index.number_of_replicas");
+        JSONObject j = new JSONObject();
+        j.put("mappings", e1.get(0));
+        j.fluentPut("settings",
+                new JSONObject()
+                        .fluentPut("index",
+                                new JSONObject()
+                                        .fluentPut("number_of_shards", s1.get(0))
+                                        .fluentPut("number_of_replicas", s2.get(0))));
+        String r = JSONObject.toJSONString(j);
+        System.out.println(r);
+        String index = "winhc_index_rt_" + tn + "_v1";
+        String alias = "winhc_index_rt_" + tn;
+        String r2 = esIndexJobs.action("PUT", index, r);
+        System.out.println(r2);
+        esIndexJobs.addOrRemoveAliases(index, alias, "add");
+
+    }
+
+    @Test
+    public void start4() {
+        String s = "POST _reindex?slices=auto&refresh&wait_for_completion=false\n" +
+                "{\n" +
+                "  \"source\": {\n" +
+                "    \"index\": \"$source\"\n" +
+                "  },\n" +
+                "  \"dest\": {\n" +
+                "    \"index\": \"$dest\"\n" +
+                "  }\n" +
+                "}";
+        l.forEach(x -> {
+            String source = "winhc_index_" + x;
+            String dest = "winhc_index_rt_" + x;
+            String res = s.replaceAll("\\$source", source).replaceAll("\\$dest", dest);
+            System.out.println(res);
+        });
+
+    }
 }

+ 36 - 0
src/test/java/com/winhc/task/ExcelTest.java

@@ -0,0 +1,36 @@
+package com.winhc.task;
+
+import com.alibaba.excel.EasyExcel;
+import com.alibaba.excel.ExcelWriter;
+import com.winhc.task.bean.User;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2022/6/30 11:40
+ */
+public class ExcelTest {
+    public static void main(String[] args) {
+
+        // 导入:
+        // 构建一些数据
+        List<User> u = new ArrayList<>();
+
+        for (int i = 0; i < 10; i++) {
+            User user = new User(i, "张三" + i, "背景" + i);
+            u.add(user);
+        }
+        File filePath = new File("D:\\data\\tmp\\uuser.xlsx");
+        // 创建一个EasyExcel对象  ,一个对象,一个头信息
+       // ExcelWriter build = EasyExcel.write(file, User.class).build();  // 需要两个参数,一个是文件,一个是数据.然后build
+        EasyExcel.write(filePath, User.class).sheet("模板").doWrite(u);
+
+//        // 写入用户组  调用写的功能
+//        build.write(u, EasyExcel.writerSheet("Sheet1").build());
+//        build.finish();  // 关闭资源
+    }
+}