Parcourir la source

58数据推送

xufei il y a 10 mois
Parent
commit
a88fd90267

+ 47 - 0
src/main/java/com/winhc/data/push/common/Constant.java

@@ -1,6 +1,9 @@
 package com.winhc.data.push.common;
 
 
+import java.util.Arrays;
+import java.util.List;
+
 /**
  * @author π
  * @Description:
@@ -58,4 +61,48 @@ public class Constant {
 
     public static final String change_summary_head = "自有企业id@公司名称@注册号@统一信用代码@维度名称@发送时间@变更时间";
 
+
+    /**
+     * 58同城输出维度
+     */
+    public static List<String> tc_tns = Arrays.asList("company", "company_holder", "company_staff", "private_enterprise", "finance_info_v3", "company_liquidating_info",
+            "company_change", "company_annual_report", "company_annual_report_out_investment", "company_annual_report_out_guarantee",
+            "company_annual_report_change", "company_annual_report_equity_change", "company_annual_report_holder", "company_annual_report_webinfo",
+            "company_annual_report_social_security", "company_dishonest_info", "company_zxr", "company_zxr_restrict", "company_zxr_final_case",
+            "company_send_announcement", "wenshu_detail_v2", "company_court_open_announcement", "company_court_register", "company_court_announcement",
+            "company_judicial_assistance", "auction_tracking", "zxr_evaluate_results", "zxr_evaluate", "restrictions_on_exit", "litigation_mediation",
+            "bankruptcy_open_case", "bankruptcy_open_announcement", "bankruptcy_judgment_document", "company_punishment_info_creditchina", "company_abnormal_info",
+            "company_equity_info", "company_mortgage_info", "company_mortgage_pawn", "company_mortgage_change", "company_mortgage_people",
+            "company_env_punishment", "company_tax_contravention", "company_own_tax", "company_illegal_info", "company_brief_cancel_announcement",
+            "company_brief_cancel_announcement_objection", "company_brief_cancel_announcement_result", "company_equity_pledge", "company_ipr_pledge",
+            "cancellation_announcement", "company_bid_new", "company_employment", "company_certificate", "company_customs_credit",
+            "company_customs_credit_administrative_penalty", "company_customs_credit_rating", "company_license_creditchina_new", "company_land_publicity",
+            "company_land_announcement", "company_land_transfer", "company_land_mortgage", "company_tax", "company_finance", "company_check_info",
+            "property_rights_transaction", "company_double_random_check_info", "company_double_random_check_result_info", "company_tm", "company_tm_notice",
+            "company_tm_step", "company_tm_goods", "company_patent_new", "company_copyright_reg", "company_copyright_works", "company_icp",
+            "construction_qualification", "construction_person", "construction_project"
+            ,"company_holder_sponsor",
+            "company_app_info",
+            "company_bond",
+            "company_license",
+            "company_public_announcement",
+            "company_stock",
+            "company_team_member",
+            "company_tele_license",
+            "company_tele_license_annual_report",
+            "company_tele_license_communication_badness",
+            "company_wechat",
+            "company_weibo",
+            "organization",
+            "organization_company_relation",
+            "organization_invest",
+            "product",
+            "product_competition");
+
+
+    public static final String show_tn_sql = ("SHOW PARTITIONS bds_data_out_tc PARTITION (ds = '@ds') ;");
+
+    public static final String TC_BUCKET_NAME = ("data-exchange-out-tc");
+    public static final String TC_OSS_PRE = ("data/tc");
+
 }

+ 1 - 1
src/main/java/com/winhc/data/push/common/TaskEnum.java

