浏览代码

参数扩充

许家凯 4 年之前
父节点
当前提交
f3fc8d14b9

+ 16 - 14
src/main/java/com/winhc/dataworks/flow/touch/TestMain.java

@@ -5,7 +5,8 @@ import com.helospark.lightdi.LightDi;
 import com.helospark.lightdi.LightDiContext;
 import com.helospark.lightdi.annotation.Autowired;
 import com.helospark.lightdi.annotation.Service;
-import com.winhc.dataworks.flow.touch.bean.OdpsTableSchema;
+import com.winhc.dataworks.flow.touch.bean.DataWorksFlowJob;
+import com.winhc.dataworks.flow.touch.bean.DataWorksFlowTask;
 import com.winhc.dataworks.flow.touch.bean.TaskFlowEnum;
 import com.winhc.dataworks.flow.touch.bean.TaskParam;
 import com.winhc.dataworks.flow.touch.configuration.SchemaInit;
@@ -13,7 +14,7 @@ import com.winhc.dataworks.flow.touch.service.TouchService;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 
-import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -28,30 +29,31 @@ public class TestMain {
     TouchService touchService;
 
     public static void main(String[] args) {
-        if (args.length != 1) {
-            System.out.println("请输入调度的表名");
+        if (args.length != 3) {
+            System.out.println("请输入调度的 业务流程名 任务名 bizDate");
             System.exit(-9999);
         }
         LightDiContext context = LightDi.initContextByPackage(TestMain.class.getPackage().getName());
         TestMain bean = context.getBean(TestMain.class);
-        bean.start(args[0]);
+        bean.start(args[0], args[1], args[2]);
     }
 
     @SneakyThrows
-    public void start(String tableName) {
-        OdpsTableSchema odpsTableSchema = SchemaInit.MAP.get(tableName);
+    public void start(String flow, String taskName, String bizDate) {
+        List<DataWorksFlowJob> jobs = SchemaInit.LIST;
+        DataWorksFlowJob dataWorksFlowJob = jobs.stream().filter(j -> j.getFlow().equals(flow)).findFirst().orElseThrow(NullPointerException::new);
+
+        DataWorksFlowTask dataWorksFlowTask = dataWorksFlowJob.getTask().stream().filter(t -> t.getTaskName().equals(taskName)).findFirst().orElseThrow(NullPointerException::new);
 
-        HashMap<String, String> map = new HashMap<>();
-        map.put("700003366764", odpsTableSchema.toNodePara());
         TaskParam build = TaskParam.builder()
-                .projectName("winhc_test")
-                .bizDate("2020-06-22")
-                .flowName("增量处理流程_spark")
-                .nodeParam(map)
+                .projectName(dataWorksFlowJob.getProject())
+                .bizDate(bizDate)
+                .flowName(dataWorksFlowJob.getFlow())
+                .nodeParam(dataWorksFlowTask.toNodeParam())
                 .build();
         CreateManualDagResponse touch = touchService.touch(build);
         while (true) {
-            Map<String, TaskFlowEnum> query = touchService.query("winhc_test", touch.getReturnValue());
+            Map<String, TaskFlowEnum> query = touchService.query(dataWorksFlowJob.getProject(), touch.getReturnValue());
             long count = query.values()
                     .stream()
                     .filter(e -> !(TaskFlowEnum.SUCCESS.equals(e) || TaskFlowEnum.FAILURE.equals(e)))

+ 27 - 0
src/main/java/com/winhc/dataworks/flow/touch/bean/DataWorksFlowJob.java

@@ -0,0 +1,27 @@
+package com.winhc.dataworks.flow.touch.bean;
+
+import com.winhc.dataworks.flow.touch.utils.JsonUtils;
+import lombok.*;
+
+import java.util.List;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/24 08:49
+ * @Description:
+ */
+@Getter
+@Setter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class DataWorksFlowJob {
+    private String project;
+    private String flow;
+    private List<DataWorksFlowTask> task;
+
+    @Override
+    public String toString() {
+        return JsonUtils.jsonObjToString(this);
+    }
+}

+ 29 - 0
src/main/java/com/winhc/dataworks/flow/touch/bean/DataWorksFlowTask.java

@@ -0,0 +1,29 @@
+package com.winhc.dataworks.flow.touch.bean;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/24 09:01
+ * @Description:
+ */
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class DataWorksFlowTask {
+    private String taskName;
+    private List<NodeParam> params;
+
+    public Map<String, String> toNodeParam() {
+        return params.stream()
+                .collect(Collectors.toMap(NodeParam::getNodeId, NodeParam::toNodeParam, (o1, o2) -> o1));
+    }
+}

+ 36 - 0
src/main/java/com/winhc/dataworks/flow/touch/bean/NodeParam.java

@@ -0,0 +1,36 @@
+package com.winhc.dataworks.flow.touch.bean;
+
+import com.winhc.dataworks.flow.touch.utils.JsonUtils;
+import lombok.*;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/24 08:50
+ * @Description:
+ */
+@Setter
+@Getter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class NodeParam {
+    private String nodeId;
+    private Map<String, String> param;
+
+    public String toNodeParam() {
+        return param.entrySet().stream()
+                .filter(e -> !e.getKey().startsWith("_"))
+                .filter(e-> StringUtils.isNotEmpty(e.getValue()))
+                .map(e -> e.getKey() + "=" + e.getValue())
+                .collect(Collectors.joining(" "));
+    }
+
+    @Override
+    public String toString() {
+        return JsonUtils.jsonObjToString(this);
+    }
+}

+ 0 - 66
src/main/java/com/winhc/dataworks/flow/touch/bean/OdpsTableSchema.java

@@ -1,66 +0,0 @@
-package com.winhc.dataworks.flow.touch.bean;
-
-import com.winhc.dataworks.flow.touch.utils.JsonUtils;
-import lombok.AllArgsConstructor;
-import lombok.Getter;
-import lombok.NoArgsConstructor;
-import lombok.Setter;
-import org.apache.commons.lang3.StringUtils;
-
-import java.util.Arrays;
-import java.util.Objects;
-import java.util.stream.Collectors;
-
-/**
- * @Author: XuJiakai
- * @Date: 2020/6/23 14:16
- * @Description:
- */
-@Getter
-@Setter
-@AllArgsConstructor
-@NoArgsConstructor
-public class OdpsTableSchema {
-
-    /**
-     * 表前缀工程名,不用于业务流程调度的工程名
-     */
-    private String project;
-
-    /**
-     * 表名,不加任何前辍
-     */
-    private String tableName;
-
-    /**
-     * 有效列  逗号分割
-     */
-    private String columns;
-
-    /**
-     * 去重主列  逗号分割
-     */
-    private String dupliCols;
-
-    public String toNodePara() {
-        return Arrays.stream(this.getClass().getDeclaredFields()).map(f -> {
-            String name = f.getName();
-            try {
-                f.setAccessible(true);
-                String o = ((String) f.get(this));
-                if (StringUtils.isEmpty(o)) {
-                    return null;
-                }
-                return name + "=" + o;
-            } catch (Exception e) {
-                e.printStackTrace();
-            }
-            return null;
-        }).filter(Objects::nonNull).collect(Collectors.joining(" "));
-    }
-
-    @Override
-    public String toString() {
-        return JsonUtils.jsonObjToString(this);
-    }
-}

+ 35 - 8
src/main/java/com/winhc/dataworks/flow/touch/configuration/SchemaInit.java

@@ -1,7 +1,8 @@
 package com.winhc.dataworks.flow.touch.configuration;
 
-import cn.hutool.core.bean.BeanUtil;
-import com.winhc.dataworks.flow.touch.bean.OdpsTableSchema;
+import com.winhc.dataworks.flow.touch.bean.DataWorksFlowJob;
+import com.winhc.dataworks.flow.touch.bean.DataWorksFlowTask;
+import com.winhc.dataworks.flow.touch.bean.NodeParam;
 import org.yaml.snakeyaml.Yaml;
 
 import java.io.File;
@@ -16,8 +17,9 @@ import java.util.stream.Collectors;
  * @Date: 2020/6/23 14:23
  * @Description:
  */
+@SuppressWarnings("all")
 public class SchemaInit {
-    public static final Map<String, OdpsTableSchema> MAP;
+    public static final List<DataWorksFlowJob> LIST;
 
     static {
         Yaml yml = new Yaml();
@@ -28,10 +30,35 @@ public class SchemaInit {
         } catch (Exception e) {
             e.printStackTrace();
         }
-        Map map1 = yml.loadAs(reader, Map.class);
-        List<Map<String, String>> tables = (List<Map<String, String>>) map1.get("table");
-        MAP = tables.stream()
-                .map(m -> BeanUtil.mapToBean(m, OdpsTableSchema.class, false))
-                .collect(Collectors.toMap(OdpsTableSchema::getTableName, o -> o, (o1, o2) -> o1));
+        Map map = yml.loadAs(reader, Map.class);
+
+        List jobs = ((List) map.get("job"));
+        LIST = (List<DataWorksFlowJob>) jobs
+                .stream()
+                .map(m -> {
+                    String project = ((String) ((Map<String, Object>) m).get("project"));
+                    String flow = ((String) ((Map<String, Object>) m).get("flow"));
+                    List<DataWorksFlowTask> collect1 = (List<DataWorksFlowTask>) ((List) ((Map<String, Object>) m).get("task"))
+                            .stream()
+                            .map(t -> {
+                                String taskName = ((String) ((Map<String, Object>) t).get("taskName"));
+                                List<Map<String, Object>> ll = ((List<Map<String, Object>>) ((Map<String, Object>) t).get("param"));
+                                List<NodeParam> nodeParamList = ll.stream()
+                                        .map(mm -> {
+                                            String nodeId = String.valueOf(mm.remove("_nodeId"));
+                                            Map<String, String> collect = mm.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> String.valueOf(e.getValue()), (o1, o2) -> o1));
+                                            return NodeParam.builder().nodeId(nodeId).param(collect).build();
+                                        }).collect(Collectors.toList());
+                                return new DataWorksFlowTask(taskName, nodeParamList);
+                            })
+                            .collect(Collectors.toList());
+                    DataWorksFlowJob build = DataWorksFlowJob.builder()
+                            .project(project)
+                            .flow(flow)
+                            .task(collect1)
+                            .build();
+                    return build;
+                })
+                .collect(Collectors.toList());
     }
 }

+ 5 - 4
src/main/java/com/winhc/dataworks/flow/touch/test/TestFlow.java

@@ -1,8 +1,10 @@
 package com.winhc.dataworks.flow.touch.test;
 
-import com.winhc.dataworks.flow.touch.bean.OdpsTableSchema;
+import com.winhc.dataworks.flow.touch.bean.DataWorksFlowJob;
 import com.winhc.dataworks.flow.touch.configuration.SchemaInit;
 
+import java.util.List;
+
 /**
  * @Author: XuJiakai
  * @Date: 2020/6/22 14:51
@@ -10,8 +12,7 @@ import com.winhc.dataworks.flow.touch.configuration.SchemaInit;
  */
 public class TestFlow {
     public static void main(String[] args) {
-        OdpsTableSchema company_env_punishment = SchemaInit.MAP.get("company_env_punishment");
-
-        System.out.println(company_env_punishment.toNodePara());
+        List<DataWorksFlowJob> list =  SchemaInit.LIST;
+        System.out.println(list);
     }
 }

+ 19 - 9
src/main/resources/odps-schema.yaml

@@ -1,11 +1,21 @@
-table:
-  - project: winhc_eci_dev
-    tableName: company_env_punishment
-    columns: id,name,department,publish_time,punish_number,punish_basis,law_break,reason,content,execution,source_url,source_flag,create_time,update_time,deleted
-    dupliCols: new_cid,source_url
+job:
+  - project: winhc_eci
+    flow: 增量处理流程_spark
+    task:
+      - taskName: company_env_punishment
+        param:
+          - _nodeId: node-id
+            project: winhc_eci_dev
+            tableName: company_env_punishment
+            columns: id,name,department,publish_time,punish_number,punish_basis,law_break,reason,content,execution,source_url,source_flag,create_time,update_time,deleted
+            dupliCols: new_cid,source_url
+
+      - taskName: company_icp
+        param:
+          - _nodeId: 700003366764
+            project: winhc_eci_dev
+            tableName: company_icp
+            columns:
+            dupliCols: liscense,domain,new_cid
 
-  - project: winhc_eci_dev
-    tableName: company_icp
-    columns:
-    dupliCols: liscense,domain,new_cid