Explorar o código

加入合并过滤阈值

xufei %!s(int64=3) %!d(string=hai) anos
pai
achega
6dda1a9ef7

+ 0 - 2
src/main/java/com/winhc/kafka/consumer/KafkaConsumerNeo4jV2.java

@@ -61,13 +61,11 @@ public class KafkaConsumerNeo4jV2 {
         this.map.get(CompanyEnum.TopicType.STAFF_RELATION.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.STAFF_RELATION.CODE));
         this.map.get(CompanyEnum.TopicType.PERSON_NODE_LABEL.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.PERSON_NODE_LABEL.CODE));
         this.map.get(CompanyEnum.TopicType.PERSON_MERGE_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.PERSON_MERGE_V2.CODE));
-        //this.map.get(CompanyEnum.TopicType.PERSON_MERGE_All.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.PERSON_MERGE_All.CODE));
         this.map.get(CompanyEnum.TopicType.PERSON_MERGE_All_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.PERSON_MERGE_All_V2.CODE));
 
         //todo 等两分钟,再触发合并程序
         this.pidToMongoService.save(CompanyUtils.getMergeIds2(listMap));
         //发送合并人员kafka
-        //this.sendMergeService.save(CompanyUtils.getMergeIds(listMap));
         this.map.get(CompanyEnum.TopicType.NODE_RELATION_SUCCESS_STATUS.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.NODE_RELATION_SUCCESS_STATUS.CODE));
 
     }

+ 1 - 0
src/main/java/com/winhc/service/EsQueryService.java

@@ -11,4 +11,5 @@ import java.util.List;
  */
 public interface EsQueryService {
     Collection<List<MergePerson>> queryByDsl(Collection<List<MergePerson>> mergePersonList);
+    Integer queryByDsl(String humanPid);
 }

+ 1 - 0
src/main/java/com/winhc/service/impl/EsQueryServiceImpl.java

@@ -36,6 +36,7 @@ public class EsQueryServiceImpl implements EsQueryService {
         restClient = bbossESStarterEs5.getRestClient("es5");
     }
 
+    @Override
     public Integer queryByDsl(String human_pid) {
         String res = restClient.executeHttp("winhc_company_human_pid_mapping_v9/_count", EsQueryDsl.queryBoss(human_pid), ClientInterface.HTTP_POST);
         return JSONObject.parseObject(res).getInteger("count");

+ 11 - 3
src/main/java/com/winhc/task/AsynMergePersonTask.java

@@ -7,6 +7,7 @@ import com.mongodb.client.model.Filters;
 import com.winhc.common.constant.Base;
 import com.winhc.config.ConfigConstant;
 import com.winhc.kafka.KafkaProduce;
+import com.winhc.service.EsQueryService;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.bson.Document;
@@ -38,13 +39,15 @@ public class AsynMergePersonTask {
 
     private final MongoTemplate mongoTemplate;
     private final KafkaProduce kafkaProduce;
+    private final EsQueryService esQueryService;
     @Autowired
     ConfigConstant configConstant;
 
     @Scheduled(cron = "*/15 * * * * ?")
     //@Scheduled(cron = "0 /2 * * * ? ")
     public void start() throws InterruptedException {
-        if(isWindows()) return;
+        if (isWindows()) return;
+        log.info("start AsynMergePersonTask !!! ");
         MongoCollection<Document> collection = mongoTemplate.getCollection(Base.PID_WAIT_MERGE_V9);
         while (true) {
             //1.查询mongo 2分钟之前数据
@@ -52,8 +55,12 @@ public class AsynMergePersonTask {
             FindIterable<Document> documents = collection.find(query).batchSize(200).noCursorTimeout(true);
             List<String> ids = new ArrayList<>();
             for (Document d : documents) {
-                ids.add(d.get("_id").toString());
-                kafkaProduce.produce(configConstant.topic_node_relation_union, JSON.toJSONString(d.get("data")));
+                String human_pid = d.get("_id").toString();
+                ids.add(human_pid);
+                Integer count = esQueryService.queryByDsl(human_pid);
+                if (count < 100) {
+                    kafkaProduce.produce(configConstant.topic_node_relation_union, JSON.toJSONString(d.get("data")));
+                }
             }
             //2.成功后删除
             if (!ids.isEmpty()) {
@@ -63,5 +70,6 @@ public class AsynMergePersonTask {
                 break;
             }
         }
+        log.info("stop AsynMergePersonTask !!! ");
     }
 }