Ver Fonte

合并逻辑依赖外部数据源

xufei há 3 anos atrás
pai
commit
2af43b7ffa

+ 1 - 1
src/main/java/com/winhc/config/KafkaConfig.java

@@ -102,7 +102,7 @@ public class KafkaConfig {
         // 设置是否自动提交
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
         // 一次拉取消息数量
-        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 200);
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5*60*1000);
         props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5*60*1000);
         // 最大处理时间

+ 3 - 6
src/main/java/com/winhc/kafka/consumer/KafkaConsumerPersonIdUpdate.java

@@ -3,14 +3,13 @@ package com.winhc.kafka.consumer;
 
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
-import com.alibaba.fastjson.JSONAware;
 import com.alibaba.fastjson.JSONObject;
 import com.mongodb.client.MongoCollection;
 import com.winhc.common.constant.Base;
 import com.winhc.config.ConfigConstant;
 import com.winhc.kafka.KafkaProduce;
 import com.winhc.utils.CompanyUtils;
-import com.winhc.utils.ESUtils;
+import com.winhc.utils.EsQueryDsl;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -31,11 +30,9 @@ import javax.annotation.PostConstruct;
 import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 /**
  * @author π
@@ -97,8 +94,8 @@ public class KafkaConsumerPersonIdUpdate {
             return CompletableFuture.completedFuture(null);
         }
 
-        CompletableFuture<Boolean> v1 = updateByQuery(old_human_pid, new_human_pid, "winhc_company_human_pid_mapping_v9", ESUtils.updateHumanMappingId(old_human_pid, new_human_pid));
-        CompletableFuture<Boolean> v2 = updateByQuery(old_human_pid, new_human_pid, "winhc-company-human-relation-v9", ESUtils.updateBossId(old_human_pid));
+        CompletableFuture<Boolean> v1 = updateByQuery(old_human_pid, new_human_pid, "winhc_company_human_pid_mapping_v9", EsQueryDsl.updateHumanMappingId(old_human_pid, new_human_pid));
+        CompletableFuture<Boolean> v2 = updateByQuery(old_human_pid, new_human_pid, "winhc-company-human-relation-v9", EsQueryDsl.updateBossId(old_human_pid));
 
         return CompletableFuture.allOf(v1, v2).thenApplyAsync(x -> {
             try {

+ 14 - 0
src/main/java/com/winhc/service/EsQueryService.java

@@ -0,0 +1,14 @@
+package com.winhc.service;
+
+import com.winhc.pojo.MergePerson;
+
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Service contract for post-processing groups of merge candidates.
+ * <p>
+ * The implementation added in the same commit, {@code EsQueryServiceImpl}, fills in each
+ * candidate's {@code cnt} from an Elasticsearch {@code _count} request.
+ *
+ * @author π
+ * @date 2021/11/19 15:06
+ */
+public interface EsQueryService {
+    /**
+     * Processes the given groups of merge candidates and returns them.
+     *
+     * @param mergePersonList groups of {@link MergePerson}; the caller in {@code PersonMergeV2Impl}
+     *                        passes the result of {@code CompanyUtils.getMergePerson}, i.e. one
+     *                        list per {@code new_human_pid}
+     * @return the processed groups ({@code EsQueryServiceImpl} returns the very collection it was
+     *         given, after mutating its elements)
+     */
+    Collection<List<MergePerson>> queryByDsl(Collection<List<MergePerson>> mergePersonList);
+}

+ 53 - 0
src/main/java/com/winhc/service/impl/EsQueryServiceImpl.java

