Browse Source

fix: dynamic update

许家凯 3 years ago
parent
commit
04a777593c

+ 23 - 3
src/main/java/com/winhc/bigdata/task/jobs/DynamicPersonIdUpdateJob.java

@@ -3,6 +3,7 @@ package com.winhc.bigdata.task.jobs;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSON;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONArray;
 import com.alibaba.fastjson.JSONObject;
 import com.alibaba.fastjson.JSONObject;
+import com.mongodb.client.MongoCollection;
 import com.winhc.bigdata.task.service.DingTalkService;
 import com.winhc.bigdata.task.service.DingTalkService;
 import com.winhc.bigdata.task.util.ElasticsearchQueryUtil;
 import com.winhc.bigdata.task.util.ElasticsearchQueryUtil;
 import com.winhc.bigdata.task.util.ThrowableUtils;
 import com.winhc.bigdata.task.util.ThrowableUtils;
@@ -11,6 +12,7 @@ import lombok.RequiredArgsConstructor;
 import lombok.experimental.ExtensionMethod;
 import lombok.experimental.ExtensionMethod;
 import lombok.extern.slf4j.Slf4j;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.StringUtils;
+import org.bson.Document;
 import org.elasticsearch.common.TriFunction;
 import org.elasticsearch.common.TriFunction;
 import org.elasticsearch.common.collect.Tuple;
 import org.elasticsearch.common.collect.Tuple;
 import org.frameworkset.elasticsearch.boot.BBossESStarter;
 import org.frameworkset.elasticsearch.boot.BBossESStarter;
@@ -21,6 +23,7 @@ import org.frameworkset.elasticsearch.entity.ESDatas;
 import org.frameworkset.elasticsearch.entity.MetaMap;
 import org.frameworkset.elasticsearch.entity.MetaMap;
 import org.frameworkset.util.CollectionUtils;
 import org.frameworkset.util.CollectionUtils;
 import org.frameworkset.util.ObjectUtils;
 import org.frameworkset.util.ObjectUtils;
+import org.springframework.data.mongodb.core.MongoTemplate;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Component;
 import org.springframework.stereotype.Component;
@@ -52,6 +55,7 @@ public class DynamicPersonIdUpdateJob {
     private final KafkaTemplate<String, String> kafkaTemplate;
     private final KafkaTemplate<String, String> kafkaTemplate;
     private final DingTalkService dingTalkService;
     private final DingTalkService dingTalkService;
     private final BulkProcessor processor;
     private final BulkProcessor processor;
+    private final MongoTemplate mongoTemplate;
 
 
     @PostConstruct
     @PostConstruct
     public void init() {
     public void init() {
@@ -223,7 +227,23 @@ public class DynamicPersonIdUpdateJob {
 
 
 
 
     private void remove8(String id, String index, String keyno, String validId, Map<String, Object> data) {
     private void remove8(String id, String index, String keyno, String validId, Map<String, Object> data) {
-//        dingTalkService.send(JSON.toJSONString(data));
+        MongoCollection<Document> coll = mongoTemplate.getCollection("a_dynamic_update_flag_8_20210820");
+        Document document = new Document();
+        document.put("_id", id);
+        document.put("keyno", keyno);
+        document.put("validId", validId);
+        document.put("content", data);
+        try {
+            coll.insertOne(document);
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+        }
+
+        int i = new Random().nextInt(3);
+        if (i == 0) {
+            log.info("riewqujir:\n{} : {}\n{}", keyno, validId, JSON.toJSONString(data));
+        }
+
         if (StringUtils.isEmpty(id)) {
         if (StringUtils.isEmpty(id)) {
             return;
             return;
         }
         }
@@ -254,12 +274,12 @@ public class DynamicPersonIdUpdateJob {
                 map.put("deleted", "9");
                 map.put("deleted", "9");
                 ClientOptions clientOptions = new ClientOptions();
                 ClientOptions clientOptions = new ClientOptions();
                 clientOptions.setId(id);
                 clientOptions.setId(id);
-                processor.updateData(index, "dynamic", map,clientOptions);
+                processor.updateData(index, "dynamic", map, clientOptions);
             } else {
             } else {
                 jsonObject.put("content", collect);
                 jsonObject.put("content", collect);
                 ClientOptions clientOptions = new ClientOptions();
                 ClientOptions clientOptions = new ClientOptions();
                 clientOptions.setId(id);
                 clientOptions.setId(id);
-                processor.insertData(index, "dynamic", jsonObject,clientOptions);
+                processor.insertData(index, "dynamic", jsonObject, clientOptions);
             }
             }
         } catch (Exception e) {
         } catch (Exception e) {
             log.error("parse json error", e);
             log.error("parse json error", e);