package com.winhc.task; import com.alibaba.fastjson.JSON; import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCollection; import com.mongodb.client.model.Filters; import com.winhc.common.constant.Base; import com.winhc.config.ConfigConstant; import com.winhc.kafka.KafkaProduce; import lombok.AllArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.bson.Document; import org.bson.conversions.Bson; import org.bson.types.ObjectId; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.data.mongodb.core.MongoTemplate; import org.springframework.scheduling.annotation.EnableScheduling; import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import java.util.ArrayList; import java.util.List; import static com.mongodb.client.model.Filters.in; import static com.winhc.utils.DateUtil.getMinuteTime; /** * @author π * @Description:天眼查线索数据合并 * @date 2021/6/22 17:07 */ @Component @Slf4j @EnableScheduling @AllArgsConstructor @ConditionalOnProperty(prefix = "scheduling", name = "enabled", havingValue = "true") public class AsynAllPersonMergeTask { private final MongoTemplate mongoTemplate; private final KafkaProduce kafkaProduce; @Autowired ConfigConstant configConstant; //@Scheduled(cron = "50 20 17 07 05 ?") @Scheduled(cron = "*/10 * * * * ?") //@Scheduled(cron = "0 /2 * * * ? ") public void start() throws InterruptedException { log.info("start AsynAllPersonMergeTask !!! "); MongoCollection collection = mongoTemplate.getCollection(Base.PID_ALL_PERSON_MERGE_V9); //1.遍历mongo List ids = new ArrayList<>(); for (Document d : collection.find().batchSize(200).noCursorTimeout(true).limit(200)) { ids.add(new ObjectId(d.get("_id").toString())); d.remove("_id"); kafkaProduce.produce(configConstant.topic_node_relation_union, JSON.toJSONString(d)); } //2.成功后删除 if (!ids.isEmpty()) { long deleteResult = collection.deleteMany(in("_id", ids)).getDeletedCount(); log.info("deleted size : {} ,ids : {}", deleteResult, ids.get(0)); } log.info("stop AsynAllPersonMergeTask !!! "); } }