@@ -0,0 +1,53 @@
+package com.winhc.service.impl;
+
+import com.alibaba.fastjson.JSONObject;
+import com.winhc.pojo.MergePerson;
+import com.winhc.service.EsQueryService;
+import com.winhc.utils.EsQueryDsl;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.frameworkset.elasticsearch.boot.BBossESStarter;
+import org.frameworkset.elasticsearch.client.ClientInterface;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.stereotype.Service;
+
+import javax.annotation.PostConstruct;
+import java.util.Collection;
+import java.util.List;
+
+/**
+ * Elasticsearch-backed {@link EsQueryService}: for every merge candidate it asks the
+ * {@code winhc_company_human_pid_mapping_v9} index how many documents match the candidate's
+ * {@code old_human_pid} and stores that number as the candidate's {@code cnt}.
+ * <p>
+ * NOTE(review): {@code @AllArgsConstructor} generates a constructor over both fields, including
+ * {@code restClient}, which is also assigned in {@link #init()} — confirm Spring can satisfy
+ * that constructor in this application context.
+ *
+ * @author π
+ * @date 2022/5/25 19:22
+ */
+@Slf4j
+@Service
+@AllArgsConstructor
+public class EsQueryServiceImpl implements EsQueryService {
+
+    @Autowired
+    @Qualifier("bbossESStarterEs5")
+    private BBossESStarter bbossESStarterEs5;
+    private ClientInterface restClient;
+
+    /**
+     * Obtains the REST client named {@code "es5"} from the injected starter once the bean is built.
+     */
+    @PostConstruct
+    public void init() {
+        this.restClient = this.bbossESStarterEs5.getRestClient("es5");
+    }
+
+    /**
+     * Sends a {@code _count} request built by {@link EsQueryDsl#queryBoss(String)}.
+     *
+     * @param human_pid person id to count documents for
+     * @return the {@code count} field of the response; NOTE(review): presumably null when the
+     *         response has no such field — confirm against fastjson's {@code getInteger}
+     */
+    public Integer queryByDsl(String human_pid) {
+        String countDsl = EsQueryDsl.queryBoss(human_pid);
+        String responseBody = restClient.executeHttp("winhc_company_human_pid_mapping_v9/_count", countDsl, ClientInterface.HTTP_POST);
+        JSONObject parsed = JSONObject.parseObject(responseBody);
+        return parsed.getInteger("count");
+    }
+
+    /**
+     * Sets {@code cnt} on every candidate in every group, issuing one count request per candidate.
+     *
+     * @param mergePersonList groups of merge candidates; elements are mutated in place
+     * @return the same collection instance that was passed in
+     */
+    @Override
+    public Collection<List<MergePerson>> queryByDsl(Collection<List<MergePerson>> mergePersonList) {
+        for (List<MergePerson> group : mergePersonList) {
+            for (MergePerson candidate : group) {
+                Integer matches = queryByDsl(candidate.getOld_human_pid());
+                candidate.setCnt(matches);
+            }
+        }
+        return mergePersonList;
+    }
+
+
+}

+ 11 - 9
src/main/java/com/winhc/service/impl/PersonMergeV2Impl.java

@@ -3,9 +3,8 @@ package com.winhc.service.impl;
 import com.winhc.common.enums.CompanyEnum;
 import com.winhc.config.ConfigConstant;
 import com.winhc.kafka.KafkaProduce;
-import com.winhc.pojo.MergePerson;
+import com.winhc.service.EsQueryService;
 import com.winhc.service.RelationService;
-import com.winhc.utils.CompanyUtils;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
@@ -15,12 +14,11 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Qualifier;
 import org.springframework.stereotype.Service;
 
