AsyncTask.java 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. package com.winhc.phoenix.example.task;
  2. import com.mongodb.client.MongoCollection;
  3. import com.winhc.phoenix.example.configuration.DynamicElasticSearchClient;
  4. import com.winhc.phoenix.example.framework.es.EsFastScan;
  5. import lombok.AllArgsConstructor;
  6. import org.bson.Document;
  7. import org.elasticsearch.search.SearchHit;
  8. import org.springframework.data.mongodb.core.MongoTemplate;
  9. import org.springframework.scheduling.annotation.Async;
  10. import org.springframework.scheduling.annotation.AsyncResult;
  11. import org.springframework.stereotype.Component;
  12. import java.util.Arrays;
  13. import java.util.List;
  14. import java.util.Map;
  15. import java.util.concurrent.Future;
  16. import java.util.function.Consumer;
  17. import java.util.stream.Collectors;
  18. /**
  19. * @author: XuJiakai
  20. * 2021/5/18 13:56
  21. */
  22. @Component
  23. @AllArgsConstructor
  24. public class AsyncTask {
  25. private final DynamicElasticSearchClient dynamicElasticSearchClient;
  26. private final MongoTemplate mongoTemplate;
  27. @Async
  28. public Future<Boolean> pullCreditPunishmentTouch() {
  29. String dsl = "{\n" +
  30. " \"_source\": [\"case_no\", \"name\", \"rowkey\"],\n" +
  31. " \"query\": {\n" +
  32. " \"bool\": {\n" +
  33. " \"filter\": {\n" +
  34. " \"bool\": {\n" +
  35. " \"should\": [\n" +
  36. " {\n" +
  37. " \"bool\": {\n" +
  38. " \"must\": [\n" +
  39. " {\"terms\": {\n" +
  40. " \"label\": [\n" +
  41. " \"失信被执行人\",\n" +
  42. " \"被执行人\",\n" +
  43. " \"终本案件\"\n" +
  44. " ]\n" +
  45. " }}, {\n" +
  46. " \"term\": {\n" +
  47. " \"deleted\": {\n" +
  48. " \"value\": \"0\"\n" +
  49. " }\n" +
  50. " }\n" +
  51. " }\n" +
  52. " ]\n" +
  53. " }\n" +
  54. " }, {\n" +
  55. " \"bool\": {\n" +
  56. " \"must\": [\n" +
  57. " {\"term\": {\n" +
  58. " \"deleted\": {\n" +
  59. " \"value\": \"0\"\n" +
  60. " }\n" +
  61. " }}, {\n" +
  62. " \"term\": {\n" +
  63. " \"label\": {\n" +
  64. " \"value\": \"限制高消费\"\n" +
  65. " }\n" +
  66. " }\n" +
  67. " }, {\n" +
  68. " \"script\": {\n" +
  69. " \"script\": \"if(doc['keyno'].value==null)return false; else return !(doc['keyno'].value.length()==32)\"\n" +
  70. " }\n" +
  71. " }\n" +
  72. " ]\n" +
  73. " }\n" +
  74. " }\n" +
  75. " ]\n" +
  76. " }\n" +
  77. " }\n" +
  78. " }\n" +
  79. " }\n" +
  80. "}";
  81. try {
  82. MongoCollection<Document> person = mongoTemplate.getCollection("credit_punishment_case_info_xjk");
  83. Consumer<SearchHit[]> func = list -> {
  84. List<Document> li = Arrays.stream(list).map(d -> {
  85. String id = d.getId();
  86. Map<String, Object> sourceAsMap = d.getSourceAsMap();
  87. Object case_no = sourceAsMap.get("case_no");
  88. Object name = sourceAsMap.get("name");
  89. Object rowkey = sourceAsMap.get("rowkey");
  90. Document document = new Document();
  91. document.put("doc_id", id);
  92. document.put("case_no", case_no);
  93. document.put("name", name);
  94. document.put("rowkey", rowkey);
  95. return document;
  96. }).collect(Collectors.toList());
  97. person.insertMany(li);
  98. };
  99. new EsFastScan(dynamicElasticSearchClient.getRestHighLevelClient(), func, "credit_punishment_case_info_v1", "_doc", dsl).scan();
  100. return new AsyncResult<>(true);
  101. } catch (Exception e) {
  102. return new AsyncResult<>(false);
  103. }
  104. }
  105. }