17 Commits 670f07a9b1 ... 19e7820225

Auteur SHA1 Bericht Datum
  xufei 19e7820225 Merge remote-tracking branch 'origin/master' 3 maanden geleden
  xufei f8add6b33b 初步测试聚合 3 maanden geleden
  许家凯 16aee7dcd9 fix: 修复企业集团节点不存在问题 1 jaar geleden
  许家凯 b48233173b feat: 企业集团添加测试数据加载代码 1 jaar geleden
  许家凯 4dd01af586 feat: 企业集团 1 jaar geleden
  许家凯 29bceadd4b fix: 企业集团 1 jaar geleden
  许家凯 b9bbbec36c feat: 企业集团 1 jaar geleden
  许家凯 5e4ee542e5 fix: 企业集团优化 1 jaar geleden
  许家凯 bde2d5a44d feat: 企业集团优化 1 jaar geleden
  许家凯 28a2984c60 feat: 添加企业集团优化 1 jaar geleden
  许家凯 a9432ea1f7 feat: 添加企业集团 1 jaar geleden
  许家凯 bdc8fbf0a9 feat: 限制company_rank资源上限 1 jaar geleden
  许家凯 03361d7cc3 feat: 限制company_rank资源上限 2 jaren geleden
  xufei 127d2c8ec5 加入参数 2 jaren geleden
  xufei 44481186a3 联通子图计算 2 jaren geleden
  xufei cfe32e7a52 联通子图计算 2 jaren geleden
  许家凯 2445d20afe first commit 2 jaren geleden

+ 2 - 0
src/main/java/com/winhc/max/compute/graph/job/CompanyConnectedComponents.java

@@ -118,9 +118,11 @@ public class CompanyConnectedComponents {
         }
         Integer worker = Integer.parseInt(args[2]);
         Integer cu = Integer.parseInt(args[3]);
+        Integer mem = Integer.parseInt(args[4]);
         GraphJob job = new GraphJob();
 //        job.setNumWorkers(50);
 //        job.setWorkerCPU(100);
+        job.setWorkerMemory(mem);
         job.setNumWorkers(worker);
         job.setWorkerCPU(cu);
         job.setGraphLoaderClass(CCVertexReader.class);

+ 53 - 0
src/main/java/com/winhc/max/compute/graph/job/holder/HolderRelationGroupAggregator.java

@@ -0,0 +1,53 @@
+package com.winhc.max.compute.graph.job.holder;
+
+import com.aliyun.odps.graph.Aggregator;
+import com.aliyun.odps.graph.WorkerContext;
+import com.aliyun.odps.io.BooleanWritable;
+import com.aliyun.odps.io.NullWritable;
+import com.winhc.max.compute.graph.job.holder.entity.HolderRelationGroupAggValue;
+
+import java.io.IOException;
+
+
+public class HolderRelationGroupAggregator extends Aggregator<HolderRelationGroupAggValue> {
+
+    private static Integer run_step;
+
+    @Override
+    public HolderRelationGroupAggValue createStartupValue(WorkerContext context) throws IOException {
+        run_step = context.getConfiguration().getInt("run_step", 1);
+        return super.createStartupValue(context);
+    }
+
+    @Override
+    public HolderRelationGroupAggValue createInitialValue(WorkerContext context) throws IOException {
+        return new HolderRelationGroupAggValue();
+    }
+
+    @Override
+    public void aggregate(HolderRelationGroupAggValue enterpriseGroupAggValue, Object item) throws IOException {
+        if (item instanceof NullWritable) {
+            enterpriseGroupAggValue.increment();
+        } else {
+            BooleanWritable tmp = ((BooleanWritable) item);
+            enterpriseGroupAggValue.update(tmp.get());
+        }
+    }
+
+    @Override
+    public void merge(HolderRelationGroupAggValue enterpriseGroupAggValue, HolderRelationGroupAggValue partial) throws IOException {
+        boolean tmp = partial.getFlag().get();
+        enterpriseGroupAggValue.update(tmp);
+        enterpriseGroupAggValue.increment(partial.getCount());
+    }
+
+    @Override
+    public boolean terminate(WorkerContext context, HolderRelationGroupAggValue holderRelationGroupAggValue) throws IOException {
+        System.out.println("step: " + context.getSuperstep() + ", iterations node count: " + holderRelationGroupAggValue.getCount() + " ,flag=" + holderRelationGroupAggValue.getFlag().get());
+        if (context.getSuperstep() >= run_step) {
+            System.out.println("\t pass step " + context.getSuperstep() + "...success");
+            return true;
+        }
+        return !holderRelationGroupAggValue.getFlag().get();
+    }
+}

+ 81 - 0
src/main/java/com/winhc/max/compute/graph/job/holder/HolderRelationGroupJob.java

