ソースを参照

feat: 企业动态调整

- 全量输出到hbase
- 用户关注的输出到es
许家凯 4 年 前
コミット
5b4a861848

+ 86 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic2Es.scala

@@ -0,0 +1,86 @@
+package com.winhc.bigdata.spark.jobs.dynamic
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/9/15 14:27
+ * @Description:
+ */
object CompanyDynamic2Es {

  /**
   * Entry point. Bootstraps a Spark session against the given ODPS project and
   * runs the followed-company dynamic export for one partition date.
   *
   * args(0): ODPS project name
   * args(1): partition date (ds)
   */
  def main(args: Array[String]): Unit = {
    // Fail fast with a usage message instead of an opaque MatchError
    // when the wrong number of arguments is supplied.
    if (args.length != 2) {
      println("usage: CompanyDynamic2Es <project> <ds>")
      sys.exit(-1)
    }
    val Array(project, ds) = args

    println(
      s"""
         |project: $project
         |ds: $ds
         |""".stripMargin)

    val config = EsConfig.getEsConfigMap ++ mutable.Map(
      "spark.hadoop.odps.project.name" -> project,
      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
    )
    val spark = SparkUtils.InitEnv("CompanyDynamic2Es", config)
    CompanyDynamic2Es(s = spark, project = project).save(ds)
    spark.stop()
  }

}
+
case class CompanyDynamic2Es(s: SparkSession,
                             project: String
                            ) extends LoggingUtils with Logging {
  // Environment used to resolve physical project names; fixed to prod here.
  private val env = "prod"

  @(transient@getter) val spark: SparkSession = s

  /**
   * Writes the dynamics of user-followed companies into ads_company_dynamic_2_es.
   *
   * Pipeline: the radar follow table (ods_radar_rta) is joined to the
   * company-name mapping (base_company_mapping) by company name to obtain the
   * followed cids, which are then joined to the full dynamic table
   * (ads_company_dynamic) so only followed companies are exported.
   *
   * @param ds partition date applied to all three source tables
   */
  def save(ds: String): Unit = {
    // Resolve the project once and use it for every table. The original query
    // hard-coded winhc_eci_dev for base_company_mapping while routing the other
    // tables through getEnvProjectName — inconsistent. With env fixed to
    // "prod" this is assumed to resolve to winhc_eci_dev — TODO confirm.
    val proj = getEnvProjectName(env, "winhc_eci_dev")

    sql(
      s"""
         |INSERT OVERWRITE TABLE $proj.ads_company_dynamic_2_es   PARTITION(ds,tn)
         |SELECT  t4.id
         |        ,t4.cid
         |        ,t4.cname
         |        ,t4.info_type
         |        ,t4.rta_desc
         |        ,t4.change_content
         |        ,t4.change_time
         |        ,t4.biz_id
         |        ,t4.sub_info_type
         |        ,t4.info_risk_level
         |        ,t4.winhc_suggest
         |        ,t4.create_time
         |        ,t4.ds
         |        ,t4.tn
         |FROM    (
         |            SELECT  DISTINCT t2.new_cid AS cid
         |            FROM    (
         |                        SELECT  *
         |                        FROM    $proj.ods_radar_rta
         |                        WHERE   ds = '$ds'
         |                    ) AS t1
         |            JOIN    (
         |                        SELECT  *
         |                        FROM    $proj.base_company_mapping
         |                        WHERE   ds = '$ds'
         |                    ) AS t2
         |            ON      t1.comp_name = t2.cname
         |        ) AS t3
         |JOIN    (
         |            SELECT  *
         |            FROM    $proj.ads_company_dynamic
         |            WHERE   ds = '$ds'
         |        ) AS t4
         |ON      t3.cid = t4.cid
         |""".stripMargin)
  }
}

+ 3 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandleUtils.scala

@@ -10,11 +10,11 @@ import com.winhc.bigdata.spark.utils.DateUtils
  */
 object CompanyDynamicHandleUtils {
   def getDynamicId(cid: String, rta_desc: String, biz_id: String, change_time: String): String = {
-    val id = 9999999999L - DateUtils.toUnixTimestamp(date = change_time)
+    val timestamp = DateUtils.toUnixTimestamp(date = change_time)
     // 过滤1970年以前的数据
-    if (id < 0)
+    if (timestamp < 0)
       null
     else
-      s"${cid}_${id}_${SecureUtil.md5(rta_desc + biz_id)}"
+      s"${cid}_${9999999999L - timestamp}_${SecureUtil.md5(rta_desc + biz_id)}"
   }
 }