
Add case relation (案件关联新增)

xufei · 3 years ago
parent · commit db7b96e0fd

+ 3 - 1
src/main/java/com/winhc/common/enums/CompanyEnum.java

@@ -24,7 +24,9 @@ public class CompanyEnum {
         PERSON_NODE_LABEL("7", "personNodeLabelServiceImpl"),
         NODE_RELATION_SUCCESS_STATUS("100", "nodeRelationSuccessStatusServiceImpl"),
         PERSON_COMPANYS("200", "personCompanysImpl"),
-        PERSON_MERGE("300", "personMergeImpl")
+        PERSON_MERGE("300", "personMergeImpl"),
+        CASE_RELATION("400", "caseRelationImpl"),
+        CASE_NODE("500", "caseNodeImpl")
         ;
 
         public final String CODE;

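For reference, each TopicType constant pairs the topic_type code carried in a Kafka message with the Spring bean name of the RelationService implementation that handles it. The rest of the enum body lies outside this hunk; judging from the CODE and VALUE fields referenced elsewhere in the commit, it presumably looks roughly like this sketch:

    // Sketch only: the full enum body is not part of this diff.
    // CODE is the topic_type value carried in the Kafka message;
    // VALUE is the Spring bean name of the matching RelationService implementation.
    public enum TopicType {
        CASE_RELATION("400", "caseRelationImpl"),
        CASE_NODE("500", "caseNodeImpl");

        public final String CODE;
        public final String VALUE;

        TopicType(String code, String value) {
            this.CODE = code;
            this.VALUE = value;
        }
    }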
+ 49 - 0
src/main/java/com/winhc/kafka/consumer/KafkaConsumerCaseMerge.java

@@ -0,0 +1,49 @@
+package com.winhc.kafka.consumer;
+
+import com.winhc.common.enums.CompanyEnum;
+import com.winhc.service.RelationService;
+import com.winhc.utils.CompanyUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author π
+ * @Description: incremental case consumer
+ * @date 2021/5/17 16:12
+ */
+@Slf4j
+@Service
+@AllArgsConstructor
+public class KafkaConsumerCaseMerge {
+
+    private final Map<String, RelationService> map;
+
+    @KafkaListener(id = "${spring.kafka.topic_case}"
+            , topics = "${spring.kafka.topic_case}"
+            , groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "KafkaConsumerCaseMerge")
+    public void consumerCompanyNode(List<ConsumerRecord<?, ?>> records) {
+        List<Map<String, Object>> listMap = CompanyUtils.map(records);
+        this.map.get(CompanyEnum.TopicType.CASE_RELATION.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.CASE_RELATION.CODE));
+        this.map.get(CompanyEnum.TopicType.CASE_NODE.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.CASE_NODE.CODE));
+    }
+
+    /**
+     * Offsets are acknowledged manually; if consumption fails, log the record so it can be re-pushed.
+     */
+    @Bean("KafkaConsumerCaseMerge")
+    public ConsumerAwareListenerErrorHandler dealError() {
+        return (message, e, consumer) -> {
+            log.error("case consumer error, message: {}", message, e);
+            return null;
+        };
+    }
+}

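The consumer above leans on two CompanyUtils helpers that are not part of this commit: map(records) converts the Kafka record values (JSON strings) into a List<Map<String, Object>>, and filterList(listMap, code) keeps only the rows whose topic_type matches the given enum code, so each message is routed to the right RelationService bean. A minimal sketch of the assumed filtering behavior (class name and field access are assumptions, not the actual implementation):

    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    // Assumed behavior of CompanyUtils.filterList (not shown in this diff):
    // keep only the rows whose topic_type matches the given enum code.
    public class CompanyUtilsFilterSketch {

        public static List<Map<String, Object>> filterList(List<Map<String, Object>> listMap, String code) {
            return listMap.stream()
                    .filter(row -> code.equals(String.valueOf(row.get("topic_type"))))
                    .collect(Collectors.toList());
        }
    }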
+ 49 - 0
src/main/java/com/winhc/service/impl/CaseNodeImpl.java

@@ -0,0 +1,49 @@
+package com.winhc.service.impl;
+
+import com.winhc.service.RelationService;
+import com.winhc.utils.CompanyUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Session;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author π
+ * @Description: case - create node
+ * @date 2021/5/17 10:03
+ */
+@Slf4j
+@Service("caseNodeImpl")
+@AllArgsConstructor
+public class CaseNodeImpl implements RelationService {
+
+    private final Driver driver;
+
+    @Override
+    public String save(List<Map<String, Object>> batch_list) {
+        if (batch_list.isEmpty()) return null;
+        long start = System.currentTimeMillis();
+        Session session = driver.session();
+        final String cql = "\nWITH  {batch_list} AS batch_list \n" +
+                "UNWIND batch_list AS row \n" +
+                " MERGE(s:CASE{case_id:row.start_id})\n" +
+                " SET s.case_id = row.start_id\n" +
+                " WITH s\n" +
+                " CALL apoc.path.subgraphNodes(s, {maxLevel:-1}) YIELD node\n" +
+                " WITH node,s\n" +
+                " SET node.component_id = -id(s)\n" +
+                " SET node:" + CompanyUtils.getIncrPersonLabel("新增");
+        log.info("consumer size: {}, cql:{}", batch_list.size(), cql);
+        String data = CompanyUtils.writeNeo4j(session, cql, new HashMap<String, Object>() {{
+            put("batch_list", batch_list);
+        }});
+        session.close();
+        log.info("class:{} | save size:{} | cost:{}", CaseNodeImpl.class.getSimpleName(), batch_list.size(), (System.currentTimeMillis() - start));
+        return data;
+    }
+}

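CompanyUtils.writeNeo4j is likewise outside this diff. Given the {batch_list} placeholder in the Cypher above, it presumably executes the statement inside a write transaction with the supplied parameter map; a rough sketch under that assumption (the real return value is unknown):

    import java.util.Map;

    import org.neo4j.driver.Session;

    // Sketch only: assumed shape of CompanyUtils.writeNeo4j.
    // Runs the Cypher in a write transaction and returns a summary string.
    public class WriteNeo4jSketch {

        public static String writeNeo4j(Session session, String cql, Map<String, Object> params) {
            return session.writeTransaction(tx -> tx.run(cql, params).consume().toString());
        }
    }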
+ 54 - 0
src/main/java/com/winhc/service/impl/CaseRelationImpl.java

@@ -0,0 +1,54 @@
+package com.winhc.service.impl;
+
+import com.winhc.common.enums.CompanyEnum;
+import com.winhc.service.RelationService;
+import com.winhc.utils.CompanyUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Session;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author π
+ * @Description: case - create relation
+ * @date 2021/5/17 10:03
+ */
+@Slf4j
+@Service("caseRelationImpl")
+@AllArgsConstructor
+public class CaseRelationImpl implements RelationService {
+
+    private final Driver driver;
+
+    @Override
+    public String save(List<Map<String, Object>> batch_list) {
+        if (batch_list.isEmpty()) return null;
+        long start = System.currentTimeMillis();
+        Session session = driver.session();
+        final String cql = "\nWITH  {batch_list} AS batch_list \n" +
+                "UNWIND batch_list AS row \n" +
+                " MERGE(s:CASE{case_id:row.start_id})\n" +
+                " SET s.case_id = row.start_id\n" +
+                " MERGE(m:CASE{case_id:row.end_id})\n" +
+                " SET m.case_id = row.end_id\n" +
+                " MERGE(s)-[r:RELATION]->(m)\n" +
+                " SET r.connect_type=row.connect_type\n" +
+                " WITH s,m\n" +
+                " CALL apoc.path.subgraphNodes(s, {maxLevel:-1}) YIELD node\n" +
+                " WITH node,s\n" +
+                " SET node.component_id = -id(s)\n" +
+                " SET node:" + CompanyUtils.getIncrPersonLabel("新增");
+        log.info("consumer size: {}, cql:{}", batch_list.size(), cql);
+        String data = CompanyUtils.writeNeo4j(session, cql, new HashMap<String, Object>() {{
+            put("batch_list", batch_list);
+        }});
+        session.close();
+        log.info("class:{} | save size:{} | cost:{}", CaseRelationImpl.class.getSimpleName(), batch_list.size(), (System.currentTimeMillis() - start));
+        return data;
+    }
+}

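Both implementations stamp every node reachable from the merged start node (via apoc.path.subgraphNodes) with a shared component_id equal to the negated internal id of that node. A hypothetical way to inspect the result from the same Java driver (query and helper class are illustrative only, not part of the project):

    import java.util.List;

    import org.neo4j.driver.Driver;
    import org.neo4j.driver.Session;
    import org.neo4j.driver.Values;

    // Hypothetical check, not part of the commit: list the case_ids that share
    // a given component_id, i.e. one connected component of the case graph.
    public class ComponentCheckSketch {

        public static List<String> caseIdsInComponent(Driver driver, long componentId) {
            try (Session session = driver.session()) {
                return session.run(
                        "MATCH (n:CASE) WHERE n.component_id = $componentId RETURN n.case_id AS caseId",
                        Values.parameters("componentId", componentId))
                        .list(record -> record.get("caseId").asString());
            }
        }
    }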
+ 6 - 5
src/main/resources/application-dev.properties

@@ -8,8 +8,8 @@ spring.data.neo4j.password=neo4j168
 #crawler
 #spring.data.neo4j.uri=bolt://192.168.2.56:7687
 #test
-#spring.data.neo4j.uri=bolt://139.224.197.164:7687
-spring.data.neo4j.uri=bolt://192.168.2.60:7687
+spring.data.neo4j.uri=bolt://139.224.197.164:7687
+#spring.data.neo4j.uri=bolt://192.168.2.60:7687
 
 #database URI
 #spring.data.neo4j.uri=http://10.29.26.76:7474
@@ -31,12 +31,13 @@ scheduling.enabled = false
 
 #============== kafka ===================
 # kafka broker addresses; multiple entries allowed
-#spring.kafka.bootstrap-servers=47.101.221.131:9092
-spring.kafka.bootstrap-servers=192.168.4.239:9092,192.168.4.241:9092,192.168.4.240:9092
+spring.kafka.bootstrap-servers=47.101.221.131:9092
+#spring.kafka.bootstrap-servers=192.168.4.239:9092,192.168.4.241:9092,192.168.4.240:9092
 #topic
 spring.kafka.topic_node_relation_union=inc_node_relation_union_dev
 spring.kafka.topic_person_companys=inc_person_companys_dev
 spring.kafka.topic_person_merge=inc_person_merge_dev
+spring.kafka.topic_case=inc_case_dev
 
 #spring.kafka.bootstrap-servers=192.168.4.237:9092,192.168.4.235:9092,192.168.4.236:9092
 #spring.kafka.topic=xf_test
@@ -52,7 +53,7 @@ spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.Str
 
 #=============== consumer  =======================
 # default consumer group id
-spring.kafka.consumer.group-id=neo4j_node_relation
+spring.kafka.consumer.group-id=neo4j_node_relation_dev
 
 spring.kafka.consumer.auto-offset-reset=earliest
 # disable auto-commit

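The dev profile disables Kafka auto-commit, which matches the manual-acknowledgment comment in the new consumer, and the @KafkaListener points at a bean named containerFactory that is not part of this commit. A minimal sketch of how such a factory is typically wired in Spring Kafka, assuming batch consumption and manual acks:

    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.listener.ContainerProperties;

    // Sketch only: the project's actual containerFactory bean is not shown in this commit.
    @Configuration
    public class KafkaContainerFactorySketch {

        @Bean("containerFactory")
        public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory(
                ConsumerFactory<String, String> consumerFactory) {
            ConcurrentKafkaListenerContainerFactory<String, String> factory =
                    new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory);
            // Batch listening matches the List<ConsumerRecord<?, ?>> signature of the listener method.
            factory.setBatchListener(true);
            // Manual acknowledgment, consistent with auto-commit being disabled above.
            factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
            return factory;
        }
    }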
+ 1 - 0
src/main/resources/application-prd.properties

@@ -37,6 +37,7 @@ spring.kafka.bootstrap-servers=192.168.4.239:9092,192.168.4.241:9092,192.168.4.2
 spring.kafka.topic_node_relation_union=inc_node_relation_union
 spring.kafka.topic_person_companys=inc_person_companys
 spring.kafka.topic_person_merge=inc_person_merge
+spring.kafka.topic_case=inc_case_prod
 #=============== provider  =======================
 spring.kafka.producer.retries=3
 # number of messages sent per batch

+ 68 - 0
src/test/java/com/winhc/test/TestCaseNode.java

@@ -0,0 +1,68 @@
+package com.winhc.test;
+
+import com.alibaba.fastjson.JSON;
+import com.winhc.kafka.KafkaProduce;
+import com.winhc.service.impl.CaseRelationImpl;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.neo4j.driver.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.HashMap;
+
+/**
+ * @author π
+ * @Description: case relation test
+ * @date 2021/05/17 16:24
+ */
+@RunWith(SpringRunner.class)
+@SpringBootTest
+public class TestCaseNode {
+    Log log = LogFactory.getLog(TestCaseNode.class);
+    @Autowired
+    CaseRelationImpl caseRelationImpl;
+
+    @Autowired
+    Driver driver;
+    @Autowired
+    KafkaProduce kafkaProduce;
+
+    @Test
+    public void sendKafka() {
+        log.info("start push save !");
+        String topic = "inc_case_dev";
+        long start = System.currentTimeMillis();
+        HashMap<Object, Object> map = new HashMap<>();
+        map.put("start_id","94ddf154ceaf380ff43b93298d0ca755_1");
+//        map.put("start_no","888");
+        map.put("end_id","78f8d51759e8afd169130aacb96b27d4_0");
+//        map.put("end_no","999");
+        map.put("connect_type","7");
+        map.put("topic_type","400");
+        String msg = JSON.toJSONString(map);
+        System.out.println(msg);
+        kafkaProduce.produce(topic, msg);
+        System.out.println(System.currentTimeMillis() - start);
+    }
+
+    @Test
+    public void sendKafka2() {
+        log.info("start push save !");
+        String topic = "inc_case_dev";
+        long start = System.currentTimeMillis();
+        HashMap<Object, Object> map = new HashMap<>();
+        map.put("start_id","94ddf154ceaf380ff43b93298d0ca755_1");
+//        map.put("start_no","999");
+        map.put("connect_type","6");
+        map.put("topic_type","500");
+        String msg = JSON.toJSONString(map);
+        System.out.println(msg);
+        kafkaProduce.produce(topic, msg);
+        System.out.println(System.currentTimeMillis() - start);
+    }
+
+}

+ 7 - 0
src/test/java/com/winhc/test/TestJson.java

@@ -75,4 +75,11 @@ public class TestJson {
         sb.append("]");
         System.out.println(sb);
     }
+
+    @Test
+    public void test4(){
+        String id = "07bbf1d166b27b44116e8a5c575e23ed";
+
+
+    }
 }