许家凯 4 年之前
父節點
當前提交
7d0701b74f

+ 1 - 0
.gitignore

@@ -1,6 +1,7 @@
 # Created by .ignore support plugin (hsz.mobi)
 ### Python template
 # Byte-compiled / optimized / DLL files
+logs
 __pycache__/
 *.py[cod]
 *$py.class

+ 2 - 1
src/main/java/com/winhc/dataworks/flow/touch/EmptyTable.java

@@ -5,6 +5,7 @@ import com.helospark.lightdi.LightDiContext;
 import com.helospark.lightdi.annotation.Autowired;
 import com.helospark.lightdi.annotation.Component;
 import com.winhc.dataworks.flow.touch.service.OdpsService;
+import com.winhc.dataworks.flow.touch.utils.DateUtils;
 
 import java.util.List;
 
@@ -25,7 +26,7 @@ public class EmptyTable {
     private OdpsService odpsService;
 
     private void start() {
-        List<String> emptyTable = odpsService.getEmptyTable("20200621");
+        List<String> emptyTable = odpsService.getEmptyTable(DateUtils.getYesterday().replace("-", ""));
         System.out.println(emptyTable.size());
         emptyTable.forEach(System.out::println);
     }

+ 34 - 7
src/main/java/com/winhc/dataworks/flow/touch/Main.java

@@ -9,6 +9,7 @@ import com.helospark.lightdi.annotation.Autowired;
 import com.helospark.lightdi.annotation.Service;
 import com.winhc.dataworks.flow.touch.bean.*;
 import com.winhc.dataworks.flow.touch.configuration.SchemaInit;
+import com.winhc.dataworks.flow.touch.service.OdpsService;
 import com.winhc.dataworks.flow.touch.service.TouchService;
 import com.winhc.dataworks.flow.touch.utils.DateUtils;
 import com.winhc.dataworks.flow.touch.utils.DingUtils;
@@ -39,6 +40,8 @@ public class Main {
     private DingUtils dingUtils;
 
     static {
+        options.addOption("q", "query", false, "选填,查询昨天company增量表是否有数据");
+        options.addOption("d", "debug", false, "选填,启用debug模式,只输出任务参数不进行调度");
         options.addOption("s", "singleJob", false, "选填,是否单独触发一个业务流程");
         options.addOption("b", "bizDate", true, "选填,业务时间[2020-07-07],默认为昨天");
         options.addOption("f", "fileName", true, "必填,yaml文件");
@@ -71,6 +74,20 @@ public class Main {
         CommandLineParser parser = new DefaultParser();
         try {
             CommandLine commandLine = parser.parse(options, args);
+            if (commandLine.hasOption("q")) {
+                OdpsService bean = context.getBean(OdpsService.class);
+                String ds = DateUtils.getYesterday().replace("-", "");
+                List<String> emptyTable = bean.getEmptyTable(ds);
+                HashSet<String> set = new HashSet<>(emptyTable);
+                if (set.contains("company")) {
+                    log.error("company表{}分区数据为空,调度程序中止!", ds);
+                    dd.send("company表" + ds + "分区数据为空,调度程序中止!");
+                    System.exit(-1);
+                } else {
+                    log.info("company表{}分区数据不为空.", ds);
+                }
+                return;
+            }
             verify(commandLine);
             String bizDate = commandLine.getOptionValue("b", DateUtils.getYesterday());
             String fileName = commandLine.getOptionValue("f");
@@ -82,16 +99,23 @@ public class Main {
                 String task = commandLine.getOptionValue("task");
                 SingleJobMain singleJobMain = context.getBean(SingleJobMain.class);
                 String msg = "开始执行单个业务流程!bizDate:" + bizDate + " flow:" + flow + " task:" + task + " JobFile:" + commandLine.getOptionValue("f");
-//                dd.send(msg);
                 log.info(msg);
-                singleJobMain.start(flow, task, bizDate, jobs);
+                if (commandLine.hasOption("d")) {
+                    log.info("debug模式:{}", jobs);
+                } else {
+                    singleJobMain.start(flow, task, bizDate, jobs);
+                }
             } else {
                 log.info("触发全部业务流程!");
                 Main bean = context.getBean(Main.class);
                 String msg = "开始执行全部业务流程!bizDate:" + bizDate + " JobFile:" + commandLine.getOptionValue("f");
-                dd.send(msg);
                 log.info(msg);
-                bean.start(bizDate, jobs);
+                if (commandLine.hasOption("d")) {
+                    log.info("debug模式:{}", jobs);
+                } else {
+                    dd.send(msg);
+                    bean.start(bizDate, jobs);
+                }
             }
         } catch (ParseException e) {
             HelpFormatter formatter = new HelpFormatter();
@@ -121,7 +145,7 @@ public class Main {
                                         .projectName(dataWorksFlowJob.getProject())
                                         .bizDate(bizDate)
                                         .flowName(dataWorksFlowJob.getFlow())
-                                        .nodeParam(dataWorksFlowTask.toNodeParam())
+                                        .nodeParam(dataWorksFlowTask.toNodeParam(bizDate))
                                         .build();
                                 CreateManualDagResponse r = touchService.touch(build);
 
@@ -137,10 +161,10 @@ public class Main {
         Set<TaskInfo> end = new HashSet<>();
         TimedCache<TaskInfo, String> timedCache = CacheUtil.newTimedCache(300 * 1000);
         int i = 0;
+        int successTask = 0;
+        int failedTask = 0;
         while (true) {
             int awaitTask = 0;
-            int successTask = 0;
-            int failedTask = 0;
 
             boolean flag = true;
             List<String> empty = new ArrayList<>();
@@ -212,6 +236,9 @@ public class Main {
             }
             Thread.sleep(10000);
         }
+        if (failedTask != 0) {
+            System.exit(-1);
+        }
         log.info("end");
     }
 }

+ 1 - 1
src/main/java/com/winhc/dataworks/flow/touch/SingleJobMain.java

@@ -49,7 +49,7 @@ public class SingleJobMain {
                 .projectName(dataWorksFlowJob.getProject())
                 .bizDate(bizDate)
                 .flowName(dataWorksFlowJob.getFlow())
-                .nodeParam(dataWorksFlowTask.toNodeParam())
+                .nodeParam(dataWorksFlowTask.toNodeParam(bizDate))
                 .build();
         CreateManualDagResponse touch = touchService.touch(build);
         while (true) {

+ 8 - 2
src/main/java/com/winhc/dataworks/flow/touch/bean/DataWorksFlowTask.java

@@ -1,5 +1,6 @@
 package com.winhc.dataworks.flow.touch.bean;
 
+import com.winhc.dataworks.flow.touch.utils.JsonUtils;
 import lombok.AllArgsConstructor;
 import lombok.Getter;
 import lombok.NoArgsConstructor;
@@ -22,8 +23,13 @@ public class DataWorksFlowTask {
     private String taskName;
     private List<NodeParam> params;
 
-    public Map<String, String> toNodeParam() {
+    public Map<String, String> toNodeParam(String bizDate) {
         return params.stream()
-                .collect(Collectors.toMap(NodeParam::getNodeId, NodeParam::toNodeParam, (o1, o2) -> o1));
+                .collect(Collectors.toMap(NodeParam::getNodeId, n->n.toNodeParam(bizDate), (o1, o2) -> o1));
+    }
+
+    @Override
+    public String toString() {
+        return JsonUtils.jsonObjToString(this);
     }
 }

+ 2 - 1
src/main/java/com/winhc/dataworks/flow/touch/bean/NodeParam.java

@@ -21,7 +21,8 @@ public class NodeParam {
     private String nodeId;
     private Map<String, String> param;
 
-    public String toNodeParam() {
+    public String toNodeParam(String bizDate) {
+        param.put("bizdate", bizDate.replace("-", ""));
         return param.entrySet().stream()
                 .filter(e -> !e.getKey().startsWith("_"))
                 .filter(e-> StringUtils.isNotEmpty(e.getValue()))

+ 9 - 6
src/main/resources/log4j2.xml

@@ -10,24 +10,27 @@
             <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M : %msg%xEx%n"/>
         </Console>
         <!--文件会打印出所有信息,这个log每次运行程序会自动清空,由append属性决定,这个也挺有用的,适合临时测试用-->
-        <File name="log" fileName="${sys:user.home}/logs/current.log" append="false">
+        <File name="log" fileName="current.log" append="false">
             <PatternLayout pattern="%d{HH:mm:ss.SSS} %-5level %class{36} %L %M - %msg%xEx%n"/>
         </File>
 
         <!--这个会打印出所有的信息,每次大小超过size,则这size大小的日志会自动存入按年份-月份建立的文件夹下面并进行压缩,作为存档-->
-        <RollingFile name="RollingFile" fileName="${sys:user.home}/logs/DataWorks-flow-touch.log"
-                     filePattern="${sys:user.home}/logs/$${date:yyyy-MM}/DataWorks-flow-touch-%d{MM-dd-yyyy}-%i.log.gz">
+        <RollingFile name="RollingFile" fileName="logs/DataWorks-flow-touch.log"
+                     filePattern="logs/$${date:yyyy-MM}/DataWorks-flow-touch-%d{MM-dd-yyyy}-%i.log.gz">
             <PatternLayout pattern="%d{yyyy-MM-dd 'at' HH:mm:ss z} %-5level %class{36} %L %M - %msg%xEx%n"/>
-            <SizeBasedTriggeringPolicy size="50MB"/>
+            <Policies>
+                <TimeBasedTriggeringPolicy modulate="true" interval="1"/>
+                <SizeBasedTriggeringPolicy size="50MB"/>
+            </Policies>
         </RollingFile>
     </appenders>
     <!--然后定义logger,只有定义了logger并引入的appender,appender才会生效-->
     <loggers>
         <!--建立一个默认的root的logger-->
         <root level="INFO">
-<!--            <appender-ref ref="log"/>-->
             <appender-ref ref="Console"/>
-<!--            <appender-ref ref="RollingFile"/>-->
+            <appender-ref ref="log"/>
+            <appender-ref ref="RollingFile"/>
         </root>
     </loggers>
 </configuration>

+ 258 - 0
src/main/resources/task-step02.yaml

@@ -0,0 +1,258 @@
+# 公司基本信息、招投标、环保处罚、购地信息、地块公示、行政处罚-信用中国、行政处罚、土地抵押、土地转让
+job:
+  #------<公司基本信息
+  - project: winhc_test
+    flow: inc_company_spark
+    task:
+      - taskName: company_inc
+        param:
+          - _nodeId: 700003381602
+            project: winhc_eci_dev
+  #------>
+  # 招聘
+  - project: winhc_test
+    flow: incr_calc_intellectual
+    task:
+      - taskName: company_employment
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_employment
+            dupliCols: title,new_cid,url_path
+            flag: cid
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_employment
+            cidField: new_cid
+            dupliCols: title,new_cid,url_path
+  #------<招投标:Spark
+  - project: winhc_test
+    flow: incr_calc_intellectual
+    task:
+      - taskName: company_bid
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_bid
+            dupliCols: new_cid,title,link,publish_time
+            flag: cids
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_bid_list
+            cidField: new_cid
+            dupliCols: new_cid,title,link,publish_time
+      #------>
+      #------<环保处罚
+      - taskName: company_env_punishment
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_env_punishment
+            dupliCols: new_cid,name,source_url,punish_number
+            flag: cid
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_land_publicity
+            cidField: new_cid
+            dupliCols: new_cid,title,project_name,source_url
+      #------>
+      #------<购地信息:Spark
+      - taskName: company_land_announcement
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_land_announcement
+            dupliCols: new_cid,source,e_number,application_name
+            flag: cid
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_land_announcement
+            cidField: new_cid
+            dupliCols: new_cid,source,e_number,application_name
+      #------>
+      #------<地块公示:Spark
+      - taskName: company_land_publicity
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_land_publicity
+            dupliCols: new_cid,title,project_name,source_url
+            flag: cid
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_land_publicity
+            cidField: new_cid
+            dupliCols: new_cid,title,project_name,source_url
+      #------>
+      #------<行政处罚-信用中国:SPARK
+      - taskName: company_punishment_info_creditchina
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_punishment_info_creditchina
+            dupliCols: new_cid,company_name,source,punish_number
+            flag: cid
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_punishment_info_creditchina
+            cidField: new_cid
+            dupliCols: new_cid,company_name,source,punish_number
+      #------>
+      #------<行政处罚:SPARK
+      - taskName: company_punishment_info
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_punishment_info
+            dupliCols: new_cid,name,source,desc_file_path,punish_number
+            flag: cid
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_punishment_info
+            cidField: new_cid
+            dupliCols: new_cid,name,source,desc_file_path,punish_number
+  #------>
+  #------<土地抵押:ODPS SQL
+  - project: winhc_test
+    flow: inc_company_land_mortgage_sql
+    task:
+      - taskName: company_land_mortgage
+        param:
+          - _nodeId: 700003375909
+            PROJECT: winhc_eci_dev
+          #            DIM_TABLE: company_land_mortgage
+          - _nodeId: 700003375910
+            PROJECT: winhc_eci_dev
+            DIM_TABLE: company_land_mortgage
+            DIM_COLUMS: type,id,land_mark,land_num,land_aministrative_area,land_loc,land_area,other_item_num,use_right_num,mortgagor,mortgagee,nature,use_for,use_type,area,evaluate_amount,mortgage_amount,source_url,start_date,end_date,create_time,update_time,deleted
+            DUPLI_COLS: new_cid,type,source_url,land_mark,land_num
+            MD5_COLS: type,source_url,land_mark,land_num
+          - _nodeId: 700003422526
+            PROJECT: winhc_eci_dev
+            DIM_TABLE: company_land_mortgage
+  #------>
+  #------<土地转让:ODPS SQL
+  - project: winhc_test
+    flow: inc_company_land_transfer_sql
+    task:
+      - taskName: company_land_transfer
+        param:
+          - _nodeId: 700003377079
+            PROJECT: winhc_eci_dev
+            #            DIM_TABLE: company_land_transfer
+            DS: 20200717
+          - _nodeId: 700003377080
+            PROJECT: winhc_eci_dev
+            DIM_TABLE: company_land_transfer
+            DIM_COLUMS: type,id,mark,num,location,aministrative_area,user_pre,user_now,area,use_for,use_type,years_of_use,situation,level,merchandise_type,merchandise_price,merchandise_time,url,create_time,update_time,deleted
+            DUPLI_COLS: new_cid,type,url,num,mark,location
+            MD5_COLS: type,url,num,mark,location
+            DS: 20200717
+          - _nodeId: 700003421692
+            PROJECT: winhc_eci_dev
+            DIM_TABLE: company_land_transfer
+  #------>
+
+  - project: winhc_test
+    flow: incr_calc_intellectual
+    task:
+      #run-1
+      - taskName: company_icp
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_icp
+            dupliCols: new_cid,liscense,domain
+            flag: cid
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_icp
+            cidField: new_cid
+            dupliCols: new_cid,liscense,domain
+
+      #run-1
+      - taskName: company_app_info
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_app_info
+            dupliCols: new_cid,name
+            flag: cid
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_app_info
+            cidField: new_cid
+            dupliCols: new_cid,name
+
+      #run -1
+      - taskName: company_copyright_reg
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_copyright_reg
+            dupliCols: new_cid,reg_num
+            flag: cids
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_copyright_reg_list
+            cidField: new_cid
+            dupliCols: new_cid,reg_num
+
+      #run-1
+      - taskName: company_wechat
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_wechat
+            dupliCols: new_cid,public_num
+            flag: cid
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_wechat
+            cidField: new_cid
+            dupliCols: new_cid,public_num
+
+      #run -1
+      - taskName: company_tm
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_tm
+            dupliCols: new_cid,reg_no
+            flag: cid
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_tm
+            cidField: new_cid
+            dupliCols: new_cid,reg_no
+
+        #run -1
+      - taskName: company_patent
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_patent
+            dupliCols: new_cid,pub_number,app_number
+            flag: cids
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_patent_list
+            cidField: new_cid
+            dupliCols: new_cid,pub_number,app_number
+
+
+      #run-1
+      - taskName: company_copyright_works
+        param:
+          - _nodeId: 700003375026
+            project: winhc_eci_dev
+            tableName: company_copyright_works
+            dupliCols: new_cid,reg_num
+            flag: cids
+          - _nodeId: 700003380225
+            project: winhc_eci_dev
+            tableName: company_copyright_works_list
+            cidField: new_cid
+            dupliCols: new_cid,reg_num
+
+

+ 19 - 0
src/main/resources/task-step03.yaml

@@ -0,0 +1,19 @@
+job:
+  - project: winhc_eci
+    flow: company_court_announcement
+    task:
+
+      #run-1 法院公告
+      - taskName: company_court_announcement
+        param:
+          - _nodeId: 700003422539
+            project: winhc_eci_dev
+            tableName: company_court_announcement
+  - project: winhc_eci
+    flow: company_change_dynamic
+    task:
+      #run-1 企业动态
+      - taskName: company_change_dynamic
+        param:
+          - _nodeId: 700003430766
+            project: winhc_eci_dev

+ 9 - 0
src/main/resources/task-step04.yaml

@@ -0,0 +1,9 @@
+job:
+  - project: winhc_eci
+    flow: good_news
+    task:
+      #run-1 利好消息
+      - taskName: good_news
+        param:
+          - _nodeId: 700003430821
+            project: winhc_eci_dev

+ 74 - 0
start.sh

@@ -0,0 +1,74 @@
+#!/bin/bash
+
+export JAVA_HOME=/root/jdk1.8.0_181
+export PATH=$JAVA_HOME/bin:$PATH
+export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
+#!/usr/bin/env bash
+
+if [ $# -eq 0 ];then
+        echo "---------------------------------------------------------------------------"
+        # OS Type
+        echo "                    OS Type: $(uname -o)"
+        # OS Release Version and Name
+        echo "OS Release Version and Name: $(cat /etc/issue | head -n 1)"
+        #Architecture
+        echo "               Architecture: $(uname -m)"
+        #Kernel Release
+        echo "             Kernel Release: $(uname -r)"
+        #hostname
+        echo "                   hostname: $HOSTNAME"
+        #Internal IP
+        echo "                Internal IP: $(hostname -I)"
+        #External IP
+        echo "                External IP: $(curl --connect-timeout 10 -m 20 -s ipecho.net/plain)"
+        #DNS
+        dnsservers=""
+        for i in $(cat /etc/resolv.conf | grep '^nameserver' | awk '{print $NF}')
+        do
+                dnsservers="$dnsservers$i "
+        done
+        echo "                        DNS: $dnsservers"
+        #if connected to Internet or not
+        echo "            internet status: $( ping -c 2 www.baidu.com &> /dev/null && echo 'connected' || echo 'disconnected')"
+        echo "---------------------------------------------------------------------------"
+fi
+
+java -jar DataWorks-flow-touch.jar -q
+
+if [ $? -eq 0 ]; then
+     echo "company is not empty!"
+else
+     echo "company is empty!"
+     exit $?
+fi
+
+java -jar DataWorks-flow-touch.jar -f jobs/task-step02.yaml
+
+if [ $? -eq 0 ]; then
+     echo "step02 succeed"
+else
+     echo "step02 failed"
+     exit $?
+fi
+
+#java -jar DataWorks-flow-touch.jar -f jobs/task-step03.yaml -d
+#
+#if [ $? -eq 0 ]; then
+#     echo "step03 succeed"
+#else
+#     echo "step03 failed"
+#     exit $?
+#fi
+#
+#java -jar DataWorks-flow-touch.jar -f jobs/task-step04.yaml -d
+#
+#if [ $? -eq 0 ]; then
+#     echo "step04 succeed"
+#else
+#     echo "step04 failed"
+#     exit $?
+#fi
+
+echo "end"
+
+