|
@@ -0,0 +1,114 @@
|
|
|
|
+package com.winhc.task.job;
|
|
|
|
+
|
|
|
|
+import com.mongodb.client.MongoCollection;
|
|
|
|
+import com.mongodb.client.MongoDatabase;
|
|
|
|
+import com.mongodb.client.model.UpdateOneModel;
|
|
|
|
+import com.mongodb.client.model.UpdateOptions;
|
|
|
|
+import com.mongodb.client.model.WriteModel;
|
|
|
|
+import com.winhc.task.framework.es.EsFastScan;
|
|
|
|
+import com.winhc.task.framework.mongo.MongoDbFastScan;
|
|
|
|
+import com.winhc.task.util.BaseUtils;
|
|
|
|
+import lombok.AllArgsConstructor;
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
+import lombok.val;
|
|
|
|
+import org.apache.commons.lang3.StringUtils;
|
|
|
|
+import org.apache.hadoop.hbase.TableName;
|
|
|
|
+import org.apache.hadoop.hbase.client.Connection;
|
|
|
|
+import org.apache.hadoop.hbase.client.Get;
|
|
|
|
+import org.bson.Document;
|
|
|
|
+import org.elasticsearch.client.RestHighLevelClient;
|
|
|
|
+import org.elasticsearch.search.SearchHit;
|
|
|
|
+import org.springframework.beans.factory.annotation.Autowired;
|
|
|
|
+import org.springframework.data.mongodb.core.MongoTemplate;
|
|
|
|
+import org.springframework.stereotype.Component;
|
|
|
|
+
|
|
|
|
+import java.nio.charset.StandardCharsets;
|
|
|
|
+import java.util.ArrayList;
|
|
|
|
+import java.util.Arrays;
|
|
|
|
+import java.util.List;
|
|
|
|
+import java.util.function.Consumer;
|
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
+import java.util.stream.Stream;
|
|
|
|
+
|
|
|
|
+/**
|
|
|
|
+ * π
|
|
|
|
+ */
|
|
|
|
+@Slf4j
|
|
|
|
+@Component
|
|
|
|
+@AllArgsConstructor
|
|
|
|
+public class CompanyHolderMongoUpdate {
|
|
|
|
+ private final MongoTemplate mongoTemplate;
|
|
|
|
+ private RestHighLevelClient restHighLevelClient;
|
|
|
|
+
|
|
|
|
+ @Autowired
|
|
|
|
+ Connection connection;
|
|
|
|
+
|
|
|
|
+ public void start() {
|
|
|
|
+ MongoDatabase db = mongoTemplate.getDb();
|
|
|
|
+ MongoCollection<Document> re_mongo = mongoTemplate.getCollection("xf_company_holder_update_0705_v2_result");
|
|
|
|
+ Consumer<List<Document>> func = list -> {
|
|
|
|
+ List<String> ids = list.stream().map(d -> d.getString("_id")).collect(Collectors.toList());
|
|
|
|
+ extracted(re_mongo, ids);
|
|
|
|
+ };
|
|
|
|
+
|
|
|
|
+ MongoDbFastScan mongoDbFastScan = new MongoDbFastScan("xf_company_holder_update_0705_v2", func, db)
|
|
|
|
+ .batchSize(200).threadNum(5);
|
|
|
|
+ mongoDbFastScan.scan();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public void start2(String dsl, String mongo_table, String index, String type) {
|
|
|
|
+ MongoCollection<Document> re_mongo = mongoTemplate.getCollection(mongo_table);
|
|
|
|
+ Consumer<SearchHit[]> func = list -> {
|
|
|
|
+ List<String> ids = Arrays.stream(list).map(SearchHit::getId).collect(Collectors.toList());
|
|
|
|
+ extracted(re_mongo, ids);
|
|
|
|
+ };
|
|
|
|
+
|
|
|
|
+ new EsFastScan(restHighLevelClient, func, index, type, dsl, null).scan();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ private void extracted(MongoCollection<Document> re_mongo, List<String> ids) {
|
|
|
|
+ val gs = ids.stream().
|
|
|
|
+ distinct().map(vk -> vk.getBytes(StandardCharsets.UTF_8))
|
|
|
|
+ .map(Get::new).collect(Collectors.toList());
|
|
|
|
+ List<Document> res = new ArrayList<>();
|
|
|
|
+ try (val table = connection.getTable(TableName.valueOf("NG_RT_COMPANY_HOLDER"))) {
|
|
|
|
+ val rs = table.get(gs);
|
|
|
|
+ if (rs != null) {
|
|
|
|
+ res = Stream.of(rs)
|
|
|
|
+ .filter(result -> result != null && !result.isEmpty())
|
|
|
|
+ .map(BaseUtils::toJSONObjectLowerCase)
|
|
|
|
+ .map(m -> {
|
|
|
|
+ String country_cn = m.getString("country_cn");
|
|
|
|
+ if (StringUtils.isNotBlank(country_cn)) {
|
|
|
|
+ if (country_cn.contains("香港")) {
|
|
|
|
+ country_cn = "中国香港";
|
|
|
|
+ } else if (country_cn.contains("澳门")) {
|
|
|
|
+ country_cn = "中国澳门";
|
|
|
|
+ } else if (country_cn.contains("台湾")) {
|
|
|
|
+ country_cn = "中国台湾";
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ Document document = new Document();
|
|
|
|
+ document.put("_id", m.getString("rowkey"));
|
|
|
|
+ document.put("company_id", m.getString("company_id"));
|
|
|
|
+ document.put("company_name", m.getString("company_name"));
|
|
|
|
+ document.put("country_cn", country_cn);
|
|
|
|
+ document.put("holder_name", m.getString("holder_name"));
|
|
|
|
+ document.put("holder_type", m.getString("holder_type"));
|
|
|
|
+ document.put("holder_id", m.getString("holder_id"));
|
|
|
|
+ return document;
|
|
|
|
+ })
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
+ }
|
|
|
|
+ List<WriteModel<Document>> list2 = new ArrayList<>();
|
|
|
|
+ res.forEach(m -> {
|
|
|
|
+ UpdateOptions updateOptions = new UpdateOptions().upsert(true).bypassDocumentValidation(true);
|
|
|
|
+ list2.add(new UpdateOneModel<>(new Document("_id", m.get("_id")), new Document("$set", m), updateOptions));
|
|
|
|
+ });
|
|
|
|
+ re_mongo.bulkWrite(list2);
|
|
|
|
+
|
|
|
|
+ } catch (Exception e) {
|
|
|
|
+ throw new RuntimeException("fetchHbase error" + e.getMessage(), e);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|