许家凯 4 years ago
parent
commit
17e1014dc5

+ 18 - 15
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIncCompany2Es.scala

@@ -3,6 +3,7 @@ package com.winhc.bigdata.spark.jobs
 import com.winhc.bigdata.spark.config.{EsConfig, HBaseConfig}
 import com.winhc.bigdata.spark.const.BaseConst
 import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.{atDaysAfter, nowDate}
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
 import org.apache.hadoop.hbase.client.Put
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
@@ -70,16 +71,16 @@ object CompanyIncCompany2Es {
   )
   val outFields_Human = Seq(
     "NEW_CID"
-    ,"CID"
-    ,"ID"
-    ,"COMPANY_NAME"
-    ,"HUMAN_NAME"
-    ,"HID"
-    ,"HUMAN_PID"
-    ,"STATUS"
-    ,"CREATE_TIME"
-    ,"UPDATE_TIME"
-    ,"DELETED"
+    , "CID"
+    , "ID"
+    , "COMPANY_NAME"
+    , "HUMAN_NAME"
+    , "HID"
+    , "HUMAN_PID"
+    , "STATUS"
+    , "CREATE_TIME"
+    , "UPDATE_TIME"
+    , "DELETED"
   )
 
   case class Company2Es(s: SparkSession, project: String, bizDate: String) extends LoggingUtils with BaseFunc {
@@ -89,7 +90,7 @@ object CompanyIncCompany2Es {
       val code = code2Name()
       val partition = bizDate.replaceAll("\\-", "")
       if (partition.length != 8) {
-        println("biz date is error!")
+        println("biz date is error! "+partition)
         sys.exit(-99)
       }
       val inc_ods_partitions = BaseUtil.getPartitions(s"${project}.inc_ods_company", spark)
@@ -157,13 +158,14 @@ object CompanyIncCompany2Es {
 
     }
   }
+
   case class Company_Human_Relation2HBase(s: SparkSession, project: String, bizDate: String) extends LoggingUtils {
     @(transient@getter) val spark: SparkSession = s
 
     def calc() {
       val partition = bizDate.replaceAll("\\-", "")
       if (partition.length != 8) {
-        println("biz date is error!")
+        println("biz date is error! "+partition)
         sys.exit(-99)
       }
       val inc_ods_partitions = BaseUtil.getPartitions(s"${project}.inc_ods_company_human_relation", spark)
@@ -229,17 +231,18 @@ object CompanyIncCompany2Es {
   }
 
   def main(args: Array[String]): Unit = {
-    if (args.length != 2) {
-      println("please enter project and bizDate!")
+    if (args.length != 1) {
+      println("please enter project!")
       sys.exit(-99)
     }
 
-    val Array(project, bizDate) = args
+    val Array(project) = args
 
     val config = EsConfig.getEsConfigMap ++ mutable.Map(
       "spark.hadoop.odps.project.name" -> project,
       "spark.hadoop.odps.spark.local.partition.amt" -> "10"
     )
+    val bizDate = atDaysAfter(-1, nowDate("yyyyMMdd"))
 
     val spark = SparkUtils.InitEnv("company2Es", config)
 

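The bizDate CLI argument is gone: main now takes only the project and derives bizDate as yesterday's date. A minimal sketch of what the two BaseUtil helpers plausibly do, assuming they wrap SimpleDateFormat and Calendar (their real bodies are not part of this diff):

import java.text.SimpleDateFormat
import java.util.{Calendar, Date}

object DateHelperSketch { // hypothetical name, for illustration only
  // Format the current date with the given pattern, e.g. nowDate("yyyyMMdd") -> "20200717".
  def nowDate(pattern: String): String =
    new SimpleDateFormat(pattern).format(new Date())

  // Shift a "yyyyMMdd" date string by n days; n may be negative.
  def atDaysAfter(n: Int, date: String): String = {
    val fmt = new SimpleDateFormat("yyyyMMdd")
    val cal = Calendar.getInstance()
    cal.setTime(fmt.parse(date))
    cal.add(Calendar.DAY_OF_MONTH, n)
    fmt.format(cal.getTime)
  }
}

Under that reading, atDaysAfter(-1, nowDate("yyyyMMdd")) is simply yesterday in yyyyMMdd form, which satisfies the 8-character partition check above.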
+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -202,6 +202,7 @@ object ChangeExtract {
           |winhc_eci_dev company_land_announcement rowkey 20200717 e_number,project_name
           |winhc_eci_dev company_bid_list rowkey 20200717 title
           |winhc_eci_dev company_land_transfer rowkey 20200717 num,location
+          |winhc_eci_dev company_employment rowkey 20200717 source
           |""".stripMargin.replace("20200717", ds)
       for (r <- rows.split("\r\n")) {
         if (StringUtils.isNotEmpty(r)) {

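The added company_employment row follows the same whitespace-delimited layout as its neighbours and is consumed by the rows.split("\r\n") loop below it. A hedged reading of how one row is likely tokenized (the field names here are assumptions; only the split loop and the StringUtils.isNotEmpty guard are visible in this diff):

// Illustrative breakdown of one config row; variable names are hypothetical.
val row = "winhc_eci_dev company_employment rowkey 20200717 source"
val Array(project, tableName, primaryKey, ds, fields) = row.split(" ")
// project    = "winhc_eci_dev"       ODPS project
// tableName  = "company_employment"  table to run change extraction on
// primaryKey = "rowkey"              key column used to pair old and new rows
// ds         = "20200717"            placeholder partition, replaced with the real ds
// fields     = "source"              comma-separated columns to diff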
+ 6 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/chance/eci_good_news.scala

@@ -33,8 +33,10 @@ object eci_good_news {
       , "company_certificate" //资质证书
     )
 
-    val target_ads_case_chance = "xjk_test_ads_case_chance"
-    val target_ads_case_chance_element = "xjk_test_ads_case_chance_element"
+    val target_ads_case_chance = "ads_case_chance_good_news"
+    val target_ads_case_chance_element = "ads_case_chance_element_good_news"
+
+    private val env = "prod"
 
     def company_ip(): Unit = {
       cleanup()
@@ -76,7 +78,7 @@ object eci_good_news {
 
       sql(
         s"""
-           |INSERT  OVERWRITE TABLE winhc_eci_dev.$target_ads_case_chance_element PARTITION(ds='$ds')
+           |INSERT OVERWRITE TABLE ${if (env.equals("dev")) "winhc_eci_dev" else "winhc_eci"}.$target_ads_case_chance_element PARTITION(ds='$ds')
            |SELECT  md5(cleanup(CONCAT_WS('',case_chance_id,case_chance_type,type,province,city,dynamic_time))) AS id
            |        ,CASE_CHANCE_ID
            |        ,TYPE
@@ -117,7 +119,7 @@ object eci_good_news {
       sql(
         s"""
            |
-           |INSERT OVERWRITE TABLE winhc_eci_dev.$target_ads_case_chance PARTITION(ds='$ds')
+           |INSERT OVERWRITE TABLE ${if (env.equals("dev")) "winhc_eci_dev" else "winhc_eci"}.$target_ads_case_chance PARTITION(ds='$ds')
            |SELECT  detail_rowkey AS case_chance_id
            |        ,detail_title AS title
            |        ,null AS plaintiff

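Both INSERT statements now choose the target project with the same inline conditional on env. A hypothetical refactor (not part of this commit) that states that intent once and keeps the SQL templates readable:

private val env = "prod"

// Resolve the ODPS project once instead of repeating the conditional in every template.
private def targetProject: String =
  if (env == "dev") "winhc_eci_dev" else "winhc_eci"

// Usage inside the existing templates:
// sql(s"INSERT OVERWRITE TABLE $targetProject.$target_ads_case_chance PARTITION(ds='$ds') ...")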
+ 1 - 3
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -113,8 +113,6 @@ object BaseUtil {
   }
 
   def main(args: Array[String]): Unit = {
-    println(replaceChar(",x,"))
-    println(replaceChar("华为信息科技公司,。百度科技公司"))
-    println(replaceChar("2015)深南法蛇民初第883-887受理郑委,曹   连云,庄忠杰,曹元洪,曹硕"))
+    println(atDaysAfter(-1, nowDate("yyyyMMdd")))
   }
 }
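The old replaceChar smoke tests are replaced by a single print of the new default bizDate. A quick cross-check with java.time, assuming atDaysAfter(-1, nowDate("yyyyMMdd")) means "yesterday in yyyyMMdd":

import java.time.LocalDate
import java.time.format.DateTimeFormatter

// Yesterday in yyyyMMdd, e.g. "20200716" if run on 2020-07-17.
val yesterday = LocalDate.now().minusDays(1)
  .format(DateTimeFormatter.ofPattern("yyyyMMdd"))
println(yesterday)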