فهرست منبع

添加多任务并行调度

许家凯 4 سال پیش
والد
کامیت
cf736c17d4

+ 95 - 32
src/main/java/com/winhc/dataworks/flow/touch/Main.java

@@ -1,18 +1,19 @@
 package com.winhc.dataworks.flow.touch;
 
 import com.aliyuncs.dataworks_public.model.v20180601.CreateManualDagResponse;
-import com.google.gson.JsonObject;
 import com.helospark.lightdi.LightDi;
 import com.helospark.lightdi.LightDiContext;
 import com.helospark.lightdi.annotation.Autowired;
 import com.helospark.lightdi.annotation.Service;
+import com.winhc.dataworks.flow.touch.bean.*;
+import com.winhc.dataworks.flow.touch.configuration.SchemaInit;
 import com.winhc.dataworks.flow.touch.service.TouchService;
-import com.winhc.dataworks.flow.touch.utils.JsonUtils;
-import com.winhc.dataworks.flow.touch.utils.YmlUtil;
+import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 
-import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 /**
  * @Author: XuJiakai
@@ -27,42 +28,104 @@ public class Main {
 
 
     public static void main(String[] args) {
+        if (args.length != 1) {
+            System.out.println("请输入调度的 bizDate");
+            System.exit(-9999);
+        }
         LightDiContext context = LightDi.initContextByPackage(Main.class.getPackage().getName());
         Main bean = context.getBean(Main.class);
-        bean.start();
-//        bean.query();
+        bean.start(args[0]);
     }
 
-    private void start() {
+    @SneakyThrows
+    private void start(String bizDate) {
         log.info("start");
-        Map<String, String> map = new HashMap<>();
-        Map<String,Object> mapTable=YmlUtil.getResMap("table");
-        for(Map.Entry<String, Object> entry : mapTable.entrySet()){
-            String tableName = entry.getKey();
-            Object mapValue = entry.getValue();
-            JsonObject dagPara = new JsonObject();
-            dagPara.addProperty("PROJECT","winhc_eci");//业务流程参数。
-            JsonObject nodePara = new JsonObject();
-            nodePara.addProperty("700003366062",mapValue.toString().replaceAll(":","="));
-            CreateManualDagResponse touch = touchService.triggerWithParam("winhc_eci","IncDataFlow","2020-06-05",dagPara.toString(),nodePara.toString());
-            System.out.println(JsonUtils.jsonObjToString(touch));
-        }
+        List<DataWorksFlowJob> jobs = SchemaInit.LIST;
+        List<TaskInfo> collect = jobs.stream()
+                .flatMap(dataWorksFlowJob -> {
+                    List<DataWorksFlowTask> task = dataWorksFlowJob.getTask();
+                    return task
+                            .stream()
+                            .map(dataWorksFlowTask -> {
+                                TaskParam build = TaskParam.builder()
+                                        .projectName(dataWorksFlowJob.getProject())
+                                        .bizDate(bizDate)
+                                        .flowName(dataWorksFlowJob.getFlow())
+                                        .nodeParam(dataWorksFlowTask.toNodeParam())
+                                        .build();
+                                CreateManualDagResponse r = touchService.touch(build);
 
-        /*List<String> strings = Arrays.asList("test1=a1"
-                , "test2=a2"
-                , "test3=a3");
-        map.put("700003342843", String.join(" ", strings));
+                                return TaskInfo.builder().project(dataWorksFlowJob.getProject())
+                                        .flow(dataWorksFlowJob.getFlow())
+                                        .taskName(dataWorksFlowTask.getTaskName())
+                                        .dagId(r.getReturnValue()).build();
+                            });
+                }).collect(Collectors.toList());
 
+        int totalTask = collect.size();
 
-        TaskParam build = TaskParam.builder().projectName("winhc_test")
-                .flowName("增量处理流程_spark")
-                .bizDate("2020-06-20")
-                .nodeParam(map).build();
-        CreateManualDagResponse touch = touchService.touch(build);
-        System.out.println(JsonUtils.jsonObjToString(touch));*/
-    }
+        while (true) {
+            int awaitTask = 0;
+            int successTask = 0;
+            int failedTask = 0;
+
+            int awaitNode = 0;
+            int successNode = 0;
+            int failedNode = 0;
+
+            boolean flag = true;
 
-    private void query() {
-        touchService.query("winhc_test", 700134329791L);
+            for (TaskInfo taskInfo : collect) {
+                Map<String, TaskFlowEnum> query = touchService.query(taskInfo.getProject(), taskInfo.getDagId());
+                Map<String, List<String>> status = query.entrySet().stream()
+                        .map(e -> {
+                            Entry<String, String> entry = new Entry<>();
+                            if (e.getValue().equals(TaskFlowEnum.SUCCESS) || e.getValue().equals(TaskFlowEnum.FAILURE)) {
+                                entry.setKey(e.getValue().getMsg());
+                            } else {
+                                entry.setKey("运行中");
+                            }
+                            return entry;
+                        }).collect(Collectors.groupingBy(Entry::getKey, Collectors.mapping(Entry::getValue, Collectors.toList())));
+                List<String> success = status.get(TaskFlowEnum.SUCCESS.getMsg());
+                List<String> failure = status.get(TaskFlowEnum.FAILURE.getMsg());
+                List<String> await = status.get("运行中");
+
+                if (await.size() != 0) {
+                    awaitTask++;
+                    awaitNode += await.size();
+                } else {
+                    if (failure.size() != 0) {
+                        failedTask++;
+                        failedNode += failure.size();
+                        log.error("failure node:{}  {}", failedTask, failedNode);
+                    } else {
+                        successNode += success.size();
+                        successTask++;
+                    }
+                }
+                flag = flag && await.size() == 0;
+            }
+            log.info("\nawait task:{}\ntotal task:{}\nsuccess task:{}\nfailure task:{}\n"
+                    , awaitTask
+                    , totalTask
+                    , successTask
+                    , failedTask
+            );
+
+            log.info("\nawait node:{}\ntotal node:{}\nsuccess node:{}\nfailure node:{}\n"
+                    , awaitNode
+                    , awaitNode + successNode + failedNode
+                    , successNode
+                    , failedNode
+            );
+
+            Thread.sleep(10000);
+
+            if (flag) {
+                break;
+            }
+        }
+        log.info("end");
     }
 }

