Prechádzať zdrojové kódy

Merge remote-tracking branch 'origin/master'

yandawei 4 rokov pred
rodič
commit
02bee576fc

+ 24 - 0
jobs/task-step01.yaml

@@ -9,3 +9,27 @@ job:
           - _nodeId: 700003471556
             PROJECT: winhc_eci_dev
   #------>
+
+  - project: winhc_eci
+    flow: company_judicial_sale
+    task:
+
+      #run-1 司法拍卖前置任务
+      - taskName: company_judicial_sale
+        param:
+          - _nodeId: 700003485951
+            project: winhc_eci_dev
+            tableName: company_judicial_sale
+
+
+
+  - project: winhc_eci
+    flow: wenshu_detail_combine
+    task:
+
+      #run-1 文书前置任务
+      - taskName: wenshu_detail_combine
+        param:
+          - _nodeId: 700003495086
+            project: winhc_eci_dev
+            tableName: wenshu_detail_combine

+ 184 - 0
jobs/task-step02.yaml

@@ -469,4 +469,188 @@ job:
             cidField: new_cid
             dupliCols: new_cid,reg_num
 
+  - project: winhc_test
+    flow: incr_calc_intellectual
+    task:
+
+      #裁判文书
+      - taskName: wenshu_detail_combine
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: wenshu_detail_combine
+            dupliCols: new_cid,case_no,cname
+            flag: cid
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: wenshu_detail_combine
+            cidField: new_cid
+            dupliCols: new_cid,case_no,cname
+
+      #行政许可
+      - taskName: company_license
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_license
+            dupliCols: new_cid,license_number
+            flag: cid
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_license
+            cidField: new_cid
+            dupliCols: new_cid,license_number
+
+      #行政许可-企业公示
+      - taskName: company_license_entpub
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_license_entpub
+            dupliCols: new_cid,license_number
+            flag: cid
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_license_entpub
+            cidField: new_cid
+            dupliCols: new_cid,license_number
+
+      #行政许可-信用中国
+      - taskName: company_license_creditchina
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_license_creditchina
+            dupliCols: new_cid,licence_number
+            flag: cid
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_license_creditchina
+            cidField: new_cid
+            dupliCols: new_cid,licence_number
+
+      #终本案件
+      - taskName: company_zxr_final_case
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_zxr_final_case
+            dupliCols: new_cid,case_no
+            flag: cid
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_zxr_final_case
+            cidField: new_cid
+            dupliCols: new_cid,case_no
+
+      #司法拍卖
+      - taskName: company_judicial_sale_combine
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_judicial_sale_combine
+            dupliCols: new_cid,main_id
+            flag: cids
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_judicial_sale_combine_list
+            cidField: new_cid
+            dupliCols: new_cid,main_id
+
+      #双随机抽查-结果公示
+      - taskName: company_double_random_check_result_info
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_double_random_check_result_info
+            dupliCols: new_cid,main_id,check_item
+            flag: cid
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_double_random_check_result_info
+            cidField: new_cid
+            dupliCols: new_cid,main_id,check_item
+
+      #双随机抽查
+      - taskName: company_double_random_check_info
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_double_random_check_info
+            dupliCols: new_cid,check_task_num
+            flag: cid
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_double_random_check_info
+            cidField: new_cid
+            dupliCols: new_cid,check_task_num
+
+      #抽查检查
+      - taskName: company_check_info
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_check_info
+            dupliCols: new_cid,check_org,check_date
+            flag: cid
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_check_info
+            cidField: new_cid
+            dupliCols: new_cid,check_org,check_date
+
+      #税收违法
+      - taskName: company_tax_contravention
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_tax_contravention
+            dupliCols: new_cid,taxpayer_number,case_info
+            flag: cid
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_tax_contravention
+            cidField: new_cid
+            dupliCols: new_cid,taxpayer_number,case_info
+
+      #法院公告
+      - taskName: company_court_announcement
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_court_announcement
+            dupliCols: new_cid,plaintiff,litigant,publish_date,case_no
+            flag: cids
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_court_announcement_list
+            cidField: new_cid
+            dupliCols: new_cid,plaintiff,litigant,publish_date,case_no
 
