Procházet zdrojové kódy

修复异常人员

xufei před 2 roky
rodič
revize
56300be77d

+ 1 - 1
src/main/resources/application-dev.properties

@@ -9,7 +9,6 @@
 spring.data.neo4j.username.v1=neo4j
 spring.data.neo4j.password.v1=neo4j168
 #spring.data.neo4j.uri.v1=bolt://139.196.165.100:7687
-#spring.data.neo4j.uri.v1=bolt://192.168.2.57:7687
 #spring.data.neo4j.uri.v1=bolt://139.224.197.164:7687
 spring.data.neo4j.uri.v1=bolt://127.0.0.1:7687
 
@@ -18,6 +17,7 @@ spring.data.neo4j.username.v2=neo4j
 spring.data.neo4j.password.v2=neo4j168
 #spring.data.neo4j.uri.v2=bolt://139.224.197.164:7687
 spring.data.neo4j.uri.v2=bolt://127.0.0.1:7687
+#spring.data.neo4j.uri.v2=bolt://139.196.165.100:7687
 
 #数据库uri地址
 #spring.data.neo4j.uri=http://10.29.26.76:7474

+ 47 - 0
src/main/resources/data/update_pid.txt

@@ -0,0 +1,47 @@
+p798d506cc87485ba76222b8036346277
+pbe57f5b6e267b0ffeec70e829faefc94
+p6a81b1066d59580e72c2bd290d7b7019
+p8beb66e801430c54f4c457205127e5d8
+p0a67b571164b93ffea1b997c92201ac1
+p23dcb8a071bb3a62342ebb8afcfe1326
+p1af45283101d70091352729e0c129f79
+p5d00699077afd50488e5f13a2210bfc4
+pd67b1ed15fb9110f96d6c6809cc8e0e3
+p0fd9fe6b320ae3c5bf2513c898a83db1
+p79220376edf188fa39861f258e4d88a0
+p225bc26c4b8a7f18b2dd8d9bb16afa2f
+p1e3d61354bacfad1b8a1ad849eb6ac75
+p76ffa63da74f0d39ad62295606e94e17
+p095909a2804577888a929d50dd12256f
+p07e85a7f62a92b1cd484b5d09755116c
+p445d7047ea328c160d4193580ae33974
+pfcbaa4e1e0bf702dbe6a430a364f565a
+pbd12ca68af463495f19733bfbc89e998
+pda63dbf0bf075e4e8fa9347a756d372e
+p094eeea2b8fe63df13300c40a5738c9f
+p11672679f70b4a1e334e646d9b24e90c
+p987d680340305796155e6ccaed7f79f2
+pebe4f5ba1818a42f7b784680ffd514b3
+pf56382ed2bf390727c6fea8d25970c08
+p832cf9ca7fc8e1baca1202308bfe0cfb
+p3035918df0c1500e26a4cadfb8d9c8ce
+p45ced20c4896828788998eb21e6303a7
+p2077e0d9d865ddd727b13000b250c9e8
+p411a598ef629dc8bd77be45449bec069
+p7875e557f709ce34626fbb368653dcd2
+p2961e343fce6411bb75e0e68365b5731
+p1368249f45ad85e975566aa40ee50ccf
+p75e13e7a4f343c2e7040e55b7e572e62
+pa96798589938e6944c5766f3b1d3393b
+p80b449a955cdb35a74e766838dbe785a
+p105dac8f3220501bf5bbe627719ffd97
+p0d186ba6c492b3f8ba8925f317be1028
+p080a7ae7cf9731eb64b02aed6f0b32ee
+p74dab2fc3bedc6047e51f413d2cd9355
+p7450516a140b982a3a1e21baaad02953
+p973b78259a62ef208ab81931e94a0b25
+p8dd03e1f9309171b06d0357717749252
+p14bcc66a9f312a4a66601ac11e4f547f
+pb6e5547ce793b4b7326e4c374d784e8f
+p797a31204304aeea82c04245313949e5
+p9a3502a8b6599af261195e74e2f9dcf4

+ 101 - 0
src/main/resources/esmapper/scroll.xml