@@ -0,0 +1,81 @@
+package com.winhc.max.compute.graph.job.holder;
+
+import com.aliyun.odps.data.TableInfo;
+import com.aliyun.odps.graph.GraphJob;
+import com.aliyun.odps.graph.RemoveDuplicatesLoadingResolver;
+import com.winhc.max.compute.graph.job.pagerank.CompanyComputingVertexResolver;
+import com.winhc.max.compute.graph.util.ParameterTool;
+
+import java.io.IOException;
+
+/**
+ * @author π
+ * @Description:个人关系聚合
+ * @date 2024/5/28 16:25
+ */
+public class HolderRelationGroupJob {
+
+
+    private static final String defaultInputVertexTab;
+    private static final String defaultOutputTab;
+
+    static {
+        defaultInputVertexTab = "tmp_xf_holder_relation_in";
+        defaultOutputTab = "tmp_xf_holder_relation_out";
+    }
+
+
+    public static void main(String[] args) throws IOException {
+
+        ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+        String numWorkers = parameterTool.getOrDefault("numWorkers", "70");
+        String workerCPU = parameterTool.getOrDefault("workerCPU", "2");
+        String workerMem = parameterTool.getOrDefault("workerMem", "8192");
+
+        String inputVertexTab = parameterTool.getOrDefault("input", defaultInputVertexTab);
+        String outputParentTab = parameterTool.getOrDefault("output", defaultOutputTab);
+        String run_step = parameterTool.getOrDefault("run_step", "1");
+        String node_limit = parameterTool.getOrDefault("node_limit", "100");
+        String jvm_ratio = parameterTool.getOrDefault("jvm_ratio", "0.7");
+
+        System.out.println("input args: " + String.join(" ", args));
+        GraphJob job = new GraphJob();
+
+        job.setNumWorkers(Integer.parseInt(numWorkers));
+        if (parameterTool.has("workerCPU")) {
+            job.setWorkerCPU(Integer.parseInt(workerCPU) * 100);
+        }
+        if (parameterTool.has("workerMem")) {
+            job.setWorkerMemory(Integer.parseInt(workerMem));
+        }
+
+        job.setGraphLoaderClass(HolderRelationGroupReader.class);
+        job.setVertexClass(HolderRelationGroupVertex.class);
+
+        job.setLoadingVertexResolver(RemoveDuplicatesLoadingResolver.class);
+        job.setComputingVertexResolver(CompanyComputingVertexResolver.class);
+        job.setAggregatorClass(HolderRelationGroupAggregator.class);
+
+        job.addInput(TableInfo.builder()
+                .projectName("winhc_ng")
+                .tableName(inputVertexTab)
+                .build());
+
+        job.addOutput(TableInfo.builder()
+                .projectName("winhc_ng")
+                .tableName(outputParentTab)
+                .build());
+
+        job.set("run_step", run_step);
+        job.set("node_limit", node_limit);
+        job.set("odps.graph.jvm.xms.ratio", jvm_ratio);
+
+        long startTime = System.currentTimeMillis();
+        job.run();
+        System.out.println("Job Finished in "
+                + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+
+
+    }
+}

+ 86 - 0
src/main/java/com/winhc/max/compute/graph/job/holder/HolderRelationGroupReader.java

@@ -0,0 +1,86 @@
+package com.winhc.max.compute.graph.job.holder;
+
+import com.aliyun.odps.conf.Configuration;
+import com.aliyun.odps.data.TableInfo;
+import com.aliyun.odps.graph.Edge;
+import com.aliyun.odps.graph.GraphLoader;
+import com.aliyun.odps.graph.MutationContext;
+import com.aliyun.odps.io.LongWritable;
+import com.aliyun.odps.io.Text;
+import com.aliyun.odps.io.WritableRecord;
+import com.aliyun.odps.utils.StringUtils;
+import com.winhc.max.compute.graph.job.holder.entity.HolderRelationGroupEdge;
+import com.winhc.max.compute.graph.job.holder.entity.HolderRelationGroupMsg;
+import com.winhc.max.compute.graph.job.holder.entity.HolderRelationGroupVertexValue;
+import com.winhc.max.compute.graph.util.BaseUtils;
+import com.winhc.max.compute.graph.util.JsonUtils;
+import com.winhc.max.compute.graph.util.WritableRecordExtensions;
+import lombok.experimental.ExtensionMethod;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2024/5/29 9:28
+ */
+
+@ExtensionMethod({
+        JsonUtils.class
+        , WritableRecordExtensions.class
+        , BaseUtils.class
+})
+public class HolderRelationGroupReader extends
+        GraphLoader<Text, HolderRelationGroupVertexValue, HolderRelationGroupEdge, HolderRelationGroupMsg> {
+
+
+    @Override
+    public void setup(Configuration conf, int workerId, TableInfo tableInfo, MutationContext<Text, HolderRelationGroupVertexValue, HolderRelationGroupEdge, HolderRelationGroupMsg> context) throws IOException {
+        super.setup(conf, workerId, tableInfo, context);
+    }
+
+    @Override
+    public void load(LongWritable recordNum, WritableRecord record, MutationContext<Text, HolderRelationGroupVertexValue, HolderRelationGroupEdge, HolderRelationGroupMsg> context) throws IOException {
+
+
+        Text companyId = record.getTextOrNull("company_id");
+        Text companyName = record.getTextOrNull("company_name");
+
+
+        Text holderId = record.getTextOrNull("holder_id");
+        Text holderName = record.getTextOrNull("holder_name");
+
+        //加载点起始点
+        HolderRelationGroupVertex holderVertex = new HolderRelationGroupVertex();
+        holderVertex.setId(companyId);
+        holderVertex.setValue(HolderRelationGroupVertexValue.of(
+                companyName
+                /*, Arrays.asList(companyId.toString(), holderId.toString())*/
+                /*, Collections.singletonList(
+                        String.join(",", companyId.toString(), holderId.toString()))*/
+                , Arrays.asList(holderId.toString())
+        ));
+        context.addVertexRequest(holderVertex);
+
+
+//        //加载目标点
+//        HolderRelationGroupVertex holderVertexDest = new HolderRelationGroupVertex();
+//        holderVertexDest.setId(holderId);
+//        holderVertexDest.setValue(HolderRelationGroupVertexValue.of(
+//                holderName
+//                , Arrays.asList(companyId.toString())
+//        ));
+//        context.addVertexRequest(holderVertexDest);
+
+        //加载边
+        if(StringUtils.isNotBlank(holderId.toString())){
+            HolderRelationGroupEdge edgeValue = HolderRelationGroupEdge.of(holderId.toString(), 1.0);
+            Edge<Text, HolderRelationGroupEdge> edge = new Edge<Text, HolderRelationGroupEdge>(
+                    holderId, edgeValue);
+            context.addEdgeRequest(companyId, edge);
+        }
+
+
+    }
+}

+ 280 - 0
src/main/java/com/winhc/max/compute/graph/job/holder/HolderRelationGroupVertex.java

@@ -0,0 +1,280 @@
+package com.winhc.max.compute.graph.job.holder;
+
+import com.aliyun.odps.graph.ComputeContext;
+import com.aliyun.odps.graph.Edge;
+import com.aliyun.odps.graph.Vertex;
+import com.aliyun.odps.graph.WorkerContext;
+import com.aliyun.odps.io.*;
+import com.aliyun.odps.utils.StringUtils;
+import com.google.common.collect.Sets;
+import com.google.gson.Gson;
+import com.winhc.max.compute.graph.job.holder.entity.*;
+import com.winhc.max.compute.graph.util.WritableRecordExtensions;
+import lombok.experimental.ExtensionMethod;
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static com.winhc.max.compute.graph.util.BaseUtils.isWindows;
+
+
+@ExtensionMethod({
+        WritableRecordExtensions.class
+})
+@Slf4j
+public class HolderRelationGroupVertex extends
+        Vertex<Text, HolderRelationGroupVertexValue, HolderRelationGroupEdge, HolderRelationGroupMsg> {
+
+    private static Integer node_limit;
+
+
+    @Override
+    public void setup(WorkerContext<Text, HolderRelationGroupVertexValue, HolderRelationGroupEdge, HolderRelationGroupMsg> context) throws IOException {
+        node_limit = context.getConfiguration().getInt("node_limit", 100);
+    }
+
+    @Override
+    public void compute(ComputeContext<Text, HolderRelationGroupVertexValue, HolderRelationGroupEdge, HolderRelationGroupMsg> context, Iterable<HolderRelationGroupMsg> messages) throws IOException {
+
+        context.aggregate(NullWritable.get());
+
+        List<Edge<Text, HolderRelationGroupEdge>> edges = getEdges();
+
+        String currentVertexId = getId().toString();
+
+        //初始化链路
+        VertexCalcInfo info = VertexCalcInfo.getVertexCalcInfo(getId().toString(), getValue(), edges);
+
+        //消息传递
+        if (getValue().isSendMsg()) {
+            for (String destVertexId : info.getCompanyIds()) {
+                if (destVertexId.equals(currentVertexId)) {
+                    continue;
+                }
+                if (StringUtils.isNotBlank(currentVertexId) && StringUtils.isNotBlank(destVertexId)) {
+
+                    HolderRelationGroupMsg msg = HolderRelationGroupMsg.of(currentVertexId, destVertexId, info.getExtendChains()
+                            , info.getCompanyIds());
+                    context.sendMessage(new Text(destVertexId), msg);
+                    context.aggregate(new BooleanWritable(true));
+                }
+            }
+            //todo 判断是否有新增传递和达到层数限制
+            getValue().setEndFlag();
+        }
+
+        HolderRelationGroupVertexValue currentVertexValue = getValue();
+
+
+        if (info.getCompanyIds().size() <= node_limit) {
+            //消息合并
+            mergeMsgChain(currentVertexValue, currentVertexId, info.getExtendChains(), messages);
+
+            List<Set<String>> currentExtendChains = tuple2List(currentVertexValue.getExtendChains());
+            List<String> currentCompanyIds = tuple2List2(currentVertexValue.getCompanyIds());
+            //List<String> currentHolderChains = tuple2List2(currentVertexValue.getHolderChains());
+
+            if (currentExtendChains.size() > 0) {
+            //发送相邻节点消息
+            List<String> res = currentExtendChains.get(0).stream()
+                    .distinct()
+                    .filter(Objects::nonNull)
+                    .filter(e -> !e.equals(currentVertexId)).collect(Collectors.toList());
+            for (String destVertexId : res) {
+                if (StringUtils.isNotBlank(currentVertexId) && StringUtils.isNotBlank(destVertexId)) {
+                    HolderRelationGroupMsg msg = HolderRelationGroupMsg.of(currentVertexId, destVertexId, currentExtendChains
+                            , new HashSet<>(currentCompanyIds));
+                    context.sendMessage(new Text(destVertexId), msg);
+                    context.aggregate(new BooleanWritable(true));
+                }
+            }
+             }
+        }
+
+        voteToHalt();
+
+    }
+
+    /**
+     * 链路合并
+     */
+    public static void mergeMsgChain(HolderRelationGroupVertexValue currentVertexValue, String currentVertexId, List<Set<String>> currentExtendChains, Iterable<HolderRelationGroupMsg> messages) {
+
+        //合并消息
+        for (HolderRelationGroupMsg message : messages) {
+            if (message == null || message.getSendVertexId() == null) {
+                continue;
+            }
+
+            String sendVertexId = message.getSendVertexId().toString();
+
+            List<Set<String>> messageExtendChains = tuple2List(message.getExtendChains());
+            if(isWindows()){
+                System.out.println(currentVertexId + "----" + currentExtendChains.toString() + "----" + sendVertexId + "----" + messageExtendChains.toString());
+            }
+
+            List<String> currentIds = tuple2List2(currentVertexValue.getCompanyIds());
+            List<String> messageIds = tuple2List2(message.getCompanyIds());
+
+            //获取id集合
+
+            Set<String> difference = Sets.difference(new HashSet<>(messageIds), new HashSet<>(currentIds));
+            //判断是否更新 重合则过滤
+            if (difference.isEmpty()) {
+                continue;
+            }
+
+            for (int i = 0; i < messageExtendChains.size(); i++) {
+                Set<String> messageHead = messageExtendChains.get(0);
+
+                if (!messageHead.contains(currentVertexId)) {
+                    break;
+                }
+                //加入发送消息顶点
+                if (i == 0) {
+                    if(currentExtendChains.size() == 0){
+                        currentExtendChains.add(new HashSet<>(Collections.singleton(sendVertexId)));
+                    }else {
+                        currentExtendChains.get(0).add(sendVertexId);
+                    }
+                }
+
+                //加入其他层数节点
+                Set<String> collect = messageExtendChains.get(i)
+                        .stream()
+                        .filter(s -> !s.equals(currentVertexId) /*&& !processIds.contains(s)*/)
+                        .collect(Collectors.toSet());
+
+                int size = currentExtendChains.size();
+
+                if (collect.size() > 0) {
+
+                    if (size - 1 > i) {
+                        currentExtendChains.get(i + 1).addAll(collect);
+                    } else {
+                        currentExtendChains.add(collect);
+                    }
+
+                }
+
+                if(isWindows()){
+                    System.out.println("merge after ----" + currentVertexId + "---" + currentExtendChains);
+                }
+            }
+
+            //重复节点合并取最短路径
+            List<Set<String>> merge_set_all = new ArrayList<>();
+            Set<String> tmp_set_all = new HashSet<>();
+
+            for (Set<String> currentExtendChain : currentExtendChains) {
+                Set<String> tmp_set_inner = new HashSet<>();
+                for (String id : currentExtendChain) {
+                    if (tmp_set_all.contains(id)) {
+                        continue;
+                    }
+                    tmp_set_inner.add(id);
+                }
+                if (tmp_set_inner.size() > 0) {
+                    merge_set_all.add(tmp_set_inner);
+                    tmp_set_all.addAll(tmp_set_inner);
+                }
+            }
+
+            if(isWindows()){
+                System.out.println("路径优化: " + currentVertexId + "----" + currentExtendChains.toString() + "----" + merge_set_all.toString());
+            }
+            //转换
+            currentExtendChains.clear();
+            currentExtendChains.addAll(merge_set_all);
+
+        }
+
+        List<String> ids = currentExtendChains
+                .stream().flatMap(Collection::stream)
+                .distinct().collect(Collectors.toList());
+        ids.add(currentVertexId);
+
+        //顶点 todo
+        //currentVertexValue.setCompanyIds(list2Tuple2(ids));
+
+        //关系
+        currentVertexValue.setExtendChains(list2Tuple(currentExtendChains));
+
+
+    }
+
+    public static List<Set<String>> tuple2List(Tuple tuple) {
+        if (tuple == null) {
+            return new ArrayList<>();
+        }
+        List<Set<String>> set = tuple.getAll().stream().filter(Objects::nonNull)
+                .map(e -> ((Tuple) e).getAll().stream().filter(Objects::nonNull)
+                        .map(e1 -> ((Text) e1).toString()).collect(Collectors.toSet()))
+                .collect(Collectors.toList());
+        return set;
+    }
+
+    public static List<String> tuple2List2(Tuple tuple) {
+        if (tuple == null) {
+            return new ArrayList<>();
+        }
+        List<String> collect = tuple.getAll().stream().filter(Objects::nonNull).map(Object::toString).distinct().collect(Collectors.toList());
+        return collect;
+    }
+
+    public static Tuple list2Tuple(List<Set<String>> list) {
+        if (list == null || list.isEmpty()) {
+            return new Tuple();
+        }
+        Tuple out = new Tuple();
+        list.forEach(e -> {
+            Tuple out2 = new Tuple();
+            for (String ele : e) {
+                out2.append(new Text(ele));
+            }
+            out.append(out2);
+        });
+        return out;
+    }
+
+    public static Tuple list2Tuple2(List<String> list) {
+        if (list == null || list.isEmpty()) {
+            return new Tuple();
+        }
+        Tuple out = new Tuple();
+        list.forEach(e -> {
+            out.append(new Text(e));
+        });
+        return out;
+    }
+
+    @Override
+    public void cleanup(WorkerContext<Text, HolderRelationGroupVertexValue, HolderRelationGroupEdge, HolderRelationGroupMsg> context) throws IOException {
+
+        //todo 判断是否完成
+
+        //输出
+        Text current_id = getId();
+        Text company_name = getValue().getCompanyName();
+
+        Tuple companyIds = getValue().getCompanyIds();
+
+        Tuple extendChains = getValue().getExtendChains();
+
+        String ids = companyIds.getAll().stream().map(Object::toString)
+                .sorted(Comparator.naturalOrder())
+                .collect(Collectors.joining(","));
+
+        List<Set<String>> sets = tuple2List(extendChains);
+
+        //合并输出
+        context.write(current_id
+                , company_name
+                , new Text(ids)
+                , new Text(new Gson().toJson(sets))
+        );
+
+    }
+}

+ 75 - 0
src/main/java/com/winhc/max/compute/graph/job/holder/entity/HolderRelationGroupAggValue.java

@@ -0,0 +1,75 @@
+package com.winhc.max.compute.graph.job.holder.entity;
+
+import com.aliyun.odps.io.BooleanWritable;
+import com.aliyun.odps.io.LongWritable;
+import com.aliyun.odps.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+
+public class HolderRelationGroupAggValue implements Writable {
+
+    /**
+     * 为false时表示迭代完毕,可以结束
+     */
+    private BooleanWritable flag;
+
+    /**
+     * 本轮迭代顶点数
+     */
+    private LongWritable num;
+
+    public BooleanWritable getFlag() {
+        return flag;
+    }
+
+    public long getCount() {
+        return this.num.get();
+    }
+
+    public void increment() {
+        increment(1);
+    }
+
+    public void increment(long num) {
+        this.num.set(this.num.get() + num);
+    }
+
+    public void update(boolean flag) {
+        this.flag.set(this.flag.get() || flag);
+    }
+
+
+    public HolderRelationGroupAggValue() {
+        flag = new BooleanWritable(false);
+        this.num = new LongWritable(0);
+    }
+
+    @Override
+    public String toString() {
+        return "EnterpriseGroupAggValue{" +
+                "flag=" + flag +
+                ", num=" + num +
+                '}';
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        flag.write(out);
+        num.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        if (flag == null) {
+            flag = new BooleanWritable();
+        }
+        if (num == null) {
+            num = new LongWritable();
+        }
+        flag.readFields(in);
+        num.readFields(in);
+    }
+}

+ 63 - 0
src/main/java/com/winhc/max/compute/graph/job/holder/entity/HolderRelationGroupEdge.java

@@ -0,0 +1,63 @@
+package com.winhc.max.compute.graph.job.holder.entity;
+
+import com.aliyun.odps.io.DoubleWritable;
+import com.aliyun.odps.io.IntWritable;
+import com.aliyun.odps.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+
+public class HolderRelationGroupEdge implements Writable {
+
+    /**
+     * 投资比例,小数
+     */
+    private DoubleWritable investmentProportion;
+
+    /**
+     * 股东是个人还是企业
+     * 普通企业:1,国家机关:2,个人:3,节点不存在企业:4
+     */
+    private IntWritable holderType;
+
+    public DoubleWritable getInvestmentProportion() {
+        return investmentProportion;
+    }
+
+    public IntWritable getHolderType() {
+        return holderType;
+    }
+
+    public static HolderRelationGroupEdge of(String holderId, Double investmentProportion) {
+        HolderRelationGroupEdge holderEdge = new HolderRelationGroupEdge();
+        holderEdge.holderType = new IntWritable(holderId.length() == 32 ? 1 : 3);
+        holderEdge.investmentProportion = new DoubleWritable(investmentProportion);
+        return holderEdge;
+    }
+
+    public static HolderRelationGroupEdge of(Integer holderType, Double investmentProportion) {
+        HolderRelationGroupEdge holderEdge = new HolderRelationGroupEdge();
+        holderEdge.holderType = new IntWritable(holderType);
+        holderEdge.investmentProportion = new DoubleWritable(investmentProportion);
+        return holderEdge;
+    }
+
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        investmentProportion.write(out);
+        holderType.write(out);
+
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        investmentProportion = new DoubleWritable();
+        holderType = new IntWritable();
+
+        investmentProportion.readFields(in);
+        holderType.readFields(in);
+    }
+}

