Explorar el Código

增量空数据查询,各人任务配制分离

许家凯 hace 4 años
padre
commit
76b09d92ff

+ 7 - 0
pom.xml

@@ -14,10 +14,17 @@
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
         <maven.compiler.source>1.8</maven.compiler.source>
         <maven.compiler.target>1.8</maven.compiler.target>
+        <sdk.version>0.33.7-public</sdk.version>
     </properties>
 
     <dependencies>
         <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-sdk-core</artifactId>
+            <version>${sdk.version}</version>
+        </dependency>
+
+        <dependency>
             <groupId>com.squareup.okhttp3</groupId>
             <artifactId>okhttp</artifactId>
             <version>3.10.0</version>

+ 32 - 0
src/main/java/com/winhc/dataworks/flow/touch/EmptyTable.java

@@ -0,0 +1,32 @@
+package com.winhc.dataworks.flow.touch;
+
+import com.helospark.lightdi.LightDi;
+import com.helospark.lightdi.LightDiContext;
+import com.helospark.lightdi.annotation.Autowired;
+import com.helospark.lightdi.annotation.Component;
+import com.winhc.dataworks.flow.touch.service.OdpsService;
+
+import java.util.List;
+
+/**
+ * Command-line entry point that prints the table names returned by
+ * {@link OdpsService#getEmptyTable(String)} for one partition date.
+ *
+ * <p>Usage: pass the partition date as the first program argument. When no argument
+ * is given, the previously hard-coded date {@code 20200621} is used.
+ *
+ * @Author: XuJiakai
+ * @Date: 2020/7/6 18:24
+ */
+@Component
+public class EmptyTable {
+    /** Partition date used when no command-line argument is supplied. */
+    private static final String DEFAULT_DS = "20200621";
+
+    @Autowired
+    private OdpsService odpsService;
+
+    /**
+     * Initializes the LightDi context from the package of {@code Main}, fetches this
+     * bean from it and runs the check.
+     *
+     * @param args optional; {@code args[0]} is the partition date to inspect
+     */
+    public static void main(String[] args) {
+        // Fall back to the historical default so running without arguments behaves as before.
+        String ds = (args != null && args.length > 0) ? args[0] : DEFAULT_DS;
+        LightDiContext context = LightDi.initContextByPackage(Main.class.getPackage().getName());
+        EmptyTable bean = context.getBean(EmptyTable.class);
+        bean.start(ds);
+    }
+
+    /**
+     * Prints the number of returned table names followed by one name per line.
+     *
+     * @param ds partition date handed to the service
+     */
+    private void start(String ds) {
+        List<String> emptyTable = odpsService.getEmptyTable(ds);
+        System.out.println(emptyTable.size());
+        emptyTable.forEach(System.out::println);
+    }
+}

+ 29 - 0
src/main/java/com/winhc/dataworks/flow/touch/configuration/OdpsConfiguration.java

@@ -0,0 +1,29 @@
+package com.winhc.dataworks.flow.touch.configuration;
+
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.account.Account;
+import com.aliyun.odps.account.AliyunAccount;
+import com.helospark.lightdi.annotation.Autowired;
+import com.helospark.lightdi.annotation.Bean;
+import com.helospark.lightdi.annotation.Configuration;
+
+/**
+ * LightDi configuration that exposes the {@link Odps} client as a bean.
+ *
+ * @Author: XuJiakai
+ * @Date: 2020/7/6 17:52
+ */
+@Configuration
+public class OdpsConfiguration {
+    // NOTE(review): plain http endpoint; confirm whether the https variant should be used.
+    private static final String ENDPOINT = "http://service.odps.aliyun.com/api";
+
+    @Autowired
+    private DataWorksAccessProperties project;
+
+    /**
+     * Creates the client from the injected access key pair, then sets the fixed endpoint
+     * and {@code winhc_eci_dev} as the default project.
+     *
+     * @return the configured client
+     */
+    @Bean
+    public Odps odps() {
+        Odps client = new Odps(
+                new AliyunAccount(project.getAccessKeyId(), project.getAccessKeySecret()));
+        client.setEndpoint(ENDPOINT);
+        client.setDefaultProject("winhc_eci_dev");
+        return client;
+    }
+}

+ 3 - 1
src/main/java/com/winhc/dataworks/flow/touch/configuration/SchemaInit.java

