Explorar o código

feat: 添加企业集团

- 支持集团图关系过的剪切
- 支持投资比例为0的兼容
- 机关单位的过滤正在开发中
- 企业族群中多集团的拆分正在开发中
许家凯 hai 1 ano
pai
achega
a9432ea1f7

+ 6 - 6
pom.xml

@@ -6,7 +6,7 @@
 
     <groupId>com.winhc</groupId>
     <artifactId>max-compute-graph</artifactId>
-    <version>1.2</version>
+    <version>1.3</version>
 
     <properties>
         <maven.compiler.source>8</maven.compiler.source>
@@ -31,11 +31,11 @@
             <scope>compile</scope>
         </dependency>
 
-        <dependency>
-            <groupId>cn.hutool</groupId>
-            <artifactId>hutool-all</artifactId>
-            <version>5.7.10</version>
-        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>cn.hutool</groupId>-->
+<!--            <artifactId>hutool-all</artifactId>-->
+<!--            <version>5.7.10</version>-->
+<!--        </dependency>-->
         <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
         <dependency>
             <groupId>org.projectlombok</groupId>

+ 43 - 0
src/main/java/com/winhc/max/compute/graph/job/enterprise_group/EnterpriseGroupAggregator.java

@@ -0,0 +1,43 @@
+package com.winhc.max.compute.graph.job.enterprise_group;
+
+import com.aliyun.odps.graph.Aggregator;
+import com.aliyun.odps.graph.WorkerContext;
+import com.aliyun.odps.io.BooleanWritable;
+import com.aliyun.odps.io.NullWritable;
+import com.winhc.max.compute.graph.job.enterprise_group.entity.EnterpriseGroupAggValue;
+
+import java.io.IOException;
+
+/**
+ * @author: XuJiakai
+ * 2023/2/7 14:16
+ */
+public class EnterpriseGroupAggregator extends Aggregator<EnterpriseGroupAggValue> {
+    @Override
+    public EnterpriseGroupAggValue createInitialValue(WorkerContext context) throws IOException {
+        return new EnterpriseGroupAggValue();
+    }
+
+    @Override
+    public void aggregate(EnterpriseGroupAggValue enterpriseGroupAggValue, Object item) throws IOException {
+        if (item instanceof NullWritable) {
+            enterpriseGroupAggValue.increment();
+        } else {
+            BooleanWritable tmp = ((BooleanWritable) item);
+            enterpriseGroupAggValue.update(tmp.get());
+        }
+    }
+
+    @Override
+    public void merge(EnterpriseGroupAggValue enterpriseGroupAggValue, EnterpriseGroupAggValue partial) throws IOException {
+        boolean tmp = partial.getFlag().get();
+        enterpriseGroupAggValue.update(tmp);
+        enterpriseGroupAggValue.increment(partial.getCount());
+    }
+
+    @Override
+    public boolean terminate(WorkerContext context, EnterpriseGroupAggValue enterpriseGroupAggValue) throws IOException {
+        System.out.println("step: " + context.getSuperstep() + ", 本轮迭代节点数:" + enterpriseGroupAggValue.getCount() + " ,flag=" + enterpriseGroupAggValue.getFlag().get());
+        return !enterpriseGroupAggValue.getFlag().get();
+    }
+}

+ 69 - 0
src/main/java/com/winhc/max/compute/graph/job/enterprise_group/EnterpriseGroupJob.java

