|
@@ -40,6 +40,8 @@ case class CreditPunishmentCaseAgg(s: SparkSession
|
|
|
|
|
|
spark.udf.register("generate_entity_id", generate_entity_id _)
|
|
spark.udf.register("generate_entity_id", generate_entity_id _)
|
|
|
|
|
|
|
|
+ def keyno_type(keyno: String): String = if (Option(keyno).exists(_.length == 32)) "0" else "1" // "0" only for a non-null keyno of exactly 32 characters, otherwise "1" (null included); the keyno_type column comment maps 0 to company and 1 to person
|
|
|
|
+ spark.udf.register("keyno_type", keyno_type _)
|
|
|
|
|
|
cleanup()
|
|
cleanup()
|
|
sql(
|
|
sql(
|
|
@@ -53,19 +55,30 @@ case class CreditPunishmentCaseAgg(s: SparkSession
|
|
| ,case_no STRING COMMENT '案号'
|
|
| ,case_no STRING COMMENT '案号'
|
|
| ,rowkey STRING COMMENT '涉案rowkey'
|
|
| ,rowkey STRING COMMENT '涉案rowkey'
|
|
| ,card_num STRING COMMENT '案号'
|
|
| ,card_num STRING COMMENT '案号'
|
|
- | ,keyno STRING COMMENT '惩戒主体id'
|
|
|
|
|
|
+ | ,keyno STRING COMMENT '惩戒主体id,公司id或人id'
|
|
|
|
+ | ,keyno_type STRING COMMENT '惩戒主体类型,0 公司 1人'
|
|
| ,label STRING COMMENT '标签'
|
|
| ,label STRING COMMENT '标签'
|
|
| ,case_create_time STRING COMMENT '立案时间'
|
|
| ,case_create_time STRING COMMENT '立案时间'
|
|
| ,record_num bigint COMMENT '记录条数'
|
|
| ,record_num bigint COMMENT '记录条数'
|
|
|
|
+ |
|
|
|
|
+ | ,exec_info string COMMENT '执行情况'
|
|
|
|
+ |
|
|
| ,total_exec_amount STRING COMMENT '累计被执行总金额'
|
|
| ,total_exec_amount STRING COMMENT '累计被执行总金额'
|
|
| ,total_no_exec_amount STRING COMMENT '疑似当前欠款总金额'
|
|
| ,total_no_exec_amount STRING COMMENT '疑似当前欠款总金额'
|
|
| ,zxr_total_exec_amount STRING COMMENT '被执行人当前被执行总金额'
|
|
| ,zxr_total_exec_amount STRING COMMENT '被执行人当前被执行总金额'
|
|
| ,final_case_exec_amount STRING COMMENT '终本案件执行标的总金额'
|
|
| ,final_case_exec_amount STRING COMMENT '终本案件执行标的总金额'
|
|
| ,final_case_no_exec_amount STRING COMMENT '终本案件未履行总金额'
|
|
| ,final_case_no_exec_amount STRING COMMENT '终本案件未履行总金额'
|
|
- | ,company_dishonest_info_num BIGINT COMMENT '失信人条数'
|
|
|
|
- | ,company_zxr_num BIGINT COMMENT '被执条数'
|
|
|
|
- | ,company_zxr_final_case_num BIGINT COMMENT '终本案件条数'
|
|
|
|
- | ,company_zxr_restrict_num BIGINT COMMENT '限高条数'
|
|
|
|
|
|
+ |
|
|
|
|
+ | ,company_dishonest_info_num_0 BIGINT COMMENT '失信人条数'
|
|
|
|
+ | ,company_zxr_num_0 BIGINT COMMENT '被执条数'
|
|
|
|
+ | ,company_zxr_final_case_num_0 BIGINT COMMENT '终本案件条数'
|
|
|
|
+ | ,company_zxr_restrict_num_0 BIGINT COMMENT '限高条数'
|
|
|
|
+ |
|
|
|
|
+ | ,company_dishonest_info_num_1 BIGINT COMMENT '失信人条数-历史'
|
|
|
|
+ | ,company_zxr_num_1 BIGINT COMMENT '被执条数-历史'
|
|
|
|
+ | ,company_zxr_final_case_num_1 BIGINT COMMENT '终本案件条数-历史'
|
|
|
|
+ | ,company_zxr_restrict_num_1 BIGINT COMMENT '限高条数-历史'
|
|
|
|
+ |
|
|
| ,deleted BIGINT COMMENT '0:未删除,1:删除'
|
|
| ,deleted BIGINT COMMENT '0:未删除,1:删除'
|
|
| ,all_rowkey STRING COMMENT '所有rowkey的集合,包含历史非历史'
|
|
| ,all_rowkey STRING COMMENT '所有rowkey的集合,包含历史非历史'
|
|
|)
|
|
|)
|
|
@@ -129,7 +142,7 @@ case class CreditPunishmentCaseAgg(s: SparkSession
|
|
s"""
|
|
s"""
|
|
|SELECT
|
|
|SELECT
|
|
| split(xjk_rowkey,'@@') [0 ] AS tn
|
|
| split(xjk_rowkey,'@@') [0 ] AS tn
|
|
- | ,split(xjk_rowkey,'@@') [1 ] AS rowkey
|
|
|
|
|
|
+ | ,split(xjk_rowkey,'@@') [2 ] AS rowkey
|
|
|FROM (
|
|
|FROM (
|
|
| SELECT
|
|
| SELECT
|
|
| *
|
|
| *
|
|
@@ -196,90 +209,76 @@ case class CreditPunishmentCaseAgg(s: SparkSession
|
|
"all_record"
|
|
"all_record"
|
|
}
|
|
}
|
|
|
|
|
|
|
|
+ private def get_sql(org_tab: String): String = { // Returns (does not execute) the SELECT text that groups org_tab by name, court_name, case_no, folds each group with credit_punishment_case_agg into mm, and projects mm's entries plus the derived ids and keyno_type; groups whose name is null or blank are dropped. org_tab is spliced in verbatim as the FROM table name.
|
|
|
|
+ s"""
|
|
|
|
+ |SELECT md5(cleanup(CONCAT_WS('',name,court_name,case_no))) AS credit_punishment_case_id
|
|
|
|
+ | ,generate_entity_id(mm['keyno'],mm['card_num'],md5(CONCAT_WS('',mm['card_num'],name)),md5(cleanup(CONCAT_WS('',name,court_name,case_no)))) as credit_punishment_entity_id
|
|
|
|
+ | ,name
|
|
|
|
+ | ,court_name
|
|
|
|
+ | ,case_no
|
|
|
|
+ | ,mm['rowkey'] AS rowkey
|
|
|
|
+ | ,mm['card_num'] AS card_num
|
|
|
|
+ | ,mm['keyno'] AS keyno
|
|
|
|
+ | ,keyno_type(mm['keyno']) AS keyno_type
|
|
|
|
+ | ,mm['label'] AS label
|
|
|
|
+ | ,mm['case_create_time'] AS case_create_time
|
|
|
|
+ | ,cast(mm['record_num'] as bigint) AS record_num
|
|
|
|
+ |
|
|
|
|
+ | ,mm['exec_info'] AS exec_info
|
|
|
|
+ |
|
|
|
|
+ | ,mm['total_exec_amount'] AS total_exec_amount
|
|
|
|
+ | ,mm['total_no_exec_amount'] AS total_no_exec_amount
|
|
|
|
+ | ,mm['zxr_total_exec_amount'] AS zxr_total_exec_amount
|
|
|
|
+ | ,mm['final_case_exec_amount'] AS final_case_exec_amount
|
|
|
|
+ | ,mm['final_case_no_exec_amount'] AS final_case_no_exec_amount
|
|
|
|
+ |
|
|
|
|
+ | ,CAST(mm['company_dishonest_info_num_0'] AS BIGINT) AS company_dishonest_info_num_0
|
|
|
|
+ | ,CAST(mm['company_zxr_num_0'] AS BIGINT) AS company_zxr_num_0
|
|
|
|
+ | ,CAST(mm['company_zxr_final_case_num_0'] AS BIGINT) AS company_zxr_final_case_num_0
|
|
|
|
+ | ,CAST(mm['company_zxr_restrict_num_0'] AS BIGINT) AS company_zxr_restrict_num_0
|
|
|
|
+ |
|
|
|
|
+ | ,CAST(mm['company_dishonest_info_num_1'] AS BIGINT) AS company_dishonest_info_num_1
|
|
|
|
+ | ,CAST(mm['company_zxr_num_1'] AS BIGINT) AS company_zxr_num_1
|
|
|
|
+ | ,CAST(mm['company_zxr_final_case_num_1'] AS BIGINT) AS company_zxr_final_case_num_1
|
|
|
|
+ | ,CAST(mm['company_zxr_restrict_num_1'] AS BIGINT) AS company_zxr_restrict_num_1
|
|
|
|
+ |
|
|
|
|
+ | ,cast(mm['deleted'] as bigint) AS deleted
|
|
|
|
+ | ,mm['all_rowkey'] AS all_rowkey
|
|
|
|
+ |FROM (
|
|
|
|
+ | SELECT name
|
|
|
|
+ | ,court_name
|
|
|
|
+ | ,case_no
|
|
|
|
+ | ,credit_punishment_case_agg(rowkey,tn,keyno,card_num,case_create_time,deleted,detail_data) AS mm
|
|
|
|
+ | FROM $org_tab
|
|
|
|
+ | GROUP BY name
|
|
|
|
+ | ,court_name
|
|
|
|
+ | ,case_no
|
|
|
|
+ | )
|
|
|
|
+ |WHERE name IS NOT NULL and trim(name) <> ''
|
|
|
|
+ |""".stripMargin // stripMargin drops each line's leading whitespace and "|" so the caller receives plain SQL text
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
|
|
def calc(ds: String): Unit = {
|
|
def calc(ds: String): Unit = {
|
|
val args = StartAndEndDsUtils(spark).get_gt_ds(CreditPunishment.extractionTargetTab, CreditPunishment.caseTargetTab)
|
|
val args = StartAndEndDsUtils(spark).get_gt_ds(CreditPunishment.extractionTargetTab, CreditPunishment.caseTargetTab)
|
|
|
|
|
|
val org_tab = getCreditPunishmentDataExtraction(args.inc_tab_gt_ds)
|
|
val org_tab = getCreditPunishmentDataExtraction(args.inc_tab_gt_ds)
|
|
|
|
+ // Pass 1: get_sql(org_tab) runs as a bare SELECT (the leading "---INSERT ..." line begins with "--", presumably a SQL line comment - confirm); the result is cached and registered as temp view tmp_data1.
|
|
|
|
+ // Pass 2: the name returned by getAllDataByTmpData("tmp_data1") is passed to get_sql again, and that query is inserted into partition ds of target_tab (INTO when isWindows, OVERWRITE otherwise).
|
|
sql(
|
|
sql(
|
|
s"""
|
|
s"""
|
|
|---INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_tab PARTITION(ds='${ds}')
|
|
|---INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_tab PARTITION(ds='${ds}')
|
|
- |SELECT md5(cleanup(CONCAT_WS('',name,court_name,case_no))) AS credit_punishment_case_id
|
|
|
|
- | ,generate_entity_id(mm['keyno'],mm['card_num'],md5(CONCAT_WS('',mm['card_num'],name)),md5(cleanup(CONCAT_WS('',name,court_name,case_no)))) as credit_punishment_entity_id
|
|
|
|
- | ,name
|
|
|
|
- | ,court_name
|
|
|
|
- | ,case_no
|
|
|
|
- | ,mm['rowkey'] AS rowkey
|
|
|
|
- | ,mm['card_num'] AS card_num
|
|
|
|
- | ,mm['keyno'] AS keyno
|
|
|
|
- | ,mm['label'] AS label
|
|
|
|
- | ,mm['case_create_time'] AS case_create_time
|
|
|
|
- | ,cast(mm['record_num'] as bigint) AS record_num
|
|
|
|
- | ,mm['total_exec_amount'] AS total_exec_amount
|
|
|
|
- | ,mm['total_no_exec_amount'] AS total_no_exec_amount
|
|
|
|
- | ,mm['zxr_total_exec_amount'] AS zxr_total_exec_amount
|
|
|
|
- | ,mm['final_case_exec_amount'] AS final_case_exec_amount
|
|
|
|
- | ,mm['final_case_no_exec_amount'] AS final_case_no_exec_amount
|
|
|
|
- | ,CAST(mm['company_dishonest_info_num'] AS BIGINT) AS company_dishonest_info_num
|
|
|
|
- | ,CAST(mm['company_zxr_num'] AS BIGINT) AS company_zxr_num
|
|
|
|
- | ,CAST(mm['company_zxr_final_case_num'] AS BIGINT) AS company_zxr_final_case_num
|
|
|
|
- | ,CAST(mm['company_zxr_restrict_num'] AS BIGINT) AS company_zxr_restrict_num
|
|
|
|
- | ,cast(mm['deleted'] as bigint) AS deleted
|
|
|
|
- | ,mm['all_rowkey'] AS all_rowkey
|
|
|
|
- |FROM (
|
|
|
|
- | SELECT name
|
|
|
|
- | ,court_name
|
|
|
|
- | ,case_no
|
|
|
|
- | ,credit_punishment_case_agg(rowkey,tn,keyno,card_num,case_create_time,deleted,detail_data) AS mm
|
|
|
|
- | FROM $org_tab
|
|
|
|
- | GROUP BY name
|
|
|
|
- | ,court_name
|
|
|
|
- | ,case_no
|
|
|
|
- | )
|
|
|
|
- |WHERE name IS NOT NULL and trim(name) <> ''
|
|
|
|
|
|
+ |${get_sql(org_tab)}
|
|
|""".stripMargin)
|
|
|""".stripMargin)
|
|
.cache()
|
|
.cache()
|
|
.createTempView(s"tmp_data1")
|
|
.createTempView(s"tmp_data1")
|
|
|
|
|
|
val all_record = getAllDataByTmpData("tmp_data1")
|
|
val all_record = getAllDataByTmpData("tmp_data1")
|
|
|
|
|
|
-
|
|
|
|
sql(
|
|
sql(
|
|
s"""
|
|
s"""
|
|
|INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_tab PARTITION(ds='${ds}')
|
|
|INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_tab PARTITION(ds='${ds}')
|
|
- |SELECT md5(cleanup(CONCAT_WS('',name,court_name,case_no))) AS credit_punishment_case_id
|
|
|
|
- | ,generate_entity_id(mm['keyno'],mm['card_num'],md5(CONCAT_WS('',mm['card_num'],name)),md5(cleanup(CONCAT_WS('',name,court_name,case_no)))) as credit_punishment_entity_id
|
|
|
|
- | ,name
|
|
|
|
- | ,court_name
|
|
|
|
- | ,case_no
|
|
|
|
- | ,mm['rowkey'] AS rowkey
|
|
|
|
- | ,mm['card_num'] AS card_num
|
|
|
|
- | ,mm['keyno'] AS keyno
|
|
|
|
- | ,mm['label'] AS label
|
|
|
|
- | ,mm['case_create_time'] AS case_create_time
|
|
|
|
- | ,cast(mm['record_num'] as bigint) AS record_num
|
|
|
|
- | ,mm['total_exec_amount'] AS total_exec_amount
|
|
|
|
- | ,mm['total_no_exec_amount'] AS total_no_exec_amount
|
|
|
|
- | ,mm['zxr_total_exec_amount'] AS zxr_total_exec_amount
|
|
|
|
- | ,mm['final_case_exec_amount'] AS final_case_exec_amount
|
|
|
|
- | ,mm['final_case_no_exec_amount'] AS final_case_no_exec_amount
|
|
|
|
- | ,CAST(mm['company_dishonest_info_num'] AS BIGINT) AS company_dishonest_info_num
|
|
|
|
- | ,CAST(mm['company_zxr_num'] AS BIGINT) AS company_zxr_num
|
|
|
|
- | ,CAST(mm['company_zxr_final_case_num'] AS BIGINT) AS company_zxr_final_case_num
|
|
|
|
- | ,CAST(mm['company_zxr_restrict_num'] AS BIGINT) AS company_zxr_restrict_num
|
|
|
|
- | ,cast(mm['deleted'] as bigint) AS deleted
|
|
|
|
- | ,mm['all_rowkey'] AS all_rowkey
|
|
|
|
- |FROM (
|
|
|
|
- | SELECT name
|
|
|
|
- | ,court_name
|
|
|
|
- | ,case_no
|
|
|
|
- | ,credit_punishment_case_agg(rowkey,tn,keyno,card_num,case_create_time,deleted,detail_data) AS mm
|
|
|
|
- | FROM $all_record
|
|
|
|
- | GROUP BY name
|
|
|
|
- | ,court_name
|
|
|
|
- | ,case_no
|
|
|
|
- | )
|
|
|
|
- |WHERE name IS NOT NULL and trim(name) <> ''
|
|
|
|
|
|
+ |${get_sql(all_record)}
|
|
|""".stripMargin)
|
|
|""".stripMargin)
|
|
}
|
|
}
|
|
}
|
|
}
|