瀏覽代碼

feat: 企业集团添加测试数据加载代码

- 添加企业集团名称
- 精简图结构输出
许家凯 1 年之前
父節點
當前提交
b48233173b

+ 1 - 1
pom.xml

@@ -6,7 +6,7 @@
 
     <groupId>com.winhc</groupId>
     <artifactId>max-compute-graph</artifactId>
-    <version>1.3</version>
+    <version>1.4</version>
 
     <properties>
         <maven.compiler.source>8</maven.compiler.source>

+ 113 - 0
src/main/java/com/winhc/max/compute/graph/job/enterprise_group/EgReaderTestData.java

@@ -0,0 +1,113 @@
+package com.winhc.max.compute.graph.job.enterprise_group;
+
+import com.aliyun.odps.graph.Edge;
+import com.aliyun.odps.graph.MutationContext;
+import com.aliyun.odps.io.Text;
+import com.winhc.max.compute.graph.job.enterprise_group.entity.EntGroupMsg;
+import com.winhc.max.compute.graph.job.enterprise_group.entity.EnterpriseGroupVertexValue;
+import com.winhc.max.compute.graph.job.enterprise_group.entity.HolderEdge;
+import com.winhc.max.compute.graph.util.GraphLoaderTestData;
+import lombok.SneakyThrows;
+
+import java.util.Locale;
+
+/**
+ * @author: XuJiakai
+ * 2023/4/20 10:53
+ */
+public class EgReaderTestData extends GraphLoaderTestData {
+
+    @SneakyThrows
+    public static void addPersonVertex(MutationContext<Text, EnterpriseGroupVertexValue, HolderEdge, EntGroupMsg> context
+            , String personId
+    ) {
+        EnterpriseGroupVertex enterpriseGroupVertex = new EnterpriseGroupVertex();
+        enterpriseGroupVertex.setId(new Text(completeStr(personId, 33)));
+        enterpriseGroupVertex.setValue(EnterpriseGroupVertexValue.of(personId + "的人名", 6, null, null));
+        context.addVertexRequest(enterpriseGroupVertex);
+
+    }
+
+    private static String completeStr(String val, int length) {
+        StringBuilder sb = new StringBuilder();
+        if (val.length() == length) {
+            sb.append(val);
+        } else if (val.length() < length) {
+            sb.append(val);
+            while (sb.length() < length) {
+                sb.append("1");
+            }
+        } else {
+            sb.append(val, 0, length);
+        }
+        return sb.toString();
+    }
+
+    public static void addVertex(MutationContext<Text, EnterpriseGroupVertexValue, HolderEdge, EntGroupMsg> context
+            , String keyno
+    ) {
+        if (keyno.equals(keyno.toUpperCase(Locale.ROOT))) {
+            addCompanyVertex(context, keyno);
+        } else {
+            addPersonVertex(context, keyno);
+        }
+    }
+
+    @SneakyThrows
+    public static void addCompanyVertex(MutationContext<Text, EnterpriseGroupVertexValue, HolderEdge, EntGroupMsg> context
+            , String companyId
+    ) {
+        EnterpriseGroupVertex enterpriseGroupVertex = new EnterpriseGroupVertex();
+        enterpriseGroupVertex.setId(new Text(completeStr(companyId, 32)));
+        enterpriseGroupVertex.setValue(EnterpriseGroupVertexValue.of(companyId + "公司的名字"));
+        context.addVertexRequest(enterpriseGroupVertex);
+    }
+
+
+    /**
+     * @param context
+     * @param thisCompanyId        当前公司id
+     * @param holderKeyno          股东id
+     * @param investmentProportion 股东投资比例
+     */
+    @SneakyThrows
+    public static void addEdge(MutationContext<Text, EnterpriseGroupVertexValue, HolderEdge, EntGroupMsg> context
+            , String thisCompanyId, String holderKeyno, Double investmentProportion
+    ) {
+
+        thisCompanyId = completeStr(thisCompanyId, 32);
+        if (holderKeyno.equals(holderKeyno.toUpperCase(Locale.ROOT))) {
+            holderKeyno = completeStr(holderKeyno, 32);
+        } else {
+            holderKeyno = completeStr(holderKeyno, 33);
+        }
+
+        int holderTypeVal = 1;
+        if (holderKeyno.length() == 33) {
+            holderTypeVal = 3;
+        }
+        HolderEdge edgeValue = HolderEdge.of(holderKeyno, holderTypeVal, investmentProportion);
+
+        Edge<Text, HolderEdge> edge = new Edge<Text, HolderEdge>(
+                new Text(holderKeyno), edgeValue);
+        context.addEdgeRequest(new Text(thisCompanyId), edge);
+    }
+
+
+    @Override
+    public void loadTestData(MutationContext context) {
+
+        addVertex(context, "A");
+        addVertex(context, "B");
+        addVertex(context, "C");
+        addVertex(context, "D");
+        addVertex(context, "E");
+        addVertex(context, "g");
+
+        addEdge(context, "B", "A", 1d);
+        addEdge(context, "C", "B", 1d);
+        addEdge(context, "D", "B", 1d);
+        addEdge(context, "E", "B", 0.8d);
+        addEdge(context, "E", "g", 0.2d);
+    }
+}

