Kaynağa Gözat

国网工商测试

xufei 4 ay önce
ebeveyn
işleme
d89db38f57

+ 52 - 0
src/main/java/com/winhc/data/push/bean/TaskRunParam.java

@@ -0,0 +1,52 @@
+package com.winhc.data.push.bean;
+
+import lombok.*;
+
+import java.util.List;
+
+/**
+ * @author π
+ * @Description:任务运行参数
+ * @date 2024/4/29 15:30
+ */
+
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TaskRunParam {
+
+    //customer name
+    private String customer_name;
+
+    //odps table
+    private String source_table;
+
+    //上传任务表
+    private String syn_upload_tasks;
+
+    //输出数据源
+    private String datasource;
+
+    //bucket_name
+    private String bucket_name;
+
+    //bucket 前缀
+    private String oss_pre;
+
+    //file suffix
+    private String detail_suffix;
+
+    //获取 tns sql
+    private String tns_sql_pre;
+
+    //输出tns 维度
+    private List<String> tns_out;
+
+    //flow name
+    private String flow_name;
+
+    //flow name
+    private String flow_task_name;
+
+}

+ 98 - 1
src/main/java/com/winhc/data/push/common/Constant.java

@@ -129,11 +129,106 @@ public class Constant {
             , "reduce_capital_info"
     );
 
+    /**
+     * 国网工商数据输出维度
+     */
+    public static List<String> gw_company_tns = Arrays.asList(
+            "company",
+            "company_holder",
+            "company_staff",
+            "private_enterprise",
+            "finance_info_v3",
+            "company_liquidating_info",
+            "company_change",
+            "reduce_capital_info",
+            "company_annual_report",
+            "company_annual_report_out_investment",
+            "company_annual_report_out_guarantee",
+            "company_annual_report_change",
+            "company_annual_report_equity_change",
+            "company_annual_report_holder",
+            "company_annual_report_webinfo",
+            "company_annual_report_social_security",
+            "company_dishonest_info",
+            "company_zxr",
+            "company_zxr_restrict",
+            "company_zxr_final_case",
+            "company_send_announcement",
+            "wenshu_detail_v2",
+            "company_court_open_announcement",
+            "company_court_register",
+            "company_court_announcement",
+            "company_judicial_assistance",
+            "auction_tracking",
+            "zxr_evaluate_results",
+            "zxr_evaluate",
+            "restrictions_on_exit",
+            "litigation_mediation",
+            "bankruptcy_open_case",
+            "bankruptcy_open_announcement",
+            "bankruptcy_judgment_document",
+            "company_punishment_info_creditchina",
+            "company_abnormal_info",
+            "company_equity_info",
+            "company_mortgage_info",
+            "company_mortgage_pawn",
+            "company_mortgage_change",
+            "company_mortgage_people",
+            "company_env_punishment",
+            "company_tax_contravention",
+            "company_own_tax",
+            "company_illegal_info",
+            "company_brief_cancel_announcement",
+            "company_brief_cancel_announcement_objection",
+            "company_brief_cancel_announcement_result",
+            "company_equity_pledge",
+            "company_ipr_pledge",
+            "cancellation_announcement",
+            "company_bid_all",
+            "company_employment",
+            "company_certificate",
+            "company_customs_credit",
+            "company_customs_credit_administrative_penalty",
+            "company_customs_credit_rating",
+            "company_license_creditchina_new",
+            "company_land_publicity",
+            "company_land_announcement",
+            "company_land_transfer",
+            "company_land_mortgage",
+            "company_tax",
+            "company_finance",
+            "company_check_info",
+            "property_rights_transaction",
+            "company_double_random_check_info",
+            "company_double_random_check_result_info",
+            "company_tm",
+            "company_tm_notice",
+            "company_tm_step",
+            "company_tm_goods",
+            "company_patent_new",
+            "company_copyright_reg",
+            "company_copyright_works",
+            "company_icp",
+            "construction_qualification",
+            "construction_person",
+            "construction_project",
+            "participating_units",
+            "construction_bid",
+            "construction_contract",
+            "working_drawing",
+            "completion_acceptance",
+            "construction_project_detail",
+            "construction_permit"
+    );
 
