Bladeren bron

动态输出格式调整

xufei 4 jaren geleden
bovenliggende
commit
a417563c9b
15 gewijzigde bestanden met toevoegingen van 97 en 94 verwijderingen
  1. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_check_info.scala
  2. 15 15
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_announcement_list.scala
  3. 12 12
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_open_announcement_list.scala
  4. 15 15
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_register_list.scala
  5. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_double_random_check_info.scala
  6. 3 3
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_judicial_sale_combine_list.scala
  7. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license.scala
  8. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license_creditchina.scala
  9. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license_entpub.scala
  10. 1 3
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_own_tax.scala
  11. 21 23
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_tax_contravention.scala
  12. 11 11
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_zxr_final_case.scala
  13. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/wenshu_detail_combine.scala
  14. 1 1
      src/main/scala/com/winhc/bigdata/spark/test/TestChangeExtract.scala
  15. 12 5
      src/main/scala/com/winhc/bigdata/spark/test/TestCompanyDynamic.scala

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_check_info.scala

@@ -39,7 +39,7 @@ case class company_check_info() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "2"
 }

+ 15 - 15
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_announcement_list.scala

@@ -42,21 +42,21 @@ case class company_court_announcement_list() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
-    "announcement_type_code->公告类型"
-    , "plaintiff->原告"
-    , "bltn_no->公告号"
-    , "court_name->法院名"
-    , "deal_grade->处理等级名称"
-    , "litigant->当事人"
-    , "judge->法官"
-    , "province->省份"
-    , "judge_phone->法官电话"
-    , "case_no->案件号"
-    , "content->案件内容"
-    , "publish_page->刊登版面"
-    , "publish_date->刊登日期"
-  ))
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
+//    "announcement_type_code->公告类型"
+//    , "plaintiff->原告"
+//    , "bltn_no->公告号"
+//    , "court_name->法院名"
+//    , "deal_grade->处理等级名称"
+//    , "litigant->当事人"
+//    , "judge->法官"
+//    , "province->省份"
+//    , "judge_phone->法官电话"
+//    , "case_no->案件号"
+//    , "content->案件内容"
+//    , "publish_page->刊登版面"
+//    , "publish_date->刊登日期"
+//  ))
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = {
     val oldcname = new_map("cname")

+ 12 - 12
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_open_announcement_list.scala

@@ -42,18 +42,18 @@ case class company_court_open_announcement_list() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
-    "case_reason->案由"
-    , "case_no->案号"
-    , "start_date->开庭时间"
-    , "area->地区"
-    , "plan_date->排期日期"
-    , "judge->审判长/主审人"
-    , "litigant->当事人"
-    , "court->法院"
-    , "court_room->法庭"
-    , "content->公告内容"
-  ))
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
+//    "case_reason->案由"
+//    , "case_no->案号"
+//    , "start_date->开庭时间"
+//    , "area->地区"
+//    , "plan_date->排期日期"
+//    , "judge->审判长/主审人"
+//    , "litigant->当事人"
+//    , "court->法院"
+//    , "court_room->法庭"
+//    , "content->公告内容"
+//  ))
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = {
     val oldcname = new_map("cname")

+ 15 - 15
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_register_list.scala

@@ -42,21 +42,21 @@ case class company_court_register_list() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
-    "case_reason->案由"
-    ,"case_no->案号"
-    ,"filing_date->立案日期"
-    ,"start_time->开庭时间"
-    ,"department->承办部门"
-    ,"court->法院"
-    ,"judge->承办法官"
-    ,"assistant->法官助理"
-    ,"case_type->案件类型"
-    ,"case_status->案件状态"
-    ,"plaintiff->上诉人"
-    ,"defendant->被上诉人"
-    ,"third->第三人"
-  ))
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
+//    "case_reason->案由"
+//    ,"case_no->案号"
+//    ,"filing_date->立案日期"
+//    ,"start_time->开庭时间"
+//    ,"department->承办部门"
+//    ,"court->法院"
+//    ,"judge->承办法官"
+//    ,"assistant->法官助理"
+//    ,"case_type->案件类型"
+//    ,"case_status->案件状态"
+//    ,"plaintiff->上诉人"
+//    ,"defendant->被上诉人"
+//    ,"third->第三人"
+//  ))
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = {
     val oldcname = new_map("cname")

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_double_random_check_info.scala

@@ -40,7 +40,7 @@ case class company_double_random_check_info() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = {
     "2"

+ 3 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_judicial_sale_combine_list.scala

@@ -41,9 +41,9 @@ case class company_judicial_sale_combine_list() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
-    "source_id->address"
-  ))
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
+//    "source_id->address"
+//  ))
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = {
     "4"

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license.scala

@@ -42,7 +42,7 @@ case class company_license() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "2"
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license_creditchina.scala

@@ -43,7 +43,7 @@ case class company_license_creditchina() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "2"
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license_entpub.scala

@@ -42,7 +42,7 @@ case class company_license_entpub() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "2"
 }