+ 4 - 25
src/main/java/com/winhc/max/compute/graph/job/enterprise_group/EnterpriseGroupJob.java

@@ -4,7 +4,6 @@ import com.aliyun.odps.data.TableInfo;
 import com.aliyun.odps.graph.GraphJob;
 import com.aliyun.odps.graph.RemoveDuplicatesLoadingResolver;
 import com.winhc.max.compute.graph.job.pagerank.CompanyComputingVertexResolver;
-import com.winhc.max.compute.graph.util.BaseUtils;
 import com.winhc.max.compute.graph.util.DateUtils;
 import com.winhc.max.compute.graph.util.ParameterTool;
 
@@ -24,15 +23,9 @@ public class EnterpriseGroupJob {
     private static final String defaultOutputRelationTab = "calc_enterprise_group_relation_tab";
 
     static {
-        if (BaseUtils.isWindows()) {
-            defaultInputVertexTab = "tmp_xjk_test_enterprise_group_input_vertex";
-            defaultEdgeInputTab = "tmp_xjk_test_enterprise_group_input_edge";
-            defaultOutputParentTab = "tmp_xjk_test_enterprise_group_out";
-        } else {
-            defaultInputVertexTab = "calc_enterprise_group_input_vertex";
-            defaultEdgeInputTab = "calc_enterprise_group_input_edge";
-            defaultOutputParentTab = "calc_enterprise_group_out_parent_tab";
-        }
+        defaultInputVertexTab = "calc_enterprise_group_input_vertex";
+        defaultEdgeInputTab = "calc_enterprise_group_input_edge";
+        defaultOutputParentTab = "calc_enterprise_group_out_parent_tab";
     }
 
 
@@ -47,8 +40,6 @@ public class EnterpriseGroupJob {
         String inputVertexTab = parameterTool.getOrDefault("inputVertexTab", defaultInputVertexTab);
         String edgeInputTab = parameterTool.getOrDefault("edgeInputTab", defaultEdgeInputTab);
         String outputParentTab = parameterTool.getOrDefault("outputParentTab", defaultOutputParentTab);
-        String outputMainTab = parameterTool.getOrDefault("outputMainTab", defaultOutputMainTab);
-        String outputRelationTab = parameterTool.getOrDefault("outputRelationTab", defaultOutputRelationTab);
 
         String outPart = parameterTool.getOrDefault("outPart", nowDate);
         String debugVertexId = parameterTool.getOrDefault("debugVertexId", null);
@@ -76,6 +67,7 @@ public class EnterpriseGroupJob {
 
 
         job.setGraphLoaderClass(EnterpriseGroupReader.class);
+//        job.setGraphLoaderClass(EgReaderTestData.class);
         job.setVertexClass(EnterpriseGroupVertex.class);
 
 
@@ -101,19 +93,6 @@ public class EnterpriseGroupJob {
                 .label("parent")
                 .partSpec("ds='" + outPart + "'")
                 .build());
-        job.addOutput(TableInfo.builder()
-                .projectName("winhc_ng")
-                .tableName(outputMainTab)
-                .partSpec("ds='" + outPart + "'")
-                .label("main")
-                .build());
-        job.addOutput(TableInfo.builder()
-                .projectName("winhc_ng")
-                .tableName(outputRelationTab)
-                .partSpec("ds='" + outPart + "'")
-                .label("relation")
-                .build());
-
 
         long startTime = System.currentTimeMillis();
         job.run();

+ 7 - 0
src/main/java/com/winhc/max/compute/graph/job/enterprise_group/EnterpriseGroupReader.java

@@ -95,6 +95,11 @@ public class EnterpriseGroupReader extends
             Text companyName = record.getTextOrNull("company_name");
             Text companyOrgTypeNew = record.getTextOrNull("company_org_type_new");
             Text legalEntity = record.getTextOrNull("legal_entity_ids");
+            Text nameAlias = record.getTextOrNull("name_alias");
+
+            if (nameAlias == null || nameAlias.getLength() == 0) {
+                nameAlias = new Text(companyName);
+            }
 
 
             boolean partnership = false;
@@ -125,6 +130,8 @@ public class EnterpriseGroupReader extends
             } else {
                 enterpriseGroupVertex.setValue(EnterpriseGroupVertexValue.of(companyName.toString()));
             }
+
+            enterpriseGroupVertex.getValue().setNameAlias(nameAlias);
             context.addVertexRequest(enterpriseGroupVertex);
         }
     }

