|
@@ -32,6 +32,7 @@ import java.util.stream.Collectors;
|
|
|
import java.util.stream.StreamSupport;
|
|
|
|
|
|
import static cn.hutool.crypto.SecureUtil.md5;
|
|
|
+import static com.mongodb.client.model.Filters.*;
|
|
|
import static com.winhc.data.push.constant.BaseParam.*;
|
|
|
import static com.winhc.data.push.utils.TransToExcelNewCompany.dataPathPrefix;
|
|
|
import static com.winhc.data.push.utils.TransToExcelNewCompany.run;
|
|
@@ -146,7 +147,7 @@ public class SynDataServiceImpl implements SynDataService {
|
|
|
d.put("tn", tn);
|
|
|
d.put("ds", ds);
|
|
|
mongoTemplate.getCollection(SYN_ANSHUO_TASKS)
|
|
|
- .findOneAndUpdate(Filters.eq("_id", _id), new Document("$set", d), new FindOneAndUpdateOptions().upsert(true));
|
|
|
+ .findOneAndUpdate(eq("_id", _id), new Document("$set", d), new FindOneAndUpdateOptions().upsert(true));
|
|
|
return true;
|
|
|
}
|
|
|
|
|
@@ -157,7 +158,7 @@ public class SynDataServiceImpl implements SynDataService {
|
|
|
d.put("status", status);
|
|
|
d.put("ds", ds);
|
|
|
mongoTemplate.getCollection(collectionName)
|
|
|
- .findOneAndUpdate(Filters.eq("_id", ds), new Document("$set", d), new FindOneAndUpdateOptions().upsert(true));
|
|
|
+ .findOneAndUpdate(eq("_id", ds), new Document("$set", d), new FindOneAndUpdateOptions().upsert(true));
|
|
|
return true;
|
|
|
}
|
|
|
|
|
@@ -198,7 +199,8 @@ public class SynDataServiceImpl implements SynDataService {
|
|
|
//清空七天之前数据
|
|
|
long lastWeek = Long.parseLong(ds) - 7;
|
|
|
long deletedCount = mongoTemplate.getCollection("xf_as_upload_oss_log")
|
|
|
- .deleteMany(Filters.lt("ds", lastWeek)).getDeletedCount();
|
|
|
+ .deleteMany(and(lt("ds", lastWeek), eq("status_code", 200)))
|
|
|
+ .getDeletedCount();
|
|
|
log.info("清空过期数据量 : {}", deletedCount);
|
|
|
MongoDatabase db = mongoTemplate.getDb();
|
|
|
Consumer<List<Document>> func = list -> {
|
|
@@ -208,7 +210,6 @@ public class SynDataServiceImpl implements SynDataService {
|
|
|
copyObject2OSS(keySuffix, ds);
|
|
|
});
|
|
|
};
|
|
|
-
|
|
|
MongoDbFastScan mongoDbFastScan = new MongoDbFastScan(AUCTION_TRACKING_ANSHUO_URL, func, db)
|
|
|
.batchSize(200).threadNum(100);
|
|
|
mongoDbFastScan.scan();
|
|
@@ -222,6 +223,8 @@ public class SynDataServiceImpl implements SynDataService {
|
|
|
d.put("update_time", new Date());
|
|
|
d.put("ds", ds);
|
|
|
int statusCode = 500;
|
|
|
+ UpdateOptions up = new UpdateOptions().upsert(true).bypassDocumentValidation(true);
|
|
|
+ Document id = new Document("_id", md5(keySuffix + ds));
|
|
|
try {
|
|
|
CopyObjectResult copyObjectResult = ossClient.copyObject(project.getSourceBucketName(), sourceKey, project.getTargetBucketName(), targetKey);
|
|
|
statusCode = copyObjectResult.getResponse().getStatusCode();
|
|
@@ -231,10 +234,11 @@ public class SynDataServiceImpl implements SynDataService {
|
|
|
log.error("code : {} \n,key : {}\n,error : {}", statusCode, targetKey, e.getMessage());
|
|
|
int length = e.getMessage().length();
|
|
|
d.put("error_message", length > 1000 ? e.getMessage().substring(0, 1000) : e.getMessage());
|
|
|
+ mongoTemplate.getCollection("xf_as_upload_oss_error_log")
|
|
|
+ .updateOne(id, new Document("$set", d), up);
|
|
|
}
|
|
|
mongoTemplate.getCollection("xf_as_upload_oss_log")
|
|
|
- .updateOne(new Document("_id", md5(keySuffix + ds)), new Document("$set", d)
|
|
|
- , new UpdateOptions().upsert(true).bypassDocumentValidation(true));
|
|
|
+ .updateOne(id, new Document("$set", d), up);
|
|
|
}
|
|
|
|
|
|
public void saveObject(String ds) {
|