lyb před 3 roky
rodič
revize
b9da47acd6

+ 122 - 5
src/main/scala/com/winhc/bigdata/spark/ng/jobs/args_company_job.scala

@@ -278,17 +278,134 @@ object args_company_job {
       , is_super_filter = false
     )
 
-    , args_company_job("company_ipr_pledge", Seq("reg_number", "pledgor_info", "pledgee_info", "related_company_name")
-      , rowkey_udf = "md5(cleanup(concat_ws('', reg_number, equity_info_rowkey(pledgor_info, pledgee_info), related_company_name )))"
+
+
+    , args_company_job("company_finance", Seq("company_id", "round")
+      , rowkey_udf = "md5(cleanup(concat_ws('', company_id, round)))"
+      , is_super_filter = false
+    )
+
+
+    , args_company_job("company_ipr_pledge", Seq("ipr_reg_num", "pledgee_info", "pledgor_info", "related_company_name")
+      , rowkey_udf = "md5(cleanup(concat_ws('', ipr_reg_num, equity_info_rowkey(pledgee_info, pledgor_info), related_company_name  )))"
       , is_super_filter = false
+
       , explode_args = Seq(
         explode_args("pledgor_info", "$.pledgor_id", "pledgor_keyno_explode")
         , explode_args("pledgee_info", "$.pledgee_id", "pledgee_keyno_explode")
-        )
+      )
+
     )
 
-    , args_company_job("company_finance", Seq("company_id", "round")
-      , rowkey_udf = "md5(cleanup(concat_ws('', company_id, round)))"
+
+    , args_company_job("company_copyright_reg", Seq("reg_num")
+      , rowkey_udf = "md5(cleanup(concat_ws('', reg_num)))"
+      , is_super_filter = false
+      , explode_args = Seq(
+        explode_args("author_nationality_info", "$.keyno", "author_nationality_info_keyno_explode")
+      )
+    )
+
+    , args_company_job("company_copyright_works", Seq("reg_num")
+      , rowkey_udf = "md5(cleanup(concat_ws('', reg_num)))"
+      , is_super_filter = false
+      , explode_args = Seq(
+        explode_args("author_info", "$.keyno", "author_info_keyno_explode")
+      )
+    )
+
+
+    , args_company_job("company_patent", Seq("app_number", "pub_number", "applicant_name_info")
+      , rowkey_udf = "md5(cleanup(concat_ws('', SUBSTR(app_number, 3), SUBSTR(pub_number, 3), get_text_from_json(applicant_name_info, 'name'))))"
+      , is_super_filter = false
+      , explode_args = Seq(
+        explode_args("applicant_name_info", "$.keyno", "applicant_name_info_keyno_explode")
+      )
+    )
+
+    , args_company_job("company_certificate", Seq("company_name", "type")
+      , rowkey_udf = "md5(cleanup(concat_ws('', company_name, type)))"
+      , is_super_filter = false
+    )
+//    , args_company_job("company_icp", Seq("company_name", "domain", "web_baxh")
+//      , rowkey_udf = "md5(cleanup(concat_ws('', company_name, domain, web_baxh)))"
+//      , is_super_filter = false
+//    )
+    , args_company_job("company_weibo", Seq("company_name", "name")
+      , rowkey_udf = "md5(cleanup(concat_ws('', company_name, name)))"
+      , is_super_filter = false
+    )
+    , args_company_job("company_customs_credit", Seq("company_name", "record_date")
+      , rowkey_udf = "md5(cleanup(concat_ws('', company_name, split_date(cast(record_date as String)))))"
+      , id_user_defined_rowkey = true
+      , is_super_filter = false
+    )
+
+    , args_company_job("company_customs_credit_administrative_penalty", Seq("main_id", "decision_number", "penalty_date")
+      , rowkey_udf = "concat_ws('_', main_id, md5(cleanup(concat_ws('',main_id, split_date(cast(penalty_date as String)), decision_number))) )"
+      , is_super_filter = false
+    )
+
+    , args_company_job("company_customs_credit_rating", Seq("main_id", "identification_time", "authentication_code")
+      , rowkey_udf = "concat_ws('_', main_id, md5(cleanup(concat_ws('',main_id, split_date(cast(identification_time as String)), authentication_code))) )"
+      , is_super_filter = false
+    )
+
+    , args_company_job("company_license", Seq("company_name", "license_number")
+      , rowkey_udf = "md5(cleanup(concat_ws('', company_name, license_number)))"
+      , is_super_filter = false
+    )
+
+    , args_company_job("company_license_creditchina", Seq("company_name", "licence_number")
+      , rowkey_udf = "md5(cleanup(concat_ws('', company_name, licence_number)))"
+      , is_super_filter = false
+    )
+
+    , args_company_job("company_license_entpub", Seq("company_name", "license_number")
+      , rowkey_udf = "md5(cleanup(concat_ws('', company_name, license_number)))"
+      , is_super_filter = false
+    )
+
+    , args_company_job("company_tax", Seq("company_name", "year")
+      , rowkey_udf = "md5(cleanup(concat_ws('', company_name, year)))"
+      , is_super_filter = false
+    )
+
+
+
+    , args_company_job("company_land_announcement", Seq("e_number", "project_name")
+      , rowkey_udf = "md5(cleanup(concat_ws('',  e_number, project_name)))"
+      , is_super_filter = false
+    )
+
+    , args_company_job("company_land_publicity", Seq("title", "project_name")
+      , rowkey_udf = "md5(cleanup(concat_ws('', title, project_name)))"
+      , is_super_filter = false
+    )
+
+    , args_company_job("company_land_transfer", Seq("num", "location")
+      , rowkey_udf = "md5(cleanup(concat_ws('', num, location)))"
+      , is_super_filter = false
+    )
+
+    , args_company_job("company_bond", Seq("publisher_name", "bond_name", "bond_num")
+      , rowkey_udf = "md5(cleanup(concat_ws('', publisher_name, bond_name, bond_num)))"
+      , is_super_filter = false
+    )
+
+    , args_company_job("company_tele_license", Seq("company_name", "license_number")
+      , rowkey_udf = "md5(cleanup(concat_ws('', company_name, license_number)))"
+      , id_user_defined_rowkey = true
+      , is_super_filter = false
+    )
+
+    , args_company_job("company_tele_license_annual_report", Seq("main_id")
+      , rowkey_udf = "concat_ws('_', main_id, md5(cleanup(concat_ws('',main_id))) )"
+      , is_super_filter = false
+    )
+
+    , args_company_job("company_tele_license_communication_badness", Seq("main_id", "deal_time")
+      , rowkey_udf = "concat_ws('_', main_id, md5(cleanup(concat_ws('',main_id, split_date(cast(deal_time as String))))) )"
       , is_super_filter = false
     )
   )