+      #立案信息
+      - taskName: company_court_register
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_court_register
+            dupliCols: new_cid,case_no,plaintiff,defendant
+            flag: cids
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_court_register_list
+            cidField: new_cid
+            dupliCols: new_cid,case_no,plaintiff,defendant
+
+      #开庭公告
+      - taskName: company_court_open_announcement
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_court_open_announcement
+            dupliCols: new_cid,case_no,plaintiff,defendant
+            flag: cids
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_court_open_announcement_list
+            cidField: new_cid
+            dupliCols: new_cid,case_no,plaintiff,defendant

+ 64 - 7
src/main/java/com/winhc/dataworks/flow/touch/Main.java

@@ -8,11 +8,13 @@ import com.helospark.lightdi.LightDiContext;
 import com.helospark.lightdi.annotation.Autowired;
 import com.helospark.lightdi.annotation.Service;
 import com.winhc.dataworks.flow.touch.bean.*;
+import com.winhc.dataworks.flow.touch.configuration.DataWorksAccessProperties;
 import com.winhc.dataworks.flow.touch.configuration.SchemaInit;
 import com.winhc.dataworks.flow.touch.service.OdpsService;
 import com.winhc.dataworks.flow.touch.service.TouchService;
 import com.winhc.dataworks.flow.touch.utils.DateUtils;
 import com.winhc.dataworks.flow.touch.utils.DingUtils;
