12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667 |
- package com.winhc.task;
- import com.alibaba.fastjson.JSON;
- import com.mongodb.client.FindIterable;
- import com.mongodb.client.MongoCollection;
- import com.mongodb.client.model.Filters;
- import com.winhc.common.constant.Base;
- import com.winhc.config.ConfigConstant;
- import com.winhc.kafka.KafkaProduce;
- import lombok.AllArgsConstructor;
- import lombok.extern.slf4j.Slf4j;
- import org.bson.Document;
- import org.bson.conversions.Bson;
- import org.bson.types.ObjectId;
- import org.springframework.beans.factory.annotation.Autowired;
- import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
- import org.springframework.data.mongodb.core.MongoTemplate;
- import org.springframework.scheduling.annotation.EnableScheduling;
- import org.springframework.scheduling.annotation.Scheduled;
- import org.springframework.stereotype.Component;
- import java.util.ArrayList;
- import java.util.List;
- import static com.mongodb.client.model.Filters.in;
- import static com.winhc.utils.DateUtil.getMinuteTime;
- /**
- * @author π
- * @Description:天眼查线索数据合并
- * @date 2021/6/22 17:07
- */
- @Component
- @Slf4j
- @EnableScheduling
- @AllArgsConstructor
- @ConditionalOnProperty(prefix = "scheduling", name = "enabled", havingValue = "true")
- public class AsynAllPersonMergeTask {
- private final MongoTemplate mongoTemplate;
- private final KafkaProduce kafkaProduce;
- @Autowired
- ConfigConstant configConstant;
- //@Scheduled(cron = "50 20 17 07 05 ?")
- @Scheduled(cron = "*/10 * * * * ?")
- //@Scheduled(cron = "0 /2 * * * ? ")
- public void start() throws InterruptedException {
- log.info("start AsynAllPersonMergeTask !!! ");
- MongoCollection<Document> collection = mongoTemplate.getCollection(Base.PID_ALL_PERSON_MERGE_V9);
- //1.遍历mongo
- List<ObjectId> ids = new ArrayList<>();
- for (Document d : collection.find().batchSize(200).noCursorTimeout(true).limit(200)) {
- ids.add(new ObjectId(d.get("_id").toString()));
- d.remove("_id");
- kafkaProduce.produce(configConstant.topic_node_relation_union, JSON.toJSONString(d));
- }
- //2.成功后删除
- if (!ids.isEmpty()) {
- long deleteResult = collection.deleteMany(in("_id", ids)).getDeletedCount();
- log.info("deleted size : {} ,ids : {}", deleteResult, ids.get(0));
- }
- log.info("stop AsynAllPersonMergeTask !!! ");
- }
- }
|