浏览代码

添加触发并传参ODPS SQL手动业务流程的节点

yongnian 4 年之前
父节点
当前提交
0722df68ce

+ 14 - 0
pom.xml

@@ -60,6 +60,12 @@
             <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
             <version>1.8.3</version>
         </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-yaml</artifactId>
+            <version>2.9.5</version>
+        </dependency>
+
     </dependencies>
 
     <build>
@@ -88,6 +94,14 @@
                     </execution>
                 </executions>
             </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>7</source>
+                    <target>7</target>
+                </configuration>
+            </plugin>
         </plugins>
     </build>
 

+ 20 - 8
src/main/java/com/winhc/dataworks/flow/touch/Main.java

@@ -1,6 +1,7 @@
 package com.winhc.dataworks.flow.touch;
 
 import com.aliyuncs.dataworks_public.model.v20180601.CreateManualDagResponse;
+import com.google.gson.JsonObject;
 import com.helospark.lightdi.LightDi;
 import com.helospark.lightdi.LightDiContext;
 import com.helospark.lightdi.annotation.Autowired;
@@ -8,12 +9,11 @@ import com.helospark.lightdi.annotation.Service;
 import com.winhc.dataworks.flow.touch.bean.TaskParam;
 import com.winhc.dataworks.flow.touch.service.TouchService;
 import com.winhc.dataworks.flow.touch.utils.JsonUtils;
+import com.winhc.dataworks.flow.touch.utils.YmlUtil;
 import lombok.extern.slf4j.Slf4j;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.io.File;
