瀏覽代碼

feat: 限制company_rank资源上限

许家凯 2 年之前
父節點
當前提交
03361d7cc3

+ 7 - 0
pom.xml

@@ -36,6 +36,13 @@
             <artifactId>hutool-all</artifactId>
             <version>5.7.10</version>
         </dependency>
+        <!-- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.20</version>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 
 </project>

+ 5 - 1
src/main/java/com/winhc/max/compute/graph/job/CompanyRankGraphJob.java

@@ -25,12 +25,16 @@ public class CompanyRankGraphJob {
         String outPart = parameterTool.getOrDefault("outPart", "" + System.currentTimeMillis());
         String maxIter = parameterTool.getOrDefault("maxIter", "30");
         String variance = parameterTool.getOrDefault("variance", "0.00000001");
+        String numWorkers = parameterTool.getOrDefault("numWorkers", "70");
 
         System.out.println("input args: " + String.join(" ", args));
 
         GraphJob job = new GraphJob();
 
         job.set("company.rank.variance", variance);
+        job.setMaxIteration(-1);
+        job.setNumWorkers(Integer.parseInt(numWorkers));
+
 
         job.setGraphLoaderClass(CompanyRankVertexReader.class);
         job.setVertexClass(CompanyRankVertex.class);
@@ -50,7 +54,7 @@ public class CompanyRankGraphJob {
                 .tableName("calc_company_rank_out")
                 .partSpec("ds='" + outPart + "'")
                 .build());
-        job.setMaxIteration(-1);
+
 
         long startTime = System.currentTimeMillis();
         job.run();

+ 7 - 2
src/main/java/com/winhc/max/compute/graph/job/pagerank/CompanyRankVertexReader.java

@@ -3,6 +3,8 @@ package com.winhc.max.compute.graph.job.pagerank;
 import com.aliyun.odps.graph.GraphLoader;
 import com.aliyun.odps.graph.MutationContext;
 import com.aliyun.odps.io.*;
+import com.winhc.max.compute.graph.util.WritableRecordExtensions;
+import lombok.experimental.ExtensionMethod;
 
 import java.io.IOException;
 
@@ -10,14 +12,17 @@ import java.io.IOException;
  * @author: XuJiakai
  * 2022/6/8 09:58
  */
+@ExtensionMethod({
+        WritableRecordExtensions.class
+})
 public class CompanyRankVertexReader extends
         GraphLoader<Text, CompanyRank, DoubleWritable, DoubleWritable> {
     @Override
     public void load(LongWritable recordNum, WritableRecord record, MutationContext<Text, CompanyRank, DoubleWritable, DoubleWritable> context) throws IOException {
         CompanyRankVertex vertex = new CompanyRankVertex();
 
-        Text companyId = (Text) record.get("investment_company_id");
-        Text company_name = (Text) record.get("investment_company_name");
+        Text companyId = record.getTextOrNull("investment_company_id");
+        Text company_name = record.getTextOrNull("investment_company_name");
 //        DoubleWritable initScore = ((DoubleWritable) record.get("investment_company_init_score"));
         DoubleWritable initScore = new DoubleWritable(1);
         vertex.setId(companyId);

+ 23 - 0
src/main/java/com/winhc/max/compute/graph/util/WritableRecordExtensions.java

@@ -0,0 +1,23 @@
+package com.winhc.max.compute.graph.util;
+
+import com.aliyun.odps.io.NullWritable;
+import com.aliyun.odps.io.Text;
+import com.aliyun.odps.io.Writable;
+import com.aliyun.odps.io.WritableRecord;
+
+import java.io.IOException;
+
+/**
+ * @author: XuJiakai
+ * 2022/7/11 14:30
+ */
+public class WritableRecordExtensions {
+    public static Text getTextOrNull(WritableRecord writableRecord, String filedName) throws IOException {
+        Writable writable = writableRecord.get(filedName);
+        if (writable.equals(NullWritable.get())) {
+            return new Text();
+        } else {
+            return ((Text) writable);
+        }
+    }
+}