许家凯 5 年之前
父節點
當前提交
e56064b53f

+ 6 - 0
pom.xml

@@ -10,6 +10,12 @@
 
     <dependencies>
         <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.70</version>
+        </dependency>
+
+        <dependency>
             <groupId>org.apache.logging.log4j</groupId>
             <artifactId>log4j-slf4j-impl</artifactId>
             <version>2.6.2</version>

+ 26 - 1
src/main/java/com/winhc/dataworks/flow/touch/Main.java

@@ -1,12 +1,20 @@
 package com.winhc.dataworks.flow.touch;
 
+import com.aliyuncs.dataworks_public.model.v20180601.CreateManualDagResponse;
 import com.helospark.lightdi.LightDi;
 import com.helospark.lightdi.LightDiContext;
 import com.helospark.lightdi.annotation.Autowired;
 import com.helospark.lightdi.annotation.Service;
+import com.winhc.dataworks.flow.touch.bean.TaskParam;
 import com.winhc.dataworks.flow.touch.service.TouchService;
+import com.winhc.dataworks.flow.touch.utils.JsonUtils;
 import lombok.extern.slf4j.Slf4j;
 
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
 /**
  * @Author: XuJiakai
  * @Date: 2020/6/22 10:45
@@ -22,11 +30,28 @@ public class Main {
     public static void main(String[] args) {
         LightDiContext context = LightDi.initContextByPackage(Main.class.getPackage().getName());
         Main bean = context.getBean(Main.class);
-        bean.start();
+//        bean.start();
+        bean.query();
     }
 
     private void start() {
         log.info("start");
+        Map<String, String> map = new HashMap<>();
+        List<String> strings = Arrays.asList("test1=a1"
+                , "test2=a2"
+                , "test3=a3");
+        map.put("700003342843", String.join(" ", strings));
+
+
+        TaskParam build = TaskParam.builder().projectName("winhc_test")
+                .flowName("test_touch")
+                .bizDate("2020-06-04")
+                .nodeParam(map).build();
+        CreateManualDagResponse touch = touchService.touch(build);
+        System.out.println(JsonUtils.jsonObjToString(touch));
+    }
 
+    private void query(){
+        touchService.query("winhc_test",700134013109L);
     }
 }

+ 27 - 0
src/main/java/com/winhc/dataworks/flow/touch/bean/NodeParam.java

@@ -0,0 +1,27 @@
+package com.winhc.dataworks.flow.touch.bean;
+
+import com.winhc.dataworks.flow.touch.utils.JsonUtils;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/22 14:42
+ * @Description:
+ */
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class NodeParam {
+
+    private String nodeId;
+    private String params;
+
+    @Override
+    public String toString() {
+        return JsonUtils.jsonObjToString(this);
+    }
+}

+ 28 - 0
src/main/java/com/winhc/dataworks/flow/touch/bean/TaskParam.java

@@ -0,0 +1,28 @@
+package com.winhc.dataworks.flow.touch.bean;
+
+import com.winhc.dataworks.flow.touch.utils.JsonUtils;
+import lombok.*;
+
+import java.util.Map;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/22 14:45
+ * @Description:
+ */
+@Getter
+@Setter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TaskParam {
+    private String projectName;
+    private String flowName;
+    private String bizDate;
+    private Map<String, String> nodeParam;
+
+    @Override
+    public String toString() {
+        return JsonUtils.jsonObjToString(this);
+    }
+}

+ 43 - 0
src/main/java/com/winhc/dataworks/flow/touch/service/TouchService.java

@@ -3,10 +3,14 @@ package com.winhc.dataworks.flow.touch.service;
 import com.aliyuncs.IAcsClient;
 import com.aliyuncs.dataworks_public.model.v20180601.CreateManualDagRequest;
 import com.aliyuncs.dataworks_public.model.v20180601.CreateManualDagResponse;
+import com.aliyuncs.dataworks_public.model.v20180601.SearchManualDagNodeInstanceRequest;
+import com.aliyuncs.dataworks_public.model.v20180601.SearchManualDagNodeInstanceResponse;
 import com.aliyuncs.http.ProtocolType;
 import com.helospark.lightdi.annotation.Autowired;
 import com.helospark.lightdi.annotation.Service;
