Browse source

Merge remote-tracking branch 'origin/master'

许家凯 4 years ago
parent
commit
81c9d3dbf9

+ 106 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/GraphX4Judicase.scala

@@ -0,0 +1,106 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.{BKDRHash, isWindows}
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.graphx._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+import org.apache.spark.ui.graphx.GraphXTab._
+
+case class GraphX4Judicase(s: SparkSession,
+                           project: String, //name of the project the table belongs to
+                           tableName: String, //table name (without prefix/suffix)
+                           fromCol: String, //column holding the edge source
+                           toCol: String //column holding the edge target
+                          ) extends LoggingUtils with Logging with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s
+  justicase_ops()
+
+  def calc(): Unit = {
+//    val allCols = getColumns(s"$project.ods_$tableName").filter(_ != "ds").toSeq
+//    val ods_ds = BaseUtil.getPartion(s"$project.ods_$tableName", spark)
+//    val inc_ods_ds = BaseUtil.getPartion(s"$project.inc_ods_$tableName", spark)
+    val srcAllCols = getColumns(s"$project.xjk_ads_judicial_case_relation1").filter(_ != "ds").toSeq
+    val desAllCols = getColumns(s"$project.ods_justicase").filter(_ != "ds").toSeq
+    val dfRelations = sql(
+      s"""
+         |SELECT  *
+         |FROM    $project.$tableName
+         |WHERE   ${toCol} IS NOT NULL AND ${fromCol} IS NOT NULL
+         |""".stripMargin)
+    val edgeRDD: RDD[Edge[Long]] = dfRelations
+      .select(srcAllCols.map(column => col(column).cast("string")): _*)
+      .rdd.map(r => {
+        val case_no_from = r.getAs[String](fromCol)
+        val case_no_to = r.getAs[String](toCol)
+        val from = case_no_from.toLong
+        val to = case_no_to.toLong
+        Edge(from, to)
+      })
+    // build the graph from the edges
+    val graph = Graph.fromEdges(edgeRDD, defaultValue = 0)
+
+    // Aggregate the vertices of each connected component into a flat (case_no -> judicial case id) mapping, then join back to the source table to fill in the remaining fields
+    val tripleRDD = graph.connectedComponents().vertices
+      .map(tp => (tp._2, tp._1)) //swap to (componentId, vertexId); verified after several attempts that this swap is required, otherwise the grouping is not by maximal connected component
+      .map(r => (r._1, Set(r._2)))
+      .reduceByKey(_ ++ _)
+      .flatMap(r => {
+        val judicase_id = BKDRHash(r._2.toSeq.sorted.mkString(","))
+        r._2.map(case_no => (case_no, Map("judicase_id" -> judicase_id.toString)))
+      })
+      .map(r => {
+        Row(r._1.toString, r._2("judicase_id"), "1")
+      })
+    val schemaJust = StructType(Array(
+      StructField("id", StringType),
+      StructField("judicase_id", StringType),
+      StructField("flag", StringType)
+    ))
+    //the temp view only carries these 3 fields; the remaining attributes are filled in later when merging into the full data
+    spark.createDataFrame(tripleRDD, schemaJust).createOrReplaceTempView(s"tmp_edgelets_$tableName")
+    //merge the graph result into the full data; the judicial case id for each case_no is taken from the graph
+    sql(
+/*      s"""INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.${tableName}_graphx PARTITION(ds='20200903')
+         |SELECT IF(B.judicase_id IS NOT NULL,B.judicase_id,A.case_id) AS judicase_id
+         |,IF(B.judicase_id IS NOT NULL,B.flag,A.flag) AS flag
+         |,${desAllCols.mkString(",")}
+         |FROM(
+         |  SELECT  '0' AS flag, *
+         |  FROM    $project.ods_justicase
+         |  WHERE   ds='20200830'
+         |) A
+         |LEFT JOIN
+         |(
+         |  SELECT id, judicase_id, flag FROM tmp_edgelets_$tableName
+         |) B
+         |ON A.case_id=B.id
+         |""".stripMargin)//.createOrReplaceTempView(s"tmp_graphx_$tableName")*/
+      s"""INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.xjk_ads_judicial_case_relation1_tmp
+         |SELECT id, judicase_id, flag
+         |FROM tmp_edgelets_$tableName
+         |""".stripMargin)//.createOrReplaceTempView(s"tmp_graphx_$tableName")
+  }
+}
+
+object GraphX4Judicase {
+  def main(args: Array[String]): Unit = {
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "2000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    spark.sparkContext.setLogLevel("All")
+    attachSparkUITab(spark.sparkContext)
+    GraphX4Judicase(spark, "winhc_eci_dev", "xjk_ads_judicial_case_relation1", "id_2", "id_1").calc()
+  }
+}
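
The core step in GraphX4Judicase is id assignment: every case_no vertex in the same connected component receives the same judicase_id, obtained by hashing the sorted list of component members with BaseUtil.BKDRHash. Below is a minimal local sketch of that step, with a hypothetical bkdrHash standing in for BaseUtil.BKDRHash (assumed here to be the conventional seed-131 string hash; the project's implementation may differ):

object JudicaseIdSketch {
  // Stand-in for BaseUtil.BKDRHash: conventional BKDR string hash with seed 131 (assumption).
  def bkdrHash(s: String): Long = {
    val seed = 131L
    var hash = 0L
    for (c <- s) hash = hash * seed + c
    hash & Long.MaxValue // keep the id non-negative
  }

  def main(args: Array[String]): Unit = {
    // componentId -> member case_no vertices, shaped like the output of connectedComponents()
    val components: Map[Long, Set[Long]] = Map(
      1L -> Set(1L, 5L, 9L),
      2L -> Set(2L, 7L)
    )
    // Every member of a component gets the same judicase_id; sorting first makes
    // the id independent of the order in which members were collected.
    val caseToJudicase: Map[Long, Long] = components.flatMap { case (_, members) =>
      val judicaseId = bkdrHash(members.toSeq.sorted.mkString(","))
      members.map(_ -> judicaseId)
    }
    caseToJudicase.toSeq.sorted.foreach { case (caseNo, jid) => println(s"$caseNo -> $jid") }
  }
}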

+ 130 - 0
src/main/scala/com/winhc/bigdata/spark/model/CompanyAnnualReport.scala

@@ -0,0 +1,130 @@
+package com.winhc.bigdata.spark.model
+
+import java.util.Date
+
+import com.winhc.bigdata.calc.DimScoreV2
+import com.winhc.bigdata.spark.utils.BaseUtil.atMonthsBefore
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, Maxcomputer2Hbase, SparkUtils}
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Description: outbound investment score
+ * @author π
+ * @date 2020/9/3 16:52
+ */
+object CompanyAnnualReport {
+  def main(args: Array[String]): Unit = {
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyAnnualReport(spark, "company_annual_report",
+      "", "301", "update_time", "资产权益", "对外投资", "0", "winhc_eci_dev").calc()
+    spark.stop()
+  }
+}
+
+
+case class CompanyAnnualReport(s: SparkSession, sourceTable: String, tableView: String = "",
+                               flag: String, time: String, kind: String, project: String,
+                               tp: String = "0", namespace: String = ""
+                              ) extends LoggingUtils {
+
+  @(transient@getter) val spark: SparkSession = s
+
+  import spark.implicits._
+
+  def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+    val adsTable = namespace + ".ads_" + sourceTable
+    val incAdsTable = namespace + ".inc_ads_" + sourceTable
+    val targetTable = namespace + ".ads_" + sourceTable + "_score"
+    var ds = ""
+
+    //handle partitioned vs. non-partitioned source tables
+    var appsql2 = ""
+    var tb = adsTable
+    if ("1".equals(tp)) {
+      tb = tableView
+      ds = BaseUtil.getPartion(incAdsTable, spark)
+    } else {
+      ds = BaseUtil.getPartion(adsTable, spark)
+      appsql2 = s"AND  ds = ${ds}"
+    }
+
+    val df = sql(
+      s"""
+         |SELECT  *
+         |FROM    (
+         |        SELECT
+         |                *
+         |                ,COUNT(new_cid) OVER(PARTITION BY new_cid ) AS cnt1
+         |                ,ROW_NUMBER() OVER(PARTITION BY new_cid ORDER BY $time DESC ) AS num
+         |        FROM    $tb
+         |        WHERE   new_cid IS NOT NULL
+         |        ${appsql2}
+         |        ) a
+         |WHERE   num =1
+         |""".stripMargin)
+
+    df.map(r => {
+      trans(r, flag, kind, project)
+    }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
+      "score", "total", "extraScore")
+      .createOrReplaceTempView(s"t1_view")
+
+    sql("select * from t1_view").show(20, false)
+
+    sql(s"insert overwrite table ${targetTable} " +
+      s"partition (ds='${ds}')  select * from t1_view")
+
+    //sync to HBase
+    if ("1".equals(tp)) { //the full (stock) run does not need to sync to HBase
+      val dataFrame = sql(
+        s"""
+           |select
+           |CONCAT_WS('_',cid,project_code) AS rowkey,
+           |id,cid,kind,kind_code,project,project_code,type,score,total,extraScore
+           |from t1_view
+           |""".stripMargin)
+
+      Maxcomputer2Hbase(dataFrame, "COMPANY_SCORE").syn()
+    }
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
+  }
+
+  def trans(r: Row, flag: String, kind: String, project: String) = {
+    val id = r.getAs[Long]("id")
+    val cid = r.getAs[Long]("new_cid").toString
+    val cnt1 = r.getAs[Long]("cnt1")
+    getScore(id, cid, cnt1, kind, project)
+  }
+
+  //outbound investment score
+  def getScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 15f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 3f
+      ty = "对外投资企业数量=0"
+    } else if (cnt1 < 5) {
+      score = 7f
+      ty = "对外投资企业数量>0,<5"
+    } else if (cnt1 < 10) {
+      score = 10f
+      ty = "对外投资企业数量≥5,<10"
+    } else {
+      score = 15f
+      ty = "对外投资企业数量≥10"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+}
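
The scoring bands encoded in getScore read more easily as a standalone function. This sketch copies the thresholds from the code above and leaves out the DimScoreV2.newsEventMap lookups (kind/project codes), which stay in the production class:

object InvestScoreSketch {
  // (score, band description) for an outbound-investment count; the total is fixed at 15 in the job.
  def investScore(cnt: Long): (Float, String) =
    if (cnt == 0) (3f, "对外投资企业数量=0")
    else if (cnt < 5) (7f, "对外投资企业数量>0,<5")
    else if (cnt < 10) (10f, "对外投资企业数量≥5,<10")
    else (15f, "对外投资企业数量≥10")

  def main(args: Array[String]): Unit =
    Seq(0L, 3L, 7L, 12L).foreach(c => println(s"cnt=$c -> ${investScore(c)}"))
}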

+ 156 - 0
src/main/scala/com/winhc/bigdata/spark/model/CompanyEquityInfo.scala

@@ -0,0 +1,156 @@
+package com.winhc.bigdata.spark.model
+
+import java.util.Date
+
+import com.winhc.bigdata.calc.DimScoreV2
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, Maxcomputer2Hbase, SparkUtils}
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Description: equity pledge score
+ * @author π
+ * @date 2020/9/3 16:52
+ */
+object CompanyEquityInfo {
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyEquityInfo(spark, "company_equity_info_list",
+      "", "301", "main_id", "资产权益", "股权出质(质权人)", "0", s"$project").calc()
+    CompanyEquityInfo(spark, "company_equity_info_list",
+      "", "408", "main_id", "经营风险", "股权出质(出质人)", "0", s"$project").calc()
+    spark.stop()
+  }
+}
+
+case class CompanyEquityInfo(s: SparkSession, sourceTable: String, tableView: String = "",
+                             flag: String, time: String, kind: String, project: String,
+                             tp: String = "0", namespace: String = ""
+                            ) extends LoggingUtils {
+
+  @(transient@getter) val spark: SparkSession = s
+
+  import spark.implicits._
+
+  def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+    val adsTable = namespace + ".ads_" + sourceTable
+    val incAdsTable = namespace + ".inc_ads_" + sourceTable
+    val targetTable = namespace + ".ads_" + sourceTable + "_score"
+    var ds = ""
+
+    //handle partitioned vs. non-partitioned source tables
+    var appsql2 = ""
+    var tb = adsTable
+    if ("1".equals(tp)) {
+      tb = tableView
+      ds = BaseUtil.getPartion(incAdsTable, spark)
+    } else {
+      ds = BaseUtil.getPartion(adsTable, spark)
+      appsql2 = s"AND  ds = ${ds}"
+    }
+
+    var appsql1 = ""
+    if (flag.equals("301")) {
+      appsql1 = s"AND  type = 2"
+    } else if (flag.equals("408")) {
+      appsql1 = s"AND  type = 1"
+    }
+
+    val df = sql(
+      s"""
+         |SELECT  *
+         |FROM    (
+         |        SELECT
+         |                *
+         |                ,COUNT(cid) OVER(PARTITION BY cid ) AS cnt1
+         |                ,ROW_NUMBER() OVER(PARTITION BY cid ORDER BY $time DESC ) AS num
+         |        FROM    $tb
+         |        WHERE   cid IS NOT NULL
+         |        ${appsql2} $appsql1
+         |        ) a
+         |WHERE   num =1
+         |""".stripMargin)
+
+    df.map(r => {
+      trans(r, flag, kind, project)
+    }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
+      "score", "total", "extraScore")
+      .createOrReplaceTempView(s"t1_view")
+
+    sql("select * from t1_view").show(20, false)
+
+    sql(s"insert ${if (flag.equals("408")) "INTO" else "OVERWRITE"} table ${targetTable} " +
+      s"partition (ds='${ds}')  select * from t1_view")
+
+    //sync to HBase
+    if ("1".equals(tp)) { //the full (stock) run does not need to sync to HBase
+      val dataFrame = sql(
+        s"""
+           |select
+           |CONCAT_WS('_',cid,project_code) AS rowkey,
+           |id,cid,kind,kind_code,project,project_code,type,score,total,extraScore
+           |from t1_view
+           |""".stripMargin)
+
+      Maxcomputer2Hbase(dataFrame, "COMPANY_SCORE").syn()
+    }
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
+  }
+
+  def trans(r: Row, flag: String, kind: String, project: String) = {
+    val id = -1
+    val cid = r.getAs[Long]("cid").toString
+    val cnt1 = r.getAs[Long]("cnt1")
+    flag match {
+      case "301" => getScore1(id, cid, cnt1, kind, project)
+      case "408" => getScore2(id, cid, cnt1, kind, project)
+    }
+  }
+
+  //equity pledge (pledgee side)
+  def getScore1(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 5f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 2f
+      ty = "无"
+    } else if (cnt1 < 3) {
+      score = 4f
+      ty = "质权数量<3"
+    } else {
+      score = 5f
+      ty = "质权数量≥3"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+  //equity pledge (pledgor side)
+  def getScore2(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 5f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 5f
+      ty = "无"
+    } else {
+      score = 1f
+      ty = "有"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+}
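
Both pledge dimensions land in the same ds partition of the score table: the flag 301 run writes it with INSERT OVERWRITE and the flag 408 run that follows appends with INSERT INTO, so the pledgee and pledgor rows coexist. The dispatch and bands can be summarised in a small sketch with thresholds copied from the code above (DimScoreV2 lookups omitted):

object PledgeScoreSketch {
  // Returns (score, total, band description); "301" scores the pledgee side, "408" the pledgor side.
  def pledgeScore(flag: String, cnt: Long): (Float, Float, String) = flag match {
    case "301" => // pledgee: graded by the number of pledges held
      if (cnt == 0) (2f, 5f, "无")
      else if (cnt < 3) (4f, 5f, "质权数量<3")
      else (5f, 5f, "质权数量≥3")
    case "408" => // pledgor: any outstanding pledge drops the score
      if (cnt == 0) (5f, 5f, "无") else (1f, 5f, "有")
  }

  def main(args: Array[String]): Unit = {
    println(pledgeScore("301", 2)) // (4.0,5.0,质权数量<3)
    println(pledgeScore("408", 1)) // (1.0,5.0,有)
  }
}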