+ 10 - 4
src/main/java/com/winhc/max/compute/graph/job/enterprise_group/EnterpriseGroupVertex.java

@@ -206,6 +206,7 @@ public class EnterpriseGroupVertex extends
         //输出
         Text enterprise_group_hold_company_id = getId();
         Text enterprise_group_hold_company_name = getValue().getCompanyName();
+        Text nameAlias = getValue().getNameAlias();
 
         Set<String> holdCompanyId = getValue().getHoldCompanyIds().tuple2Set();
         Text group_company_id = new Text(gson.toJson(holdCompanyId));
@@ -241,11 +242,15 @@ public class EnterpriseGroupVertex extends
             stockRightControlChain = new Text(s);
         }
 
+        Text enterpriseGroupType = hold_num.get() <= 9 ? new Text("2") : new Text("1");
+
 
         //合并输出
         context.write(
                 "parent"
                 , enterprise_group_hold_company_id
+                , nameAlias
+                , enterpriseGroupType
                 , enterprise_group_hold_company_id
                 , enterprise_group_hold_company_name
                 , hold_num
@@ -258,7 +263,7 @@ public class EnterpriseGroupVertex extends
         );
 
         //分批输出
-        context.write(
+      /*  context.write(
                 "main"
                 , enterprise_group_hold_company_id
                 , enterprise_group_hold_company_id
@@ -298,7 +303,7 @@ public class EnterpriseGroupVertex extends
         }
 
         for (String e : groupHolderCompanyId) {
-            if(e.length()==32){
+            if (e.length() == 32) {
                 context.write(
                         "relation"
                         , new Text(enterprise_group_hold_company_id + "_3_" + e)
@@ -306,7 +311,7 @@ public class EnterpriseGroupVertex extends
                         , new Text("3")
                         , new Text(e)
                 );
-            }else{
+            } else {
                 context.write(
                         "relation"
                         , new Text(enterprise_group_hold_company_id + "_4_" + e)
@@ -315,7 +320,8 @@ public class EnterpriseGroupVertex extends
                         , new Text(e)
                 );
             }
-        }
+        }*/
+
 
     }
 }