@@ -3,7 +3,7 @@ package com.winhc.data.push.common;
 public class TaskEnum {
 
     public enum STATUS  {
-        CREATE("create"), ERROR("error"),SUCCESS("success"), RUNNING("running");
+        CREATE("create"), ERROR("error"),SUCCESS("success"), RUNNING("running"), TASK_ERROR("task_error");
 
         public final String code;
 

+ 3 - 0
src/main/java/com/winhc/data/push/configuration/ProjectParamInit.java

@@ -27,6 +27,9 @@ public class ProjectParamInit implements BeanFactoryPostProcessor {
     public static final String exportOssFlowName = "push_data_as_to_oss";
     public static final String exportOssBeanName = "winhc_ng:push_data_as_to_oss:oss_auction_tracking_push_ads";
 
+    public static final String exportOssFlowNameTC = "push_data_tc_to_oss";
+    public static final String exportOssBeanNameTC = "winhc_ng:push_data_tc_to_oss:oss_all_data_push_ads";
+
     /**
      * @param beanFactory
      * @throws BeansException

+ 3 - 0
src/main/java/com/winhc/data/push/constant/BaseParam.java

@@ -14,5 +14,8 @@ public class BaseParam {
 
     //安硕任务状态表
     public static final String SYN_ANSHUO_UPLOAD_TASKS = "syn_anshuo_upload_tasks";
+    //58同城数据状态表
+    public static final String SYN_TC_UPLOAD_TASKS = "syn_tc_upload_tasks";
+    public static final String SYN_TC_ID_SUF = "tc";
 
 }

+ 88 - 0
src/main/java/com/winhc/data/push/controller/TcPushDataController.java

@@ -0,0 +1,88 @@
+package com.winhc.data.push.controller;
+
+import cn.hutool.core.lang.Assert;
+import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
+import com.winhc.data.push.common.TaskEnum;
+import com.winhc.data.push.constant.BaseParam;
+import com.winhc.data.push.service.SynDataService;
+import com.winhc.data.push.utils.BaseUtils;
+import com.winhc.data.push.vo.ResponseVo;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.bson.Document;
+import org.jetbrains.annotations.NotNull;
+import org.springframework.web.bind.annotation.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import static com.winhc.data.push.constant.BaseParam.SYN_ANSHUO_UPLOAD_TASKS;
+import static com.winhc.data.push.constant.BaseParam.SYN_TC_UPLOAD_TASKS;
+import static com.winhc.data.push.utils.BaseUtils.getObjectId;
+
+
+/**
+ * @author π
+ * @Description:推送数据
+ * @date 2022/7/20 16:24
+ */
+@Slf4j
+@AllArgsConstructor
+@RestController
+@Api(tags = "任务触发tc", value = "task/tc")
+@RequestMapping("task/tc")
+public class TcPushDataController {
+
+    private final SynDataService synDataService;
+
+
+    @ApiOperation(value = "触发任务")
+    @GetMapping("push/{ds}")
+    public ResponseVo touch(@PathVariable String ds, @RequestParam String tns, @RequestParam String sign) {
+        //日期是昨天之前数据
+        List<String> out_tns = getList(ds, tns, sign);
+        long start = System.currentTimeMillis();
+        try {
+            String re = synDataService.saveTask(SYN_TC_UPLOAD_TASKS, out_tns, ds);
+            return ResponseVo.success(start, re);
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+            return ResponseVo.failure(start, e.getMessage());
+        }
+    }
+
+    @ApiOperation(value = "强制触发任务")
+    @GetMapping("repush/{ds}")
+    public ResponseVo reTouch(@PathVariable String ds, @RequestParam String tns, @RequestParam String sign) {
+        List<String> out_tns = getList(ds, tns, sign);
+
+        long start = System.currentTimeMillis();
+        try {
+            Document d = new Document();
+            d.put("update_time", new Date());
+            d.put("status", "create");
+            d.put("ds", ds);
+            d.put("tns", out_tns);
+            Boolean re = synDataService.updateTaskStatus(SYN_TC_UPLOAD_TASKS, d, getObjectId(ds));
+            return ResponseVo.success(start, re ? "restart" : "error");
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+            return ResponseVo.failure(start, e.getMessage());
+        }
+    }
+
+    @NotNull
+    private static List<String> getList(String ds, String tns, String sign) {
+        Assert.isTrue(BaseUtils.verifyDate(ds), "ds 不合法");
+        Assert.isTrue(BaseParam.sign.equals(sign), "验签不通过!");
+        Assert.isTrue(StringUtils.isNotBlank(tns), "参数不能为空!");
+        List<String> out_tns = tns.equals("all") ? new ArrayList<>() : Arrays.asList(tns.split(","));
+        return out_tns;
+    }
+
+
+}

+ 2 - 0
src/main/java/com/winhc/data/push/service/DataWorksService.java

@@ -5,6 +5,7 @@ import com.aliyuncs.dataworks_public.model.v20180601.CreateManualDagResponse;
 import com.winhc.data.push.bean.DataWorksFlowTask;
 import com.winhc.data.push.bean.TaskFlowEnum;
 import com.winhc.data.push.bean.TaskParam;
+import com.winhc.data.push.configuration.ProjectParamInit;
 
 import java.util.Map;
 
@@ -17,6 +18,7 @@ public interface DataWorksService {
      CreateManualDagResponse export2Mongo(String ds);
 
      CreateManualDagResponse export2OSS(String ds);
+     CreateManualDagResponse export2OSS(String ds, String flowName, String bizDate, DataWorksFlowTask dataWorksFlowTask, Map<String, String> otherParams);
 
      CreateManualDagResponse touch(TaskParam task);
 

+ 14 - 0
src/main/java/com/winhc/data/push/service/OdpsService.java

@@ -0,0 +1,14 @@
+package com.winhc.data.push.service;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2022/7/21 18:17
+ */
+public interface OdpsService {
+    List<String> query(String sql,String ds);
+
+}

+ 9 - 0
src/main/java/com/winhc/data/push/service/SynDataService.java

@@ -13,10 +13,16 @@ import java.util.List;
 public interface SynDataService {
     void push(String tn, String ds);
 
+    void pushTc(List<String> tns, String ds);
+    void pushTc(String ds);
+
+
     void sendMessage(String message) throws Exception;
 
     String saveTask(String tn, String ds);
 
+    String saveTask(String collectionName,List<String> tns, String ds);
+
     Boolean findTaskExists(Document doc);
 
     Boolean findTaskExists(String collectionName, Document doc);
@@ -29,8 +35,11 @@ public interface SynDataService {
 
     Boolean updateTaskStatus(String collectionName, String ds, String status);
 
+    Boolean updateTaskStatus(String collectionName, String ds, String status,String id);
+    Boolean updateTaskStatus(String collectionName, Document d, String id);
     String uploadTask(String ds);
 
     void uploadToOss(String ds) throws FileNotFoundException;
+    void saveObject(String ds, String targetBucketName, String ossPre);
 
 }

+ 7 - 1
src/main/java/com/winhc/data/push/service/TouchService.java

@@ -1,6 +1,7 @@
 package com.winhc.data.push.service;
 
-import com.aliyuncs.dataworks_public.model.v20180601.CreateManualDagResponse;
+import java.util.List;
+import java.util.Map;
 
 /**
  * @author π
@@ -9,9 +10,14 @@ import com.aliyuncs.dataworks_public.model.v20180601.CreateManualDagResponse;
  */
 public interface TouchService {
     Boolean query(String projectName, Long dagId);
+    Boolean  queryList(String projectName, Map<Long, String> ids);
 
     Boolean export2Mongo(String  project,String  ds);
 
     Boolean export2OSS(String  project,String  ds);
 
+    Boolean export2OSSTc(String  project, String  ds);
+
+    Boolean export2OSSTc(String  project, String  ds,List<String> tns);
+
 }

+ 4 - 0
src/main/java/com/winhc/data/push/service/impl/DataWorksServiceImpl.java

@@ -47,6 +47,10 @@ public class DataWorksServiceImpl implements DataWorksService {
         return touch(ProjectParamInit.project, ProjectParamInit.exportOssFlowName, DateUtils.parseDate(ds), dataWorksFlowTaskMap.get(ProjectParamInit.exportOssBeanName), new HashMap<>());
     }
 
+    public CreateManualDagResponse export2OSS(String ds, String flowName, String bizDate, DataWorksFlowTask dataWorksFlowTask, Map<String, String> otherParams) {
+        return touch(ProjectParamInit.project, flowName, bizDate, dataWorksFlowTask, otherParams);
+    }
+
     @Override
     public CreateManualDagResponse touch(String project, String flowName, String bizDate, DataWorksFlowTask dataWorksFlowTask, Map<String, String> otherParams) {
         TaskParam build = TaskParam.builder()

+ 48 - 0
src/main/java/com/winhc/data/push/service/impl/OdpsServiceImpl.java

@@ -0,0 +1,48 @@
+package com.winhc.data.push.service.impl;
+
+import cn.cocowwy.dingtalk.DingTalkGroupApi;
+import com.aliyun.odps.Instance;
+import com.aliyun.odps.Odps;
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.task.SQLTask;
+import com.winhc.data.push.service.DingTalkService;
+import com.winhc.data.push.service.OdpsService;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2022/7/24 19:08
+ */
+@Slf4j
+@Service
+@AllArgsConstructor
+public class OdpsServiceImpl implements OdpsService {
+
+    private final Odps odps;
+
+    @Override
+    public List<String> query(String sql,String ds) {
+        Instance i;
+        try {
+            i = SQLTask.run(odps, sql.replaceAll("@ds",ds));
+            i.waitForSuccess();
+            List<Record> records = SQLTask.getResult(i);
+            return records.
+                    stream()
+                    .map(x -> x.get(0).toString().split("tn=")[1])
+                    .distinct()
+                    .collect(Collectors.toList());
+        } catch (Exception e) {
+            e.printStackTrace();
+            return new ArrayList<>();
+        }
+    }
+}

+ 103 - 2
src/main/java/com/winhc/data/push/service/impl/SynDataServiceImpl.java

@@ -1,15 +1,16 @@
 package com.winhc.data.push.service.impl;
 
 import cn.cocowwy.dingtalk.DingTalkGroupApi;
+import com.alibaba.fastjson.JSON;
 import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
 import com.aliyun.oss.OSS;
 import com.aliyun.oss.model.CopyObjectResult;
 import com.aliyun.oss.model.PutObjectRequest;
 import com.mongodb.client.FindIterable;
 import com.mongodb.client.MongoDatabase;
-import com.mongodb.client.model.Filters;
 import com.mongodb.client.model.FindOneAndUpdateOptions;
 import com.mongodb.client.model.UpdateOptions;
+import com.winhc.data.push.common.Constant;
 import com.winhc.data.push.common.TaskEnum;
 import com.winhc.data.push.configuration.OSSAccessProperties;
 import com.winhc.data.push.framework.MongoDbFastScan;
@@ -24,6 +25,7 @@ import org.springframework.stereotype.Service;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
@@ -33,7 +35,11 @@ import java.util.stream.StreamSupport;
 
 import static cn.hutool.crypto.SecureUtil.md5;
 import static com.mongodb.client.model.Filters.*;
+import static com.winhc.data.push.common.Constant.*;
 import static com.winhc.data.push.constant.BaseParam.*;
+import static com.winhc.data.push.utils.BaseUtils.getObjectId;
+import static com.winhc.data.push.utils.BaseUtils.getPrcessDate;
+import static com.winhc.data.push.utils.OssUtils.listKeys;
 import static com.winhc.data.push.utils.TransToExcelNewCompany.dataPathPrefix;
 import static com.winhc.data.push.utils.TransToExcelNewCompany.run;
 
@@ -87,6 +93,44 @@ public class SynDataServiceImpl implements SynDataService {
     }
 
     @Override
+    public void pushTc(String ds) {
+        this.pushTc(null, ds);
+    }
+
+
+    /**
+     * 同城数据输出
+     *
+     * @param tns
+     * @param ds
+     */
+    @Override
+    public void pushTc(List<String> tns, String ds) {
+        String id = getObjectId(ds);
+        try {
+            updateTaskStatus(SYN_TC_UPLOAD_TASKS, ds, TaskEnum.STATUS.RUNNING.code, id);
+            sendMessage("task running !!! " + id);
+            String project = "winhc_ng";
+            if (touchService.export2OSSTc(project, ds, tns)) {
+                saveObject(ds, TC_BUCKET_NAME, TC_OSS_PRE);
+                updateTaskStatus(SYN_TC_UPLOAD_TASKS, ds, TaskEnum.STATUS.SUCCESS.code, id);
+                sendMessage("导出 任务 成功 !!! " + id);
+            } else {
+                log.error("导出OSS 失败 !!!");
+                sendMessage("导出 OSS 失败!!! " + id);
+                updateTaskStatus(SYN_TC_UPLOAD_TASKS, ds, TaskEnum.STATUS.ERROR.code, id);
+            }
+
+        } catch (Exception e) {
+            sendMessage("任务失败!!! " + id + " |  error " + e.getMessage());
+            updateTaskStatus(SYN_TC_UPLOAD_TASKS, ds, TaskEnum.STATUS.TASK_ERROR.code, id);
+            e.printStackTrace();
+            log.error("export error: {}", e.getMessage());
+        }
+    }
+
+
+    @Override
     public String saveTask(String tn, String ds) {
         String id = tn + "_" + ds;
         if (findTaskExists(new Document("_id", id))) {
@@ -106,6 +150,28 @@ public class SynDataServiceImpl implements SynDataService {
     }
 
     @Override
+    public String saveTask(String collectionName, List<String> tns, String ds) {
+        String id = getObjectId(ds);
+        if (findTaskExists(collectionName, new Document("_id", id))) {
+            return "task repeat";
+        } else {
+            if (tns == null || tns.size() == 0) {
+                tns = new ArrayList<>();
+            }
+            Document d = new Document();
+            d.put("_id", id);
+            d.put("create_time", new Date());
+            d.put("update_time", new Date());
+            d.put("status", "create");
+            d.put("tns", tns);
+            d.put("ds", ds);
+            mongoTemplate.getCollection(collectionName).insertOne(d);
+            return "task create success";
+        }
+
+    }
+
+    @Override
     public void sendMessage(String message) {
         try {
             dingTalkApi.sendTextByPhones("data_push_message", "winhc\n" + message, Collections.singletonList("17602140784"));
@@ -131,6 +197,7 @@ public class SynDataServiceImpl implements SynDataService {
         return StreamSupport.stream(it.spliterator(), false).collect(Collectors.toList());
     }
 
+
     @Override
     public List<Document> findAllTask(String collectionName, Document doc) {
         FindIterable<Document> it = mongoTemplate.getCollection(collectionName).find(doc);
@@ -153,12 +220,25 @@ public class SynDataServiceImpl implements SynDataService {
 
     @Override
     public Boolean updateTaskStatus(String collectionName, String ds, String status) {
+        return this.updateTaskStatus(collectionName, ds, status, ds);
+    }
+
+
+    @Override
+    public Boolean updateTaskStatus(String collectionName, Document d, String id) {
+        mongoTemplate.getCollection(collectionName)
+                .findOneAndUpdate(eq("_id", id), new Document("$set", d), new FindOneAndUpdateOptions().upsert(true));
+        return true;
+    }
+
+    @Override
+    public Boolean updateTaskStatus(String collectionName, String ds, String status, String id) {
         Document d = new Document();
         d.put("update_time", new Date());
         d.put("status", status);
         d.put("ds", ds);
         mongoTemplate.getCollection(collectionName)
-                .findOneAndUpdate(eq("_id", ds), new Document("$set", d), new FindOneAndUpdateOptions().upsert(true));
+                .findOneAndUpdate(eq("_id", id), new Document("$set", d), new FindOneAndUpdateOptions().upsert(true));
         return true;
     }
 
@@ -247,4 +327,25 @@ public class SynDataServiceImpl implements SynDataService {
         ossClient.putObject(putObjectRequest);
 
     }
+
+    @Override
+    public void saveObject(String ds, String targetBucketName, String ossPre) {
+        String prcessDate = getPrcessDate(ds);
+        String pre = ossPre + "/" + prcessDate + "/";
+        List<String> keys = listKeys(targetBucketName, pre);
+        List<String> filters_keys = keys.stream()
+                .filter(k -> tc_tns.contains(getTn(k)))
+                .distinct()
+                .collect(Collectors.toList());
+        String sum = JSON.toJSONString(filters_keys);
+        String targetKey = pre + "out_tc.success";
+        PutObjectRequest putObjectRequest = new PutObjectRequest(targetBucketName, targetKey, new ByteArrayInputStream(sum.getBytes()));
+        ossClient.putObject(putObjectRequest);
+
+    }
+
+    public String getTn(String key) {
+        String re = key.substring(0, key.lastIndexOf("/"));
+        return re.substring(re.lastIndexOf("/") + 1);
+    }
 }

+ 123 - 0
src/main/java/com/winhc/data/push/service/impl/TouchServiceImpl.java

@@ -1,9 +1,13 @@
 package com.winhc.data.push.service.impl;
 
 import com.winhc.data.push.bean.*;
+import com.winhc.data.push.common.Constant;
+import com.winhc.data.push.configuration.ProjectParamInit;
 import com.winhc.data.push.service.DataWorksService;
 import com.winhc.data.push.service.DingTalkService;
+import com.winhc.data.push.service.OdpsService;
 import com.winhc.data.push.service.TouchService;
+import com.winhc.data.push.utils.BaseUtils;
 import com.winhc.data.push.utils.DateUtils;
 import lombok.AllArgsConstructor;
 import lombok.SneakyThrows;
@@ -15,6 +19,9 @@ import java.time.LocalDateTime;
 import java.util.*;
 import java.util.stream.Collectors;
 
+import static com.winhc.data.push.common.Constant.TC_OSS_PRE;
+import static com.winhc.data.push.utils.BaseUtils.getPrcessDate;
+
 /**
  * @author π
  * @Description:
@@ -27,6 +34,9 @@ public class TouchServiceImpl implements TouchService {
 
     private final DataWorksService dataWorksService;
     private final DingTalkService dingTalkService;
+    private final Map<String, DataWorksFlowTask> dataWorksFlowTaskMap;
+
+    private final OdpsService odpsService;
 
     @Override
     public Boolean export2Mongo(String project, String ds) {
@@ -38,6 +48,119 @@ public class TouchServiceImpl implements TouchService {
         return query(project, dataWorksService.export2OSS(ds).getReturnValue());
     }
 
+    /**
+     * 58数据导出
+     *
+     * @param project
+     * @param ds
+     * @return
+     */
+    @Override
+    public Boolean export2OSSTc(String project, String ds) {
+        return this.export2OSSTc(project, ds, null);
+    }
+
+    @Override
+    public Boolean export2OSSTc(String project, String ds, List<String> tns) {
+        String prcessDate = getPrcessDate(ds);
+        //当日分区数据
+        List<String> ds_tns = odpsService.query(Constant.show_tn_sql, ds);
+        //需求输出数据
+        List<String> out_tns = ds_tns.stream()
+                .filter(tn -> Constant.tc_tns.contains(tn))
+                .distinct()
+                .collect(Collectors.toList());
+        //取交集
+        if (tns != null && tns.size() > 0) {
+            out_tns.retainAll(tns);
+        }
+        Map<Long, String> task_ids = out_tns.stream().map(tn -> {
+            Map<String, String> otherParams = new HashMap<>();
+            otherParams.put("tn", tn);
+            otherParams.put("ossPrefix", TC_OSS_PRE);
+            otherParams.put("prcessDate", prcessDate);
+            otherParams.put("detail", BaseUtils.getDetailName(ds, tn));
+            Long returnValue = dataWorksService.export2OSS(ds, ProjectParamInit.exportOssFlowNameTC,
+                    DateUtils.parseDate(ds),
+                    dataWorksFlowTaskMap.get(ProjectParamInit.exportOssBeanNameTC),
+                    otherParams).getReturnValue();
+            return Arrays.asList(returnValue, tn);
+        }).distinct().collect(Collectors.toMap(c -> Long.parseLong(c.get(0).toString()), c -> c.get(1).toString(), (c, v) -> v));
+
+        return queryList(project, task_ids);
+    }
+
+    @SneakyThrows
+    @Override
+    public Boolean queryList(String projectName, Map<Long, String> task_ids_maps) {
+        LocalDateTime start = LocalDateTime.now();
+        int totalTask = task_ids_maps.size();
+        int i = 0;
+        int successTask = 0;
+        int failedTask = 0;
+        ArrayList<Long> hours = new ArrayList<>();
+        while (true) {
+            int awaitTask = 0;
+            boolean flag;
+
+            for (Long id : task_ids_maps.keySet()) {
+                Map<String, TaskFlowEnum> query = dataWorksService.query(projectName, id);
+                Map<String, List<String>> status = query.entrySet().stream()
+                        .map(e -> {
+                            Entry<String, String> entry = new Entry<>();
+                            if (e.getValue().equals(TaskFlowEnum.SUCCESS) || e.getValue().equals(TaskFlowEnum.FAILURE)) {
+                                entry.setKey(e.getValue().getMsg());
+                            } else {
+                                entry.setKey("运行中");
+                            }
+                            entry.setValue(e.getKey());
+                            return entry;
+                        }).collect(Collectors.groupingBy(Entry::getKey, Collectors.mapping(Entry::getValue, Collectors.toList())));
+                List<String> failure = status.getOrDefault(TaskFlowEnum.FAILURE.getMsg(), new ArrayList<>());
+                List<String> await = status.getOrDefault(TaskFlowEnum.RUNNING.getMsg(), new ArrayList<>());
+
+                if (failure.size() != 0) {
+                    failedTask++;
+                    log.error("导出任务失败 tn : {} , id : {}", task_ids_maps.getOrDefault(id, null), id);
+                    dingTalkService.sendMessage("step1 导出数据报错, \n tn: " + task_ids_maps.getOrDefault(id, null) + " !!!!!!");
+                } else {
+                    if (await.size() != 0) {
+                        awaitTask++;
+                        LocalDateTime now = LocalDateTime.now();
+                        Duration duration = Duration.between(start, now);
+                        long hour = duration.toHours();
+                        if (hour >= 1 && !hours.contains(hour)) {
+                            i++;
+                            hours.add(hour);
+                            log.info("导出数据阻塞告警 !!!!!!");
+                            dingTalkService.sendMessage("导出数据阻塞告警, \n tn: " + task_ids_maps.getOrDefault(id, null) + " !!!!!!");
+                        }
+                    } else {
+                        successTask++;
+                    }
+                }
+            }
+
+            flag = awaitTask == 0 || failedTask != 0;
+            //超时退出
+            if (DateUtils.getCurrentHours() >= 22 || i > 6) {
+                dingTalkService.sendMessage("导出数据超时, 退出 !!!!!!");
+                log.info("超时程序主动退出 !!!!!!");
+                return false;
+            }
+            log.info("\nawait task:{}\ntotal task:{}\nsuccess task:{}\nfailure task:{}\n"
+                    , awaitTask
+                    , totalTask
+                    , totalTask - awaitTask - failedTask
+                    , failedTask
+            );
+            if (flag) {
+                break;
+            }
+            Thread.sleep(20 * 000);
+        }
+        return true;
+    }
 
     @SneakyThrows
     @Override

+ 52 - 0
src/main/java/com/winhc/data/push/task/TcPushDataTask.java

@@ -0,0 +1,52 @@
+package com.winhc.data.push.task;
+
+
+import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
+import com.winhc.data.push.common.TaskEnum;
+import com.winhc.data.push.service.SynDataService;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.bson.Document;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.io.FileNotFoundException;
+import java.util.List;
+
+import static com.winhc.data.push.constant.BaseParam.SYN_ANSHUO_UPLOAD_TASKS;
+import static com.winhc.data.push.constant.BaseParam.SYN_TC_UPLOAD_TASKS;
+
+/**
+ * @author π
+ * @Description:58同城触发任务
+ * @date 2021/6/22 17:07
+ */
+
+@Component
+@Slf4j
+@EnableScheduling
+@AllArgsConstructor
+public class TcPushDataTask {
+
+    private final SynDataService synDataService;
+
+    @Scheduled(cron = "*/10 * * * * ?")
+    //@Scheduled(cron = "0 /2 * * * ? ")
+    public synchronized void start() {
+        log.info("start " + this.getClass() + " !!! ");
+        //获取运行任务
+        List<Document> allTask = synDataService.findAllTask(SYN_TC_UPLOAD_TASKS, new Document("status", "create"));
+        allTask.forEach(task -> {
+            String ds = task.getString("ds");
+            List<String> tns = task.getList("tns", String.class);
+            if (StringUtils.isNotBlank(ds)) {
+                synDataService.pushTc(tns, ds);
+            }
+        });
+        log.info("stop " + this.getClass() + " !!! ");
+
+    }
+
+
+}

+ 22 - 0
src/main/java/com/winhc/data/push/utils/BaseUtils.java

@@ -2,6 +2,8 @@ package com.winhc.data.push.utils;
 
 import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
 
+import static cn.hutool.crypto.SecureUtil.md5;
+import static com.winhc.data.push.constant.BaseParam.SYN_TC_ID_SUF;
 import static com.winhc.data.push.utils.DateUtils.*;
 
 /**
@@ -22,7 +24,27 @@ public class BaseUtils {
         return ds.compareTo(now) < 0;
     }
 
+    public static final String salt = "winhc_data_push";
+
+    public static String getDetailName(String ds, String tn) {
+        assert StringUtils.isNotBlank(ds);
+        assert StringUtils.isNotBlank(tn);
+        return md5(ds + "@" + tn + "@" + salt) + ".json";
+    }
+
+    public static String getPrcessDate(String ds) {
+        assert StringUtils.isNotBlank(ds);
+        return String.valueOf(Long.parseLong(ds) + 7);
+    }
+
+    public static String getObjectId(String ds) {
+        assert StringUtils.isNotBlank(ds);
+        return ds + "_" + SYN_TC_ID_SUF;
+    }
+
     public static void main(String[] args) {
+        System.out.println(getDetailName("20220920", "product"));
+        System.out.println(getDetailName("20220921", "product"));
         System.out.println(verifyDate("20220920"));
         System.out.println(verifyDate("20220919"));
         System.out.println(verifyDate("2022-09-19"));

+ 39 - 10
src/main/java/com/winhc/data/push/utils/OssUtils.java

@@ -1,16 +1,17 @@
 package com.winhc.data.push.utils;
 
 
-import com.aliyun.oss.ClientException;
-import com.aliyun.oss.OSS;
-import com.aliyun.oss.OSSClientBuilder;
-import com.aliyun.oss.OSSException;
+import com.aliyun.oss.*;
 import com.aliyun.oss.internal.OSSHeaders;
 import com.aliyun.oss.model.*;
 
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.InputStreamReader;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
 
 /**
  * @author π
@@ -45,13 +46,7 @@ public class OssUtils {
         return new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
     }
 
-    public static void main(String[] args) {
-
-        String objectName = "anshuo/tmp/20221122.zip";
-        String filePath = "D:\\tmp\\data\\20221122.zip";
 
-        uploadFile(objectName, filePath);
-    }
 
     public static void uploadFile(String objectName, String filePath) {
         OSS ossClient = null;
@@ -76,4 +71,38 @@ public class OssUtils {
         }
     }
 
+    public static List<String> listKeys(String bucket_name,String prefix) {
+        OSS ossClient = getOssClient();
+        // 构造ListObjectsRequest请求
+        ListObjectsRequest listObjectsRequest = new ListObjectsRequest(bucket_name);
+        listObjectsRequest.setMaxKeys(1000);
+        //Delimiter 设置为 “/” 时,罗列该文件夹下的文件
+        //listObjectsRequest.setDelimiter("/");
+        //Prefix 设为某个文件夹名,罗列以此 Prefix 开头的文件
+        listObjectsRequest.setPrefix(prefix);
+        ObjectListing listing = ossClient.listObjects(listObjectsRequest);
+        // 遍历所有Object:目录下的文件
+        List<String> list_keys = listing.getObjectSummaries()
+                .stream()
+                .map(OSSObjectSummary::getKey)
+                .filter(s -> s.endsWith(".json"))
+                .collect(Collectors.toList());
+        ossClient.shutdown();
+        return list_keys;
+    }
+
+
+    public static void main(String[] args) {
+
+        String objectName = "anshuo/tmp/20221122.zip";
+        String filePath = "D:\\tmp\\data\\20221122.zip";
+
+        //uploadFile(objectName, filePath);
+        List<String> res = listKeys("data-exchange-out-tc", "data/tc/20231220");
+        List<String> res1 = listKeys("data-exchange-out-tc", "data/");
+        List<String> res2 = listKeys("data-exchange-out-tc", "data/tc");
+        System.out.println(res);
+    }
+
+
 }

+ 8 - 1
src/main/resources/data-works-param.yaml

@@ -17,4 +17,11 @@ job:
             sourceTable: inc_ads_auction_tracking_anshuo_push
             ossPrefix: anshuo/auction_tracking/
 
-
+  - project: winhc_ng
+    flow: push_data_tc_to_oss
+    task:
+      - taskName: oss_all_data_push_ads
+        param:
+          - _nodeId: 700006274093
+            project: winhc_ng
+            sourceTable: bds_data_out_tc

+ 24 - 7
src/test/java/com/winhc/data/push/Demo.java

@@ -6,22 +6,24 @@ import com.aliyun.oss.OSSClientBuilder;
 import com.aliyun.oss.OSSException;
 import com.aliyun.oss.model.*;
 
+import java.util.UUID;
+
 
 public class Demo {
     public static void main(String[] args) throws Exception {
         // Endpoint以华东1(杭州)为例,其它Region请按实际情况填写。
         String endpoint = "https://oss-cn-shanghai.aliyuncs.com";
         // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
+        //安硕key
         String accessKeyId = "LTAI5tDrbWcjKkzTnJXCrwkC";
         String accessKeySecret = "AWDvBsQc16AYMszNVRga5ANbzuLowy";
 
 
-
-//        accessKeyId = "LTAI6HKo33DbtiBI";
-//        accessKeySecret = "8FmSBhHyaz4jtyJhiimK0NGF5rBPMl";
+        accessKeyId = "LTAI6HKo33DbtiBI";
+        accessKeySecret = "8FmSBhHyaz4jtyJhiimK0NGF5rBPMl";
         // 填写Bucket名称,例如examplebucket。
-        //String bucketName = "data-exchange-as";
-        String bucketName = "bigdata-rt";
+        String bucketName = "data-exchange-as";
+        //String bucketName = "bigdata-rt";
 
         // 创建OSSClient实例。
         OSS ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
@@ -31,25 +33,40 @@ public class Demo {
             ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request(bucketName);
 
             // 设置prefix参数来获取fun目录下的文件。
-            //listObjectsV2Request.setPrefix("anshuo/auction_tracking/20220721/file/");
+            listObjectsV2Request.setPrefix("anshuo/auction_tracking/20230210/file/");
+            //listObjectsV2Request.setPrefix("bigdata/auction_tracking/679191096747/");
             //listObjectsV2Request.setPrefix("anshuo/");
             //listObjectsV2Request.setPrefix("bigdata/auction_tracking/667169871995/");
-            listObjectsV2Request.setPrefix("business-schema/");
+            //listObjectsV2Request.setPrefix("business-schema/");
 
             // 发起列举请求。
             ListObjectsV2Result result = ossClient.listObjectsV2(listObjectsV2Request);
 
             // 遍历文件。
             System.out.println("Objects:");
+            String targetPre = "anshuo/auction_tracking/egg/";
+
+            int i = 0;
             for (OSSObjectSummary objectSummary : result.getObjectSummaries()) {
                 System.out.println(objectSummary.getKey());
+                System.out.println(i++);
+                String key = objectSummary.getKey();
+                //String targetKey = targetPre + key.substring(key.lastIndexOf("/") + 1);
+                String targetKey = targetPre + UUID.randomUUID().toString();
+                CopyObjectResult copyObjectResult = ossClient.copyObject(bucketName, objectSummary.getKey(), bucketName, targetKey);
+
+                System.out.println("status code : " + copyObjectResult.getResponse().getStatusCode());
             }
 
             // 遍历commonPrefix。
             System.out.println("\nCommonPrefixes:");
+
             for (String commonPrefix : result.getCommonPrefixes()) {
                 System.out.println(commonPrefix);
+                System.out.println(i++);
+
             }
+
         } catch (OSSException oe) {
             System.out.println("Caught an OSSException, which means your request made it to OSS, "
                     + "but was rejected with an error response for some reason.");

+ 70 - 0
src/test/java/com/winhc/data/push/Demo2.java

@@ -0,0 +1,70 @@
+package com.winhc.data.push;
+
+import com.aliyun.oss.ClientException;
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSClientBuilder;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.model.*;
+
+import java.util.List;
+
+
+public class Demo2 {
+    public static void main(String[] args) throws Exception {
+        // Endpoint以华东1(杭州)为例,其它Region请按实际情况填写。
+        String endpoint = "https://oss-cn-shanghai.aliyuncs.com";
+        // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
+        //安硕key
+        String accessKeyId = "LTAI5tDrbWcjKkzTnJXCrwkC";
+        String accessKeySecret = "AWDvBsQc16AYMszNVRga5ANbzuLowy";
+
+
+
+        accessKeyId = "LTAI6HKo33DbtiBI";
+        accessKeySecret = "8FmSBhHyaz4jtyJhiimK0NGF5rBPMl";
+        // 填写Bucket名称,例如examplebucket。
+        String bucketName = "data-exchange-as";
+        //String bucketName = "bigdata-rt";
+        //10567
+
+        // 创建OSSClient实例。
+        OSS ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
+
+        int i = 0;
+        String keyPre = "anshuo/auction_tracking/20230210/file/";
+        try {
+            String nextMarker = null;
+            ObjectListing objectListing;
+
+            do {
+                objectListing = ossClient.listObjects(new ListObjectsRequest(bucketName).withMarker(nextMarker).withMaxKeys(200).withPrefix(keyPre));
+
+                List<OSSObjectSummary> sums = objectListing.getObjectSummaries();
+                for (OSSObjectSummary s : sums) {
+                    System.out.println("\t" + s.getKey());
+                    System.out.println(++i);
+                }
+
+                nextMarker = objectListing.getNextMarker();
+
+            } while (objectListing.isTruncated());
+
+        } catch (OSSException oe) {
+            System.out.println("Caught an OSSException, which means your request made it to OSS, "
+                    + "but was rejected with an error response for some reason.");
+            System.out.println("Error Message:" + oe.getErrorMessage());
+            System.out.println("Error Code:" + oe.getErrorCode());
+            System.out.println("Request ID:" + oe.getRequestId());
+            System.out.println("Host ID:" + oe.getHostId());
+        } catch (ClientException ce) {
+            System.out.println("Caught an ClientException, which means the client encountered "
+                    + "a serious internal problem while trying to communicate with OSS, "
+                    + "such as not being able to access the network.");
+            System.out.println("Error Message:" + ce.getMessage());
+        } finally {
+            if (ossClient != null) {
+                ossClient.shutdown();
+            }
+        }
+    }
+}

+ 72 - 0
src/test/java/com/winhc/data/push/Demo3.java

@@ -0,0 +1,72 @@
+package com.winhc.data.push;
+
+import com.aliyun.oss.ClientException;
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSClientBuilder;
+import com.aliyun.oss.OSSException;
+import com.aliyun.oss.model.*;
+
+import java.util.List;
+
+
+public class Demo3 {
+    public static void main(String[] args) throws Exception {
+        // Endpoint以华东1(杭州)为例,其它Region请按实际情况填写。
+        String endpoint = "https://oss-cn-shanghai.aliyuncs.com";
+        // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
+        //安硕key
+        String accessKeyId = "LTAI5tDrbWcjKkzTnJXCrwkC";
+        String accessKeySecret = "AWDvBsQc16AYMszNVRga5ANbzuLowy";
+
+
+        accessKeyId = "LTAI6HKo33DbtiBI";
+        accessKeySecret = "8FmSBhHyaz4jtyJhiimK0NGF5rBPMl";
+        // 填写Bucket名称,例如examplebucket。
+        String bucketName = "data-exchange-as";
+        //String bucketName = "bigdata-rt";
+        //10567
+
+        // 创建OSSClient实例。
+        OSS ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
+
+        int i = 0;
+        String keyPre = "anshuo/auction_tracking/20230529/file/";
+        try {
+            String nextContinuationToken = null;
+            ListObjectsV2Result result = null;
+
+            // 分页列举,每次传入上次返回结果中的nextContinuationToken。
+            do {
+                ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request(bucketName).withMaxKeys(200).withPrefix(keyPre);
+                listObjectsV2Request.setContinuationToken(nextContinuationToken);
+                result = ossClient.listObjectsV2(listObjectsV2Request);
+
+                List<OSSObjectSummary> sums = result.getObjectSummaries();
+                for (OSSObjectSummary s : sums) {
+                    System.out.println("\t" + s.getKey());
+                    System.out.println(++i);
+                }
+
+                nextContinuationToken = result.getNextContinuationToken();
+
+            } while (result.isTruncated());
+
+        } catch (OSSException oe) {
+            System.out.println("Caught an OSSException, which means your request made it to OSS, "
+                    + "but was rejected with an error response for some reason.");
+            System.out.println("Error Message:" + oe.getErrorMessage());
+            System.out.println("Error Code:" + oe.getErrorCode());
+            System.out.println("Request ID:" + oe.getRequestId());
+            System.out.println("Host ID:" + oe.getHostId());
+        } catch (ClientException ce) {
+            System.out.println("Caught an ClientException, which means the client encountered "
+                    + "a serious internal problem while trying to communicate with OSS, "
+                    + "such as not being able to access the network.");
+            System.out.println("Error Message:" + ce.getMessage());
+        } finally {
+            if (ossClient != null) {
+                ossClient.shutdown();
+            }
+        }
+    }
+}

+ 38 - 2
src/test/java/com/winhc/data/push/TestPush.java

@@ -1,5 +1,7 @@
 package com.winhc.data.push;
 
+import com.aliyun.odps.Odps;
+import com.winhc.data.push.service.OdpsService;
 import com.winhc.data.push.service.SynDataService;
 import com.winhc.data.push.service.TouchService;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -8,6 +10,12 @@ import org.springframework.test.context.junit4.SpringRunner;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
+import java.util.Arrays;
+import java.util.List;
+
+import static com.winhc.data.push.common.Constant.TC_BUCKET_NAME;
+import static com.winhc.data.push.common.Constant.TC_OSS_PRE;
+
 
 /**
  * @author π
@@ -24,19 +32,47 @@ public class TestPush {
     @Autowired
     TouchService touchService;
 
+    @Autowired
+    OdpsService odpsService;
+
+
     @Test
     public void sendKafka() {
         String ds = "20220721";
-        synDataService.push("","");
+        synDataService.push("", "");
     }
 
     @Test
     public void touchTask() {
-        touchService.export2OSS("winhc_ng","20220721");
+        touchService.export2OSS("winhc_ng", "20220721");
     }
 
     @Test
     public void touchTask1() throws Exception {
         synDataService.sendMessage("运行中...");
     }
+
+
+    @Test
+    public void touchTaskTC() {
+        String ds = "20231221";
+        touchService.export2OSSTc("winhc_ng", ds, Arrays.asList("zxr_evaluate"));
+        synDataService.saveObject(ds, TC_BUCKET_NAME, TC_OSS_PRE);
+    }
+
+    @Test
+    public void showTable() {
+
+        List<String> query = odpsService.query("SHOW PARTITIONS bds_data_out_tc PARTITION (ds = '@ds') ;", "20231225");
+        System.out.println(query);
+    }
+
+    @Test
+    public void tt() {
+        String ss = "data/tc/20231220/company_send_announcement/0a727f5343df6bc6b7861baa2998b13a.json";
+
+        String re = ss.substring(0, ss.lastIndexOf("/"));
+        String re2 = re.substring(re.lastIndexOf("/") + 1);
+        System.out.println(re2);
+    }
 }