
Merge remote-tracking branch 'origin/master'

许家凯 4 years ago
Parent commit 2667ca2bcc

+ 57 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyHidNameMapping.scala

@@ -0,0 +1,57 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+object CompanyHidNameMapping {
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyHidNameMapping(spark, project).calc
+    spark.stop()
+  }
+
+}
+
+case class CompanyHidNameMapping(s: SparkSession,
+                                 project: String // name of the project that owns the source tables
+                                ) extends LoggingUtils with CompanyMapping with BaseFunc {
+  override protected val spark: SparkSession = s
+
+  def calc: Unit = {
+    val inc_ads_company_human_relation = s"$project.inc_ads_company_human_relation"
+
+    val ds = BaseUtil.getPartion(inc_ads_company_human_relation, spark)
+
+    val inc_ads_hid_name_mapping = "inc_ads_hid_name_mapping"
+
+    val env = "prod"
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${if (env.equals("dev")) "winhc_eci_dev" else "winhc_eci"}.$inc_ads_hid_name_mapping PARTITION(ds='$ds')
+         |SELECT  hid,human_name
+         |FROM    (
+         |            SELECT  cast(hid as string) hid,human_name
+         |                    ,ROW_NUMBER() OVER(PARTITION BY hid ORDER BY ds DESC ,update_time DESC) AS num
+         |            FROM    (
+         |                        SELECT  human_name,hid,update_time,ds
+         |                        FROM    $inc_ads_company_human_relation
+         |                        WHERE   ds = '$ds'
+         |                        and human_name <> ''
+         |                    )
+         |        )
+         |WHERE   num = 1
+         |""".stripMargin)
+
+    addEmptyPartitionOrSkip(s"${if (env.equals("dev")) "winhc_eci_dev" else "winhc_eci"}.$inc_ads_hid_name_mapping", ds)
+  }
+}
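
For reference, the dedup in this new job is a standard keep-the-latest-row pattern: partition by hid, order by partition date then update_time, and keep row number 1. A minimal standalone sketch of that rule, assuming a local SparkSession and toy data (all names and values here are illustrative, not from the warehouse):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.row_number

    object HidNameDedupSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("dedup-sketch").getOrCreate()
        import spark.implicits._

        val rows = Seq(
          (1L, "张三", "2020-06-01 10:00:00", "20200601"),
          (1L, "张三丰", "2020-06-02 09:00:00", "20200602"), // newer ds wins
          (2L, "李四", "2020-06-01 08:00:00", "20200601")
        ).toDF("hid", "human_name", "update_time", "ds")

        // same rule as the SQL above: newest ds, then newest update_time, per hid
        val w = Window.partitionBy($"hid").orderBy($"ds".desc, $"update_time".desc)
        rows.where($"human_name" =!= "")
          .withColumn("num", row_number().over(w))
          .where($"num" === 1)
          .select($"hid".cast("string"), $"human_name")
          .show(false) // hid=1 -> 张三丰, hid=2 -> 李四
        spark.stop()
      }
    }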

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

@@ -24,6 +24,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
   val tabMapping =
     Map("company_court_open_announcement" -> ("litigant_cids", ";") //开庭公告
       , "company_send_announcement" -> ("litigant_cids", ",") //送达公告
+      , "auction_tracking" -> ("cids", ",") //送达公告
     )
 
   val funMap =
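
Each tabMapping entry pairs a multi-valued company-id column with its separator (";" or ","). This hunk does not show how the pair is consumed, so the following is only a hedged sketch, using a hypothetical helper explodeCids, of how such an entry could fan the column out into one row per cid:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, explode, split, trim}

    object CidSplitSketch {
      // field: the multi-valued cid column; delim: its separator from tabMapping
      def explodeCids(df: DataFrame, field: String, delim: String): DataFrame =
        df.withColumn("cid", explode(split(col(field), java.util.regex.Pattern.quote(delim))))
          .where(trim(col("cid")) =!= "") // drop empty fragments
          .drop(field)
    }

For the entry added here, the call would look like explodeCids(df, "cids", ",").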

+ 10 - 7
src/main/scala/com/winhc/bigdata/spark/utils/MaxComputer2Phoenix.scala

@@ -30,13 +30,16 @@ case class MaxComputer2Phoenix(spark: SparkSession,
 
 
     val key = s"$rowkey AS rowkey"
-    val res = phoenixCols.filter(!_.equalsIgnoreCase("id")).map(s => {
-      if ("NEW_CID".equals(s.toUpperCase())) {
-        s"cast ($s as string) as CID"
-      } else {
-        s"cast ($s as string) as ${s.toUpperCase}"
-      }
-    }) ++ Seq(key)
+    // auction_tracking_list keeps its id column; every other table drops it
+    val cols = if ("auction_tracking_list".equalsIgnoreCase(htable)) phoenixCols
+               else phoenixCols.filter(!_.equalsIgnoreCase("id"))
+    val res = cols.map(s => {
+      if ("NEW_CID".equals(s.toUpperCase())) {
+        s"cast ($s as string) as CID"
+      } else {
+        s"cast ($s as string) as ${s.toUpperCase}"
+      }
+    }) ++ Seq(key)
 
     val df = sql(
       s"""