+ 50 - 0
src/main/java/com/winhc/dataworks/flow/touch/bean/Entry.java

@@ -0,0 +1,50 @@
+package com.winhc.dataworks.flow.touch.bean;
+
+import com.winhc.dataworks.flow.touch.utils.JsonUtils;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/24 14:05
+ * @Description:
+ */
+@Getter
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+public class Entry<K, V> {
+    private K key;
+    private V value;
+
+    @Override
+    public String toString() {
+        return JsonUtils.jsonObjToString(this);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+
+        if (o == null || getClass() != o.getClass()) return false;
+
+        Entry<?, ?> entry = (Entry<?, ?>) o;
+
+        return new EqualsBuilder()
+                .append(key, entry.key)
+                .append(value, entry.value)
+                .isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder(17, 37)
+                .append(key)
+                .append(value)
+                .toHashCode();
+    }
+}

+ 24 - 0
src/main/java/com/winhc/dataworks/flow/touch/bean/TaskInfo.java

@@ -0,0 +1,24 @@
+package com.winhc.dataworks.flow.touch.bean;
+
+import lombok.*;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/24 14:53
+ * @Description:
+ */
+@Getter
+@Setter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TaskInfo {
+    private String project;
+    private String flow;
+    private String taskName;
+    private Long dagId;
+
+    public String getKey() {
+        return project + ":" + flow + ":" + taskName;
+    }
+}