فهرست منبع

安硕工商测试

xufei 5 ماه پیش
والد
کامیت
58558f963a

+ 37 - 1
src/main/java/com/winhc/data/push/common/Constant.java

@@ -81,7 +81,7 @@ public class Constant {
             "property_rights_transaction", "company_double_random_check_info", "company_double_random_check_result_info", "company_tm", "company_tm_notice",
             "company_tm_step", "company_tm_goods", "company_patent_new", "company_copyright_reg", "company_copyright_works", "company_icp",
             "construction_qualification", "construction_person", "construction_project"
-            ,"company_holder_sponsor",
+            , "company_holder_sponsor",
             "company_app_info",
             "company_bond",
             /*"company_license",*/
@@ -99,10 +99,46 @@ public class Constant {
             "product",
             "product_competition");
 
+    /**
+     * 安硕工商数据输出维度
+     */
+    public static List<String> anshuo_company_tns = Arrays.asList(
+            "company"
+            , "company_holder"
+            , "company_staff"
+            , "company_liquidating_info"
+            , "company_change"
+            , "company_equity_info"
+            , "company_check_info"
+            , "company_double_random_check_info"
+            ,"company_judicial_assistance"
+            , "company_license"
+            , "company_punishment_info"
+            , "company_abnormal_info"
+            , "company_illegal_info"
+            , "company_brief_cancel_announcement"
+            , "cancellation_announcement"
+            , "company_annual_report"
+            , "company_annual_report_out_investment"
+            , "company_annual_report_out_guarantee"
+            , "company_annual_report_change"
+            , "company_annual_report_equity_change"
+            , "company_annual_report_holder"
+            , "company_annual_report_webinfo"
+            , "company_annual_report_social_security"
+            , "reduce_capital_info"
+    );
+
 
     public static final String show_tn_sql = ("SHOW PARTITIONS bds_data_out_tc PARTITION (ds = '@ds') ;");
+    //基础变更表
+    public static final String base_show_tn_sql = ("SHOW PARTITIONS bds_data_out_anshuo PARTITION (ds = '@ds') ;");
 
     public static final String TC_BUCKET_NAME = ("data-exchange-out-tc");
     public static final String TC_OSS_PRE = ("data/tc");
 
+    public static final String ANSHUO_BUCKET_NAME = ("data-exchange-as");
+    public static final String ANSHUO_OSS_PRE = ("anshuo");
+    public static final String ANSHUO_DATASOURCE = ("oss_data_as");
+
 }

+ 3 - 0
src/main/java/com/winhc/data/push/configuration/ProjectParamInit.java