+    //同城变更表
     public static final String show_tn_sql = ("SHOW PARTITIONS bds_data_out_tc PARTITION (ds = '@ds') ;");
-    //基础变更表
+    //安硕变更表
     public static final String base_show_tn_sql = ("SHOW PARTITIONS bds_data_out_anshuo PARTITION (ds = '@ds') ;");
 
+    //安硕变更表
+    public static final String gw_show_tn_sql = ("SHOW PARTITIONS bds_change_out_sql_gw PARTITION (ds = '@ds') ;");
+
     public static final String TC_BUCKET_NAME = ("data-exchange-out-tc");
     public static final String TC_OSS_PRE = ("data/tc");
 
@@ -141,4 +236,6 @@ public class Constant {
     public static final String ANSHUO_OSS_PRE = ("anshuo");
     public static final String ANSHUO_DATASOURCE = ("oss_data_as");
 
+    public static final String GW_DATASOURCE = "oss_data_gw";
+
 }

+ 54 - 0
src/main/java/com/winhc/data/push/common/TaskRunMap.java

@@ -0,0 +1,54 @@
+package com.winhc.data.push.common;
+
+import com.alibaba.fastjson.JSONObject;
+import com.aliyun.odps.Column;
+import com.winhc.data.push.bean.TaskRunParam;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Function;
+
+
+/**
+ * @author π
+ * @since 2021-11-2 15:50
+ */
+@Data
+@NoArgsConstructor
+public class TaskRunMap {
+
+
+    private TaskRunParam taskRunParam;
+
+    public static final Map<String, TaskRunMap> TASK_ARGS = createArgs();
+
+
+    public TaskRunMap(TaskRunParam taskRunParam) {
+        this.taskRunParam = taskRunParam;
+    }
+
+
+    private static Map<String, TaskRunMap> createArgs() {
+        HashMap<String, TaskRunMap> a = new HashMap<>();
+        a.put("gw", new TaskRunMap(
+                TaskRunParam
+                        .builder()
+                        .customer_name("gw")
+                        .source_table("bds_change_out_sql_gw")
+                        .syn_upload_tasks("syn_gw_upload_tasks")
+                        .datasource("oss_data_gw")
+                        .bucket_name("data-exchange-out-gw")
+                        .oss_pre("data/gw")
+                        .detail_suffix(".sql")
+                        .tns_sql_pre("SHOW PARTITIONS bds_change_out_sql_gw PARTITION (ds = '@ds') ;")
+                        .tns_out(Constant.gw_company_tns)
+                        .flow_name("base_opds_push_data_oss_v2")
+                        .flow_task_name("winhc_ng:base_opds_push_data_oss_v2:base_opds_push_data_oss_v2")
+                        .build()
+        ));
+        return a;
+    }
+}

+ 3 - 0
src/main/java/com/winhc/data/push/constant/BaseParam.java

@@ -18,6 +18,9 @@ public class BaseParam {
     public static final String SYN_ANSHUO_UPLOAD_TASKS = "syn_anshuo_upload_tasks";
     //58同城数据状态表
     public static final String SYN_TC_UPLOAD_TASKS = "syn_tc_upload_tasks";
+
+    //国网数据测试
+    public static final String SYN_GW_UPLOAD_TASKS = "syn_gw_upload_tasks";
     public static final String SYN_TC_ID_SUF = "tc";
 
 }

+ 97 - 0
src/main/java/com/winhc/data/push/controller/GwPushDataController.java