@@ -0,0 +1,69 @@
+package com.winhc.max.compute.graph.job.enterprise_group;
+
+import com.aliyun.odps.data.TableInfo;
+import com.aliyun.odps.graph.GraphJob;
+import com.aliyun.odps.graph.RemoveDuplicatesLoadingResolver;
+import com.winhc.max.compute.graph.job.pagerank.CompanyComputingVertexResolver;
+import com.winhc.max.compute.graph.util.ParameterTool;
+
+import java.io.IOException;
+
+/**
+ * @author: XuJiakai
+ * 2023/2/7 13:58
+ */
+public class EnterpriseGroupJob {
+
+    public static void main(String[] args) throws IOException {
+        ParameterTool parameterTool = ParameterTool.fromArgs(args);
+        String numWorkers = parameterTool.getOrDefault("numWorkers", "70");
+        String workerCPU = parameterTool.getOrDefault("workerCPU", "2");
+        String workerMem = parameterTool.getOrDefault("workerMem", "8192");
+
+        System.out.println("input args: " + String.join(" ", args));
+        GraphJob job = new GraphJob();
+
+
+
+        job.setMaxIteration(-1);
+        job.setNumWorkers(Integer.parseInt(numWorkers));
+        if(parameterTool.has("workerCPU")){
+            job.setWorkerCPU(Integer.parseInt(workerCPU) * 100);
+        }
+        if(parameterTool.has("workerMem")){
+            job.setWorkerMemory(Integer.parseInt(workerMem));
+        }
+
+
+
+        job.setGraphLoaderClass(EnterpriseGroupReader.class);
+        job.setVertexClass(EnterpriseGroupVertex.class);
+
+
+        job.setLoadingVertexResolver(RemoveDuplicatesLoadingResolver.class);
+        job.setComputingVertexResolver(CompanyComputingVertexResolver.class);
+        job.setAggregatorClass(EnterpriseGroupAggregator.class);
+
+
+        job.addInput(TableInfo.builder()
+                .projectName("winhc_ng")
+                .tableName("tmp_xjk_enterprise_group_input_vertex")
+                .build());
+        job.addInput(TableInfo.builder()
+                .projectName("winhc_ng")
+                .tableName("tmp_xjk_enterprise_group_input_edge")
+                .build());
+        job.addOutput(TableInfo.builder()
+                .projectName("winhc_ng")
+                .tableName("tmp_xjk_enterprise_group_out")
+                .build());
+
+
+        long startTime = System.currentTimeMillis();
+        job.run();
+        System.out.println("Job Finished in "
+                + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+
+
+    }
+}

+ 115 - 0
src/main/java/com/winhc/max/compute/graph/job/enterprise_group/EnterpriseGroupReader.java

@@ -0,0 +1,115 @@
+package com.winhc.max.compute.graph.job.enterprise_group;
+
+
+import com.aliyun.odps.conf.Configuration;
+import com.aliyun.odps.data.TableInfo;
+import com.aliyun.odps.graph.Edge;
+import com.aliyun.odps.graph.GraphLoader;
+import com.aliyun.odps.graph.MutationContext;
+import com.aliyun.odps.io.DoubleWritable;
+import com.aliyun.odps.io.LongWritable;
+import com.aliyun.odps.io.Text;
+import com.aliyun.odps.io.WritableRecord;
+import com.google.gson.Gson;
+import com.winhc.max.compute.graph.job.enterprise_group.entity.EntGroupMsg;
+import com.winhc.max.compute.graph.job.enterprise_group.entity.EnterpriseGroupVertexValue;
+import com.winhc.max.compute.graph.job.enterprise_group.entity.HolderEdge;
+import com.winhc.max.compute.graph.util.JsonUtils;
+import com.winhc.max.compute.graph.util.WritableRecordExtensions;
+import lombok.experimental.ExtensionMethod;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * @author: XuJiakai
+ * 2023/2/7 11:12
+ * <p>
+ * 数据读入
+ */
+@ExtensionMethod({
+        EnterpriseGroupUtils.class
+        , JsonUtils.class
+        , WritableRecordExtensions.class
+})
+public class EnterpriseGroupReader extends
+        GraphLoader<Text, EnterpriseGroupVertexValue, HolderEdge, EntGroupMsg> {
+    private Boolean isEdgeData;
+    private Gson gson;
+
+    @Override
+    public void setup(Configuration conf, int workerId, TableInfo tableInfo, MutationContext<Text, EnterpriseGroupVertexValue, HolderEdge, EntGroupMsg> context) throws IOException {
+        if (tableInfo != null) {
+            isEdgeData = tableInfo.getTableName().endsWith("_edge");
+        }
+        gson = new Gson();
+    }
+
+    @Override
+    public void load(LongWritable recordNum, WritableRecord record, MutationContext<Text, EnterpriseGroupVertexValue, HolderEdge, EntGroupMsg> context) throws IOException {
+        if (isEdgeData) {
+            //加载边
+            Text companyId = record.getTextOrNull("company_id");
+            Text holderKeyno = record.getTextOrNull("holder_keyno");
+            Text holderName = record.getTextOrNull("holder_name");
+            DoubleWritable investmentProportion = record.getOrNull("investment_proportion", DoubleWritable.class);
+            if (investmentProportion == null) {
+                return;
+            }
+            HolderEdge edgeValue = HolderEdge.of(holderKeyno.toString(), investmentProportion.get());
+
+            Edge<Text, HolderEdge> edge = new Edge<Text, HolderEdge>(
+                    holderKeyno, edgeValue);
+            context.addEdgeRequest(companyId, edge);
+
+            if (holderKeyno.toString().length() == 33) {
+                //添加人的顶点
+                EnterpriseGroupVertex enterpriseGroupVertex = new EnterpriseGroupVertex();
+                enterpriseGroupVertex.setId(holderKeyno);
+                enterpriseGroupVertex.setValue(EnterpriseGroupVertexValue.of(holderName.toString()));
+                context.addVertexRequest(enterpriseGroupVertex);
+            }
+
+        } else {
+            //加载点,个人的顶点也要算在内
+            Text companyId = record.getTextOrNull("company_id");
+            Text companyName = record.getTextOrNull("company_name");
+            Text companyOrgTypeNew = record.getTextOrNull("company_org_type_new");
+            Text legalEntity = record.getTextOrNull("legal_entity_ids");
+
+
+            boolean partnership = false;
+
+            List<String> companyOrgTypeNewList = gson.parseListStr(companyOrgTypeNew.toString());
+            if (companyOrgTypeNewList != null)
+                for (String tmpText : companyOrgTypeNewList) {
+                    boolean flag = tmpText.contains("合伙");
+                    if (flag) {
+                        partnership = true;
+                        break;
+                    }
+                }
+
+            EnterpriseGroupVertex enterpriseGroupVertex = new EnterpriseGroupVertex();
+            enterpriseGroupVertex.setId(companyId);
+            if (partnership) {
+                List<Map<String, String>> le = gson.parseListStrByMap(legalEntity.toString());
+
+                List<String> list = le.stream().map(e -> {
+                    if (e.get("deleted").equals("0")) {
+                        return e.get("id");
+                    } else {
+                        return null;
+                    }
+                }).filter(Objects::nonNull).collect(Collectors.toList());
+                enterpriseGroupVertex.setValue(EnterpriseGroupVertexValue.of(companyName.toString(), list));
+            } else {
+                enterpriseGroupVertex.setValue(EnterpriseGroupVertexValue.of(companyName.toString()));
+            }
+            context.addVertexRequest(enterpriseGroupVertex);
+        }
+    }
+}

+ 205 - 0
src/main/java/com/winhc/max/compute/graph/job/enterprise_group/EnterpriseGroupUtils.java

@@ -0,0 +1,205 @@
+package com.winhc.max.compute.graph.job.enterprise_group;
+
+import com.aliyun.odps.graph.Edge;
+import com.aliyun.odps.io.Text;
+import com.aliyun.odps.io.Tuple;
+import com.aliyun.odps.utils.StringUtils;
+import com.winhc.max.compute.graph.job.enterprise_group.entity.HolderEdge;
+import com.winhc.max.compute.graph.util.WinhcTuple;
+
+import java.util.*;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * @author: XuJiakai
+ * 2023/2/8 10:30
+ */
+public class EnterpriseGroupUtils {
+
+
+    /**
+     * 获取大股东
+     * 执行事务合伙人要绝对控股
+     *
+     * @param edges 所有股东
+     * @return
+     */
+    public static Map<String, Double> getMajorityShareholder(List<Edge<Text, HolderEdge>> edges) {
+        return getMajorityShareholder(edges, null);
+    }
+
+    public static Map<String, Double> getMajorityShareholder(List<Edge<Text, HolderEdge>> edges, List<String> legalEntityIds) {
+        Map<String, Double> map = new HashMap<>();
+
+        if (legalEntityIds != null && !legalEntityIds.isEmpty()) {
+            Set<String> set = new HashSet<>(legalEntityIds);
+            return edges.stream().filter(e -> set.contains(e.getDestVertexId().toString())).collect(Collectors.toMap(e -> e.getDestVertexId().toString(), e -> e.getValue().getInvestmentProportion().get(), (e1, e2) -> e1));
+        }
+
+        double holdProportion = 0.0;
+
+        for (Edge<Text, HolderEdge> edge : edges) {
+            String destVertexId = edge.getDestVertexId().toString();
+
+            HolderEdge value = edge.getValue();
+            double v = value.getInvestmentProportion().get();
+            int compare = Double.compare(v, holdProportion);
+            if (compare > 0) {
+                map.clear();
+                map.put(destVertexId, v);
+                holdProportion = v;
+            } else if (compare == 0 && v > 0) {
+                map.put(destVertexId, v);
+            }
+        }
+        return map;
+    }
+
+
+    /**
+     * 获取非控股的其它股东id
+     *
+     * @param edges                    所有股东
+     * @param majorityShareholderKeyno 控股股东id
+     * @return
+     */
+    public static List<String> getOtherHolder(List<Edge<Text, HolderEdge>> edges, List<String> majorityShareholderKeyno) {
+        Set<String> set = new HashSet<>(majorityShareholderKeyno);
+
+        List<String> collect = edges.stream().map(Edge::getDestVertexId).map(Text::toString).filter(e -> !set.contains(e)).collect(Collectors.toList());
+        return collect;
+    }
+
+
+    public static List<String> tuple2List(Tuple tuple) {
+        if (tuple == null) {
+            return Collections.emptyList();
+        }
+
+
+        List<String> collect = tuple.getAll().stream().filter(Objects::nonNull).map(e -> ((Text) e).toString()).collect(Collectors.toList());
+        return collect;
+    }
+
+    public static Tuple list2Tuple(List<String> list) {
+        if (list == null) {
+            return new Tuple();
+        }
+        Tuple tuple = new Tuple(list.size());
+        for (String ele : list) {
+            tuple.append(new Text(ele));
+        }
+        return tuple;
+    }
+
+
+    private static WinhcTuple<String, String> readNode(String chain) {
+        int startIndex = chain.indexOf("(") + 1;
+        int endIndex = chain.indexOf(")");
+        String nodeId = chain.substring(startIndex, endIndex);
+        return WinhcTuple.of(nodeId, chain.substring(endIndex + 1));
+    }
+
+    private static WinhcTuple<String, String> readEdge(String chain) {
+        int startIndex = chain.indexOf("[") + 1;
+        int endIndex = chain.indexOf("]");
+        String edge = chain.substring(startIndex, endIndex);
+        return WinhcTuple.of(edge, chain.substring(endIndex + 1));
+    }
+
+
+    public static Set<String> splitStockChain(String stockChain) {
+        if (StringUtils.isBlank(stockChain)) {
+            return new HashSet<>();
+        }
+        Set<String> result = new HashSet<>();
+
+        for (String chain : stockChain.split(",")) {
+            String tempNodeId = null;
+
+            while (chain.length() > 10) {
+                String node1;
+                if (tempNodeId == null) {
+                    WinhcTuple<String, String> tuple = readNode(chain);
+                    node1 = tuple._1;
+                    chain = tuple._2;
+                } else {
+                    node1 = tempNodeId;
+                }
+                WinhcTuple<String, String> tuple = readEdge(chain);
+                String edge = tuple._1;
+                chain = tuple._2;
+                tuple = readNode(chain);
+                String node2 = tuple._1;
+                chain = tuple._2;
+                result.add("(" + node1 + ")<-[" + edge + "]-(" + node2 + ")");
+                tempNodeId = node2;
+            }
+        }
+        return result;
+    }
+
+
+    private static Set<String> getRelationNodeId(String rootNodeId, Set<String> chains) {
+        return chains.stream()
+                .filter(e -> e.split("]")[1].contains(rootNodeId))
+                .map(e -> readNode(e)._1)
+                .collect(Collectors.toSet());
+    }
+
+
+    private static void getChildNode(Set<String> resultNode, Set<String> chains, int nodeLimitNum, String rootId, int leveNum) {
+        if (resultNode.size() >= nodeLimitNum) {
+            return;
+        }
+
+        Set<String> thisNodeSet = getRelationNodeId(rootId, chains);
+        String[] thisNodeArray = thisNodeSet.toArray(new String[0]);
+
+        if (resultNode.size() + thisNodeSet.size() >= nodeLimitNum) {
+            int t = nodeLimitNum - resultNode.size();
+            for (int i = 0; i < t; i++) {
+                resultNode.add(thisNodeArray[i]);
+            }
+            return;
+        } else {
+            resultNode.addAll(thisNodeSet);
+            for (String s : thisNodeArray) {
+                getChildNode(resultNode, chains, nodeLimitNum, s, leveNum + 1);
+            }
+        }
+    }
+
+
+    /**
+     * @param rootNodeId
+     * @param stockChain
+     * @param nodeLimitNum
+     * @return
+     */
+    public static String stockChainTrim(String rootNodeId, String stockChain, int nodeLimitNum) {
+        Set<String> nodeIdSet = new HashSet<>();
+        Map<Integer, Set<String>> map = new HashMap<>();
+        Set<String> chain = splitStockChain(stockChain);
+
+        Set<String> tmpSet = new HashSet<String>() {{
+            add(rootNodeId);
+        }};
+        map.put(0, tmpSet);
+        nodeIdSet.addAll(tmpSet);
+
+        getChildNode(nodeIdSet, chain, nodeLimitNum, rootNodeId, 1);
+
+        Stream<String> stringStream = chain.stream().filter(e -> {
+            WinhcTuple<String, String> tuple = readNode(e);
+            WinhcTuple<String, String> tuple1 = readNode(tuple._2);
+            return nodeIdSet.contains(tuple1._1) && nodeIdSet.contains(tuple._1);
+        });
+        return stringStream.collect(Collectors.joining(","));
+    }
+
+    public static void main(String[] args) {
+        System.out.println(stockChainTrim("b", "(a)<-[1]-(b),(c)<-[1]-(b),(d)<-[1]-(b),(c)<-[1]-(d)", 3));
+    }
+}

+ 245 - 0
src/main/java/com/winhc/max/compute/graph/job/enterprise_group/EnterpriseGroupVertex.java

@@ -0,0 +1,245 @@
+package com.winhc.max.compute.graph.job.enterprise_group;
+
+import com.aliyun.odps.graph.ComputeContext;
+import com.aliyun.odps.graph.Edge;
+import com.aliyun.odps.graph.Vertex;
+import com.aliyun.odps.graph.WorkerContext;
+import com.aliyun.odps.io.BooleanWritable;
+import com.aliyun.odps.io.LongWritable;
+import com.aliyun.odps.io.NullWritable;
+import com.aliyun.odps.io.Text;
+import com.aliyun.odps.utils.StringUtils;
+import com.google.gson.Gson;
+import com.winhc.max.compute.graph.job.enterprise_group.entity.EntGroupMsg;
+import com.winhc.max.compute.graph.job.enterprise_group.entity.EnterpriseGroupVertexValue;
+import com.winhc.max.compute.graph.job.enterprise_group.entity.HolderEdge;
+import com.winhc.max.compute.graph.util.WritableRecordExtensions;
+import lombok.experimental.ExtensionMethod;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * @author: XuJiakai
+ * 2023/2/7 13:59
+ */
+@ExtensionMethod({
+        WritableRecordExtensions.class
+        , EnterpriseGroupUtils.class
+})
+@Slf4j
+public class EnterpriseGroupVertex extends
+        Vertex<Text, EnterpriseGroupVertexValue, HolderEdge, EntGroupMsg> {
+
+    private static Gson gson;
+
+    @Override
+    public void setup(WorkerContext<Text, EnterpriseGroupVertexValue, HolderEdge, EntGroupMsg> context) throws IOException {
+        gson = new Gson();
+    }
+
+    @Override
+    public void compute(ComputeContext<Text, EnterpriseGroupVertexValue, HolderEdge, EntGroupMsg> context, Iterable<EntGroupMsg> messages) throws IOException {
+        context.aggregate(NullWritable.get());
+        if (getId().toString().length() == 33) {
+            voteToHalt();
+            //跳过人员节点
+            return;
+        }
+
+        List<String> list = getValue().getLegalEntityIds().tuple2List();
+
+        //获取需要传递的下一个节点,如无,当前企业则为控股控股公司。
+        Map<String, Double> majorityShareholder = new HashMap<>();
+        List<String> thisVertexHoldKeyno = new ArrayList<>();
+        List<String> thisVertexHoldCompanyId = new ArrayList<>();
+        List<String> thisVertexOtherHolderKeyno = new ArrayList<>();
+        List<String> thisVertexOtherHolderCompanyId = new ArrayList<>();
+        if (hasEdges()) {
+            //获取下游目标节点
+            List<Edge<Text, HolderEdge>> edges = getEdges();
+            majorityShareholder = edges.getMajorityShareholder(list);
+            thisVertexHoldKeyno = new ArrayList<>(majorityShareholder.keySet());
+            thisVertexHoldCompanyId = thisVertexHoldKeyno.stream().filter(e -> e.length() == 32).collect(Collectors.toList());
+            thisVertexOtherHolderKeyno = edges.getOtherHolder(thisVertexHoldKeyno);
+            thisVertexOtherHolderCompanyId = thisVertexOtherHolderKeyno.stream().filter(e -> e.length() == 32).collect(Collectors.toList());
+
+            //todo:如果控股股东中有国有企业,则将国有企业从控股股东中移出
+            Set<String> collect = edges.stream().filter(e -> {
+                int i = e.getValue().getHolderType().get();
+                return i == 2;
+            }).map(e -> e.getDestVertexId().toString()).collect(Collectors.toSet());
+            if (!collect.isEmpty()) {
+                thisVertexHoldCompanyId = thisVertexHoldCompanyId.stream().filter(e -> !collect.contains(e)).collect(Collectors.toList());
+            }
+        }
+
+        //将本节点传递到下一节点
+        if (!getValue().isSendMsg()) {
+            for (String destVertexId : thisVertexHoldCompanyId) {
+                //传递控股企业
+                EntGroupMsg entGroupMsg = EntGroupMsg.ofByType_1(getId(), getValue().getCompanyName(), thisVertexOtherHolderCompanyId);
+                boolean flag = entGroupMsg.routeLog(destVertexId);
+                if (!flag) {
+//                    getValue().setOutput();
+                    continue;
+                }
+
+                entGroupMsg.addStockRightControlChain(getId().toString(), majorityShareholder.get(destVertexId), destVertexId);
+
+                context.sendMessage(new Text(destVertexId), entGroupMsg);
+                context.aggregate(new BooleanWritable(true));
+            }
+
+            for (String destVertexId : thisVertexOtherHolderCompanyId) {
+                //传递参股企业
+                EntGroupMsg entGroupMsg = EntGroupMsg.ofByType_2(getId(), getValue().getCompanyName());
+                boolean flag = entGroupMsg.routeLog(destVertexId);
+                if (!flag) {
+//                    getValue().setOutput();   todo:参股企业没必要输出
+                    continue;
+                }
+                context.sendMessage(new Text(destVertexId), entGroupMsg);
+                context.aggregate(new BooleanWritable(true));
+            }
+
+            if (thisVertexHoldCompanyId.isEmpty()) {
+                //如果没有控股企业,则本节点为集团,需将本节点的参股股东,添加至集团股东
+                List<String> groupHolderCompanyIds = getValue().getGroupHolderCompanyIds().tuple2List();
+                groupHolderCompanyIds.addAll(thisVertexOtherHolderCompanyId);
+                getValue().setGroupHolderCompanyIds(groupHolderCompanyIds);
+            }
+
+            getValue().setEndFlag();
+        }
+
+
+        //是否将消息汇总至该节点
+        boolean collect = thisVertexHoldCompanyId.isEmpty();
+
+        if (collect) {
+            //无法传递,则在本节点汇总
+            mergeMsg2Vertex(getValue(), messages, thisVertexHoldKeyno);
+        } else {
+            for (EntGroupMsg message : messages) {
+                //将本节点获取的消息直接传递至下一节点,只传递到控股企业。不必将接收到的参股企业做区分,直接只往控股企业传递就行
+                for (String destVertexId : thisVertexHoldCompanyId) {
+                    boolean flag = message.routeLog(destVertexId);
+                    if (!flag) {
+                        mergeMsg2Vertex(getValue(), Arrays.asList(message), Collections.emptyList());
+                        continue;
+                    }
+                    message.addStockRightControlChain(getId().toString(), majorityShareholder.get(destVertexId), destVertexId);
+
+                    context.sendMessage(new Text(destVertexId), message);
+                    context.aggregate(new BooleanWritable(true));
+                }
+            }
+        }
+
+        voteToHalt();
+    }
+
+    /**
+     * 将msg信息合并到当前节点
+     *
+     * @param vertexEntity
+     * @param messages
+     * @param thisVertexHoldKeyno
+     */
+    private static void mergeMsg2Vertex(EnterpriseGroupVertexValue vertexEntity, Iterable<EntGroupMsg> messages, List<String> thisVertexHoldKeyno) {
+        long holdNum = vertexEntity.getHoldNum().get();
+        List<String> holdCompanyId = vertexEntity.getHoldCompanyIds().tuple2List();
+        List<String> groupInvestmentCompanyId = vertexEntity.getGroupInvestmentCompanyIds().tuple2List();
+        List<String> groupHolderCompanyId = vertexEntity.getGroupHolderCompanyIds().tuple2List();
+        List<String> stockRightControlChain = Arrays.stream(vertexEntity.getStockRightControlChain().toString().split(",")).collect(Collectors.toList());
+
+        for (EntGroupMsg message : messages) {
+            holdNum += message.getHoldNum().get();
+            holdCompanyId.addAll(message.getHoldCompanyIds().tuple2List());
+            groupHolderCompanyId.addAll(message.getGroupHolderCompanyIds().tuple2List());
+            groupInvestmentCompanyId.addAll(message.getGroupInvestmentCompanyIds().tuple2List());
+            stockRightControlChain.add(message.getStockRightControlChain().toString());
+        }
+        Text groupControllerKeyno = new Text();
+        if (!thisVertexHoldKeyno.isEmpty()) {
+            groupControllerKeyno = new Text(String.join(",", thisVertexHoldKeyno));
+        }
+
+        String collect = stockRightControlChain.stream().filter(StringUtils::isNotBlank).collect(Collectors.joining(","));
+
+        vertexEntity.setHoldNum(holdNum);
+        vertexEntity.setHoldCompanyIds(holdCompanyId);
+        vertexEntity.setGroupInvestmentCompanyIds(groupInvestmentCompanyId);
+        vertexEntity.setGroupHolderCompanyIds(groupHolderCompanyId);
+        vertexEntity.setGroupControllerKeyno(groupControllerKeyno);
+        vertexEntity.setStockRightControlChain(new Text(collect));
+        vertexEntity.setOutput();
+    }
+
+    @Override
+    public void cleanup(WorkerContext<Text, EnterpriseGroupVertexValue, HolderEdge, EntGroupMsg> context) throws IOException {
+        if (getId().toString().length() == 33) {
+            //跳过人员节点
+            return;
+        }
+
+        //判断是否是集团企业
+//        if (hasEdges()) {
+//            List<Edge<Text, HolderEdge>> edges = getEdges();
+//
+//            List<String> thisVertexHoldKeyno = new ArrayList<>(edges.getMajorityShareholder().keySet());
+//            List<String> thisVertexHoldCompanyId = thisVertexHoldKeyno.stream().filter(e -> e.length() == 32).collect(Collectors.toList());
+//
+//            if (!getValue().isOutput() && !thisVertexHoldCompanyId.isEmpty()) {
+//                //不属于企业集团
+//                return;
+//            }
+//        }
+
+        if(!getValue().isOutput()){
+            return;
+        }
+
+
+        //输出
+        Text enterprise_group_hold_company_id = getId();
+        Text enterprise_group_hold_company_name = getValue().getCompanyName();
+        LongWritable hold_num = new LongWritable(getValue().getHoldNum().get() + 1);
+        Text group_company_id = new Text(gson.toJson(getValue().getHoldCompanyIds().tuple2List()));
+        Text group_investment_company_id = new Text(gson.toJson(getValue().getGroupInvestmentCompanyIds().tuple2List()));
+        Text group_holder_company_id = new Text(gson.toJson(getValue().getGroupHolderCompanyIds().tuple2List()));
+        Text group_controller_keyno = getValue().getGroupControllerKeyno();
+        Text stockRightControlChain = getValue().getStockRightControlChain();
+
+//        System.out.println("save:"
+//                + enterprise_group_hold_company_id + ","
+//                + enterprise_group_hold_company_name + ","
+//                + hold_num + ","
+//                + group_company_id + ","
+//                + group_investment_company_id + ","
+//                + group_holder_company_id + ","
+//                + group_controller_keyno + ","
+//                + stockRightControlChain
+//        );
+
+        if (hold_num.get() > 200) {
+            String s = EnterpriseGroupUtils.stockChainTrim(enterprise_group_hold_company_id.toString(), stockRightControlChain.toString(), 10);
+            stockRightControlChain = new Text(s);
+        }
+
+
+        context.write(enterprise_group_hold_company_id
+                , enterprise_group_hold_company_id
+                , enterprise_group_hold_company_name
+                , hold_num
+                , group_company_id
+                , group_investment_company_id
+                , group_holder_company_id
+                , group_controller_keyno
+                , stockRightControlChain
+        );
+    }
+}

+ 226 - 0
src/main/java/com/winhc/max/compute/graph/job/enterprise_group/entity/EntGroupMsg.java

@@ -0,0 +1,226 @@
+package com.winhc.max.compute.graph.job.enterprise_group.entity;
+
+import com.aliyun.odps.io.*;
+import com.winhc.max.compute.graph.job.enterprise_group.EnterpriseGroupUtils;
+import com.winhc.max.compute.graph.job.enterprise_group.EnterpriseGroupVertex;
+import lombok.experimental.ExtensionMethod;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * @author: XuJiakai
+ * 2023/2/7 11:13
+ */
+@ExtensionMethod({
+        EnterpriseGroupUtils.class
+})
+public class EntGroupMsg implements Writable {
+
+    /**
+     * 消息类型,1控股企业,2参股企业
+     * todo:目前可以考虑不做type区分
+     * {@link EnterpriseGroupVertex:90}
+     */
+    private IntWritable msgType;
+
+    /**
+     * 企业集团成员数
+     */
+    private LongWritable holdNum;
+
+    /**
+     * 下属控企业id,数组
+     * 企业集团成员的id
+     */
+    private Tuple holdCompanyIds;
+
+    /**
+     * 集团对外投资id
+     */
+    private Tuple groupInvestmentCompanyIds;
+
+    /**
+     * 集团股东id,集团股东的股东是否算在内?不计算
+     */
+    private Tuple groupHolderCompanyIds;
+
+
+    /**
+     * 该条消息的首发节点信息,如果是名字包含集团才记录
+     */
+    private Text sourceVertexId;
+    private Text sourceVertexName;
+
+
+    /**
+     * 途径id记录,防止陷入循环
+     */
+    private Tuple routeLog;
+
+
+    /**
+     * 股权链: (a)<-[0.51]-(b)<-[1.0]-(c)
+     * 多个用逗号分割
+     */
+    private Text stockRightControlChain;
+
+
+    public boolean addStockRightControlChain(String companyId, double investmentProportion, String holderKeyno) {
+        String s = stockRightControlChain.toString();
+        if (s.length() == 0) {
+            stockRightControlChain.set("(" + companyId + ")<-[" + investmentProportion + "]-(" + holderKeyno + ")");
+        } else {
+            int lastIndexOf = s.lastIndexOf("(");
+            String lastHolderId = s.substring(lastIndexOf + 1, s.length() - 1);
+            if (companyId.equals(lastHolderId)) {
+                s += "<-[" + investmentProportion + "]-(" + holderKeyno + ")";
+                stockRightControlChain.set(s);
+            } else {
+                return false;
+            }
+        }
+        return true;
+    }
+
+
+    /**
+     * 记录并测试是否重复
+     *
+     * @param vertexId
+     * @return false表示该节点已经走过。
+     */
+    public boolean routeLog(Text vertexId) {
+        return routeLog(vertexId.toString());
+    }
+
+    public boolean routeLog(String vertexId) {
+        long count = routeLog.getAll().stream().map(Object::toString).filter(e -> e.equals(vertexId)).count();
+
+        if (count > 0) {
+//            System.out.println("陷入循环," + String.join(",", routeLog.tuple2List()));
+            return false;
+        }
+        routeLog.append(new Text(vertexId));
+        return true;
+    }
+
+
+    public static EntGroupMsg ofByType_1(Text thisVertexCompanyId, Text thisVertexCompanyName, List<String> groupHolderCompanyIds) {
+        //发给控股企业消息
+        EntGroupMsg entGroupMsg = new EntGroupMsg();
+        entGroupMsg.msgType = new IntWritable(1);
+        entGroupMsg.holdNum = new LongWritable(1);
+        entGroupMsg.holdCompanyIds = new Tuple(1);
+        entGroupMsg.groupHolderCompanyIds = new Tuple();
+
+        entGroupMsg.holdCompanyIds.append(new Text(thisVertexCompanyId));
+        if (groupHolderCompanyIds != null) {
+            for (String s : groupHolderCompanyIds) {
+                entGroupMsg.groupHolderCompanyIds.append(new Text(s));
+            }
+        }
+
+        addOtherMsg(entGroupMsg, thisVertexCompanyId, thisVertexCompanyId);
+        return entGroupMsg;
+    }
+
+
+    public static EntGroupMsg ofByType_2(Text thisVertexCompanyId, Text thisVertexCompanyName) {
+        //发给参股企业消息
+        EntGroupMsg entGroupMsg = new EntGroupMsg();
+        entGroupMsg.msgType = new IntWritable(2);
+        entGroupMsg.holdNum = new LongWritable(0);
+        entGroupMsg.groupInvestmentCompanyIds = new Tuple(1);
+        entGroupMsg.groupInvestmentCompanyIds.append(new Text(thisVertexCompanyId));
+
+        addOtherMsg(entGroupMsg, thisVertexCompanyId, thisVertexCompanyId);
+        return entGroupMsg;
+    }
+
+
+    private static void addOtherMsg(EntGroupMsg entGroupMsg, Text thisVertexCompanyId, Text thisVertexCompanyName) {
+        entGroupMsg.sourceVertexId = thisVertexCompanyId;
+        entGroupMsg.sourceVertexName = thisVertexCompanyName;
+
+        entGroupMsg.routeLog = new Tuple();
+        entGroupMsg.routeLog.append(thisVertexCompanyId);
+        entGroupMsg.stockRightControlChain = new Text();
+    }
+
+
+    public int getMsgType() {
+        return msgType.get();
+    }
+
+    public LongWritable getHoldNum() {
+        return holdNum;
+    }
+
+    public Tuple getHoldCompanyIds() {
+        return holdCompanyIds;
+    }
+
+    public Tuple getGroupInvestmentCompanyIds() {
+        return groupInvestmentCompanyIds;
+    }
+
+    public Tuple getGroupHolderCompanyIds() {
+        return groupHolderCompanyIds;
+    }
+
+    public Text getStockRightControlChain() {
+        return stockRightControlChain;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        if (holdCompanyIds == null) {
+            holdCompanyIds = new Tuple();
+        }
+
+        if (groupInvestmentCompanyIds == null) {
+            groupInvestmentCompanyIds = new Tuple();
+        }
+
+        if (groupHolderCompanyIds == null) {
+            groupHolderCompanyIds = new Tuple();
+        }
+        if (routeLog == null) {
+            routeLog = new Tuple();
+        }
+        if (stockRightControlChain == null)
+            stockRightControlChain = new Text();
+
+        msgType.write(out);
+        holdNum.write(out);
+        holdCompanyIds.write(out);
+        groupInvestmentCompanyIds.write(out);
+        groupHolderCompanyIds.write(out);
+        routeLog.write(out);
+        stockRightControlChain.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+
+        msgType = new IntWritable();
+        holdNum = new LongWritable();
+        holdCompanyIds = new Tuple();
+        groupInvestmentCompanyIds = new Tuple();
+        groupHolderCompanyIds = new Tuple();
+        routeLog = new Tuple();
+        stockRightControlChain = new Text();
+
+        msgType.readFields(in);
+        holdNum.readFields(in);
+        holdCompanyIds.readFields(in);
+        groupInvestmentCompanyIds.readFields(in);
+        groupHolderCompanyIds.readFields(in);
+        routeLog.readFields(in);
+        stockRightControlChain.readFields(in);
+
+    }
+}

+ 66 - 0
src/main/java/com/winhc/max/compute/graph/job/enterprise_group/entity/EnterpriseGroupAggValue.java

@@ -0,0 +1,66 @@
+package com.winhc.max.compute.graph.job.enterprise_group.entity;
+
+import com.aliyun.odps.io.BooleanWritable;
+import com.aliyun.odps.io.LongWritable;
+import com.aliyun.odps.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * @author: XuJiakai
+ * 2023/2/7 14:11
+ */
+public class EnterpriseGroupAggValue implements Writable {
+
+    /**
+     * 为false时表示迭代完毕,可以结束
+     */
+    private BooleanWritable flag;
+
+    /**
+     * 本轮迭代顶点数
+     */
+    private LongWritable num;
+
+    public BooleanWritable getFlag() {
+        return flag;
+    }
+
+    public long getCount(){
+        return this.num.get();
+    }
+
+    public void increment() {
+        increment(1);
+    }
+
+    public void increment(long num) {
+        this.num.set(this.num.get() + num);
+    }
+
+    public void update(boolean flag) {
+        this.flag.set(this.flag.get() || flag);
+    }
+
+
+    public EnterpriseGroupAggValue() {
+        flag = new BooleanWritable(false);
+        this.num = new LongWritable(0);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        flag.write(out);
+        num.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        BooleanWritable booleanWritable = new BooleanWritable();
+        LongWritable num = new LongWritable();
+        booleanWritable.readFields(in);
+        num.readFields(in);
+    }
+}

+ 261 - 0
src/main/java/com/winhc/max/compute/graph/job/enterprise_group/entity/EnterpriseGroupVertexValue.java

@@ -0,0 +1,261 @@
+package com.winhc.max.compute.graph.job.enterprise_group.entity;
+
+import com.aliyun.odps.io.*;
+import com.winhc.max.compute.graph.job.enterprise_group.EnterpriseGroupUtils;
+import lombok.experimental.ExtensionMethod;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * @author: XuJiakai
+ * 2023/2/6 14:25
+ * <p>
+ * 企业集团顶点,公司或人
+ */
+@ExtensionMethod({
+        EnterpriseGroupUtils.class
+})
+public class EnterpriseGroupVertexValue implements Writable {
+
+    private Text companyName;
+    /**
+     * 公司类型,1为正常企业,2为分公司,3为合伙,4为机关单位,5为事业单位
+     */
+    private IntWritable companyType;
+    /**
+     * 执行事务合伙人id,数组
+     */
+    private Tuple legalEntityIds;
+
+    /**
+     * 手动停止标记,需要手动拆分集团,第二次计算时才需要
+     */
+    private BooleanWritable manualStop;
+
+    //以上为初始化信息,以下是计算相关信息
+
+
+    /**
+     * 下属控股企业数
+     * 企业集团成员数
+     */
+    private LongWritable holdNum;
+
+    /**
+     * 下属控企业id
+     * 企业集团的id
+     */
+    private Tuple holdCompanyIds;
+
+    /**
+     * 集团对外投资id
+     */
+    private Tuple groupInvestmentCompanyIds;
+
+    /**
+     * 集团股东id
+     */
+    private Tuple groupHolderCompanyIds;
+
+    /**
+     * 集团控制人
+     */
+    private Text groupControllerKeyno;
+
+    /**
+     * 集团股权链
+     */
+    private Text stockRightControlChain;
+
+
+    /**
+     * 为true表示该节点发送过,只需要传递就ok
+     * 0:未处理过,1:已经发送消息,2:集团节点,可以输出
+     */
+    private IntWritable endFlag;
+
+    public void setEndFlag() {
+        endFlag = new IntWritable(1);
+    }
+
+    public boolean isSendMsg() {
+        return endFlag.get() != 0;
+    }
+
+    public void setOutput() {
+        //由于执行顺序问题,只会从1 -> 2,不会从0 -> 2
+        endFlag = new IntWritable(2);
+    }
+
+    public boolean isOutput() {
+        return endFlag.get() == 2;
+    }
+
+
+    public Text getStockRightControlChain() {
+        return stockRightControlChain;
+    }
+
+    public void setStockRightControlChain(Text stockRightControlChain) {
+        this.stockRightControlChain = stockRightControlChain;
+    }
+
+    public Text getCompanyName() {
+        return companyName;
+    }
+
+    public LongWritable getHoldNum() {
+        return holdNum;
+    }
+
+    public Tuple getHoldCompanyIds() {
+        return holdCompanyIds;
+    }
+
+    public Tuple getGroupInvestmentCompanyIds() {
+        return groupInvestmentCompanyIds;
+    }
+
+    public Tuple getGroupHolderCompanyIds() {
+        return groupHolderCompanyIds;
+    }
+
+    public Text getGroupControllerKeyno() {
+        return groupControllerKeyno;
+    }
+
+    public void setHoldNum(long holdNum) {
+        this.holdNum = new LongWritable(holdNum);
+    }
+
+    public void setHoldCompanyIds(List<String> holdCompanyIds) {
+        this.holdCompanyIds = holdCompanyIds.list2Tuple();
+    }
+
+    public void setGroupInvestmentCompanyIds(List<String> groupInvestmentCompanyIds) {
+        this.groupInvestmentCompanyIds = groupInvestmentCompanyIds.list2Tuple();
+    }
+
+    public void setGroupHolderCompanyIds(List<String> groupHolderCompanyIds) {
+        this.groupHolderCompanyIds = groupHolderCompanyIds.list2Tuple();
+    }
+
+    public Tuple getLegalEntityIds() {
+        return legalEntityIds;
+    }
+
+    public void setGroupControllerKeyno(Text groupControllerKeyno) {
+        this.groupControllerKeyno = groupControllerKeyno;
+    }
+
+    public static EnterpriseGroupVertexValue of(String companyName) {
+        return of(companyName, null, null, null);
+    }
+
+    public static EnterpriseGroupVertexValue of(String companyName, List<String> legalEntityIds) {
+        return of(companyName, null, legalEntityIds, null);
+    }
+
+    public static EnterpriseGroupVertexValue of(String companyName, Integer companyType, List<String> legalEntityIds, Boolean manualStop) {
+        EnterpriseGroupVertexValue enterpriseGroupVertexValue = new EnterpriseGroupVertexValue();
+        enterpriseGroupVertexValue.companyName = new Text(companyName);
+        if (companyType != null) {
+            enterpriseGroupVertexValue.companyType = new IntWritable(companyType);
+        } else {
+            enterpriseGroupVertexValue.companyType = new IntWritable(1);
+        }
+
+        if (legalEntityIds != null && !legalEntityIds.isEmpty()) {
+            enterpriseGroupVertexValue.legalEntityIds = new Tuple(legalEntityIds.size());
+            enterpriseGroupVertexValue.companyType = new IntWritable(3);
+            for (String legalEntityId : legalEntityIds) {
+                Text text = new Text(legalEntityId);
+                enterpriseGroupVertexValue.legalEntityIds.append(text);
+            }
+        }
+        if (manualStop != null) {
+            enterpriseGroupVertexValue.manualStop = new BooleanWritable(manualStop);
+        } else {
+            enterpriseGroupVertexValue.manualStop = new BooleanWritable(false);
+        }
+        enterpriseGroupVertexValue.fieldComplete();
+        return enterpriseGroupVertexValue;
+    }
+
+
+    private void fieldComplete() {
+        if (holdNum == null)
+            holdNum = new LongWritable(0);
+
+        if (holdCompanyIds == null)
+            holdCompanyIds = new Tuple();
+
+        if (groupInvestmentCompanyIds == null)
+            groupInvestmentCompanyIds = new Tuple();
+
+        if (groupHolderCompanyIds == null)
+            groupHolderCompanyIds = new Tuple();
+
+        if (groupControllerKeyno == null)
+            groupControllerKeyno = new Text();
+
+        if (endFlag == null)
+            endFlag = new IntWritable(0);
+
+        if (stockRightControlChain == null)
+            stockRightControlChain = new Text();
+
+        if (legalEntityIds == null)
+            legalEntityIds = new Tuple();
+
+        if (legalEntityIds == null) {
+            manualStop = new BooleanWritable();
+        }
+    }
+
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        companyName.write(out);
+        companyType.write(out);
+        legalEntityIds.write(out);
+        holdNum.write(out);
+        holdCompanyIds.write(out);
+        groupInvestmentCompanyIds.write(out);
+        groupHolderCompanyIds.write(out);
+        groupControllerKeyno.write(out);
+        stockRightControlChain.write(out);
+        manualStop.write(out);
+        endFlag.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        companyName = new Text();
+        companyType = new IntWritable();
+        legalEntityIds = new Tuple();
+        holdNum = new LongWritable();
+        holdCompanyIds = new Tuple();
+        groupInvestmentCompanyIds = new Tuple();
+        groupHolderCompanyIds = new Tuple();
+        groupControllerKeyno = new Text();
+        stockRightControlChain = new Text();
+        manualStop = new BooleanWritable();
+        endFlag = new IntWritable();
+
+        companyName.readFields(in);
+        companyType.readFields(in);
+        legalEntityIds.readFields(in);
+        holdNum.readFields(in);
+        holdCompanyIds.readFields(in);
+        groupInvestmentCompanyIds.readFields(in);
+        groupHolderCompanyIds.readFields(in);
+        groupControllerKeyno.readFields(in);
+        stockRightControlChain.readFields(in);
+        manualStop.readFields(in);
+        endFlag.readFields(in);
+    }
+}