+ 187 - 0
src/main/java/com/winhc/max/compute/graph/job/holder/entity/HolderRelationGroupMsg.java

@@ -0,0 +1,187 @@
+package com.winhc.max.compute.graph.job.holder.entity;
+
+import com.aliyun.odps.io.*;
+import com.winhc.max.compute.graph.job.enterprise_group.EnterpriseGroupUtils;
+import com.winhc.max.compute.graph.job.enterprise_group.EnterpriseGroupVertex;
+import lombok.experimental.ExtensionMethod;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.*;
+import java.util.stream.Collectors;
+
+import static com.winhc.max.compute.graph.job.holder.HolderRelationGroupVertex.list2Tuple2;
+
+
+public class HolderRelationGroupMsg implements Writable {
+
+//    private Tuple holderChains;
+
+//    private Tuple companyIds;
+
+    /**
+     * 外部扩展id
+     */
+    private Tuple extendChains;
+
+    /**
+     * 途径id记录,防止陷入循环
+     */
+    private Tuple routeLog;
+
+    /**
+     * 发送消息id
+     */
+    private Text sendVertexId;
+
+
+    /**
+     * 接收消息id
+     */
+    private Text receiveVertexId;
+
+
+    /**
+     * 记录并测试是否重复
+     *
+     * @param vertexId
+     * @return false表示该节点已经走过。
+     */
+    public boolean routeLog(Text vertexId) {
+        return routeLog(vertexId.toString());
+    }
+
+    public boolean routeLog(String vertexId) {
+        long count = routeLog.getAll().stream().map(Object::toString).filter(e -> e.equals(vertexId)).count();
+
+        if (count > 0) {
+//            System.out.println("陷入循环," + String.join(",", routeLog.tuple2List()));
+            return false;
+        }
+        routeLog.append(new Text(vertexId));
+        return true;
+
+    }
+
+    public static HolderRelationGroupMsg of(String sendVertexId, String receiveVertexId, List<Set<String>> extendChains,
+                                            HashSet<String> companyIds) {
+        HolderRelationGroupMsg holderRelationGroupMsg = new HolderRelationGroupMsg();
+        //holderRelationGroupMsg.companyIds = new Tuple();
+
+//        for (String id : companyIds) {
+//            holderRelationGroupMsg.companyIds.append(new Text(id));
+//        }
+
+
+
+        holderRelationGroupMsg.sendVertexId = new Text(sendVertexId);
+        holderRelationGroupMsg.receiveVertexId = new Text(receiveVertexId);
+
+        //扩展集合
+        holderRelationGroupMsg.extendChains = new Tuple();
+        for (Set<String> ex : extendChains) {
+            Tuple tuple2 = new Tuple();
+            for (String e : ex) {
+                tuple2.append(new Text(e));
+            }
+            holderRelationGroupMsg.extendChains.append(tuple2);
+        }
+
+        return holderRelationGroupMsg;
+    }
+
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        //companyIds.write(out);
+
+        //holderChains.write(out);
+        extendChains.write(out);
+
+        sendVertexId.write(out);
+        receiveVertexId.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        //companyIds = new Tuple();
+
+        //holderChains = new Tuple();
+        extendChains = new Tuple();
+
+        //companyIds.readFields(in);
+        //holderChains.readFields(in);
+        extendChains.readFields(in);
+
+
+        sendVertexId = new Text();
+        receiveVertexId = new Text();
+
+        sendVertexId.readFields(in);
+        receiveVertexId.readFields(in);
+    }
+
+//    public Tuple getHolderChains() {
+//        return holderChains;
+//    }
+//
+//    public void setHolderChains(Tuple holderChains) {
+//        this.holderChains = holderChains;
+//    }
+
+//    public Tuple getCompanyIds() {
+//
+//        return companyIds;
+//    }
+
+    public Tuple getCompanyIds() {
+        if (extendChains == null) {
+            return new Tuple();
+        }
+        List<String> collect = extendChains.getAll().stream()
+                .filter(Objects::nonNull).flatMap(xx->{
+                    return  ((Tuple)xx).getAll().stream().filter(Objects::nonNull).map(e -> ((Text) e).toString());
+                })
+                .distinct().collect(Collectors.toList());
+        Tuple res = list2Tuple2(collect);
+
+        return res;
+    }
+
+//    public void setCompanyIds(Tuple companyIds) {
+//        this.companyIds = companyIds;
+//    }
+
+    public Tuple getRouteLog() {
+        return routeLog;
+    }
+
+    public void setRouteLog(Tuple routeLog) {
+        this.routeLog = routeLog;
+    }
+
+    public Text getSendVertexId() {
+        return sendVertexId;
+    }
+
+    public void setSendVertexId(Text sendVertexId) {
+        this.sendVertexId = sendVertexId;
+    }
+
+    public Text getReceiveVertexId() {
+        return receiveVertexId;
+    }
+
+    public void setReceiveVertexId(Text receiveVertexId) {
+        this.receiveVertexId = receiveVertexId;
+    }
+
+    public Tuple getExtendChains() {
+        return extendChains;
+    }
+
+    public void setExtendChains(Tuple extendChains) {
+        this.extendChains = extendChains;
+    }
+}