+ 1 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_own_tax.scala

@@ -19,11 +19,9 @@ case class company_own_tax() extends CompanyDynamicHandle {
   /**
    * 变更内容
    *
-   * @param old_map
-   * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = new_map.toJson(Seq("tax_balance->欠税余额","tax_category->欠税税种","put_reason->列入经营异常目录原因","new_tax_balance->当前新发生的欠税余额","publish_date->发布日期","department->发布单位"))
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = new_map.toJson(Seq("tax_balance->欠税余额","tax_category->欠税税种","put_reason->列入经营异常目录原因","new_tax_balance->当前新发生的欠税余额","publish_date->发布日期","department->发布单位"))
 
 
   /**

+ 21 - 23
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_tax_contravention.scala

@@ -36,31 +36,29 @@ case class company_tax_contravention() extends CompanyDynamicHandle {
   /**
    * 变更内容
    *
-   * @param old_map
-   * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = {
-      new_map.toJson(Seq(
-        "taxpayer_name->纳税人名称"
-        , "taxpayer_number->纳税人识别号"
-        , "taxpayer_code->组织机构代码"
-        , "address->注册地址"
-        , "publish_time->发布日期"
-        , "case_type->案件性质"
-        , "department->所属税务机关"
-        , "case_info->主要违法事实"
-        , "legal_person_info->法定代表人或负责人"
-//        , s"${splitInfo(new_map("legal_person_info"))._2}->性别"
-//        , s"${splitInfo(new_map("legal_person_info"))._3}->证件名称"
-//        , s"${splitInfo(new_map("legal_person_info"))._4}->证件号码"
-        , "responsible_person_info->负有责任的财务负责人"
-//        , s"${splitInfo(new_map("legal_person_info"))._2}->性别"
-//        , s"${splitInfo(new_map("legal_person_info"))._3}->证件名称"
-//        , s"${splitInfo(new_map("legal_person_info"))._4}->证件号码"
-        , "responsible_department_info->负有直接责任的中介机构"
-      ))
-  }
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = {
+//      new_map.toJson(Seq(
+//        "taxpayer_name->纳税人名称"
+//        , "taxpayer_number->纳税人识别号"
+//        , "taxpayer_code->组织机构代码"
+//        , "address->注册地址"
+//        , "publish_time->发布日期"
+//        , "case_type->案件性质"
+//        , "department->所属税务机关"
+//        , "case_info->主要违法事实"
+//        , "legal_person_info->法定代表人或负责人"
+////        , s"${splitInfo(new_map("legal_person_info"))._2}->性别"
+////        , s"${splitInfo(new_map("legal_person_info"))._3}->证件名称"
+////        , s"${splitInfo(new_map("legal_person_info"))._4}->证件号码"
+//        , "responsible_person_info->负有责任的财务负责人"
+////        , s"${splitInfo(new_map("legal_person_info"))._2}->性别"
+////        , s"${splitInfo(new_map("legal_person_info"))._3}->证件名称"
+////        , s"${splitInfo(new_map("legal_person_info"))._4}->证件号码"
+//        , "responsible_department_info->负有直接责任的中介机构"
+//      ))
+//  }
 
   def splitInfo(s: String) = {
     if (StringUtils.isNotBlank(s) && s.replaceAll(":", ",").split(",").size == 4) {

+ 11 - 11
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_zxr_final_case.scala

@@ -42,17 +42,17 @@ case class company_zxr_final_case() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
-    "name->被执行人姓名"
-    ,"identity_num->身份证号码/组织机构代码"
-    ,"sex->性别"
-    ,"case_no->案号"
-    ,"court_name->执行法院"
-    ,"case_create_time->立案时间"
-    ,"case_final_time->终本日期"
-    ,"exec_amount->执行标的(元)"
-    ,"no_exec_amount->未履行金额"
-  ))
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
+//    "name->被执行人姓名"
+//    ,"identity_num->身份证号码/组织机构代码"
+//    ,"sex->性别"
+//    ,"case_no->案号"
+//    ,"court_name->执行法院"
+//    ,"case_create_time->立案时间"
+//    ,"case_final_time->终本日期"
+//    ,"exec_amount->执行标的(元)"
+//    ,"no_exec_amount->未履行金额"
+//  ))
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = {
     "4"

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/wenshu_detail_combine.scala

@@ -50,7 +50,7 @@ case class wenshu_detail_combine() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = {
     val t1 = new_map("name_type")

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/test/TestChangeExtract.scala

@@ -86,7 +86,7 @@ object TestChangeExtract {
   }
 
   val seq = List(
-    "company_mortgage_info"
+    "company_own_tax"
     , "company_check_info"
     , "company_court_announcement_list"
     , "company_court_open_announcement_list"

+ 12 - 5
src/main/scala/com/winhc/bigdata/spark/test/TestCompanyDynamic.scala

@@ -45,21 +45,22 @@ object TestCompanyDynamic {
     , Args(tableName = "company_annual_report_out_guarantee", bName = 1)
     , Args(tableName = "company_zxr_restrict", bName = 1)
 
+    , Args(tableName = "company_own_tax", bName = 1) //终本案件
     , Args(tableName = "company_zxr_final_case", bName = 1) //终本案件
     , Args(tableName = "company_license_creditchina", bName = 1) //行政许可-信用中国
     , Args(tableName = "company_license_entpub", bName = 1) //行政许可-企业公示
     , Args(tableName = "company_license", bName = 1) //行政许可
     , Args(tableName = "company_check_info", bName = 1) //抽查检查
-    , Args(tableName = "company_court_announcement_list", bName = 1) //法院公告
-    , Args(tableName = "company_court_open_announcement_list", bName = 1) //开庭公告
-    , Args(tableName = "company_court_register_list", bName = 1) //立案信息
+    , Args(tableName = "company_court_announcement_list", bName = 2) //法院公告
+    , Args(tableName = "company_court_open_announcement_list", bName = 2) //开庭公告
+    , Args(tableName = "company_court_register_list", bName = 2) //立案信息
     , Args(tableName = "company_double_random_check_info", bName = 1) //双随机抽查
     , Args(tableName = "company_judicial_sale_combine_list", bName = 1) //司法拍卖
     , Args(tableName = "company_tax_contravention", bName = 1) //税收违法
     , Args(tableName = "wenshu_detail_combine", bName = 1) //裁判文书
   )
   val seq = List(
-    "company_mortgage_info"
+      "company_own_tax"
     , "company_check_info"
     , "company_court_announcement_list"
     , "company_court_open_announcement_list"
@@ -76,6 +77,8 @@ object TestCompanyDynamic {
 
   def main(args: Array[String]): Unit = {
 
+    val ds = args(0)
+    println(s"$ds")
     val project = "winhc_eci_dev"
 
     val config = EsConfig.getEsConfigMap ++ mutable.Map(
@@ -83,7 +86,11 @@ object TestCompanyDynamic {
       "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
     )
     val spark = SparkUtils.InitEnv("CompanyDynamic", config)
-    TestCompanyDynamic(spark, project, "20200817").calc()
+    val s = spark.sparkContext.statusTracker
+      s.getExecutorInfos.map(x=>{
+      x.numRunningTasks()
+    })
+    TestCompanyDynamic(spark, project, ds).calc()
     spark.stop()
   }