Browse Source

Annual report, equity pledge

xufei 4 years ago
parent
commit
41b53b2b6e

+ 130 - 0
src/main/scala/com/winhc/bigdata/spark/model/CompanyAnnualReport.scala

@@ -0,0 +1,130 @@
+package com.winhc.bigdata.spark.model
+
+import java.util.Date
+
+import com.winhc.bigdata.calc.DimScoreV2
+import com.winhc.bigdata.spark.utils.BaseUtil.atMonthsBefore
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, Maxcomputer2Hbase, SparkUtils}
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Description: outbound investment score
+ * @author π
+ * @date 2020/9/3 16:52
+ */
+object CompanyAnnualReport {
+  def main(args: Array[String]): Unit = {
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyAnnualReport(spark, "company_annual_report",
+      "", "301", "update_time", "资产权益", "对外投资", "0", "winhc_eci_dev").calc()
+    spark.stop()
+  }
+}
+
+
+case class CompanyAnnualReport(s: SparkSession, sourceTable: String, tableView: String = "",
+                               flag: String, time: String, kind: String, project: String,
+                               tp: String = "0", namespace: String = ""
+                              ) extends LoggingUtils {
+
+  @(transient@getter) val spark: SparkSession = s
+
+  import spark.implicits._
+
+  def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+    val adsTable = namespace + ".ads_" + sourceTable
+    val incAdsTable = namespace + ".inc_ads_" + sourceTable
+    val targetTable = namespace + ".ads_" + sourceTable + "_score"
+    var ds = ""
+
+    // distinguish between the partitioned ads table and the incremental view
+    var appsql2 = ""
+    var tb = adsTable
+    if ("1".equals(tp)) {
+      tb = tableView
+      ds = BaseUtil.getPartion(incAdsTable, spark)
+    } else {
+      ds = BaseUtil.getPartion(adsTable, spark)
+      appsql2 = s"AND  ds = ${ds}"
+    }
+
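+    // keep only the latest record per company (new_cid); cnt1 carries the per-company row count used for scoring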
+    val df = sql(
+      s"""
+         |SELECT  *
+         |FROM    (
+         |        SELECT
+         |                *
+         |                ,COUNT(new_cid) OVER(PARTITION BY new_cid ) AS cnt1
+         |                ,ROW_NUMBER() OVER(PARTITION BY new_cid ORDER BY $time DESC ) AS num
+         |        FROM    $tb
+         |        WHERE   new_cid IS NOT NULL
+         |        ${appsql2}
+         |        ) a
+         |WHERE   num =1
+         |""".stripMargin)
+
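+    // map each deduplicated row to a score tuple and expose it as a temp view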
+    df.map(r => {
+      trans(r, flag, kind, project)
+    }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
+      "score", "total", "extraScore")
+      .createOrReplaceTempView(s"t1_view")
+
+    sql("select * from t1_view").show(20, false)
+
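+    // write the scores into the current ds partition of the target table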
+    sql(s"insert overwrite table ${targetTable} " +
+      s"partition (ds='${ds}')  select * from t1_view")
+
+    // sync to HBase
+    if ("1".equals(tp)) { // full (stock) calculation does not need the HBase sync
+      val dataFrame = sql(
+        s"""
+           |select
+           |CONCAT_WS('_',cid,project_code) AS rowkey,
+           |id,cid,kind,kind_code,project,project_code,type,score,total,extraScore
+           |from t1_view
+           |""".stripMargin)
+
+      Maxcomputer2Hbase(dataFrame, "COMPANY_SCORE").syn()
+    }
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
+  }
+
+  def trans(r: Row, flag: String, kind: String, project: String) = {
+    val id = r.getAs[Long]("id")
+    val cid = r.getAs[Long]("new_cid").toString
+    val cnt1 = r.getAs[Long]("cnt1")
+    getScore(id, cid, cnt1, kind, project)
+  }
+
+  // outbound investment score
+  def getScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 15f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 3f
+      ty = "对外投资企业数量=0"
+    } else if (cnt1 < 5) {
+      score = 7f
+      ty = "对外投资企业数量>0,<5"
+    } else if (cnt1 < 10) {
+      score = 10f
+      ty = "对外投资企业数量≥5,<10"
+    } else {
+      score = 15f
+      ty = "对外投资企业数量≥10"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+}

+ 156 - 0
src/main/scala/com/winhc/bigdata/spark/model/CompanyEquityInfo.scala

@@ -0,0 +1,156 @@
+package com.winhc.bigdata.spark.model
+
+import java.util.Date
+
+import com.winhc.bigdata.calc.DimScoreV2
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, Maxcomputer2Hbase, SparkUtils}
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Description: equity pledge score
+ * @author π
+ * @date 2020/9/3 16:52
+ */
+object CompanyEquityInfo {
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyEquityInfo(spark, "company_equity_info_list",
+      "", "301", "main_id", "资产权益", "股权出质(质权人)", "0", s"$project").calc()
+    CompanyEquityInfo(spark, "company_equity_info_list",
+      "", "408", "main_id", "经营风险", "股权出质(出质人)", "0", s"$project").calc()
+    spark.stop()
+  }
+}
+
+case class CompanyEquityInfo(s: SparkSession, sourceTable: String, tableView: String = "",
+                             flag: String, time: String, kind: String, project: String,
+                             tp: String = "0", namespace: String = ""
+                            ) extends LoggingUtils {
+
+  @(transient@getter) val spark: SparkSession = s
+
+  import spark.implicits._
+
+  def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+    val adsTable = namespace + ".ads_" + sourceTable
+    val incAdsTable = namespace + ".inc_ads_" + sourceTable
+    val targetTable = namespace + ".ads_" + sourceTable + "_score"
+    var ds = ""
+
+    // distinguish between the partitioned ads table and the incremental view
+    var appsql2 = ""
+    var tb = adsTable
+    if ("1".equals(tp)) {
+      tb = tableView
+      ds = BaseUtil.getPartion(incAdsTable, spark)
+    } else {
+      ds = BaseUtil.getPartion(adsTable, spark)
+      appsql2 = s"AND  ds = ${ds}"
+    }
+
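+    // role filter: flag 301 (pledgee) keeps type = 2, flag 408 (pledgor) keeps type = 1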
+    var appsql1 = ""
+    if (flag.equals("301")) {
+      appsql1 = s"AND  type = 2"
+    } else if (flag.equals("408")) {
+      appsql1 = s"AND  type = 1"
+    }
+
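+    // keep only the latest record per company (cid); cnt1 carries the per-company row count used for scoring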
+    val df = sql(
+      s"""
+         |SELECT  *
+         |FROM    (
+         |        SELECT
+         |                *
+         |                ,COUNT(cid) OVER(PARTITION BY cid ) AS cnt1
+         |                ,ROW_NUMBER() OVER(PARTITION BY cid ORDER BY $time DESC ) AS num
+         |        FROM    $tb
+         |        WHERE   cid IS NOT NULL
+         |        ${appsql2} $appsql1
+         |        ) a
+         |WHERE   num =1
+         |""".stripMargin)
+
+    df.map(r => {
+      trans(r, flag, kind, project)
+    }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
+      "score", "total", "extraScore")
+      .createOrReplaceTempView(s"t1_view")
+
+    sql("select * from t1_view").show(20, false)
+
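+    // flag 408 appends (INSERT INTO) so the pledgee scores already written to the same partition are preserved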
+    sql(s"insert ${if (flag.equals("408")) "INTO" else "OVERWRITE"} table ${targetTable} " +
+      s"partition (ds='${ds}')  select * from t1_view")
+
+    // sync to HBase
+    if ("1".equals(tp)) { // full (stock) calculation does not need the HBase sync
+      val dataFrame = sql(
+        s"""
+           |select
+           |CONCAT_WS('_',cid,project_code) AS rowkey,
+           |id,cid,kind,kind_code,project,project_code,type,score,total,extraScore
+           |from t1_view
+           |""".stripMargin)
+
+      Maxcomputer2Hbase(dataFrame, "COMPANY_SCORE").syn()
+    }
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
+  }
+
+  def trans(r: Row, flag: String, kind: String, project: String) = {
+    val id = -1L
+    val cid = r.getAs[Long]("cid").toString
+    val cnt1 = r.getAs[Long]("cnt1")
+    flag match {
+      case "301" => getScore1(id, cid, cnt1, kind, project)
+      case "408" => getScore2(id, cid, cnt1, kind, project)
+    }
+  }
+
+  // equity pledge (as pledgee)
+  def getScore1(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 5f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 2f
+      ty = "无"
+    } else if (cnt1 < 3) {
+      score = 4f
+      ty = "质权数量<3"
+    } else {
+      score = 5f
+      ty = "质权数量≥3"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+  // equity pledge (as pledgor)
+  def getScore2(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 5f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 5f
+      ty = "无"
+    } else {
+      score = 1f
+      ty = "有"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+}