+ 159 - 0
src/main/java/com/winhc/max/compute/graph/job/holder/entity/HolderRelationGroupVertexValue.java

@@ -0,0 +1,159 @@
+package com.winhc.max.compute.graph.job.holder.entity;
+
+import com.aliyun.odps.io.IntWritable;
+import com.aliyun.odps.io.Text;
+import com.aliyun.odps.io.Tuple;
+import com.aliyun.odps.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static com.winhc.max.compute.graph.job.holder.HolderRelationGroupVertex.list2Tuple2;
+
+
+public class HolderRelationGroupVertexValue implements Writable {
+
+    private Text companyName;
+
+//    private Tuple companyIds;
+
+//    private Tuple holderChains;
+
+    /**
+     * 外部扩展id
+     */
+    private Tuple extendChains;
+
+    /**
+     * 为true表示该节点发送过,只需要传递就ok
+     * 0:未处理过,1:已经发送消息
+     */
+    private IntWritable endFlag;
+
+    public boolean isSendMsg() {
+        return endFlag.get() == 0;
+    }
+
+    public static HolderRelationGroupVertexValue of(Text companyName/*, List<String> companyIds*/
+            , List<String> extendChains) {
+        HolderRelationGroupVertexValue hh = new HolderRelationGroupVertexValue();
+
+        hh.setCompanyName(companyName);
+//        if (companyIds != null && companyIds.size() > 0) {
+//            //hh.companyIds = new Tuple();
+//            for (String id : companyIds) {
+//                Text text = new Text(id);
+//                //hh.companyIds.append(text);
+//            }
+//        }
+
+
+
+        if (hh.endFlag == null) {
+            hh.endFlag = new IntWritable(0);
+        }
+
+        //初始化关系
+        if (extendChains != null && extendChains.size() > 0) {
+            hh.extendChains = new Tuple();
+            Tuple tp2 = new Tuple();
+            for (String cc : extendChains) {
+                tp2.append(new Text(cc));
+            }
+            hh.extendChains.append(tp2);
+        }
+        return hh;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        companyName.write(out);
+
+        //companyIds.write(out);
+
+        extendChains.write(out);
+        endFlag.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+
+        companyName = new Text();
+
+        //companyIds = new Tuple();
+
+        extendChains = new Tuple();
+        endFlag = new IntWritable();
+
+
+        companyName.readFields(in);
+
+        //companyIds.readFields(in);
+
+        extendChains.readFields(in);
+        endFlag.readFields(in);
+    }
+
+
+    public Text getCompanyName() {
+        return companyName;
+    }
+
+    public void setCompanyName(Text companyName) {
+        this.companyName = companyName;
+    }
+
+/*    public Tuple getCompanyIds() {
+        return companyIds;
+    }*/
+    public Tuple getCompanyIds() {
+        if (extendChains == null) {
+            return new Tuple();
+        }
+        List<String> collect = extendChains.getAll().stream()
+                .filter(Objects::nonNull).flatMap(xx->{
+                   return  ((Tuple)xx).getAll().stream().filter(Objects::nonNull).map(e -> ((Text) e).toString());
+                })
+                .distinct().collect(Collectors.toList());
+        Tuple res = list2Tuple2(collect);
+
+        return res;
+    }
+
+
+//    public void setCompanyIds(Tuple companyIds) {
+//        this.companyIds = companyIds;
+//    }
+
+//    public Tuple getHolderChains() {
+//        return holderChains;
+//    }
+//
+//    public void setHolderChains(Tuple holderChains) {
+//        this.holderChains = holderChains;
+//    }
+
+    public IntWritable getEndFlag() {
+        return endFlag;
+    }
+
+    public void setEndFlag() {
+        endFlag = new IntWritable(1);
+    }
+
+    public Tuple getExtendChains() {
+        return extendChains;
+    }
+
+    public void setExtendChains(Tuple extendChains) {
+        this.extendChains = extendChains;
+    }
+
+    public void setEndFlag(IntWritable endFlag) {
+        this.endFlag = endFlag;
+    }
+}

