Ver código fonte

添加摘要计算

许家凯 5 anos atrás
pai
commit
95e6684aa4

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/TestSparkSql.scala

@@ -27,7 +27,7 @@ object TestSparkSql {
     throw new RuntimeException("warehouse dir not exists")
   }
   def main(args: Array[String]): Unit = {
-    val spark: SparkSession = SparkUtils.InitEnv
+    val spark: SparkSession = SparkUtils.InitEnv("appName")
 
     import spark._
     val tableName = "ods_company_all"

+ 39 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanySummaryCalculator.scala

@@ -0,0 +1,39 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.utils.SparkUtils
+import org.apache.spark.sql.DataFrame
+
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/5/20 15:38
+ * @Description:
+ */
object CompanySummaryCalculator {
  // Source tables to summarise, mapped to the column that holds the
  // company id in each table.
  val tableName2FieldName = Map(
    "ads_judrisk_court_annc_list" -> "company_id",
    "ods_company_jud_assist_list" -> "company_id"
  )

  /**
   * Builds the per-table aggregation SQL: one row per company id with the
   * row count exposed as `<tableName>_num`. Rows whose id column equals 0
   * are filtered out.
   *
   * @param tableName          source table to aggregate
   * @param companyIdFieldName column in `tableName` holding the company id
   * @return the aggregation query text
   */
  def getSql(tableName: String, companyIdFieldName: String): String =
    s"select $companyIdFieldName as company_id,count(1) as ${tableName}_num from $tableName where $companyIdFieldName <>0 group by $companyIdFieldName"

  def main(args: Array[String]): Unit = {
    val spark = SparkUtils.InitEnv("CompanySummaryCalculator")
    import spark._
    val outputTable = "ads_company_summary"

    // Fail fast with a clear message instead of NPE-ing at write time:
    // the original loop left `df` null when the table map was empty.
    require(tableName2FieldName.nonEmpty, "tableName2FieldName must not be empty")

    // One count-per-company frame per source table, inner-joined on
    // company_id — the summary keeps only companies present in every
    // source table (same semantics as the original sequential join loop).
    val perTable = tableName2FieldName.map { case (table, idField) => sql(getSql(table, idField)) }
    val joined = perTable.reduce(_.join(_, "company_id"))

    // insertInto matches columns by POSITION, so pin the column order
    // explicitly: company_id first, then one `<table>_num` per source.
    val columns = "company_id" :: tableName2FieldName.keys.map(_ + "_num").toList
    joined.select(columns.head, columns.tail: _*)
      .write.mode("overwrite").insertInto(outputTable)

    spark.stop()
  }
}