+ 36 - 8
src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

@@ -117,9 +117,37 @@ case class general_handler(s: SparkSession,
 
     spark.udf.register("equity_info_rowkey", equity_info_rowkey _)
 
+    /**
+     * Concatenates the values stored under `name` in every object of a JSON array.
+     *
+     * @param json text expected to hold a JSON array of objects
+     * @param name key to read from each array element
+     * @return the values joined without a separator, in array order; "" when `json`
+     *         yields no array, an empty array, or cannot be processed. Elements that
+     *         lack `name` are skipped so the text "null" never leaks into the result.
+     */
+    def get_text_from_json(json: String, name: String): String = {
+      try {
+        // parseArray may hand back null (e.g. for null input); Option maps that to "".
+        Option(JSON.parseArray(json)).fold("") { elements =>
+          (0 until elements.size())
+            // getString yields null for an absent key; drop it instead of joining "null".
+            .flatMap(i => Option(elements.getJSONObject(i).getString(name)))
+            .mkString("")
+        }
+      } catch {
+        // Malformed JSON, or null / non-object elements: fall back to an empty string.
+        case _: Exception => ""
+      }
+    }
+
+    spark.udf.register("get_text_from_json", get_text_from_json _)
 
   }
 
+
+
+
   private def get_rowkey_udf(): String = {
     def get_row(): String = {
       if (StringUtils.isNotEmpty(job_args.rowkey_udf)) {
@@ -215,7 +243,7 @@ case class general_handler(s: SparkSession,
       }
          |FROM    (
          |            SELECT  *
-         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY $up) AS num
+         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY $up) AS row_num
          |            FROM    (
          |                        SELECT  $rowkey_f as rowkey
          |                                ,${inter_cols.mkString(",")}
@@ -230,7 +258,7 @@ case class general_handler(s: SparkSession,
          |                        $clean_up
          |                    ) AS t1
          |        ) AS t2
-         |WHERE   t2.num = 1
+         |WHERE   t2.row_num = 1
          |""".stripMargin)
     explode_calc()
   }
@@ -281,7 +309,7 @@ case class general_handler(s: SparkSession,
       }
          |FROM    (
          |            SELECT  *
-         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ${up} ) AS num
+         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ${up} ) AS row_num
          |            FROM    (
          |                        SELECT  $rowkey_f as rowkey
          |                                ,${inter_cols.mkString(",")}
@@ -290,7 +318,7 @@ case class general_handler(s: SparkSession,
          |                        $clean_up
          |                    ) AS t1
          |        ) AS t2
-         |WHERE   t2.num = 1
+         |WHERE   t2.row_num = 1
          |""".stripMargin)
 
     addEmptyPartitionOrSkip(inc_ads_tab, target_ds)
@@ -328,14 +356,14 @@ case class general_handler(s: SparkSession,
            |SELECT  *
            |FROM    (
            |            SELECT  *
-           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY $up ) AS num
+           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY $up ) AS row_num
            |            FROM    (
            |                        SELECT  *
            |                        FROM    $inc_ads_tab
            |                        WHERE   ds > ${args.inc_tab_gt_ds}
            |                    )
            |        )