@@ -21,9 +21,11 @@ import java.util.stream.Collectors;
 public class SchemaInit {
     public static final List<DataWorksFlowJob> LIST;
 
+    private static final String fileName = "task-xjk.yaml";
+
     static {
         Yaml yml = new Yaml();
-        String path = Object.class.getResource("/").getPath().substring(1) + "odps-schema.yaml";
+        String path = Object.class.getResource("/").getPath().substring(1) + fileName;
         Reader reader = null;
         try {
             reader = new FileReader(new File(path));

+ 44 - 0
src/main/java/com/winhc/dataworks/flow/touch/service/OdpsService.java

@@ -0,0 +1,44 @@
+package com.winhc.dataworks.flow.touch.service;
+
+import com.aliyun.odps.Instance;
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.OdpsException;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.task.SQLTask;
+import com.helospark.lightdi.annotation.Autowired;
+import com.helospark.lightdi.annotation.Service;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Runs metadata queries through the injected {@link Odps} client.
+ *
+ * @Author: XuJiakai
+ * @Date: 2020/7/6 17:52
+ */
+@Service
+public class OdpsService {
+    /** Leading marker of the tables the query selects; removed from the returned names. */
+    private static final String TABLE_PREFIX = "inc_ods_";
+
+    @Autowired
+    private Odps odps;
+
+    /**
+     * Lists the tables selected from {@code information_schema.PARTITIONS} whose
+     * {@code ds} partition has {@code data_length = 0}.
+     *
+     * @param ds partition date; dashes are removed first, so {@code 2020-06-21} and
+     *           {@code 20200621} select the same partition
+     * @return the table names with a leading {@code inc_ods_} removed
+     * @throws IllegalArgumentException if {@code ds} is null or, once the dashes are
+     *                                  removed, is not made of ASCII digits only
+     * @throws IllegalStateException    if running the query or reading its result fails
+     */
+    public List<String> getEmptyTable(String ds) {
+        if (ds == null) {
+            throw new IllegalArgumentException("ds must not be null");
+        }
+        String partition = ds.replace("-", "");
+        // The value is concatenated into the SQL text below, so only plain digits are let through.
+        if (!partition.matches("[0-9]+")) {
+            throw new IllegalArgumentException("ds must contain digits only, got: " + ds);
+        }
+        String sql = "SELECT table_name FROM information_schema.PARTITIONS WHERE regexp_extract(table_name,'^inc_ods_(.+)$') IS NOT NULL AND partition_name = 'ds=" + partition + "' and data_length = 0 ;";
+        List<Record> records;
+        try {
+            Instance instance = SQLTask.run(odps, sql);
+            instance.waitForSuccess();
+            records = SQLTask.getResult(instance);
+        } catch (OdpsException e) {
+            // An empty list would be indistinguishable from "no empty tables", so fail loudly.
+            throw new IllegalStateException("Failed to query empty partitions for ds=" + partition, e);
+        }
+        return records.stream()
+                .map(r -> stripPrefix(r.get(0).toString()))
+                .collect(Collectors.toCollection(ArrayList::new));
+    }
+
+    /** Removes the leading table marker only; occurrences inside the name are kept. */
+    private static String stripPrefix(String tableName) {
+        return tableName.startsWith(TABLE_PREFIX)
+                ? tableName.substring(TABLE_PREFIX.length())
+                : tableName;
+    }
+}

+ 0 - 68
src/main/java/com/winhc/dataworks/flow/touch/utils/YmlUtil.java

@@ -1,68 +0,0 @@
-package com.winhc.dataworks.flow.touch.utils;
-
-/**
- * @Author yyn
- * @Date 2020/6/23
- * @Description TODO
- */
-import org.yaml.snakeyaml.Yaml;
-
-import java.io.File;
-import java.io.FileReader;
-import java.io.Reader;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Set;
-
-public class YmlUtil {
-//    private static String schemaFileName="schema-file.yml";
-    private static String schemaFileName="odps-schema.yaml";
-    /**
-     * Looks up the given field in the yml file and returns its value as a map.
-     *
-     * @param sourcename key to search for in the loaded yml content
-     * @return the map stored under that key, or null when no such key is found
-     */
-    public static Map<String, Object> getResMap(String sourcename) {
-        return YmlInit.getMapByName(YmlInit.ymlMap, sourcename);
-    }
-    // The config file only needs to be read once: its data is stored in a map while reading, and the map is final so it is assigned only once
-    private static class YmlInit {
-        // map obtained from loading the file during initialization
-        private static final Map<String, Object> ymlMap = getYml();
-
-        // Reads the config file and initializes ymlMap
-        private static Map<String, Object> getYml() {
-            Yaml yml = new Yaml();
-            String path = Object.class.getResource("/").getPath().substring(1) + schemaFileName;
-            Reader reader = null;
-            try {
-                reader = new FileReader(new File(path));
-            } catch (Exception e) {
-                // TODO: handle exception
-                e.printStackTrace();
-            }
-            return yml.loadAs(reader, Map.class);
-        }
-
-        // Takes the name of the field to look up
-        private static Map<String, Object> getMapByName(Map<String, Object> map, String name) {
-            Map<String, Object> maps = new HashMap<String, Object>();
-            Set<Map.Entry<String, Object>> set = map.entrySet();
-            for (Map.Entry<String, Object> entry : set) {// iterate over the map
-                Object obj = entry.getValue();
-                if (entry.getKey().equals(name))      // recursion termination condition
-                    return (Map<String, Object>) obj;
-
-                if (entry.getValue() instanceof Map) {// recurse when the value is itself a Map
-                    maps = getMapByName((Map<String, Object>) obj, name);
-                    if (maps == null)				   // recursive result is null: keep iterating
-                        continue;
-                    return maps;					  // non-null: return it
-                }
-            }
-            return null;
-        }
-    }
-
-}

+ 10 - 0
src/main/resources/odps-schema.yaml

@@ -1,4 +1,14 @@
 job:
+  #------<公司基本信息
+  - project: winhc_test
+    flow: inc_company_spark
+    task:
+      - taskName: company_inc
+        param:
+          - _nodeId: 700003381602
+            project: winhc_eci_dev
+            bizdate: 20200618
+  #------>
 #------<环保处罚
   - project: winhc_test
     flow: inc_cid_sql

+ 0 - 7
src/main/resources/schema-file.yml

@@ -1,7 +0,0 @@
-table:
-  company_env_punishment:
-    PROJECT:winhc_eci_dev
-    DIM_TABLE:company_env_punishment
-    DIM_COLUMS:id,name,department,publish_time,punish_number,punish_basis,law_break,reason,content,execution,source_url,source_flag,create_time,update_time,deleted
-    DUPLI_COLS:new_cid,source_url
-    DS:20200605

+ 11 - 0
src/main/resources/task-xjk.yaml

@@ -0,0 +1,11 @@
+job:
+  #------<公司基本信息
+  - project: winhc_test
+    flow: inc_company_spark
+    task:
+      - taskName: company_inc
+        param:
+          - _nodeId: 700003381602
+            project: winhc_eci_dev
+            bizdate: 20200618
+  #------>