
Merge remote-tracking branch 'origin/master'

lyb 4 years ago
parent
commit
5a0624ae94

+ 9 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyCourtAnnouncement.scala

@@ -63,8 +63,13 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
     }
     val ads_eci_debtor_relation = s"${project}.ads_eci_debtor_relation" //full debtor-relation table
     val debtorRelationDs = getPartion(ads_eci_debtor_relation, spark)
-    val ads_address = s"${project}.inc_ads_${tableName}_address" //incremental address table
-    val ads_yg_bg = s"${project}.inc_ads_${tableName}_bg_yg" //incremental plaintiff-defendant table (plaintiff)
+
+    //write the result tables to the production project
+//    val ads_address = s"${project}.inc_ads_${tableName}_address" //incremental address table
+//    val ads_yg_bg = s"${project}.inc_ads_${tableName}_bg_yg" //incremental plaintiff-defendant table (plaintiff)
+
+    val ads_address = s"winhc_eci.inc_ads_${tableName}_address" //incremental address table
+    val ads_yg_bg = s"winhc_eci.inc_ads_${tableName}_bg_yg" //incremental plaintiff-defendant table (plaintiff)
 
     //defendant
     val df = sql(
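
This hunk pins the two output tables to the production project winhc_eci instead of deriving them from ${project}, keeping the old derivations as commented-out code. If the destination should stay switchable, one option is a small helper along these lines (a sketch only; outputProject and toProd are hypothetical, not part of this repo):

    // Hypothetical helper: resolve the output project in one place
    // instead of hard-coding "winhc_eci" in every table name.
    def outputProject(project: String, toProd: Boolean): String =
      if (toProd) "winhc_eci" else project

    // e.g. val ads_address = s"${outputProject(project, toProd = true)}.inc_ads_${tableName}_address"
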
@@ -458,7 +463,6 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
   }
 
 
-
 }
 
 object CompanyCourtAnnouncement {
@@ -499,7 +503,7 @@ object CompanyCourtAnnouncement {
     if (!runOld) {
       val flag = announcement.preCalc()
      //return if the incremental data was not updated
-      if(!flag) return
+      if (!flag) return
     }
     announcement.calc(runOld)
     spark.stop()
@@ -507,7 +511,7 @@ object CompanyCourtAnnouncement {
 
 }
 
-object EsQuery{
+object EsQuery {
   def queryCompany(restClient: RestClient, companyName: String) = {
     val query =
       s"""

+ 18 - 15
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIncCompany2Es.scala

@@ -3,6 +3,7 @@ package com.winhc.bigdata.spark.jobs
 import com.winhc.bigdata.spark.config.{EsConfig, HBaseConfig}
 import com.winhc.bigdata.spark.const.BaseConst
 import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.{atDaysAfter, nowDate}
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
 import org.apache.hadoop.hbase.client.Put
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
@@ -70,16 +71,16 @@ object CompanyIncCompany2Es {
   )
   val outFields_Human = Seq(
     "NEW_CID"
-    ,"CID"
-    ,"ID"
-    ,"COMPANY_NAME"
-    ,"HUMAN_NAME"
-    ,"HID"
-    ,"HUMAN_PID"
-    ,"STATUS"
-    ,"CREATE_TIME"
-    ,"UPDATE_TIME"
-    ,"DELETED"
+    , "CID"
+    , "ID"
+    , "COMPANY_NAME"
+    , "HUMAN_NAME"
+    , "HID"
+    , "HUMAN_PID"
+    , "STATUS"
+    , "CREATE_TIME"
+    , "UPDATE_TIME"
+    , "DELETED"
   )
 
   case class Company2Es(s: SparkSession, project: String, bizDate: String) extends LoggingUtils with BaseFunc {
@@ -89,7 +90,7 @@ object CompanyIncCompany2Es {
       val code = code2Name()
       val partition = bizDate.replaceAll("\\-", "")
       if (partition.length != 8) {
-        println("biz date is error!")
+        println("biz date is error! "+partition)
         sys.exit(-99)
       }
       val inc_ods_partitions = BaseUtil.getPartitions(s"${project}.inc_ods_company", spark)
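
The guard above only checks that the partition string has eight characters after stripping dashes. A stricter variant would parse it as a real date; a minimal sketch of that idea (an assumption about intent, not code from this commit):

    import java.time.LocalDate
    import java.time.format.DateTimeFormatter
    import scala.util.Try

    // Rejects strings like "2020x7x7" that pass the length-8 check
    // but are not valid yyyyMMdd dates.
    def isValidPartition(p: String): Boolean =
      Try(LocalDate.parse(p, DateTimeFormatter.ofPattern("yyyyMMdd"))).isSuccess
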
@@ -157,13 +158,14 @@ object CompanyIncCompany2Es {
 
     }
   }
+
   case class Company_Human_Relation2HBase(s: SparkSession, project: String, bizDate: String) extends LoggingUtils {
     @(transient@getter) val spark: SparkSession = s
 
     def calc() {
       val partition = bizDate.replaceAll("\\-", "")
       if (partition.length != 8) {
-        println("biz date is error!")
+        println("biz date is error! "+partition)
         sys.exit(-99)
       }
       val inc_ods_partitions = BaseUtil.getPartitions(s"${project}.inc_ods_company_human_relation", spark)
@@ -229,17 +231,18 @@ object CompanyIncCompany2Es {
   }
 
   def main(args: Array[String]): Unit = {
-    if (args.length != 2) {
-      println("please enter project and bizDate!")
+    if (args.length != 1) {
+      println("please enter project!")
       sys.exit(-99)
     }
 
-    val Array(project, bizDate) = args
+    val Array(project) = args
 
     val config = EsConfig.getEsConfigMap ++ mutable.Map(
       "spark.hadoop.odps.project.name" -> project,
       "spark.hadoop.odps.spark.local.partition.amt" -> "10"
     )
+    val bizDate = atDaysAfter(-1, nowDate("yyyyMMdd"))
 
     val spark = SparkUtils.InitEnv("company2Es", config)
 

+ 5 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -186,6 +186,7 @@ object ChangeExtract {
       ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(",")).calc
       spark.stop()
     } else {
+      val ds = args(0)
       val project = "winhc_eci_dev"
       val config = EsConfig.getEsConfigMap ++ mutable.Map(
         "spark.hadoop.odps.project.name" -> project,
@@ -201,10 +202,11 @@ object ChangeExtract {
           |winhc_eci_dev company_land_announcement rowkey 20200717 e_number,project_name
           |winhc_eci_dev company_bid_list rowkey 20200717 title
           |winhc_eci_dev company_land_transfer rowkey 20200717 num,location
-          |""".stripMargin
+          |winhc_eci_dev company_employment rowkey 20200717 source
+          |""".stripMargin.replace("20200717", ds)
       for (r <- rows.split("\r\n")) {
-        if(StringUtils.isNotEmpty(r)){
-          val Array(tmp,tableName,rowkey,inc_ds,pf) =r.split(" ")
+        if (StringUtils.isNotEmpty(r)) {
+          val Array(tmp, tableName, rowkey, inc_ds, pf) = r.split(" ")
           ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(",")).calc
         }
       }
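
Each spec row has the shape "<project> <table> <rowkey> <ds> <fields>", and the hard-coded 20200717 placeholder is now replaced with the ds argument before the rows are split. A small standalone illustration of the same parsing (values are illustrative):

    val ds = "20200718"
    val spec = "winhc_eci_dev company_employment rowkey 20200717 source".replace("20200717", ds)
    val Array(tmp, tableName, rowkey, inc_ds, pf) = spec.split(" ")
    // tableName == "company_employment", inc_ds == "20200718", pf.split(",") == Array("source")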

Diff data for this file was not shown because the file is too large
+ 22 - 7
src/main/scala/com/winhc/bigdata/spark/jobs/chance/eci_good_news.scala


+ 1 - 3
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -113,8 +113,6 @@ object BaseUtil {
   }
 
   def main(args: Array[String]): Unit = {
-    println(replaceChar(",x,"))
-    println(replaceChar("华为信息科技公司,。百度科技公司"))
-    println(replaceChar("2015)深南法蛇民初第883-887受理郑委,曹   连云,庄忠杰,曹元洪,曹硕"))
+    println(atDaysAfter(-1, nowDate("yyyyMMdd")))
   }
 }