xufei · 1 year ago
Parent commit aee965709b

+ 2 - 0
src/main/java/com/winhc/common/constant/Base.java

@@ -12,5 +12,7 @@ public class Base {
     public static ForkJoinPool TASK_FJ_POOL = new ForkJoinPool(Runtime.getRuntime().availableProcessors() * 3);
     public static String PID_WAIT_MERGE_V9 = "pid_wait_merge_v9";
     public static String PID_ALL_PERSON_MERGE_V9 = "pid_all_person_merge_v9";
+    // risk dimension id mapping
+    public static String JUDICIAL_CASE_CARD_NUM_V9 = "judicial_case_card_num_v9";
 
 }

+ 1 - 1
src/main/java/com/winhc/config/KafkaConfig.java

@@ -102,7 +102,7 @@ public class KafkaConfig {
         // set whether offsets are auto-committed
         props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
         // number of records fetched per poll
-        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 40);
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
         props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5*60*1000);
         props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5*60*1000);
         // maximum processing time
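
The smaller poll batch works together with the long session and request timeouts above: each listener invocation now has at most 20 records' worth of Neo4j and Mongo work to finish before the next poll is due. Below is a minimal, self-contained sketch of the touched consumer settings; the bootstrap server is a placeholder and the group id is the dev value from application-dev.properties.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Properties;

public class ConsumerPropsSketch {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "neo4j_node_relation_dev");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        // One poll now returns at most 20 records (was 40), shrinking the per-batch workload.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 20);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 5 * 60 * 1000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 5 * 60 * 1000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        return props;
    }
}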

+ 7 - 1
src/main/java/com/winhc/kafka/consumer/KafkaConsumerNeo4jV2.java

@@ -13,6 +13,7 @@ import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.neo4j.driver.exceptions.ClientException;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.context.annotation.Bean;
 import org.springframework.data.mongodb.core.MongoTemplate;
@@ -78,7 +79,12 @@ public class KafkaConsumerNeo4jV2 {
     public ConsumerAwareListenerErrorHandler dealError() {
         return (message, e, consumer) -> {
             List<String> list = CompanyUtils.toMessage((List<ConsumerRecord>) message.getPayload());
-            if(isWindows()) return null;
+            if (isWindows()) return null;
+            Throwable cause = e.getCause();
+            if (cause instanceof ClientException && cause.getMessage().contains("transaction.timeout")) {
+                log.error("query time out !!!\n{}", e.getMessage());
+                return null;
+            }
             for (String msg : list) {
                 kafkaProduce.produce(configConstant.topic_node_relation_union, msg);
             }
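
The handler now distinguishes a Neo4j transaction timeout, which is only logged, from any other failure, which is re-published to topic_node_relation_union. Below is a minimal sketch of that classification on its own, using the same message-based heuristic plus a null-message guard; Neo4jErrorClassifier is a hypothetical helper, not part of the commit.

import org.neo4j.driver.exceptions.ClientException;

final class Neo4jErrorClassifier {
    // True when the listener failure was caused by the Neo4j driver aborting a query
    // because the configured transaction timeout was exceeded (see getRecordsV2 below).
    static boolean isTransactionTimeout(Throwable listenerError) {
        Throwable cause = listenerError.getCause();
        return cause instanceof ClientException
                && cause.getMessage() != null
                && cause.getMessage().contains("transaction.timeout");
    }
}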

+ 17 - 2
src/main/java/com/winhc/kafka/consumer/KafkaConsumerPersonIdUpdate.java

@@ -5,6 +5,7 @@ import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.mongodb.client.MongoCollection;
+import com.mongodb.client.model.UpdateOptions;
 import com.winhc.common.constant.Base;
 import com.winhc.config.ConfigConstant;
 import com.winhc.kafka.KafkaProduce;
@@ -64,7 +65,7 @@ public class KafkaConsumerPersonIdUpdate {
             , topics = "${spring.kafka.topic_pid_update_v1}"
             , groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "handlerV1")
     public void updatePid(List<String> messages) {
-        if(CompanyUtils.isWindows()){
+        if (CompanyUtils.isWindows()) {
             return;
         }
         List<Tuple<Map<String, String>, JSONObject>> list = messages.stream()
@@ -96,10 +97,11 @@ public class KafkaConsumerPersonIdUpdate {
 
         CompletableFuture<Boolean> v1 = updateByQuery(old_human_pid, new_human_pid, "winhc_company_human_pid_mapping_v9", EsQueryDsl.updateHumanMappingId(old_human_pid, new_human_pid));
         CompletableFuture<Boolean> v2 = updateByQuery(old_human_pid, new_human_pid, "winhc-company-human-relation-v9", EsQueryDsl.updateBossId(old_human_pid));
+        CompletableFuture<Boolean> v3 = updateManyByQueryMongo(old_human_pid, new_human_pid);
 
         return CompletableFuture.allOf(v1, v2).thenApplyAsync(x -> {
             try {
-                if (v1.get() && v2.get()) {
+                if (v1.get() && v2.get() && v3.get()) {
                     sendMessage(update.v2().toJSONString(), configConstant.topic_pid_update_v2);
                 } else {
                     sendMessage(JSON.toJSONString(Collections.singletonList(update.v2())), configConstant.topic_pid_update_v1);
@@ -130,6 +132,19 @@ public class KafkaConsumerPersonIdUpdate {
         }, Base.TASK_FJ_POOL);
     }
 
+    private CompletableFuture<Boolean> updateManyByQueryMongo(String old_human_pid, String new_human_pid) {
+        return CompletableFuture.supplyAsync(() -> {
+            try {
+                MongoCollection<Document> collection = mongoTemplate.getCollection(Base.JUDICIAL_CASE_CARD_NUM_V9);
+                //UpdateOptions updateOptions = new UpdateOptions().upsert(true).bypassDocumentValidation(true);
+                collection.updateMany(new Document("keyno", old_human_pid), new Document("$set", new Document("keyno", new_human_pid)));
+                return true;
+            } catch (Exception e) {
+                return false;
+            }
+        }, Base.TASK_FJ_POOL);
+    }
+
     private void sendMessage(String message, String topic) {
         kafkaProduce.produce(topic, message);
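
Note that allOf(v1, v2) does not register v3, but the v3.get() call inside the thenApplyAsync callback still blocks until the Mongo update finishes, so the success message on topic_pid_update_v2 is only sent when all three updates return true. The new updateManyByQueryMongo swallows its exception and silently returns false; the sketch below shows the same keyno rewrite with the failure and the update counts logged (KeynoRewriteSketch is a hypothetical helper that reuses the mongoTemplate bean and the collection constant added in Base).

import com.mongodb.client.MongoCollection;
import com.mongodb.client.result.UpdateResult;
import com.winhc.common.constant.Base;
import lombok.extern.slf4j.Slf4j;
import org.bson.Document;
import org.springframework.data.mongodb.core.MongoTemplate;

@Slf4j
class KeynoRewriteSketch {

    static boolean rewriteKeyno(MongoTemplate mongoTemplate, String oldHumanPid, String newHumanPid) {
        try {
            MongoCollection<Document> collection =
                    mongoTemplate.getCollection(Base.JUDICIAL_CASE_CARD_NUM_V9);
            // Same update as updateManyByQueryMongo: rewrite every keyno that matches the old person id.
            UpdateResult result = collection.updateMany(
                    new Document("keyno", oldHumanPid),
                    new Document("$set", new Document("keyno", newHumanPid)));
            log.info("keyno {} -> {}: matched={}, modified={}",
                    oldHumanPid, newHumanPid, result.getMatchedCount(), result.getModifiedCount());
            return true;
        } catch (Exception e) {
            // The committed version returns false without a trace; logging keeps the failure visible.
            log.error("keyno rewrite failed for {}", oldHumanPid, e);
            return false;
        }
    }
}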
 

+ 12 - 12
src/main/java/com/winhc/service/impl/PersonMergeV2Impl.java

@@ -46,20 +46,9 @@ public class PersonMergeV2Impl implements RelationService {
         long start = System.currentTimeMillis();
         Session session = driver.session();
         //1. query related persons
-//        final String query_cql = "\nWITH  {batch_list} AS batch_list \n" +
-//                "UNWIND batch_list AS row \n" +
-//                "MATCH (p:" + CompanyEnum.Lable.PERSON.code + "{person_id: row.person_id})-[*0..4]-(q:" + CompanyEnum.Lable.PERSON.code + "{name:row.name})\n" +
-//                "WHERE ID(p) <> ID(q)\n" +
-//                "WITH p.person_id as person_id, p as first_node, apoc.coll.sort(collect(distinct ID(p)) + collect(distinct ID(q))) as all_ids\n" +
-//                "UNWIND all_ids as all_id\n" +
-//                "MATCH(m:" + CompanyEnum.Lable.PERSON.code + ")\n" +
-//                "WHERE ID(m) = all_id\n" +
-//                "WITH person_id, first_node, m as other_node\n" +
-//                "RETURN first_node.person_id as new_human_pid, first_node.name as new_human_name, other_node.person_id as old_human_pid, other_node.name as old_human_name" + "\n";
-
         final String query_cql = "\nWITH  {batch_list} AS batch_list \n" +
                 "UNWIND batch_list AS row \n" +
-                "MATCH (p:" + CompanyEnum.Lable.PERSON.code + "{person_id: row.person_id})-[r]-(c:"+CompanyEnum.Lable.COMPANY.code+"{company_id:row.company_id})-[*0..3]-(q:" + CompanyEnum.Lable.PERSON.code + "{name:row.name})\n" +
+                "MATCH (p:" + CompanyEnum.Lable.PERSON.code + "{person_id: row.person_id})-[*0..3]-(q:" + CompanyEnum.Lable.PERSON.code + "{name:row.name})\n" +
                 "WHERE ID(p) <> ID(q)\n" +
                 "WITH p.person_id as person_id, p as first_node, apoc.coll.sort(collect(distinct ID(p)) + collect(distinct ID(q))) as all_ids\n" +
                 "UNWIND all_ids as all_id\n" +
@@ -67,6 +56,17 @@ public class PersonMergeV2Impl implements RelationService {
                 "WHERE ID(m) = all_id\n" +
                 "WITH person_id, first_node, m as other_node\n" +
                 "RETURN first_node.person_id as new_human_pid, first_node.name as new_human_name, other_node.person_id as old_human_pid, other_node.name as old_human_name" + "\n";
+//
+//        final String query_cql = "\nWITH  {batch_list} AS batch_list \n" +
+//                "UNWIND batch_list AS row \n" +
+//                "MATCH (p:" + CompanyEnum.Lable.PERSON.code + "{person_id: row.person_id})-[r]-(c:"+CompanyEnum.Lable.COMPANY.code+"{company_id:row.company_id})-[*0..3]-(q:" + CompanyEnum.Lable.PERSON.code + "{name:row.name})\n" +
+//                "WHERE ID(p) <> ID(q)\n" +
+//                "WITH p.person_id as person_id, p as first_node, apoc.coll.sort(collect(distinct ID(p)) + collect(distinct ID(q))) as all_ids\n" +
+//                "UNWIND all_ids as all_id\n" +
+//                "MATCH(m:" + CompanyEnum.Lable.PERSON.code + ")\n" +
+//                "WHERE ID(m) = all_id\n" +
+//                "WITH person_id, first_node, m as other_node\n" +
+//                "RETURN first_node.person_id as new_human_pid, first_node.name as new_human_name, other_node.person_id as old_human_pid, other_node.name as old_human_name" + "\n";
 
 
         log.info("query size: {}, cql:{}", batch_list.size(), query_cql);
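
For readability, here is the rewritten query rendered as a single constant, with a hypothetical "PERSON" label standing in for CompanyEnum.Lable.PERSON.code (the real label string lives in that enum). Compared with the commented-out variant above, the path is no longer pinned through a COMPANY node, while the hop limit stays at 0..3.

public final class MergeQuerySketch {
    // Rendered form of query_cql; the label names are assumptions.
    public static final String QUERY_CQL =
            "WITH {batch_list} AS batch_list\n" +
            "UNWIND batch_list AS row\n" +
            "MATCH (p:PERSON {person_id: row.person_id})-[*0..3]-(q:PERSON {name: row.name})\n" +
            "WHERE ID(p) <> ID(q)\n" +
            "WITH p.person_id AS person_id, p AS first_node,\n" +
            "     apoc.coll.sort(collect(distinct ID(p)) + collect(distinct ID(q))) AS all_ids\n" +
            "UNWIND all_ids AS all_id\n" +
            "MATCH (m:PERSON) WHERE ID(m) = all_id\n" +
            "WITH person_id, first_node, m AS other_node\n" +
            "RETURN first_node.person_id AS new_human_pid, first_node.name AS new_human_name,\n" +
            "       other_node.person_id AS old_human_pid, other_node.name AS old_human_name";
}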

+ 16 - 2
src/main/java/com/winhc/utils/CompanyUtils.java

@@ -14,7 +14,9 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.neo4j.driver.Record;
 import org.neo4j.driver.Result;
 import org.neo4j.driver.Session;
+import org.neo4j.driver.TransactionConfig;
 
+import java.time.Duration;
 import java.util.*;
 import java.util.stream.Collectors;
 
@@ -139,7 +141,7 @@ public class CompanyUtils {
     }
 
     public static String writeNeo4j2(Session session, String cql, HashMap<String, Object> parameters) {
-        List<Record> dataList = getRecords(session, cql, parameters);
+        List<Record> dataList = getRecordsV2(session, cql, parameters);
         Map<String, MergePerson> data = dataList.stream()
                 .map(record -> JSON.parseObject(JSON.toJSONString(record.asMap()), MergePerson.class))
                 .collect(Collectors.toMap(MergePerson::getId, t -> t, (n, o) -> n));
@@ -186,12 +188,14 @@ public class CompanyUtils {
     }
 
     public static Collection<List<MergePerson>> getMergePerson(Session session, String cql, HashMap<String, Object> parameters) {
-        List<Record> dataList = getRecords(session, cql, parameters);
+        List<Record> dataList = getRecordsV2(session, cql, parameters);
         return dataList.stream()
                 .map(record -> JSON.parseObject(JSON.toJSONString(record.asMap()), MergePerson.class))
                 .collect(Collectors.groupingBy(MergePerson::getNew_human_pid)).values();
     }
 
+
+
     private static List<Record> getRecords(Session session, String cql, HashMap<String, Object> parameters) {
         return session.writeTransaction(tx -> {
             Result result = tx.run(cql, parameters);
@@ -199,6 +203,16 @@ public class CompanyUtils {
         });
     }
 
+    private static List<Record> getRecordsV2(Session session, String cql, HashMap<String, Object> parameters) {
+        TransactionConfig config = TransactionConfig.builder()
+                .withTimeout(Duration.ofSeconds(5))
+                .build();
+        return session.writeTransaction(tx -> {
+            Result result = tx.run(cql, parameters);
+            return result.list();
+        }, config);
+    }
+
     public static List<Map<String, String>> writeNeo4j3(Session session, String cql, HashMap<String, Object> parameters) {
         List<Record> dataList = getRecords(session, cql, parameters);
         Set<String> newIds = new HashSet<>();
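
getRecordsV2 is the same write-transaction helper as getRecords, but with a per-transaction five-second timeout; a query that exceeds it is aborted server-side, which is presumably the ClientException the error handler in KafkaConsumerNeo4jV2 above checks for. Below is a minimal, self-contained sketch of the pattern; the URI, credentials and the PERSON label are placeholders.

import org.neo4j.driver.*;
import org.neo4j.driver.exceptions.Neo4jException;

import java.time.Duration;
import java.util.List;

public class TimeoutQuerySketch {
    public static void main(String[] args) {
        try (Driver driver = GraphDatabase.driver("bolt://localhost:7687",
                AuthTokens.basic("neo4j", "secret"));
             Session session = driver.session()) {
            // Abort any write transaction that runs longer than 5 seconds, as in getRecordsV2.
            TransactionConfig config = TransactionConfig.builder()
                    .withTimeout(Duration.ofSeconds(5))
                    .build();
            try {
                List<Record> records = session.writeTransaction(
                        tx -> tx.run("MATCH (p:PERSON) RETURN p LIMIT 10").list(), config);
                System.out.println("rows: " + records.size());
            } catch (Neo4jException e) {
                // The error handler in KafkaConsumerNeo4jV2 looks for a ClientException whose
                // message mentions "transaction.timeout" before dropping the batch.
                System.err.println("transaction aborted: " + e.getMessage());
            }
        }
    }
}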

+ 28 - 28
src/main/resources/application-dev.properties

@@ -1,6 +1,6 @@
 #eureka.client.serviceUrl.defaultZone= http://106.14.81.247:8900/eureka/
 
-#Neo4j configuration (machine 1)
+#Neo4j configuration (machine 1)
 #spring.data.neo4j.username=neo4j
 #spring.data.neo4j.password=neo4j168
 #spring.data.neo4j.uri=bolt://127.0.0.1:7687
@@ -9,21 +9,21 @@
 spring.data.neo4j.username.v1=neo4j
 spring.data.neo4j.password.v1=neo4j168
 #spring.data.neo4j.uri.v1=bolt://139.196.165.100:7687
-#spring.data.neo4j.uri.v1=bolt://139.224.197.164:7687
-spring.data.neo4j.uri.v1=bolt://127.0.0.1:7687
+spring.data.neo4j.uri.v1=bolt://139.224.197.164:7687
+#spring.data.neo4j.uri.v1=bolt://127.0.0.1:7687
 
-#Neo4j configuration (machine 2)
+#Neo4j configuration (machine 2)
 spring.data.neo4j.username.v2=neo4j
 spring.data.neo4j.password.v2=neo4j168
-#spring.data.neo4j.uri.v2=bolt://139.224.197.164:7687
-spring.data.neo4j.uri.v2=bolt://127.0.0.1:7687
+spring.data.neo4j.uri.v2=bolt://139.224.197.164:7687
+#spring.data.neo4j.uri.v2=bolt://127.0.0.1:7687
 #spring.data.neo4j.uri.v2=bolt://139.196.165.100:7687
 
-#Database URI
+#Database URI
 #spring.data.neo4j.uri=http://10.29.26.76:7474
 #spring.data.neo4j.uri=http://47.101.212.122:7474
 
-#Internal network address
+#Internal network address
 spring.datasource.url = jdbc:mysql://rm-uf61r3m23ba1p5z3d.mysql.rds.aliyuncs.com:3306/prism1?useUnicode=true&characterEncoding=utf-8
 spring.datasource.username = wenshu
 spring.datasource.password = wenshu_168
@@ -38,7 +38,7 @@ scheduling.enabled = true
 
 
 #============== kafka ===================
-# Kafka broker addresses; more than one may be specified
+# Kafka broker addresses; more than one may be specified
 spring.kafka.bootstrap-servers=47.101.221.131:9092
 #spring.kafka.bootstrap-servers=192.168.4.239:9092,192.168.4.241:9092,192.168.4.240:9092
 #topic
@@ -55,27 +55,27 @@ spring.kafka.topic_person_merge_v2=inc_person_merge_v2_dev
 #spring.kafka.topic=xf_test
 #=============== provider  =======================
 spring.kafka.producer.retries=3
-# Number of messages sent per batch
+# Number of messages sent per batch
 spring.kafka.producer.batch-size=16384
 spring.kafka.producer.buffer-memory=33554432
 
-# Serializers for the message key and value
+# Serializers for the message key and value
 spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
 spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
 
 #=============== consumer  =======================
-# Default consumer group id
+# Default consumer group id
 spring.kafka.consumer.group-id=neo4j_node_relation_dev
 
 spring.kafka.consumer.auto-offset-reset=earliest
-# Disable auto-commit
+# Disable auto-commit
 spring.kafka.consumer.enable-auto-commit=true
 spring.kafka.consumer.auto-commit-interval=100
 
-# Deserializers for the message key and value
+# Deserializers for the message key and value
 spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
 spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
-# Manual commit
+# Manual commit
 #spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
 
 
@@ -84,7 +84,7 @@ spring.data.mongodb.uri=mongodb://itslaw:itslaw_168@dds-uf6ff5dfd9aef3641601-pub
 
 
 
-#bboss configuration
+#bboss configuration
 spring.elasticsearch.bboss.es6.name = es6
 ##es6
 spring.elasticsearch.bboss.es6.elasticUser=elastic
@@ -95,11 +95,11 @@ spring.elasticsearch.bboss.es6.elasticsearch.rest.hostNames=es-cn-oew22t8bw002if
 spring.elasticsearch.bboss.es6.elasticsearch.dateFormat=yyyy.MM.dd
 spring.elasticsearch.bboss.es6.elasticsearch.timeZone=Asia/Shanghai
 spring.elasticsearch.bboss.es6.elasticsearch.ttl=2d
-#showTemplate toggles DSL script debug output in the console: false = off, true = on; log4j must be at INFO level or above
+#showTemplate toggles DSL script debug output in the console: false = off, true = on; log4j must be at INFO level or above
 spring.elasticsearch.bboss.es6.elasticsearch.showTemplate=true
 spring.elasticsearch.bboss.es6.elasticsearch.discoverHost=false
 
-##es6 connection pool configuration
+##es6 connection pool configuration
 spring.elasticsearch.bboss.es6.http.timeoutConnection = 50000
 spring.elasticsearch.bboss.es6.http.timeoutSocket = 50000
 spring.elasticsearch.bboss.es6.http.connectionRequestTimeout=50000
@@ -110,19 +110,19 @@ spring.elasticsearch.bboss.es6.http.maxTotal = 400
 spring.elasticsearch.bboss.es6.http.defaultMaxPerRoute = 200
 spring.elasticsearch.bboss.es6.http.keystore =
 spring.elasticsearch.bboss.es6.http.keyPassword =
-# SSL hostname verification: whether to use the es6 setting;
-# if set to es6, DefaultHostnameVerifier is used, otherwise SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER
+# SSL hostname verification: whether to use the es6 setting;
+# if set to es6, DefaultHostnameVerifier is used, otherwise SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER
 spring.elasticsearch.bboss.es6.http.hostnameVerifier =
 
 
 
 
-##es5 cluster configuration
+##es5 cluster configuration
 spring.elasticsearch.bboss.es5.name = es5
 spring.elasticsearch.bboss.es5.elasticUser=elastic
 spring.elasticsearch.bboss.es5.elasticPassword=elastic_168
 
-#Elasticsearch 5.x
+#Elasticsearch 5.x
 #spring.elasticsearch.bboss.es5.elasticsearch.rest.hostNames=es-cn-0pp0r32zf000ipovd.public.elasticsearch.aliyuncs.com:9200
 spring.elasticsearch.bboss.es5.elasticsearch.rest.hostNames=es-cn-zxu362ii6000oj8y3.public.elasticsearch.aliyuncs.com:9200
 #spring.elasticsearch.bboss.es5.elasticsearch.rest.hostNames=es-cn-0pp0r32zf000ipovd.elasticsearch.aliyuncs.com:9200
@@ -131,11 +131,11 @@ spring.elasticsearch.bboss.es5.elasticsearch.rest.hostNames=es-cn-zxu362ii6000oj
 spring.elasticsearch.bboss.es5.elasticsearch.dateFormat=yyyy.MM.dd
 spring.elasticsearch.bboss.es5.elasticsearch.timeZone=Asia/Shanghai
 spring.elasticsearch.bboss.es5.elasticsearch.ttl=2d
-#showTemplate toggles DSL script debug output in the console: false = off, true = on; log4j must be at INFO level or above
+#showTemplate toggles DSL script debug output in the console: false = off, true = on; log4j must be at INFO level or above
 spring.elasticsearch.bboss.es5.elasticsearch.showTemplate=true
 spring.elasticsearch.bboss.es5.elasticsearch.discoverHost=false
 
-##Connection pool configuration for the es5 cluster
+##Connection pool configuration for the es5 cluster
 spring.elasticsearch.bboss.es5.http.timeoutConnection = 50000
 spring.elasticsearch.bboss.es5.http.timeoutSocket = 50000
 spring.elasticsearch.bboss.es5.http.connectionRequestTimeout=50000
@@ -144,11 +144,11 @@ spring.elasticsearch.bboss.es5.http.maxLineLength = -1
 spring.elasticsearch.bboss.es5.http.maxHeaderCount = 200
 spring.elasticsearch.bboss.es5.http.maxTotal = 400
 spring.elasticsearch.bboss.es5.http.defaultMaxPerRoute = 200
-# HTTPS certificate configuration
+# HTTPS certificate configuration
 spring.elasticsearch.bboss.es5.http.keystore =
 spring.elasticsearch.bboss.es5.http.keyPassword =
-# SSL hostname verification: whether to use the default setting;
-# if set to default, DefaultHostnameVerifier is used, otherwise SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER
+# SSL hostname verification: whether to use the default setting;
+# if set to default, DefaultHostnameVerifier is used, otherwise SSLConnectionSocketFactory.ALLOW_ALL_HOSTNAME_VERIFIER
 spring.elasticsearch.bboss.es5.http.hostnameVerifier =
-# Hot-reload scan interval for DSL config files, in milliseconds; default is one scan every 5 seconds; <= 0 disables scanning
+# Hot-reload scan interval for DSL config files, in milliseconds; default is one scan every 5 seconds; <= 0 disables scanning
 spring.elasticsearch.bboss.dslfile.refreshInterval = -1
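
In this dev profile the v1 and v2 Neo4j endpoints are switched from the local bolt address to 139.224.197.164. Below is a sketch of how paired properties like these are typically bound to two Driver beans; the configuration class and bean names are hypothetical, while the property keys are the ones defined above.

import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class Neo4jDriverSketchConfig { // hypothetical; the project has its own wiring

    @Bean(destroyMethod = "close")
    public Driver neo4jDriverV1(@Value("${spring.data.neo4j.uri.v1}") String uri,
                                @Value("${spring.data.neo4j.username.v1}") String user,
                                @Value("${spring.data.neo4j.password.v1}") String password) {
        return GraphDatabase.driver(uri, AuthTokens.basic(user, password));
    }

    @Bean(destroyMethod = "close")
    public Driver neo4jDriverV2(@Value("${spring.data.neo4j.uri.v2}") String uri,
                                @Value("${spring.data.neo4j.username.v2}") String user,
                                @Value("${spring.data.neo4j.password.v2}") String password) {
        return GraphDatabase.driver(uri, AuthTokens.basic(user, password));
    }
}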

+ 26 - 0
src/test/java/com/winhc/test/TestMongo.java

@@ -1,11 +1,16 @@
 package com.winhc.test;
 
+import com.alibaba.fastjson.JSON;
 import com.google.common.collect.ImmutableMap;
+import com.mongodb.client.MongoCollection;
+import com.mongodb.client.result.UpdateResult;
+import com.winhc.common.constant.Base;
 import com.winhc.db.mongodb.dataobject.NodeRelationError;
 import com.winhc.db.mongodb.repository.NodeRelatonErrorRepository;
 import com.winhc.service.EsQueryService;
 import com.winhc.service.PidToMongoService;
 import com.winhc.service.impl.NodeRelationSuccessStatusServiceImpl;
+import org.bson.Document;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -36,6 +41,8 @@ public class TestMongo {
 
     @Autowired
     EsQueryService esQueryService;
+    @Autowired
+    MongoTemplate mongoTemplate;
 
     @Test
     public void saveData() {
@@ -96,4 +103,23 @@ public class TestMongo {
         System.out.println("cost : " + (System.currentTimeMillis() - start));
         System.out.println(res);
     }
+
+    @Test
+    public void test11() {
+        ImmutableMap<String, String> re = ImmutableMap.of(
+                "person_id", "p15d13edfdb5a26ff0285059153b1b9fe"
+                , "name", "黄俊强"
+                , "topic_type", "800"
+        );
+        System.out.println(JSON.toJSONString(re));
+    }
+
+    @Test
+    public void test12() {
+        String old_human_pid = "p3f080ea8e694ec29674bdaa606a90c71";
+        String new_human_pid = "";
+        MongoCollection<Document> collection = mongoTemplate.getCollection(Base.JUDICIAL_CASE_CARD_NUM_V9);
+        UpdateResult re = collection.updateMany(new Document("keyno", old_human_pid), new Document("$set", new Document("keyno", new_human_pid)));
+        System.out.println(JSON.toJSONString(re));
+    }
 }