AsynAllPersonMergeTask.java 2.4 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package com.winhc.task;
  2. import com.alibaba.fastjson.JSON;
  3. import com.mongodb.client.FindIterable;
  4. import com.mongodb.client.MongoCollection;
  5. import com.mongodb.client.model.Filters;
  6. import com.winhc.common.constant.Base;
  7. import com.winhc.config.ConfigConstant;
  8. import com.winhc.kafka.KafkaProduce;
  9. import lombok.AllArgsConstructor;
  10. import lombok.extern.slf4j.Slf4j;
  11. import org.bson.Document;
  12. import org.bson.conversions.Bson;
  13. import org.bson.types.ObjectId;
  14. import org.springframework.beans.factory.annotation.Autowired;
  15. import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
  16. import org.springframework.data.mongodb.core.MongoTemplate;
  17. import org.springframework.scheduling.annotation.EnableScheduling;
  18. import org.springframework.scheduling.annotation.Scheduled;
  19. import org.springframework.stereotype.Component;
  20. import java.util.ArrayList;
  21. import java.util.List;
  22. import static com.mongodb.client.model.Filters.in;
  23. import static com.winhc.utils.DateUtil.getMinuteTime;
  24. /**
  25. * @author π
  26. * @Description:天眼查线索数据合并
  27. * @date 2021/6/22 17:07
  28. */
  29. @Component
  30. @Slf4j
  31. @EnableScheduling
  32. @AllArgsConstructor
  33. @ConditionalOnProperty(prefix = "scheduling", name = "enabled", havingValue = "true")
  34. public class AsynAllPersonMergeTask {
  35. private final MongoTemplate mongoTemplate;
  36. private final KafkaProduce kafkaProduce;
  37. @Autowired
  38. ConfigConstant configConstant;
  39. //@Scheduled(cron = "50 20 17 07 05 ?")
  40. @Scheduled(cron = "*/10 * * * * ?")
  41. //@Scheduled(cron = "0 /2 * * * ? ")
  42. public void start() throws InterruptedException {
  43. log.info("start AsynAllPersonMergeTask !!! ");
  44. MongoCollection<Document> collection = mongoTemplate.getCollection(Base.PID_ALL_PERSON_MERGE_V9);
  45. //1.遍历mongo
  46. List<ObjectId> ids = new ArrayList<>();
  47. for (Document d : collection.find().batchSize(200).noCursorTimeout(true).limit(200)) {
  48. ids.add(new ObjectId(d.get("_id").toString()));
  49. d.remove("_id");
  50. kafkaProduce.produce(configConstant.topic_node_relation_union, JSON.toJSONString(d));
  51. }
  52. //2.成功后删除
  53. if (!ids.isEmpty()) {
  54. long deleteResult = collection.deleteMany(in("_id", ids)).getDeletedCount();
  55. log.info("deleted size : {} ,ids : {}", deleteResult, ids.get(0));
  56. }
  57. log.info("stop AsynAllPersonMergeTask !!! ");
  58. }
  59. }