Browse Source

联通子图计算

xufei 2 years ago
parent
commit
cfe32e7a52

+ 133 - 0
src/main/java/com/winhc/max/compute/graph/job/CompanyConnectedComponents.java

@@ -0,0 +1,133 @@
+package com.winhc.max.compute.graph.job;
+
+import com.aliyun.odps.data.TableInfo;
+import com.aliyun.odps.graph.*;
+import com.aliyun.odps.io.*;
+import com.aliyun.odps.utils.StringUtils;
+
+import java.io.IOException;
+import java.util.Arrays;
+
+/**
+ * π
+ * 联通子图计算
+ */
+public class CompanyConnectedComponents {
+    public static class CCVertex extends
+            Vertex<LongWritable, LongWritable, NullWritable, LongWritable> {
+        @Override
+        public void compute(
+                ComputeContext<LongWritable, LongWritable, NullWritable, LongWritable> context,
+                Iterable<LongWritable> msgs) throws IOException {
+            if (context.getSuperstep() == 0L) {
+                this.setValue(getId());
+                context.sendMessageToNeighbors(this, getValue());
+                return;
+            }
+            long minID = Long.MAX_VALUE;
+            for (LongWritable id : msgs) {
+                if (id.get() < minID) {
+                    minID = id.get();
+                }
+            }
+            if (minID < this.getValue().get()) {
+                this.setValue(new LongWritable(minID));
+                context.sendMessageToNeighbors(this, getValue());
+            } else {
+                this.voteToHalt();
+            }
+        }
+
+
+        @Override
+        public void cleanup(
+                WorkerContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
+                throws IOException {
+            context.write(getId(), getValue());
+        }
+    }
+
+
+    public static class CCVertexReader extends
+            GraphLoader<LongWritable, LongWritable, NullWritable, LongWritable> {
+        @Override
+        public void load(
+                LongWritable recordNum,
+                WritableRecord record,
+                MutationContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
+                throws IOException {
+            CCVertex vertex = new CCVertex();
+            vertex.setId((LongWritable) record.get(0));
+            Writable es = record.get(1);
+            if (es != null) {
+                String[] split = es.toString().split(",");
+                Arrays.stream(split).filter(StringUtils::isNotBlank).forEach(
+                        x -> {
+                            long destID = Long.parseLong(x);
+                            vertex.addEdge(new LongWritable(destID), NullWritable.get());
+                        }
+                );
+            }
+            context.addVertexRequest(vertex);
+        }
+    }
+
+    public static class MinLongCombiner extends Combiner<LongWritable, LongWritable> {
+
+        @Override
+        public void combine(LongWritable vertexId, LongWritable combinedMessage,
+                            LongWritable messageToCombine) throws IOException {
+            if (combinedMessage.get() > messageToCombine.get()) {
+                combinedMessage.set(messageToCombine.get());
+            }
+        }
+
+    }
+
+    public static class CompanyComputingVertexResolver<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> extends DefaultComputingVertexResolver {
+
+        @Override
+        public Vertex resolveNotExistVertexMutations(WritableComparable vertexId, VertexChanges vertexChanges, boolean hasMessages) throws IOException {
+
+            /**
+             * 1. If creation of vertex desired, pick first vertex.
+             */
+            Vertex<I, V, E, M> vertex = addVertexIfDesired(vertexId, vertexChanges);
+
+            /** 2. If edge addition, add the edges */
+            addEdges(vertexId, vertex, vertexChanges);
+
+            /** 3. If the vertex exists, first prune the edges. */
+            removeEdges(vertexId, vertex, vertexChanges);
+
+            /** 4. If vertex removal desired, remove the vertex. */
+            vertex = removeVertexIfDesired(vertexId, vertex, vertexChanges);
+
+            /** 5. If send messages to not exist vertex, throw exception.*/
+            if (vertex == null && hasMessages) {
+            }
+            return vertex;
+        }
+    }
+
+
+    public static void main(String[] args) throws IOException {
+        if (args.length < 2) {
+            System.out.println("Usage: <input> <output>");
+            System.exit(-1);
+        }
+        GraphJob job = new GraphJob();
+        job.setGraphLoaderClass(CCVertexReader.class);
+        job.setVertexClass(CCVertex.class);
+        job.setCombinerClass(MinLongCombiner.class);
+        job.setLoadingVertexResolver(RemoveDuplicatesLoadingResolver.class);
+        job.setComputingVertexResolver(CompanyComputingVertexResolver.class);
+
+        job.addInput(TableInfo.builder().tableName(args[0]).build());
+        job.addOutput(TableInfo.builder().tableName(args[1]).build());
+        long startTime = System.currentTimeMillis();
+        job.run();
+        System.out.println("Job Finished in "
+                + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+    }
+}