+import java.util.*;
 
 /**
  * @Author: XuJiakai
@@ -30,14 +30,26 @@ public class Main {
     public static void main(String[] args) {
         LightDiContext context = LightDi.initContextByPackage(Main.class.getPackage().getName());
         Main bean = context.getBean(Main.class);
-//        bean.start();
-        bean.query();
+        bean.start();
+//        bean.query();
     }
 
     private void start() {
         log.info("start");
         Map<String, String> map = new HashMap<>();
-        List<String> strings = Arrays.asList("test1=a1"
+        Map<String,Object> mapTable=YmlUtil.getResMap("table");
+        for(Map.Entry<String, Object> entry : mapTable.entrySet()){
+            String tableName = entry.getKey();
+            Object mapValue = entry.getValue();
+            JsonObject dagPara = new JsonObject();
+            dagPara.addProperty("PROJECT","winhc_eci");//业务流程参数。
+            JsonObject nodePara = new JsonObject();
+            nodePara.addProperty("700003366062",mapValue.toString().replaceAll(":","="));
+            CreateManualDagResponse touch = touchService.triggerWithParam("winhc_eci","IncDataFlow","2020-06-05",dagPara.toString(),nodePara.toString());
+            System.out.println(JsonUtils.jsonObjToString(touch));
+        }
+
+        /*List<String> strings = Arrays.asList("test1=a1"
                 , "test2=a2"
                 , "test3=a3");
         map.put("700003342843", String.join(" ", strings));
@@ -48,7 +60,7 @@ public class Main {
                 .bizDate("2020-06-04")
                 .nodeParam(map).build();
         CreateManualDagResponse touch = touchService.touch(build);
-        System.out.println(JsonUtils.jsonObjToString(touch));
+        System.out.println(JsonUtils.jsonObjToString(touch));*/
     }
 
     private void query(){

+ 22 - 0
src/main/java/com/winhc/dataworks/flow/touch/service/TouchService.java

@@ -6,6 +6,7 @@ import com.aliyuncs.dataworks_public.model.v20180601.CreateManualDagResponse;
 import com.aliyuncs.dataworks_public.model.v20180601.SearchManualDagNodeInstanceRequest;
 import com.aliyuncs.dataworks_public.model.v20180601.SearchManualDagNodeInstanceResponse;
 import com.aliyuncs.http.ProtocolType;
+import com.google.gson.JsonObject;
 import com.helospark.lightdi.annotation.Autowired;
 import com.helospark.lightdi.annotation.Service;
 import com.winhc.dataworks.flow.touch.bean.TaskParam;
@@ -47,6 +48,27 @@ public class TouchService {
         return response;
     }
 
+    @SneakyThrows
+    public CreateManualDagResponse triggerWithParam(String projectName
+            , String flowName, String bizDate,String dagPara,String nodePara) {
+        log.info("触发任务:{}.{} {}", projectName, flowName, bizDate);
+        CreateManualDagRequest request = new CreateManualDagRequest();
+        request.setProjectName(projectName);
+        request.setFlowName(flowName);
+        request.setBizdate(bizDate + " 00:00:00");
+        request.setRegionId(dataWorksAccessProperties.getRegionId());
+        request.setProtocol(ProtocolType.HTTP);
+        request.setDagPara(dagPara);
+        request.setNodePara(nodePara);
+        CreateManualDagResponse response = client.getAcsResponse(request);
+        log.info("\n任务结果:\n\trequestId:{},\n\treturnCode:{},\n\treturnErrorSolution:{},\n\treturnMessage:{},\n\treturnValue:{}",
+                response.getRequestId()
+                , response.getReturnCode()
+                , response.getReturnErrorSolution()
+                , response.getReturnMessage()
+                , response.getReturnValue());
+        return response;
+    }
 
     @SneakyThrows
     public CreateManualDagResponse touch(TaskParam taskParam) {

+ 67 - 0
src/main/java/com/winhc/dataworks/flow/touch/utils/YmlUtil.java

@@ -0,0 +1,67 @@
+package com.winhc.dataworks.flow.touch.utils;
+
+/**
+ * @Author yyn
+ * @Date 2020/6/23
+ * @Description TODO
+ */
+import java.io.File;
+import java.io.FileReader;
+import java.io.Reader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.yaml.snakeyaml.Yaml;
+
+public class YmlUtil {
+    private static String schemaFileName="schema-file.yml";
+    /**
+     * 获取yml文件中的指定字段,返回一个map
+     *
+     * @param sourcename
+     * @return
+     */
+    public static Map<String, Object> getResMap(String sourcename) {
+        return YmlInit.getMapByName(YmlInit.ymlMap, sourcename);
+    }
+    // 配置文件仅需要读取一次,读取配置文件的同时把数据保存到map中,map定义为final,仅可以被赋值一次
+    private static class YmlInit {
+        //初始化文件得到的map
+        private static final Map<String, Object> ymlMap = getYml();
+
+        // 读取配置文件,并初始化ymlMap
+        private static Map<String, Object> getYml() {
+            Yaml yml = new Yaml();
+            String path = Object.class.getResource("/").getPath().substring(1) + schemaFileName;
+            Reader reader = null;
+            try {
+                reader = new FileReader(new File(path));
+            } catch (Exception e) {
+                // TODO: handle exception
+                e.printStackTrace();
+            }
+            return yml.loadAs(reader, Map.class);
+        }
+
+        // //传入想要得到的字段
+        private static Map<String, Object> getMapByName(Map<String, Object> map, String name) {
+            Map<String, Object> maps = new HashMap<String, Object>();
+            Set<Map.Entry<String, Object>> set = map.entrySet();
+            for (Map.Entry<String, Object> entry : set) {// 遍历map
+                Object obj = entry.getValue();
+                if (entry.getKey().equals(name))      // 递归结束条件
+                    return (Map<String, Object>) obj;
+
+                if (entry.getValue() instanceof Map) {//如果value是Map集合递归
+                    maps = getMapByName((Map<String, Object>) obj, name);
+                    if (maps == null)				   //递归的结果如果为空,继续遍历
+                        continue;
+                    return maps;					  //不为空返回
+                }
+            }
+            return null;
+        }
+    }
+
+}

+ 7 - 0
src/main/resources/schema-file.yml

@@ -0,0 +1,7 @@
+table:
+  company_env_punishment:
+    PROJECT:winhc_eci_dev
+    DIM_TABLE:company_env_punishment
+    DIM_COLUMS:id,name,department,publish_time,punish_number,punish_basis,law_break,reason,content,execution,source_url,source_flag,create_time,update_time,deleted
+    DUPLI_COLS:new_cid,source_url
+    DS:20200605