+import com.winhc.dataworks.flow.touch.bean.TaskParam;
 import com.winhc.dataworks.flow.touch.configuration.DataWorksAccessProperties;
+import com.winhc.dataworks.flow.touch.utils.JsonUtils;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 
@@ -43,4 +47,43 @@ public class TouchService {
         return response;
     }
 
+
+    @SneakyThrows
+    public CreateManualDagResponse touch(TaskParam taskParam) {
+        log.info("触发任务:{}", taskParam.toString());
+        CreateManualDagRequest request = new CreateManualDagRequest();
+        request.setProjectName(taskParam.getProjectName());
+        request.setFlowName(taskParam.getFlowName());
+        request.setBizdate(taskParam.getBizDate() + " 00:00:00");
+        request.setRegionId(dataWorksAccessProperties.getRegionId());
+        request.setProtocol(ProtocolType.HTTP);
+        request.setNodePara(JsonUtils.jsonObjToString(taskParam.getNodeParam()));
+        CreateManualDagResponse response = client.getAcsResponse(request);
+        log.info("\n任务结果:\n\trequestId:{},\n\treturnCode:{},\n\treturnErrorSolution:{},\n\treturnMessage:{},\n\treturnValue:{}",
+                response.getRequestId()
+                , response.getReturnCode()
+                , response.getReturnErrorSolution()
+                , response.getReturnMessage()
+                , response.getReturnValue());
+        return response;
+    }
+
+
+    @SneakyThrows
+    public void query(String projectName, Long dagId) {
+        SearchManualDagNodeInstanceRequest searchNodeInstanceListRequest
+                = new SearchManualDagNodeInstanceRequest();
+        searchNodeInstanceListRequest.setDagId(dagId);
+        searchNodeInstanceListRequest.setProjectName(projectName); //项目名。
+        searchNodeInstanceListRequest.setProtocol(ProtocolType.HTTP);
+        SearchManualDagNodeInstanceResponse searchResponse = client
+                .getAcsResponse(searchNodeInstanceListRequest);     //查询实例。
+        java.util.List<SearchManualDagNodeInstanceResponse.NodeInsInfo> nodeInsfos = searchResponse.getData();
+        System.out.println(searchResponse.getErrMsg()); //错误码。0代码正常;非0数值代表报错。
+        for (SearchManualDagNodeInstanceResponse.NodeInsInfo nodeInsInfo : nodeInsfos) {
+            System.out.println(nodeInsInfo.getNodeName()); //输出节点名称。
+            System.out.println(nodeInsInfo.getStatus());  //输出状态信息。
+        }
+    }
+
 }

+ 22 - 0
src/main/java/com/winhc/dataworks/flow/touch/test/TestFlow.java

@@ -0,0 +1,22 @@
+package com.winhc.dataworks.flow.touch.test;
+
+import com.winhc.dataworks.flow.touch.utils.JsonUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/22 14:51
+ * @Description:
+ */
+public class TestFlow {
+    public static void main(String[] args) {
+        Map<String, String> map = new HashMap<>();
+
+        map.put("a", "b");
+        map.put("b", "c");
+        System.out.println(JsonUtils.jsonObjToString(map));
+
+    }
+}

+ 19 - 0
src/main/java/com/winhc/dataworks/flow/touch/utils/JsonUtils.java

@@ -0,0 +1,19 @@
+package com.winhc.dataworks.flow.touch.utils;
+
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/22 14:18
+ * @Description:
+ */
+public class JsonUtils {
+    public static String jsonObjToString(Object jsonObject) {
+        return JSONObject.toJSONString(jsonObject, SerializerFeature.WriteMapNullValue);
+    }
+
+    public static String jsonObjToStringNotWriteMapNull(Object jsonObject) {
+        return JSONObject.toJSONString(jsonObject, SerializerFeature.IgnoreNonFieldGetter);
+    }
+}