Browse Source

爬虫增量合并到天眼查增量

xufei committed 4 years ago
parent
commit
0cb66efebd

+ 61 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrCombineUtils.scala

@@ -0,0 +1,61 @@
+package com.winhc.bigdata.spark.utils
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * @Description: Merge self-owned incremental data into the Tianyancha dataset
+ * @author π
+ * @date 2020/8/31 14:07
+ */
+object CompanyIncrCombineUtils {
+  /**
+   * Entry point: merges self-owned incremental data into the target table.
+   * Expects exactly three arguments: ODPS project name, source table, target table.
+   */
+  def main(args: Array[String]): Unit = {
+    // Fail fast with a usage hint instead of an opaque MatchError on bad arity.
+    if (args.length != 3) {
+      println("usage: CompanyIncrCombineUtils <project> <source> <target>")
+      sys.exit(-1)
+    }
+    val Array(project, source, target) = args
+
+    println(
+      s"""
+         |project:$project
+         |source:$source
+         |target:$target
+         |""".stripMargin)
+
+    // Spark/ODPS session configuration for this job.
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark = SparkUtils.InitEnv(getClass.getSimpleName, config)
+    CompanyIncrCombineUtils(spark, source, target).calc()
+    spark.stop()
+  }
+}
+
+/**
+ * Copies incremental rows from `source` into the latest partition of `target`,
+ * resuming from a watermark recorded by a sentinel row (id = -1) in `target`.
+ *
+ * @param s      active SparkSession
+ * @param source fully-qualified source table name
+ * @param target fully-qualified target table name (partitioned by `ds`)
+ */
+case class CompanyIncrCombineUtils(s: SparkSession, source: String, target: String) extends LoggingUtils {
+  override protected val spark: SparkSession = s
+
+  def calc(): Unit = {
+
+    // Latest partition of the target table; merged rows are appended into it.
+    val ds2 = BaseUtil.getPartion(target, spark)
+
+    // All target columns except the partition column `ds`.
+    val cols: Seq[String] = spark.table(target).schema.map(_.name).filter(_ != "ds")
+
+    // Watermark: max ds of the sentinel row (id = -1) left by a previous merge.
+    // NOTE(review): assumes some job writes that sentinel row — confirm upstream.
+    val list = sql(
+      s"""
+         |select max(ds) max_ds from $target where id = -1 and ds > '0'
+         |""".stripMargin).collect().toList.map(_.getAs[String]("max_ds"))
+
+    println(s"list: $list")
+
+    // headOption guards an empty result set (the original `list.head` would throw),
+    // and isNotBlank also covers a null aggregate when no sentinel row exists.
+    val watermark = list.headOption.filter(StringUtils.isNotBlank).getOrElse("0")
+
+    // Append only rows strictly newer than the watermark.
+    sql(
+      s"""
+         |INSERT into table $target PARTITION(ds=$ds2)
+         |SELECT ${cols.mkString(",")} from
+         |$source
+         |where ds > '$watermark'
+         |""".stripMargin)
+  }
+}