|
@@ -0,0 +1,537 @@
|
|
|
+package com.winhc.bigdata.spark.jobs
|
|
|
+
|
|
|
+import com.winhc.bigdata.spark.udf.BaseFunc
|
|
|
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
|
|
|
+import com.winhc.bigdata.spark.utils.{CompanyCidAndNameUtils, CompanySummaryPro, LoggingUtils, SparkUtils}
|
|
|
+import org.apache.spark.internal.Logging
|
|
|
+import org.apache.spark.sql.SparkSession
|
|
|
+import org.apache.spark.sql.functions.col
|
|
|
+
|
|
|
+import scala.annotation.meta.getter
|
|
|
+import scala.collection.mutable
|
|
|
+
|
|
|
+
|
|
|
+/**
|
|
|
+ * @author: XuJiakai
|
|
|
+ * @date: 2020/10/22 09:38
|
|
|
+ * 股权冻结
|
|
|
+ */
|
|
|
+case class company_judicial_assistance(s: SparkSession,
|
|
|
+ project: String
|
|
|
+ ) extends LoggingUtils with Logging with BaseFunc {
|
|
|
+ @(transient@getter) val spark: SparkSession = s
|
|
|
+
|
|
|
+ def init(): Unit = {
|
|
|
+ println(
|
|
|
+ s"""
|
|
|
+ |CREATE TABLE IF NOT EXISTS `winhc_eci_dev`.`ads_company_judicial_assistance`(
|
|
|
+ | `rowkey` string comment '执行通知文书号+执行人+被执行人',
|
|
|
+ | `id` BIGINT COMMENT '司法协助基本信息',
|
|
|
+ | `cid` BIGINT COMMENT '公司标识',
|
|
|
+ | `name` string comment '股权标的企业名称',
|
|
|
+ | `cg_assignee_cid` BIGINT COMMENT '公司标识,变更信息,受让人',
|
|
|
+ | `cg_executed_person_cid` BIGINT COMMENT '公司标识,变更信息,被执行人',
|
|
|
+ | `gn_executed_person_cid` BIGINT COMMENT '公司标识,续行信息,被执行人',
|
|
|
+ | `lf_executed_person_cid` BIGINT COMMENT '公司标识,解除冻结信息,被执行人',
|
|
|
+ | `fz_executed_person_cid` BIGINT COMMENT '公司标识,冻结信息,被执行人',
|
|
|
+ | `executed_person_cid` BIGINT COMMENT '公司标识,被执行人',
|
|
|
+ | `executed_person` STRING COMMENT '被执行人',
|
|
|
+ | `equity_amount` STRING COMMENT '股权数额',
|
|
|
+ | `execute_notice_num` STRING COMMENT '执行通知书文号',
|
|
|
+ | `executive_court` STRING COMMENT '执行法院',
|
|
|
+ | `type_state` STRING COMMENT '类型|状态',
|
|
|
+ | `fz_executive_court` STRING COMMENT '冻结信息,执行法院',
|
|
|
+ | `fz_implementation_matters` STRING COMMENT '冻结信息,执行事项',
|
|
|
+ | `fz_execute_order_num` STRING COMMENT '冻结信息,执行裁定书文号',
|
|
|
+ | `fz_execute_notice_num` STRING COMMENT '冻结信息,执行通知书文号',
|
|
|
+ | `fz_executed_person` STRING COMMENT '冻结信息,被执行人',
|
|
|
+ | `fz_equity_amount_other` STRING COMMENT '冻结信息,被执行人持有股权、其它投资权益的数额',
|
|
|
+ | `fz_license_type` STRING COMMENT '冻结信息,被执行人证照种类',
|
|
|
+ | `fz_license_num` STRING COMMENT '冻结信息,被执行人证照号码',
|
|
|
+ | `fz_from_date` DATETIME COMMENT '冻结信息,冻结期限自',
|
|
|
+ | `fz_to_date` DATETIME COMMENT '冻结信息,冻结期限至',
|
|
|
+ | `fz_period` STRING COMMENT '冻结信息,冻结期限',
|
|
|
+ | `fz_publicity_date` DATETIME COMMENT '冻结信息,冻结公示日期',
|
|
|
+ | `lf_executive_court` STRING COMMENT '解除冻结信息,执行法院',
|
|
|
+ | `lf_implementation_matters` STRING COMMENT '解除冻结信息,执行事项',
|
|
|
+ | `lf_execute_order_num` STRING COMMENT '解除冻结信息,执行裁定书文号',
|
|
|
+ | `lf_execute_notice_num` STRING COMMENT '解除冻结信息,执行通知书文号',
|
|
|
+ | `lf_executed_person` STRING COMMENT '解除冻结信息,被执行人',
|
|
|
+ | `lf_equity_amount_other` STRING COMMENT '解除冻结信息,被执行人持有股权、其它投资权益的数额',
|
|
|
+ | `lf_license_type` STRING COMMENT '解除冻结信息,被执行人证照种类',
|
|
|
+ | `lf_license_num` STRING COMMENT '解除冻结信息,被执行人证照号码',
|
|
|
+ | `lf_frozen_remove_date` DATETIME COMMENT '解除冻结信息,解除冻结日期',
|
|
|
+ | `lf_publicity_date` DATETIME COMMENT '解除冻结信息,公示日期',
|
|
|
+ | `lp_invalidation_reason` STRING COMMENT '冻结失效信息,失效原因',
|
|
|
+ | `lp_invalidation_date` DATETIME COMMENT '冻结失效信息,失效日期',
|
|
|
+ | `gn_executive_court` STRING COMMENT '续行信息,执行法院',
|
|
|
+ | `gn_implementation_matters` STRING COMMENT '续行信息,执行事项',
|
|
|
+ | `gn_execute_order_num` STRING COMMENT '续行信息,执行裁定书文号',
|
|
|
+ | `gn_execute_notice_num` STRING COMMENT '续行信息,执行通知书文号',
|
|
|
+ | `gn_executed_person` STRING COMMENT '续行信息,被执行人',
|
|
|
+ | `gn_equity_amount_other` STRING COMMENT '续行信息,被执行人持有股权、其它投资权益的数额',
|
|
|
+ | `gn_license_type` STRING COMMENT '续行信息,被执行人证照种类',
|
|
|
+ | `gn_license_num` STRING COMMENT '续行信息,被执行人证照号码',
|
|
|
+ | `gn_from_date` DATETIME COMMENT '续行信息,冻结期限自',
|
|
|
+ | `gn_to_date` DATETIME COMMENT '续行信息,冻结期限至',
|
|
|
+ | `gn_period` STRING COMMENT '续行信息,冻结期限',
|
|
|
+ | `gn_publicity_date` DATETIME COMMENT '续行信息,冻结公示日期',
|
|
|
+ | `cg_executive_court` STRING COMMENT '变更信息,执行法院',
|
|
|
+ | `cg_implementation_matters` STRING COMMENT '变更信息,执行事项',
|
|
|
+ | `cg_execute_order_num` STRING COMMENT '变更信息执行裁定书文号',
|
|
|
+ | `cg_execute_notice_num` STRING COMMENT '变更信息,执行通知书文号',
|
|
|
+ | `cg_executed_person` STRING COMMENT '变更信息,被执行人',
|
|
|
+ | `cg_equity_amount_other` STRING COMMENT '变更信息,被执行人持有股权数额',
|
|
|
+ | `cg_license_type` STRING COMMENT '变更信息,被执行人证照种类',
|
|
|
+ | `cg_license_num` STRING COMMENT '变更信息,被执行人证照号码',
|
|
|
+ | `cg_assignee` STRING COMMENT '变更信息,受让人',
|
|
|
+ | `cg_execution_date` DATETIME COMMENT '变更信息,协助执行日期',
|
|
|
+ | `cg_assignee_license_type` STRING COMMENT '变更信息,受让人证照种类',
|
|
|
+ | `cg_assignee_license_num` STRING COMMENT '变更信息,受让人证照号码',
|
|
|
+ | `create_time` DATETIME COMMENT '创建时间',
|
|
|
+ | `update_time` DATETIME COMMENT '更新时间',
|
|
|
+ | `deleted` BIGINT COMMENT '是否删除,1:删除,0:未删除')
|
|
|
+ |COMMENT '*' PARTITIONED BY(
|
|
|
+ | `ds` STRING);
|
|
|
+ |
|
|
|
+ |CREATE TABLE IF NOT EXISTS `winhc_eci_dev`.`ads_company_judicial_assistance_list`(
|
|
|
+ | `rowkey` string comment 'cid+main_id',
|
|
|
+ | `main_id` string comment '主表id',
|
|
|
+ | `cid` BIGINT COMMENT '公司标识',
|
|
|
+ | `name` string comment '企业名称',
|
|
|
+ | `flag` string comment '1为执行人,2为标的企业',
|
|
|
+ | `equity_amount` STRING COMMENT '股权数额',
|
|
|
+ | `execute_notice_num` STRING COMMENT '执行通知书文号',
|
|
|
+ | `executive_court` STRING COMMENT '执行法院',
|
|
|
+ | `type_state` STRING COMMENT '类型|状态',
|
|
|
+ | `deleted` BIGINT COMMENT '是否删除,1:删除,0:未删除')
|
|
|
+ |COMMENT '*' PARTITIONED BY(
|
|
|
+ | `ds` STRING);
|
|
|
+ |
|
|
|
+ |CREATE TABLE IF NOT EXISTS `winhc_eci_dev`.`inc_ads_company_judicial_assistance`(
|
|
|
+ | `rowkey` string comment '执行通知文书号+执行人+被执行人',
|
|
|
+ | `id` BIGINT COMMENT '司法协助基本信息',
|
|
|
+ | `cid` BIGINT COMMENT '公司标识',
|
|
|
+ | `name` string comment '股权标的企业名称',
|
|
|
+ | `cg_assignee_cid` BIGINT COMMENT '公司标识,变更信息,受让人',
|
|
|
+ | `cg_executed_person_cid` BIGINT COMMENT '公司标识,变更信息,被执行人',
|
|
|
+ | `gn_executed_person_cid` BIGINT COMMENT '公司标识,续行信息,被执行人',
|
|
|
+ | `lf_executed_person_cid` BIGINT COMMENT '公司标识,解除冻结信息,被执行人',
|
|
|
+ | `fz_executed_person_cid` BIGINT COMMENT '公司标识,冻结信息,被执行人',
|
|
|
+ | `executed_person_cid` BIGINT COMMENT '公司标识,被执行人',
|
|
|
+ | `executed_person` STRING COMMENT '被执行人',
|
|
|
+ | `equity_amount` STRING COMMENT '股权数额',
|
|
|
+ | `execute_notice_num` STRING COMMENT '执行通知书文号',
|
|
|
+ | `executive_court` STRING COMMENT '执行法院',
|
|
|
+ | `type_state` STRING COMMENT '类型|状态',
|
|
|
+ | `fz_executive_court` STRING COMMENT '冻结信息,执行法院',
|
|
|
+ | `fz_implementation_matters` STRING COMMENT '冻结信息,执行事项',
|
|
|
+ | `fz_execute_order_num` STRING COMMENT '冻结信息,执行裁定书文号',
|
|
|
+ | `fz_execute_notice_num` STRING COMMENT '冻结信息,执行通知书文号',
|
|
|
+ | `fz_executed_person` STRING COMMENT '冻结信息,被执行人',
|
|
|
+ | `fz_equity_amount_other` STRING COMMENT '冻结信息,被执行人持有股权、其它投资权益的数额',
|
|
|
+ | `fz_license_type` STRING COMMENT '冻结信息,被执行人证照种类',
|
|
|
+ | `fz_license_num` STRING COMMENT '冻结信息,被执行人证照号码',
|
|
|
+ | `fz_from_date` DATETIME COMMENT '冻结信息,冻结期限自',
|
|
|
+ | `fz_to_date` DATETIME COMMENT '冻结信息,冻结期限至',
|
|
|
+ | `fz_period` STRING COMMENT '冻结信息,冻结期限',
|
|
|
+ | `fz_publicity_date` DATETIME COMMENT '冻结信息,冻结公示日期',
|
|
|
+ | `lf_executive_court` STRING COMMENT '解除冻结信息,执行法院',
|
|
|
+ | `lf_implementation_matters` STRING COMMENT '解除冻结信息,执行事项',
|
|
|
+ | `lf_execute_order_num` STRING COMMENT '解除冻结信息,执行裁定书文号',
|
|
|
+ | `lf_execute_notice_num` STRING COMMENT '解除冻结信息,执行通知书文号',
|
|
|
+ | `lf_executed_person` STRING COMMENT '解除冻结信息,被执行人',
|
|
|
+ | `lf_equity_amount_other` STRING COMMENT '解除冻结信息,被执行人持有股权、其它投资权益的数额',
|
|
|
+ | `lf_license_type` STRING COMMENT '解除冻结信息,被执行人证照种类',
|
|
|
+ | `lf_license_num` STRING COMMENT '解除冻结信息,被执行人证照号码',
|
|
|
+ | `lf_frozen_remove_date` DATETIME COMMENT '解除冻结信息,解除冻结日期',
|
|
|
+ | `lf_publicity_date` DATETIME COMMENT '解除冻结信息,公示日期',
|
|
|
+ | `lp_invalidation_reason` STRING COMMENT '冻结失效信息,失效原因',
|
|
|
+ | `lp_invalidation_date` DATETIME COMMENT '冻结失效信息,失效日期',
|
|
|
+ | `gn_executive_court` STRING COMMENT '续行信息,执行法院',
|
|
|
+ | `gn_implementation_matters` STRING COMMENT '续行信息,执行事项',
|
|
|
+ | `gn_execute_order_num` STRING COMMENT '续行信息,执行裁定书文号',
|
|
|
+ | `gn_execute_notice_num` STRING COMMENT '续行信息,执行通知书文号',
|
|
|
+ | `gn_executed_person` STRING COMMENT '续行信息,被执行人',
|
|
|
+ | `gn_equity_amount_other` STRING COMMENT '续行信息,被执行人持有股权、其它投资权益的数额',
|
|
|
+ | `gn_license_type` STRING COMMENT '续行信息,被执行人证照种类',
|
|
|
+ | `gn_license_num` STRING COMMENT '续行信息,被执行人证照号码',
|
|
|
+ | `gn_from_date` DATETIME COMMENT '续行信息,冻结期限自',
|
|
|
+ | `gn_to_date` DATETIME COMMENT '续行信息,冻结期限至',
|
|
|
+ | `gn_period` STRING COMMENT '续行信息,冻结期限',
|
|
|
+ | `gn_publicity_date` DATETIME COMMENT '续行信息,冻结公示日期',
|
|
|
+ | `cg_executive_court` STRING COMMENT '变更信息,执行法院',
|
|
|
+ | `cg_implementation_matters` STRING COMMENT '变更信息,执行事项',
|
|
|
+ | `cg_execute_order_num` STRING COMMENT '变更信息执行裁定书文号',
|
|
|
+ | `cg_execute_notice_num` STRING COMMENT '变更信息,执行通知书文号',
|
|
|
+ | `cg_executed_person` STRING COMMENT '变更信息,被执行人',
|
|
|
+ | `cg_equity_amount_other` STRING COMMENT '变更信息,被执行人持有股权数额',
|
|
|
+ | `cg_license_type` STRING COMMENT '变更信息,被执行人证照种类',
|
|
|
+ | `cg_license_num` STRING COMMENT '变更信息,被执行人证照号码',
|
|
|
+ | `cg_assignee` STRING COMMENT '变更信息,受让人',
|
|
|
+ | `cg_execution_date` DATETIME COMMENT '变更信息,协助执行日期',
|
|
|
+ | `cg_assignee_license_type` STRING COMMENT '变更信息,受让人证照种类',
|
|
|
+ | `cg_assignee_license_num` STRING COMMENT '变更信息,受让人证照号码',
|
|
|
+ | `create_time` DATETIME COMMENT '创建时间',
|
|
|
+ | `update_time` DATETIME COMMENT '更新时间',
|
|
|
+ | `deleted` BIGINT COMMENT '是否删除,1:删除,0:未删除')
|
|
|
+ |COMMENT '*' PARTITIONED BY(
|
|
|
+ | `ds` STRING);
|
|
|
+ |CREATE TABLE IF NOT EXISTS `winhc_eci_dev`.`inc_ads_company_judicial_assistance_list`(
|
|
|
+ | `rowkey` string comment 'cid+main_id',
|
|
|
+ | `main_id` string comment '主表id',
|
|
|
+ | `cid` BIGINT COMMENT '公司标识',
|
|
|
+ | `name` string comment '企业名称',
|
|
|
+ | `flag` string comment '1为执行人,2为标的企业',
|
|
|
+ | `equity_amount` STRING COMMENT '股权数额',
|
|
|
+ | `execute_notice_num` STRING COMMENT '执行通知书文号',
|
|
|
+ | `executive_court` STRING COMMENT '执行法院',
|
|
|
+ | `type_state` STRING COMMENT '类型|状态',
|
|
|
+ | `deleted` BIGINT COMMENT '是否删除,1:删除,0:未删除')
|
|
|
+ |COMMENT '*' PARTITIONED BY(
|
|
|
+ | `ds` STRING);
|
|
|
+ |""".stripMargin)
|
|
|
+ }
|
|
|
+
|
|
|
+ reg_udf()
|
|
|
+
|
|
|
+ private val companyCidAndNameUtils: CompanyCidAndNameUtils = CompanyCidAndNameUtils(spark)
|
|
|
+
|
|
|
+ def reg_udf(): Unit = {
|
|
|
+ cleanup()
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ def calc(): Unit = {
|
|
|
+ val ods_tab = s"$project.ods_company_judicial_assistance"
|
|
|
+ val inc_ods_tab = s"$project.inc_ods_company_judicial_assistance"
|
|
|
+ val inc_ads_tab = s"$project.inc_ads_company_judicial_assistance"
|
|
|
+ val ads_tab = s"$project.ads_company_judicial_assistance"
|
|
|
+ val ads_list_tab = s"$project.ads_company_judicial_assistance_list"
|
|
|
+ val inc_ads_list_tab = s"$project.inc_ads_company_judicial_assistance_list"
|
|
|
+
|
|
|
+
|
|
|
+ val ads_cols = getColumns(ads_tab)
|
|
|
+ val intersect_ods_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab))
|
|
|
+
|
|
|
+ val ods_last_ds = getLastPartitionsOrElse(ods_tab, "0")
|
|
|
+ val inc_ods_last_ds = getLastPartitionsOrElse(inc_ods_tab, "0")
|
|
|
+
|
|
|
+ val ads_last_ds = getLastPartitionsOrElse(ads_tab, null)
|
|
|
+ val inc_ads_last_ds = getLastPartitionsOrElse(inc_ads_tab, "0")
|
|
|
+
|
|
|
+ val ads_list_last_ds = getLastPartitionsOrElse(ads_list_tab, null)
|
|
|
+ val inc_ads_list_last_ds = getLastPartitionsOrElse(inc_ads_list_tab, "0")
|
|
|
+
|
|
|
+ val list_tab_row_num = "cleanup(concat_ws('',rowkey,cid,flag,execute_notice_num))"
|
|
|
+
|
|
|
+
|
|
|
+ def all(): Unit = {
|
|
|
+ sql(
|
|
|
+ s"""
|
|
|
+ |SELECT ${intersect_ods_cols.mkString(",")}
|
|
|
+ |FROM $ods_tab
|
|
|
+ |WHERE ds = '$ods_last_ds'
|
|
|
+ |--- UNION ALL
|
|
|
+ |--- SELECT ${intersect_ods_cols.mkString(",")}
|
|
|
+ |--- FROM $inc_ods_tab
|
|
|
+ |--- WHERE ds > '$ods_last_ds'
|
|
|
+ |""".stripMargin)
|
|
|
+ .repartition(500)
|
|
|
+ .createOrReplaceTempView("company_judicial_assistance_all")
|
|
|
+
|
|
|
+ var new_tab = companyCidAndNameUtils.addNewNameByCid("company_judicial_assistance_all", "cid", "name")
|
|
|
+ new_tab = companyCidAndNameUtils.replaceNewNameByCid(new_tab, "executed_person_cid", "executed_person")
|
|
|
+
|
|
|
+ sql(
|
|
|
+ s"""
|
|
|
+ |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_tab PARTITION(ds='$ods_last_ds')
|
|
|
+ |SELECT ${ads_cols.diff(Seq("ds")).mkString(",")}
|
|
|
+ |FROM (
|
|
|
+ | SELECT MD5(cleanup(CONCAT_ws('',name,executed_person,execute_notice_num))) AS rowkey
|
|
|
+ | ,*
|
|
|
+ | ,ROW_NUMBER() OVER(PARTITION BY cleanup(CONCAT_ws('',name,executed_person)) ORDER BY ds DESC ) AS num
|
|
|
+ | FROM $new_tab
|
|
|
+ | ) AS t1
|
|
|
+ |WHERE t1.num = 1
|
|
|
+ |""".stripMargin)
|
|
|
+
|
|
|
+
|
|
|
+ sql(
|
|
|
+ s"""
|
|
|
+ |INSERT OVERWRITE TABLE $ads_list_tab PARTITION(ds='$ods_last_ds')
|
|
|
+ |SELECT rowkey
|
|
|
+ | ,main_id
|
|
|
+ | ,cid
|
|
|
+ | ,name
|
|
|
+ | ,flag
|
|
|
+ | ,target_enterprise
|
|
|
+ | ,target_enterprise_cid
|
|
|
+ | ,executed_person
|
|
|
+ | ,executed_person_cid
|
|
|
+ | ,equity_amount
|
|
|
+ | ,execute_notice_num
|
|
|
+ | ,executive_court
|
|
|
+ | ,type_state
|
|
|
+ | ,deleted
|
|
|
+ |FROM (
|
|
|
+ | SELECT *
|
|
|
+ | ,ROW_NUMBER() OVER(PARTITION BY $list_tab_row_num ORDER BY ds DESC ) AS num
|
|
|
+ | FROM (
|
|
|
+ | SELECT CONCAT_ws('_',cid,rowkey) AS rowkey
|
|
|
+ | ,rowkey AS main_id
|
|
|
+ | ,cid AS cid
|
|
|
+ | ,name AS name
|
|
|
+ | ,2 AS flag
|
|
|
+ | ,name as target_enterprise
|
|
|
+ | ,cid as target_enterprise_cid
|
|
|
+ | ,executed_person as executed_person
|
|
|
+ | ,executed_person_cid AS executed_person_cid
|
|
|
+ | ,equity_amount AS equity_amount
|
|
|
+ | ,execute_notice_num AS execute_notice_num
|
|
|
+ | ,executive_court AS executive_court
|
|
|
+ | ,type_state AS type_state
|
|
|
+ | ,deleted AS deleted
|
|
|
+ | ,ds
|
|
|
+ | FROM $ads_tab
|
|
|
+ | WHERE ds = '$ods_last_ds'
|
|
|
+ | AND cid IS NOT NULL
|
|
|
+ | UNION ALL
|
|
|
+ | SELECT CONCAT_ws('_',executed_person_cid,rowkey) AS rowkey
|
|
|
+ | ,rowkey AS main_id
|
|
|
+ | ,executed_person_cid AS cid
|
|
|
+ | ,executed_person AS name
|
|
|
+ | ,1 AS flag
|
|
|
+ | ,name as target_enterprise
|
|
|
+ | ,cid as target_enterprise_cid
|
|
|
+ | ,executed_person as executed_person
|
|
|
+ | ,executed_person_cid AS executed_person_cid
|
|
|
+ | ,equity_amount AS equity_amount
|
|
|
+ | ,execute_notice_num AS execute_notice_num
|
|
|
+ | ,executive_court AS executive_court
|
|
|
+ | ,type_state AS type_state
|
|
|
+ | ,deleted AS deleted
|
|
|
+ | ,ds
|
|
|
+ | FROM $ads_tab
|
|
|
+ | WHERE ds = '$ods_last_ds'
|
|
|
+ | AND executed_person_cid IS NOT NULL
|
|
|
+ | ) AS t1
|
|
|
+ | ) AS t2
|
|
|
+ |WHERE t2.num = 1
|
|
|
+ |""".stripMargin)
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ def inc(): Unit = {
|
|
|
+ sql(
|
|
|
+ s"""
|
|
|
+ |SELECT cid
|
|
|
+ | ,current_cid AS new_cid
|
|
|
+ | ,name AS cname
|
|
|
+ |FROM winhc_eci_dev.inc_ads_company
|
|
|
+ |WHERE ds > '$ads_last_ds'
|
|
|
+ |AND current_cid IS NOT NULL
|
|
|
+ |""".stripMargin)
|
|
|
+ .createOrReplaceTempView("tmp_company_cid_change")
|
|
|
+
|
|
|
+ sql(
|
|
|
+ s"""
|
|
|
+ |SELECT ${intersect_ods_cols.mkString(",")}
|
|
|
+ |FROM $inc_ods_tab
|
|
|
+ |WHERE ds > '$inc_ads_last_ds'
|
|
|
+ |""".stripMargin)
|
|
|
+ .repartition(500)
|
|
|
+ .createOrReplaceTempView("company_judicial_assistance_inc")
|
|
|
+
|
|
|
+ var new_tab = companyCidAndNameUtils.addNewNameByCid("company_judicial_assistance_inc", "cid", "name")
|
|
|
+ new_tab = companyCidAndNameUtils.replaceNewNameByCid(new_tab, "executed_person_cid", "executed_person")
|
|
|
+
|
|
|
+ sql(
|
|
|
+ s"""
|
|
|
+ |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_tab PARTITION(ds='$inc_ods_last_ds')
|
|
|
+ |SELECT ${ads_cols.diff(Seq("ds")).mkString(",")}
|
|
|
+ |FROM (
|
|
|
+ | SELECT MD5(cleanup(CONCAT_ws('',name,executed_person,execute_notice_num))) AS rowkey
|
|
|
+ | ,*
|
|
|
+ | ,ROW_NUMBER() OVER(PARTITION BY cleanup(CONCAT_ws('',name,executed_person)) ORDER BY ds DESC ) AS num
|
|
|
+ | FROM $new_tab
|
|
|
+ | ) AS t1
|
|
|
+ |WHERE t1.num = 1
|
|
|
+ |""".stripMargin)
|
|
|
+
|
|
|
+ sql(
|
|
|
+ s"""
|
|
|
+ |SELECT CONCAT_ws('_',t2.cid,t1.main_id) AS rowkey
|
|
|
+ | ,t1.main_id
|
|
|
+ | ,t2.cid as cid
|
|
|
+ | ,t2.cname as name
|
|
|
+ | ,t1.flag
|
|
|
+ | ,t1.target_enterprise
|
|
|
+ | ,t1.target_enterprise_cid
|
|
|
+ | ,t1.executed_person
|
|
|
+ | ,t1.executed_person_cid
|
|
|
+ | ,t1.equity_amount
|
|
|
+ | ,t1.execute_notice_num
|
|
|
+ | ,t1.executive_court
|
|
|
+ | ,t1.type_state
|
|
|
+ | ,t1.deleted
|
|
|
+ | ,t1.ds
|
|
|
+ |FROM (
|
|
|
+ | SELECT *
|
|
|
+ | FROM (
|
|
|
+ | SELECT *
|
|
|
+ | ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
|
|
|
+ | FROM (
|
|
|
+ | SELECT *
|
|
|
+ | FROM $ads_list_tab
|
|
|
+ | WHERE ds = '$ads_list_last_ds'
|
|
|
+ | UNION ALL
|
|
|
+ | SELECT *
|
|
|
+ | FROM $inc_ads_list_tab
|
|
|
+ | WHERE ds > '$ads_list_last_ds'
|
|
|
+ | ) AS t1
|
|
|
+ | ) AS t2
|
|
|
+ | WHERE t2.num = 1
|
|
|
+ | ) AS t1
|
|
|
+ |JOIN (
|
|
|
+ | SELECT *
|
|
|
+ | FROM tmp_company_cid_change
|
|
|
+ | ) AS t2
|
|
|
+ |ON t1.cid = t2.cid
|
|
|
+ |""".stripMargin)
|
|
|
+ .createOrReplaceTempView("rep_list_tab")
|
|
|
+
|
|
|
+
|
|
|
+ sql(
|
|
|
+ s"""
|
|
|
+ |INSERT OVERWRITE TABLE $inc_ads_list_tab PARTITION(ds='$inc_ods_last_ds')
|
|
|
+ |SELECT rowkey
|
|
|
+ | ,main_id
|
|
|
+ | ,cid
|
|
|
+ | ,name
|
|
|
+ | ,flag
|
|
|
+ | ,target_enterprise
|
|
|
+ | ,target_enterprise_cid
|
|
|
+ | ,executed_person
|
|
|
+ | ,executed_person_cid
|
|
|
+ | ,equity_amount
|
|
|
+ | ,execute_notice_num
|
|
|
+ | ,executive_court
|
|
|
+ | ,type_state
|
|
|
+ | ,deleted
|
|
|
+ |FROM (
|
|
|
+ | SELECT *
|
|
|
+ | ,ROW_NUMBER() OVER(PARTITION BY $list_tab_row_num ORDER BY ds DESC ) AS num
|
|
|
+ | FROM (
|
|
|
+ | SELECT rowkey
|
|
|
+ | ,main_id
|
|
|
+ | ,cid
|
|
|
+ | ,name
|
|
|
+ | ,flag
|
|
|
+ | ,target_enterprise
|
|
|
+ | ,target_enterprise_cid
|
|
|
+ | ,executed_person
|
|
|
+ | ,executed_person_cid
|
|
|
+ | ,equity_amount
|
|
|
+ | ,execute_notice_num
|
|
|
+ | ,executive_court
|
|
|
+ | ,type_state
|
|
|
+ | ,deleted
|
|
|
+ | ,ds
|
|
|
+ | FROM rep_list_tab
|
|
|
+ | UNION ALL
|
|
|
+ | SELECT CONCAT_ws('_',cid,rowkey) AS rowkey
|
|
|
+ | ,rowkey AS main_id
|
|
|
+ | ,cid AS cid
|
|
|
+ | ,name AS name
|
|
|
+ | ,2 AS flag
|
|
|
+ | ,name as target_enterprise
|
|
|
+ | ,cid as target_enterprise_cid
|
|
|
+ | ,executed_person as executed_person
|
|
|
+ | ,executed_person_cid AS executed_person_cid
|
|
|
+ | ,equity_amount AS equity_amount
|
|
|
+ | ,execute_notice_num AS execute_notice_num
|
|
|
+ | ,executive_court AS executive_court
|
|
|
+ | ,type_state AS type_state
|
|
|
+ | ,deleted AS deleted
|
|
|
+ | ,ds
|
|
|
+ | FROM $inc_ads_tab
|
|
|
+ | WHERE ds = '$inc_ods_last_ds'
|
|
|
+ | AND cid IS NOT NULL
|
|
|
+ | UNION ALL
|
|
|
+ | SELECT CONCAT_ws('_',executed_person_cid,rowkey) AS rowkey
|
|
|
+ | ,rowkey AS main_id
|
|
|
+ | ,executed_person_cid AS cid
|
|
|
+ | ,executed_person AS name
|
|
|
+ | ,1 AS flag
|
|
|
+ | ,name as target_enterprise
|
|
|
+ | ,cid as target_enterprise_cid
|
|
|
+ | ,executed_person as executed_person
|
|
|
+ | ,executed_person_cid AS executed_person_cid
|
|
|
+ | ,equity_amount AS equity_amount
|
|
|
+ | ,execute_notice_num AS execute_notice_num
|
|
|
+ | ,executive_court AS executive_court
|
|
|
+ | ,type_state AS type_state
|
|
|
+ | ,deleted AS deleted
|
|
|
+ | ,ds
|
|
|
+ | FROM $inc_ads_tab
|
|
|
+ | WHERE ds = '$inc_ods_last_ds'
|
|
|
+ | AND executed_person_cid IS NOT NULL
|
|
|
+ | ) AS t1
|
|
|
+ | ) AS t2
|
|
|
+ |WHERE t2.num = 1
|
|
|
+ |""".stripMargin)
|
|
|
+
|
|
|
+ import com.winhc.bigdata.spark.implicits.DataFrame2HBaseHelper._
|
|
|
+ sql(
|
|
|
+ s"""
|
|
|
+ |SELECT *
|
|
|
+ |FROM $inc_ads_tab
|
|
|
+ |WHERE ds = '$inc_ods_last_ds'
|
|
|
+ |""".stripMargin)
|
|
|
+ .save2HBase("COMPANY_JUDICIAL_ASSISTANCE", "rowkey", ads_cols.diff(Seq("ds", "id")))
|
|
|
+ import com.winhc.bigdata.spark.implicits.PhoenixHelper._
|
|
|
+ sql(
|
|
|
+ s"""
|
|
|
+ |SELECT *
|
|
|
+ |FROM $inc_ads_list_tab
|
|
|
+ |WHERE ds = '$inc_ods_last_ds'
|
|
|
+ |""".stripMargin)
|
|
|
+ .select(ads_cols.diff(Seq("ds")).map(column => col(column).cast("string")): _*)
|
|
|
+ .save2PhoenixByJDBC("COMPANY_JUDICIAL_ASSISTANCE_LIST")
|
|
|
+
|
|
|
+ CompanySummaryPro(s = spark
|
|
|
+ , project = "winhc_eci_dev"
|
|
|
+ , tableName = "company_judicial_assistance_list"
|
|
|
+ , cidField = "split(rowkey,'_')[0]"
|
|
|
+ , where = "deleted = 0"
|
|
|
+ ).calc()
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
+ if (ads_last_ds == null) {
|
|
|
+ println("all...")
|
|
|
+ all()
|
|
|
+ } else {
|
|
|
+ println("inc...")
|
|
|
+ inc()
|
|
|
+ }
|
|
|
+
|
|
|
+ }
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+object company_judicial_assistance {
|
|
|
+ def main(args: Array[String]): Unit = {
|
|
|
+ val config = mutable.Map(
|
|
|
+ "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
|
|
|
+ "spark.debug.maxToStringFields" -> "200",
|
|
|
+ "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
|
|
|
+ )
|
|
|
+
|
|
|
+ val spark = SparkUtils.InitEnv(getClass.getSimpleName, config)
|
|
|
+ company_judicial_assistance(s = spark, project = "winhc_eci_dev").calc
|
|
|
+
|
|
|
+ spark.stop()
|
|
|
+ }
|
|
|
+}
|