+ 108 - 0
src/main/java/com/winhc/max/compute/graph/job/holder/entity/VertexCalcInfo.java

@@ -0,0 +1,108 @@
+package com.winhc.max.compute.graph.job.holder.entity;
+
+import com.aliyun.odps.graph.Edge;
+import com.aliyun.odps.io.Text;
+
+import java.util.*;
+
+import static com.winhc.max.compute.graph.job.holder.HolderRelationGroupVertex.tuple2List;
+
+/**
+ * @author π
+ * @Description:中转信息
+ * @date 2024/5/29 17:00
+ */
+public class VertexCalcInfo {
+
+
+    /**
+     * 股权扩展集合
+     */
+    private List<Set<String>> extendChains = new ArrayList<>();
+
+    /**
+     * 当前企业id
+     */
+    private String currentVertexId;
+
+    /**
+     * 涉及企业id
+     */
+    //private HashSet<String> companyIds = new HashSet<>();
+
+
+    public static VertexCalcInfo getVertexCalcInfo(String currentVertexId, HolderRelationGroupVertexValue currentVertexValue, List<Edge<Text, HolderRelationGroupEdge>> edges) {
+        VertexCalcInfo vc = new VertexCalcInfo();
+
+        if (currentVertexId != null) {
+            vc.currentVertexId = currentVertexId;
+            //vc.companyIds.add(currentVertexId);
+        }
+        if (edges == null || edges.isEmpty()) {
+            return vc;
+        }
+
+
+
+        //扩展链路
+        Set<String> set = new HashSet<>();
+        edges.forEach(edge -> {
+            set.add(edge.getDestVertexId().toString());
+        });
+
+        List<Set<String>> current_chanis = tuple2List(currentVertexValue.getExtendChains());
+        if (current_chanis.isEmpty()) {
+            vc.extendChains.add(set);
+        } else {
+            //替换
+            vc.extendChains = current_chanis;
+            vc.extendChains.get(0).addAll(set);
+        }
+        //vc.companyIds.addAll(set);
+        return vc;
+    }
+
+
+
+   /* public HashSet<String> getCompanyIds() {
+        return companyIds;
+    }*/
+
+    /**
+     * 关联id提取
+     * @return
+     */
+    public HashSet<String> getCompanyIds() {
+        HashSet<String> resSet = new HashSet<>();
+        if(currentVertexId != null) {
+            resSet.add(currentVertexId);
+        }
+        if(extendChains != null) {
+            extendChains.forEach(resSet::addAll);
+        }
+        return resSet;
+    }
+
+//    public void setCompanyIds(HashSet<String> companyIds) {
+//        this.companyIds = companyIds;
+//    }
+
+
+    public String getCurrentVertexId() {
+        return currentVertexId;
+    }
+
+    public void setCurrentVertexId(String currentVertexId) {
+        this.currentVertexId = currentVertexId;
+    }
+
+
+    public List<Set<String>> getExtendChains() {
+        return extendChains;
+    }
+
+    public void setExtendChains(List<Set<String>> extendChains) {
+        this.extendChains = extendChains;
+    }
+}
+