Browse Source

清理es数据

xufei 4 years ago
parent
commit
27b6b3cbda

+ 91 - 0
src/main/java/com/winhc/config/ElasticSearchConfiguration.java

@@ -0,0 +1,91 @@
+package com.winhc.config;
+
+import org.apache.http.HttpHost;
+import org.apache.http.auth.AuthScope;
+import org.apache.http.auth.UsernamePasswordCredentials;
+import org.apache.http.client.CredentialsProvider;
+import org.apache.http.impl.client.BasicCredentialsProvider;
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestClientBuilder;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.util.stream.Stream;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/9/15 20:01
+ * @Description:
+ */
+@Configuration
+public class ElasticSearchConfiguration {
+    @Value("${es.username}")
+    private String username;
+    @Value("${es.password}")
+    private String password;
+    @Value("${es.host}")
+    private String host;
+
+    @Value("${es.schema:http}")
+    String schema;
+    @Value(value = "${es.connect-timeout:100000}")
+    String connectTimeout;
+    @Value(value = "${es.socket-timeout:600000}")
+    String socketTimeout;
+    @Value(value = "${es.connection-request-timeout:50000}")
+    String connectionRequestTimeout;
+    @Value(value = "${es.max-conn-total:100}")
+    String maxConnTotal;
+    @Value(value = "${es.max-conn-per-route:100}")
+    String maxConnPerRoute;
+
+    @Bean
+    public RestClient bean() {
+        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+        credentialsProvider.setCredentials(AuthScope.ANY,
+                new UsernamePasswordCredentials(username, password));
+        // 单击所创建的Elasticsearch实例ID,在基本信息页面获取公网地址,即为HOST。
+        return RestClient.builder(new HttpHost(host, 9200))
+                .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
+                    @Override
+                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
+                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+                    }
+                }).build();
+    }
+
+
+    @Bean
+    public RestHighLevelClient getClient() {
+        HttpHost[] httpHosts = Stream.of(host.split(",")).map(host -> {
+            String[] split = host.split(":");
+            return new HttpHost(split[0], 9200, schema);
+        }).toArray(HttpHost[]::new);
+
+        // 阿里云Elasticsearch集群需要basic auth验证。
+        final CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
+        //访问用户名和密码为您创建阿里云Elasticsearch实例时设置的用户名和密码,也是Kibana控制台的登录用户名和密码。
+        credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(username, password));
+
+
+        return new RestHighLevelClient(RestClient
+                .builder(httpHosts)
+                .setMaxRetryTimeoutMillis(60000 * 3)
+                .setRequestConfigCallback(builder -> {
+                    builder.setConnectTimeout(!connectTimeout.isEmpty() ? Integer.valueOf(connectTimeout) : 1000);
+                    builder.setSocketTimeout(!socketTimeout.isEmpty() ? Integer.valueOf(socketTimeout) : 60000);
+                    builder.setConnectionRequestTimeout(!connectionRequestTimeout.isEmpty() ? Integer.valueOf(connectionRequestTimeout) : 500);
+                    return builder;
+                })
+                .setHttpClientConfigCallback(httpAsyncClientBuilder -> {
+                    httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
+                    httpAsyncClientBuilder.setMaxConnTotal(!maxConnTotal.isEmpty() ? Integer.valueOf(maxConnTotal) : 100);
+                    httpAsyncClientBuilder.setMaxConnPerRoute(!maxConnPerRoute.isEmpty() ? Integer.valueOf(maxConnPerRoute) : 100);
+                    return httpAsyncClientBuilder;
+                }).build()
+        );
+    }
+}

+ 1 - 1
src/main/java/com/winhc/task/CaseIncrementTask.java

