|
@@ -1,159 +1,159 @@
|
|
|
-package com.winhc.task.job;
|
|
|
-
|
|
|
-import cn.hutool.core.lang.Tuple;
|
|
|
-import com.alibaba.hologres.client.HoloClient;
|
|
|
-import com.alibaba.hologres.client.exception.HoloClientException;
|
|
|
-import com.winhc.task.bean.Alias;
|
|
|
-import com.winhc.task.bean.JobArgs;
|
|
|
-import com.winhc.task.common.Constant;
|
|
|
-import com.winhc.task.common.SummaryArgs;
|
|
|
-import com.winhc.task.util.FreeMarkUtil;
|
|
|
-import com.winhc.task.util.HoloUtils;
|
|
|
-import lombok.extern.slf4j.Slf4j;
|
|
|
-import org.apache.commons.lang3.StringUtils;
|
|
|
-import org.springframework.beans.factory.annotation.Autowired;
|
|
|
-import org.springframework.stereotype.Service;
|
|
|
-
|
|
|
-import java.io.IOException;
|
|
|
-import java.util.*;
|
|
|
-import java.util.stream.Collectors;
|
|
|
-import java.util.stream.Stream;
|
|
|
-
|
|
|
-@Slf4j
|
|
|
-@Service
|
|
|
-public class CalcSummaryJob {
|
|
|
- @Autowired
|
|
|
- SingleSynHoloJob holoSynJobs;
|
|
|
- @Autowired
|
|
|
- MultipleSynHoloJob multipleSynHoloJob;
|
|
|
- @Autowired
|
|
|
- EsIndexJobs esIndexJobs;
|
|
|
- @Autowired
|
|
|
- FreeMarkUtil freeMarkUtil;
|
|
|
-
|
|
|
- public void start(List<String> list, JobArgs jobArgs, Boolean synData) throws HoloClientException, IOException {
|
|
|
-
|
|
|
- String summary_company = "ng_rt_summary_company";
|
|
|
- String summary_person = "ng_rt_summary_person";
|
|
|
-
|
|
|
- ArrayList<String> tnList = new ArrayList<>();
|
|
|
- //初始化sql
|
|
|
- List<Tuple> collect = initSql(list, jobArgs, tnList);
|
|
|
- //计算摘要
|
|
|
- if (synData) {
|
|
|
- calcSummary(jobArgs, collect);
|
|
|
- }
|
|
|
- //建索引
|
|
|
- createIndex(jobArgs, tnList);
|
|
|
- //同步数据
|
|
|
- if (synData) {
|
|
|
- multipleSynHoloJob.start(tnList, jobArgs);
|
|
|
- }
|
|
|
- //切换个人,公司索引到生产
|
|
|
- //switch_index(jobArgs, summary_company, summary_person, tnList);
|
|
|
-
|
|
|
- }
|
|
|
-
|
|
|
- private void switch_index(JobArgs jobArgs, String summary_company, String summary_person, ArrayList<String> tnList) throws IOException {
|
|
|
- for (String tn : tnList) {
|
|
|
- if (Constant.PERSON_SUMMARYS.contains(tn)) {
|
|
|
- switchIndex(jobArgs.getTargetIndexPrefix(), summary_person, "person", jobArgs.getTargetIndexSuffix(), tn);
|
|
|
- }
|
|
|
- switchIndex(jobArgs.getTargetIndexPrefix(), summary_company, "", jobArgs.getTargetIndexSuffix(), tn);
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void createIndex(JobArgs jobArgs, ArrayList<String> tnList) {
|
|
|
- for (String tn : tnList) {
|
|
|
- try {
|
|
|
- log.info("start create index tn : {} ", tn);
|
|
|
- if (Constant.PERSON_SUMMARYS.contains(tn)) {
|
|
|
- tranIndex(jobArgs, tn, "person");
|
|
|
- }
|
|
|
- tranIndex(jobArgs, tn, "");
|
|
|
- } catch (Exception e) {
|
|
|
- log.error("create index error : {} ,tn : {}", e.getMessage(), tn);
|
|
|
- }
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private void calcSummary(JobArgs jobArgs, List<Tuple> collect) throws HoloClientException {
|
|
|
- for (int i = 0; i < collect.size(); i++) {
|
|
|
- HoloClient holoClient = HoloUtils.init();
|
|
|
- String s0 = "";
|
|
|
- //初始化清空table
|
|
|
- if (i == 0) {
|
|
|
- s0 = "truncate table " + jobArgs.getHoloTable();
|
|
|
- }
|
|
|
- String tn = collect.get(i).get(0).toString();
|
|
|
- String s1 = "analyze " + jobArgs.getHoloTable();
|
|
|
- String s2 = "analyze ng_rt_" + tn;
|
|
|
- Stream.of(s0, s1, s2, collect.get(i).get(1)).filter(StringUtils::isNotBlank).forEach(x -> HoloUtils.exexSql(holoClient, x));
|
|
|
- holoClient.close();
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- private List<Tuple> initSql(List<String> list, JobArgs jobArgs, ArrayList<String> tnList) {
|
|
|
- return SummaryArgs.SUMMARY_ARGS.entrySet()
|
|
|
- .stream()
|
|
|
- .filter(x -> (list.isEmpty() || list.contains(x.getKey())))
|
|
|
- .map(x -> {
|
|
|
- String tn = x.getKey();
|
|
|
- SummaryArgs args = x.getValue();
|
|
|
- tnList.add(tn);
|
|
|
- String p_sql = args.getArgsInfo().stream().map(a -> {
|
|
|
- Map<String, String> m = new HashMap<>();
|
|
|
- m.put("holoTable", jobArgs.getHoloTable());
|
|
|
- m.put("tn", tn);
|
|
|
- m.put("keyno", a.getFilterField());
|
|
|
- if (StringUtils.isNotBlank(a.getCategory())) {
|
|
|
- m.put("category", "_" + a.getCategory());
|
|
|
- } else {
|
|
|
- m.put("category", "");
|
|
|
- }
|
|
|
- m.put("groupField", a.getGroupField());
|
|
|
- m.put("condition", "in ('0','1')");
|
|
|
- return freeMarkUtil.genStr("agg_sample_v1.ftl", m);
|
|
|
- }).collect(Collectors.joining("\nUNION ALL\n"));
|
|
|
- Map<String, String> m2 = new HashMap<>();
|
|
|
- m2.put("table_view", p_sql);
|
|
|
- m2.put("tn", tn);
|
|
|
- String calc_sql = freeMarkUtil.genStr("agg_sample_v2.ftl", m2);
|
|
|
- return new Tuple(tn, calc_sql);
|
|
|
- }).collect(Collectors.toList());
|
|
|
- }
|
|
|
-
|
|
|
- private void tranIndex(JobArgs jobArgs, String tn, String type) throws IOException {
|
|
|
- String c_index = Stream.of(jobArgs.getTargetIndexPrefix(), type, tn, jobArgs.getTargetIndexSuffix())
|
|
|
- .filter(StringUtils::isNotBlank).collect(Collectors.joining("_"));
|
|
|
- //esIndexJobs.deletedIndex(c_index);
|
|
|
- esIndexJobs.createIndex(c_index);
|
|
|
- }
|
|
|
-
|
|
|
- private void switchIndex(String targetIndexPre, String summary_company, String type, String targetIndexSuf, String tn) throws IOException {
|
|
|
- String index = Stream.of(targetIndexPre, type, tn, targetIndexSuf)
|
|
|
- .filter(StringUtils::isNotBlank)
|
|
|
- .collect(Collectors.joining("_"));
|
|
|
- String alias = Stream.of(targetIndexPre, type, tn)
|
|
|
- .filter(StringUtils::isNotBlank)
|
|
|
- .collect(Collectors.joining("_"));
|
|
|
- List<Alias> addList = Stream.of(
|
|
|
- Alias.builder().alias(summary_company).index(index).build(),
|
|
|
- Alias.builder().alias(alias).index(index).build()
|
|
|
- ).collect(Collectors.toList());
|
|
|
- List<Alias> removeList = esIndexJobs.getAliases(Stream.of(targetIndexPre, type, tn).filter(StringUtils::isNotBlank).collect(Collectors.joining("_")))
|
|
|
- .stream().filter(x -> !x.getIndex().contains(targetIndexSuf)).collect(Collectors.toList());
|
|
|
- esIndexJobs.addRemoveAliases(addList, removeList);
|
|
|
- //删除多余索引
|
|
|
- removeList.stream().filter(x -> !x.getIndex().contains(targetIndexSuf))
|
|
|
- .map(Alias::getIndex).distinct().forEach(i -> {
|
|
|
- try {
|
|
|
- esIndexJobs.deletedIndex(i);
|
|
|
- } catch (Exception e) {
|
|
|
- e.printStackTrace();
|
|
|
- log.error("deleted index error {}", i);
|
|
|
- System.exit(-1);
|
|
|
- }
|
|
|
- });
|
|
|
- }
|
|
|
-}
|
|
|
+//package com.winhc.task.job;
|
|
|
+//
|
|
|
+//import cn.hutool.core.lang.Tuple;
|
|
|
+//import com.alibaba.hologres.client.HoloClient;
|
|
|
+//import com.alibaba.hologres.client.exception.HoloClientException;
|
|
|
+//import com.winhc.task.bean.Alias;
|
|
|
+//import com.winhc.task.bean.JobArgs;
|
|
|
+//import com.winhc.task.common.Constant;
|
|
|
+//import com.winhc.task.common.SummaryArgs;
|
|
|
+//import com.winhc.task.util.FreeMarkUtil;
|
|
|
+//import com.winhc.task.util.HoloUtils;
|
|
|
+//import lombok.extern.slf4j.Slf4j;
|
|
|
+//import org.apache.commons.lang3.StringUtils;
|
|
|
+//import org.springframework.beans.factory.annotation.Autowired;
|
|
|
+//import org.springframework.stereotype.Service;
|
|
|
+//
|
|
|
+//import java.io.IOException;
|
|
|
+//import java.util.*;
|
|
|
+//import java.util.stream.Collectors;
|
|
|
+//import java.util.stream.Stream;
|
|
|
+//
|
|
|
+//@Slf4j
|
|
|
+//@Service
|
|
|
+//public class CalcSummaryJob {
|
|
|
+// @Autowired
|
|
|
+// SingleSynHoloJob holoSynJobs;
|
|
|
+// @Autowired
|
|
|
+// MultipleSynHoloJob multipleSynHoloJob;
|
|
|
+// @Autowired
|
|
|
+// EsIndexJobs esIndexJobs;
|
|
|
+// @Autowired
|
|
|
+// FreeMarkUtil freeMarkUtil;
|
|
|
+//
|
|
|
+// public void start(List<String> list, JobArgs jobArgs, Boolean synData) throws HoloClientException, IOException {
|
|
|
+//
|
|
|
+// String summary_company = "ng_rt_summary_company";
|
|
|
+// String summary_person = "ng_rt_summary_person";
|
|
|
+//
|
|
|
+// ArrayList<String> tnList = new ArrayList<>();
|
|
|
+// //初始化sql
|
|
|
+// List<Tuple> collect = initSql(list, jobArgs, tnList);
|
|
|
+// //计算摘要
|
|
|
+// if (synData) {
|
|
|
+// calcSummary(jobArgs, collect);
|
|
|
+// }
|
|
|
+// //建索引
|
|
|
+// createIndex(jobArgs, tnList);
|
|
|
+// //同步数据
|
|
|
+// if (synData) {
|
|
|
+// multipleSynHoloJob.start(tnList, jobArgs);
|
|
|
+// }
|
|
|
+// //切换个人,公司索引到生产
|
|
|
+// //switch_index(jobArgs, summary_company, summary_person, tnList);
|
|
|
+//
|
|
|
+// }
|
|
|
+//
|
|
|
+// private void switch_index(JobArgs jobArgs, String summary_company, String summary_person, ArrayList<String> tnList) throws IOException {
|
|
|
+// for (String tn : tnList) {
|
|
|
+// if (Constant.PERSON_SUMMARYS.contains(tn)) {
|
|
|
+// switchIndex(jobArgs.getTargetIndexPrefix(), summary_person, "person", jobArgs.getTargetIndexSuffix(), tn);
|
|
|
+// }
|
|
|
+// switchIndex(jobArgs.getTargetIndexPrefix(), summary_company, "", jobArgs.getTargetIndexSuffix(), tn);
|
|
|
+// }
|
|
|
+// }
|
|
|
+//
|
|
|
+// private void createIndex(JobArgs jobArgs, ArrayList<String> tnList) {
|
|
|
+// for (String tn : tnList) {
|
|
|
+// try {
|
|
|
+// log.info("start create index tn : {} ", tn);
|
|
|
+// if (Constant.PERSON_SUMMARYS.contains(tn)) {
|
|
|
+// tranIndex(jobArgs, tn, "person");
|
|
|
+// }
|
|
|
+// tranIndex(jobArgs, tn, "");
|
|
|
+// } catch (Exception e) {
|
|
|
+// log.error("create index error : {} ,tn : {}", e.getMessage(), tn);
|
|
|
+// }
|
|
|
+// }
|
|
|
+// }
|
|
|
+//
|
|
|
+// private void calcSummary(JobArgs jobArgs, List<Tuple> collect) throws HoloClientException {
|
|
|
+// for (int i = 0; i < collect.size(); i++) {
|
|
|
+// HoloClient holoClient = HoloUtils.init();
|
|
|
+// String s0 = "";
|
|
|
+// //初始化清空table
|
|
|
+// if (i == 0) {
|
|
|
+// s0 = "truncate table " + jobArgs.getHoloTable();
|
|
|
+// }
|
|
|
+// String tn = collect.get(i).get(0).toString();
|
|
|
+// String s1 = "analyze " + jobArgs.getHoloTable();
|
|
|
+// String s2 = "analyze ng_rt_" + tn;
|
|
|
+// Stream.of(s0, s1, s2, collect.get(i).get(1)).filter(StringUtils::isNotBlank).forEach(x -> HoloUtils.exexSql(holoClient, x));
|
|
|
+// holoClient.close();
|
|
|
+// }
|
|
|
+// }
|
|
|
+//
|
|
|
+// private List<Tuple> initSql(List<String> list, JobArgs jobArgs, ArrayList<String> tnList) {
|
|
|
+// return SummaryArgs.SUMMARY_ARGS.entrySet()
|
|
|
+// .stream()
|
|
|
+// .filter(x -> (list.isEmpty() || list.contains(x.getKey())))
|
|
|
+// .map(x -> {
|
|
|
+// String tn = x.getKey();
|
|
|
+// SummaryArgs args = x.getValue();
|
|
|
+// tnList.add(tn);
|
|
|
+// String p_sql = args.getArgsInfo().stream().map(a -> {
|
|
|
+// Map<String, String> m = new HashMap<>();
|
|
|
+// m.put("holoTable", jobArgs.getHoloTable());
|
|
|
+// m.put("tn", tn);
|
|
|
+// m.put("keyno", a.getFilterField());
|
|
|
+// if (StringUtils.isNotBlank(a.getCategory())) {
|
|
|
+// m.put("category", "_" + a.getCategory());
|
|
|
+// } else {
|
|
|
+// m.put("category", "");
|
|
|
+// }
|
|
|
+// m.put("groupField", a.getGroupField());
|
|
|
+// m.put("condition", "in ('0','1')");
|
|
|
+// return freeMarkUtil.genStr("agg_sample_v1.ftl", m);
|
|
|
+// }).collect(Collectors.joining("\nUNION ALL\n"));
|
|
|
+// Map<String, String> m2 = new HashMap<>();
|
|
|
+// m2.put("table_view", p_sql);
|
|
|
+// m2.put("tn", tn);
|
|
|
+// String calc_sql = freeMarkUtil.genStr("agg_sample_v2.ftl", m2);
|
|
|
+// return new Tuple(tn, calc_sql);
|
|
|
+// }).collect(Collectors.toList());
|
|
|
+// }
|
|
|
+//
|
|
|
+// private void tranIndex(JobArgs jobArgs, String tn, String type) throws IOException {
|
|
|
+// String c_index = Stream.of(jobArgs.getTargetIndexPrefix(), type, tn, jobArgs.getTargetIndexSuffix())
|
|
|
+// .filter(StringUtils::isNotBlank).collect(Collectors.joining("_"));
|
|
|
+// //esIndexJobs.deletedIndex(c_index);
|
|
|
+// esIndexJobs.createIndex(c_index);
|
|
|
+// }
|
|
|
+//
|
|
|
+// private void switchIndex(String targetIndexPre, String summary_company, String type, String targetIndexSuf, String tn) throws IOException {
|
|
|
+// String index = Stream.of(targetIndexPre, type, tn, targetIndexSuf)
|
|
|
+// .filter(StringUtils::isNotBlank)
|
|
|
+// .collect(Collectors.joining("_"));
|
|
|
+// String alias = Stream.of(targetIndexPre, type, tn)
|
|
|
+// .filter(StringUtils::isNotBlank)
|
|
|
+// .collect(Collectors.joining("_"));
|
|
|
+// List<Alias> addList = Stream.of(
|
|
|
+// Alias.builder().alias(summary_company).index(index).build(),
|
|
|
+// Alias.builder().alias(alias).index(index).build()
|
|
|
+// ).collect(Collectors.toList());
|
|
|
+// List<Alias> removeList = esIndexJobs.getAliases(Stream.of(targetIndexPre, type, tn).filter(StringUtils::isNotBlank).collect(Collectors.joining("_")))
|
|
|
+// .stream().filter(x -> !x.getIndex().contains(targetIndexSuf)).collect(Collectors.toList());
|
|
|
+// esIndexJobs.addRemoveAliases(addList, removeList);
|
|
|
+// //删除多余索引
|
|
|
+// removeList.stream().filter(x -> !x.getIndex().contains(targetIndexSuf))
|
|
|
+// .map(Alias::getIndex).distinct().forEach(i -> {
|
|
|
+// try {
|
|
|
+// esIndexJobs.deletedIndex(i);
|
|
|
+// } catch (Exception e) {
|
|
|
+// e.printStackTrace();
|
|
|
+// log.error("deleted index error {}", i);
|
|
|
+// System.exit(-1);
|
|
|
+// }
|
|
|
+// });
|
|
|
+// }
|
|
|
+//}
|