许家凯 преди 3 години
родител
ревизия
ad11291e9d

+ 40 - 39
src/main/scala/com/winhc/bigdata/spark/ng/utils/CompanySummaryNg_new.scala

@@ -223,8 +223,8 @@ case class CompanySummaryNg_new(s: SparkSession,
   def calc(): Unit = {
     val summary_tab = "summary_tab_xjk"
     val summary_tabs = args.map(get_tab_summary).seq
-    merge_table(spark, summary_tabs, "company_id").calc(summary_tab)
-
+    val merge = merge_table(spark, summary_tabs, "company_id")
+    merge.calc(summary_tab)
     val cols = getColumns(summary_tab).diff(Seq("company_id"))
 
     sql(
@@ -240,6 +240,7 @@ case class CompanySummaryNg_new(s: SparkSession,
          |FROM
          |    xjk_tmp_summary_tab
          |""".stripMargin)
+    merge.drop()
   }
 
   private def getCastCols(name: String, pre: String): String = {
@@ -265,11 +266,11 @@ object CompanySummaryNg_new {
     )
   }
 
-  private def get_default_summary_args(tableName: String,company_id:String): company_summary_args = {
+  private def get_default_summary_args(tableName: String, company_id: String): company_summary_args = {
     company_summary_args(
       table_name = tableName
       , companyIdField = company_id
-      , where = s"$company_id is not null and length($company_id) = 32 and deleted <> 9"
+      , where = s"$company_id is not null and length($company_id) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"${tableName}_del_0")
         , ("1", s"${tableName}_del_1")
@@ -287,7 +288,7 @@ object CompanySummaryNg_new {
     , company_summary_args(table_name = "company_court_open_announcement_explode"
       , companyIdField = "plaintiff_info_id_explode"
       , distinctField = "rowkey,plaintiff_info_id_explode"
-      , where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 32 and deleted <> 9"
+      , where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"company_court_open_announcement_del_0_plaintiff")
         , ("1", s"company_court_open_announcement_del_1_plaintiff")
@@ -296,7 +297,7 @@ object CompanySummaryNg_new {
     , company_summary_args(table_name = "company_court_open_announcement_explode"
       , companyIdField = "defendant_info_id_explode"
       , distinctField = "rowkey,defendant_info_id_explode"
-      , where = "defendant_info_id_explode is not null and length(defendant_info_id_explode) = 32 and deleted <> 9"
+      , where = "defendant_info_id_explode is not null and length(defendant_info_id_explode) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"company_court_open_announcement_del_0_defendant")
         , ("1", s"company_court_open_announcement_del_1_defendant")
@@ -305,7 +306,7 @@ object CompanySummaryNg_new {
 
     , company_summary_args(table_name = "company_dishonest_info"
       , companyIdField = "keyno"
-      , where = "keyno is not null and length(keyno) = 32 and deleted <> 9"
+      , where = "keyno is not null and length(keyno) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"company_dishonest_info_del_0")
         , ("1", s"company_dishonest_info_del_1")
@@ -313,7 +314,7 @@ object CompanySummaryNg_new {
     )
     , company_summary_args(table_name = "company_zxr_restrict"
       , companyIdField = "company_id"
-      , where = "company_id is not null and length(company_id) = 32 and deleted <> 9"
+      , where = "company_id is not null and length(company_id) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"company_zxr_restrict_del_0")
         , ("1", s"company_zxr_restrict_del_1")
@@ -322,7 +323,7 @@ object CompanySummaryNg_new {
 
     , company_summary_args(table_name = "company_abnormal_info"
       , companyIdField = "company_id"
-      , where = "company_id is not null and length(company_id) = 32 and deleted <> 9"
+      , where = "company_id is not null and length(company_id) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"company_abnormal_info_del_0")
         , ("1", s"company_abnormal_info_del_1")
@@ -332,7 +333,7 @@ object CompanySummaryNg_new {
     //todo
     , company_summary_args(table_name = "company_public_announcement"
       , companyIdField = "company_id"
-      , where = "company_id is not null and length(company_id) = 32 and deleted <> 9"
+      , where = "company_id is not null and length(company_id) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"company_public_announcement_del_0")
         , ("1", s"company_public_announcement_del_1")
@@ -340,7 +341,7 @@ object CompanySummaryNg_new {
     )
     , company_summary_args(table_name = "company_illegal_info"
       , companyIdField = "company_id"
-      , where = "company_id is not null and length(company_id) = 32 and deleted <> 9"
+      , where = "company_id is not null and length(company_id) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"company_illegal_info_del_0")
         , ("1", s"company_illegal_info_del_1")
@@ -350,7 +351,7 @@ object CompanySummaryNg_new {
 
     , company_summary_args(table_name = "company_land_mortgage"
       , companyIdField = "mortgagor_company_id"
-      , where = "mortgagor_company_id is not null and length(mortgagor_company_id) = 32 and deleted <> 9"
+      , where = "mortgagor_company_id is not null and length(mortgagor_company_id) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"company_land_mortgage_del_0_mortgagor")
         , ("1", s"company_land_mortgage_del_1_mortgagor")
@@ -360,7 +361,7 @@ object CompanySummaryNg_new {
 
     , company_summary_args(table_name = "company_land_mortgage"
       , companyIdField = "mortgagee_company_id"
-      , where = "mortgagee_company_id is not null and length(mortgagee_company_id) = 32 and deleted <> 9"
+      , where = "mortgagee_company_id is not null and length(mortgagee_company_id) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"company_land_mortgage_del_0_mortgagee")
         , ("1", s"company_land_mortgage_del_1_mortgagee")
@@ -370,7 +371,7 @@ object CompanySummaryNg_new {
 
     , company_summary_args(table_name = "company_judicial_assistance"
       , companyIdField = "company_id"
-      , where = "company_id is not null and length(company_id) = 32 and deleted <> 9"
+      , where = "company_id is not null and length(company_id) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"company_judicial_assistance_del_0")
         , ("1", s"company_judicial_assistance_del_1")
@@ -380,7 +381,7 @@ object CompanySummaryNg_new {
 
     , company_summary_args(table_name = "company_equity_info"
       , companyIdField = "related_company_id"
-      , where = "related_company_id is not null and length(related_company_id) = 32 and deleted <> 9"
+      , where = "related_company_id is not null and length(related_company_id) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"company_equity_info_del_0_related")
         , ("1", s"company_equity_info_del_1_related")
@@ -389,7 +390,7 @@ object CompanySummaryNg_new {
     , company_summary_args(table_name = "company_equity_info_explode"
       , companyIdField = "pledgor_keyno_explode"
       , distinctField = "rowkey,pledgor_keyno_explode"
-      , where = "pledgor_keyno_explode is not null and length(pledgor_keyno_explode) = 32 and deleted <> 9"
+      , where = "pledgor_keyno_explode is not null and length(pledgor_keyno_explode) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"company_equity_info_del_0_pledgor")
         , ("1", s"company_equity_info_del_1_pledgor")
@@ -398,7 +399,7 @@ object CompanySummaryNg_new {
     , company_summary_args(table_name = "company_equity_info_explode"
       , companyIdField = "pledgee_keyno_explode"
       , distinctField = "rowkey,pledgee_keyno_explode"
-      , where = "pledgee_keyno_explode is not null and length(pledgee_keyno_explode) = 32 and deleted <> 9"
+      , where = "pledgee_keyno_explode is not null and length(pledgee_keyno_explode) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"company_equity_info_del_0_pledgee")
         , ("1", s"company_equity_info_del_1_pledgee")
@@ -408,7 +409,7 @@ object CompanySummaryNg_new {
     , company_summary_args(table_name = "company_court_announcement_explode"
       , companyIdField = "plaintiff_info_id_explode"
       , distinctField = "rowkey,plaintiff_info_id_explode"
-      , where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 32 and deleted <> 9"
+      , where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"company_court_announcement_del_0_plaintiff")
         , ("1", s"company_court_announcement_del_1_plaintiff")
@@ -417,7 +418,7 @@ object CompanySummaryNg_new {
     , company_summary_args(table_name = "company_court_announcement_explode"
       , companyIdField = "litigant_info_id_explode"
       , distinctField = "rowkey,litigant_info_id_explode"
-      , where = "litigant_info_id_explode is not null and length(litigant_info_id_explode) = 32 and deleted <> 9"
+      , where = "litigant_info_id_explode is not null and length(litigant_info_id_explode) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"company_court_announcement_del_0_litigant")
         , ("1", s"company_court_announcement_1_litigant")
@@ -426,7 +427,7 @@ object CompanySummaryNg_new {
     , company_summary_args(table_name = "company_send_announcement_explode"
       , companyIdField = "plaintiff_info_id_explode"
       , distinctField = "rowkey,plaintiff_info_id_explode"
-      , where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 32 and deleted <> 9"
+      , where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"company_send_announcement_del_0_plaintiff")
         , ("1", s"company_send_announcement_1_plaintiff")
@@ -435,14 +436,14 @@ object CompanySummaryNg_new {
     , company_summary_args(table_name = "company_send_announcement_explode"
       , companyIdField = "defendant_info_id_explode"
       , distinctField = "rowkey,defendant_info_id_explode"
-      , where = "defendant_info_id_explode is not null and length(defendant_info_id_explode) = 32 and deleted <> 9"
+      , where = "defendant_info_id_explode is not null and length(defendant_info_id_explode) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"company_send_announcement_del_0_defendant")
         , ("1", s"company_send_announcement_1_defendant")
       ))
     ), company_summary_args(table_name = "company_zxr_final_case"
       , companyIdField = "keyno"
-      , where = "keyno is not null and length(keyno) = 32 and deleted <> 9"
+      , where = "keyno is not null and length(keyno) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"company_send_announcement_del_0")
         , ("1", s"company_send_announcement_del_1")
@@ -452,7 +453,7 @@ object CompanySummaryNg_new {
     , company_summary_args(table_name = "company_court_register_explode"
       , companyIdField = "plaintiff_info_id_explode"
       , distinctField = "rowkey,plaintiff_info_id_explode"
-      , where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 32 and deleted <> 9"
+      , where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"company_court_register_del_0_plaintiff")
         , ("1", s"company_court_register_del_1_plaintiff")
@@ -462,7 +463,7 @@ object CompanySummaryNg_new {
     , company_summary_args(table_name = "company_court_register_explode"
       , companyIdField = "defendant_info_id_explode"
       , distinctField = "rowkey,defendant_info_id_explode"
-      , where = "defendant_info_id_explode is not null and length(defendant_info_id_explode) = 32 and deleted <> 9"
+      , where = "defendant_info_id_explode is not null and length(defendant_info_id_explode) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"company_court_register_del_0_defendant")
         , ("1", s"company_court_register_del_1_defendant")
@@ -474,7 +475,7 @@ object CompanySummaryNg_new {
     , company_summary_args(table_name = "bankruptcy_open_case_explode"
       , companyIdField = "applicant_info_id_explode"
       , distinctField = "rowkey,applicant_info_id_explode"
-      , where = "applicant_info_id_explode is not null and length(applicant_info_id_explode) = 32 and deleted <> 9"
+      , where = "applicant_info_id_explode is not null and length(applicant_info_id_explode) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"bankruptcy_open_case_del_0_applicant")
         , ("1", s"bankruptcy_open_case_del_1_applicant")
@@ -484,7 +485,7 @@ object CompanySummaryNg_new {
     , company_summary_args(table_name = "bankruptcy_open_case_explode"
       , companyIdField = "respondent_info_id_explode"
       , distinctField = "rowkey,respondent_info_id_explode"
-      , where = "respondent_info_id_explode is not null and length(respondent_info_id_explode) = 32 and deleted <> 9"
+      , where = "respondent_info_id_explode is not null and length(respondent_info_id_explode) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"bankruptcy_open_case_del_0_respondent")
         , ("1", s"bankruptcy_open_case_del_1_respondent")
@@ -492,22 +493,22 @@ object CompanySummaryNg_new {
     )
 
 
-      //====================================================
+    //====================================================
 
-    , get_default_summary_args("company_zxr","keyno")
-    , get_default_summary_args("company_punishment_info","company_id")
-    , get_default_summary_args("company_tax_contravention","company_id")
-    , get_default_summary_args("company_own_tax","company_id")
-    , get_default_summary_args("company_check_info","company_id")
-    , get_default_summary_args("company_punishment_info_creditchina","company_id")
-    , get_default_summary_args("company_mortgage_info","company_id")
-    , get_default_summary_args("company_brief_cancel_announcement","company_id")
-    , get_default_summary_args("company_double_random_check_info","company_id")
+    , get_default_summary_args("company_zxr", "keyno")
+    , get_default_summary_args("company_punishment_info", "company_id")
+    , get_default_summary_args("company_tax_contravention", "company_id")
+    , get_default_summary_args("company_own_tax", "company_id")
+    , get_default_summary_args("company_check_info", "company_id")
+    , get_default_summary_args("company_punishment_info_creditchina", "company_id")
+    , get_default_summary_args("company_mortgage_info", "company_id")
+    , get_default_summary_args("company_brief_cancel_announcement", "company_id")
+    , get_default_summary_args("company_double_random_check_info", "company_id")
 
     , company_summary_args(table_name = "auction_tracking_explode"
       , companyIdField = "company_info_id_explode"
       , distinctField = "rowkey,company_info_id_explode"
-      , where = "company_info_id_explode is not null and length(company_info_id_explode) = 32 and deleted <> 9"
+      , where = "company_info_id_explode is not null and length(company_info_id_explode) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
         ("0", s"auction_tracking_del_0")
         , ("1", s"auction_tracking_del_1")
@@ -515,9 +516,9 @@ object CompanySummaryNg_new {
     )
 
 
-
   )
 
+
   def main(args: Array[String]): Unit = {
 
     val config = mutable.Map(
@@ -527,7 +528,7 @@ object CompanySummaryNg_new {
     )
 
     val spark = SparkUtils.InitEnv(getClass.getSimpleName, config)
-    CompanySummaryNg_new(s = spark, project = "winhc_ng", args = start_args).calc()
+    PersonSummaryNg_new(s = spark, project = "winhc_ng", args = start_args).calc()
     spark.stop()
   }
 }

+ 2 - 3
src/main/scala/com/winhc/bigdata/spark/ng/utils/merge_table.scala

@@ -19,7 +19,7 @@ case class merge_table(s: SparkSession,
                        join_key: String
                       ) extends LoggingUtils with BaseFunc {
   @(transient@getter) val spark: SparkSession = s
-  private val tmp_tab_prefix = "winhc_ng.tmp_"
+  private val tmp_tab_prefix = "winhc_ng.tmp_p_"
 
 
   private def init(): Unit = {
@@ -82,11 +82,10 @@ case class merge_table(s: SparkSession,
       df = df.join(df_2, Seq(join_key), "full").cache()
     }
     df.createTempView(tempView)
-    drop()
   }
 
 
-  private def drop(): Unit = {
+  def drop(): Unit = {
     for (elem <- tables) {
       sql(
         s"""