+import com.winhc.dataworks.flow.touch.utils.SparkDaemonThread;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.cli.*;
@@ -33,6 +35,10 @@ import java.util.stream.Collectors;
 public class Main {
     @Autowired
     private TouchService touchService;
+
+    @Autowired
+    private DataWorksAccessProperties dataWorksAccessProperties;
+
     private static final Options options = new Options();
 
 
@@ -51,6 +57,8 @@ public class Main {
 
         options.addOption("flow", "flow", true, "单任务必填,业务流程名");
         options.addOption("task", "taskName", true, "单任务必填,任务名");
+        options.addOption("odps", "odps_home", true, "odps cmd 根目录");
+
     }
 
     private static void verify(CommandLine commandLine) {
@@ -58,6 +66,9 @@ public class Main {
             if (!commandLine.hasOption("f")) {
                 throw new RuntimeException();
             }
+            if (!commandLine.hasOption("odps")) {
+                throw new RuntimeException();
+            }
             if (commandLine.hasOption("s")) {
                 if (!commandLine.hasOption("flow") || !commandLine.hasOption("task")) {
                     throw new RuntimeException();
@@ -127,8 +138,9 @@ public class Main {
                 if (commandLine.hasOption("d")) {
                     log.info("debug模式:{}", jobs);
                 } else {
+                    String odpsCmdPath = commandLine.getOptionValue("odps");
                     dd.send(msg);
-                    bean.start(bizDate, jobs);
+                    bean.start(bizDate, jobs, odpsCmdPath);
                 }
             }
         } catch (ParseException e) {
@@ -146,7 +158,52 @@ public class Main {
     }
 
     @SneakyThrows
-    private void start(String bizDate, List<DataWorksFlowJob> jobs) {
+    private void start(String bizDate, List<DataWorksFlowJob> jobs, String odpsCmdHome) {
+        //为所有项目空间启动守护线程
+        String accessKeyId = dataWorksAccessProperties.getAccessKeyId();
+        String accessKeySecret = dataWorksAccessProperties.getAccessKeySecret();
+        Set<String> ps = jobs.stream().map(DataWorksFlowJob::getProject).collect(Collectors.toSet());
+        for (String p : ps) {
+            new SparkDaemonThread(p, accessKeyId, accessKeySecret, odpsCmdHome, 90L).start();
+        }
+
+        //运行job,并接收失败参数,最大重试三次
+        Set<TaskInfo> failureTask = run(bizDate, jobs);
+        int i = 3;
+        while (!failureTask.isEmpty() && i-- > 0) {
+            Set<String> fSet = failureTask.stream().map(TaskInfo::getKey).collect(Collectors.toSet());
+
+            List<DataWorksFlowJob> js = jobs.stream().map(job -> {
+                String project = job.getProject();
+                String flow = job.getFlow();
+                List<DataWorksFlowTask> task = job.getTask();
+
+                List<DataWorksFlowTask> collect = task.stream().filter(t -> fSet.contains(project + ":" + flow + ":" + t.getTaskName())
+                ).collect(Collectors.toList());
+
+                if (collect.isEmpty()) {
+                    return null;
+                } else {
+                    return new DataWorksFlowJob(project, flow, collect);
+                }
+            }).filter(Objects::nonNull).collect(Collectors.toList());
+
+            String collect = js.stream().flatMap(job -> {
+                String project = job.getProject();
+                String flow = job.getFlow();
+                List<DataWorksFlowTask> task = job.getTask();
+                return task.stream().map(t -> project + ":" + flow + ":" + t.getTaskName());
+            }).collect(Collectors.joining(","));
+            dingUtils.send("【" + (3 - i) + "】重新启动以下job:" + collect);
+            failureTask = run(bizDate, js);
+        }
+        if (!failureTask.isEmpty()) {
+            System.exit(-1);
+        }
+    }
+
+    @SneakyThrows
+    private Set<TaskInfo> run(String bizDate, List<DataWorksFlowJob> jobs) {
         log.info("start!");
         LocalDateTime start = LocalDateTime.now();
         List<TaskInfo> collect = jobs.stream()
@@ -173,6 +230,7 @@ public class Main {
         int totalTask = collect.size();
 
         Set<TaskInfo> end = new HashSet<>();
+        Set<TaskInfo> failureTask = new HashSet<>();
         TimedCache<TaskInfo, String> timedCache = CacheUtil.newTimedCache(300 * 1000);
         int i = 0;
         int successTask = 0;
@@ -207,8 +265,9 @@ public class Main {
                 if (failure.size() != 0) {
                     failedTask++;
                     log.error("failure node:{} ", failure);
-                    DingMsg error = new DingMsg("任务失败", taskInfo.getProject(), taskInfo.getFlow(), String.join(",", failure), TaskFlowEnum.FAILURE.getMsg());
+                    DingMsg error = new DingMsg("任务失败", taskInfo.getProject(), taskInfo.getFlow(), taskInfo.getTaskName(), String.join(",", failure), TaskFlowEnum.FAILURE.getMsg());
                     dingUtils.send(error);
+                    failureTask.add(taskInfo);
                 } else {
                     if (await.size() != 0) {
                         awaitTask++;
@@ -230,7 +289,7 @@ public class Main {
                         if (!timedCache.containsKey(taskInfo) && i <= 6) {
                             //超两小时
                             i++;
-                            DingMsg error = new DingMsg("【" + i + "】任务长时间未结束", taskInfo.getProject(), taskInfo.getFlow(), String.join(",", failure), TaskFlowEnum.RUNNING.getMsg());
+                            DingMsg error = new DingMsg("【" + i + "】任务长时间未结束", taskInfo.getProject(), taskInfo.getFlow(), taskInfo.getTaskName(), String.join(",", failure), TaskFlowEnum.RUNNING.getMsg());
                             dingUtils.send(error);
                             timedCache.put(taskInfo, "1");
                         }
@@ -250,9 +309,7 @@ public class Main {
             }
             Thread.sleep(10000);
         }
-        if (failedTask != 0) {
-            System.exit(-1);
-        }
         log.info("end");
+        return failureTask;
     }
 }

+ 4 - 2
src/main/java/com/winhc/dataworks/flow/touch/bean/DingMsg.java

@@ -28,10 +28,11 @@ public class DingMsg {
                 "\n\n" + "> 系统信息:" + os();
     }
 
-    public DingMsg(String msgLevel, String project, String flow, String nodeName, String status) {
+    public DingMsg(String msgLevel, String project, String flow, String task, String nodeName, String status) {
         this.msgLevel = msgLevel;
         this.project = project;
         this.flow = flow;
+        this.task = task;
         this.nodeName = nodeName;
         this.status = status;
         LocalDateTime date = LocalDateTime.now();
@@ -42,6 +43,7 @@ public class DingMsg {
     private String msgLevel;
     private String project;
     private String flow;
+    private String task;
     private String nodeName;
     private String status;
     private String date;
@@ -50,7 +52,7 @@ public class DingMsg {
     public String toMd() {
         StringBuilder sb = new StringBuilder();
         return sb.append("#### ").append(msgLevel)
-                .append("\n\n").append("位置:").append(project).append(":").append(flow)
+                .append("\n\n").append("位置:").append(project).append(":").append(flow).append(":").append(task)
                 .append("\n\n").append("节点:").append(nodeName)
                 .append("\n\n").append("状态:").append(status)
                 .append("\n\n").append("> 时间:").append(date)

+ 28 - 0
src/main/java/com/winhc/dataworks/flow/touch/bean/SparkJobInfo.java

@@ -0,0 +1,28 @@
+package com.winhc.dataworks.flow.touch.bean;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.time.LocalDateTime;
+import java.time.temporal.ChronoUnit;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/5 15:44
+ * @Description:
+ */
+@Getter
+@Setter
+@AllArgsConstructor
+public class SparkJobInfo {
+    private LocalDateTime startTime;
+    private String instanceId;
+    private String state;
+    private String runningMode;
+    private String applicationName;
+
+    public Long runningMinutes() {
+        return ChronoUnit.MINUTES.between(startTime, LocalDateTime.now());
+    }
+}

+ 132 - 0
src/main/java/com/winhc/dataworks/flow/touch/cmd/OdpsCmd.java

@@ -0,0 +1,132 @@
+package com.winhc.dataworks.flow.touch.cmd;
+
+import cn.hutool.core.io.file.FileWriter;
+import cn.hutool.core.util.RuntimeUtil;
+import com.winhc.dataworks.flow.touch.bean.SparkJobInfo;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.SystemUtils;
+
+import java.io.File;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Locale;
+import java.util.Objects;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/5 15:26
+ * @Description:
+ */
+@Slf4j
+public class OdpsCmd {
+    private String project_name;
+    private String access_id;
+    private String access_key;
+    private String basePath;
+    private String baseCmd;
+
+    public OdpsCmd(String project_name, String access_id, String access_key, String basePath) {
+        this.project_name = project_name;
+        this.access_id = access_id;
+        this.access_key = access_key;
+        this.basePath = basePath;
+        this.baseCmd = basePath + File.separator + "bin" + File.separator + "odpscmd";
+        this.baseCmd += SystemUtils.IS_OS_WINDOWS ? ".bat" : "";
+        this.writeConfig(new OdpsConfg(project_name, access_id, access_key).toString());
+    }
+
+    @Getter
+    @Setter
+    @AllArgsConstructor
+    private static class OdpsConfg {
+        private String project_name;
+        private String access_id;
+        private String access_key;
+
+        @Override
+        public String toString() {
+            return "project_name=" + project_name + "\n" +
+                    "access_id=" + access_id + "\n" +
+                    "access_key=" + access_key + "\n" +
+                    "end_point=http://service.odps.aliyun.com/api\n" +
+                    "log_view_host=http://logview.odps.aliyun.com\n" +
+                    "https_check=true\n" +
+                    "data_size_confirm=100.0\n" +
+                    "update_url=http://repo.aliyun.com/odpscmd\n" +
+                    "use_instance_tunnel=true\n" +
+                    "instance_tunnel_max_record=10000";
+        }
+    }
+
+    private void writeConfig(String str) {
+        FileWriter writer = new FileWriter(basePath + File.separator + "conf" + File.separator + "odps_config.ini");
+        writer.write(str);
+    }
+
+
+    private synchronized List<String> execute(String cmd) {
+        String c = baseCmd + " --project winhc_eci_dev " + " -e \"" + cmd + "\";";
+        log.info(c);
+        List<String> execute = new ArrayList<>();
+        if (SystemUtils.IS_OS_WINDOWS) {
+            execute = RuntimeUtil.execForLines(c);
+        } else {
+            String fs = basePath + "/xjk_start.sh";
+            try {
+                File file = new File(fs);
+                if (!file.exists()) {
+                    file.createNewFile();
+                }
+                file.setExecutable(true);
+                List<String> list = new ArrayList<String>();
+                list.add("#!/bin/bash");
+                list.add(c);
+                FileUtils.writeLines(file, list, false);
+                execute = RuntimeUtil.execForLines(fs);
+            } catch (Exception e) {
+                log.error(e.getMessage(), e);
+            }
+        }
+        log.info("out:\n{}", String.join("\n", execute));
+        return execute;
+    }
+
+    private static final Pattern pattern = Pattern.compile("(\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}) +(\\w+?) +(\\w+?) +(\\w+?) +([0-9a-zA-Z\\._]+?) *");
+
+    public Boolean kill(String instanceId) {
+        execute("spark kill -i " + instanceId);
+        return true;
+    }
+
+    public List<SparkJobInfo> query(String state) {
+        List<String> execute = execute("spark list -s " + state.toUpperCase());
+        return execute.stream().filter(str -> !str.startsWith("StartTime")).map(str -> {
+            Matcher m = pattern.matcher(str);
+            if (m.find()) {
+                return new SparkJobInfo(parse(m.group(1)), m.group(2), m.group(3), m.group(4), m.group(5));
+            } else {
+                log.error("error str : {}", str);
+                return null;
+            }
+        }).filter(Objects::nonNull).collect(Collectors.toList());
+    }
+
+    DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
+            .withLocale(Locale.CHINA)
+            .withZone(ZoneId.systemDefault());
+
+    private LocalDateTime parse(String str) {
+        LocalDateTime parse = LocalDateTime.parse(str, dtf);
+        return parse;
+    }
+}

+ 45 - 0
src/main/java/com/winhc/dataworks/flow/touch/service/OdpsCmdService.java

@@ -0,0 +1,45 @@
+package com.winhc.dataworks.flow.touch.service;
+
+import com.helospark.lightdi.annotation.Autowired;
+import com.helospark.lightdi.annotation.Service;
+import com.winhc.dataworks.flow.touch.bean.SparkJobInfo;
+import com.winhc.dataworks.flow.touch.cmd.OdpsCmd;
+import com.winhc.dataworks.flow.touch.configuration.DataWorksAccessProperties;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/5 14:43
+ * @Description:
+ */
+@Service
+public class OdpsCmdService {
+    private DataWorksAccessProperties dataWorksAccessProperties;
+    private OdpsCmd odpsCmd;
+
+
+
+    @Autowired
+    public OdpsCmdService(DataWorksAccessProperties dataWorksAccessProperties) {
+        this.dataWorksAccessProperties = dataWorksAccessProperties;
+        this.odpsCmd = new OdpsCmd("", dataWorksAccessProperties.getAccessKeyId(), dataWorksAccessProperties.getAccessKeySecret(), "");
+
+    }
+
+    public List<String> getTimeOutJobs(Long minutes) {
+        List<SparkJobInfo> running = odpsCmd.query("RUNNING");
+        return running.stream().filter(e -> e.runningMinutes() >= minutes).map(SparkJobInfo::getInstanceId).collect(Collectors.toList());
+    }
+
+    public Boolean kill(String instanceId) {
+        return odpsCmd.kill(instanceId);
+    }
+
+
+    private static final String basePath = "C:\\Users\\x\\Downloads\\Compressed\\odpscmd_public";
+
+
+
+}

+ 2 - 1
src/main/java/com/winhc/dataworks/flow/touch/service/TouchService.java

@@ -105,7 +105,8 @@ public class TouchService {
                 .getAcsResponse(searchNodeInstanceListRequest);
         java.util.List<SearchManualDagNodeInstanceResponse.NodeInsInfo> nodeInsfos = searchResponse.getData();
         for (SearchManualDagNodeInstanceResponse.NodeInsInfo nodeInsInfo : nodeInsfos) {
-            log.info("{}:{} {}", nodeInsInfo.getNodeName(), nodeInsInfo.getStatus(), TaskFlowEnum.getTaskFlowEnumByCode(nodeInsInfo.getStatus()));
+            TaskFlowEnum code = TaskFlowEnum.getTaskFlowEnumByCode(nodeInsInfo.getStatus());
+            log.info("{}:{} {}", nodeInsInfo.getNodeName(), nodeInsInfo.getStatus(), code);
         }
         return nodeInsfos.stream()
                 .collect(Collectors.toMap(SearchManualDagNodeInstanceResponse.NodeInsInfo::getNodeName, node -> TaskFlowEnum.getTaskFlowEnumByCode(node.getStatus()), (o1, o2) -> o1));

+ 1 - 1
src/main/java/com/winhc/dataworks/flow/touch/utils/DingUtils.java

@@ -32,7 +32,7 @@ public class DingUtils {
     }
 
     public boolean send(String msg) {
-        return sendByBody(getMdBody(msg));
+        return sendByBody(getMdBody(msg.replace("\\", "\\\\")));
     }
 
     private boolean sendByBody(String body) {

+ 50 - 0
src/main/java/com/winhc/dataworks/flow/touch/utils/SparkDaemonKill.java

@@ -0,0 +1,50 @@
+package com.winhc.dataworks.flow.touch.utils;
+
+import com.winhc.dataworks.flow.touch.cmd.OdpsCmd;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Queue;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/17 11:12
+ * @Description:
+ */
+@Slf4j
+public class SparkDaemonKill extends Thread {
+
+    private Queue<String> queue;
+    private OdpsCmd odpsCmd;
+
+    /**
+     * @param project_name 工程名
+     * @param access_id    key
+     * @param access_key   secret
+     * @param basePath     odps根目录
+     * @param queue        对列
+     */
+    public SparkDaemonKill(String project_name, String access_id, String access_key, String basePath, Queue<String> queue) {
+        super.setDaemon(true);
+        super.setName("spark-kill-daemon");
+        this.queue = queue;
+        this.odpsCmd = new OdpsCmd(project_name, access_id, access_key, basePath);
+    }
+
+
+    @Override
+    public void run() {
+        try {
+            while (true) {
+                String poll = queue.poll();
+                if (poll != null) {
+                    odpsCmd.kill(poll);
+                }
+                if (queue.isEmpty()) {
+                    Thread.sleep(10000);
+                }
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        }
+    }
+}

+ 67 - 0
src/main/java/com/winhc/dataworks/flow/touch/utils/SparkDaemonThread.java

@@ -0,0 +1,67 @@
+package com.winhc.dataworks.flow.touch.utils;
+
+import com.winhc.dataworks.flow.touch.bean.SparkJobInfo;
+import com.winhc.dataworks.flow.touch.cmd.OdpsCmd;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/17 10:26
+ * @Description:
+ */
+@Slf4j
+public class SparkDaemonThread extends Thread {
+    private OdpsCmd odpsCmd;
+    private long maxMinutes;
+    private long interval;
+
+    /**
+     * @param project_name    工程名
+     * @param access_id       key
+     * @param access_key      secret
+     * @param basePath        odps根目录
+     * @param maxMinutes      超时间 分钟
+     * @param intervalMinutes 守护间隔时间,默认为10分钟
+     */
+    public SparkDaemonThread(String project_name, String access_id, String access_key, String basePath, long maxMinutes, double intervalMinutes) {
+        super.setDaemon(true);
+        super.setName("spark-daemon");
+        this.maxMinutes = maxMinutes;
+        this.interval = (long) (intervalMinutes * 60 * 1000);
+        this.odpsCmd = new OdpsCmd(project_name, access_id, access_key, basePath);
+    }
+
+    /**
+     * @param project_name 工程名
+     * @param access_id    key
+     * @param access_key   secret
+     * @param basePath     odps根目录
+     * @param maxMinutes   超时间 分钟
+     */
+    public SparkDaemonThread(String project_name, String access_id, String access_key, String basePath, long maxMinutes) {
+        this(project_name, access_id, access_key, basePath, maxMinutes, 5L);
+    }
+
+
+    @Override
+    public void run() {
+        try {
+            while (true) {
+                List<SparkJobInfo> running = odpsCmd.query("RUNNING");
+                List<String> timeOut = running.stream().filter(e -> e.runningMinutes() >= maxMinutes).map(SparkJobInfo::getInstanceId).collect(Collectors.toList());
+                if (!timeOut.isEmpty()) {
+                    for (String s : timeOut) {
+                        log.warn("kill InstanceId : {}", s);
+                        odpsCmd.kill(s);
+                    }
+                }
+                Thread.sleep(interval);
+            }
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        }
+    }
+}

+ 42 - 0
src/main/java/com/winhc/dataworks/flow/touch/utils/SparkDaemonUtils.java

@@ -0,0 +1,42 @@
+package com.winhc.dataworks.flow.touch.utils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/17 11:20
+ * @Description:
+ */
+public class SparkDaemonUtils {
+
+
+    public static final Long sparkTimeoutMinutes = 90L;
+
+    private static final Map<String, Queue<String>> queueMap = new HashMap<>();
+
+    private static void createOrIgnore(String project) {
+        if (!queueMap.containsKey(project)) {
+            synchronized (SparkDaemonUtils.class) {
+                if (!queueMap.containsKey(project)) {
+                    queueMap.put(project, new ArrayBlockingQueue<>(100));
+                }
+            }
+        }
+    }
+
+    public static Queue<String> getQueue(String project) {
+        createOrIgnore(project);
+        return queueMap.get(project);
+    }
+
+    public static boolean put(String project, String val) {
+        if (!queueMap.containsKey(project)) {
+            throw new RuntimeException("对列没有进行初始化!");
+        }
+        queueMap.get(project).add(val);
+        return true;
+    }
+}

+ 2 - 2
src/main/resources/dataworks-config.properties

@@ -1,4 +1,4 @@
-access-key-id:LTAI4G4n7pAW8tUbJVkkZQPD
-access-key-secret:uNJOBskzcDqHq1TYG3m2rebR4c1009
+access-key-id:LTAI4FynxS5nNuKyZ3LHhMX5
+access-key-secret:r6gWoySXC8kSK4qnfKRxEuWJ5uHIiE
 region-id:cn-shanghai
 ding-secret:SECe7b26876f443e77f872b8b10880e39b3c5dfaf44855f1aa3235372bb73698ab6

+ 6 - 6
src/main/resources/task-xjk.yaml

@@ -1,11 +1,11 @@
 job:
-  #------<公司基本信息
   - project: winhc_test
-    flow: inc_company_spark
+    flow: test_kill
     task:
-      - taskName: company_inc
+      - taskName: test_kill
         param:
-          - _nodeId: 700003381602
+          - _nodeId: 700003494765
             project: winhc_eci_dev
-            bizdate: 20200618
-  #------>
+            tableName: test_kill
+            dupliCols: new_cid,case_no
+            flag: cid

+ 2 - 1
start.sh

@@ -5,6 +5,7 @@ export PATH=$JAVA_HOME/bin:$PATH
 export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
 
 bashPath=/root/maxcompute_job/flow
+odps_cmd=/root/maxcompute_job/flow/odpscmd
 debug=0
 
 if [ $# -eq 0 ]; then
@@ -53,7 +54,7 @@ start_job() {
     return
   else
     echo ""
-    java -jar $bashPath/DataWorks-flow-touch.jar -f $bashPath/jobs/$1
+    java -jar $bashPath/DataWorks-flow-touch.jar -f $bashPath/jobs/$1 -odps $odps_cmd
   fi
 
   if [ $? -eq 0 ]; then