Procházet zdrojové kódy

fix: 跨业务时间下的并发提交job问题

- 添加批量补数据入口
许家凯 před 4 roky
rodič
revize
ce877d6d9d

+ 55 - 0
src/main/java/com/winhc/dataworks/flow/touch/BulkSupplementaryDataMain.java

@@ -0,0 +1,55 @@
+package com.winhc.dataworks.flow.touch;
+
+import com.helospark.lightdi.LightDi;
+import com.helospark.lightdi.LightDiContext;
+import com.winhc.dataworks.flow.touch.bean.DataWorksFlowJob;
+import com.winhc.dataworks.flow.touch.configuration.SchemaInit;
+import com.winhc.dataworks.flow.touch.utils.DateUtils;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/9/15 17:46
+ * @Description: 批量补数据
+ */
+@Slf4j
+public class BulkSupplementaryDataMain {
+    private static final String file = "D:\\code\\IdeaProjects\\DataWorks-flow-touch\\jobs\\task-step03.yaml";
+
+    @SneakyThrows
+    public static void main(String[] args) {
+        String flow = "company_change_dynamic";
+        String taskName = "company_change_dynamic";
+
+        LightDiContext context = LightDi.initContextByPackage(SingleJobMain.class.getPackage().getName());
+        SingleJobMain bean = context.getBean(SingleJobMain.class);
+        List<DataWorksFlowJob> jobs = SchemaInit.getJobs(file);
+        List<String> bizDates = new ArrayList<>();
+        String bizDate = DateUtils.getMinusDay(29);
+        System.out.println(bizDate);
+        bean.start(flow, taskName, bizDate, jobs);
+//        for (int i = 1; i <= 18; i++) {
+//            String bizDate = DateUtils.getMinusDay(i);
+//            bizDates.add(bizDate);
+//            System.out.println(bizDate);
+////            bean.start(flow, taskName, bizDate, jobs);
+//        }
+       /* final CountDownLatch count = new CountDownLatch(bizDates.size());
+
+        bizDates.forEach(bizDate -> {
+            Thread thread = new Thread(() -> {
+                bean.start(flow, taskName, bizDate, jobs);
+                log.info(bizDate);
+                count.countDown();
+            });
+            thread.setName("Thread-" + bizDate + ": ");
+            thread.start();
+        });
+        count.await();*/
+        System.out.println("end");
+    }
+}

+ 7 - 2
src/main/java/com/winhc/dataworks/flow/touch/SingleJobMain.java

@@ -27,15 +27,17 @@ import java.util.Map;
 public class SingleJobMain {
     @Autowired
     TouchService touchService;
+    private static final String file = "D:\\code\\IdeaProjects\\DataWorks-flow-touch\\jobs\\task-step02.yaml";
 
+    @SneakyThrows
     public static void main(String[] args) {
         if (args.length != 3) {
-            System.out.println("请输入调度的 业务流程名 任务名 bizDate");
+            System.out.println("请输入调度的业务流程名 任务名 bizDate");
             System.exit(-9999);
         }
         LightDiContext context = LightDi.initContextByPackage(SingleJobMain.class.getPackage().getName());
         SingleJobMain bean = context.getBean(SingleJobMain.class);
-        List<DataWorksFlowJob> jobs = SchemaInit.getJobs();
+        List<DataWorksFlowJob> jobs = SchemaInit.getJobs(file);
         bean.start(args[0], args[1], args[2], jobs);
     }
 
@@ -52,6 +54,9 @@ public class SingleJobMain {
                 .nodeParam(dataWorksFlowTask.toNodeParam(bizDate))
                 .build();
         CreateManualDagResponse touch = touchService.touch(build);
+        if(touch == null){
+            return;
+        }
         while (true) {
             Map<String, TaskFlowEnum> query = touchService.query(dataWorksFlowJob.getProject(), touch.getReturnValue());
             long count = query.values()

+ 5 - 3
src/main/java/com/winhc/dataworks/flow/touch/bean/NodeParam.java

@@ -4,6 +4,7 @@ import com.winhc.dataworks.flow.touch.utils.JsonUtils;
 import lombok.*;
 import org.apache.commons.lang3.StringUtils;
 
+import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
 
@@ -22,10 +23,11 @@ public class NodeParam {
     private Map<String, String> param;
 
     public String toNodeParam(String bizDate) {
-        param.put("bizdate", bizDate.replace("-", ""));
-        return param.entrySet().stream()
+        HashMap<String, String> map = new HashMap<>(param);
+        map.put("bizdate", bizDate.replace("-", ""));
+        return map.entrySet().stream()
                 .filter(e -> !e.getKey().startsWith("_"))
-                .filter(e-> StringUtils.isNotEmpty(e.getValue()))
+                .filter(e -> StringUtils.isNotEmpty(e.getValue()))
                 .map(e -> e.getKey() + "=" + e.getValue())
                 .collect(Collectors.joining(" "));
     }