|
@@ -0,0 +1,280 @@
|
|
|
|
+package com.winhc.max.compute.graph.job.holder;
|
|
|
|
+
|
|
|
|
+import com.aliyun.odps.graph.ComputeContext;
|
|
|
|
+import com.aliyun.odps.graph.Edge;
|
|
|
|
+import com.aliyun.odps.graph.Vertex;
|
|
|
|
+import com.aliyun.odps.graph.WorkerContext;
|
|
|
|
+import com.aliyun.odps.io.*;
|
|
|
|
+import com.aliyun.odps.utils.StringUtils;
|
|
|
|
+import com.google.common.collect.Sets;
|
|
|
|
+import com.google.gson.Gson;
|
|
|
|
+import com.winhc.max.compute.graph.job.holder.entity.*;
|
|
|
|
+import com.winhc.max.compute.graph.util.WritableRecordExtensions;
|
|
|
|
+import lombok.experimental.ExtensionMethod;
|
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
|
+
|
|
|
|
+import java.io.IOException;
|
|
|
|
+import java.util.*;
|
|
|
|
+import java.util.stream.Collectors;
|
|
|
|
+
|
|
|
|
+import static com.winhc.max.compute.graph.util.BaseUtils.isWindows;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+@ExtensionMethod({
|
|
|
|
+ WritableRecordExtensions.class
|
|
|
|
+})
|
|
|
|
+@Slf4j
|
|
|
|
+public class HolderRelationGroupVertex extends
|
|
|
|
+ Vertex<Text, HolderRelationGroupVertexValue, HolderRelationGroupEdge, HolderRelationGroupMsg> {
|
|
|
|
+
|
|
|
|
+ private static Integer node_limit;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void setup(WorkerContext<Text, HolderRelationGroupVertexValue, HolderRelationGroupEdge, HolderRelationGroupMsg> context) throws IOException {
|
|
|
|
+ node_limit = context.getConfiguration().getInt("node_limit", 100);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void compute(ComputeContext<Text, HolderRelationGroupVertexValue, HolderRelationGroupEdge, HolderRelationGroupMsg> context, Iterable<HolderRelationGroupMsg> messages) throws IOException {
|
|
|
|
+
|
|
|
|
+ context.aggregate(NullWritable.get());
|
|
|
|
+
|
|
|
|
+ List<Edge<Text, HolderRelationGroupEdge>> edges = getEdges();
|
|
|
|
+
|
|
|
|
+ String currentVertexId = getId().toString();
|
|
|
|
+
|
|
|
|
+ //初始化链路
|
|
|
|
+ VertexCalcInfo info = VertexCalcInfo.getVertexCalcInfo(getId().toString(), getValue(), edges);
|
|
|
|
+
|
|
|
|
+ //消息传递
|
|
|
|
+ if (getValue().isSendMsg()) {
|
|
|
|
+ for (String destVertexId : info.getCompanyIds()) {
|
|
|
|
+ if (destVertexId.equals(currentVertexId)) {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ if (StringUtils.isNotBlank(currentVertexId) && StringUtils.isNotBlank(destVertexId)) {
|
|
|
|
+
|
|
|
|
+ HolderRelationGroupMsg msg = HolderRelationGroupMsg.of(currentVertexId, destVertexId, info.getExtendChains()
|
|
|
|
+ , info.getCompanyIds());
|
|
|
|
+ context.sendMessage(new Text(destVertexId), msg);
|
|
|
|
+ context.aggregate(new BooleanWritable(true));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ //todo 判断是否有新增传递和达到层数限制
|
|
|
|
+ getValue().setEndFlag();
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ HolderRelationGroupVertexValue currentVertexValue = getValue();
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ if (info.getCompanyIds().size() <= node_limit) {
|
|
|
|
+ //消息合并
|
|
|
|
+ mergeMsgChain(currentVertexValue, currentVertexId, info.getExtendChains(), messages);
|
|
|
|
+
|
|
|
|
+ List<Set<String>> currentExtendChains = tuple2List(currentVertexValue.getExtendChains());
|
|
|
|
+ List<String> currentCompanyIds = tuple2List2(currentVertexValue.getCompanyIds());
|
|
|
|
+ //List<String> currentHolderChains = tuple2List2(currentVertexValue.getHolderChains());
|
|
|
|
+
|
|
|
|
+ if (currentExtendChains.size() > 0) {
|
|
|
|
+ //发送相邻节点消息
|
|
|
|
+ List<String> res = currentExtendChains.get(0).stream()
|
|
|
|
+ .distinct()
|
|
|
|
+ .filter(Objects::nonNull)
|
|
|
|
+ .filter(e -> !e.equals(currentVertexId)).collect(Collectors.toList());
|
|
|
|
+ for (String destVertexId : res) {
|
|
|
|
+ if (StringUtils.isNotBlank(currentVertexId) && StringUtils.isNotBlank(destVertexId)) {
|
|
|
|
+ HolderRelationGroupMsg msg = HolderRelationGroupMsg.of(currentVertexId, destVertexId, currentExtendChains
|
|
|
|
+ , new HashSet<>(currentCompanyIds));
|
|
|
|
+ context.sendMessage(new Text(destVertexId), msg);
|
|
|
|
+ context.aggregate(new BooleanWritable(true));
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ voteToHalt();
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ /**
|
|
|
|
+ * 链路合并
|
|
|
|
+ */
|
|
|
|
+ public static void mergeMsgChain(HolderRelationGroupVertexValue currentVertexValue, String currentVertexId, List<Set<String>> currentExtendChains, Iterable<HolderRelationGroupMsg> messages) {
|
|
|
|
+
|
|
|
|
+ //合并消息
|
|
|
|
+ for (HolderRelationGroupMsg message : messages) {
|
|
|
|
+ if (message == null || message.getSendVertexId() == null) {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ String sendVertexId = message.getSendVertexId().toString();
|
|
|
|
+
|
|
|
|
+ List<Set<String>> messageExtendChains = tuple2List(message.getExtendChains());
|
|
|
|
+ if(isWindows()){
|
|
|
|
+ System.out.println(currentVertexId + "----" + currentExtendChains.toString() + "----" + sendVertexId + "----" + messageExtendChains.toString());
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ List<String> currentIds = tuple2List2(currentVertexValue.getCompanyIds());
|
|
|
|
+ List<String> messageIds = tuple2List2(message.getCompanyIds());
|
|
|
|
+
|
|
|
|
+ //获取id集合
|
|
|
|
+
|
|
|
|
+ Set<String> difference = Sets.difference(new HashSet<>(messageIds), new HashSet<>(currentIds));
|
|
|
|
+ //判断是否更新 重合则过滤
|
|
|
|
+ if (difference.isEmpty()) {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ for (int i = 0; i < messageExtendChains.size(); i++) {
|
|
|
|
+ Set<String> messageHead = messageExtendChains.get(0);
|
|
|
|
+
|
|
|
|
+ if (!messageHead.contains(currentVertexId)) {
|
|
|
|
+ break;
|
|
|
|
+ }
|
|
|
|
+ //加入发送消息顶点
|
|
|
|
+ if (i == 0) {
|
|
|
|
+ if(currentExtendChains.size() == 0){
|
|
|
|
+ currentExtendChains.add(new HashSet<>(Collections.singleton(sendVertexId)));
|
|
|
|
+ }else {
|
|
|
|
+ currentExtendChains.get(0).add(sendVertexId);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //加入其他层数节点
|
|
|
|
+ Set<String> collect = messageExtendChains.get(i)
|
|
|
|
+ .stream()
|
|
|
|
+ .filter(s -> !s.equals(currentVertexId) /*&& !processIds.contains(s)*/)
|
|
|
|
+ .collect(Collectors.toSet());
|
|
|
|
+
|
|
|
|
+ int size = currentExtendChains.size();
|
|
|
|
+
|
|
|
|
+ if (collect.size() > 0) {
|
|
|
|
+
|
|
|
|
+ if (size - 1 > i) {
|
|
|
|
+ currentExtendChains.get(i + 1).addAll(collect);
|
|
|
|
+ } else {
|
|
|
|
+ currentExtendChains.add(collect);
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if(isWindows()){
|
|
|
|
+ System.out.println("merge after ----" + currentVertexId + "---" + currentExtendChains);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ //重复节点合并取最短路径
|
|
|
|
+ List<Set<String>> merge_set_all = new ArrayList<>();
|
|
|
|
+ Set<String> tmp_set_all = new HashSet<>();
|
|
|
|
+
|
|
|
|
+ for (Set<String> currentExtendChain : currentExtendChains) {
|
|
|
|
+ Set<String> tmp_set_inner = new HashSet<>();
|
|
|
|
+ for (String id : currentExtendChain) {
|
|
|
|
+ if (tmp_set_all.contains(id)) {
|
|
|
|
+ continue;
|
|
|
|
+ }
|
|
|
|
+ tmp_set_inner.add(id);
|
|
|
|
+ }
|
|
|
|
+ if (tmp_set_inner.size() > 0) {
|
|
|
|
+ merge_set_all.add(tmp_set_inner);
|
|
|
|
+ tmp_set_all.addAll(tmp_set_inner);
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ if(isWindows()){
|
|
|
|
+ System.out.println("路径优化: " + currentVertexId + "----" + currentExtendChains.toString() + "----" + merge_set_all.toString());
|
|
|
|
+ }
|
|
|
|
+ //转换
|
|
|
|
+ currentExtendChains.clear();
|
|
|
|
+ currentExtendChains.addAll(merge_set_all);
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ List<String> ids = currentExtendChains
|
|
|
|
+ .stream().flatMap(Collection::stream)
|
|
|
|
+ .distinct().collect(Collectors.toList());
|
|
|
|
+ ids.add(currentVertexId);
|
|
|
|
+
|
|
|
|
+ //顶点 todo
|
|
|
|
+ //currentVertexValue.setCompanyIds(list2Tuple2(ids));
|
|
|
|
+
|
|
|
|
+ //关系
|
|
|
|
+ currentVertexValue.setExtendChains(list2Tuple(currentExtendChains));
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public static List<Set<String>> tuple2List(Tuple tuple) {
|
|
|
|
+ if (tuple == null) {
|
|
|
|
+ return new ArrayList<>();
|
|
|
|
+ }
|
|
|
|
+ List<Set<String>> set = tuple.getAll().stream().filter(Objects::nonNull)
|
|
|
|
+ .map(e -> ((Tuple) e).getAll().stream().filter(Objects::nonNull)
|
|
|
|
+ .map(e1 -> ((Text) e1).toString()).collect(Collectors.toSet()))
|
|
|
|
+ .collect(Collectors.toList());
|
|
|
|
+ return set;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public static List<String> tuple2List2(Tuple tuple) {
|
|
|
|
+ if (tuple == null) {
|
|
|
|
+ return new ArrayList<>();
|
|
|
|
+ }
|
|
|
|
+ List<String> collect = tuple.getAll().stream().filter(Objects::nonNull).map(Object::toString).distinct().collect(Collectors.toList());
|
|
|
|
+ return collect;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public static Tuple list2Tuple(List<Set<String>> list) {
|
|
|
|
+ if (list == null || list.isEmpty()) {
|
|
|
|
+ return new Tuple();
|
|
|
|
+ }
|
|
|
|
+ Tuple out = new Tuple();
|
|
|
|
+ list.forEach(e -> {
|
|
|
|
+ Tuple out2 = new Tuple();
|
|
|
|
+ for (String ele : e) {
|
|
|
|
+ out2.append(new Text(ele));
|
|
|
|
+ }
|
|
|
|
+ out.append(out2);
|
|
|
|
+ });
|
|
|
|
+ return out;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ public static Tuple list2Tuple2(List<String> list) {
|
|
|
|
+ if (list == null || list.isEmpty()) {
|
|
|
|
+ return new Tuple();
|
|
|
|
+ }
|
|
|
|
+ Tuple out = new Tuple();
|
|
|
|
+ list.forEach(e -> {
|
|
|
|
+ out.append(new Text(e));
|
|
|
|
+ });
|
|
|
|
+ return out;
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ @Override
|
|
|
|
+ public void cleanup(WorkerContext<Text, HolderRelationGroupVertexValue, HolderRelationGroupEdge, HolderRelationGroupMsg> context) throws IOException {
|
|
|
|
+
|
|
|
|
+ //todo 判断是否完成
|
|
|
|
+
|
|
|
|
+ //输出
|
|
|
|
+ Text current_id = getId();
|
|
|
|
+ Text company_name = getValue().getCompanyName();
|
|
|
|
+
|
|
|
|
+ Tuple companyIds = getValue().getCompanyIds();
|
|
|
|
+
|
|
|
|
+ Tuple extendChains = getValue().getExtendChains();
|
|
|
|
+
|
|
|
|
+ String ids = companyIds.getAll().stream().map(Object::toString)
|
|
|
|
+ .sorted(Comparator.naturalOrder())
|
|
|
|
+ .collect(Collectors.joining(","));
|
|
|
|
+
|
|
|
|
+ List<Set<String>> sets = tuple2List(extendChains);
|
|
|
|
+
|
|
|
|
+ //合并输出
|
|
|
|
+ context.write(current_id
|
|
|
|
+ , company_name
|
|
|
|
+ , new Text(ids)
|
|
|
|
+ , new Text(new Gson().toJson(sets))
|
|
|
|
+ );
|
|
|
|
+
|
|
|
|
+ }
|
|
|
|
+}
|