Browse Source

Merge branch 'master' of http://139.224.213.4:3000/xjk/winhc-bigdata-task

JimZhang 3 years ago
parent
commit
faeff16541

+ 1 - 0
pom.xml

@@ -214,6 +214,7 @@
                 <groupId>org.springframework.boot</groupId>
                 <artifactId>spring-boot-maven-plugin</artifactId>
             </plugin>
+
         </plugins>
     </build>
 

+ 2 - 2
src/main/java/com/winhc/bigdata/task/controller/PullDataController.java

@@ -31,8 +31,8 @@ public class PullDataController {
 
 
     @GetMapping("sql")
-    public Object sql(@RequestParam(required = false) String sign, String sqlId, String pullBy) {
+    public Object sql(@RequestParam(required = false) String sign, String sqlId, String pullBy, @RequestParam(defaultValue = "out_pull_spider_data") String toMongoDbName) {
         Assert.isTrue(SystemParams.sign.equals(sign), "验签不通过!");
-        return pullRequestRecordService.pull(sqlId, pullBy);
+        return pullRequestRecordService.pull(sqlId, toMongoDbName, pullBy);
     }
 }

+ 2 - 3
src/main/java/com/winhc/bigdata/task/controller/SqlController.java

@@ -1,7 +1,6 @@
 package com.winhc.bigdata.task.controller;
 
 import com.winhc.bigdata.task.framework.odps.service.OdpsSqlInfoService;
-import com.winhc.bigdata.task.framework.odps.vo.OdpsSqlInfoVO;
 import com.winhc.bigdata.task.framework.odps.vo.RegisterSqlVO;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -25,10 +24,10 @@ public class SqlController {
     }
 
 
-    @PostMapping("save")
+  /*  @PostMapping("save")
     public Object save(@RequestBody OdpsSqlInfoVO body) {
         return odpsSqlInfoService.add(body);
-    }
+    }*/
 
     @GetMapping
     public Object get(@RequestParam String sqlId) {

+ 2 - 2
src/main/java/com/winhc/bigdata/task/controller/TestController.java

@@ -30,8 +30,8 @@ public class TestController {
     private final KafkaTemplate<String, String> kafkaTemplate;
 
     @GetMapping("pull")
-    public Object pull(String sqlId, String pullBy) {
-        return pullRequestRecordService.pull(sqlId, pullBy);
+    public Object pull(String sqlId,String toMongoDbName, String pullBy) {
+        return pullRequestRecordService.pull(sqlId,toMongoDbName, pullBy);
     }
 
 

+ 4 - 3
src/main/java/com/winhc/bigdata/task/framework/odps/ExportDataCenter.java

@@ -53,14 +53,15 @@ public class ExportDataCenter {
                 .list();
         for (PullRequestRecord record : list) {
             String sqlId = record.getSqlId();
+            String toMongoDbName = record.getToMongoDbName();
             String operatorName = record.getPullBy();
             log.info("拉取数据人:{} 拉取sqlId:{}", operatorName, sqlId);
-            pullDetail(sqlId, operatorName);
+            pullDetail(sqlId,toMongoDbName, operatorName);
         }
     }
 
 
-    private void pullDetail(String sqlId, String operatorName) {
+    private void pullDetail(String sqlId,String toMongoDbName, String operatorName) {
         List<OdpsResultInfo> list = odpsResultInfoService.lambdaQuery()
                 .eq(OdpsResultInfo::getSqlId, sqlId)
                 .eq(OdpsResultInfo::getPartitionTaskStatus, TaskStatusEnum.生成完毕)
@@ -80,7 +81,7 @@ public class ExportDataCenter {
                     .isNull(PullRequestRecord::getPullInstanceId)
                     .one();
 
-            String exportInstanceId = exportFlowUtils.export(odpsResultInfo.getPartitionName()) + "";
+            String exportInstanceId = exportFlowUtils.export(odpsResultInfo.getPartitionName(),toMongoDbName) + "";
 
             log.info("启动导数据任务:{} {}", odpsResultInfo.getTableName(), odpsResultInfo.getPartitionName());
             one.setPullInstanceId(exportInstanceId);

+ 2 - 0
src/main/java/com/winhc/bigdata/task/framework/odps/entity/PullRequestRecord.java

@@ -28,6 +28,8 @@ public class PullRequestRecord implements Serializable {
 
     private Long odpsResultInfoId;
 
+    private String toMongoDbName;
+
     /**
      * 导出数据的实例id
      */

+ 1 - 1
src/main/java/com/winhc/bigdata/task/framework/odps/service/PullRequestRecordService.java

@@ -16,6 +16,6 @@ public interface PullRequestRecordService extends IService<PullRequestRecord> {
      * @param pullBy
      * @return
      */
-    Object pull(String sqlId, String pullBy);
+    Object pull(String sqlId,String toMongoDbName, String pullBy);
 
 }

+ 13 - 6
src/main/java/com/winhc/bigdata/task/framework/odps/service/impl/OdpsSqlInfoServiceImpl.java

@@ -75,25 +75,31 @@ public class OdpsSqlInfoServiceImpl extends ServiceImpl<OdpsSqlInfoMapper, OdpsS
     @Override
     public OdpsSqlInfoVO registerSql(RegisterSqlVO registerSqlVO) {
         String tn = registerSqlVO.getTn();
+        String allSql = null;
+        if (StringUtils.isEmpty(tn)) {
+            allSql = registerSqlVO.getSql();
+        } else {
+            checkColumns(registerSqlVO.getColumns(), tn);
+            allSql = odpsSchemaUtils.getAllSql(tn, "winhc_ng", "ads", registerSqlVO.getWhere());
+        }
+        Assert.isTrue(StringUtils.isNotEmpty(allSql), "注册失败!");
+
         String json = registerSqlVO.getColumns().stream().map(r -> "\"" + r + "\"," + r).collect(Collectors.joining(","));
         Integer partitionNum = registerSqlVO.getPartitionNum();
         String registerBy = registerSqlVO.getRegisterBy();
 
-        checkColumns(registerSqlVO.getColumns(), tn);
-
-        String allSql = odpsSchemaUtils.getAllSql(tn, "winhc_ng", "ads", registerSqlVO.getWhere());
-
+        String finalAllSql = allSql;
         Map<String, String> map = new HashMap<String, String>(12) {
             {
                 put("maps", json);
                 put("partitionNum", partitionNum + "");
-                put("allTab", allSql);
+                put("allTab", finalAllSql);
             }
         };
 
         List<SortVO> sort = registerSqlVO.getSort();
         String orderBy = "";
-        if (!sort.isEmpty()) {
+        if (sort != null && !sort.isEmpty()) {
             orderBy = sort.stream().map(r -> {
                 if (r.getAse()) {
                     return r.getField() + " ase";
@@ -106,6 +112,7 @@ public class OdpsSqlInfoServiceImpl extends ServiceImpl<OdpsSqlInfoMapper, OdpsS
 
         String sql = StrUtil.format(sqlTemp, map);
         OdpsSqlInfoVO odpsSqlInfoVO = OdpsSqlInfoVO.builder()
+                .id(registerSqlVO.getId())
                 .sql(sql)
                 .outputTab("out_pull_spider_data")
                 .createBy(registerBy)

+ 2 - 1
src/main/java/com/winhc/bigdata/task/framework/odps/service/impl/PullRequestRecordServiceImpl.java

@@ -27,7 +27,7 @@ public class PullRequestRecordServiceImpl extends ServiceImpl<PullRequestRecordM
 
 
     @Override
-    public synchronized Object pull(String sqlId, String pullBy) {
+    public synchronized Object pull(String sqlId, String toMongoDbName, String pullBy) {
         Assert.isTrue(odpsSqlInfoService.check(sqlId), "sql id 不存在!请先注册函数 ");
         List<PullRequestRecord> list = lambdaQuery()
                 .eq(PullRequestRecord::getSqlId, sqlId)
@@ -37,6 +37,7 @@ public class PullRequestRecordServiceImpl extends ServiceImpl<PullRequestRecordM
             return "已经有任务在导出中... pull id:" + list.stream().map(PullRequestRecord::getId).map(r -> r + "").collect(Collectors.joining(","));
         }
         PullRequestRecord build = PullRequestRecord.builder()
+                .toMongoDbName(toMongoDbName)
                 .sqlId(sqlId)
                 .pullBy(pullBy)
                 .partitionTaskStatus(TaskStatusEnum.等待中)

+ 4 - 2
src/main/java/com/winhc/bigdata/task/framework/odps/utils/ExportFlowUtils.java

@@ -23,7 +23,8 @@ public class ExportFlowUtils {
     private static final String task = "base";
     private final Map<String, DataWorksFlowTask> map;
     private final DingTalkService dingTalkService;
-    public Long export(String ds) {
+
+    public Long export(String ds, String toMongoDbName) {
         String beanName = project + ":" + flow + ":" + task;
         String yesterday = DateUtils.getYesterday();
 
@@ -31,9 +32,10 @@ public class ExportFlowUtils {
 
         Map<String, String> map = new HashMap<>();
         map.put("ds", ds);
+        map.put("toMongoDbName", toMongoDbName);
         Long touch = dataWorksService.touch(project, flow, yesterday, dataWorksFlowTask, map);
         dingTalkService.info("触发运维中心手动业务流程:{}  dag-id:{}; request body:{}", beanName, touch, map);
-        return touch ;
+        return touch;
 
     }
 

+ 8 - 2
src/main/java/com/winhc/bigdata/task/framework/odps/vo/RegisterSqlVO.java

@@ -12,10 +12,16 @@ import java.util.List;
 @Data
 @Builder
 public class RegisterSqlVO {
+    private String id;
+
     private String tn;
+    private String where;
+
+    private String sql;
+
     private List<String> columns;
     private Integer partitionNum;
-    private String registerBy;
     private List<SortVO> sort;
-    private String where;
+
+    private String registerBy;
 }