@@ -0,0 +1,97 @@
+package com.winhc.data.push.controller;
+
+import cn.hutool.core.lang.Assert;
+import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
+import com.winhc.data.push.bean.TaskRunParam;
+import com.winhc.data.push.common.TaskEnum;
+import com.winhc.data.push.common.TaskRunMap;
+import com.winhc.data.push.constant.BaseParam;
+import com.winhc.data.push.service.SynDataService;
+import com.winhc.data.push.utils.BaseUtils;
+import com.winhc.data.push.vo.ResponseVo;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.bson.Document;
+import org.jetbrains.annotations.NotNull;
+import org.springframework.web.bind.annotation.*;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Date;
+import java.util.List;
+
+import static com.winhc.data.push.common.TaskRunMap.TASK_ARGS;
+import static com.winhc.data.push.constant.BaseParam.SYN_GW_UPLOAD_TASKS;
+import static com.winhc.data.push.constant.BaseParam.SYN_TC_UPLOAD_TASKS;
+import static com.winhc.data.push.utils.BaseUtils.getObjectId;
+
+
+/**
+ * @author π
+ * @Description :国网推送数据
+ * @date 2022/7/20 16:24
+ */
+@Slf4j
+@AllArgsConstructor
+@RestController
+@Api(tags = "任务触发gw", value = "task/gw")
+@RequestMapping("task/gw")
+public class GwPushDataController {
+
+    private final SynDataService synDataService;
+
+
+    @ApiOperation(value = "触发任务")
+    @GetMapping("push/{ds}")
+    public ResponseVo touch(@PathVariable String ds, @RequestParam String tns, @RequestParam String sign) {
+        //日期是昨天之前数据
+        List<String> out_tns = getList(ds, tns, sign);
+        long start = System.currentTimeMillis();
+        try {
+            TaskRunParam runParam = TASK_ARGS.get("gw").getTaskRunParam();
+            String table = runParam.getSyn_upload_tasks();
+            String id = getObjectId(ds, runParam.getCustomer_name());
+            String re = synDataService.saveTask(table, out_tns, ds, id);
+            return ResponseVo.success(start, re);
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+            return ResponseVo.failure(start, e.getMessage());
+        }
+    }
+
+    @ApiOperation(value = "强制触发任务")
+    @GetMapping("repush/{ds}")
+    public ResponseVo reTouch(@PathVariable String ds, @RequestParam String tns, @RequestParam String sign) {
+        List<String> out_tns = getList(ds, tns, sign);
+
+        long start = System.currentTimeMillis();
+        try {
+            Document d = new Document();
+            d.put("update_time", new Date());
+            d.put("status", TaskEnum.STATUS.CREATE.code);
+            d.put("ds", ds);
+            d.put("tns", out_tns);
+            TaskRunParam runParam = TASK_ARGS.get("gw").getTaskRunParam();
+            String table = runParam.getSyn_upload_tasks();
+            String id = getObjectId(ds, runParam.getCustomer_name());
+            Boolean re = synDataService.updateTaskStatus(table, d, id);
+            return ResponseVo.success(start, re ? "restart" : "error");
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+            return ResponseVo.failure(start, e.getMessage());
+        }
+    }
+
+    @NotNull
+    public static List<String> getList(String ds, String tns, String sign) {
+        Assert.isTrue(BaseUtils.verifyDate(ds), "ds 不合法");
+        Assert.isTrue(BaseParam.sign.equals(sign), "验签不通过!");
+        Assert.isTrue(StringUtils.isNotBlank(tns), "参数不能为空!");
+        List<String> out_tns = tns.equals("all") ? new ArrayList<>() : Arrays.asList(tns.split(","));
+        return out_tns;
+    }
+
+
+}

+ 5 - 0
src/main/java/com/winhc/data/push/service/SynDataService.java

@@ -1,5 +1,6 @@
 package com.winhc.data.push.service;
 
+import com.winhc.data.push.bean.TaskRunParam;
 import org.bson.Document;
 
 import java.io.FileNotFoundException;
