|
@@ -0,0 +1,321 @@
|
|
|
+package com.winhc.bigdata.spark.model
|
|
|
+
|
|
|
+import java.util.Date
|
|
|
+
|
|
|
+import com.winhc.bigdata.calc.DimScoreV2
|
|
|
+import com.winhc.bigdata.spark.utils.{AsyncExtract, BaseUtil, LoggingUtils, Maxcomputer2Hbase, SparkUtils}
|
|
|
+import org.apache.spark.sql.{Row, SparkSession}
|
|
|
+
|
|
|
+import scala.annotation.meta.getter
|
|
|
+import scala.collection.mutable
|
|
|
+
|
|
|
+/**
|
|
|
+ * @Description:清算信息,简易注销,严重违法,经营异常,税收违法,欠税公告,公示催告,司法拍卖
|
|
|
+ * @author π
|
|
|
+ * @date 2020/9/316:52
|
|
|
+ */
|
|
|
+object CompanyCommonScoreV1 {
|
|
|
+
|
|
|
+ private case class Params(tableName: String //表名
|
|
|
+ , tableView: String = "" //增量数据视图
|
|
|
+ , flag: String //维度code
|
|
|
+ , time: String = "update_time" //去重字段
|
|
|
+ , kind: String //大类
|
|
|
+ , project: String //小类
|
|
|
+ , tp: String = "0" //区分增量=1,存量=0
|
|
|
+ , namespace: String = "winhc_eci_dev" //工作空间
|
|
|
+ )
|
|
|
+
|
|
|
+ private val startParams = Seq(
|
|
|
+ Params(tableName = "company_liquidating_info", tableView = "", flag = "401", time = "update_time", kind = "经营风险", project = "清算信息", tp = "0")
|
|
|
+ , Params(tableName = "company_brief_cancel_announcement", tableView = "", flag = "402", time = "update_time", kind = "经营风险", project = "简易注销", tp = "0")
|
|
|
+ , Params(tableName = "company_illegal_info", tableView = "", flag = "403", time = "update_time", kind = "经营风险", project = "严重违法行为", tp = "0")
|
|
|
+ , Params(tableName = "company_abnormal_info", tableView = "", flag = "409", time = "update_time", kind = "经营风险", project = "经营异常", tp = "0")
|
|
|
+ , Params(tableName = "company_tax_contravention", tableView = "", flag = "412", time = "update_time", kind = "经营风险", project = "税收违法", tp = "0")
|
|
|
+ , Params(tableName = "company_own_tax", tableView = "", flag = "413", time = "update_time", kind = "经营风险", project = "欠税公告", tp = "0")
|
|
|
+ , Params(tableName = "company_public_announcement2_list", tableView = "", flag = "414", time = "update_time", kind = "经营风险", project = "公示催告", tp = "0")
|
|
|
+ , Params(tableName = "company_judicial_sale_combine_list", tableView = "", flag = "506", time = "update_time", kind = "法律风险", project = "司法拍卖", tp = "0")
|
|
|
+ //增量是全量, Params(tableName = "bankruptcy_open_case", tableView = "", flag = "505", time = "update_time", kind = "法律风险", project = "破产重整", tp = "0",namespace = "winhc_eci")
|
|
|
+ )
|
|
|
+
|
|
|
+ def main(args: Array[String]): Unit = {
|
|
|
+
|
|
|
+ if (args.length != 2) {
|
|
|
+ println(
|
|
|
+ s"""
|
|
|
+ |Please enter the legal parameters !
|
|
|
+ |<project> <tableNames>
|
|
|
+ |""".stripMargin)
|
|
|
+ sys.exit(-1)
|
|
|
+ }
|
|
|
+
|
|
|
+ val Array(project, tableNames) = args
|
|
|
+
|
|
|
+ println(
|
|
|
+ s"""
|
|
|
+ |project: $project
|
|
|
+ |tableNames: $tableNames
|
|
|
+ |""".stripMargin)
|
|
|
+
|
|
|
+ val config = mutable.Map(
|
|
|
+ "spark.hadoop.odps.project.name" -> s"$project",
|
|
|
+ "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
|
|
|
+ )
|
|
|
+ val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
|
|
|
+
|
|
|
+ var start = startParams
|
|
|
+ if (!tableNames.equals("all")) {
|
|
|
+ val set = tableNames.split(",").toSet
|
|
|
+ start = start.filter(a => set.contains(a.tableName))
|
|
|
+ }
|
|
|
+
|
|
|
+ val a = start.map(e => (e.tableName, () => {
|
|
|
+ CompanyCommonScoreV1(spark, e.tableName, e.tableView, e.flag, e.time, e.kind, e.project, e.tp, e.namespace).calc()
|
|
|
+ true
|
|
|
+ }))
|
|
|
+
|
|
|
+ AsyncExtract.startAndWait(spark, a)
|
|
|
+
|
|
|
+ //清算信息
|
|
|
+ //CompanyCommonScoreV1(spark, "company_liquidating_info","", "401", "update_time", "经营风险", "清算信息", "0", s"$project").calc()
|
|
|
+ //简易注销
|
|
|
+ //CompanyCommonScoreV1(spark, "company_brief_cancel_announcement","", "402", "update_time", "经营风险", "简易注销", "0", s"$project").calc()
|
|
|
+ //严重违法
|
|
|
+ //CompanyCommonScoreV1(spark, "company_illegal_info","", "403", "update_time", "经营风险", "严重违法行为", "0", s"$project").calc()
|
|
|
+ //经营异常
|
|
|
+ //CompanyCommonScoreV1(spark, "company_abnormal_info","", "409", "update_time", "经营风险", "经营异常", "0", s"$project").calc()
|
|
|
+ //税收违法
|
|
|
+ //CompanyCommonScoreV1(spark, "company_tax_contravention", "", "412", "update_time", "经营风险", "税收违法", "0", s"$project").calc()
|
|
|
+ //欠税公告
|
|
|
+ //CompanyCommonScoreV1(spark, "company_own_tax", "", "413", "update_time", "经营风险", "欠税公告", "0", s"$project").calc()
|
|
|
+ //公示催告
|
|
|
+ //CompanyCommonScoreV1(spark, "company_public_announcement2_list", "", "414", "update_time", "经营风险", "公示催告", "0", s"$project").calc()
|
|
|
+ //司法拍卖
|
|
|
+ //CompanyCommonScoreV1(spark, "company_judicial_sale_combine_list", "", "506", "update_time", "法律风险", "司法拍卖", "0", s"$project").calc()
|
|
|
+ //破产重整
|
|
|
+ //CompanyCommonScoreV1(spark, "bankruptcy_open_case", "", "505", "update_time", "法律风险", "破产重整", "0", s"$project").calc()
|
|
|
+ spark.stop()
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+case class CompanyCommonScoreV1(s: SparkSession, sourceTable: String, tableView: String = "",
|
|
|
+ flag: String, time: String, kind: String, project: String,
|
|
|
+ tp: String = "0", namespace: String = ""
|
|
|
+ ) extends LoggingUtils {
|
|
|
+
|
|
|
+ @(transient@getter) val spark: SparkSession = s
|
|
|
+
|
|
|
+ import spark.implicits._
|
|
|
+
|
|
|
+ def calc(): Unit = {
|
|
|
+ println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
|
|
|
+ val adsTable = namespace + ".ads_" + sourceTable
|
|
|
+ val incAdsTable = namespace + ".inc_ads_" + sourceTable
|
|
|
+ val targetTable = namespace + ".ads_" + sourceTable + "_score"
|
|
|
+ var ds = ""
|
|
|
+
|
|
|
+ //区别有无分区表
|
|
|
+ var appsql2 = ""
|
|
|
+ var tb = adsTable
|
|
|
+ if ("1".equals(tp)) {
|
|
|
+ tb = tableView
|
|
|
+ ds = BaseUtil.getPartion(incAdsTable, spark)
|
|
|
+ } else {
|
|
|
+ ds = BaseUtil.getPartion(adsTable, spark)
|
|
|
+ appsql2 = s"AND ds = ${ds}"
|
|
|
+ }
|
|
|
+
|
|
|
+ val df = sql(
|
|
|
+ s"""
|
|
|
+ |SELECT *
|
|
|
+ |FROM (
|
|
|
+ | SELECT
|
|
|
+ | *
|
|
|
+ | ,COUNT(new_cid) OVER(PARTITION BY new_cid ) AS cnt1
|
|
|
+ | ,ROW_NUMBER() OVER(PARTITION BY new_cid ORDER BY $time DESC ) AS num
|
|
|
+ | FROM $tb
|
|
|
+ | WHERE new_cid IS NOT NULL
|
|
|
+ | ${appsql2}
|
|
|
+ | ) a
|
|
|
+ |WHERE num =1
|
|
|
+ |""".stripMargin)
|
|
|
+
|
|
|
+ df.map(r => {
|
|
|
+ trans(r, flag, kind, project)
|
|
|
+ }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
|
|
|
+ "score", "total", "extraScore")
|
|
|
+ .createOrReplaceTempView(s"t1_view_$sourceTable")
|
|
|
+
|
|
|
+ sql(s"select * from t1_view_$sourceTable").show(20, false)
|
|
|
+
|
|
|
+ sql(s"insert overwrite table $targetTable " +
|
|
|
+ s"partition (ds='${ds}') select * from t1_view_$sourceTable")
|
|
|
+
|
|
|
+ //同步hbase
|
|
|
+ if ("1".equals(tp)) { //存量计算不用同步hbase
|
|
|
+ val dataFrame = sql(
|
|
|
+ s"""
|
|
|
+ |select
|
|
|
+ |CONCAT_WS('_',cid,project_code) AS rowkey,
|
|
|
+ |id,cid,kind,kind_code,project,project_code,type,score,total,extraScore
|
|
|
+ |from t1_view_$sourceTable
|
|
|
+ |""".stripMargin)
|
|
|
+
|
|
|
+ Maxcomputer2Hbase(dataFrame, "COMPANY_SCORE").syn()
|
|
|
+ }
|
|
|
+ println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
|
|
|
+ }
|
|
|
+
|
|
|
+ def trans(r: Row, flag: String, kind: String, prpject: String) = {
|
|
|
+ val id = r.getAs[Long]("id")
|
|
|
+ val cid = r.getAs[Long]("new_cid").toString
|
|
|
+ val cnt1 = r.getAs[Long]("cnt1")
|
|
|
+ flag match {
|
|
|
+ case "401" => getLiquidatingScore(id, cid, cnt1, kind, prpject)
|
|
|
+ case "402" => getBriefCancelScore(id, cid, cnt1, kind, prpject)
|
|
|
+ case "403" => getIllegalInfoScore(id, cid, cnt1, kind, prpject)
|
|
|
+ case "409" => getAbnormalInfoScore(id, cid, cnt1, kind, prpject)
|
|
|
+ case "412" => getTaxContraventionScore(id, cid, cnt1, kind, prpject)
|
|
|
+ case "413" => getCompanyOwnTaxScore(id, cid, cnt1, kind, prpject)
|
|
|
+ case "414" => getPublicAnnouncementScore(id, cid, cnt1, kind, prpject)
|
|
|
+ case "506" => getJudicialSaleScore(id, cid, cnt1, kind, prpject)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //清算信息
|
|
|
+ def getLiquidatingScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
|
|
|
+ var score = 0f
|
|
|
+ val total = 5f
|
|
|
+ val extraScore = 0f
|
|
|
+ var ty = ""
|
|
|
+ if (cnt1 == 0) {
|
|
|
+ score = 5f
|
|
|
+ ty = "无"
|
|
|
+ } else {
|
|
|
+ score = 0f
|
|
|
+ ty = "有"
|
|
|
+ }
|
|
|
+ (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
|
|
|
+ score, total, extraScore)
|
|
|
+ }
|
|
|
+
|
|
|
+ //简易注销
|
|
|
+ def getBriefCancelScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
|
|
|
+ var score = 0f
|
|
|
+ val total = 5f
|
|
|
+ val extraScore = 0f
|
|
|
+ var ty = ""
|
|
|
+ if (cnt1 == 0) {
|
|
|
+ score = 5f
|
|
|
+ ty = "无"
|
|
|
+ } else {
|
|
|
+ score = 0f
|
|
|
+ ty = "有"
|
|
|
+ }
|
|
|
+ (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
|
|
|
+ score, total, extraScore)
|
|
|
+ }
|
|
|
+
|
|
|
+ //严重违法行为
|
|
|
+ def getIllegalInfoScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
|
|
|
+ var score = 0f
|
|
|
+ val total = 10f
|
|
|
+ val extraScore = 0f
|
|
|
+ var ty = ""
|
|
|
+ if (cnt1 == 0) {
|
|
|
+ score = 10f
|
|
|
+ ty = "无"
|
|
|
+ } else {
|
|
|
+ score = 0f
|
|
|
+ ty = "有"
|
|
|
+ }
|
|
|
+ (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
|
|
|
+ score, total, extraScore)
|
|
|
+ }
|
|
|
+
|
|
|
+ //经营异常
|
|
|
+ def getAbnormalInfoScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
|
|
|
+ var score = 0f
|
|
|
+ val total = 5f
|
|
|
+ val extraScore = 0f
|
|
|
+ var ty = ""
|
|
|
+ if (cnt1 == 0) {
|
|
|
+ score = 5f
|
|
|
+ ty = "无"
|
|
|
+ } else {
|
|
|
+ score = 0f
|
|
|
+ ty = "有"
|
|
|
+ }
|
|
|
+ (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
|
|
|
+ score, total, extraScore)
|
|
|
+ }
|
|
|
+
|
|
|
+ //税收违法
|
|
|
+ def getTaxContraventionScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
|
|
|
+ var score = 0f
|
|
|
+ val total = 10f
|
|
|
+ val extraScore = 0f
|
|
|
+ var ty = ""
|
|
|
+ if (cnt1 == 0) {
|
|
|
+ score = 10f
|
|
|
+ ty = "无"
|
|
|
+ } else {
|
|
|
+ score = 0f
|
|
|
+ ty = "有"
|
|
|
+ }
|
|
|
+ (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
|
|
|
+ score, total, extraScore)
|
|
|
+ }
|
|
|
+
|
|
|
+ //欠税公告
|
|
|
+ def getCompanyOwnTaxScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
|
|
|
+ var score = 0f
|
|
|
+ val total = 10f
|
|
|
+ val extraScore = 0f
|
|
|
+ var ty = ""
|
|
|
+ if (cnt1 == 0) {
|
|
|
+ score = 10f
|
|
|
+ ty = "无"
|
|
|
+ } else {
|
|
|
+ score = 0f
|
|
|
+ ty = "有"
|
|
|
+ }
|
|
|
+ (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
|
|
|
+ score, total, extraScore)
|
|
|
+ }
|
|
|
+
|
|
|
+ //公示催告
|
|
|
+ def getPublicAnnouncementScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
|
|
|
+ var score = 0f
|
|
|
+ val total = 10f
|
|
|
+ val extraScore = 0f
|
|
|
+ var ty = ""
|
|
|
+ if (cnt1 == 0) {
|
|
|
+ score = 10f
|
|
|
+ ty = "无"
|
|
|
+ } else {
|
|
|
+ score = 0f
|
|
|
+ ty = "有"
|
|
|
+ }
|
|
|
+ (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
|
|
|
+ score, total, extraScore)
|
|
|
+ }
|
|
|
+
|
|
|
+ //司法拍卖
|
|
|
+ def getJudicialSaleScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
|
|
|
+ var score = 0f
|
|
|
+ val total = 5f
|
|
|
+ val extraScore = 0f
|
|
|
+ var ty = ""
|
|
|
+ if (cnt1 == 0) {
|
|
|
+ score = 5f
|
|
|
+ ty = "无"
|
|
|
+ } else {
|
|
|
+ score = 0f
|
|
|
+ ty = "有"
|
|
|
+ }
|
|
|
+ (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
|
|
|
+ score, total, extraScore)
|
|
|
+ }
|
|
|
+
|
|
|
+}
|