123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133 |
- package com.winhc.max.compute.graph.job;
- import com.aliyun.odps.data.TableInfo;
- import com.aliyun.odps.graph.*;
- import com.aliyun.odps.io.*;
- import com.aliyun.odps.utils.StringUtils;
- import java.io.IOException;
- import java.util.Arrays;
/**
 * π
 * Connected-components (connected subgraph) computation.
 */
- public class CompanyConnectedComponents {
- public static class CCVertex extends
- Vertex<LongWritable, LongWritable, NullWritable, LongWritable> {
- @Override
- public void compute(
- ComputeContext<LongWritable, LongWritable, NullWritable, LongWritable> context,
- Iterable<LongWritable> msgs) throws IOException {
- if (context.getSuperstep() == 0L) {
- this.setValue(getId());
- context.sendMessageToNeighbors(this, getValue());
- return;
- }
- long minID = Long.MAX_VALUE;
- for (LongWritable id : msgs) {
- if (id.get() < minID) {
- minID = id.get();
- }
- }
- if (minID < this.getValue().get()) {
- this.setValue(new LongWritable(minID));
- context.sendMessageToNeighbors(this, getValue());
- } else {
- this.voteToHalt();
- }
- }
- @Override
- public void cleanup(
- WorkerContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
- throws IOException {
- context.write(getId(), getValue());
- }
- }
- public static class CCVertexReader extends
- GraphLoader<LongWritable, LongWritable, NullWritable, LongWritable> {
- @Override
- public void load(
- LongWritable recordNum,
- WritableRecord record,
- MutationContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
- throws IOException {
- CCVertex vertex = new CCVertex();
- vertex.setId((LongWritable) record.get(0));
- Writable es = record.get(1);
- if (es != null) {
- String[] split = es.toString().split(",");
- Arrays.stream(split).filter(StringUtils::isNotBlank).forEach(
- x -> {
- long destID = Long.parseLong(x);
- vertex.addEdge(new LongWritable(destID), NullWritable.get());
- }
- );
- }
- context.addVertexRequest(vertex);
- }
- }
- public static class MinLongCombiner extends Combiner<LongWritable, LongWritable> {
- @Override
- public void combine(LongWritable vertexId, LongWritable combinedMessage,
- LongWritable messageToCombine) throws IOException {
- if (combinedMessage.get() > messageToCombine.get()) {
- combinedMessage.set(messageToCombine.get());
- }
- }
- }
- public static class CompanyComputingVertexResolver<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> extends DefaultComputingVertexResolver {
- @Override
- public Vertex resolveNotExistVertexMutations(WritableComparable vertexId, VertexChanges vertexChanges, boolean hasMessages) throws IOException {
- /**
- * 1. If creation of vertex desired, pick first vertex.
- */
- Vertex<I, V, E, M> vertex = addVertexIfDesired(vertexId, vertexChanges);
- /** 2. If edge addition, add the edges */
- addEdges(vertexId, vertex, vertexChanges);
- /** 3. If the vertex exists, first prune the edges. */
- removeEdges(vertexId, vertex, vertexChanges);
- /** 4. If vertex removal desired, remove the vertex. */
- vertex = removeVertexIfDesired(vertexId, vertex, vertexChanges);
- /** 5. If send messages to not exist vertex, throw exception.*/
- if (vertex == null && hasMessages) {
- }
- return vertex;
- }
- }
- public static void main(String[] args) throws IOException {
- if (args.length < 2) {
- System.out.println("Usage: <input> <output>");
- System.exit(-1);
- }
- GraphJob job = new GraphJob();
- job.setGraphLoaderClass(CCVertexReader.class);
- job.setVertexClass(CCVertex.class);
- job.setCombinerClass(MinLongCombiner.class);
- job.setLoadingVertexResolver(RemoveDuplicatesLoadingResolver.class);
- job.setComputingVertexResolver(CompanyComputingVertexResolver.class);
- job.addInput(TableInfo.builder().tableName(args[0]).build());
- job.addOutput(TableInfo.builder().tableName(args[1]).build());
- long startTime = System.currentTimeMillis();
- job.run();
- System.out.println("Job Finished in "
- + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
- }
- }
|