@@ -14,6 +15,8 @@ public interface SynDataService {
     void push(String tn, String ds);
 
     void pushTc(List<String> tns, String ds);
+
+    void pushGw(List<String> tns, String ds, TaskRunParam runParam);
     void pushTc(String ds);
 
     void pushAnshuoCompany(List<String> tns, String ds);
@@ -23,6 +26,8 @@ public interface SynDataService {
 
     String saveTask(String collectionName,List<String> tns, String ds);
 
+    String saveTask(String collectionName,List<String> tns, String ds,String id);
+
     Boolean findTaskExists(Document doc);
 
     Boolean findTaskExists(String collectionName, Document doc);

+ 4 - 0
src/main/java/com/winhc/data/push/service/TouchService.java

@@ -1,5 +1,7 @@
 package com.winhc.data.push.service;
 
+import com.winhc.data.push.bean.TaskRunParam;
+
 import java.util.List;
 import java.util.Map;
 
@@ -20,6 +22,8 @@ public interface TouchService {
 
     Boolean export2OSSTc(String  project, String  ds,List<String> tns);
 
+    Boolean export2OSSGw(String  project, String  ds,List<String> tns, TaskRunParam runParam);
+
     Boolean export2OSSAnshuoCompany(String  project, String  ds,List<String> tns);
 
 }

+ 69 - 0
src/main/java/com/winhc/data/push/service/impl/SynDataServiceImpl.java

@@ -10,6 +10,7 @@ import com.mongodb.client.FindIterable;
 import com.mongodb.client.MongoDatabase;
 import com.mongodb.client.model.FindOneAndUpdateOptions;
 import com.mongodb.client.model.UpdateOptions;
+import com.winhc.data.push.bean.TaskRunParam;
 import com.winhc.data.push.common.Constant;
 import com.winhc.data.push.common.TaskEnum;
 import com.winhc.data.push.configuration.OSSAccessProperties;
@@ -130,6 +131,39 @@ public class SynDataServiceImpl implements SynDataService {
     }
 
     /**
+     * 国网数据输出
+     *
+     * @param tns
+     * @param ds
+     */
+    @Override
+    public void pushGw(List<String> tns, String ds, TaskRunParam runParam) {
+        //String id = ds + "_" + runParam.getCustomer_name();
+        String id = getObjectId(ds, runParam.getCustomer_name());
+        try {
+            updateTaskStatus(runParam.getSyn_upload_tasks(), ds, TaskEnum.STATUS.RUNNING.code, id);
+            sendMessage("task running !!! " + id + "\n" + runParam.getSource_table());
+            String project = "winhc_ng";
+            if (touchService.export2OSSGw(project, ds, tns, runParam)) {
+                //保存集合
+                saveObjectBase(ds, runParam);
+                updateTaskStatus(runParam.getSyn_upload_tasks(), ds, TaskEnum.STATUS.SUCCESS.code, id);
+                sendMessage("导出 任务 成功 !!! " + id + "\n" + runParam.getSource_table());
+            } else {
+                log.error("导出OSS 失败 !!!");
+                sendMessage("导出 OSS 失败!!! " + id + "\n" + runParam.getSource_table());
+                updateTaskStatus(runParam.getSyn_upload_tasks(), ds, TaskEnum.STATUS.ERROR.code, id);
+            }
+
+        } catch (Exception e) {
+            sendMessage("任务失败!!! " + id + " |  error " + e.getMessage() + "\n" + runParam.getSource_table());
+            updateTaskStatus(runParam.getSyn_upload_tasks(), ds, TaskEnum.STATUS.TASK_ERROR.code, id);
+            e.printStackTrace();
+            log.error("export error: {}", e.getMessage());
+        }
+    }
+
+    /**
      * 安硕数据工商数据输出
      *
      * @param tns
@@ -204,6 +238,27 @@ public class SynDataServiceImpl implements SynDataService {
     }
 
     @Override
+    public String saveTask(String collectionName, List<String> tns, String ds, String id) {
+        if (findTaskExists(collectionName, new Document("_id", id))) {
+            return "task repeat";
+        } else {
+            if (tns == null || tns.size() == 0) {
+                tns = new ArrayList<>();
+            }
+            Document d = new Document();
+            d.put("_id", id);
+            d.put("create_time", new Date());
+            d.put("update_time", new Date());
+            d.put("status", "create");
+            d.put("tns", tns);
+            d.put("ds", ds);
+            mongoTemplate.getCollection(collectionName).insertOne(d);
+            return "task create success";
+        }
+
+    }
+
+    @Override
     public void sendMessage(String message) {
         try {
             dingTalkApi.sendTextByPhones("data_push_message", "winhc\n" + message, Collections.singletonList("17602140784"));
@@ -396,6 +451,20 @@ public class SynDataServiceImpl implements SynDataService {
         ossClient.putObject(putObjectRequest);
     }
 
+    public void saveObjectBase(String ds, TaskRunParam runParam) {
+        String prcessDate = getPrcessDateV2(ds, 0);
+        String pre = runParam.getOss_pre() + "/" + prcessDate + "/";
+        List<String> keys = listKeys(runParam.getBucket_name(), pre, runParam.getDetail_suffix());
+        List<String> filters_keys = keys.stream()
+                .filter(k -> runParam.getTns_out().contains(getTn(k)))
+                .distinct()
+                .collect(Collectors.toList());
+        String sum = JSON.toJSONString(filters_keys);
+        String targetKey = pre + "out.success";
+        PutObjectRequest putObjectRequest = new PutObjectRequest(runParam.getBucket_name(), targetKey, new ByteArrayInputStream(sum.getBytes()));
+        ossClient.putObject(putObjectRequest);
+    }
+
     /**
      * 安硕工商数据测试
      *

+ 43 - 1
src/main/java/com/winhc/data/push/service/impl/TouchServiceImpl.java

@@ -20,6 +20,7 @@ import java.util.*;
 import java.util.stream.Collectors;
 
 import static com.winhc.data.push.common.Constant.*;
+import static com.winhc.data.push.utils.BaseUtils.getDetailName;
 import static com.winhc.data.push.utils.BaseUtils.getPrcessDate;
 import static com.winhc.data.push.utils.DateUtils.getPrcessDateV2;
 
@@ -80,7 +81,7 @@ public class TouchServiceImpl implements TouchService {
             otherParams.put("tn", tn);
             otherParams.put("ossPrefix", TC_OSS_PRE);
             otherParams.put("prcessDate", prcessDate);
-            otherParams.put("detail", BaseUtils.getDetailName(ds, tn));
+            otherParams.put("detail", getDetailName(ds, tn));
             Long returnValue = dataWorksService.export2OSS(ds, ProjectParamInit.exportOssFlowNameTC,
                     DateUtils.parseDate(ds),
                     dataWorksFlowTaskMap.get(ProjectParamInit.exportOssBeanNameTC),
@@ -92,6 +93,47 @@ public class TouchServiceImpl implements TouchService {
     }
 
     /**
+     * 国网数据导出
+     *
+     * @param project
+     * @param ds
+     * @param tns
+     * @return
+     */
+    @Override
+    public Boolean export2OSSGw(String project, String ds, List<String> tns, TaskRunParam runParam) {
+        String prcessDate = getPrcessDateV2(ds, 0);
+        //当日分区数据
+        List<String> ds_tns = odpsService.query(runParam.getTns_sql_pre(), ds);
+        //需求输出数据
+        List<String> out_tns = ds_tns.stream()
+                .filter(tn -> runParam.getTns_out().contains(tn))
+                .distinct()
+                .collect(Collectors.toList());
+        //取交集
+        if (tns != null && tns.size() > 0) {
+            out_tns.retainAll(tns);
+        }
+
+        Map<Long, String> task_ids = out_tns.stream().map(tn -> {
+            Map<String, String> otherParams = new HashMap<>();
+            otherParams.put("tn", tn);
+            otherParams.put("ossPrefix", runParam.getOss_pre());
+            otherParams.put("datasource", runParam.getDatasource());
+            otherParams.put("prcessDate", prcessDate);
+            otherParams.put("sourceTable", runParam.getSource_table());
+            otherParams.put("detail", getDetailName(ds, tn, runParam.getDetail_suffix()));
+            Long returnValue = dataWorksService.export2OSS(ds, runParam.getFlow_name(),
+                    DateUtils.parseDate(ds),
+                    dataWorksFlowTaskMap.get(runParam.getFlow_task_name()),
+                    otherParams).getReturnValue();
+            return Arrays.asList(returnValue, tn);
+        }).distinct().collect(Collectors.toMap(c -> Long.parseLong(c.get(0).toString()), c -> c.get(1).toString(), (c, v) -> v));
+
+        return queryList(project, task_ids);
+    }
+
+    /**
      * 安硕工商数据导出测试
      *
      * @param project

+ 58 - 0
src/main/java/com/winhc/data/push/task/GwPushDataTask.java

@@ -0,0 +1,58 @@
+package com.winhc.data.push.task;
+
+
+import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
+import com.winhc.data.push.bean.TaskRunParam;
+import com.winhc.data.push.common.Constant;
+import com.winhc.data.push.common.TaskEnum;
+import com.winhc.data.push.service.SynDataService;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.bson.Document;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+
+import java.util.Arrays;
+import java.util.List;
+
+import static com.winhc.data.push.common.Constant.GW_DATASOURCE;
+import static com.winhc.data.push.common.TaskRunMap.TASK_ARGS;
+import static com.winhc.data.push.constant.BaseParam.SYN_GW_UPLOAD_TASKS;
+import static com.winhc.data.push.constant.BaseParam.SYN_TC_UPLOAD_TASKS;
+
+/**
+ * @author π
+ * @Description:国网数据触发任务
+ * @date 2021/6/22 17:07
+ */
+
+@Component
+@Slf4j
+@EnableScheduling
+@AllArgsConstructor
+public class GwPushDataTask {
+
+    private final SynDataService synDataService;
+
+    @Scheduled(cron = "*/10 * * * * ?")
+    //@Scheduled(cron = "0 /2 * * * ? ")
+    public synchronized void start() {
+        log.info("start " + this.getClass() + " !!! ");
+        //任务配置参数
+        TaskRunParam runParam = TASK_ARGS.get("gw").getTaskRunParam();
+        //获取运行任务
+        List<Document> allTask = synDataService.findAllTask(runParam.getSyn_upload_tasks(), new Document("status", TaskEnum.STATUS.CREATE.code));
+        allTask.forEach(task -> {
+            String ds = task.getString("ds");
+            List<String> tns = task.getList("tns", String.class);
+            if (StringUtils.isNotBlank(ds)) {
+                synDataService.pushGw(tns, ds, runParam);
+            }
+        });
+        log.info("stop " + this.getClass() + " !!! ");
+
+    }
+
+
+}

+ 8 - 1
src/main/java/com/winhc/data/push/utils/BaseUtils.java

@@ -32,6 +32,12 @@ public class BaseUtils {
         return md5(ds + "@" + tn + "@" + salt) + ".json";
     }
 
+    public static String getDetailName(String ds, String tn, String suffix) {
+        assert StringUtils.isNotBlank(ds);
+        assert StringUtils.isNotBlank(tn);
+        return md5(ds + "@" + tn + "@" + salt) + suffix;
+    }
+
     public static String getPrcessDate(String ds) {
         assert StringUtils.isNotBlank(ds);
         return String.valueOf(Long.parseLong(ds) + 7);
@@ -42,7 +48,8 @@ public class BaseUtils {
         return ds + "_" + SYN_TC_ID_SUF;
     }
 
-    public static String getObjectId(String ds,String pre) {
+
+    public static String getObjectId(String ds, String pre) {
         assert StringUtils.isNotBlank(ds);
         return ds + "_" + pre;
     }

+ 20 - 0
src/main/java/com/winhc/data/push/utils/OssUtils.java

@@ -98,6 +98,26 @@ public class OssUtils {
         return list_keys;
     }
 
+    public static List<String> listKeys(String bucket_name,String prefix,String endpath) {
+        OSS ossClient = getOssClient();
+        // 构造ListObjectsRequest请求
+        ListObjectsRequest listObjectsRequest = new ListObjectsRequest(bucket_name);
+        listObjectsRequest.setMaxKeys(1000);
+        //Delimiter 设置为 “/” 时,罗列该文件夹下的文件
+        //listObjectsRequest.setDelimiter("/");
+        //Prefix 设为某个文件夹名,罗列以此 Prefix 开头的文件
+        listObjectsRequest.setPrefix(prefix);
+        ObjectListing listing = ossClient.listObjects(listObjectsRequest);
+        // 遍历所有Object:目录下的文件
+        List<String> list_keys = listing.getObjectSummaries()
+                .stream()
+                .map(OSSObjectSummary::getKey)
+                .filter(s -> s.endsWith(endpath))
+                .collect(Collectors.toList());
+        ossClient.shutdown();
+        return list_keys;
+    }
+
     public static Boolean existsBucketName(String bucket_name,String ObjectName) {
         return ossClientDefault.doesObjectExist(bucket_name, ObjectName);
     }

+ 9 - 1
src/main/resources/data-works-param.yaml

@@ -33,4 +33,12 @@ job:
         param:
           - _nodeId: 700006424784
             project: winhc_ng
-            sourceTable: bds_data_out_anshuo
+            sourceTable: bds_data_out_anshuo
+
+  - project: winhc_ng
+    flow: base_opds_push_data_oss_v2
+    task:
+      - taskName: base_opds_push_data_oss_v2
+        param:
+          - _nodeId: 700006464754
+            project: winhc_ng