@@ -66,7 +66,7 @@ public class CaseIncrementTask {
     CaseConfig caseConfig;
 
     //TODO 启动合并任务
-    @Scheduled(cron = "00 00 13 * * ?")
+    @Scheduled(cron = "00 10 13 * * ?")
     //@Scheduled(cron = "*/20 * * * * ?")
     //@Scheduled(cron = "30 07 10 21 06 ?")
     public void mergeCaseScheduled() throws UnsupportedEncodingException {

+ 48 - 0
src/main/java/com/winhc/task/ESCleanTask.java

@@ -0,0 +1,48 @@
+package com.winhc.task;
+
+import com.winhc.utils.EsUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.elasticsearch.client.RestClient;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+import java.io.IOException;
+
+/**
+ * @author π
+ * @Description:清理移除数据
+ * @date 2021/6/22 17:07
+ */
+
+@Component
+@Slf4j
+@EnableScheduling
+@AllArgsConstructor
+public class ESCleanTask {
+
+    private final RestClient restClient;
+
+    @Scheduled(cron = "00 00 19 * * ?")
+    //@Scheduled(cron = "*/20 * * * * ?")
+    //@Scheduled(cron = "30 17 14 23 06 ?")
+    public void cleanEsScheduled() throws IOException, InterruptedException {
+        String dsl = "{\n" +
+                "  \"query\": {\n" +
+                "    \"term\": {\n" +
+                "      \"deleted\": {\n" +
+                "        \"value\": \"1\"\n" +
+                "      }\n" +
+                "    }\n" +
+                "  }\n" +
+                "}";
+        String[] indexs = new String[]{"winhc_judicial_case", "winhc_judicial_case_detail"};
+        log.info("cleanEsScheduled start !!!");
+        for (String index : indexs) {
+            EsUtils.deletedByQuery(restClient, dsl, index);
+            Thread.sleep(1000 * 60 * 5);
+        }
+        log.info("cleanEsScheduled end !!!");
+
+    }
+}

+ 35 - 0
src/main/java/com/winhc/utils/EsUtils.java

@@ -0,0 +1,35 @@
+package com.winhc.utils;
+
+import com.alibaba.fastjson.JSON;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+
+import java.io.IOException;
+import java.util.Collections;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2021/6/22 19:42
+ */
+@Slf4j
+public class EsUtils {
+
+    public static String deletedByQuery(RestClient restClient, String dsl,String index) throws IOException {
+        log.info("deletedByQuery start | index:{} | dsl: \n {} \n", index,dsl);
+        HttpEntity entity = new NStringEntity(dsl, ContentType.APPLICATION_JSON);
+        Response response = restClient.performRequest(
+                "POST",
+                "/"+index+"/_delete_by_query?slices=auto&conflicts=proceed&wait_for_completion=false",
+                Collections.emptyMap(),
+                entity);
+        String taskId = JSON.parseObject(EntityUtils.toString(response.getEntity())).getString("task");
+        log.info("deletedByQuery end | taskId:{} | !!!", taskId);
+        return taskId;
+    }
+}

+ 6 - 0
src/main/resources/application-dev.properties

@@ -70,3 +70,9 @@ spring.data.mongodb.uri=mongodb://itslaw:itslaw_168@dds-uf6ff5dfd9aef3641601-pub
 case.task.flow=judicial_case_relation_dev
 case.task.taskName=judicial_case_relation_aggs_dev
 case.task.topic=inc_case_id_dev
+
+#es环境
+es.username=elastic
+es.password=elastic_168
+es.host=es-cn-oew22t8bw002iferu.public.elasticsearch.aliyuncs.com
+

+ 6 - 1
src/main/resources/application-prd.properties

@@ -64,4 +64,9 @@ spring.data.mongodb.uri=mongodb://itslaw:itslaw_168@dds-uf6ff5dfd9aef3641.mongod
 #案件配置
 case.task.flow=judicial_case_relation
 case.task.taskName=judicial_case_relation_aggs
-case.task.topic=inc_case_id_prod
+case.task.topic=inc_case_id_prod
+
+#es环境
+es.username=elastic
+es.password=elastic_168
+es.host=es-cn-oew22t8bw002iferu.elasticsearch.aliyuncs.com

+ 53 - 0
src/test/java/com/winhc/ESSearchlTest.java

@@ -0,0 +1,53 @@
+package com.winhc;
+
+import com.alibaba.fastjson.JSON;
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.ContentType;
+import org.apache.http.nio.entity.NStringEntity;
+import org.apache.http.util.EntityUtils;
+import org.elasticsearch.client.Response;
+import org.elasticsearch.client.RestClient;
+import org.junit.jupiter.api.Test;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.boot.test.context.SpringBootTest;
+import java.io.IOException;
+import java.util.Collections;
+
+@SpringBootTest
+class ESSearchlTest {
+
+    @Qualifier("bean")
+    @Autowired
+    RestClient restClient;
+
+
+    @Test
+    void tips() throws IOException {
+
+        String query = "{\n" +
+                "  \"query\": {\n" +
+                "    \"term\": {\n" +
+                "      \"deleted\": {\n" +
+                "        \"value\": \"1\"\n" +
+                "      }\n" +
+                "    }\n" +
+                "  }\n" +
+                "}";
+        HttpEntity entity = new NStringEntity(query, ContentType.APPLICATION_JSON);
+        Response response = restClient.performRequest(
+                "POST",
+                "/judicial_case_detail_v20210617_dev/_delete_by_query?slices=auto&conflicts=proceed&wait_for_completion=false",
+                Collections.emptyMap(),
+                entity);
+        String res = EntityUtils.toString(response.getEntity());
+        System.out.println(res);
+        String taskId = JSON.parseObject(res).getString("task");
+
+        String task = "/_tasks/" + taskId;
+        Response get = restClient.performRequest("GET", task);
+        String s = EntityUtils.toString(get.getEntity());
+        System.out.println(s);
+
+    }
+}