CompanyConnectedComponents.java 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133
  1. package com.winhc.max.compute.graph.job;
  2. import com.aliyun.odps.data.TableInfo;
  3. import com.aliyun.odps.graph.*;
  4. import com.aliyun.odps.io.*;
  5. import com.aliyun.odps.utils.StringUtils;
  6. import java.io.IOException;
  7. import java.util.Arrays;
  /**
   * π
   * Connected components (connected subgraph) computation.
   */
  12. public class CompanyConnectedComponents {
  13. public static class CCVertex extends
  14. Vertex<LongWritable, LongWritable, NullWritable, LongWritable> {
  15. @Override
  16. public void compute(
  17. ComputeContext<LongWritable, LongWritable, NullWritable, LongWritable> context,
  18. Iterable<LongWritable> msgs) throws IOException {
  19. if (context.getSuperstep() == 0L) {
  20. this.setValue(getId());
  21. context.sendMessageToNeighbors(this, getValue());
  22. return;
  23. }
  24. long minID = Long.MAX_VALUE;
  25. for (LongWritable id : msgs) {
  26. if (id.get() < minID) {
  27. minID = id.get();
  28. }
  29. }
  30. if (minID < this.getValue().get()) {
  31. this.setValue(new LongWritable(minID));
  32. context.sendMessageToNeighbors(this, getValue());
  33. } else {
  34. this.voteToHalt();
  35. }
  36. }
  37. @Override
  38. public void cleanup(
  39. WorkerContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
  40. throws IOException {
  41. context.write(getId(), getValue());
  42. }
  43. }
  44. public static class CCVertexReader extends
  45. GraphLoader<LongWritable, LongWritable, NullWritable, LongWritable> {
  46. @Override
  47. public void load(
  48. LongWritable recordNum,
  49. WritableRecord record,
  50. MutationContext<LongWritable, LongWritable, NullWritable, LongWritable> context)
  51. throws IOException {
  52. CCVertex vertex = new CCVertex();
  53. vertex.setId((LongWritable) record.get(0));
  54. Writable es = record.get(1);
  55. if (es != null) {
  56. String[] split = es.toString().split(",");
  57. Arrays.stream(split).filter(StringUtils::isNotBlank).forEach(
  58. x -> {
  59. long destID = Long.parseLong(x);
  60. vertex.addEdge(new LongWritable(destID), NullWritable.get());
  61. }
  62. );
  63. }
  64. context.addVertexRequest(vertex);
  65. }
  66. }
  67. public static class MinLongCombiner extends Combiner<LongWritable, LongWritable> {
  68. @Override
  69. public void combine(LongWritable vertexId, LongWritable combinedMessage,
  70. LongWritable messageToCombine) throws IOException {
  71. if (combinedMessage.get() > messageToCombine.get()) {
  72. combinedMessage.set(messageToCombine.get());
  73. }
  74. }
  75. }
  76. public static class CompanyComputingVertexResolver<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> extends DefaultComputingVertexResolver {
  77. @Override
  78. public Vertex resolveNotExistVertexMutations(WritableComparable vertexId, VertexChanges vertexChanges, boolean hasMessages) throws IOException {
  79. /**
  80. * 1. If creation of vertex desired, pick first vertex.
  81. */
  82. Vertex<I, V, E, M> vertex = addVertexIfDesired(vertexId, vertexChanges);
  83. /** 2. If edge addition, add the edges */
  84. addEdges(vertexId, vertex, vertexChanges);
  85. /** 3. If the vertex exists, first prune the edges. */
  86. removeEdges(vertexId, vertex, vertexChanges);
  87. /** 4. If vertex removal desired, remove the vertex. */
  88. vertex = removeVertexIfDesired(vertexId, vertex, vertexChanges);
  89. /** 5. If send messages to not exist vertex, throw exception.*/
  90. if (vertex == null && hasMessages) {
  91. }
  92. return vertex;
  93. }
  94. }
  95. public static void main(String[] args) throws IOException {
  96. if (args.length < 2) {
  97. System.out.println("Usage: <input> <output>");
  98. System.exit(-1);
  99. }
  100. GraphJob job = new GraphJob();
  101. job.setGraphLoaderClass(CCVertexReader.class);
  102. job.setVertexClass(CCVertex.class);
  103. job.setCombinerClass(MinLongCombiner.class);
  104. job.setLoadingVertexResolver(RemoveDuplicatesLoadingResolver.class);
  105. job.setComputingVertexResolver(CompanyComputingVertexResolver.class);
  106. job.addInput(TableInfo.builder().tableName(args[0]).build());
  107. job.addOutput(TableInfo.builder().tableName(args[1]).build());
  108. long startTime = System.currentTimeMillis();
  109. job.run();
  110. System.out.println("Job Finished in "
  111. + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
  112. }
  113. }