浏览代码

feat: 企业集团

- 增加输出的数据表
- 增加逻辑手动干预图迭代次数
- 加入日志点,用于排查图计算提前终止的问题
许家凯 1 年之前
父节点
当前提交
b9bbbec36c

+ 11 - 1
src/main/java/com/winhc/max/compute/graph/job/enterprise_group/EnterpriseGroupAggregator.java

@@ -4,6 +4,7 @@ import com.aliyun.odps.graph.Aggregator;
 import com.aliyun.odps.graph.WorkerContext;
 import com.aliyun.odps.io.BooleanWritable;
 import com.aliyun.odps.io.NullWritable;
+import com.aliyun.odps.io.Writable;
 import com.winhc.max.compute.graph.job.enterprise_group.entity.EnterpriseGroupAggValue;
 
 import java.io.IOException;
@@ -37,7 +38,16 @@ public class EnterpriseGroupAggregator extends Aggregator<EnterpriseGroupAggValu
 
     @Override
     public boolean terminate(WorkerContext context, EnterpriseGroupAggValue enterpriseGroupAggValue) throws IOException {
-        System.out.println("step: " + context.getSuperstep() + ", iterations:" + enterpriseGroupAggValue.getCount() + " ,flag=" + enterpriseGroupAggValue.getFlag().get());
+        System.out.println("step: " + context.getSuperstep() + ", iterations: " + enterpriseGroupAggValue.getCount() + " ,flag=" + enterpriseGroupAggValue.getFlag().get());
+        Writable lastAggregatedValue = context.getLastAggregatedValue();
+        if (lastAggregatedValue != null) {
+            EnterpriseGroupAggValue lastAgg = ((EnterpriseGroupAggValue) lastAggregatedValue);
+            long count = lastAgg.getCount();
+            if (count > 10) {
+                System.out.println("\tmanual operation...");
+                return false;
+            }
+        }
         return !enterpriseGroupAggValue.getFlag().get();
     }
 }

+ 38 - 13
src/main/java/com/winhc/max/compute/graph/job/enterprise_group/EnterpriseGroupJob.java