@@ -0,0 +1,101 @@
+<properties>
+    <!--
+    简单的scroll query案例,复杂的条件修改query dsl即可
+    -->
+    <property name="scrollQuery">
+        <![CDATA[
+         {
+            "size":#[size],
+            "query": {
+    "bool": {
+      "should": [
+        {
+          "term": {
+            "legal_entities.id": {
+              "value": #[pid]
+            }
+          }
+        },{
+          "term": {
+            "holder_id": {
+              "value": #[pid]
+            }
+          }
+        },{
+          "term": {
+            "hid": {
+              "value": #[pid]
+            }
+          }
+        }
+      ]
+    }
+  },
+            "sort": [
+                "_doc"
+            ]
+        }
+        ]]>
+    </property>
+    <!--
+        简单的slice scroll query案例,复杂的条件修改query dsl即可
+    -->
+    <property name="scrollSliceQuery">
+        <![CDATA[
+         {
+           "slice": {
+                "id": #[sliceId], ## 必须使用sliceId作为变量名称
+                "max": #[sliceMax] ## 必须使用sliceMax作为变量名称
+            },
+            "size":#[size],
+            "query": {"match_all": {}},
+            "sort": [
+                "_doc"
+            ]
+        }
+        ]]>
+    </property>
+    <!--
+       一个简单的检索dsl,中有四个变量
+       applicationName1
+       applicationName2
+       startTime
+       endTime
+       通过map传递变量参数值
+
+       变量语法参考文档:
+   -->
+    <property name="searchPagineDatas">
+        <![CDATA[{
+            "query": {
+                "bool": {
+                  "should": [
+                    {
+                      "term": {
+                        "legal_entities.id": {
+                          "value": #[pid]
+                        }
+                      }
+                    },{
+                      "term": {
+                        "holder_id": {
+                          "value": #[pid]
+                        }
+                      }
+                    },{
+                      "term": {
+                        "hid": {
+                          "value": #[pid]
+                        }
+                      }
+                    }
+                  ]
+                }
+            },
+            ## 分页起点
+            "from":#[from],
+            ## 最多返回size条记录
+            "size":#[size]
+        }]]>
+    </property>
+</properties>

+ 224 - 0
src/test/java/com/winhc/test/TestPersonUpdate.java

