
Judicial case update

xufei · 3 years ago · commit cee525d1f4

+ 63 - 11
src/main/java/com/winhc/task/CaseIncrementTask.java

@@ -3,14 +3,16 @@ package com.winhc.task;
 
 import cn.hutool.core.io.FileUtil;
 import com.alibaba.fastjson.JSON;
-import com.winhc.bean.Constant;
-import com.winhc.bean.MergeParam;
+import com.aliyuncs.dataworks_public.model.v20180601.CreateManualDagResponse;
+import com.aliyuncs.exceptions.ClientException;
+import com.winhc.bean.*;
 import com.winhc.config.CaseConfig;
 import com.winhc.kafka.produce.KafkaProduce;
 import com.winhc.service.TouchService;
 import com.winhc.utils.CompanyUtils;
 import com.winhc.utils.DateUtil;
 import com.winhc.utils.DingUtils;
+import com.winhc.utils.SchemaInit;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FileUtils;
@@ -64,19 +66,29 @@ public class CaseIncrementTask {
     CaseConfig caseConfig;
 
     //TODO start the merge task
-    //@Scheduled(cron = "00 00 01 * * ?")
+    @Scheduled(cron = "00 00 13 * * ?")
     //@Scheduled(cron = "*/20 * * * * ?")
-    //@Scheduled(cron = "55 13 16 02 06 ?")
-    public void mergePersonScheduled() throws UnsupportedEncodingException {
+    //@Scheduled(cron = "30 07 10 21 06 ?")
+    public void mergeCaseScheduled() throws UnsupportedEncodingException {
         long start = System.currentTimeMillis();
+        if (!CompanyUtils.isWindows()) {
+            dingUtils.send("mergeCaseScheduled start !!!");
+        }
         try {
             MergeParam param = initParams();
-            exportIncrPerson2CSV(param.getLastPartition(), driver.session(), param.getIncrPath());
+            exportIncrCase2CSV(param.getLastPartition(), driver.session(), param.getIncrPath());
             loadCSVSendKafka(param.getPathPre() + param.getIncrPath(), param.getTopic());
+            startJob(param);
         } catch (Exception e) {
-            log.error("MergePersonScheduled error | message:{} | .", e.getMessage());
+            log.error("mergeCaseScheduled error | message:{} | .", e.getMessage());
+            if (!CompanyUtils.isWindows()) {
+                dingUtils.send("mergeCaseScheduled job run error : \n" + e.getMessage() + "\n!!!!!!! ");
+            }
         }
-        log.info("MergePersonScheduled end | cost:{} | !", (System.currentTimeMillis() - start));
+        if (!CompanyUtils.isWindows()) {
+            dingUtils.send("mergeCaseScheduled end !!!");
+        }
+        log.info("mergeCaseScheduled end | cost:{} | !", (System.currentTimeMillis() - start));
     }
 
     public MergeParam initParams() {
@@ -113,8 +125,8 @@ public class CaseIncrementTask {
      * @param session
      * @param INCR_NAME_PATH
      */
-    private void exportIncrPerson2CSV(String date, Session session, String INCR_NAME_PATH) {
-        log.info("exportIncrPerson2CSV start!");
+    private void exportIncrCase2CSV(String date, Session session, String INCR_NAME_PATH) {
+        log.info("exportIncrCase2CSV start!");
         long start = System.currentTimeMillis();
         final String cql5 = " CALL apoc.export.csv.query('MATCH (p:" + CompanyUtils.getIncrPersonLabel("新增") + ")\n" +
                 "RETURN p.case_id as case_id,p.component_id as component_id',\n" +
@@ -123,7 +135,7 @@ public class CaseIncrementTask {
                 "YIELD file,rows";
         log.info("cql5 : \n {} ", cql5);
         String res5 = CompanyUtils.runNeo4j(session, cql5);
-        log.info("exportIncrPerson2CSV | cost:{} | result:{} | end !", (System.currentTimeMillis() - start), res5);
+        log.info("exportIncrCase2CSV | cost:{} | result:{} | end !", (System.currentTimeMillis() - start), res5);
     }
 
     /**
@@ -169,5 +181,45 @@ public class CaseIncrementTask {
         log.info("loadCSVSendKafka | size:{} | cost:{} | end !", i - 1, (System.currentTimeMillis() - start));
     }
 
+    public void startJob(MergeParam param) throws InterruptedException, ClientException, IOException {
+        log.info("startJob start !");
+        long start = System.currentTimeMillis();
+        DataWorksFlowJob dataWorksFlowJob = SchemaInit.getJobs().stream().filter(j -> j.getFlow().equals(param.getFlow())).findFirst().orElseThrow(NullPointerException::new);
+        DataWorksFlowTask dataWorksFlowTask = dataWorksFlowJob.getTask().stream().filter(t -> t.getTaskName().equals(param.getTaskName())).findFirst().orElseThrow(NullPointerException::new);
+        TaskParam build = TaskParam.builder()
+                .projectName(dataWorksFlowJob.getProject())
+                .bizDate(param.getBizdate())
+                .flowName(dataWorksFlowJob.getFlow())
+                .nodeParam(dataWorksFlowTask.toNodeParam(param))
+                .build();
+        CreateManualDagResponse touch = touchService.touch(build);
+        if (touch == null) {
+            return;
+        }
+        while (true) {
+            Map<String, TaskFlowEnum> query = touchService.query(dataWorksFlowJob.getProject(), touch.getReturnValue());
+            long count = query.values()
+                    .stream()
+                    .filter(e -> !(TaskFlowEnum.SUCCESS.equals(e) || TaskFlowEnum.FAILURE.equals(e)))
+                    .count();
+
+            long count1 = query.values().stream().filter(TaskFlowEnum.FAILURE::equals).count();
+            if (count1 != 0) {
+                log.error("startJob job run error : case job failed !");
+                if (!CompanyUtils.isWindows()) {
+                    dingUtils.send("startJob job run error : \ncase job failed\n!!!!!!! ");
+                }
+                return;
+            }
+
+            if (count != 0) {
+                Thread.sleep(2 * 60 * 1000);
+            } else {
+                log.info("startJob end | cost:{} | !", (System.currentTimeMillis() - start));
+                return;
+            }
+        }
+    }
+
 
 }
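
For context: the re-enabled @Scheduled cron "00 00 13 * * ?" uses Spring's six-field format (second, minute, hour, day-of-month, month, day-of-week), so the merge now fires daily at 13:00. The new startJob method triggers a DataWorks manual DAG and then polls it every two minutes until every node reaches a terminal state. A minimal, self-contained sketch of that trigger-then-poll pattern, where DagClient and runAndWait are hypothetical stand-ins for touchService.touch/query:

import java.util.Map;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; not the project's actual TouchService API.
public class DagPollingSketch {

    enum TaskFlowEnum { RUNNING, SUCCESS, FAILURE }

    interface DagClient {
        Long trigger(String project, String flow);                    // stands in for touchService.touch(TaskParam)
        Map<String, TaskFlowEnum> query(String project, Long dagId);  // stands in for touchService.query(...)
    }

    // Poll until every node is terminal; fail fast on the first FAILURE.
    static boolean runAndWait(DagClient client, String project, String flow) throws InterruptedException {
        Long dagId = client.trigger(project, flow);
        while (true) {
            Map<String, TaskFlowEnum> statuses = client.query(project, dagId);
            if (statuses.values().stream().anyMatch(TaskFlowEnum.FAILURE::equals)) {
                return false; // surface the failure to the caller (the task sends a DingTalk alert here)
            }
            if (statuses.values().stream().allMatch(TaskFlowEnum.SUCCESS::equals)) {
                return true;  // every node finished successfully
            }
            TimeUnit.MINUTES.sleep(2); // same two-minute poll interval as the diff
        }
    }
}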

+ 2 - 2
src/main/resources/application-dev.properties

@@ -67,6 +67,6 @@ spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.S
 spring.data.mongodb.uri=mongodb://itslaw:itslaw_168@dds-uf6ff5dfd9aef3641601-pub.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642555-pub.mongodb.rds.aliyuncs.com:3717/itslaw?replicaSet=mgset-6501997
 
 #case configuration
-case.task.flow=inc_case_task
-case.task.taskName=inc_case_task_kafka
+case.task.flow=judicial_case_relation_dev
+case.task.taskName=judicial_case_relation_aggs_dev
 case.task.topic=inc_case_id_dev

+ 2 - 2
src/main/resources/application-prd.properties

@@ -62,6 +62,6 @@ spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.S
 spring.data.mongodb.uri=mongodb://itslaw:itslaw_168@dds-uf6ff5dfd9aef3641.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642.mongodb.rds.aliyuncs.com:3717/itslaw?replicaSet=mgset-6501997
 
 #case configuration
-case.task.flow=inc_case_task
-case.task.taskName=inc_case_task_kafka
+case.task.flow=judicial_case_relation
+case.task.taskName=judicial_case_relation_aggs
 case.task.topic=inc_case_id_prod
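
Both profiles define the same three case.task.* keys and differ only in values, which suggests they bind into the CaseConfig bean injected by CaseIncrementTask. A minimal sketch of such a binding, assuming Spring Boot's @ConfigurationProperties plus Lombok (which the project already uses); the real CaseConfig is not part of this commit and may differ:

import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

// Hypothetical sketch: binds case.task.* from application-{dev,prd}.properties.
@Component
@ConfigurationProperties(prefix = "case.task")
@Data
public class CaseConfig {
    private String flow;     // e.g. judicial_case_relation
    private String taskName; // e.g. judicial_case_relation_aggs
    private String topic;    // e.g. inc_case_id_dev / inc_case_id_prod
}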

+ 13 - 1
src/main/resources/task.yaml

@@ -16,4 +16,16 @@ job:
           - _nodeId: 700004512886
             project: winhc_ng
           - _nodeId: 700004512938
-            project: winhc_ng
+            project: winhc_ng
+
+  #judicial case table data merge
+  - project: winhc_ng
+    flow: judicial_case_relation
+    task:
+      - taskName: judicial_case_relation_aggs
+        param:
+          - _nodeId: 700004678408
+            project: winhc_ng
+          - _nodeId: 700004670010
+            project: winhc_ng
+
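
startJob resolves its flow and task from this file through SchemaInit.getJobs(). The bean shapes the YAML implies can be inferred from the getters the diff calls (getProject, getFlow, getTask, getTaskName, toNodeParam); a hypothetical sketch, since the real com.winhc.bean classes are not shown in this commit:

import java.util.List;
import lombok.Data;

// Hypothetical bean shapes inferred from task.yaml; field names mirror the YAML keys.
@Data
public class DataWorksFlowJob {
    private String project;               // e.g. winhc_ng
    private String flow;                  // e.g. judicial_case_relation
    private List<DataWorksFlowTask> task; // one entry per taskName

    @Data
    public static class DataWorksFlowTask {
        private String taskName;          // e.g. judicial_case_relation_aggs
        private List<NodeParam> param;    // one entry per _nodeId
    }

    @Data
    public static class NodeParam {
        private long _nodeId;             // DataWorks node id, e.g. 700004678408
        private String project;
    }
}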