@@ -30,6 +30,9 @@ public class ProjectParamInit implements BeanFactoryPostProcessor {
     public static final String exportOssFlowNameTC = "push_data_tc_to_oss";
     public static final String exportOssBeanNameTC = "winhc_ng:push_data_tc_to_oss:oss_all_data_push_ads";
 
+    public static final String exportOssFlowNameAnshuo = "base_opds_push_data_oss";
+    public static final String exportOssBeanNameAnshuo = "winhc_ng:base_opds_push_data_oss:base_opds_push_data_oss";
+
     /**
      * @param beanFactory
      * @throws BeansException

+ 2 - 0
src/main/java/com/winhc/data/push/constant/BaseParam.java

@@ -9,6 +9,8 @@ public class BaseParam {
     public static final String sign = "da742d3391af7d14acf1539e72f98323";
     //安硕任务状态表
     public static final String SYN_ANSHUO_TASKS = "syn_anshuo_tasks";
+    //安硕工商任务推送
+    public static final String SYN_ANSHUO_GONGSHANG_TASKS = "syn_anshuo_gongshang_tasks";
     //每日url集合
     public static final String AUCTION_TRACKING_ANSHUO_URL = "auction_tracking_anshuo_url";
 

+ 27 - 1
src/main/java/com/winhc/data/push/controller/PushDataController.java

@@ -11,9 +11,15 @@ import io.swagger.annotations.Api;
 import io.swagger.annotations.ApiOperation;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.bson.Document;
 import org.springframework.web.bind.annotation.*;
 
-import static com.winhc.data.push.constant.BaseParam.SYN_ANSHUO_UPLOAD_TASKS;
+import java.util.Date;
+import java.util.List;
+
+import static com.winhc.data.push.constant.BaseParam.*;
+import static com.winhc.data.push.controller.TcPushDataController.getList;
+import static com.winhc.data.push.utils.BaseUtils.getObjectId;
 
 
 /**
@@ -64,6 +70,26 @@ public class PushDataController {
         }
     }
 
+    @ApiOperation(value = "强制触发任务-工商任务")
+    @GetMapping("repush_company/{ds}")
+    public ResponseVo reTouchCompany(@PathVariable String ds, @RequestParam String tns, @RequestParam String sign) {
+        List<String> out_tns = getList(ds, tns, sign);
+
+        long start = System.currentTimeMillis();
+        try {
+            Document d = new Document();
+            d.put("update_time", new Date());
+            d.put("status", "create");
+            d.put("ds", ds);
+            d.put("tns", out_tns);
+            Boolean re = synDataService.updateTaskStatus(SYN_ANSHUO_GONGSHANG_TASKS, d, ds);
+            return ResponseVo.success(start, re ? "restart" : "error");
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+            return ResponseVo.failure(start, e.getMessage());
+        }
+    }
+
 
     @ApiOperation(value = "上传数据")
     @GetMapping("upload/{ds}")

+ 1 - 1
src/main/java/com/winhc/data/push/controller/TcPushDataController.java

@@ -76,7 +76,7 @@ public class TcPushDataController {
     }
 
     @NotNull
-    private static List<String> getList(String ds, String tns, String sign) {
+    public static List<String> getList(String ds, String tns, String sign) {
         Assert.isTrue(BaseUtils.verifyDate(ds), "ds 不合法");
         Assert.isTrue(BaseParam.sign.equals(sign), "验签不通过!");
         Assert.isTrue(StringUtils.isNotBlank(tns), "参数不能为空!");

+ 3 - 1
src/main/java/com/winhc/data/push/service/SynDataService.java

@@ -16,7 +16,7 @@ public interface SynDataService {
     void pushTc(List<String> tns, String ds);
     void pushTc(String ds);
 
-
+    void pushAnshuoCompany(List<String> tns, String ds);
     void sendMessage(String message) throws Exception;
 
     String saveTask(String tn, String ds);
@@ -42,6 +42,8 @@ public interface SynDataService {
     void uploadToOss(String ds) throws FileNotFoundException;
     void saveObject(String ds, String targetBucketName, String ossPre);
 
+    void saveObjectAnshuo(String ds, String targetBucketName, String ossPre);
+
     void saveEmptyObject(String ds);
 
 }

+ 2 - 0
src/main/java/com/winhc/data/push/service/TouchService.java

@@ -20,4 +20,6 @@ public interface TouchService {
 
     Boolean export2OSSTc(String  project, String  ds,List<String> tns);
 
+    Boolean export2OSSAnshuoCompany(String  project, String  ds,List<String> tns);
+
 }

+ 52 - 0
src/main/java/com/winhc/data/push/service/impl/SynDataServiceImpl.java

@@ -37,6 +37,7 @@ import static com.winhc.data.push.constant.BaseParam.*;
 import static com.winhc.data.push.utils.BaseUtils.getObjectId;
 import static com.winhc.data.push.utils.BaseUtils.getPrcessDate;
 import static com.winhc.data.push.utils.DateUtils.getPrcessDateV2;
+import static com.winhc.data.push.utils.OssUtils.existsBucketName;
 import static com.winhc.data.push.utils.OssUtils.listKeys;
 import static com.winhc.data.push.utils.TransToExcelNewCompany.dataPathPrefix;
 import static com.winhc.data.push.utils.TransToExcelNewCompany.run;
@@ -128,6 +129,38 @@ public class SynDataServiceImpl implements SynDataService {
         }
     }
 
+    /**
+     * 安硕数据工商数据输出
+     *
+     * @param tns
+     * @param ds
+     */
+    @Override
+    public void pushAnshuoCompany(List<String> tns, String ds) {
+        String id = ds;
+        String message_pre = "安硕工商推送 ";
+        try {
+            updateTaskStatus(SYN_ANSHUO_GONGSHANG_TASKS, ds, TaskEnum.STATUS.RUNNING.code, id);
+            sendMessage(message_pre + "task running !!! " + id);
+            String project = "winhc_ng";
+            if (touchService.export2OSSAnshuoCompany(project, ds, tns)) {
+                //待修改
+                saveObjectAnshuo(ds, ANSHUO_BUCKET_NAME, ANSHUO_OSS_PRE);
+                updateTaskStatus(SYN_ANSHUO_GONGSHANG_TASKS, ds, TaskEnum.STATUS.SUCCESS.code, id);
+                sendMessage(message_pre + "导出 任务 成功 !!! " + id);
+            } else {
+                log.error(message_pre + "导出OSS 失败 !!!");
+                sendMessage(message_pre + "导出 OSS 失败!!! " + id);
+                updateTaskStatus(SYN_ANSHUO_GONGSHANG_TASKS, ds, TaskEnum.STATUS.ERROR.code, id);
+            }
+        } catch (Exception e) {
+            sendMessage(message_pre + "任务失败!!! " + id + " |  error " + e.getMessage());
+            updateTaskStatus(SYN_ANSHUO_GONGSHANG_TASKS, ds, TaskEnum.STATUS.TASK_ERROR.code, id);
+            e.printStackTrace();
+            log.error(message_pre + "export error: {}", e.getMessage());
+        }
+    }
+
 
     @Override
     public String saveTask(String tn, String ds) {
@@ -361,7 +394,26 @@ public class SynDataServiceImpl implements SynDataService {
         String targetKey = pre + "out_tc.success";
         PutObjectRequest putObjectRequest = new PutObjectRequest(targetBucketName, targetKey, new ByteArrayInputStream(sum.getBytes()));
         ossClient.putObject(putObjectRequest);
+    }
 
+    /**
+     * 安硕工商数据测试
+     *
+     * @param ds
+     * @param targetBucketName
+     * @param ossPre
+     */
+    @Override
+    public void saveObjectAnshuo(String ds, String targetBucketName, String ossPre) {
+        String prcessDate = getPrcessDateV2(ds, 0);
+        anshuo_company_tns.forEach(tn -> {
+            String targetSuccessKey = ossPre + "/" + tn + "/" + prcessDate + "/" + "anshuo.txt";
+            String targetDataKey = ossPre + "/" + tn + "/" + prcessDate + "/" + tn + ".json";
+            if (existsBucketName(targetBucketName, targetDataKey)) {
+                PutObjectRequest putObjectRequest = new PutObjectRequest(targetBucketName, targetSuccessKey, new ByteArrayInputStream("".getBytes()));
+                ossClient.putObject(putObjectRequest);
+            }
+        });
     }
 
     public String getTn(String key) {

+ 42 - 2
src/main/java/com/winhc/data/push/service/impl/TouchServiceImpl.java

@@ -19,7 +19,7 @@ import java.time.LocalDateTime;
 import java.util.*;
 import java.util.stream.Collectors;
 
-import static com.winhc.data.push.common.Constant.TC_OSS_PRE;
+import static com.winhc.data.push.common.Constant.*;
 import static com.winhc.data.push.utils.BaseUtils.getPrcessDate;
 import static com.winhc.data.push.utils.DateUtils.getPrcessDateV2;
 
@@ -91,6 +91,46 @@ public class TouchServiceImpl implements TouchService {
         return queryList(project, task_ids);
     }
 
+    /**
+     * 安硕工商数据导出测试
+     *
+     * @param project
+     * @param ds
+     * @param tns
+     * @return
+     */
+    @Override
+    public Boolean export2OSSAnshuoCompany(String project, String ds, List<String> tns) {
+        String prcessDate = getPrcessDateV2(ds, 0);
+        //当日分区数据
+        List<String> ds_tns = odpsService.query(Constant.base_show_tn_sql, ds);
+        //需求输出数据
+        List<String> out_tns = ds_tns.stream()
+                .filter(tn -> Constant.anshuo_company_tns.contains(tn))
+                .distinct()
+                .collect(Collectors.toList());
+        //取交集
+        if (tns != null && tns.size() > 0) {
+            out_tns.retainAll(tns);
+        }
+
+        Map<Long, String> task_ids = out_tns.stream().map(tn -> {
+            Map<String, String> otherParams = new HashMap<>();
+            otherParams.put("tn", tn);
+            otherParams.put("ossPrefix", ANSHUO_OSS_PRE);
+            otherParams.put("datasource", ANSHUO_DATASOURCE);
+            otherParams.put("prcessDate", prcessDate);
+            otherParams.put("detail", tn + ".json");
+            Long returnValue = dataWorksService.export2OSS(ds, ProjectParamInit.exportOssFlowNameAnshuo,
+                    DateUtils.parseDate(ds),
+                    dataWorksFlowTaskMap.get(ProjectParamInit.exportOssBeanNameAnshuo),
+                    otherParams).getReturnValue();
+            return Arrays.asList(returnValue, tn);
+        }).distinct().collect(Collectors.toMap(c -> Long.parseLong(c.get(0).toString()), c -> c.get(1).toString(), (c, v) -> v));
+
+        return queryList(project, task_ids);
+    }
+
     @SneakyThrows
     @Override
     public Boolean queryList(String projectName, Map<Long, String> task_ids_maps) {
@@ -155,10 +195,10 @@ public class TouchServiceImpl implements TouchService {
                     , totalTask - awaitTask - failedTask
                     , failedTask
             );
+            Thread.sleep(20000);
             if (flag) {
                 break;
             }
-            Thread.sleep(20 * 000);
         }
         return true;
     }

+ 22 - 1
src/main/java/com/winhc/data/push/task/PushDataTask.java

@@ -14,9 +14,10 @@ import org.springframework.scheduling.annotation.Scheduled;
 import org.springframework.stereotype.Component;
 
 import java.io.FileNotFoundException;
+import java.util.Arrays;
 import java.util.List;
 
-import static com.winhc.data.push.constant.BaseParam.SYN_ANSHUO_UPLOAD_TASKS;
+import static com.winhc.data.push.constant.BaseParam.*;
 
 /**
  * @author π
@@ -70,4 +71,24 @@ public class PushDataTask {
         log.info("stop upload task !!! ");
 
     }
+
+    /**
+     * 工商相关推送任务
+     */
+    @Scheduled(cron = "*/10 * * * * ?")
+    //@Scheduled(cron = "0 /2 * * * ? ")
+    public synchronized void companyTask() {
+        log.info("start " + this.getClass() + " !!! ");
+        //获取运行任务
+        List<Document> allTask = synDataService.findAllTask(SYN_ANSHUO_GONGSHANG_TASKS, new Document("status", "create"));
+        allTask.forEach(task -> {
+            String ds = task.getString("ds");
+            List<String> tns = task.getList("tns", String.class);
+            if (StringUtils.isNotBlank(ds)) {
+                synDataService.pushAnshuoCompany(tns, ds);
+            }
+        });
+        log.info("stop " + this.getClass() + " !!! ");
+
+    }
 }

+ 5 - 0
src/main/java/com/winhc/data/push/utils/BaseUtils.java

@@ -42,6 +42,11 @@ public class BaseUtils {
         return ds + "_" + SYN_TC_ID_SUF;
     }
 
+    public static String getObjectId(String ds,String pre) {
+        assert StringUtils.isNotBlank(ds);
+        return ds + "_" + pre;
+    }
+
     public static void main(String[] args) {
         System.out.println(getDetailName("20220920", "product"));
         System.out.println(getDetailName("20220921", "product"));

+ 14 - 1
src/main/java/com/winhc/data/push/utils/OssUtils.java

@@ -33,6 +33,7 @@ public class OssUtils {
     public static String endpoint ;
     public static String bucketName = "data-exchange-as";
 
+    private static OSS ossClientDefault;
 
     static {
         if (BaseUtils.isWindows()) {
@@ -40,6 +41,12 @@ public class OssUtils {
         } else {
             endpoint = "oss-cn-shanghai-internal.aliyuncs.com";
         }
+
+        load();
+    }
+
+    private static void load() {
+        ossClientDefault = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
     }
 
     public static OSS getOssClient() {
@@ -91,14 +98,20 @@ public class OssUtils {
         return list_keys;
     }
 
+    public static Boolean existsBucketName(String bucket_name,String ObjectName) {
+        return ossClientDefault.doesObjectExist(bucket_name, ObjectName);
+    }
+
 
     public static void main(String[] args) {
 
         String objectName = "anshuo/tmp/20221122.zip";
         String filePath = "D:\\tmp\\data\\20221122.zip";
 
+        System.out.println(existsBucketName("data-exchange-as", "anshuo/wenshu/20240407/wenshu_detail_v2.json"));
+
         //uploadFile(objectName, filePath);
-        List<String> res = listKeys("data-exchange-as", "anshuo/wenshu/20240114/");
+        List<String> res = listKeys("data-exchange-as", "anshuo/wenshu/20240401/");
 //        List<String> res1 = listKeys("data-exchange-out-tc", "data/");
 //        List<String> res2 = listKeys("data-exchange-out-tc", "data/tc");
         System.out.println(res);

+ 9 - 0
src/main/resources/data-works-param.yaml

@@ -25,3 +25,12 @@ job:
           - _nodeId: 700006274093
             project: winhc_ng
             sourceTable: bds_data_out_tc
+
+  - project: winhc_ng
+    flow: base_opds_push_data_oss
+    task:
+      - taskName: base_opds_push_data_oss
+        param:
+          - _nodeId: 700006424784
+            project: winhc_ng
+            sourceTable: bds_data_out_anshuo

+ 5 - 6
src/test/java/com/winhc/data/push/TestPush.java

@@ -10,6 +10,7 @@ import org.springframework.test.context.junit4.SpringRunner;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
@@ -54,16 +55,14 @@ public class TestPush {
 
     @Test
     public void touchTaskTC() {
-        String ds = "20231221";
-        touchService.export2OSSTc("winhc_ng", ds, Arrays.asList("zxr_evaluate"));
+        String ds = "20240322";
+        touchService.export2OSSTc("winhc_ng", ds, new ArrayList<>());
         synDataService.saveObject(ds, TC_BUCKET_NAME, TC_OSS_PRE);
     }
 
     @Test
     public void touchTaskTCSum() {
-        synDataService.saveObject("20231226", TC_BUCKET_NAME, TC_OSS_PRE);
-        synDataService.saveObject("20231225", TC_BUCKET_NAME, TC_OSS_PRE);
-        synDataService.saveObject("20231227", TC_BUCKET_NAME, TC_OSS_PRE);
+        synDataService.saveObject("20240323", TC_BUCKET_NAME, TC_OSS_PRE);
     }
 
     @Test
@@ -75,7 +74,7 @@ public class TestPush {
 
     @Test
     public void saveEmpty() {
-        synDataService.saveEmptyObject("20240115");
+        synDataService.saveEmptyObject("20240322");
     }
     @Test
     public void tt() {