|
@@ -4,10 +4,7 @@ import com.aliyun.odps.graph.ComputeContext;
|
|
import com.aliyun.odps.graph.Edge;
|
|
import com.aliyun.odps.graph.Edge;
|
|
import com.aliyun.odps.graph.Vertex;
|
|
import com.aliyun.odps.graph.Vertex;
|
|
import com.aliyun.odps.graph.WorkerContext;
|
|
import com.aliyun.odps.graph.WorkerContext;
|
|
-import com.aliyun.odps.io.BooleanWritable;
|
|
|
|
-import com.aliyun.odps.io.LongWritable;
|
|
|
|
-import com.aliyun.odps.io.NullWritable;
|
|
|
|
-import com.aliyun.odps.io.Text;
|
|
|
|
|
|
+import com.aliyun.odps.io.*;
|
|
import com.aliyun.odps.utils.StringUtils;
|
|
import com.aliyun.odps.utils.StringUtils;
|
|
import com.google.gson.Gson;
|
|
import com.google.gson.Gson;
|
|
import com.winhc.max.compute.graph.job.enterprise_group.entity.EntGroupMsg;
|
|
import com.winhc.max.compute.graph.job.enterprise_group.entity.EntGroupMsg;
|
|
@@ -34,12 +31,15 @@ public class EnterpriseGroupVertex extends
|
|
Vertex<Text, EnterpriseGroupVertexValue, HolderEdge, EntGroupMsg> {
|
|
Vertex<Text, EnterpriseGroupVertexValue, HolderEdge, EntGroupMsg> {
|
|
|
|
|
|
private static Gson gson;
|
|
private static Gson gson;
|
|
|
|
+ private static ProdEnvDebug prodEnvDebug;
|
|
|
|
|
|
@Override
|
|
@Override
|
|
public void setup(WorkerContext<Text, EnterpriseGroupVertexValue, HolderEdge, EntGroupMsg> context) throws IOException {
|
|
public void setup(WorkerContext<Text, EnterpriseGroupVertexValue, HolderEdge, EntGroupMsg> context) throws IOException {
|
|
gson = new Gson();
|
|
gson = new Gson();
|
|
|
|
+ prodEnvDebug = ProdEnvDebug.build(context.getConfiguration());
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+
|
|
@Override
|
|
@Override
|
|
public void compute(ComputeContext<Text, EnterpriseGroupVertexValue, HolderEdge, EntGroupMsg> context, Iterable<EntGroupMsg> messages) throws IOException {
|
|
public void compute(ComputeContext<Text, EnterpriseGroupVertexValue, HolderEdge, EntGroupMsg> context, Iterable<EntGroupMsg> messages) throws IOException {
|
|
context.aggregate(NullWritable.get());
|
|
context.aggregate(NullWritable.get());
|
|
@@ -61,6 +61,7 @@ public class EnterpriseGroupVertex extends
|
|
//获取下游目标节点
|
|
//获取下游目标节点
|
|
List<Edge<Text, HolderEdge>> edges = getEdges();
|
|
List<Edge<Text, HolderEdge>> edges = getEdges();
|
|
majorityShareholder = edges.getMajorityShareholder(list);
|
|
majorityShareholder = edges.getMajorityShareholder(list);
|
|
|
|
+
|
|
thisVertexHoldKeyno = new ArrayList<>(majorityShareholder.keySet());
|
|
thisVertexHoldKeyno = new ArrayList<>(majorityShareholder.keySet());
|
|
thisVertexHoldCompanyId = thisVertexHoldKeyno.stream().filter(e -> e.length() == 32).collect(Collectors.toList());
|
|
thisVertexHoldCompanyId = thisVertexHoldKeyno.stream().filter(e -> e.length() == 32).collect(Collectors.toList());
|
|
thisVertexOtherHolderKeyno = edges.getOtherHolder(thisVertexHoldKeyno);
|
|
thisVertexOtherHolderKeyno = edges.getOtherHolder(thisVertexHoldKeyno);
|
|
@@ -74,13 +75,26 @@ public class EnterpriseGroupVertex extends
|
|
if (!collect.isEmpty()) {
|
|
if (!collect.isEmpty()) {
|
|
thisVertexHoldCompanyId = thisVertexHoldCompanyId.stream().filter(e -> !collect.contains(e)).collect(Collectors.toList());
|
|
thisVertexHoldCompanyId = thisVertexHoldCompanyId.stream().filter(e -> !collect.contains(e)).collect(Collectors.toList());
|
|
}
|
|
}
|
|
|
|
+
|
|
|
|
+ //todo:如果该节点需要拆分子节点,则不向下传递
|
|
|
|
+ if (getValue().getManualStop().get()) {
|
|
|
|
+ thisVertexHoldCompanyId.clear();
|
|
|
|
+ throw new RuntimeException("vId: " + getId() + "\n" + getValue().toString());
|
|
|
|
+ }
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+
|
|
|
|
+ //todo:debug
|
|
|
|
+ prodEnvDebug.debug_1(getId()
|
|
|
|
+ , "thisVertexHoldCompanyId: {}\nmajorityShareholder: {}"
|
|
|
|
+ , thisVertexHoldCompanyId, majorityShareholder
|
|
|
|
+ );
|
|
|
|
+
|
|
//将本节点传递到下一节点
|
|
//将本节点传递到下一节点
|
|
if (!getValue().isSendMsg()) {
|
|
if (!getValue().isSendMsg()) {
|
|
for (String destVertexId : thisVertexHoldCompanyId) {
|
|
for (String destVertexId : thisVertexHoldCompanyId) {
|
|
//传递控股企业
|
|
//传递控股企业
|
|
- EntGroupMsg entGroupMsg = EntGroupMsg.ofByType_1(getId(), getValue().getCompanyName(), thisVertexOtherHolderCompanyId);
|
|
|
|
|
|
+ EntGroupMsg entGroupMsg = EntGroupMsg.ofByType_1(getId(), getValue().getCompanyName(), getValue().getCompanyType(), thisVertexOtherHolderCompanyId);
|
|
boolean flag = entGroupMsg.routeLog(destVertexId);
|
|
boolean flag = entGroupMsg.routeLog(destVertexId);
|
|
if (!flag) {
|
|
if (!flag) {
|
|
// getValue().setOutput();
|
|
// getValue().setOutput();
|
|
@@ -154,14 +168,22 @@ public class EnterpriseGroupVertex extends
|
|
List<String> holdCompanyId = vertexEntity.getHoldCompanyIds().tuple2List();
|
|
List<String> holdCompanyId = vertexEntity.getHoldCompanyIds().tuple2List();
|
|
List<String> groupInvestmentCompanyId = vertexEntity.getGroupInvestmentCompanyIds().tuple2List();
|
|
List<String> groupInvestmentCompanyId = vertexEntity.getGroupInvestmentCompanyIds().tuple2List();
|
|
List<String> groupHolderCompanyId = vertexEntity.getGroupHolderCompanyIds().tuple2List();
|
|
List<String> groupHolderCompanyId = vertexEntity.getGroupHolderCompanyIds().tuple2List();
|
|
|
|
+ List<String> subgroup = vertexEntity.getSubgroup().tuple2List();
|
|
List<String> stockRightControlChain = Arrays.stream(vertexEntity.getStockRightControlChain().toString().split(",")).collect(Collectors.toList());
|
|
List<String> stockRightControlChain = Arrays.stream(vertexEntity.getStockRightControlChain().toString().split(",")).collect(Collectors.toList());
|
|
|
|
|
|
|
|
+ if (subgroup == null) {
|
|
|
|
+ subgroup = new ArrayList<>();
|
|
|
|
+ }
|
|
|
|
+
|
|
for (EntGroupMsg message : messages) {
|
|
for (EntGroupMsg message : messages) {
|
|
holdNum += message.getHoldNum().get();
|
|
holdNum += message.getHoldNum().get();
|
|
holdCompanyId.addAll(message.getHoldCompanyIds().tuple2List());
|
|
holdCompanyId.addAll(message.getHoldCompanyIds().tuple2List());
|
|
groupHolderCompanyId.addAll(message.getGroupHolderCompanyIds().tuple2List());
|
|
groupHolderCompanyId.addAll(message.getGroupHolderCompanyIds().tuple2List());
|
|
groupInvestmentCompanyId.addAll(message.getGroupInvestmentCompanyIds().tuple2List());
|
|
groupInvestmentCompanyId.addAll(message.getGroupInvestmentCompanyIds().tuple2List());
|
|
stockRightControlChain.add(message.getStockRightControlChain().toString());
|
|
stockRightControlChain.add(message.getStockRightControlChain().toString());
|
|
|
|
+ if (message.getSourceVertexId() != null && StringUtils.isNotBlank(message.getSourceVertexId().toString())) {
|
|
|
|
+ subgroup.add(message.getSourceVertexId().toString());
|
|
|
|
+ }
|
|
}
|
|
}
|
|
Text groupControllerKeyno = new Text();
|
|
Text groupControllerKeyno = new Text();
|
|
if (!thisVertexHoldKeyno.isEmpty()) {
|
|
if (!thisVertexHoldKeyno.isEmpty()) {
|
|
@@ -176,6 +198,7 @@ public class EnterpriseGroupVertex extends
|
|
vertexEntity.setGroupHolderCompanyIds(groupHolderCompanyId);
|
|
vertexEntity.setGroupHolderCompanyIds(groupHolderCompanyId);
|
|
vertexEntity.setGroupControllerKeyno(groupControllerKeyno);
|
|
vertexEntity.setGroupControllerKeyno(groupControllerKeyno);
|
|
vertexEntity.setStockRightControlChain(new Text(collect));
|
|
vertexEntity.setStockRightControlChain(new Text(collect));
|
|
|
|
+ vertexEntity.setSubgroup(subgroup.list2Tuple());
|
|
vertexEntity.setOutput();
|
|
vertexEntity.setOutput();
|
|
}
|
|
}
|
|
|
|
|
|
@@ -186,6 +209,12 @@ public class EnterpriseGroupVertex extends
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ //todo:debug
|
|
|
|
+ prodEnvDebug.debug_2(getId()
|
|
|
|
+ , "vertex value:\n{}"
|
|
|
|
+ , getValue()
|
|
|
|
+ );
|
|
|
|
+
|
|
//判断是否是集团企业
|
|
//判断是否是集团企业
|
|
// if (hasEdges()) {
|
|
// if (hasEdges()) {
|
|
// List<Edge<Text, HolderEdge>> edges = getEdges();
|
|
// List<Edge<Text, HolderEdge>> edges = getEdges();
|
|
@@ -199,7 +228,7 @@ public class EnterpriseGroupVertex extends
|
|
// }
|
|
// }
|
|
// }
|
|
// }
|
|
|
|
|
|
- if(!getValue().isOutput()){
|
|
|
|
|
|
+ if (!getValue().isOutput()) {
|
|
return;
|
|
return;
|
|
}
|
|
}
|
|
|
|
|
|
@@ -214,6 +243,13 @@ public class EnterpriseGroupVertex extends
|
|
Text group_controller_keyno = getValue().getGroupControllerKeyno();
|
|
Text group_controller_keyno = getValue().getGroupControllerKeyno();
|
|
Text stockRightControlChain = getValue().getStockRightControlChain();
|
|
Text stockRightControlChain = getValue().getStockRightControlChain();
|
|
|
|
|
|
|
|
+ List<String> subgroupList = getValue().getSubgroup().tuple2List();
|
|
|
|
+ Text subgroup = new Text();
|
|
|
|
+ if (getValue().getCompanyName().toString().contains("集团")) {
|
|
|
|
+ subgroup = new Text(subgroupList.stream().filter(e -> !e.equals(getId().toString())).collect(Collectors.joining(",")));
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+
|
|
// System.out.println("save:"
|
|
// System.out.println("save:"
|
|
// + enterprise_group_hold_company_id + ","
|
|
// + enterprise_group_hold_company_id + ","
|
|
// + enterprise_group_hold_company_name + ","
|
|
// + enterprise_group_hold_company_name + ","
|
|
@@ -225,6 +261,9 @@ public class EnterpriseGroupVertex extends
|
|
// + stockRightControlChain
|
|
// + stockRightControlChain
|
|
// );
|
|
// );
|
|
|
|
|
|
|
|
+ if (hold_num.get() < 3) {
|
|
|
|
+ return;
|
|
|
|
+ }
|
|
if (hold_num.get() > 200) {
|
|
if (hold_num.get() > 200) {
|
|
String s = EnterpriseGroupUtils.stockChainTrim(enterprise_group_hold_company_id.toString(), stockRightControlChain.toString(), 10);
|
|
String s = EnterpriseGroupUtils.stockChainTrim(enterprise_group_hold_company_id.toString(), stockRightControlChain.toString(), 10);
|
|
stockRightControlChain = new Text(s);
|
|
stockRightControlChain = new Text(s);
|
|
@@ -240,6 +279,7 @@ public class EnterpriseGroupVertex extends
|
|
, group_holder_company_id
|
|
, group_holder_company_id
|
|
, group_controller_keyno
|
|
, group_controller_keyno
|
|
, stockRightControlChain
|
|
, stockRightControlChain
|
|
|
|
+ , subgroup
|
|
);
|
|
);
|
|
}
|
|
}
|
|
}
|
|
}
|