// AsynMergePersonTask.java
package com.winhc.task;

import com.alibaba.fastjson.JSON;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Filters;
import com.winhc.common.constant.Base;
import com.winhc.config.ConfigConstant;
import com.winhc.kafka.KafkaProduce;
import lombok.AllArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.data.mongodb.core.MongoTemplate;
import org.springframework.scheduling.annotation.EnableScheduling;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;

import static com.mongodb.client.model.Filters.in;
import static com.winhc.utils.DateUtil.getMinuteTime;
/**
 * Pulls pending merged-person pids from Mongo and forwards them to Kafka.
 *
 * @author π
 * @date 2021/6/22 17:07
 */
  28. @Component
  29. @Slf4j
  30. @EnableScheduling
  31. @AllArgsConstructor
  32. @ConditionalOnProperty(prefix = "scheduling", name = "enabled", havingValue = "true")
  33. public class AsynMergePersonTask {
  34. private final MongoTemplate mongoTemplate;
  35. private final KafkaProduce kafkaProduce;
  36. @Autowired
  37. ConfigConstant configConstant;
  38. @Scheduled(cron = "*/10 * * * * ?")
  39. //@Scheduled(cron = "0 /2 * * * ? ")
  40. public void start() throws InterruptedException {
  41. MongoCollection<Document> collection = mongoTemplate.getCollection(Base.PID_WAIT_MERGE_V9);
  42. while (true) {
  43. //1.查询mongo 2分钟之前数据
  44. Bson query = Filters.and(Filters.lte("update_time", getMinuteTime(-2)));
  45. FindIterable<Document> documents = collection.find(query).batchSize(200).noCursorTimeout(true);
  46. List<String> ids = new ArrayList<>();
  47. for (Document d : documents) {
  48. ids.add(d.get("_id").toString());
  49. kafkaProduce.produce(configConstant.topic_node_relation_union, JSON.toJSONString(d.get("data")));
  50. }
  51. //2.成功后删除
  52. if (!ids.isEmpty()) {
  53. long deleteResult = collection.deleteMany(in("_id", ids)).getDeletedCount();
  54. log.info("deleted size : {} ,ids : {}", deleteResult, ids);
  55. } else {
  56. break;
  57. }
  58. }
  59. }
  60. }