|
@@ -4,18 +4,24 @@ import com.aliyun.odps.graph.ComputeContext;
|
|
|
import com.aliyun.odps.graph.Edge;
|
|
|
import com.aliyun.odps.graph.Vertex;
|
|
|
import com.aliyun.odps.graph.WorkerContext;
|
|
|
-import com.aliyun.odps.io.*;
|
|
|
+import com.aliyun.odps.io.BooleanWritable;
|
|
|
+import com.aliyun.odps.io.LongWritable;
|
|
|
+import com.aliyun.odps.io.NullWritable;
|
|
|
+import com.aliyun.odps.io.Text;
|
|
|
import com.aliyun.odps.utils.StringUtils;
|
|
|
import com.google.gson.Gson;
|
|
|
import com.winhc.max.compute.graph.job.enterprise_group.entity.EntGroupMsg;
|
|
|
import com.winhc.max.compute.graph.job.enterprise_group.entity.EnterpriseGroupVertexValue;
|
|
|
import com.winhc.max.compute.graph.job.enterprise_group.entity.HolderEdge;
|
|
|
+import com.winhc.max.compute.graph.job.enterprise_group.entity.VertexComputeInfo;
|
|
|
import com.winhc.max.compute.graph.util.WritableRecordExtensions;
|
|
|
import lombok.experimental.ExtensionMethod;
|
|
|
import lombok.extern.slf4j.Slf4j;
|
|
|
|
|
|
import java.io.IOException;
|
|
|
-import java.util.*;
|
|
|
+import java.util.ArrayList;
|
|
|
+import java.util.Arrays;
|
|
|
+import java.util.List;
|
|
|
import java.util.stream.Collectors;
|
|
|
|
|
|
/**
|
|
@@ -39,7 +45,6 @@ public class EnterpriseGroupVertex extends
|
|
|
prodEnvDebug = ProdEnvDebug.build(context.getConfiguration());
|
|
|
}
|
|
|
|
|
|
-
|
|
|
@Override
|
|
|
public void compute(ComputeContext<Text, EnterpriseGroupVertexValue, HolderEdge, EntGroupMsg> context, Iterable<EntGroupMsg> messages) throws IOException {
|
|
|
context.aggregate(NullWritable.get());
|
|
@@ -49,65 +54,38 @@ public class EnterpriseGroupVertex extends
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- List<String> list = getValue().getLegalEntityIds().tuple2List();
|
|
|
+ List<String> legalEntityIds = getValue().getLegalEntityIds().tuple2List();
|
|
|
|
|
|
- //获取需要传递的下一个节点,如无,当前企业则为控股控股公司。
|
|
|
- Map<String, Double> majorityShareholder = new HashMap<>();
|
|
|
- List<String> thisVertexHoldKeyno = new ArrayList<>();
|
|
|
- List<String> thisVertexHoldCompanyId = new ArrayList<>();
|
|
|
- List<String> thisVertexOtherHolderKeyno = new ArrayList<>();
|
|
|
- List<String> thisVertexOtherHolderCompanyId = new ArrayList<>();
|
|
|
- if (hasEdges()) {
|
|
|
- //获取下游目标节点
|
|
|
- List<Edge<Text, HolderEdge>> edges = getEdges();
|
|
|
- majorityShareholder = edges.getMajorityShareholder(list);
|
|
|
-
|
|
|
- thisVertexHoldKeyno = new ArrayList<>(majorityShareholder.keySet());
|
|
|
- thisVertexHoldCompanyId = thisVertexHoldKeyno.stream().filter(e -> e.length() == 32).collect(Collectors.toList());
|
|
|
- thisVertexOtherHolderKeyno = edges.getOtherHolder(thisVertexHoldKeyno);
|
|
|
- thisVertexOtherHolderCompanyId = thisVertexOtherHolderKeyno.stream().filter(e -> e.length() == 32).collect(Collectors.toList());
|
|
|
-
|
|
|
- //todo:如果控股股东中有国有企业,则将国有企业从控股股东中移出
|
|
|
- Set<String> collect = edges.stream().filter(e -> {
|
|
|
- int i = e.getValue().getHolderType().get();
|
|
|
- return i == 2;
|
|
|
- }).map(e -> e.getDestVertexId().toString()).collect(Collectors.toSet());
|
|
|
- if (!collect.isEmpty()) {
|
|
|
- thisVertexHoldCompanyId = thisVertexHoldCompanyId.stream().filter(e -> !collect.contains(e)).collect(Collectors.toList());
|
|
|
- }
|
|
|
|
|
|
- //todo:如果该节点需要拆分子节点,则不向下传递
|
|
|
- if (getValue().getManualStop().get()) {
|
|
|
- thisVertexHoldCompanyId.clear();
|
|
|
- throw new RuntimeException("vId: " + getId() + "\n" + getValue().toString());
|
|
|
- }
|
|
|
- }
|
|
|
+ //获取需要传递的下一个节点,如无,当前企业则为控股控股公司。
|
|
|
+ List<Edge<Text, HolderEdge>> edges = getEdges();
|
|
|
+ VertexComputeInfo vertexInfo = VertexComputeInfo.getVertexInfo(edges, legalEntityIds, getValue().getManualStop().get());
|
|
|
|
|
|
|
|
|
//todo:debug
|
|
|
prodEnvDebug.debug_1(getId()
|
|
|
- , "thisVertexHoldCompanyId: {}\nmajorityShareholder: {}"
|
|
|
- , thisVertexHoldCompanyId, majorityShareholder
|
|
|
+ , "\nvertexInfo: {}\n"
|
|
|
+ , vertexInfo
|
|
|
);
|
|
|
|
|
|
//将本节点传递到下一节点
|
|
|
if (!getValue().isSendMsg()) {
|
|
|
- for (String destVertexId : thisVertexHoldCompanyId) {
|
|
|
+ for (String destVertexId : vertexInfo.getHoldCompanyId()) {
|
|
|
//传递控股企业
|
|
|
- EntGroupMsg entGroupMsg = EntGroupMsg.ofByType_1(getId(), getValue().getCompanyName(), getValue().getCompanyType(), thisVertexOtherHolderCompanyId);
|
|
|
+ EntGroupMsg entGroupMsg = EntGroupMsg.ofByType_1(getId(), getValue().getCompanyName(), getValue().getCompanyType(), vertexInfo.getNotHoldCompanyId());
|
|
|
boolean flag = entGroupMsg.routeLog(destVertexId);
|
|
|
if (!flag) {
|
|
|
// getValue().setOutput();
|
|
|
continue;
|
|
|
}
|
|
|
|
|
|
- entGroupMsg.addStockRightControlChain(getId().toString(), majorityShareholder.get(destVertexId), destVertexId);
|
|
|
+ entGroupMsg.addStockRightControlChain(getId().toString(), vertexInfo.getInvestmentProportion(destVertexId), destVertexId);
|
|
|
|
|
|
context.sendMessage(new Text(destVertexId), entGroupMsg);
|
|
|
context.aggregate(new BooleanWritable(true));
|
|
|
}
|
|
|
|
|
|
- for (String destVertexId : thisVertexOtherHolderCompanyId) {
|
|
|
+ for (String destVertexId : vertexInfo.getNotHoldCompanyId()) {
|
|
|
//传递参股企业
|
|
|
EntGroupMsg entGroupMsg = EntGroupMsg.ofByType_2(getId(), getValue().getCompanyName());
|
|
|
boolean flag = entGroupMsg.routeLog(destVertexId);
|
|
@@ -119,10 +97,10 @@ public class EnterpriseGroupVertex extends
|
|
|
context.aggregate(new BooleanWritable(true));
|
|
|
}
|
|
|
|
|
|
- if (thisVertexHoldCompanyId.isEmpty()) {
|
|
|
+ if (vertexInfo.getHoldCompanyId().isEmpty()) {
|
|
|
//如果没有控股企业,则本节点为集团,需将本节点的参股股东,添加至集团股东
|
|
|
List<String> groupHolderCompanyIds = getValue().getGroupHolderCompanyIds().tuple2List();
|
|
|
- groupHolderCompanyIds.addAll(thisVertexOtherHolderCompanyId);
|
|
|
+ groupHolderCompanyIds.addAll(vertexInfo.getNotHoldCompanyId());
|
|
|
getValue().setGroupHolderCompanyIds(groupHolderCompanyIds);
|
|
|
}
|
|
|
|
|
@@ -131,21 +109,21 @@ public class EnterpriseGroupVertex extends
|
|
|
|
|
|
|
|
|
//是否将消息汇总至该节点
|
|
|
- boolean collect = thisVertexHoldCompanyId.isEmpty();
|
|
|
+ boolean collect = vertexInfo.getHoldCompanyId().isEmpty();
|
|
|
|
|
|
if (collect) {
|
|
|
//无法传递,则在本节点汇总
|
|
|
- mergeMsg2Vertex(getValue(), messages, thisVertexHoldKeyno);
|
|
|
+ mergeMsg2Vertex(getValue(), messages, vertexInfo.getControllerKeyno());
|
|
|
} else {
|
|
|
for (EntGroupMsg message : messages) {
|
|
|
//将本节点获取的消息直接传递至下一节点,只传递到控股企业。不必将接收到的参股企业做区分,直接只往控股企业传递就行
|
|
|
- for (String destVertexId : thisVertexHoldCompanyId) {
|
|
|
+ for (String destVertexId : vertexInfo.getHoldCompanyId()) {
|
|
|
boolean flag = message.routeLog(destVertexId);
|
|
|
if (!flag) {
|
|
|
- mergeMsg2Vertex(getValue(), Arrays.asList(message), Collections.emptyList());
|
|
|
+ mergeMsg2Vertex(getValue(), Arrays.asList(message), null);
|
|
|
continue;
|
|
|
}
|
|
|
- message.addStockRightControlChain(getId().toString(), majorityShareholder.get(destVertexId), destVertexId);
|
|
|
+ message.addStockRightControlChain(getId().toString(), vertexInfo.getInvestmentProportion(destVertexId), destVertexId);
|
|
|
|
|
|
context.sendMessage(new Text(destVertexId), message);
|
|
|
context.aggregate(new BooleanWritable(true));
|
|
@@ -161,9 +139,9 @@ public class EnterpriseGroupVertex extends
|
|
|
*
|
|
|
* @param vertexEntity
|
|
|
* @param messages
|
|
|
- * @param thisVertexHoldKeyno
|
|
|
+ * @param controllerKeyno
|
|
|
*/
|
|
|
- private static void mergeMsg2Vertex(EnterpriseGroupVertexValue vertexEntity, Iterable<EntGroupMsg> messages, List<String> thisVertexHoldKeyno) {
|
|
|
+ private static void mergeMsg2Vertex(EnterpriseGroupVertexValue vertexEntity, Iterable<EntGroupMsg> messages, String controllerKeyno) {
|
|
|
long holdNum = vertexEntity.getHoldNum().get();
|
|
|
List<String> holdCompanyId = vertexEntity.getHoldCompanyIds().tuple2List();
|
|
|
List<String> groupInvestmentCompanyId = vertexEntity.getGroupInvestmentCompanyIds().tuple2List();
|
|
@@ -185,9 +163,11 @@ public class EnterpriseGroupVertex extends
|
|
|
subgroup.add(message.getSourceVertexId().toString());
|
|
|
}
|
|
|
}
|
|
|
- Text groupControllerKeyno = new Text();
|
|
|
- if (!thisVertexHoldKeyno.isEmpty()) {
|
|
|
- groupControllerKeyno = new Text(String.join(",", thisVertexHoldKeyno));
|
|
|
+ Text groupControllerKeyno;
|
|
|
+ if (controllerKeyno != null) {
|
|
|
+ groupControllerKeyno = new Text(controllerKeyno);
|
|
|
+ } else {
|
|
|
+ groupControllerKeyno = new Text();
|
|
|
}
|
|
|
|
|
|
String collect = stockRightControlChain.stream().filter(StringUtils::isNotBlank).collect(Collectors.joining(","));
|