Browse Source

调度company_icp

许家凯 4 years ago
parent
commit
4cb46b91d8

+ 68 - 0
src/main/java/com/winhc/dataworks/flow/touch/TestMain.java

@@ -0,0 +1,68 @@
+package com.winhc.dataworks.flow.touch;
+
+import com.aliyuncs.dataworks_public.model.v20180601.CreateManualDagResponse;
+import com.helospark.lightdi.LightDi;
+import com.helospark.lightdi.LightDiContext;
+import com.helospark.lightdi.annotation.Autowired;
+import com.helospark.lightdi.annotation.Service;
+import com.winhc.dataworks.flow.touch.bean.OdpsTableSchema;
+import com.winhc.dataworks.flow.touch.bean.TaskFlowEnum;
+import com.winhc.dataworks.flow.touch.bean.TaskParam;
+import com.winhc.dataworks.flow.touch.configuration.SchemaInit;
+import com.winhc.dataworks.flow.touch.service.TouchService;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/23 19:27
+ * @Description:
+ */
+@Slf4j
+@Service
+public class TestMain {
+    @Autowired
+    TouchService touchService;
+
+    public static void main(String[] args) {
+        if (args.length != 1) {
+            System.out.println("请输入调度的表名");
+            System.exit(-9999);
+        }
+        LightDiContext context = LightDi.initContextByPackage(TestMain.class.getPackage().getName());
+        TestMain bean = context.getBean(TestMain.class);
+        bean.start(args[0]);
+    }
+
+    @SneakyThrows
+    public void start(String tableName) {
+        OdpsTableSchema odpsTableSchema = SchemaInit.MAP.get(tableName);
+
+        HashMap<String, String> map = new HashMap<>();
+        map.put("700003366764", odpsTableSchema.toNodePara());
+        TaskParam build = TaskParam.builder()
+                .projectName("winhc_test")
+                .bizDate("2020-06-22")
+                .flowName("增量处理流程_spark")
+                .nodeParam(map)
+                .build();
+        CreateManualDagResponse touch = touchService.touch(build);
+        while (true) {
+            Map<String, TaskFlowEnum> query = touchService.query("winhc_test", touch.getReturnValue());
+            long count = query.values()
+                    .stream()
+                    .filter(e -> !(TaskFlowEnum.SUCCESS.equals(e) || TaskFlowEnum.FAILURE.equals(e)))
+                    .count();
+
+            if (count != 0) {
+                Thread.sleep(10000);
+            } else {
+                log.info("end");
+                return;
+            }
+        }
+    }
+}

+ 7 - 0
src/main/java/com/winhc/dataworks/flow/touch/bean/OdpsTableSchema.java

@@ -5,6 +5,7 @@ import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
 import lombok.Setter;
+import org.apache.commons.lang3.StringUtils;
 
 import java.util.Arrays;
 import java.util.Objects;
@@ -25,14 +26,17 @@ public class OdpsTableSchema {
      * 表前缀工程名,不用于业务流程调度的工程名
      */
     private String project;
+
     /**
      * 表名,不加任何前辍
      */
     private String tableName;
+
     /**
      * 有效列  逗号分割
      */
     private String columns;
+
     /**
      * 去重主列  逗号分割
      */
@@ -44,6 +48,9 @@ public class OdpsTableSchema {
             try {
                 f.setAccessible(true);
                 String o = ((String) f.get(this));
+                if (StringUtils.isEmpty(o)) {
+                    return null;
+                }
                 return name + "=" + o;
             } catch (Exception e) {
                 e.printStackTrace();

+ 7 - 9
src/main/java/com/winhc/dataworks/flow/touch/bean/TaskFlowEnum.java

@@ -2,6 +2,8 @@ package com.winhc.dataworks.flow.touch.bean;
 
 import lombok.Getter;
 
+import java.util.Arrays;
+
 /**
  * @Author: XuJiakai
  * @Date: 2020/6/23 10:58
@@ -19,7 +21,7 @@ public enum TaskFlowEnum {
     CHECKING(7, "校检中"),
 
     ;
-    private final int code;
+    private final Integer code;
     private final String msg;
 
     TaskFlowEnum(int code, String msg) {
@@ -28,14 +30,10 @@ public enum TaskFlowEnum {
     }
 
     public static TaskFlowEnum getTaskFlowEnumByCode(Integer code) {
-        if (code != null) {
-            for (TaskFlowEnum value : TaskFlowEnum.values()) {
-                if (code == value.getCode()) {
-                    return value;
-                }
-            }
-        }
-        return null;
+        return Arrays.stream(TaskFlowEnum.values())
+                .filter(taskFlowEnum -> taskFlowEnum.getCode().equals(code))
+                .findFirst()
+                .orElse(null);
     }
 
     @Override

+ 9 - 4
src/main/java/com/winhc/dataworks/flow/touch/service/TouchService.java

@@ -15,6 +15,9 @@ import com.winhc.dataworks.flow.touch.utils.JsonUtils;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 
+import java.util.Map;
+import java.util.stream.Collectors;
+
 /**
  * @Author: XuJiakai
  * @Date: 2020/6/22 11:28
@@ -50,7 +53,7 @@ public class TouchService {
 
     @SneakyThrows
     public CreateManualDagResponse triggerWithParam(String projectName
-            , String flowName, String bizDate,String dagPara,String nodePara) {
+            , String flowName, String bizDate, String dagPara, String nodePara) {
         log.info("触发任务:{}.{} {}", projectName, flowName, bizDate);
         CreateManualDagRequest request = new CreateManualDagRequest();
         request.setProjectName(projectName);
@@ -92,18 +95,20 @@ public class TouchService {
 
 
     @SneakyThrows
-    public void query(String projectName, Long dagId) {
+    public Map<String, TaskFlowEnum> query(String projectName, Long dagId) {
         SearchManualDagNodeInstanceRequest searchNodeInstanceListRequest
                 = new SearchManualDagNodeInstanceRequest();
         searchNodeInstanceListRequest.setDagId(dagId);
-        searchNodeInstanceListRequest.setProjectName(projectName); //项目名。
+        searchNodeInstanceListRequest.setProjectName(projectName);
         searchNodeInstanceListRequest.setProtocol(ProtocolType.HTTP);
         SearchManualDagNodeInstanceResponse searchResponse = client
-                .getAcsResponse(searchNodeInstanceListRequest);     //查询实例。
+                .getAcsResponse(searchNodeInstanceListRequest);
         java.util.List<SearchManualDagNodeInstanceResponse.NodeInsInfo> nodeInsfos = searchResponse.getData();
         for (SearchManualDagNodeInstanceResponse.NodeInsInfo nodeInsInfo : nodeInsfos) {
             log.info("{}:{} {}", nodeInsInfo.getNodeName(), nodeInsInfo.getStatus(), TaskFlowEnum.getTaskFlowEnumByCode(nodeInsInfo.getStatus()));
         }
+        return nodeInsfos.stream()
+                .collect(Collectors.toMap(SearchManualDagNodeInstanceResponse.NodeInsInfo::getNodeName, node -> TaskFlowEnum.getTaskFlowEnumByCode(node.getStatus()), (o1, o2) -> o1));
     }
 
 }

+ 5 - 0
src/main/resources/odps-schema.yaml

@@ -4,3 +4,8 @@ table:
     columns: id,name,department,publish_time,punish_number,punish_basis,law_break,reason,content,execution,source_url,source_flag,create_time,update_time,deleted
     dupliCols: new_cid,source_url
 
+  - project: winhc_eci_dev
+    tableName: company_icp
+    columns:
+    dupliCols: liscense,domain,new_cid
+