Browse Source

切换mapping,查老板索引

xufei 3 năm trước cách đây
mục cha
commit
c0acbc3f31

+ 1 - 0
src/main/java/com/winhc/kafka/consumer/KafkaConsumerCaseMergeV2.java

@@ -25,6 +25,7 @@ import java.util.Map;
 @Slf4j
 //@Service
 @AllArgsConstructor
+@Deprecated
 public class KafkaConsumerCaseMergeV2 {
 
     private final Map<String, RelationService> map;

+ 1 - 0
src/main/java/com/winhc/kafka/consumer/KafkaConsumerPersonCompanys.java

@@ -26,6 +26,7 @@ import java.util.stream.Collectors;
 @Slf4j
 //@Service
 @AllArgsConstructor
+@Deprecated
 public class KafkaConsumerPersonCompanys {
 
     private final Map<String, RelationService> map;

+ 2 - 2
src/main/java/com/winhc/kafka/consumer/KafkaConsumerPersonIdUpdate.java

@@ -94,8 +94,8 @@ public class KafkaConsumerPersonIdUpdate {
             return CompletableFuture.completedFuture(null);
         }
 
-        CompletableFuture<Boolean> v1 = updateByQuery(old_human_pid, new_human_pid, "company-human-pid-mapping-v1", ESUtils.updateHumanMappingId(old_human_pid, new_human_pid));
-        CompletableFuture<Boolean> v2 = updateByQuery(old_human_pid, new_human_pid, "company-human-relation-v1", ESUtils.updateBossId(old_human_pid));
+        CompletableFuture<Boolean> v1 = updateByQuery(old_human_pid, new_human_pid, "winhc_company_human_pid_mapping_v9", ESUtils.updateHumanMappingId(old_human_pid, new_human_pid));
+        CompletableFuture<Boolean> v2 = updateByQuery(old_human_pid, new_human_pid, "winhc-company-human-relation-v9", ESUtils.updateBossId(old_human_pid));
 
         return CompletableFuture.allOf(v1, v2).thenApplyAsync(x -> {
             try {

+ 1 - 0
src/main/java/com/winhc/kafka/consumer/KafkaConsumerPersonMerge.java

@@ -22,6 +22,7 @@ import java.util.Map;
 @Slf4j
 //@Service
 @AllArgsConstructor
+@Deprecated
 public class KafkaConsumerPersonMerge {
 
     private final Map<String, RelationService> map;

+ 27 - 16
src/test/java/com/winhc/test/TestJson.java

@@ -7,6 +7,7 @@ import com.alibaba.fastjson.parser.Feature;
 import com.alibaba.fastjson.serializer.DoubleSerializer;
 import com.alibaba.fastjson.serializer.SerializeConfig;
 import com.google.gson.Gson;
+import com.winhc.pojo.MergePerson;
 import com.winhc.utils.CompanyUtils;
 import org.apache.ibatis.javassist.expr.NewArray;
 import org.junit.Test;
@@ -25,16 +26,16 @@ public class TestJson {
         List<Map<String, Object>> batch_list = new ArrayList<>();
 
 //        for (int i = 0; i <= 10; i++) {
-            Integer i = 3;
-            HashMap<String, Object> m1 = new HashMap<>();
-            m1.put("startId", "startId" + i);
-            m1.put("endId", "endId" + i);
-            m1.put("startName", "startName" + i);
-            m1.put("endName", "endName" + i);
-            m1.put("deleted", 1);
-            m1.put("percent", i * 0.5);
-            m1.put("rid", i * 3);
-            batch_list.add(m1);
+        Integer i = 3;
+        HashMap<String, Object> m1 = new HashMap<>();
+        m1.put("startId", "startId" + i);
+        m1.put("endId", "endId" + i);
+        m1.put("startName", "startName" + i);
+        m1.put("endName", "endName" + i);
+        m1.put("deleted", 1);
+        m1.put("percent", i * 0.5);
+        m1.put("rid", i * 3);
+        batch_list.add(m1);
 //        }
         String s = JSON.toJSONString(m1);
         JSON.DEFAULT_PARSER_FEATURE &= ~Feature.UseBigDecimal.getMask();
@@ -55,19 +56,19 @@ public class TestJson {
     }
 
     @Test
-    public void test2(){
+    public void test2() {
         String r1 = CompanyUtils.getIncrPersonLabel();
         System.out.println(r1);
     }
 
     @Test
-    public void test3(){
-        List<String> list = Arrays.asList("小米","淘宝","京东");
+    public void test3() {
+        List<String> list = Arrays.asList("小米", "淘宝", "京东");
         StringBuilder sb = new StringBuilder();
         sb.append("[");
         for (int i = 0; i < list.size(); i++) {
             sb.append("'").append(list.get(i)).append("'");
-            if(i != list.size() -1){
+            if (i != list.size() - 1) {
                 sb.append(",");
             }
         }
@@ -77,9 +78,19 @@ public class TestJson {
     }
 
     @Test
-    public void test4(){
-        String id = "07bbf1d166b27b44116e8a5c575e23ed";
+    public void test4() {
+        Collection<MergePerson> values = new ArrayList<>();
+        MergePerson m = new MergePerson("1", "n1", "2", "n2");
+        values.add(m);
+//        MergePerson m1 = new MergePerson("1", "n1", "2", "n2");
+//        values.add(m1);
+        System.out.println(JSON.toJSONString(values));
 
+        JSONObject j = new JSONObject();
+        j.put("k1", "1");
+        j.put("k2", "2");
+
+        System.out.println(JSON.toJSONString(Collections.singletonList(j)));
 
     }
 }

+ 49 - 7
src/test/java/com/winhc/test/TestMongo.java

@@ -1,17 +1,19 @@
 package com.winhc.test;
 
+import com.google.common.collect.ImmutableMap;
 import com.winhc.db.mongodb.dataobject.NodeRelationError;
 import com.winhc.db.mongodb.repository.NodeRelatonErrorRepository;
+import com.winhc.service.PidToMongoService;
 import com.winhc.service.impl.NodeRelationSuccessStatusServiceImpl;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringRunner;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Map;
+
+import java.util.*;
+
+import static com.winhc.utils.DateUtil.formatDate_YYYY_MM_DD_HH_MM_SS;
 
 /**
  * @author π
@@ -26,6 +28,10 @@ public class TestMongo {
 
     @Autowired
     NodeRelationSuccessStatusServiceImpl nodeRelationSuccessStatusServiceImpl;
+
+    @Autowired
+    PidToMongoService pidToMongoService;
+
     @Test
     public void saveData() {
         ArrayList<NodeRelationError> list = new ArrayList<>();
@@ -39,14 +45,50 @@ public class TestMongo {
         list.add(n1);
         nodeRelatonErrorRepository.saveAll(list);
     }
+
     @Test
     public void saveStaus() {
         HashMap<String, Object> m = new HashMap<>();
-        m.put("topic_type","100");
-        m.put("ds","20210219");
-        m.put("status","1");
+        m.put("topic_type", "100");
+        m.put("ds", "20210219");
+        m.put("status", "1");
         ArrayList<Map<String, Object>> list = new ArrayList<>();
         list.add(m);
         nodeRelationSuccessStatusServiceImpl.save(list);
     }
+
+    @Test
+    public void insertMongo() {
+        ImmutableMap<String, Object> r1 = ImmutableMap.<String, Object>of(
+                "_id", "p1",
+                "update_time", formatDate_YYYY_MM_DD_HH_MM_SS(),
+                "data", ImmutableMap.of(
+                        "person_id", "p1"
+                        , "name", "钱胜前"
+                        , "topic_type", "800"
+                )
+        );
+        ImmutableMap<String, Object> r2 = ImmutableMap.<String, Object>of(
+                "_id", "p6",
+                "update_time", formatDate_YYYY_MM_DD_HH_MM_SS(),
+                "data", ImmutableMap.of(
+                        "person_id", "p6"
+                        , "name", "曾妍"
+                        , "topic_type", "800"
+                )
+        );
+        ImmutableMap<String, Object> r3 = ImmutableMap.<String, Object>of(
+                "_id", "p1",
+                "update_time", formatDate_YYYY_MM_DD_HH_MM_SS(),
+                "data", ImmutableMap.of(
+                        "person_id", "p1"
+                        , "name", "钱胜前"
+                        , "topic_type", "800"
+                )
+        );
+        long start = System.currentTimeMillis();
+        int res = pidToMongoService.save(Arrays.asList(r1, r2, r3));
+        System.out.println("cost : " + (System.currentTimeMillis() - start));
+        System.out.println(res);
+    }
 }

+ 37 - 0
src/test/java/com/winhc/test/TestSendKafka.java

@@ -1,6 +1,7 @@
 package com.winhc.test;
 
 import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
 import com.winhc.kafka.KafkaProduce;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -10,6 +11,7 @@ import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.boot.test.context.SpringBootTest;
 import org.springframework.test.context.junit4.SpringRunner;
 import java.util.HashMap;
+import java.util.Map;
 
 /**
  * @author π
@@ -39,5 +41,40 @@ public class TestSendKafka {
         System.out.println(System.currentTimeMillis() - start);
     }
 
+    @Test
+    public void sendMergePerson() {
+        String topic = "test3";
+        Map<String,String> m1 = new HashMap<String,String>(){
+            {
+                put("person_id","p2");
+                put("name","钱胜前");
+                put("topic_type","800");
+            }
+        };
+        Map<String,String> m2 = new HashMap<String,String>(){
+            {
+                put("person_id","p6");
+                put("name","曾妍");
+                put("topic_type","800");
+            }
+        };
+        kafkaProduce.produce(topic, JSONObject.toJSONString(m1));
+        kafkaProduce.produce(topic, JSONObject.toJSONString(m2));
+    }
+
+    @Test
+    public void sendUpdatePerson() {
+        String message = "[{\"new_human_name\":\"钱胜前\",\"new_human_pid\":\"p2\",\"old_human_name\":\"钱胜前\",\"old_human_pid\":\"p1\",\"time\":\"2022-01-18 15:51:38\"},{\"new_human_name\":\"钱胜前\",\"new_human_pid\":\"p2\",\"old_human_name\":\"钱胜前\",\"old_human_pid\":\"p3\",\"time\":\"2022-01-18 15:51:38\"},{\"new_human_name\":\"曾妍\",\"new_human_pid\":\"p6\",\"old_human_name\":\"曾妍\",\"old_human_pid\":\"p4\",\"time\":\"2022-01-18 15:51:38\"}]";
+        String topic = "test5";
+//        Map<String,String> m1 = new HashMap<String,String>(){
+//            {
+//                put("new_human_pid","kfe7feabdc3bd7a148562ab94b3bb08bc");
+//                put("old_human_pid","pfe7feabdc3bd7a148562ab94b3bb08bc");
+//                put("new_human_name","张伟");
+//            }
+//        };
+        kafkaProduce.produce(topic, message);
+    }
+
 
 }