Explorar o código

Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max

# Conflicts:
#	src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala
#	src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_brief_cancel_announcement_result.scala
晏永年 hai 4 anos
pai
achega
3eb02dc245

+ 93 - 81
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -1,8 +1,11 @@
 package com.winhc.bigdata.spark.jobs.chance
 
+import java.util.concurrent.{CountDownLatch, TimeUnit}
+
 import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.test.TestChangeExtract.seq
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
-import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, ReflectUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}
@@ -21,7 +24,7 @@ object ChangeExtract {
   //判断两个map在指定key上是否相等,如不等反回不相等字段
   def getDoubleDataMap(iterable: Iterable[Map[String, String]]): (Map[String, String], Map[String, String]) = {
     val map = iterable.map(m => (m("change_flag"), m)).toMap
-    (map("0"), map("1"))
+    (map.getOrElse("0", null), map.getOrElse("1", null))
   }
 
   def getHandleClazz(tableName: String, equCols: Seq[String]): {def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, String, Map[String, String], String, String, String, String, Map[String, String])} = {
@@ -208,17 +211,29 @@ object ChangeExtract {
             val rowkey = x._1
             val map_list = x._2
             //          try {
-            if (map_list.size == 1) {
+            //            if (map_list.size == 1) {
+            //              val res = handle.handle(rowkey, null, map_list.head)
+            //              Row(res._1, res._2, tableName, res._3, res._4, res._5, res._6, res._7, res._8, update_time, res._9)
+            //            } else {
+            //              if (map_list.size > 2) {
+            //                logInfo("list.size > 2! rowkey:" + rowkey)
+            //              }
+            val m = getDoubleDataMap(map_list)
+
+            val new_map = m._1
+            val old_map = m._2
+            if (new_map == null && old_map == null) {
+              null
+            } else if (old_map == null) {
               val res = handle.handle(rowkey, null, map_list.head)
-              Row(res._1, res._2, tableName, res._3, res._4, res._5, res._6, res._7, res._8, update_time, res._9)
-            } else {
-              if (map_list.size > 2) {
-                logInfo("list.size > 2! rowkey:" + rowkey)
+              if (res == null) {
+                null
+              } else {
+                Row(res._1, res._2, tableName, res._3, res._4, res._5, res._6, res._7, res._8, update_time, res._9)
               }
-              val m = getDoubleDataMap(map_list)
-
-              val new_map = m._1
-              val old_map = m._2
+            } else if (new_map == null) {
+              null
+            } else {
               val res = handle.handle(rowkey, old_map, new_map)
               if (res == null) {
                 null
@@ -226,6 +241,7 @@ object ChangeExtract {
                 Row(res._1, res._2, tableName, res._3, res._4, res._5, res._6, res._7, res._8, update_time, res._9)
               }
             }
+            //            }
             /* } catch {
                case e: Exception => {
                  logError(s"xjk rowkey:$rowkey msg:${e.getMessage} equCols:$cols")
@@ -296,33 +312,33 @@ object ChangeExtract {
     , Args(tableName = "company_land_announcement", primaryFields = "e_number,project_name")
     , Args(tableName = "company_bid_list", primaryFields = "title")
     , Args(tableName = "company_land_transfer", primaryFields = "num,location")
-    , Args(tableName = "company_employment", primaryFields = "title,cid,url_path")
+    , Args(tableName = "company_employment", primaryFields = "title,url_path")
     , Args(tableName = "company_env_punishment", primaryFields = "punish_number")
     , Args(tableName = "company_icp", primaryFields = "domain")
     , Args(tableName = "company_punishment_info", primaryFields = "punish_number")
     , Args(tableName = "company_punishment_info_creditchina", primaryFields = "punish_number")
-    , Args(tableName = "bankruptcy_open_case", primaryFields = "case_no", isCopy=false) //破产重整
-    , Args(tableName = "company_public_announcement2_list", primaryFields = "applicant_cid,owner_cid,drawer_cid,gather_name_cid,bill_num")//公示催告
-    , Args(tableName = "company_mortgage_info", primaryFields = "reg_num")//动产抵押
-    , Args(tableName = "company_stock_announcement", primaryFields = "title")//企业公告
-    , Args(tableName = "company_check_info", primaryFields = "check_result")//抽查检查
-    , Args(tableName = "company_court_announcement_list", primaryFields = "content")//法院公告
-    , Args(tableName = "company_court_open_announcement_list", primaryFields = "case_reason")//开庭公告
-    , Args(tableName = "company_court_register_list", primaryFields = "area")//立案信息
-    , Args(tableName = "company_double_random_check_info", primaryFields = "check_plan_name")//双随机抽查
-    , Args(tableName = "company_judicial_sale_combine_list", primaryFields = "title")//司法拍卖
-    , Args(tableName = "company_tax_contravention", primaryFields = "case_type")//税收违法
-    , Args(tableName = "company_send_announcement_list", primaryFields = "title")//送达公告
-    , Args(tableName = "company_annual_report_out_guarantee", primaryFields = "id")//年报-对外担保
-    , Args(tableName = "company_zxr_restrict", primaryFields = "status")//限制消费令,发现最新状态
-    , Args(tableName = "company_brief_cancel_announcement", primaryFields = "credit_code")//简易注销
-    , Args(tableName = "company_liquidating_info", primaryFields = "id")//清算信息
-
-    , Args(tableName = "company_zxr_final_case", primaryFields = "identity_num")//终本案件
-    , Args(tableName = "company_license_creditchina", primaryFields = "licence_content")//行政许可-信用中国
-    , Args(tableName = "company_license_entpub", primaryFields = "license_name")//行政许可-企业公示
-    , Args(tableName = "company_license", primaryFields = "license_name")//行政许可
-    , Args(tableName = "wenshu_detail_combine", primaryFields = "cname")//文书
+    , Args(tableName = "bankruptcy_open_case", primaryFields = "case_no", isCopy = false) //破产重整
+    , Args(tableName = "company_public_announcement2_list", primaryFields = "applicant_cid,owner_cid,drawer_cid,gather_name_cid,bill_num") //公示催告
+    , Args(tableName = "company_mortgage_info", primaryFields = "reg_num") //动产抵押
+    , Args(tableName = "company_stock_announcement", primaryFields = "title") //企业公告
+    , Args(tableName = "company_check_info", primaryFields = "check_result") //抽查检查
+    , Args(tableName = "company_court_announcement_list", primaryFields = "content") //法院公告
+    , Args(tableName = "company_court_open_announcement_list", primaryFields = "case_reason") //开庭公告
+    , Args(tableName = "company_court_register_list", primaryFields = "area") //立案信息
+    , Args(tableName = "company_double_random_check_info", primaryFields = "check_plan_name") //双随机抽查
+    , Args(tableName = "company_judicial_sale_combine_list", primaryFields = "title") //司法拍卖
+    , Args(tableName = "company_tax_contravention", primaryFields = "case_type") //税收违法
+    , Args(tableName = "company_send_announcement_list", primaryFields = "title") //送达公告
+    , Args(tableName = "company_annual_report_out_guarantee", primaryFields = "id") //年报-对外担保
+    , Args(tableName = "company_zxr_restrict", primaryFields = "status") //限制消费令,发现最新状态
+    , Args(tableName = "company_brief_cancel_announcement", primaryFields = "credit_code") //简易注销
+    , Args(tableName = "company_liquidating_info", primaryFields = "id") //清算信息
+
+    , Args(tableName = "company_zxr_final_case", primaryFields = "identity_num") //终本案件
+    , Args(tableName = "company_license_creditchina", primaryFields = "licence_content") //行政许可-信用中国
+    , Args(tableName = "company_license_entpub", primaryFields = "license_name") //行政许可-企业公示
+    , Args(tableName = "company_license", primaryFields = "license_name") //行政许可
+    , Args(tableName = "wenshu_detail_combine", primaryFields = "cname") //文书
 
     , Args(tableName = "company_certificate", primaryFields = "type")
     , Args(tableName = "company_abnormal_info", primaryFields = "remove_reason")
@@ -333,11 +349,36 @@ object ChangeExtract {
     , Args(tableName = "company_staff", primaryFields = "staff_type")
     //公司名称,法人ID:人标识或公司标识,公司类型,注册地址,营业期限终止日期,经营范围,登记机关,企业状态                 ,注册资本,实收资本金额(单位:分),注销日期,注销原因
     , Args(tableName = "company", primaryKey = "cid", primaryFields = "name,legal_entity_id,company_org_type,reg_location,to_time,business_scope,reg_institute,reg_status,reg_capital,actual_capital_amount,cancel_date,cancel_reason")
-    , Args(tableName = "company_illegal_info",  primaryFields = "remove_reason")
-    , Args(tableName = "company_finance",  primaryFields = "round")
-    , Args(tableName = "company_dishonest_info",  primaryFields = "case_no")
+    , Args(tableName = "company_illegal_info", primaryFields = "remove_reason")
+    , Args(tableName = "company_finance", primaryFields = "round")
+    , Args(tableName = "company_dishonest_info", primaryFields = "case_no")
+    //    , Args(tableName = "company_holder",  primaryFields = "amount")
   )
 
+
+  case class ChangeExtract(s: SparkSession,
+                           ds: String //表名(不加前后辍)
+                          ) extends Watching {
+
+    override protected val spark: SparkSession = s
+
+    def calc(): Unit = {
+      val latch = new CountDownLatch(seq.length)
+      startArgs
+        //        .filter(s => seq.contains(s.tableName))
+        .foreach(e => {
+          asyncWatch(e.tableName, () => {
+            println("______________________________" + e.tableName + "___________________________________")
+            ChangeExtractHandle(spark, e.project, e.tableName, e.primaryKey, ds, e.primaryFields.split(",")).calc(e.isCopy)
+            latch.countDown()
+          })
+        })
+      latch.await(60, TimeUnit.MINUTES)
+    }
+
+  }
+
+
   private case class Args(project: String = "winhc_eci_dev"
                           , tableName: String
                           , primaryKey: String = "rowkey"
@@ -345,53 +386,24 @@ object ChangeExtract {
                           , isCopy: Boolean = true)
 
   def main(args: Array[String]): Unit = {
-    if (args.length == 2) {
-      val Array(tableName, inc_ds) = args
-
-      val e = startArgs.filter(_.tableName.equals(tableName)).head
-      val config = EsConfig.getEsConfigMap ++ mutable.Map(
-        "spark.hadoop.odps.project.name" -> e.project,
-        "spark.hadoop.odps.spark.local.partition.amt" -> "10"
-      )
-      val spark = SparkUtils.InitEnv("ChangeExtract", config)
-
-      ChangeExtractHandle(spark, e.project, tableName, e.primaryKey, inc_ds, e.primaryFields.split(",")).calc(e.isCopy)
-      spark.stop()
-    } else {
-      val ds = args(0)
-      val project = "winhc_eci_dev"
-      val config = EsConfig.getEsConfigMap ++ mutable.Map(
-        "spark.hadoop.odps.project.name" -> project,
-        "spark.hadoop.odps.spark.local.partition.amt" -> "10"
-      )
-      val spark = SparkUtils.InitEnv("ChangeExtract", config)
+    val Array(tableName, inc_ds) = args
+
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+    )
+    val spark = SparkUtils.InitEnv("ChangeExtract", config)
 
+    if (tableName.equals("all")) {
       startArgs.foreach(e => {
-        ChangeExtractHandle(spark, e.project, e.tableName, e.primaryKey, ds, e.primaryFields.split(",")).calc(e.isCopy)
+        ChangeExtractHandle(spark, e.project, e.tableName, e.primaryKey, inc_ds, e.primaryFields.split(",")).calc(e.isCopy)
+      })
+    } else {
+      val set = tableName.split(",").toSet
+      startArgs.filter(a => set.contains(a.tableName)).foreach(e => {
+        ChangeExtractHandle(spark, e.project, e.tableName, e.primaryKey, inc_ds, e.primaryFields.split(",")).calc(e.isCopy)
       })
-
-
-      /* val rows =
-         """winhc_eci_dev company_tm rowkey 20200717 status_new
-           |winhc_eci_dev company_patent_list rowkey 20200717 lprs
-           |winhc_eci_dev company_copyright_works_list rowkey 20200717 type
-           |winhc_eci_dev company_copyright_reg_list rowkey 20200717 version
-           |winhc_eci_dev company_land_publicity rowkey 20200717 title,location,use_for
-           |winhc_eci_dev company_land_announcement rowkey 20200717 e_number,project_name
-           |winhc_eci_dev company_bid_list rowkey 20200717 title
-           |winhc_eci_dev company_land_transfer rowkey 20200717 num,location
-           |winhc_eci_dev company_employment rowkey 20200717 source
-           |winhc_eci_dev company_env_punishment rowkey 20200717 punish_number
-           |winhc_eci_dev company_icp rowkey 20200717 domain
-           |""".stripMargin.replace("20200717", ds)
-       for (r <- rows.split("\r\n")) {
-         if (StringUtils.isNotEmpty(r)) {
-           val as = r.split(" ")
-           val Array(tmp, tableName, rowkey, inc_ds, pf, isCopy) = if (as.length == 6) as else as :+ "true"
-           ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(",")).calc(isCopy.toBoolean)
-         }
-       }*/
-      spark.stop()
     }
+    spark.stop()
   }
 }

+ 3 - 6
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_zxr_restrict.scala

@@ -7,12 +7,9 @@ import com.winhc.bigdata.spark.utils.ChangeExtractUtils
 /**
  * @Author: Yan Yongnian
  * @Date: 2020/8/14
- * @Description:
+ * @Description: fixme 不能运行
  */
-
-
 //限制消费令
-
 case class company_zxr_restrict(equCols: Seq[String]) extends CompanyChangeHandle with Serializable {
 
   override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = {
@@ -22,7 +19,7 @@ case class company_zxr_restrict(equCols: Seq[String]) extends CompanyChangeHandl
 
   override def getBizTime(newMap: Map[String, String]): String = newMap("case_create_time")
 
-  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("case_no"), s"${newMap("title")}限制消费令发生变更")
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("case_no"), s"${newMap("case_no")}限制消费令发生变更")
 
-  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("case_no"), s"新增${newMap("title")}限制消费令")
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("case_no"), s"新增${newMap("case_no")}限制消费令")
 }