package com.winhc.task;

import com.alibaba.fastjson.JSON;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.winhc.common.constant.Base;
import com.winhc.config.ConfigConstant;
import com.winhc.kafka.KafkaProduce;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

import static com.mongodb.client.model.Filters.in;
import static com.winhc.utils.DateUtil.getMinuteTime;
- /**
- * @author π
- * @Description:拉取合并人员pid
- * @date 2021/6/22 17:07
- */
- @Component
- @Slf4j
- @EnableScheduling
- @AllArgsConstructor
- @ConditionalOnProperty(prefix = "scheduling", name = "enabled", havingValue = "true")
- public class AsynMergePersonTask {
- private final MongoTemplate mongoTemplate;
- private final KafkaProduce kafkaProduce;
- @Autowired
- ConfigConstant configConstant;
- @Scheduled(cron = "*/10 * * * * ?")
- //@Scheduled(cron = "0 /2 * * * ? ")
- public void start() throws InterruptedException {
- MongoCollection<Document> collection = mongoTemplate.getCollection(Base.PID_WAIT_MERGE_V9);
- while (true) {
- //1.查询mongo 2分钟之前数据
- Bson query = Filters.and(Filters.lte("update_time", getMinuteTime(-2)));
- FindIterable<Document> documents = collection.find(query).batchSize(200).noCursorTimeout(true);
- List<String> ids = new ArrayList<>();
- for (Document d : documents) {
- ids.add(d.get("_id").toString());
- kafkaProduce.produce(configConstant.topic_node_relation_union, JSON.toJSONString(d.get("data")));
- }
- //2.成功后删除
- if (!ids.isEmpty()) {
- long deleteResult = collection.deleteMany(in("_id", ids)).getDeletedCount();
- log.info("deleted size : {} ,ids : {}", deleteResult, ids);
- } else {
- break;
- }
- }
- }
- }