-           |WHERE   num = 1
+           |WHERE   row_num = 1
            |""".stripMargin)
         .createTempView(all_date_tmp_view)
     } else {
@@ -344,7 +372,7 @@ case class general_handler(s: SparkSession,
            |SELECT  *
            |FROM    (
            |            SELECT  *
-           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY $up ) AS num
+           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY $up ) AS row_num
            |            FROM    (
            |                        SELECT  *
            |                        FROM    $ads_tab
@@ -355,7 +383,7 @@ case class general_handler(s: SparkSession,
            |                        WHERE   ds > 0
            |                    )
            |        )
-           |WHERE   num = 1
+           |WHERE   row_num = 1
            |""".stripMargin)
         .createTempView(all_date_tmp_view)
     }

+ 95 - 0
src/main/scala/com/winhc/bigdata/spark/ng/utils/CompanySummaryNg_new.scala

@@ -45,6 +45,21 @@ object CompanySummaryNg_new {
     , get_default_summary_args("private_enterprise", "company_id")
     , get_default_summary_args("company_finance", "company_id")
     , get_default_summary_args("company_equity_pledge", "company_id")
+    , get_default_summary_args("company_env_punishment", "company_id")
+    , get_default_summary_args("company_liquidating_info", "company_id")
+    , get_default_summary_args("company_certificate", "company_id")
+//    , get_default_summary_args("company_icp", "company_id")
+    , get_default_summary_args("company_weibo", "company_id")
+    , get_default_summary_args("company_customs_credit", "company_id")
+    , get_default_summary_args("company_license", "company_id")
+    , get_default_summary_args("company_license_creditchina", "company_id")
+    , get_default_summary_args("company_license_entpub", "company_id")
+    , get_default_summary_args("company_tax", "company_id")
+    , get_default_summary_args("company_land_announcement", "company_id")
+    , get_default_summary_args("company_land_publicity", "company_id")
+    , get_default_summary_args("company_bond", "company_id")
+    , get_default_summary_args("company_tele_license", "company_id")
+
 
     , SummaryArgs(table_name = "company_court_open_announcement_explode"
       , companyIdField = "plaintiff_info_id_explode"
@@ -157,6 +172,8 @@ object CompanySummaryNg_new {
         , ("1", s"company_equity_info_del_1_related")
       ))
     )
+
+
     , SummaryArgs(table_name = "company_equity_info_explode"
       , companyIdField = "pledgor_keyno_explode"
       , distinctField = "rowkey,pledgor_keyno_explode"
@@ -284,7 +301,35 @@ object CompanySummaryNg_new {
     )
 
 
+    , SummaryArgs(table_name = "company_ipr_pledge"
+      , companyIdField = "related_company_id"
+      , where = "related_company_id is not null and length(related_company_id) = 32 "
+      , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
+        ("0", s"company_ipr_pledge_del_0_related")
+        , ("1", s"company_ipr_pledge_del_1_related")
+      ))
+    )
+
+
+    , SummaryArgs(table_name = "company_ipr_pledge_explode"
+      , companyIdField = "pledgor_keyno_explode"
+      , distinctField = "rowkey,pledgor_keyno_explode"
+      , where = "pledgor_keyno_explode is not null and length(pledgor_keyno_explode) = 32 "
+      , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
+        ("0", s"company_ipr_pledge_del_0_pledgor")
+        , ("1", s"company_ipr_pledge_del_1_pledgor")
+      ))
+    )
 
