Browse Source

更新测试

xufei 4 years ago
parent
commit
c64650f6d3

+ 4 - 1
src/main/java/com/winhc/common/enums/CompanyEnum.java

@@ -22,7 +22,10 @@ public class CompanyEnum {
         LEGAL_ENTITY_RELATION_V2("5", "legalEntityRelationV2ServiceImpl"),
         STAFF_RELATION("6", "staffRelationServiceImpl"),
         PERSON_NODE_LABEL("7", "personNodeLabelServiceImpl"),
-        NODE_RELATION_SUCCESS_STATUS("100", "nodeRelationSuccessStatusServiceImpl");
+        NODE_RELATION_SUCCESS_STATUS("100", "nodeRelationSuccessStatusServiceImpl"),
+        PERSON_COMPANYS("200", "personCompanysImpl"),
+        PERSON_MERGE("300", "personMergeImpl")
+        ;
 
         public final String CODE;
         public final String VALUE;

+ 51 - 0
src/main/java/com/winhc/kafka/consumer/KafkaConsumerPersonCompanys.java

@@ -0,0 +1,51 @@
+package com.winhc.kafka.consumer;
+
+import cn.hutool.json.JSONUtil;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.winhc.common.enums.CompanyEnum;
+import com.winhc.db.mongodb.dataobject.NodeRelationError;
+import com.winhc.service.RelationService;
+import com.winhc.utils.CompanyUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
+import org.springframework.stereotype.Service;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2021/4/8 16:12
+ */
+@Slf4j
+//@Service
+@AllArgsConstructor
+public class KafkaConsumerPersonCompanys {
+
+    private final Map<String, RelationService> map;
+
+    @KafkaListener(id = "${spring.kafka.topic_person_companys}"
+            , topics = "${spring.kafka.topic_person_companys}"
+            , groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "KafkaConsumerPersonCompanys")
+    public void consumerCompanyNode(List<ConsumerRecord<?, ?>> records) {
+        List<Map<String, Object>> list = CompanyUtils.mapPlus(records);
+        this.map.get(CompanyEnum.TopicType.PERSON_COMPANYS.VALUE).save(list);
+    }
+
+    /**
+     * 因为手动确认消费,若消费失败,记录重刷
+     */
+    @Bean("KafkaConsumerPersonCompanys")
+    public ConsumerAwareListenerErrorHandler dealError() {
+        return (message, e, consumer) -> {
+            System.out.println(message);
+            return null;
+        };
+    }
+}

+ 47 - 0
src/main/java/com/winhc/kafka/consumer/KafkaConsumerPersonMerge.java

@@ -0,0 +1,47 @@
+package com.winhc.kafka.consumer;
+
+import com.winhc.common.enums.CompanyEnum;
+import com.winhc.service.RelationService;
+import com.winhc.utils.CompanyUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author π
+ * @Description:人员合并
+ * @date 2021/4/8 16:12
+ */
+@Slf4j
+//@Service
+@AllArgsConstructor
+public class KafkaConsumerPersonMerge {
+
+    private final Map<String, RelationService> map;
+
+    @KafkaListener(id = "${spring.kafka.topic_person_merge}"
+            , topics = "${spring.kafka.topic_person_merge}"
+            , groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "KafkaConsumerPersonMerge")
+    public void consumerCompanyNode(List<ConsumerRecord<?, ?>> records) {
+        List<Map<String, Object>> list = CompanyUtils.mapPlusV1(records);
+        this.map.get(CompanyEnum.TopicType.PERSON_MERGE.VALUE).save(list);
+    }
+
+    /**
+     * 因为手动确认消费,若消费失败,记录重刷
+     */
+    @Bean("KafkaConsumerPersonMerge")
+    public ConsumerAwareListenerErrorHandler dealError() {
+        return (message, e, consumer) -> {
+            System.out.println(message);
+            return null;
+        };
+    }
+}

+ 66 - 0
src/main/java/com/winhc/service/impl/PersonCompanysImpl.java

@@ -0,0 +1,66 @@
+package com.winhc.service.impl;
+
+import com.alibaba.fastjson.JSON;
+import com.winhc.common.enums.CompanyEnum;
+import com.winhc.service.RelationService;
+import com.winhc.utils.CompanyUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Session;
+import org.springframework.stereotype.Service;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author π
+ * @Description:股东关系 人->公司
+ * @date 2021/1/11 10:03
+ */
+@Slf4j
+@Service("personCompanysImpl")
+@AllArgsConstructor
+public class PersonCompanysImpl implements RelationService {
+
+    private final Driver driver;
+
+    @Override
+    public String save(List<Map<String, Object>> batch_list) {
+        if (batch_list.isEmpty()) return null;
+        long start = System.currentTimeMillis();
+        Session session = driver.session();
+//        final String cql =
+//                        "\nWITH  {batch_list} AS batch_list \n" +
+//                           "UNWIND batch_list AS row \n" +
+//                           "MATCH (n:企业)-[r]-(m:个人) \n" +
+//                           "WHERE n.company_id in split(row.company_ids,'&')  and m.name in [row.name] \n" +
+//                           "return m.name as name\n";
+//                           //"return m.name as name,collect(distinct m) as nodes\n";
+        System.out.println(JSON.toJSONString(batch_list));
+        final String cql =
+                "\nWITH  {batch_list} AS batch_list \n" +
+                        "UNWIND batch_list AS row \n" +
+                        "MATCH (n:企业)<-[r]-(m:个人{name:row.name}) \n" +
+                        "WHERE n.company_id in split(row.company_ids,'&')\n" +
+                        "WITH m.name as name,collect(distinct m) as nodes\n" +
+                        "WITH head(nodes) as first_node, nodes\n" +
+                        "UNWIND nodes as other_node\n" +
+                        "MATCH (other_node)-[r]-(x:企业)\n" +
+                        "WHERE other_node.person_id <> first_node.person_id\n" +
+                        "WITH first_node,r,other_node,x\n" +
+                        "CALL apoc.merge.relationship(first_node, TYPE(r), properties(r),{}, x,{}) YIELD rel\n" +
+                        "SET first_node:合并20210409\n" +
+                        "SET other_node:删除20210409\n" +
+                        "DELETE r";
+        log.info("consumer size: {}, cql:{}", batch_list.size(), cql);
+        String data = CompanyUtils.writeNeo4j(session, cql, new HashMap<String, Object>() {{
+            put("batch_list", batch_list);
+        }});
+        session.close();
+        log.info("class:{} | save size:{} | cost:{}", PersonCompanysImpl.class.getSimpleName(), batch_list.size(), (System.currentTimeMillis() - start));
+        return data;
+    }
+}

+ 53 - 0
src/main/java/com/winhc/service/impl/PersonMergeImpl.java

@@ -0,0 +1,53 @@
+package com.winhc.service.impl;
+
+import com.alibaba.fastjson.JSON;
+import com.winhc.service.RelationService;
+import com.winhc.utils.CompanyUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Session;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author π
+ * @Description:股东关系 人->公司
+ * @date 2021/1/11 10:03
+ */
+@Slf4j
+@Service("personMergeImpl")
+@AllArgsConstructor
+public class PersonMergeImpl implements RelationService {
+
+    private final Driver driver;
+
+    @Override
+    public String save(List<Map<String, Object>> batch_list) {
+        if (batch_list.isEmpty()) return null;
+        long start = System.currentTimeMillis();
+        Session session = driver.session();
+
+        //System.out.println(JSON.toJSONString(batch_list));
+        final String cql =
+                "\nWITH  {batch_list} AS batch_list \n" +
+                        "UNWIND batch_list AS row \n" +
+                        "MATCH (m:个人{person_id:row.merge_human_pid})\n" +
+                        "MATCH (n:个人{person_id:row.deleted_human_pid})-[r]-(x:企业)\n" +
+                        "with m,n,r,x\n" +
+                        "CALL apoc.merge.relationship(m, TYPE(r), properties(r),{}, x,{}) YIELD rel\n" +
+                        "SET m:update20210415\n" +
+                        "SET n:deleted20210415\n" +
+                        "DELETE r";
+        log.info("consumer size: {}, cql:{}", batch_list.size(), cql);
+        String data = CompanyUtils.writeNeo4j(session, cql, new HashMap<String, Object>() {{
+            put("batch_list", batch_list);
+        }});
+        session.close();
+        log.info("class:{} | save size:{} | cost:{}", PersonMergeImpl.class.getSimpleName(), batch_list.size(), (System.currentTimeMillis() - start));
+        return data;
+    }
+}

+ 51 - 8
src/main/java/com/winhc/utils/CompanyUtils.java

@@ -1,15 +1,15 @@
 package com.winhc.utils;
 
 import cn.hutool.json.JSONUtil;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
 import com.winhc.common.enums.CompanyEnum;
 import com.winhc.db.mongodb.dataobject.NodeRelationError;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.neo4j.driver.Result;
 import org.neo4j.driver.Session;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+
+import java.util.*;
 import java.util.stream.Collectors;
 
 /**
@@ -25,8 +25,51 @@ public class CompanyUtils {
         }).collect(Collectors.toList());
     }
 
+    public static List<Map<String, Object>> mapPlus(List<ConsumerRecord<?, ?>> records) {
+        return records.stream().filter(r -> (r != null && r.value() != null)).map(r -> {
+            JSONObject jsonObject = JSONObject.parseObject(r.value().toString());
+            String person_name = jsonObject.getString("person_name");
+            JSONArray company_infos = jsonObject.getJSONArray("company_infos");
+            ArrayList<String> list = new ArrayList<>();
+            HashMap<String, Object> map = new HashMap<>();
+            for (Object company_info : company_infos) {
+                JSONObject jsonObject1 = (JSONObject) company_info;
+                String company_id = jsonObject1.getString("company_id");
+                list.add(company_id);
+            }
+            map.put("name", person_name);
+            map.put("company_ids", listToString(list));
+            return map;
+        }).collect(Collectors.toList());
+    }
+
+    public static List<Map<String, Object>> mapPlusV1(List<ConsumerRecord<?, ?>> records) {
+        return records.stream().filter(r -> (r != null && r.value() != null)).map(r -> {
+            HashMap<String, Object> map = new HashMap<>();
+            JSONObject jsonObject = JSONObject.parseObject(r.value().toString());
+            String merge_human_pid = jsonObject.getString("merge_human_pid");
+            String deleted_human_pid = jsonObject.getString("deleted_human_pid");
+
+            map.put("merge_human_pid", merge_human_pid);
+            map.put("deleted_human_pid", deleted_human_pid);
+            return map;
+        }).collect(Collectors.toList());
+    }
+
+    public static String listToString(List<String> list) {
+        if (list.size() == 0) return "";
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < list.size(); i++) {
+            sb.append(list.get(i));
+            if (i != list.size() - 1) {
+                sb.append("&");
+            }
+        }
+        return sb.toString();
+    }
+
     public static List<Map<String, Object>> filterList(List<Map<String, Object>> list, String type) {
-        return list.stream().filter(r -> (r.getOrDefault("topic_type","-1").equals(type))).collect(Collectors.toList());
+        return list.stream().filter(r -> (r.getOrDefault("topic_type", "-1").equals(type))).collect(Collectors.toList());
     }
 
     public static List<NodeRelationError> toMessage(List<ConsumerRecord> records, String errorMessage) {
@@ -46,7 +89,7 @@ public class CompanyUtils {
         }).collect(Collectors.toList());
     }
 
-    public static String writeNeo4j(Session session, String cql, HashMap<String,Object> parameters) {
+    public static String writeNeo4j(Session session, String cql, HashMap<String, Object> parameters) {
         String data = session.writeTransaction(tx -> {
             Result result = tx.run(cql, parameters);
             return "success";
@@ -54,8 +97,8 @@ public class CompanyUtils {
         return data;
     }
 
-    public static String getIncrPersonLabel(){
-        return CompanyEnum.Lable.新增.code +DateUtil.getDateBefore(-1).replace("-", "");
+    public static String getIncrPersonLabel() {
+        return CompanyEnum.Lable.新增.code + DateUtil.getDateBefore(-1).replace("-", "");
     }
 
     public static void main(String[] args) {

+ 9 - 3
src/main/resources/application-dev.properties

@@ -6,7 +6,10 @@ spring.data.neo4j.password=neo4j168
 #spring.data.neo4j.uri=bolt://127.0.0.1:7687
 
 #爬虫
-spring.data.neo4j.uri=bolt://192.168.2.56:7687
+#spring.data.neo4j.uri=bolt://192.168.2.56:7687
+#测试
+#spring.data.neo4j.uri=bolt://139.224.197.164:7687
+spring.data.neo4j.uri=bolt://192.168.2.60:7687
 
 #数据库uri地址
 #spring.data.neo4j.uri=http://10.29.26.76:7474
@@ -24,9 +27,12 @@ scheduling.enabled = false
 
 #============== kafka ===================
 # 指定kafka 代理地址,可以多个
-spring.kafka.bootstrap-servers=47.100.177.224:9092
+#spring.kafka.bootstrap-servers=47.101.221.131:9092
+spring.kafka.bootstrap-servers=192.168.4.239:9092,192.168.4.241:9092,192.168.4.240:9092
 #topic
-spring.kafka.topic_node_relation_union=inc_node_relation_union
+spring.kafka.topic_node_relation_union=inc_node_relation_union_test
+spring.kafka.topic_person_companys=inc_person_companys
+spring.kafka.topic_person_merge=inc_person_merge
 
 #spring.kafka.bootstrap-servers=192.168.4.237:9092,192.168.4.235:9092,192.168.4.236:9092
 #spring.kafka.topic=xf_test

+ 2 - 1
src/main/resources/application-prd.properties

@@ -35,7 +35,8 @@ spring.kafka.bootstrap-servers=192.168.4.239:9092,192.168.4.241:9092,192.168.4.2
 
 #topic
 spring.kafka.topic_node_relation_union=inc_node_relation_union
-
+spring.kafka.topic_person_companys=inc_person_companys
+spring.kafka.topic_person_merge=inc_person_merge
 #=============== provider  =======================
 spring.kafka.producer.retries=3
 # 每次批量发送消息的数量

+ 43 - 0
src/test/java/com/winhc/test/TestCreateNode.java

@@ -368,5 +368,48 @@ public class TestCreateNode {
 
     }
 
+    @Test
+    public void ng_inc_pid_update_tab() {
+        String topic = "ng_inc_pid_update_tab";
+        String s = "D:\\data\\opt\\40c154a6-a752-4ef5-86cb-a7999c658457.csv";
+        FileReader fileReader = new FileReader(s);
+        List<String> lists = fileReader.readLines();
+        int i = 0;
+        for (String mesaage : lists) {
+
+            if(i >0){
+                System.out.println(mesaage);
+                kafkaProduce.produce(topic, mesaage);
+            }
+            i++;
+
+        }
+
+    }
+
+    @Test
+    public void testPersonCompany() {
+        String topic = "inc_person_companys";
+        String mesaage1 = "{\"company_infos\":[{\"company_name\":\"京东\",\"company_id\":\"c111\"},{\"company_name\":\"淘宝\",\"company_id\":\"c222\"},{\"company_name\":\"拼多多\",\"company_id\":\"c333\"},{\"company_name\":\"唯品会\",\"company_id\":\"c444\"}],\"person_name\":\"马云\"}\n";
+        String mesaage2 = "{\"company_infos\":[{\"company_name\":\"京东\",\"company_id\":\"c111\"},{\"company_name\":\"淘宝\",\"company_id\":\"c222\"},{\"company_name\":\"拼多多\",\"company_id\":\"c333\"},{\"company_name\":\"唯品会\",\"company_id\":\"c444\"}],\"person_name\":\"刘强东\"}\n";
+
+        kafkaProduce.produce(topic, mesaage1);
+        kafkaProduce.produce(topic, mesaage2);
+    }
+
+    @Test
+    public void testPersonCompany2() {
+        HashMap<String, String> map = new HashMap<>();
+        map.put("company_ids","111");
+        map.put("name","22");
+        System.out.println(JSON.toJSONString(map));
+    }
+
+    @Test
+    public void testPersonMerge() {
+        String topic = "inc_person_merge";
+        String message = "{\"merge_human_pid\":\"pb568da88c866aa662797a6f99a59f596\",\"deleted_human_pid\":\"p3c936538470361a1a621f274890f1ec8\"}";
+        kafkaProduce.produce(topic, message);
+    }
 
 }

+ 17 - 4
src/test/java/com/winhc/test/TestJson.java

@@ -11,10 +11,7 @@ import com.winhc.utils.CompanyUtils;
 import org.apache.ibatis.javassist.expr.NewArray;
 import org.junit.Test;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 
 /**
  * @author π
@@ -62,4 +59,20 @@ public class TestJson {
         String r1 = CompanyUtils.getIncrPersonLabel();
         System.out.println(r1);
     }
+
+    @Test
+    public void test3(){
+        List<String> list = Arrays.asList("小米","淘宝","京东");
+        StringBuilder sb = new StringBuilder();
+        sb.append("[");
+        for (int i = 0; i < list.size(); i++) {
+            sb.append("'").append(list.get(i)).append("'");
+            if(i != list.size() -1){
+                sb.append(",");
+            }
+        }
+
+        sb.append("]");
+        System.out.println(sb);
+    }
 }