xufei 2 года назад
Родитель
Commit
60e63e2f15

+ 13 - 1
src/main/java/com/winhc/service/impl/PersonMergeV2Impl.java

@@ -46,9 +46,20 @@ public class PersonMergeV2Impl implements RelationService {
         long start = System.currentTimeMillis();
         Session session = driver.session();
         //1.查询关联人员
+//        final String query_cql = "\nWITH  {batch_list} AS batch_list \n" +
+//                "UNWIND batch_list AS row \n" +
+//                "MATCH (p:" + CompanyEnum.Lable.PERSON.code + "{person_id: row.person_id})-[*0..4]-(q:" + CompanyEnum.Lable.PERSON.code + "{name:row.name})\n" +
+//                "WHERE ID(p) <> ID(q)\n" +
+//                "WITH p.person_id as person_id, p as first_node, apoc.coll.sort(collect(distinct ID(p)) + collect(distinct ID(q))) as all_ids\n" +
+//                "UNWIND all_ids as all_id\n" +
+//                "MATCH(m:" + CompanyEnum.Lable.PERSON.code + ")\n" +
+//                "WHERE ID(m) = all_id\n" +
+//                "WITH person_id, first_node, m as other_node\n" +
+//                "RETURN first_node.person_id as new_human_pid, first_node.name as new_human_name, other_node.person_id as old_human_pid, other_node.name as old_human_name" + "\n";
+
         final String query_cql = "\nWITH  {batch_list} AS batch_list \n" +
                 "UNWIND batch_list AS row \n" +
-                "MATCH (p:" + CompanyEnum.Lable.PERSON.code + "{person_id: row.person_id})-[*0..3]-(q:" + CompanyEnum.Lable.PERSON.code + "{name:row.name})\n" +
+                "MATCH (p:" + CompanyEnum.Lable.PERSON.code + "{person_id: row.person_id})-[r]-(c:"+CompanyEnum.Lable.COMPANY.code+"{company_id:row.company_id})-[*0..3]-(q:" + CompanyEnum.Lable.PERSON.code + "{name:row.name})\n" +
                 "WHERE ID(p) <> ID(q)\n" +
                 "WITH p.person_id as person_id, p as first_node, apoc.coll.sort(collect(distinct ID(p)) + collect(distinct ID(q))) as all_ids\n" +
                 "UNWIND all_ids as all_id\n" +
@@ -57,6 +68,7 @@ public class PersonMergeV2Impl implements RelationService {
                 "WITH person_id, first_node, m as other_node\n" +
                 "RETURN first_node.person_id as new_human_pid, first_node.name as new_human_name, other_node.person_id as old_human_pid, other_node.name as old_human_name" + "\n";
 
+
         log.info("query size: {}, cql:{}", batch_list.size(), query_cql);
 
         List<Map<String, String>> mergeData = trans(esQueryService.queryByDsl(getMergePerson(session, query_cql, new HashMap<String, Object>() {{

+ 4 - 0
src/main/java/com/winhc/utils/CompanyUtils.java

@@ -90,6 +90,8 @@ public class CompanyUtils {
                     Map<String, Object> m = x;
                     String person_id = m.get("start_id").toString();
                     String person_name = m.get("start_name").toString();
+                    String end_id = m.get("end_id").toString();
+                    String end_name = m.get("end_name").toString();
                     return ImmutableMap.<String, Object>of(
                             "_id", person_id,
                             "update_time", formatDate_YYYY_MM_DD_HH_MM_SS(),
@@ -97,6 +99,8 @@ public class CompanyUtils {
                                     "person_id", person_id
                                     , "name", person_name
                                     , "topic_type", "800"
+                                    , "company_id", end_id
+                                    , "company_name", end_name
                             )
                     );
                 }).collect(toList());

+ 16 - 0
src/test/java/com/winhc/test/TestSendKafka.java

@@ -351,4 +351,20 @@ public class TestSendKafka {
         });
     }
 
+    @Test
+    public void sendPerson() {
+        String topic = "test3";
+        Map<String, String> m = new HashMap<String, String>() {
+            {
+                put("person_id", "p4126320f7936e2db919895a70366db64");
+                put("name", "周亚男");
+                put("company_id", "10604df5830d4b0f85352bc3db35e29e");
+                put("topic_type", "800");
+            }
+        };
+        Arrays.asList(m).forEach(x -> {
+            kafkaProduce.produce(topic, JSONObject.toJSONString(x));
+        });
+    }
+
 }