+ 59 - 0
src/main/java/com/winhc/max/compute/graph/job/enterprise_group/entity/HolderEdge.java

@@ -0,0 +1,59 @@
+package com.winhc.max.compute.graph.job.enterprise_group.entity;
+
+import com.aliyun.odps.io.DoubleWritable;
+import com.aliyun.odps.io.IntWritable;
+import com.aliyun.odps.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * @author: XuJiakai
+ * 2023/2/7 11:06
+ */
+public class HolderEdge implements Writable {
+
+    /**
+     * 投资比例,小数
+     */
+    private DoubleWritable investmentProportion;
+
+    /**
+     * 股东是个人还是企业
+     * 普通企业:1,国家机关:2,个人:3
+     */
+    private IntWritable holderType;
+
+    public DoubleWritable getInvestmentProportion() {
+        return investmentProportion;
+    }
+
+    public IntWritable getHolderType() {
+        return holderType;
+    }
+
+    public static HolderEdge of(String holderId, Double investmentProportion) {
+        HolderEdge holderEdge = new HolderEdge();
+        holderEdge.holderType = new IntWritable(holderId.length() == 32 ? 1 : 3);
+        holderEdge.investmentProportion = new DoubleWritable(investmentProportion);
+        return holderEdge;
+    }
+
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        investmentProportion.write(out);
+        holderType.write(out);
+
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        investmentProportion = new DoubleWritable();
+        holderType = new IntWritable();
+
+        investmentProportion.readFields(in);
+        holderType.readFields(in);
+    }
+}