@@ -17,19 +17,21 @@ import java.io.IOException;
 public class EnterpriseGroupJob {
 
 
-    private static final String vertexTableName;
-    private static final String edgeTableName;
-    private static final String outputTable;
+    private static final String defaultInputVertexTab;
+    private static final String defaultEdgeInputTab;
+    private static final String defaultOutputParentTab;
+    private static final String defaultOutputMainTab = "calc_enterprise_group_main_tab";
+    private static final String defaultOutputRelationTab = "calc_enterprise_group_relation_tab";
 
     static {
         if (BaseUtils.isWindows()) {
-            vertexTableName = "tmp_xjk_test_enterprise_group_input_vertex";
-            edgeTableName = "tmp_xjk_test_enterprise_group_input_edge";
-            outputTable = "tmp_xjk_test_enterprise_group_out";
+            defaultInputVertexTab = "tmp_xjk_test_enterprise_group_input_vertex";
+            defaultEdgeInputTab = "tmp_xjk_test_enterprise_group_input_edge";
+            defaultOutputParentTab = "tmp_xjk_test_enterprise_group_out";
         } else {
-            vertexTableName = "tmp_xjk_enterprise_group_input_vertex";
-            edgeTableName = "tmp_xjk_enterprise_group_input_edge";
-            outputTable = "tmp_xjk_enterprise_group_out";
+            defaultInputVertexTab = "calc_enterprise_group_input_vertex";
+            defaultEdgeInputTab = "calc_enterprise_group_input_edge";
+            defaultOutputParentTab = "calc_enterprise_group_out_parent_tab";
         }
     }
 
@@ -41,6 +43,13 @@ public class EnterpriseGroupJob {
         String numWorkers = parameterTool.getOrDefault("numWorkers", "70");
         String workerCPU = parameterTool.getOrDefault("workerCPU", "2");
         String workerMem = parameterTool.getOrDefault("workerMem", "8192");
+
+        String inputVertexTab = parameterTool.getOrDefault("inputVertexTab", defaultInputVertexTab);
+        String edgeInputTab = parameterTool.getOrDefault("edgeInputTab", defaultEdgeInputTab);
+        String outputParentTab = parameterTool.getOrDefault("outputParentTab", defaultOutputParentTab);
+        String outputMainTab = parameterTool.getOrDefault("outputMainTab", defaultOutputMainTab);
+        String outputRelationTab = parameterTool.getOrDefault("outputRelationTab", defaultOutputRelationTab);
+
         String outPart = parameterTool.getOrDefault("outPart", nowDate);
         String debugVertexId = parameterTool.getOrDefault("debugVertexId", null);
         System.out.println("input args: " + String.join(" ", args));
@@ -57,7 +66,7 @@ public class EnterpriseGroupJob {
         }
 
         if (parameterTool.has("enableDebug")) {
-           String debugIndex =  parameterTool.getOrDefault("debugIndex", null);
+            String debugIndex = parameterTool.getOrDefault("debugIndex", null);
             job.set("winhc.debug", "true");
             job.set("winhc.debug.index", debugIndex);
             job.set("winhc.debug.debugVertexId", debugVertexId);
@@ -77,16 +86,32 @@ public class EnterpriseGroupJob {
 
         job.addInput(TableInfo.builder()
                 .projectName("winhc_ng")
-                .tableName(vertexTableName)
+                .label("vertex")
+                .tableName(inputVertexTab)
                 .build());
         job.addInput(TableInfo.builder()
                 .projectName("winhc_ng")
-                .tableName(edgeTableName)
+                .tableName(edgeInputTab)
+                .label("edge")
+                .build());
+
+        job.addOutput(TableInfo.builder()
+                .projectName("winhc_ng")
+                .tableName(outputParentTab)
+                .label("parent")
+                .partSpec("ds='" + outPart + "'")
+                .build());
+        job.addOutput(TableInfo.builder()
+                .projectName("winhc_ng")
+                .tableName(outputMainTab)
+                .partSpec("ds='" + outPart + "'")
+                .label("main")
                 .build());
         job.addOutput(TableInfo.builder()
                 .projectName("winhc_ng")
-                .tableName(outputTable)
+                .tableName(outputRelationTab)
                 .partSpec("ds='" + outPart + "'")
+                .label("relation")
                 .build());
 
 

+ 1 - 1
src/main/java/com/winhc/max/compute/graph/job/enterprise_group/EnterpriseGroupReader.java

@@ -45,7 +45,7 @@ public class EnterpriseGroupReader extends
     @Override
     public void setup(Configuration conf, int workerId, TableInfo tableInfo, MutationContext<Text, EnterpriseGroupVertexValue, HolderEdge, EntGroupMsg> context) throws IOException {
         if (tableInfo != null) {
-            isEdgeData = tableInfo.getTableName().endsWith("_edge");
+            isEdgeData = tableInfo.getTableName().contains("edge");
         }
         gson = new Gson();
     }

+ 60 - 26
src/main/java/com/winhc/max/compute/graph/job/enterprise_group/EnterpriseGroupVertex.java

@@ -202,36 +202,27 @@ public class EnterpriseGroupVertex extends
                 , getValue()
         );
 
-        //判断是否是集团企业
-//        if (hasEdges()) {
-//            List<Edge<Text, HolderEdge>> edges = getEdges();
-//
-//            List<String> thisVertexHoldKeyno = new ArrayList<>(edges.getMajorityShareholder().keySet());
-//            List<String> thisVertexHoldCompanyId = thisVertexHoldKeyno.stream().filter(e -> e.length() == 32).collect(Collectors.toList());
-//
-//            if (!getValue().isOutput() && !thisVertexHoldCompanyId.isEmpty()) {
-//                //不属于企业集团
-//                return;
-//            }
-//        }
-
         if (!getValue().isOutput()) {
             return;
         }
-        Set<String> holdCompanyId = getValue().getHoldCompanyIds().tuple2Set();
 
         //输出
         Text enterprise_group_hold_company_id = getId();
         Text enterprise_group_hold_company_name = getValue().getCompanyName();
+
+        Set<String> holdCompanyId = getValue().getHoldCompanyIds().tuple2Set();
         Text group_company_id = new Text(gson.toJson(holdCompanyId));
 
         //todo:可能会出现集团股东 本就是控股企业,需要排除
         Set<String> groupInvestmentCompanyId = getValue().getGroupInvestmentCompanyIds().tuple2Set();
         groupInvestmentCompanyId.removeAll(holdCompanyId);
         Text group_investment_company_id = new Text(gson.toJson(groupInvestmentCompanyId));
+
         Set<String> groupHolderCompanyId = getValue().getGroupHolderCompanyIds().tuple2Set();
         groupHolderCompanyId.removeAll(holdCompanyId);
         Text group_holder_company_id = new Text(gson.toJson(groupHolderCompanyId));
+
+
         Text group_controller_keyno = getValue().getGroupControllerKeyno();
         Text stockRightControlChain = getValue().getStockRightControlChain();
 
@@ -245,17 +236,6 @@ public class EnterpriseGroupVertex extends
         }
 
 
-//        System.out.println("save:"
-//                + enterprise_group_hold_company_id + ","
-//                + enterprise_group_hold_company_name + ","
-//                + hold_num + ","
-//                + group_company_id + ","
-//                + group_investment_company_id + ","
-//                + group_holder_company_id + ","
-//                + group_controller_keyno + ","
-//                + stockRightControlChain
-//        );
-
         if (hold_num.get() < 3) {
             return;
         }
@@ -265,7 +245,10 @@ public class EnterpriseGroupVertex extends
         }
 
 
-        context.write(enterprise_group_hold_company_id
+        //合并输出
+        context.write(
+                "parent"
+                , enterprise_group_hold_company_id
                 , enterprise_group_hold_company_id
                 , enterprise_group_hold_company_name
                 , hold_num
@@ -276,5 +259,56 @@ public class EnterpriseGroupVertex extends
                 , stockRightControlChain
                 , subgroup
         );
+
+        //分批输出
+        context.write(
+                "main"
+                , enterprise_group_hold_company_id
+                , enterprise_group_hold_company_id
+                , enterprise_group_hold_company_name
+                , hold_num
+                , group_controller_keyno
+                , stockRightControlChain
+                , subgroup
+        );
+
+        context.write(
+                "relation"
+                , new Text(enterprise_group_hold_company_id + "_0")
+                , enterprise_group_hold_company_id
+                , new Text("0")
+                , enterprise_group_hold_company_id
+        );
+
+        for (String e : holdCompanyId) {
+            context.write(
+                    "relation"
+                    , new Text(enterprise_group_hold_company_id + "_1_" + e)
+                    , enterprise_group_hold_company_id
+                    , new Text("1")
+                    , new Text(e)
+            );
+        }
+
+        for (String e : groupInvestmentCompanyId) {
+            context.write(
+                    "relation"
+                    , new Text(enterprise_group_hold_company_id + "_2_" + e)
+                    , enterprise_group_hold_company_id
+                    , new Text("2")
+                    , new Text(e)
+            );
+        }
+
+        for (String e : groupHolderCompanyId) {
+            context.write(
+                    "relation"
+                    , new Text(enterprise_group_hold_company_id + "_3_" + e)
+                    , enterprise_group_hold_company_id
+                    , new Text("3")
+                    , new Text(e)
+            );
+        }
+
     }
 }