xufei 2 éve
szülő
commit
8ec0be2fe7

+ 26 - 8
src/main/java/com/winhc/data/push/service/impl/SynDataServiceImpl.java

@@ -3,19 +3,18 @@ package com.winhc.data.push.service.impl;
 import cn.cocowwy.dingtalk.DingTalkGroupApi;
 import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
 import com.aliyun.oss.OSS;
+import com.aliyun.oss.model.CopyObjectResult;
 import com.aliyun.oss.model.PutObjectRequest;
 import com.mongodb.client.FindIterable;
 import com.mongodb.client.MongoDatabase;
 import com.mongodb.client.model.Filters;
 import com.mongodb.client.model.FindOneAndUpdateOptions;
-import com.mongodb.client.model.UpdateOneModel;
 import com.mongodb.client.model.UpdateOptions;
 import com.winhc.data.push.common.TaskEnum;
 import com.winhc.data.push.configuration.OSSAccessProperties;
 import com.winhc.data.push.framework.MongoDbFastScan;
 import com.winhc.data.push.service.SynDataService;
 import com.winhc.data.push.service.TouchService;
-import com.winhc.data.push.utils.TransToExcelNewCompany;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.io.FileUtils;
@@ -25,16 +24,14 @@ import org.springframework.stereotype.Service;
 
 import java.io.ByteArrayInputStream;
 import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.util.Collections;
 import java.util.Date;
 import java.util.List;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
+import static cn.hutool.crypto.SecureUtil.md5;
 import static com.winhc.data.push.constant.BaseParam.*;
 import static com.winhc.data.push.utils.TransToExcelNewCompany.dataPathPrefix;
 import static com.winhc.data.push.utils.TransToExcelNewCompany.run;
@@ -198,6 +195,11 @@ public class SynDataServiceImpl implements SynDataService {
     }
 
     private void copyOSS(String ds) {
+        //清空七天之前数据
+        long lastWeek = Long.parseLong(ds) - 7;
+        long deletedCount = mongoTemplate.getCollection("xf_as_upload_oss_log")
+                .deleteMany(Filters.lt("ds", lastWeek)).getDeletedCount();
+        log.info("清空过期数据量 : {}", deletedCount);
         MongoDatabase db = mongoTemplate.getDb();
         Consumer<List<Document>> func = list -> {
             list.stream().map(d -> d.getString("file_path"))
@@ -208,15 +210,31 @@ public class SynDataServiceImpl implements SynDataService {
         };
 
         MongoDbFastScan mongoDbFastScan = new MongoDbFastScan(AUCTION_TRACKING_ANSHUO_URL, func, db)
-                .batchSize(200).threadNum(10);
+                .batchSize(200).threadNum(100);
         mongoDbFastScan.scan();
     }
 
     public void copyObject2OSS(String keySuffix, String ds) {
         String sourceKey = project.getSourcePathPrefix() + keySuffix;
         String targetKey = project.getTargetPathprefix() + ds + "/file/" + keySuffix;
-        ossClient.copyObject(project.getSourceBucketName(), sourceKey, project.getTargetBucketName(), targetKey);
-        log.info(targetKey);
+        Document d = new Document();
+        d.put("file_path", keySuffix);
+        d.put("update_time", new Date());
+        d.put("ds", ds);
+        int statusCode = 500;
+        try {
+            CopyObjectResult copyObjectResult = ossClient.copyObject(project.getSourceBucketName(), sourceKey, project.getTargetBucketName(), targetKey);
+            statusCode = copyObjectResult.getResponse().getStatusCode();
+            d.put("status_code", statusCode);
+            log.info("code : {} \n,key : {}", statusCode, targetKey);
+        } catch (Exception e) {
+            log.error("code : {} \n,key : {}\n,error : {}", statusCode, targetKey, e.getMessage());
+            int length = e.getMessage().length();
+            d.put("error_message", length > 1000 ? e.getMessage().substring(0, 1000) : e.getMessage());
+        }
+        mongoTemplate.getCollection("xf_as_upload_oss_log")
+                .updateOne(new Document("_id", md5(keySuffix + ds)), new Document("$set", d)
+                        , new UpdateOptions().upsert(true).bypassDocumentValidation(true));
     }
 
     public void saveObject(String ds) {