+ 40 - 0
src/main/java/com/winhc/max/compute/graph/util/JsonUtils.java

@@ -0,0 +1,40 @@
+package com.winhc.max.compute.graph.util;
+
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author: XuJiakai
+ * 2023/2/10 16:38
+ */
+public class JsonUtils {
+
+    public static List<String> parseListStr(Gson gson, String jsonArrayStr) {
+        List<String> list = gson.fromJson(jsonArrayStr, new TypeToken<List<String>>() {
+        }.getType());
+        if (list == null) {
+            return Collections.emptyList();
+        }
+        return list;
+    }
+
+    public static List<Map<String, String>> parseListStrByMap(Gson gson, String jsonArrayStr) {
+        List<Map<String, String>> list = gson.fromJson(jsonArrayStr, new TypeToken<List<Map<String, String>>>() {
+        }.getType());
+        if (list == null) {
+            return Collections.emptyList();
+        }
+        return list;
+    }
+
+
+    public static void main(String[] args) {
+        Gson gson = new Gson();
+        List<Map<String, String>> maps = parseListStrByMap(gson, "[{\"name\":\"汤玲玲\",\"id\":\"p4b9f3db9b6f462e5008497107e853095\",\"type\":1,\"deleted\":0}]");
+        System.out.println(maps);
+    }
+}