+    , SummaryArgs(table_name = "company_ipr_pledge_explode"
+      , companyIdField = "pledgee_keyno_explode"
+      , distinctField = "rowkey,pledgee_keyno_explode"
+      , where = "pledgee_keyno_explode is not null and length(pledgee_keyno_explode) = 32 "
+      , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
+        ("0", s"company_ipr_pledge_del_0_pledgee")
+        , ("1", s"company_ipr_pledge_del_1_pledgee")
+      ))
+    )
 
     //====================================================
 
@@ -311,6 +356,56 @@ object CompanySummaryNg_new {
       ))
     )
 
+    , SummaryArgs(table_name = "company_copyright_reg_explode"
+      , companyIdField = "author_nationality_info_keyno_explode"
+      , distinctField = "rowkey,author_nationality_info_keyno_explode"
+      , where = "author_nationality_info_keyno_explode is not null and length(author_nationality_info_keyno_explode) = 32 "
+      , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
+        ("0", s"company_copyright_reg_del_0")
+        , ("1", s"company_copyright_reg_del_1")
+      ))
+    )
+
+    , SummaryArgs(table_name = "company_copyright_works_explode"
+      , companyIdField = "author_info_keyno_explode"
+      , distinctField = "rowkey,author_info_keyno_explode"
+      , where = "author_info_keyno_explode is not null and length(author_info_keyno_explode) = 32 "
+      , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
+        ("0", s"company_copyright_works_del_0")
+        , ("1", s"company_copyright_works_del_1")
+      ))
+    )
+
+    , SummaryArgs(table_name = "company_patent_explode"
+      , companyIdField = "applicant_name_info_keyno_explode"
+      , distinctField = "rowkey,applicant_name_info_keyno_explode"
+      , where = "applicant_name_info_keyno_explode is not null and length(applicant_name_info_keyno_explode) = 32 "
+      , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
+        ("0", s"company_patent_del_0")
+        , ("1", s"company_patent_del_1")
+      ))
+    )
+
+
+
+    , SummaryArgs(table_name = "company_land_transfer"
+      , companyIdField = "pre_keyno"
+      , where = "pre_keyno is not null and length(pre_keyno) = 32 "
+      , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
+        ("0", s"company_land_transfer_del_0_user_pre")
+        , ("1", s"company_land_transfer_del_1_user_pre")
+      ))
+    )
+
+
+    , SummaryArgs(table_name = "company_land_transfer"
+      , companyIdField = "now_keyno"
+      , where = "now_keyno is not null and length(now_keyno) = 32 "
+      , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
+        ("0", s"company_land_transfer_del_0_user_now")
+        , ("1", s"company_land_transfer_del_1_user_now")
+      ))
+    )
 
   )
 

+ 42 - 4
src/main/scala/com/winhc/bigdata/spark/ng/utils/export_company_index_2_es.scala

@@ -80,7 +80,7 @@ case class export_company_index_2_es(s: SparkSession,
          |SELECT  *
          |FROM    (
          |            SELECT  *
-         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS row_num
          |            FROM    (
          |                        SELECT  *
          |                        FROM    winhc_ng.ads_${export_args.tn}
@@ -91,7 +91,7 @@ case class export_company_index_2_es(s: SparkSession,
          |                        WHERE   ds > 0
          |                    )
          |        )
-         |WHERE   num = 1
+         |WHERE   row_num = 1
          |""".stripMargin)
       .createTempView("export_all_tab")
   }
@@ -102,14 +102,14 @@ case class export_company_index_2_es(s: SparkSession,
          |SELECT  *
          |FROM    (
          |            SELECT  *
-         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS row_num
          |            FROM    (
          |                        SELECT  *
          |                        FROM    winhc_ng.inc_ads_${export_args.tn}
          |                        WHERE   ds > $start_ds
          |                    )
          |        )
-         |WHERE   num = 1
+         |WHERE   row_num = 1
          |""".stripMargin).createTempView("export_all_tab")
   }
 
