|
@@ -0,0 +1,166 @@
|
|
|
+package com.winhc.data.push.service.impl;
|
|
|
+
|
|
|
+import cn.cocowwy.dingtalk.DingTalkGroupApi;
|
|
|
+import com.aliyun.openservices.shade.org.apache.commons.lang3.StringUtils;
|
|
|
+import com.aliyun.oss.OSS;
|
|
|
+import com.aliyun.oss.model.PutObjectRequest;
|
|
|
+import com.mongodb.client.FindIterable;
|
|
|
+import com.mongodb.client.MongoDatabase;
|
|
|
+import com.mongodb.client.model.Filters;
|
|
|
+import com.mongodb.client.model.FindOneAndUpdateOptions;
|
|
|
+import com.mongodb.client.model.UpdateOneModel;
|
|
|
+import com.mongodb.client.model.UpdateOptions;
|
|
|
+import com.winhc.data.push.configuration.OSSAccessProperties;
|
|
|
+import com.winhc.data.push.framework.MongoDbFastScan;
|
|
|
+import com.winhc.data.push.service.SynDataService;
|
|
|
+import com.winhc.data.push.service.TouchService;
|
|
|
+import lombok.AllArgsConstructor;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+import org.bson.Document;
|
|
|
+import org.springframework.data.mongodb.core.MongoTemplate;
|
|
|
+import org.springframework.stereotype.Service;
|
|
|
+
|
|
|
+import java.io.ByteArrayInputStream;
|
|
|
+import java.util.Collections;
|
|
|
+import java.util.Date;
|
|
|
+import java.util.List;
|
|
|
+import java.util.function.Consumer;
|
|
|
+import java.util.stream.Collectors;
|
|
|
+import java.util.stream.Stream;
|
|
|
+import java.util.stream.StreamSupport;
|
|
|
+
|
|
|
+import static com.winhc.data.push.constant.BaseParam.AUCTION_TRACKING_ANSHUO_URL;
|
|
|
+import static com.winhc.data.push.constant.BaseParam.SYN_ANSHUO_TASKS;
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author π
|
|
|
+ * @Description:
|
|
|
+ * @date 2022/7/20 17:02
|
|
|
+ */
|
|
|
+@Slf4j
|
|
|
+@Service
|
|
|
+@AllArgsConstructor
|
|
|
+public class SynDataServiceImpl implements SynDataService {
|
|
|
+
|
|
|
+ private final MongoTemplate mongoTemplate;
|
|
|
+ private final OSS ossClient;
|
|
|
+ private final OSSAccessProperties project;
|
|
|
+ private final TouchService touchService;
|
|
|
+ private final DingTalkGroupApi dingTalkApi;
|
|
|
+
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void push(String tn, String ds) {
|
|
|
+ String id = tn + "_" + ds;
|
|
|
+ try {
|
|
|
+ updateTask(tn, ds, "running");
|
|
|
+ sendMessage("task running !!! " + id);
|
|
|
+ String project = "winhc_ng";
|
|
|
+ if (touchService.export2Mongo(project, ds)) {
|
|
|
+ copyOSS(ds);
|
|
|
+ if (touchService.export2OSS(project, ds)) {
|
|
|
+ saveObject(ds);
|
|
|
+ updateTask(tn, ds, "success");
|
|
|
+ sendMessage("导出 任务 成功 !!! " + id);
|
|
|
+ } else {
|
|
|
+ log.error("导出OSS 失败 !!!");
|
|
|
+ sendMessage("导出 OSS 失败!!! " + id);
|
|
|
+ updateTask(tn, ds, "oss copy error");
|
|
|
+ }
|
|
|
+ } else {
|
|
|
+ log.error("导出Mongo 失败!!!");
|
|
|
+ sendMessage("导出 Mongo 失败!!! " + id);
|
|
|
+ updateTask(tn, ds, "export mongo error");
|
|
|
+ }
|
|
|
+
|
|
|
+ } catch (Exception e) {
|
|
|
+ sendMessage("任务失败!!! " + id + " | error " + e.getMessage());
|
|
|
+ updateTask(tn, ds, "task error");
|
|
|
+ e.printStackTrace();
|
|
|
+ log.error("export error: {}", e.getMessage());
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public String saveTask(String tn, String ds) {
|
|
|
+ String id = tn + "_" + ds;
|
|
|
+ if (findTaskExists(new Document("_id", id))) {
|
|
|
+ return "task repeat";
|
|
|
+ } else {
|
|
|
+ Document d = new Document();
|
|
|
+ d.put("_id", id);
|
|
|
+ d.put("create_time", new Date());
|
|
|
+ d.put("update_time", new Date());
|
|
|
+ d.put("status", "create");
|
|
|
+ d.put("tn", tn);
|
|
|
+ d.put("ds", ds);
|
|
|
+ mongoTemplate.getCollection(SYN_ANSHUO_TASKS).insertOne(d);
|
|
|
+ return "task create success";
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public void sendMessage(String message) {
|
|
|
+ try {
|
|
|
+ dingTalkApi.sendTextByPhones("data_push_message", message, Collections.singletonList("17602140784"));
|
|
|
+ } catch (Exception e) {
|
|
|
+ log.error("send message error : {}", e.getMessage());
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Boolean findTaskExists(Document doc) {
|
|
|
+ return mongoTemplate.getCollection(SYN_ANSHUO_TASKS).find(doc).iterator().hasNext();
|
|
|
+ }
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public List<Document> findAllTask(Document doc) {
|
|
|
+ FindIterable<Document> it = mongoTemplate.getCollection(SYN_ANSHUO_TASKS).find(doc);
|
|
|
+ return StreamSupport.stream(it.spliterator(), false).collect(Collectors.toList());
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ @Override
|
|
|
+ public Boolean updateTask(String tn, String ds, String status) {
|
|
|
+ String _id = tn + "_" + ds;
|
|
|
+ Document d = new Document();
|
|
|
+ d.put("update_time", new Date());
|
|
|
+ d.put("status", status);
|
|
|
+ d.put("tn", tn);
|
|
|
+ d.put("ds", ds);
|
|
|
+ mongoTemplate.getCollection(SYN_ANSHUO_TASKS)
|
|
|
+ .findOneAndUpdate(Filters.eq("_id", _id), new Document("$set", d), new FindOneAndUpdateOptions().upsert(true));
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+
|
|
|
+ private void copyOSS(String ds) {
|
|
|
+ MongoDatabase db = mongoTemplate.getDb();
|
|
|
+ Consumer<List<Document>> func = list -> {
|
|
|
+ list.stream().map(d -> d.getString("file_path"))
|
|
|
+ .filter(StringUtils::isNotEmpty)
|
|
|
+ .forEach(keySuffix -> {
|
|
|
+ copyObject2OSS(keySuffix, ds);
|
|
|
+ });
|
|
|
+ };
|
|
|
+
|
|
|
+ MongoDbFastScan mongoDbFastScan = new MongoDbFastScan(AUCTION_TRACKING_ANSHUO_URL, func, db)
|
|
|
+ .batchSize(200).threadNum(10);
|
|
|
+ mongoDbFastScan.scan();
|
|
|
+ }
|
|
|
+
|
|
|
+ public void copyObject2OSS(String keySuffix, String ds) {
|
|
|
+ String sourceKey = project.getSourcePathPrefix() + keySuffix;
|
|
|
+ String targetKey = project.getTargetPathprefix() + ds + "/file/" + keySuffix;
|
|
|
+ ossClient.copyObject(project.getSourceBucketName(), sourceKey, project.getTargetBucketName(), targetKey);
|
|
|
+ log.info(targetKey);
|
|
|
+ }
|
|
|
+
|
|
|
+ public void saveObject(String ds) {
|
|
|
+ String targetKey = project.getTargetPathprefix() + ds + "/anshuo.txt";
|
|
|
+ PutObjectRequest putObjectRequest = new PutObjectRequest(project.getTargetBucketName(), targetKey, new ByteArrayInputStream("".getBytes()));
|
|
|
+ ossClient.putObject(putObjectRequest);
|
|
|
+
|
|
|
+ }
|
|
|
+}
|