+ 25 - 0
src/main/java/com/winhc/max/compute/graph/util/WinhcTuple.java

@@ -0,0 +1,25 @@
+package com.winhc.max.compute.graph.util;
+
+/**
+ * @author: XuJiakai
+ * 2023/2/9 17:04
+ */
+public class WinhcTuple<K, V> {
+    public K _1;
+    public V _2;
+
+    public static <K, V> WinhcTuple<K, V> of(K k, V v) {
+        WinhcTuple<K, V> kvWinhcTuple = new WinhcTuple<>();
+        kvWinhcTuple._1 = k;
+        kvWinhcTuple._2 = v;
+        return kvWinhcTuple;
+    }
+
+    @Override
+    public String toString() {
+        return " " +
+                _1 +
+                ", " + _2
+                ;
+    }
+}

+ 13 - 0
src/main/java/com/winhc/max/compute/graph/util/WritableRecordExtensions.java

@@ -4,6 +4,7 @@ import com.aliyun.odps.io.NullWritable;
 import com.aliyun.odps.io.Text;
 import com.aliyun.odps.io.Writable;
 import com.aliyun.odps.io.WritableRecord;
+import lombok.SneakyThrows;
 
 import java.io.IOException;
 
@@ -12,6 +13,18 @@ import java.io.IOException;
  * 2022/7/11 14:30
  */
 public class WritableRecordExtensions {
+
+
+    @SneakyThrows
+    public static <T extends Writable> T getOrNull(WritableRecord writableRecord, String filedName, Class<T> type) throws IOException {
+        Writable writable = writableRecord.get(filedName);
+        if (writable == null || writable.equals(NullWritable.get())) {
+            return type.newInstance();
+        } else {
+            return ((T) writable);
+        }
+    }
+
     public static Text getTextOrNull(WritableRecord writableRecord, String filedName) throws IOException {
         Writable writable = writableRecord.get(filedName);
         if (writable.equals(NullWritable.get())) {