
feat(DynamicJob): add flag 8 handler

JimZhang 3 years ago
parent
commit
e5e4377a01

+ 50 - 1
src/main/java/com/winhc/bigdata/task/configuration/BBossESConfiguration.java

@@ -1,18 +1,67 @@
 package com.winhc.bigdata.task.configuration;
 
+import com.winhc.bigdata.task.service.DingTalkService;
+import com.winhc.bigdata.task.util.ThrowableUtils;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
 import org.frameworkset.elasticsearch.boot.BBossESStarter;
+import org.frameworkset.elasticsearch.bulk.BulkCommand;
+import org.frameworkset.elasticsearch.bulk.BulkInterceptor;
+import org.frameworkset.elasticsearch.bulk.BulkProcessorBuilder;
 import org.frameworkset.elasticsearch.client.ClientInterface;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
+import org.frameworkset.elasticsearch.bulk.BulkProcessor;
 
 /**
  * @author ZhangJi
  * @since 2021-08-03 09:36
  */
 @Configuration
+@Slf4j
+@RequiredArgsConstructor
 public class BBossESConfiguration {
+    private final DingTalkService dingTalkService;
+
     @Bean
     public ClientInterface bbossESClient(BBossESStarter bBossESStarter) {
-        return bBossESStarter.getRestClient();
+        return bBossESStarter.getConfigRestClient("esmapper/scroll.xml");
+    }
+
+    @Bean
+    public BulkProcessor buildBulkProcessor() {
+        // Builder for the BulkProcessor batch-processing component
+        BulkProcessorBuilder bulkProcessorBuilder = new BulkProcessorBuilder();
+        bulkProcessorBuilder.setBlockedWaitTimeout(0)// max wait (ms) for new records when the bulk buffer queue is full; records are rejected once it elapses. 0 (default): never reject, wait until accepted
+                .setBulkFailRetry(1)// retry count on failure (currently has no effect)
+                .setBulkSizes(1000)// records per bulk batch
+                .setFlushInterval(5000)// forced-flush interval (ms): if records are pending but bulkSizes has not been reached since the last bulk, flush anyway
+                .setWarnMultsRejects(1000)// log a rejection warning once per 1000 rejected bulk operations
+                .setWorkThreads(20)// bulk worker thread count
+                .setWorkThreadQueue(20)// worker thread pool queue size
+                .setBulkProcessorName("dynamic_processor")// worker thread name prefix; actual names are bulkProcessorName plus a thread number
+                .setBulkRejectMessage("Reject dynamic bulk processor")// prefix for the rejection warning message
+//				.setElasticsearch("default")// Elasticsearch datasource name; bboss supports multiple datasources
+                .addBulkInterceptor(new BulkInterceptor() {
+                    public void beforeBulk(BulkCommand bulkCommand) {
+
+                    }
+
+                    public void afterBulk(BulkCommand bulkCommand, String result) {
+
+                    }
+
+                    public void exceptionBulk(BulkCommand bulkCommand, Throwable exception) {
+                        log.error("bulk exception", exception);
+                        dingTalkService.error("{}\n{}", exception.getMessage(), ThrowableUtils.getStackTraceByPn(exception, "com.winhc.bigdata.task"));
+                    }
+
+                    public void errorBulk(BulkCommand bulkCommand, String result) {
+                        log.error("insert some error {}", result);
+                    }
+                });
+
+        return bulkProcessorBuilder.build();// build the batch-processing component
     }
 }
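
Reviewer note: the bean above is a shared asynchronous writer; records queue in memory and worker threads flush them in batches. A minimal consumer sketch (ExampleDynamicWriter and the index name are illustrative, not part of this commit; insertData(index, type, doc) is the same call DynamicPersonIdUpdateJob makes below):

import java.util.Map;
import lombok.RequiredArgsConstructor;
import org.frameworkset.elasticsearch.bulk.BulkProcessor;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class ExampleDynamicWriter {
    private final BulkProcessor processor; // the bean built above

    public void save(String index, Map<String, Object> doc) {
        // non-blocking: queues the record; worker threads flush batches of 1000,
        // or whatever has accumulated after 5 s
        processor.insertData(index, "dynamic", doc);
    }
}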

+ 9 - 0
src/main/java/com/winhc/bigdata/task/controller/TestController.java

@@ -1,10 +1,12 @@
 package com.winhc.bigdata.task.controller;
 
 import com.aliyun.odps.OdpsException;
 import com.winhc.bigdata.task.framework.odps.ExportDataCenter;
 import com.winhc.bigdata.task.framework.odps.service.PullRequestRecordService;
 import com.winhc.bigdata.task.framework.odps.utils.ExecOdpsSqlUtil;
 import com.winhc.bigdata.task.framework.odps.vo.MaxComputeSqlInstance;
+import com.winhc.bigdata.task.jobs.DynamicPersonIdUpdateJob;
 import lombok.AllArgsConstructor;
 import lombok.RequiredArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
@@ -28,6 +30,7 @@ public class TestController {
     private final ExportDataCenter exportDataCenter;
     private final ExecOdpsSqlUtil execOdpsSqlUtil;
     private final KafkaTemplate<String, String> kafkaTemplate;
+    private final DynamicPersonIdUpdateJob dynamicPersonIdUpdateJob;
 
     @GetMapping("pull")
     public Object pull(String sqlId,String toMongoDbName, String pullBy) {
@@ -68,4 +71,10 @@ public class TestController {
         return kafkaTemplate.send(topic, payload)
                 .get().toString();
     }
+
+    @PostMapping("dynamic8")
+    public Object dynamic8(@RequestBody DynamicPersonIdUpdateJob.ScanParams params) {
+        dynamicPersonIdUpdateJob.scan(params);
+        return "ok";
+    }
 }
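
Reviewer note: a hypothetical smoke test for the new endpoint (host, port, and the bare /dynamic8 path are assumptions; the JSON field names come from ScanParams in DynamicPersonIdUpdateJob):

import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
import org.springframework.web.client.RestTemplate;

public class Dynamic8SmokeTest {
    public static void main(String[] args) {
        HttpHeaders headers = new HttpHeaders();
        headers.setContentType(MediaType.APPLICATION_JSON);
        String body = "{\"tn\":\"<tn>\",\"keyno\":\"<entity_keyno>\",\"validId\":\"<id_to_keep>\"}";
        String resp = new RestTemplate()
                .postForObject("http://localhost:8080/dynamic8", new HttpEntity<>(body, headers), String.class);
        System.out.println(resp); // expect "ok"
    }
}

Since ScanParams is immutable (@RequiredArgsConstructor over final fields), Jackson needs constructor parameter names to bind this body, e.g. via the -parameters compiler flag or lombok.anyConstructor.addConstructorProperties=true in lombok.config.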

+ 88 - 5
src/main/java/com/winhc/bigdata/task/jobs/DynamicPersonIdUpdateJob.java

@@ -6,22 +6,26 @@ import com.alibaba.fastjson.JSONObject;
 import com.winhc.bigdata.task.service.DingTalkService;
 import com.winhc.bigdata.task.util.ElasticsearchQueryUtil;
 import com.winhc.bigdata.task.util.ThrowableUtils;
+import lombok.Data;
 import lombok.RequiredArgsConstructor;
 import lombok.experimental.ExtensionMethod;
 import lombok.extern.slf4j.Slf4j;
+import lombok.var;
 import org.apache.commons.lang3.StringUtils;
 import org.elasticsearch.common.TriFunction;
 import org.elasticsearch.common.collect.Tuple;
+import org.frameworkset.elasticsearch.bulk.BulkProcessor;
 import org.frameworkset.elasticsearch.client.ClientInterface;
+import org.frameworkset.elasticsearch.entity.ESDatas;
+import org.frameworkset.elasticsearch.entity.MetaMap;
+import org.frameworkset.elasticsearch.entity.SearchHit;
 import org.frameworkset.util.CollectionUtils;
+import org.frameworkset.util.ObjectUtils;
 import org.springframework.kafka.annotation.KafkaListener;
 import org.springframework.kafka.core.KafkaTemplate;
 import org.springframework.stereotype.Component;
 
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
@@ -44,6 +48,7 @@ public class DynamicPersonIdUpdateJob {
     private final ClientInterface restClient;
     private final KafkaTemplate<String, String> kafkaTemplate;
     private final DingTalkService dingTalkService;
+    private final BulkProcessor processor;
 
     public static <T> Predicate<T> distinctByKey(Function<? super T, ?> keyExtractor) {
         Set<Object> seen = ConcurrentHashMap.newKeySet();
@@ -115,11 +120,14 @@ public class DynamicPersonIdUpdateJob {
         batchFromCreate(collect);
     }
 
+    /* ======================= flags 0 / 8 / 9 ======================= */
     private void batchFromCreate(Map<String, List<JSONObject>> params) {
         CompletableFuture<Void> do0 = doAll(params.get("0"), ElasticsearchQueryUtil::createPersonId);
         CompletableFuture<Void> do9 = doAll(params.get("9"), ElasticsearchQueryUtil::deleteDynamic);
+        CompletableFuture<Void> do8 = do8(params.get("8"));
         try {
-            CompletableFuture.allOf(do0, do9).get();
+            CompletableFuture.allOf(do0, do9, do8).get();
         } catch (InterruptedException | ExecutionException e) {
             log.error("waiting processor error", e);
             dingTalkService.error("{}\n{}", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
@@ -127,6 +135,7 @@ public class DynamicPersonIdUpdateJob {
         }
     }
 
     private CompletableFuture<Void> doAll(List<JSONObject> params, TriFunction<String, String, List<Tuple<String, String>>, String> function) {
         if (CollectionUtils.isEmpty(params)) {
             return CompletableFuture.completedFuture(null);
@@ -168,12 +177,86 @@ public class DynamicPersonIdUpdateJob {
         }, TASK_FJ_POOL);
     }
 
+    private CompletableFuture<Void> do8(List<JSONObject> params) {
+        if (CollectionUtils.isEmpty(params)) {
+            return CompletableFuture.completedFuture(null);
+        }
+        List<CompletableFuture<Void>> cr = params.stream().map(j -> new ScanParams(j.getString("tn"), j.getString("company_id"), j.getString("relation_id"))).map(p -> CompletableFuture.runAsync(() -> scan(p))).collect(Collectors.toList());
+        List<CompletableFuture<Void>> rc = params.stream().map(j -> new ScanParams(j.getString("tn"), j.getString("relation_id"), j.getString("company_id"))).map(p -> CompletableFuture.runAsync(() -> scan(p))).collect(Collectors.toList());
+        cr.addAll(rc);
+        return CompletableFuture.allOf(cr.toArray(new CompletableFuture[0]));
+    }
+
+    @RequiredArgsConstructor
+    @Data
+    public static class ScanParams {
+        private final String tn;
+        private final String keyno;
+        private final String validId;
+    }
+
+    public void scan(ScanParams params) {
+        // scroll over every dynamic doc matching tn + keyno (DSL in esmapper/scroll.xml) and hand each hit to remove8
+        restClient.scroll("winhc-company-dynamic/_search", "scrollQuery", "1m", params.toMap(), MetaMap.class, (response, handlerInfo) -> {
+            Optional.ofNullable(response)
+                    .map(ESDatas::getDatas)
+                    .ifPresent(l -> l.forEach(m -> remove8(m.getId(), m.getIndex(), params.getKeyno(), params.getValidId(), m)));
+        });
+    }
+
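+    /**
+     * Drops the content entries that tie keyno to validId from one dynamic doc:
+     * entries belonging to other entities, or whose before/after snapshots do not
+     * reference validId, are kept. If nothing remains the doc is soft-deleted
+     * (deleted=9); otherwise the trimmed dynamic_info object is re-indexed under
+     * the same id through the bulk processor.
+     */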
+    private void remove8(String id, String index, String keyno, String validId, Map<String, Object> data) {
+        String dynamicInfo = new JSONObject(data).getString("dynamic_info");
+        if (StringUtils.isEmpty(dynamicInfo)) return;
+        try {
+            JSONObject jsonObject = JSON.parseObject(dynamicInfo);
+            JSONArray content = jsonObject.getJSONArray("content");
+            if (ObjectUtils.isEmpty(content)) {
+                log.warn("content is empty");
+                return;
+            }
+            }
+            List<JSONObject> collect = content.toJavaList(JSONObject.class).stream().filter(j -> {
+                JSONObject entity = j.getJSONObject("entity");
+                if (!keyno.equals(entity.getString("keyno"))) return true;
+                return !j.anyMatchByNestedObject(validId, "keyno", "before", "after");
+            }).collect(Collectors.toList());
+            if (collect.isEmpty()) {
+                Map<String, Object> map = new HashMap<>();
+                map.put("deleted", "9");
+                map.put("id", id);
+                processor.updateData(index, "dynamic", map);
+            } else {
+                jsonObject.put("content", collect);
+                jsonObject.put("id", id);
+                processor.insertData(index, "dynamic", jsonObject);
+            }
+        } catch (Exception e) {
+            log.error("parse json error", e);
+            dingTalkService.error("{}\n{}", e.getMessage(), ThrowableUtils.getStackTraceByPn(e, "com.winhc.bigdata.task"));
+        }
+    }
+
     @SuppressWarnings("unused")
     public static class StringStringMapExt {
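+        /** Extension method (applied via Lombok's @ExtensionMethod): true if any nested object under one of {@code fields} has {@code key} equal to {@code val}. */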
+        public static boolean anyMatchByNestedObject(JSONObject jo, String val, String key, String... fields) {
+            return Stream.of(fields).map(jo::getJSONObject).filter(Objects::nonNull).map(j -> j.getString(key)).anyMatch(val::equals);
+        }
+
         public static Stream<JSONObject> toJSONStream(List<String> strings) {
             return strings.stream().map(JSONObject::parseObject);
         }
 
+        public static Map<String, Object> toMap(ScanParams sp) {
+            Map<String, Object> stringObjectHashMap = new HashMap<>();
+            stringObjectHashMap.put("tn", sp.getTn());
+            stringObjectHashMap.put("keyno", sp.getKeyno());
+            return stringObjectHashMap;
+        }
+
         public static JSONArray orDefault(JSONArray j, JSONArray dj) {
             return CollectionUtils.isEmpty(j) ? dj : j;
         }
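
Reviewer note: these static helpers are consumed as extension methods. Lombok's @ExtensionMethod, imported at the top of this file, rewrites instance-style calls such as params.toMap() and j.anyMatchByNestedObject(...) into static calls at compile time. A resolution sketch (the annotation placement on DynamicPersonIdUpdateJob is assumed; it is not visible in these hunks):

import java.util.Map;
import com.alibaba.fastjson.JSONObject;
import com.winhc.bigdata.task.jobs.DynamicPersonIdUpdateJob.ScanParams;
import com.winhc.bigdata.task.jobs.DynamicPersonIdUpdateJob.StringStringMapExt;
import lombok.experimental.ExtensionMethod;

@ExtensionMethod(StringStringMapExt.class)
class ExtensionDemo {
    void demo(ScanParams p, JSONObject j) {
        Map<String, Object> m = p.toMap(); // compiles to StringStringMapExt.toMap(p)
        boolean hit = j.anyMatchByNestedObject("<id>", "keyno", "before", "after");
        // compiles to StringStringMapExt.anyMatchByNestedObject(j, "<id>", "keyno", "before", "after")
    }
}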

+ 50 - 0
src/main/resources/esmapper/scroll.xml

@@ -0,0 +1,50 @@
+<properties>
+    <!--
+    简单的scroll query案例,复杂的条件修改query dsl即可
+    -->
+    <property name="scrollQuery">
+        <![CDATA[
+        {
+          "size": 1000,
+          "query": {
+            "bool": {
+              "must": [
+                {
+                  "term": {
+                    "tn": {
+                      "value": #[tn]
+                    }
+                  }
+                },
+                {
+                  "term": {
+                    "association_entity_info.keyno": {
+                      "value": #[keyno]
+                    }
+                  }
+                }
+              ]
+            }
+          }
+        }
+        ]]>
+    </property>
+    <!--
+        简单的slice scroll query案例,复杂的条件修改query dsl即可
+    -->
+    <property name="scrollSliceQuery">
+        <![CDATA[
+         {
+           "slice": {
+                "id": #[sliceId], ## 必须使用sliceId作为变量名称
+                "max": #[sliceMax] ## 必须使用sliceMax作为变量名称
+            },
+            "size":#[size],
+            "query": {"match_all": {}},
+            "sort": [
+                "_doc"
+            ]
+        }
+        ]]>
+    </property>
+</properties>
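
Reviewer note: scrollQuery is the template scan() executes with the map from ScanParams.toMap(); a standalone invocation looks like this (same scroll(...) signature the commit uses; parameter values are placeholders):

import java.util.HashMap;
import java.util.Map;
import org.frameworkset.elasticsearch.client.ClientInterface;
import org.frameworkset.elasticsearch.entity.MetaMap;

public class ScrollQueryDemo {
    public static void run(ClientInterface restClient) {
        Map<String, Object> params = new HashMap<>();
        params.put("tn", "<tn>");         // placeholder
        params.put("keyno", "<keyno>");   // placeholder
        restClient.scroll("winhc-company-dynamic/_search", "scrollQuery", "1m", params, MetaMap.class,
                (response, handlerInfo) -> {
                    // invoked once per page of up to 1000 hits until the scroll is exhausted
                });
    }
}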