Selaa lähdekoodia

软件著作权

xufei 4 vuotta sitten
vanhempi
commit
c2603b0992

+ 89 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyCopyrightReg.scala

@@ -0,0 +1,89 @@
+package com.winhc.bigdata.spark.jobs
+
+import java.util.Date
+
+import com.winhc.bigdata.spark.utils.SparkUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * 软件著作权
+ */
+object CompanyCopyrightReg {
+  def main(args: Array[String]): Unit = {
+    if (args.length != 3) {
+      println("请配置计算资源: instances, cores, memory .")
+      System.exit(-1)
+    }
+
+    var config = mutable.Map.empty[String, String]
+    val Array(instances, cores, memory) = args;
+
+    println(
+      s"""
+         |instances : $instances,
+         |cores : $cores,
+         |memory : $memory
+         |""".stripMargin)
+
+    config = mutable.Map("spark.executor.instances" -> instances,
+      "spark.executor.cores" -> cores,
+      "spark.executor.memory" -> memory
+    )
+
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+
+    import spark.implicits._
+    import spark._
+    import org.apache.spark.sql.functions._
+    println("CompanyCopyrightReg calc start! " + new Date().toString)
+
+    val sourceTable = "ods_company_copyright_reg_mysql"
+    val resultTable = "ads_company_copyright_reg"
+    val ds = "20200513"
+
+    sql(s"select * from $sourceTable where ds = $ds")
+      .dropDuplicates("reg_num", "full_name")
+      .createOrReplaceTempView("t1")
+    sql(s"CACHE TABLE t1")
+
+    //拆平新表
+    sql(
+      s"""
+         |SELECT  c.*
+         |        ,coalesce(d.res_cid,c.cid) as res_cid
+         |FROM    (
+         |            SELECT  *
+         |                    ,cid
+         |            FROM    t1 a
+         |            LATERAL VIEW explode(split(cids, ';')) b AS cid
+         |            WHERE   a.ds = $ds
+         |        ) c
+         |LEFT JOIN company_name_mapping d
+         |ON      c.cid = d.cid
+         |""".stripMargin)
+      .createOrReplaceTempView(s"t2")
+
+    //聚合新cids
+    sql(
+      """
+        |SELECT
+        |t1.id ,cids ,reg_num,cat_num,full_name,simple_name,version,author_nationality
+        |,publish_time,reg_time,source_url,create_time,update_time,deleted,x.new_cids
+        |FROM    t1
+        |LEFT JOIN (
+        |              SELECT  id
+        |                      ,concat_ws(';',collect_set(res_cid)) new_cids
+        |              FROM    t2
+        |              GROUP BY id
+        |          ) x
+        |ON      t1.id = x.id
+        |""".stripMargin)
+      .write.mode("overwrite").insertInto(resultTable)
+
+    println("CompanyCopyrightReg calc stop! " + new Date().toString)
+
+    spark.stop()
+  }
+}