@@ -262,6 +262,44 @@ object export_company_index_2_es {
       , "rowkey,company_id,company_name,year,so110,so210,so310,so410,so510,empnum,empnum_dis,create_time,update_time,deleted".split(","))
     , export_2_es_args("company_finance"
       , "rowkey,company_id,company_name,report_date,finance_time,invest_id,money,news_title,news_url,round,percent,inverstors,inverstor_json,valuation,create_time,update_time,deleted".split(","))
+    , export_2_es_args("company_ipr_pledge"
+      , "rowkey,related_company_name,related_company_id,ipr_reg_num,ipr_name,ipr_type,pledgor_info,pledgee_info,pledge_reg_period,status,pub_date,cancel_date,cancel_reason,change_item,content_before,content_after,change_time,create_time,update_time,deleted".split(","))
+    , export_2_es_args("company_copyright_reg"
+      , "rowkey,author_nationality_info,reg_num,cat_num,full_name,simple_name,version,author_nationality,publish_time,reg_time,source_url,create_time,update_time,deleted".split(","))
+    , export_2_es_args("company_copyright_works"
+      , "rowkey,author_info,reg_num,name,type,finish_time,first_publish_time,reg_time,publish_time,country,province,city,writer,source_url,create_time,update_time,deleted".split(","))
+
+    , export_2_es_args("company_patent"
+      , "rowkey,applicant_name_info,app_number,pub_number,app_date,pub_date,title,pat_type,status_code,legal_info,create_time,update_time,deleted".split(","))
+
+    , export_2_es_args("company_certificate"
+      , "rowkey,company_id,company_name,start_date,end_date,cert_no,type,source,detail,create_time,update_time,deleted".split(","))
+//    , export_2_es_args("company_icp"
+//      , "rowkey,company_id,company_name,web_site,web_name,domain,web_baxh,review_time,create_time,update_time,deleted".split(","))
+    , export_2_es_args("company_weibo"
+      , "rowkey,company_id,company_name,name,ico,info,tags,count,fans,follow_count,source_url,create_time,update_time,deleted".split(","))
+
+    , export_2_es_args("company_customs_credit"
+      , "rowkey,company_id,company_name,record_date,customs_registered_address,management_category,industry_category,status,create_time,update_time,deleted".split(","))
+    , export_2_es_args("company_license"
+      , "rowkey,company_id,company_name,license_name,license_number,start_date,end_date,department,create_time,update_time,deleted".split(","))
+    , export_2_es_args("company_license_creditchina"
+      , "rowkey,company_id,company_name,licence_number,licence_content,validity_time,decision_date,end_date,department,create_time,update_time,deleted".split(","))
+
+    , export_2_es_args("company_license_entpub"
+      , "rowkey,company_id,company_name,license_name,license_number,scope,start_date,end_date,department,status,create_time,update_time,deleted".split(","))
+    , export_2_es_args("company_tax"
+      , "rowkey,company_id,company_name,base,id_number,grade,year,eval_department,source,type,create_time,update_time,deleted".split(","))
+    , export_2_es_args("company_land_announcement"
+      , "rowkey,company_id,e_number,application_name,district,project_name,project_loc,area,use_type,supply_method,contract_date,create_time,update_time,deleted".split(","))
+    , export_2_es_args("company_land_publicity"
+      , "rowkey,company_id,title,district,location,use_for,area,project_name,application_name,publication_organize,publication_date,create_time,update_time,deleted".split(","))
+    , export_2_es_args("company_land_transfer"
+      , "rowkey,pre_keyno,now_keyno,num,location,user_pre,user_now,area,merchandise_price,merchandise_time,create_time,update_time,deleted".split(","))
+    , export_2_es_args("company_bond"
+      , "rowkey,company_id,bond_name,bond_full_name,bond_defined_code,bond_num,publisher_name,bond_type,publish_time,bond_trade_time,debt_rtng,create_time,update_time,deleted".split(","))
+    , export_2_es_args("company_tele_license"
+      , "rowkey,company_id,company_name,license_number,business_scope,is_available,create_time,update_time,deleted".split(","))
   )