-import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
-import static com.winhc.utils.CompanyUtils.mergePerson;
+import static com.winhc.utils.CompanyUtils.*;
 
 /**
  * @author π
@@ -39,6 +37,8 @@ public class PersonMergeV2Impl implements RelationService {
     KafkaProduce kafkaProduce;
     @Autowired
     ConfigConstant configConstant;
+    @Autowired
+    EsQueryService esQueryService;
 
     @Override
     public String save(List<Map<String, Object>> batch_list) {
@@ -52,14 +52,16 @@ public class PersonMergeV2Impl implements RelationService {
                 "WHERE ID(p) <> ID(q)\n" +
                 "WITH p.person_id as person_id, p as first_node, apoc.coll.sort(collect(distinct ID(p)) + collect(distinct ID(q))) as all_ids\n" +
                 "UNWIND all_ids as all_id\n" +
-                "MATCH(m:" + CompanyEnum.Lable.PERSON.code + ")-[r]-(x:" + CompanyEnum.Lable.COMPANY.code + ")\n" +
+                "MATCH(m:" + CompanyEnum.Lable.PERSON.code + ")\n" +
                 "WHERE ID(m) = all_id\n" +
-                "WITH person_id, first_node, m as other_node, count(distinct x) as cnt\n" +
-                "RETURN first_node.person_id as new_human_pid, first_node.name as new_human_name, other_node.person_id as old_human_pid, other_node.name as old_human_name, cnt" + "\n";
+                "WITH person_id, first_node, m as other_node\n" +
+                "RETURN first_node.person_id as new_human_pid, first_node.name as new_human_name, other_node.person_id as old_human_pid, other_node.name as old_human_name" + "\n";
+
         log.info("query size: {}, cql:{}", batch_list.size(), query_cql);
-        List<Map<String, String>> mergeData = CompanyUtils.writeNeo4j1(session, query_cql, new HashMap<String, Object>() {{
+
+        List<Map<String, String>> mergeData = trans(esQueryService.queryByDsl(getMergePerson(session, query_cql, new HashMap<String, Object>() {{
             put("batch_list", batch_list);
-        }});
+        }})));
 
         //2.合并逻辑
         String data = mergePerson(session, mergeData);

+ 2 - 3
src/main/java/com/winhc/task/AsynAllPersonMergeTask.java

@@ -1,16 +1,14 @@
 package com.winhc.task;
 
 import com.alibaba.fastjson.JSON;
-import com.mongodb.client.FindIterable;
 import com.mongodb.client.MongoCollection;
-import com.mongodb.client.model.Filters;
 import com.winhc.common.constant.Base;
 import com.winhc.config.ConfigConstant;
 import com.winhc.kafka.KafkaProduce;
+import com.winhc.utils.CompanyUtils;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.bson.Document;
-import org.bson.conversions.Bson;
 import org.bson.types.ObjectId;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
@@ -47,6 +45,7 @@ public class AsynAllPersonMergeTask {
     @Scheduled(cron = "*/10 * * * * ?")
     //@Scheduled(cron = "0 /2 * * * ? ")
     public void start() throws InterruptedException {
+        if(CompanyUtils.isWindows()) return;
         log.info("start AsynAllPersonMergeTask !!! ");
         MongoCollection<Document> collection = mongoTemplate.getCollection(Base.PID_ALL_PERSON_MERGE_V9);
         //1.遍历mongo

+ 1 - 1
src/main/java/com/winhc/task/AsynMergePersonTask.java

@@ -58,7 +58,7 @@ public class AsynMergePersonTask {
             //2.成功后删除
             if (!ids.isEmpty()) {
                 long deleteResult = collection.deleteMany(in("_id", ids)).getDeletedCount();
-                log.info("deleted size : {} ,ids : {}", deleteResult, ids);
+                log.info("deleted size : {} ,ids : {}", deleteResult, ids.get(0));
             } else {
                 break;
             }

+ 33 - 43
src/main/java/com/winhc/utils/CompanyUtils.java

@@ -4,7 +4,6 @@ import cn.hutool.json.JSONUtil;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
-import com.alibaba.fastjson.serializer.SerializerFeature;
 import com.google.common.collect.ImmutableMap;
 import com.winhc.common.enums.CompanyEnum;
 import com.winhc.db.mongodb.dataobject.NodeRelationError;
@@ -83,21 +82,6 @@ public class CompanyUtils {
         return list.stream().filter(r -> (r.getOrDefault("topic_type", "-1").equals(type))).collect(toList());
     }
 
-    public static List<String> getMergeIds(List<Map<String, Object>> list) {
-        return list.stream()
-                .filter(r -> r.getOrDefault("start_id", "0").toString().length() == 33)
-                .collect(Collectors.toMap(t -> t.getOrDefault("start_id", "0").toString(), t -> t, (n, o) -> n))
-                .values().stream().map(x -> {
-                    Map<String, String> m = (Map) x;
-                    ImmutableMap<String, String> m2 = ImmutableMap.of(
-                            "person_id", m.get("start_id")
-                            , "name", m.get("start_name")
-                            , "topic_type", "800"
-                    );
-                    return JSONObject.toJSONString(m2, SerializerFeature.WriteMapNullValue);
-                }).collect(toList());
-    }
-
     public static List<Map<String, Object>> getMergeIds2(List<Map<String, Object>> list) {
         return list.stream()
                 .filter(r -> r.getOrDefault("start_id", "0").toString().length() == 33)
@@ -151,10 +135,7 @@ public class CompanyUtils {
     }
 
     public static String writeNeo4j2(Session session, String cql, HashMap<String, Object> parameters) {
-        List<Record> dataList = session.writeTransaction(tx -> {
-            Result result = tx.run(cql, parameters);
-            return result.list();
-        });
+        List<Record> dataList = getRecords(session, cql, parameters);
         Map<String, MergePerson> data = dataList.stream()
                 .map(record -> JSON.parseObject(JSON.toJSONString(record.asMap()), MergePerson.class))
                 .collect(Collectors.toMap(MergePerson::getId, t -> t, (n, o) -> n));
@@ -163,26 +144,25 @@ public class CompanyUtils {
     }
 
     public static List<Map<String, String>> writeNeo4j1(Session session, String cql, HashMap<String, Object> parameters) {
-        List<Record> dataList = session.writeTransaction(tx -> {
-            Result result = tx.run(cql, parameters);
-            return result.list();
-        });
-        Map<String, List<MergePerson>> collect = dataList.stream()
-                .map(record -> JSON.parseObject(JSON.toJSONString(record.asMap()), MergePerson.class))
-                .collect(Collectors.groupingBy(MergePerson::getNew_human_pid));
+        Collection<List<MergePerson>> collect = getMergePerson(session, cql, parameters);
+        return trans(collect);
+
+    }
+
+    public static List<Map<String, String>> trans(Collection<List<MergePerson>> collect) {
         List<Map<String, String>> list = new ArrayList<>();
-        collect.forEach((k, v) -> {
-            v.stream().max(Comparator.comparing(MergePerson::getCnt)).ifPresent(maxPerson -> v.forEach(m -> {
-                if (!m.getOld_human_pid().equals(maxPerson.getOld_human_pid())) {
-                    HashMap<String, String> map = new HashMap<>();
-                    map.put("new_human_pid", maxPerson.getOld_human_pid());
-                    map.put("new_human_name", maxPerson.getOld_human_name());
-                    map.put("old_human_pid", m.getOld_human_pid());
-                    map.put("old_human_name", m.getOld_human_name());
-                    list.add(map);
-                }
-            }));
-        });
+        collect.forEach(v -> v.stream()
+                .max(Comparator.comparing(MergePerson::getCnt))
+                .ifPresent(maxPerson -> v.forEach(m -> {
+                    if (!m.getOld_human_pid().equals(maxPerson.getOld_human_pid())) {
+                        HashMap<String, String> map = new HashMap<>();
+                        map.put("new_human_pid", maxPerson.getOld_human_pid());
+                        map.put("new_human_name", maxPerson.getOld_human_name());
+                        map.put("old_human_pid", m.getOld_human_pid());
+                        map.put("old_human_name", m.getOld_human_name());
+                        list.add(map);
+                    }
+                })));
         return list.stream().collect(
                 Collectors.collectingAndThen(
                         Collectors.toCollection(
@@ -190,14 +170,24 @@ public class CompanyUtils {
                         ), ArrayList::new
                 )
         );
+    }
 
+    /**
+     * Runs {@code cql} through {@code getRecords} and groups the resulting rows.
+     * <p>
+     * Each record is turned into a {@link MergePerson} by serialising its map to JSON and
+     * parsing that JSON back; the rows are then grouped by {@code new_human_pid}.
+     * {@code Collectors.groupingBy} does not accept null keys, so a row without a
+     * {@code new_human_pid} makes this method throw.
+     *
+     * @param session    Neo4j session the statement is executed on
+     * @param cql        Cypher statement to run
+     * @param parameters statement parameters
+     * @return the groups, one list per distinct {@code new_human_pid} (the values view of the
+     *         grouping map)
+     */
+    public static Collection<List<MergePerson>> getMergePerson(Session session, String cql, HashMap<String, Object> parameters) {
+        List<Record> dataList = getRecords(session, cql, parameters);
+        return dataList.stream()
+                .map(record -> JSON.parseObject(JSON.toJSONString(record.asMap()), MergePerson.class))
+                .collect(Collectors.groupingBy(MergePerson::getNew_human_pid)).values();
     }
 
-    public static List<Map<String, String>> writeNeo4j3(Session session, String cql, HashMap<String, Object> parameters) {
-        List<Record> dataList = session.writeTransaction(tx -> {
+    /**
+     * Executes {@code cql} with {@code parameters} inside {@code session.writeTransaction} and
+     * returns every record of the result.
+     * <p>
+     * {@code result.list()} is called inside the transaction function, so the rows are collected
+     * before the function returns.
+     * NOTE(review): a write transaction is used for every caller, including ones that only read
+     * the rows — presumably because some statements mutate the graph; confirm.
+     *
+     * @param session    Neo4j session to run on
+     * @param cql        Cypher statement
+     * @param parameters statement parameters
+     * @return all records produced by the statement
+     */
+    private static List<Record> getRecords(Session session, String cql, HashMap<String, Object> parameters) {
+        return session.writeTransaction(tx -> {
             Result result = tx.run(cql, parameters);
             return result.list();
         });
+    }
+
+    public static List<Map<String, String>> writeNeo4j3(Session session, String cql, HashMap<String, Object> parameters) {
+        List<Record> dataList = getRecords(session, cql, parameters);
         Set<String> newIds = new HashSet<>();
         Set<String> oldIds = new HashSet<>();
         List<Map<String, String>> list = dataList.stream()
@@ -266,8 +256,8 @@ public class CompanyUtils {
         }
     }
 
-   public static String mergePerson(Session session, List<Map<String, String>> mergeData) {
-       String data = null;
+    public static String mergePerson(Session session, List<Map<String, String>> mergeData) {
+        String data = null;
         if (!mergeData.isEmpty()) {
             final String merge_cql =
                     "\nWITH  {merge_list} AS batch_list \n" +

+ 0 - 43
src/main/java/com/winhc/utils/ESUtils.java

@@ -1,43 +0,0 @@
-package com.winhc.utils;
-
-/**
- * @author π
- * @Description:
- * @date 2022/1/10 14:13
- */
-public class ESUtils {
-    public static String updateHumanMappingId(String old_human_pid, String new_human_pid) {
-        String dsl = "{\n" +
-                "  \"query\": {\n" +
-                "    \"term\": {\n" +
-                "      \"human_pid\": {\n" +
-                "        \"value\": \"" + old_human_pid + "\"\n" +
-                "      }\n" +
-                "    }\n" +
-                "  },\n" +
-                "  \"script\": {\n" +
-                "    \"inline\": \"ctx._source['human_pid'] = params.new_human_pid \",\n" +
-                "    \"params\": {\n" +
-                "      \"new_human_pid\": \"" + new_human_pid + "\"\n" +
-                "    }\n" +
-                "  }\n" +
-                "}";
-        return dsl;
-    }
-
-    public static String updateBossId(String old_human_pid) {
-        String dsl = "{\n" +
-                "  \"query\": {\n" +
-                "    \"term\": {\n" +
-                "      \"_id\": {\n" +
-                "        \"value\": \"" + old_human_pid + "\"\n" +
-                "      }\n" +
-                "    }\n" +
-                "  },\n" +
-                "  \"script\": {\n" +
-                "    \"inline\": \"ctx._source['deleted'] = '9' \"\n" +
-                "  }\n" +
-                "}";
-        return dsl;
-    }
-}

+ 70 - 0
src/main/java/com/winhc/utils/EsQueryDsl.java

@@ -0,0 +1,70 @@
+package com.winhc.utils;
+
+/**
+ * Builders for the Elasticsearch request bodies (JSON strings) used while merging person ids.
+ * <p>
+ * Every caller-supplied id is JSON-escaped before it is placed inside a string literal, so an id
+ * containing a quote, a backslash or a control character can neither break the JSON nor change
+ * the query. For ids without such characters the produced text is unchanged.
+ *
+ * @author π
+ * @date 2022/1/10 14:13
+ */
+public class EsQueryDsl {
+    /**
+     * Body that selects documents whose {@code human_pid} term equals {@code old_human_pid} and
+     * runs a script assigning {@code params.new_human_pid} to {@code human_pid}.
+     * Passed to {@code updateByQuery} for {@code winhc_company_human_pid_mapping_v9}.
+     *
+     * @param old_human_pid id to search for
+     * @param new_human_pid id written by the script
+     * @return the request body as JSON text
+     */
+    public static String updateHumanMappingId(String old_human_pid, String new_human_pid) {
+        return "{\n" +
+                "  \"query\": {\n" +
+                "    \"term\": {\n" +
+                "      \"human_pid\": {\n" +
+                "        \"value\": \"" + escapeJson(old_human_pid) + "\"\n" +
+                "      }\n" +
+                "    }\n" +
+                "  },\n" +
+                "  \"script\": {\n" +
+                "    \"inline\": \"ctx._source['human_pid'] = params.new_human_pid \",\n" +
+                "    \"params\": {\n" +
+                "      \"new_human_pid\": \"" + escapeJson(new_human_pid) + "\"\n" +
+                "    }\n" +
+                "  }\n" +
+                "}";
+    }
+
+    /**
+     * Body that selects the document whose {@code _id} term equals {@code old_human_pid} and runs
+     * a script setting {@code deleted} to {@code '9'}.
+     * Passed to {@code updateByQuery} for {@code winhc-company-human-relation-v9}.
+     *
+     * @param old_human_pid document id to search for
+     * @return the request body as JSON text
+     */
+    public static String updateBossId(String old_human_pid) {
+        return "{\n" +
+                "  \"query\": {\n" +
+                "    \"term\": {\n" +
+                "      \"_id\": {\n" +
+                "        \"value\": \"" + escapeJson(old_human_pid) + "\"\n" +
+                "      }\n" +
+                "    }\n" +
+                "  },\n" +
+                "  \"script\": {\n" +
+                "    \"inline\": \"ctx._source['deleted'] = '9' \"\n" +
+                "  }\n" +
+                "}";
+    }
+
+    /**
+     * Query that must match both a {@code human_pid} term equal to {@code human_pid} and a
+     * {@code deleted} value of {@code "0"} or {@code "1"}.
+     * Sent to the {@code _count} endpoint by {@code EsQueryServiceImpl}.
+     *
+     * @param human_pid id to search for
+     * @return the request body as JSON text
+     */
+    public static String queryBoss(String human_pid) {
+        return "{\n" +
+                "  \"query\": {\n" +
+                "    \"bool\": {\n" +
+                "      \"must\": [\n" +
+                "        {\n" +
+                "          \"term\": {\n" +
+                "            \"human_pid\": {\n" +
+                "              \"value\": \"" + escapeJson(human_pid) + "\"\n" +
+                "            }\n" +
+                "          }\n" +
+                "        },\n" +
+                "        {\n" +
+                "          \"terms\": {\n" +
+                "            \"deleted\": [\n" +
+                "              \"0\",\n" +
+                "              \"1\"\n" +
+                "            ]\n" +
+                "          }\n" +
+                "        }\n" +
+                "      ]\n" +
+                "    }\n" +
+                "  }\n" +
+                "}";
+    }
+
+    /**
+     * Escapes {@code value} for use inside a JSON string literal.
+     * A null value becomes the text {@code null}, which is what plain string concatenation
+     * produced before escaping was introduced.
+     */
+    private static String escapeJson(String value) {
+        String text = String.valueOf(value);
+        StringBuilder escaped = new StringBuilder(text.length() + 8);
+        for (int i = 0; i < text.length(); i++) {
+            char c = text.charAt(i);
+            switch (c) {
+                case '"':
+                    escaped.append("\\\"");
+                    break;
+                case '\\':
+                    escaped.append("\\\\");
+                    break;
+                case '\n':
+                    escaped.append("\\n");
+                    break;
+                case '\r':
+                    escaped.append("\\r");
+                    break;
+                case '\t':
+                    escaped.append("\\t");
+                    break;
+                case '\b':
+                    escaped.append("\\b");
+                    break;
+                case '\f':
+                    escaped.append("\\f");
+                    break;
+                default:
+                    if (c < 0x20) {
+                        // remaining control characters must be written as \\uXXXX in JSON
+                        escaped.append(String.format("\\u%04x", (int) c));
+                    } else {
+                        escaped.append(c);
+                    }
+            }
+        }
+        return escaped.toString();
+    }
+}

+ 1 - 1
src/main/resources/application.properties

@@ -1,4 +1,4 @@
-spring.profiles.active=prd
+spring.profiles.active=${K_PROFILES_ACTIVE:prd}
 
 ###server
 server.port=9098

+ 3 - 1
src/test/java/com/winhc/test/TestMongo.java

@@ -3,6 +3,7 @@ package com.winhc.test;
 import com.google.common.collect.ImmutableMap;
 import com.winhc.db.mongodb.dataobject.NodeRelationError;
 import com.winhc.db.mongodb.repository.NodeRelatonErrorRepository;
+import com.winhc.service.EsQueryService;
 import com.winhc.service.PidToMongoService;
 import com.winhc.service.impl.NodeRelationSuccessStatusServiceImpl;
 import org.junit.Test;
@@ -33,7 +34,8 @@ public class TestMongo {
     @Autowired
     PidToMongoService pidToMongoService;
 
-
+    @Autowired
+    EsQueryService esQueryService;
 
     @Test
     public void saveData() {

+ 2 - 2
src/test/java/com/winhc/test/TestSendKafka.java

@@ -174,8 +174,8 @@ public class TestSendKafka {
             }
         };
         Arrays.asList(m, m0, m1, m2, m3, m4).forEach(x -> {
-//            kafkaProduce.produce(topic, JSONObject.toJSONString(x));
-            mongoTemplate.save(x, "pid_all_person_merge_v9");
+            kafkaProduce.produce(topic, JSONObject.toJSONString(x));
+//            mongoTemplate.save(x, "pid_all_person_merge_v9");
         });
     }