@@ -0,0 +1,224 @@
+package com.winhc.test;
+
+import com.alibaba.fastjson.JSONObject;
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
+import com.mongodb.bulk.BulkWriteResult;
+import com.mongodb.client.model.UpdateOneModel;
+import com.mongodb.client.model.UpdateOptions;
+import com.mongodb.client.model.WriteModel;
+import com.winhc.kafka.KafkaProduce;
+import com.winhc.service.impl.PersonMergeImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.bson.Document;
+import org.frameworkset.elasticsearch.ElasticSearchHelper;
+import org.frameworkset.elasticsearch.boot.BBossESStarter;
+import org.frameworkset.elasticsearch.client.ClientInterface;
+import org.frameworkset.elasticsearch.entity.ESDatas;
+import org.frameworkset.elasticsearch.entity.RestResponse;
+import org.frameworkset.elasticsearch.entity.SearchHit;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Record;
+import org.neo4j.driver.Session;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.data.mongodb.core.MongoTemplate;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import javax.annotation.PostConstruct;
+import java.io.IOException;
+import java.net.URL;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * @author π
+ * @Description:人员移除测试
+ * @date 2023/2/23 16:37
+ */
+@RunWith(SpringRunner.class)
+@SpringBootTest
+@Slf4j
+public class TestPersonUpdate {
+
+    @Autowired
+    @Qualifier("bbossESStarterEs5")
+    public BBossESStarter bbossESStarterEs5;
+
+    public ClientInterface restClient5;
+
+    @Autowired
+    @Qualifier("bbossESStarterEs6")
+    public BBossESStarter bbossESStarterEs6;
+
+    public ClientInterface restClient6;
+
+    @Autowired
+    public MongoTemplate mongoTemplate;
+
+    @Autowired
+    @Qualifier("DriverV1")
+    Driver driver;
+
+    @Autowired
+    KafkaProduce kafkaProduce;
+
+    public ClientInterface clientUtil;
+
+    public List<String> indexs = Arrays.asList("winhc_index_rt_company", "winhc_index_rt_company_holder", "winhc_index_rt_company_staff");
+
+    @PostConstruct
+    public void init() {
+        restClient5 = bbossESStarterEs5.getRestClient("es5");
+        restClient6 = bbossESStarterEs6.getRestClient("es6");
+        clientUtil = ElasticSearchHelper.getConfigRestClientUtil("es6", "esmapper/scroll.xml");
+    }
+
+    private String loadResourceAsString(String resource) throws IOException {
+        URL url = Resources.getResource(resource);
+        return Resources.toString(url, Charsets.UTF_8);
+    }
+
+    @Test
+    public void startUpdate() throws IOException {
+
+        //String out_mongo = "xf_test1";
+        String out_mongo = "company_back_update_v9";
+        //人员id映射表
+        //String mapping_index = "winhc_company_human_pid_mapping";
+        String mapping_index = "winhc_company_human_pid_mapping_v9";
+        //老板映射表
+        //String relation_index = "winhc-company-human-relation";
+        String relation_index = "winhc-company-human-relation-v9";
+
+        Arrays.stream(loadResourceAsString("data/update_pid.txt")
+                .split("\n"))
+                .filter(StringUtils::isNotBlank)
+                .distinct()
+                .forEach(pid -> {
+                    //执行删除
+                    run(pid, out_mongo, mapping_index, relation_index);
+                });
+
+
+    }
+
+    private void run(String pid, String out_mongo, String mapping_index, String relation_index) {
+        //更新映射表摘要 -ES
+        updateMappingEsIndex(pid, mapping_index);
+        //更新老板摘要  -ES
+        updateRelationEsIndex(pid, relation_index);
+        //移除图库人员 -Neo4j
+        deletedGraphNode(pid);
+
+        //待更新 数据回推 -HBase -Mongo
+        callBackUpdate(pid, out_mongo);
+    }
+
+    public void callBackUpdate(String pid, String out_mongo) {
+        indexs.forEach(index1 -> callRowkey(pid, index1, out_mongo));
+    }
+
+    public void callRowkey(String pid, String queryIndex, String out_mongo) {
+
+        //回推mongo
+        Map params = new HashMap();
+        params.put("from", 0);
+        params.put("size", 10000);
+        params.put("pid", pid);
+        ESDatas<Map> redata = clientUtil.searchList(queryIndex + "/_search",
+                "searchPagineDatas", params, Map.class);
+
+        RestResponse restResponse = (RestResponse) redata.getRestResponse();
+        List<SearchHit> searchHits = restResponse.getSearchHits().getHits();
+        List<Document> collect = searchHits.stream().map(d -> {
+            JSONObject doc = new JSONObject();
+            String rowkey = d.getId();
+            String company_id = rowkey.split("_")[0];
+            doc.put("rowkey", rowkey);
+            doc.put("company_id", company_id);
+            String index = d.getIndex();
+            String tn;
+            if (index.contains("staff")) {
+                tn = "company_staff";
+            } else if (index.contains("holder")) {
+                tn = "company_holder";
+            } else {
+                tn = "company";
+            }
+            doc.put("tn", tn);
+            JSONObject j = new JSONObject();
+            j.put(company_id, Collections.singletonList(doc));
+            j.put("_id", rowkey + tn);
+            return new Document(j);
+        }).collect(Collectors.toList());
+
+        if (!collect.isEmpty()) {
+            UpdateOptions uo = new UpdateOptions().upsert(true).bypassDocumentValidation(true);
+            List<WriteModel<Document>> list = new ArrayList<>();
+            collect.forEach(m -> {
+                list.add(new UpdateOneModel<>(new Document("_id", m.get("_id")), new Document("$set", m), uo));
+            });
+            BulkWriteResult writeResult = mongoTemplate.getCollection(out_mongo).bulkWrite(list);
+            System.out.println(writeResult.getMatchedCount());
+        }
+
+    }
+
+    public void updateMappingEsIndex(String pid, String mapping_index) {
+        //移除映射表
+        String dsl1 = "{\n" +
+                "  \"query\": {\n" +
+                "    \"term\": {\n" +
+                "      \"human_pid\": {\n" +
+                "        \"value\": \"" + pid + "\"\n" +
+                "      }\n" +
+                "    }\n" +
+                "  }\n" +
+                "}";
+        System.out.println(dsl1);
+        String res1 = restClient5.updateByQuery(mapping_index + "/_delete_by_query?conflicts=proceed&refresh=true&wait_for_completion=false", dsl1);
+        System.out.println(res1);
+    }
+
+    public void updateRelationEsIndex(String pid, String relation_index) {
+
+        //移除老板聚合表
+        String dsl2 = "{\n" +
+                "  \"query\": {\n" +
+                "    \"terms\": {\n" +
+                "      \"_id\": [\n" +
+                "        \"" + pid + "\"\n" +
+                "      ]\n" +
+                "    }\n" +
+                "  },\n" +
+                "  \"script\": {\n" +
+                "    \"inline\": \" ctx._source['deleted'] = '9' \"\n" +
+                "  }\n" +
+                "}";
+        System.out.println(dsl2);
+        String res2 = restClient5.updateByQuery(relation_index + "/_update_by_query?conflicts=proceed&refresh=true&wait_for_completion=false", dsl2);
+        System.out.println(res2);
+
+
+    }
+
+    public void deletedGraphNode(String pid) {
+        //移除图库节点
+        long start = System.currentTimeMillis();
+        Session session = driver.session();
+        final String cql =
+                "MATCH (n:`个人`{person_id:\"" + pid + "\"}) \n" +
+                        "DETACH DELETE n";
+        log.info("consumer  cql:{}", cql);
+
+        List<Map<String, Object>> re = session.writeTransaction(tx -> tx.run(cql).list()).stream().map(Record::asMap).collect(Collectors.toList());
+        System.out.println(JSONObject.toJSONString(re));
+        session.close();
+        log.info("class:{} |  cost:{}", PersonMergeImpl.class.getSimpleName(), (System.currentTimeMillis() - start));
+    }
+}