+ 17 - 1
src/main/java/com/winhc/max/compute/graph/job/enterprise_group/entity/EnterpriseGroupVertexValue.java

@@ -21,6 +21,8 @@ import java.util.List;
 public class EnterpriseGroupVertexValue implements Writable {
 
     private Text companyName;
+
+    private Text nameAlias;
     /**
      * 公司类型,1为正常企业,2为分公司,3为合伙,4为机关单位,5为事业单位,6为人员节点
      */
@@ -133,6 +135,14 @@ public class EnterpriseGroupVertexValue implements Writable {
         return holdNum;
     }
 
+    public Text getNameAlias() {
+        return nameAlias;
+    }
+
+    public void setNameAlias(Text nameAlias) {
+        this.nameAlias = nameAlias;
+    }
+
     public Tuple getHoldCompanyIds() {
         return holdCompanyIds;
     }
@@ -212,6 +222,9 @@ public class EnterpriseGroupVertexValue implements Writable {
         if (holdNum == null)
             holdNum = new LongWritable(0);
 
+        if (nameAlias == null)
+            nameAlias = new Text();
+
         if (holdCompanyIds == null)
             holdCompanyIds = new Tuple();
 
@@ -246,6 +259,7 @@ public class EnterpriseGroupVertexValue implements Writable {
     @Override
     public void write(DataOutput out) throws IOException {
         companyName.write(out);
+        nameAlias.write(out);
         companyType.write(out);
         legalEntityIds.write(out);
         manualStop.write(out);
@@ -262,6 +276,7 @@ public class EnterpriseGroupVertexValue implements Writable {
     @Override
     public void readFields(DataInput in) throws IOException {
         companyName = new Text();
+        nameAlias = new Text();
         companyType = new IntWritable();
         legalEntityIds = new Tuple();
         manualStop = new BooleanWritable();
@@ -275,6 +290,7 @@ public class EnterpriseGroupVertexValue implements Writable {
         endFlag = new IntWritable();
 
         companyName.readFields(in);
+        nameAlias.readFields(in);
         companyType.readFields(in);
         legalEntityIds.readFields(in);
         manualStop.readFields(in);
@@ -288,11 +304,11 @@ public class EnterpriseGroupVertexValue implements Writable {
         endFlag.readFields(in);
     }
 
-
     @Override
     public String toString() {
         return "EnterpriseGroupVertexValue{" +
                 "companyName=" + companyName +
+                ", nameAlias=" + nameAlias +
                 ", companyType=" + companyType +
                 ", legalEntityIds=" + legalEntityIds +
                 ", manualStop=" + manualStop +

+ 39 - 0
src/main/java/com/winhc/max/compute/graph/util/GraphLoaderTestData.java

@@ -0,0 +1,39 @@
+package com.winhc.max.compute.graph.util;
+
+import com.aliyun.odps.conf.Configuration;
+import com.aliyun.odps.data.TableInfo;
+import com.aliyun.odps.graph.GraphLoader;
+import com.aliyun.odps.graph.MutationContext;
+import com.aliyun.odps.io.LongWritable;
+import com.aliyun.odps.io.WritableRecord;
+
+import java.io.IOException;
+
+/**
+ * @author: XuJiakai
+ * 2023/5/5 16:08
+ */
+public abstract class GraphLoaderTestData extends GraphLoader {
+    private Boolean loadTestData;
+
+    @Override
+    public void setup(Configuration conf, int workerId, TableInfo tableInfo, MutationContext context) throws IOException {
+        loadTestData = true;
+    }
+
+    @Override
+    public void load(LongWritable recordNum, WritableRecord record, MutationContext context) throws IOException {
+        if (loadTestData) {
+            loadTestData(context);
+            loadTestData = false;
+        }
+    }
+
+
+    /**
+     * 添加测试数据
+     *
+     * @param context
+     */
+    public abstract void loadTestData(MutationContext context);
+}