
Merge remote-tracking branch 'origin/master'

许家凯 committed 4 years ago · commit 8e698cd12c

+ 3 - 3
src/main/scala/com/winhc/bigdata/spark/model/CompanyAnnualReport.scala

@@ -15,13 +15,13 @@ import scala.collection.mutable
  */
 object CompanyAnnualReport {
   def main(args: Array[String]): Unit = {
+    val Array(project) = args
     val config = mutable.Map(
-      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.project.name" -> s"$project",
       "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-    CompanyAnnualReport(spark, "company_annual_report",
-      "", "206", "update_time", "经营情况", "年报信息", "0", "winhc_eci_dev").calc()
+    CompanyAnnualReport(spark, "company_annual_report", "", "206", "update_time", "经营情况", "年报信息", "0", s"$project").calc()
     spark.stop()
   }
 }
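A note on the argument handling introduced above (the sketch below is illustrative and not part of the commit): val Array(project) = args is a pattern-match binding, so submitting the job with anything other than exactly one argument now fails with a scala.MatchError at startup rather than printing a usage hint. The same pattern is used in the other jobs touched by this merge.

    // Minimal illustration of the new binding, assuming args comes straight from spark-submit:
    val Array(project) = Array("winhc_eci_dev")   // binds project = "winhc_eci_dev"
    // val Array(project) = Array[String]()       // throws scala.MatchError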

+ 38 - 72
src/main/scala/com/winhc/bigdata/spark/model/CompanyBidScore.scala

@@ -27,50 +27,17 @@ import scala.collection.mutable
 
 object CompanyBidScore {
 
-  val tabMapping: Map[String, (String, String, String, String)] =
-    Map("company_bid_list" -> ("306", "publish_time", "资产权益", "招投标") //招投标
-    )
-
   def main(args: Array[String]): Unit = {
-
-    val (namespace, sourceTable, flag, time, kind, project) = valid(args)
-
-    var config = mutable.Map(
-      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
-      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    val Array(project) = args
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
     )
-
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-
-    println(s"company ${this.getClass.getSimpleName} calc start! " + new Date().toString)
-        spark.sql(
-          """
-            |select "24416401" as new_cid,1111L as id,'2020-07-18' as publish_time
-            |""".stripMargin).createOrReplaceTempView("inc_view")
-
-    new CompanyBidScore(spark, sourceTable, "inc_view", flag, time, kind, project, "1", namespace).calc()
-//    new CompanyBidScore(spark, sourceTable, "", flag, time, kind, project, "0", namespace).calc()
-
-
-    println(s"company ${this.getClass.getSimpleName} calc end! " + new Date().toString)
+    CompanyBidScore(spark, "company_bid_list", "", "306", "update_time", "资产权益", "招投标", "0", s"$project").calc()
     spark.stop()
   }
 
-  def valid(args: Array[String]) = {
-    println(args.mkString(", "))
-    if (args.length != 2) {
-      println("please enter namespace, table!!!! ")
-      sys.exit(-1)
-    }
-    val Array(namespace, sourceTable) = args
-
-    val (flag, time, kind, project) = tabMapping.getOrElse(sourceTable, ("", "", "", ""))
-    if (flag.isEmpty || time.isEmpty || kind.isEmpty || project.isEmpty) {
-      println("table not found!!!   ")
-      sys.exit(-1)
-    }
-    (namespace, sourceTable, flag, time, kind, project)
-  }
 }
 
 case class CompanyBidScore(s: SparkSession, sourceTable: String, tableView: String,
@@ -83,14 +50,12 @@ case class CompanyBidScore(s: SparkSession, sourceTable: String, tableView: Stri
 
   def calc() = {
 
-    val ads_company = s"$namespace.ads_company"
+    val ads_company = s"$namespace.ads_company" //公司映射关系
+    val inc_ads_company = s"$namespace.inc_ads_company" //公司映射关系
     val company_category = s"$namespace.const_company_category_code"
     val ads_company_tb = s"$namespace.ads_$sourceTable"
     val inc_ads_company_tb = s"$namespace.inc_ads_$sourceTable"
 
-    val adsCompanyPar = BaseUtil.getPartion(ads_company, spark)
-//    val adsPar = BaseUtil.getPartion(ads_company_tb, spark)
-
     var ds = ""
     var appsql2 = ""
     var tb = ads_company_tb
@@ -102,7 +67,7 @@ case class CompanyBidScore(s: SparkSession, sourceTable: String, tableView: Stri
       appsql2 = s"AND  ds = ${ds}"
     }
 
-    val df = sql(
+    sql(
       s"""
          |SELECT  *
          |FROM    (
@@ -115,38 +80,39 @@ case class CompanyBidScore(s: SparkSession, sourceTable: String, tableView: Stri
          |        ${appsql2}
          |        ) a
          |WHERE   num =1
-         |""".stripMargin)
-
-    df.createOrReplaceTempView("t2")
-
-    if (tp.equals("0")) {
-      sql(
-        s"""
-           |select a.category_code,cast(a.cid as string) as new_cid,
-           |       b.category_str_big AS industry_name
-           |from $ads_company  a
-           |left join $company_category b on a.category_code = b.category_code
-           |where a.cid is not null and a.ds=${adsCompanyPar}
-           |""".stripMargin).createOrReplaceTempView("t1")
-
-      sql(
-        """
-          |select t2.*,t1.industry_name,category_code from t2 left join t1 on t2.new_cid = t1.new_cid
-          |""".stripMargin).map(r => {
-        trans(r, flag, kind, project)
-      }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
-        "score", "total", "extraScore")
-        .createOrReplaceTempView(s"tmp_view")
+         |""".stripMargin).createOrReplaceTempView("t2")
 
-    } else {
-      df.mapPartitions(iter => {
-        trans2(iter, flag, kind, project)
-      }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
-        "score", "total", "extraScore")
-        .createOrReplaceTempView(s"tmp_view")
-    }
+    sql(
+      s"""
+         |SELECT  b.new_cid,c.category_str_big AS industry_name
+         |FROM    (
+         |            SELECT  new_cid,category_code
+         |            FROM    (
+         |                        SELECT  cid as new_cid,category_code
+         |                        FROM    $inc_ads_company
+         |                        WHERE   ds > '0' AND     cid IS NOT NULL
+         |                        UNION ALL
+         |                        SELECT  cid as new_cid,category_code
+         |                        FROM    $ads_company
+         |                        WHERE   ds > '0' AND     cid IS NOT NULL
+         |                    ) a
+         |            GROUP BY new_cid,category_code
+         |        ) b
+         |LEFT JOIN $company_category c
+         |ON      b.category_code = c.category_code
+         |""".stripMargin).createOrReplaceTempView("t1")
 
     sql(
+      """
+        |select t2.*,t1.industry_name from t2 left join t1 on t2.new_cid = t1.new_cid
+        |""".stripMargin).map(r => {
+      trans(r, flag, kind, project)
+    }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
+      "score", "total", "extraScore")
+      .createOrReplaceTempView(s"tmp_view")
+
+    sql("select * from tmp_view").show(20)
+    sql(
       s"""
          |insert ${if (isWindows) "INTO" else "OVERWRITE"} table ${ads_company_tb}_score partition(ds=$ds)
          |select id,cid,kind,kind_code,project,project_code,type,score,total,extraScore

+ 96 - 45
src/main/scala/com/winhc/bigdata/spark/model/CompanyCourtAnnouncement.scala

@@ -3,7 +3,9 @@ package com.winhc.bigdata.spark.model
 import java.util.Date
 
 import com.winhc.bigdata.calc.DimScoreV2
-import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.udf.CompanyMapping
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, Maxcomputer2Hbase, SparkUtils}
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.{Row, SparkSession}
 
 import scala.annotation.meta.getter
@@ -18,67 +20,107 @@ object CompanyCourtAnnouncement {
   val tabMapping: Map[String, (String, String, String, String)] =
     Map("ads_company_court_announcement_list" -> ("1", "publish_date", "法律风险", "法院公告"), //法院公告
       "ads_company_court_open_announcement_list" -> ("2", "start_date", "法律风险", "开庭公告"), //开庭公告
-      "ads_company_court_register_list" -> ("3", "filing_date", "法律风险", "立案信息"), //立案信息
-      "ads_company_lawsuit_list" -> ("4", "judge_time", "法律风险", "裁判文书") //裁判文书
+      "ads_company_court_register_list" -> ("3", "filing_date", "法律风险", "立案信息") //立案信息
+      //,"ads_company_lawsuit_list" -> ("4", "judge_time", "法律风险", "裁判文书") //裁判文书//todo
     )
 
   def main(args: Array[String]): Unit = {
 
-    val (sourceTable, flag, time, kind, project) = valid(args)
-
-    var config = mutable.Map.empty[String, String]
+    val namespace = "winhc_eci_dev"
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$namespace",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
 
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    //法院公告
+    CompanyCourtAnnouncement(spark, "company_court_announcement_list", "", "509", "update_time", "法律风险", "法院公告", "0", s"$namespace").calc()
+    //开庭公告
+    CompanyCourtAnnouncement(spark, "company_court_open_announcement_list", "", "507", "update_time", "法律风险", "开庭公告", "0", s"$namespace").calc()
+    //立案信息
+    CompanyCourtAnnouncement(spark, "company_court_register_list", "", "510", "update_time", "法律风险", "立案信息", "0", s"$namespace").calc()
 
-    new CompanyCourtAnnouncement(spark, sourceTable, flag, time, kind, project).calc()
     spark.stop()
 
   }
 
-  def valid(args: Array[String]) = {
-    if (args.length != 1) {
-      println("请输入要计算的table!!!! ")
-      sys.exit(-1)
-    }
-    val sourceTable = args(0)
-
-    val (flag, time, kind, project) = tabMapping.getOrElse(sourceTable, ("", "", "", ""))
-    if (flag.isEmpty || time.isEmpty || kind.isEmpty || project.isEmpty) {
-      println("输入表不存在!!!   ")
-      sys.exit(-1)
-    }
-    (sourceTable, flag, time, kind, project)
-  }
 }
 
-case class CompanyCourtAnnouncement(s: SparkSession, sourceTable: String,
-                                    flag: String, time: String, kind: String, project: String
-                                   ) extends LoggingUtils {
+case class CompanyCourtAnnouncement(s: SparkSession, sourceTable: String, tableView: String = "",
+                                    flag: String, time: String, kind: String, project: String,
+                                    tp: String, namespace: String
+                                   ) extends LoggingUtils with CompanyMapping {
 
   @(transient@getter) val spark: SparkSession = s
 
   import spark.implicits._
 
+  val nameMapping: Map[String, (String, String)] =
+    Map("company_court_announcement_list" -> ("plaintiff", "litigant"), //法院公告
+      "company_court_open_announcement_list" -> ("plaintiff", "defendant"), //开庭公告
+      "company_court_register_list" -> ("plaintiff", "defendant") //立案信息
+    )
+
   def calc(): Unit = {
     println(s"company ${this.getClass.getSimpleName} calc start! " + new Date().toString)
 
+    prepareFunctions(spark)
+
+    val adsTable = namespace + ".ads_" + sourceTable
+    val incAdsTable = namespace + ".inc_ads_" + sourceTable
+    val targetTable = namespace + ".ads_" + sourceTable + "_score"
+    var ds = ""
+
+    //最近三个月内
     var sqlapp = ""
-    if (!"4".equals(flag)) {
-      sqlapp = s"and $time >= '${BaseUtil.atMonthsBefore(3)}'"
+    sqlapp = s"and $time >= '${BaseUtil.atMonthsBefore(3)}'"
+
+    val company_mapping = s"$namespace.base_company_mapping"
+    val mapping_ds = BaseUtil.getPartion(company_mapping, spark)
+
+    //区别有无分区表
+    var appsql2 = ""
+    var tb = adsTable
+    if ("1".equals(tp)) {
+      tb = tableView
+      ds = BaseUtil.getPartion(incAdsTable, spark)
+    } else {
+      ds = BaseUtil.getPartion(adsTable, spark)
+      appsql2 = s"AND  ds = ${ds}"
+    }
+
+    //获取原被告字段
+    val (yg_name, bg_name) = nameMapping.getOrElse(sourceTable, ("", ""))
+    if (StringUtils.isBlank(yg_name) || StringUtils.isBlank(bg_name)) {
+      println("no table mapping ....")
+      sys.exit(-1)
     }
 
     val df = sql(
       s"""
          |SELECT  *
          |FROM    (
-         |            SELECT  *
-         |                    ,sum(CASE  WHEN party_role = 'y' THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid) AS cnt1
-         |                    ,sum(CASE  WHEN party_role = 'n' THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid) AS cnt2
+         |            SELECT  sum(CASE WHEN role = 'y' THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid) AS cnt1
+         |                    ,sum(CASE WHEN role = 'b' THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid) AS cnt2
          |                    ,row_number() OVER(PARTITION BY new_cid ORDER BY $time DESC) AS num
-         |            FROM    $sourceTable
-         |            WHERE   ds = '${BaseUtil.getPartion(sourceTable, spark)}' and new_cid is not null
-         |                    ${sqlapp}
-         |        ) a
+         |                    ,*
+         |            FROM    (
+         |                        SELECT  b.cname
+         |                                ,name_judge(coalesce(cleanup(b.cname),''),cleanup($yg_name),cleanup($bg_name)) role
+         |                                ,a.*
+         |                        FROM    (
+         |                                    SELECT  *
+         |                                    FROM    $tb
+         |                                    WHERE   new_cid IS NOT NULL ${appsql2} ${sqlapp}
+         |                                ) a
+         |                        LEFT JOIN (
+         |                                      SELECT  *
+         |                                      FROM    $company_mapping
+         |                                      WHERE   ds = '$mapping_ds'
+         |                                  ) b
+         |                        ON      a.cid = b.cid
+         |                    ) c
+         |        ) d
          |WHERE   num = 1
          |""".stripMargin)
 
@@ -86,16 +128,25 @@ case class CompanyCourtAnnouncement(s: SparkSession, sourceTable: String,
       trans(r, flag, kind, project)
     }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
       "score", "total", "extraScore")
-      .createOrReplaceTempView(s"${sourceTable}_tmp_view")
+      .createOrReplaceTempView(s"t1_view_${sourceTable}")
 
-    logInfo(
-      s"""
-         |- - - - - - - - - - - - - - - - - - - - - - - - -
-         |${showString(sql(s"select * from ${sourceTable}_tmp_view"))}
-         |- - - - - - - - - - - - - - - - - - - - - - - - -
-       """.stripMargin)
+    sql(s"select * from t1_view_${sourceTable}").show(20, false)
+
+    sql(s"insert overwrite table ${targetTable} " +
+      s"partition (ds='${ds}')  select * from t1_view_${sourceTable}")
 
-    sql(s"insert overwrite table ${sourceTable}_score  select * from ${sourceTable}_tmp_view")
+    //同步hbase
+    if ("1".equals(tp)) { //存量计算不用同步hbase
+      val dataFrame = sql(
+        s"""
+           |select
+           |CONCAT_WS('_',cid,project_code) AS rowkey,
+           |id,cid,kind,kind_code,project,project_code,type,score,total,extraScore
+           |from t1_view_${sourceTable}
+           |""".stripMargin)
+
+      Maxcomputer2Hbase(dataFrame, "COMPANY_SCORE").syn()
+    }
 
     println(s"company ${this.getClass.getSimpleName} calc end! " + new Date().toString)
   }
@@ -107,10 +158,10 @@ case class CompanyCourtAnnouncement(s: SparkSession, sourceTable: String,
     val cnt1 = r.getAs[Long]("cnt1")
     val cnt2 = r.getAs[Long]("cnt2")
     flag match {
-      case "1" => getInfoAnnouncement(id, cid, cnt1, cnt2, kind, prpject)
-      case "2" => getInfoOpenAnnouncement(id, cid, cnt1, cnt2, kind, prpject)
-      case "3" => getInforegister(id, cid, cnt1, cnt2, kind, prpject)
-      case "4" => getRefereeScore(id, cid, cnt1, cnt2, kind, prpject)
+      case "509" => getInfoAnnouncement(id, cid, cnt1, cnt2, kind, prpject)
+      case "507" => getInfoOpenAnnouncement(id, cid, cnt1, cnt2, kind, prpject)
+      case "510" => getInforegister(id, cid, cnt1, cnt2, kind, prpject)
+      //case "4" => getRefereeScore(id, cid, cnt1, cnt2, kind, prpject)
     }
   }
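A note on the rewritten query in CompanyCourtAnnouncement.calc (illustrative only): it depends on two UDFs, cleanup and name_judge, registered by CompanyMapping.prepareFunctions, neither of which appears in this diff. Judging from the role values counted above ('y' vs 'b'), name_judge presumably tags the company as plaintiff or defendant by matching its cleaned name against the two party-name columns. A hypothetical sketch only, not the actual implementation:

    // Hypothetical sketch; the real name_judge is defined in CompanyMapping and may differ.
    spark.udf.register("name_judge", (cname: String, yg: String, bg: String) => {
      if (cname.nonEmpty && yg != null && yg.contains(cname)) "y"       // found among plaintiffs
      else if (cname.nonEmpty && bg != null && bg.contains(cname)) "b"  // found among defendants
      else "o"                                                          // neither party
    })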
 

+ 17 - 16
src/main/scala/com/winhc/bigdata/spark/model/CompanyEmploymentScore.scala

@@ -10,6 +10,19 @@ import org.apache.spark.sql.{Row, SparkSession}
 import scala.annotation.meta.getter
 import scala.collection.mutable
 
+
+object CompanyEmploymentScore {
+  def main(args: Array[String]): Unit = {
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyEmploymentScore(spark, "company_employment", "", "208", "start_date", "经营情况", "招聘", "0", "winhc_eci_dev").calc()
+    spark.stop()
+  }
+}
+
 /**
  * 招聘得分
  */
@@ -86,12 +99,12 @@ case class CompanyEmploymentScore(s: SparkSession, sourceTable: String, tableVie
     val cnt1 = r.getAs[Long]("cnt1")
     val cnt2 = r.getAs[Long]("cnt2")
     flag match {
-      case "208" => employmentScore(id, cid, cnt1,cnt2, kind, prpject)
+      case "208" => employmentScore(id, cid, cnt1, cnt2, kind, prpject)
     }
   }
 
   //招聘
-  def employmentScore(id: Long, cid: String, cnt1: Long, cnt2: Long,kind: String, project: String) = {
+  def employmentScore(id: Long, cid: String, cnt1: Long, cnt2: Long, kind: String, project: String) = {
     var score = 0f
     val total = 5f
     val extraScore = 0f
@@ -102,7 +115,7 @@ case class CompanyEmploymentScore(s: SparkSession, sourceTable: String, tableVie
     } else if (cnt1 > 0) {
       score = 4f
       ty = "有招聘信息,但是近1年内无招聘信息"
-    } else{
+    } else {
       score = 3f
       ty = "无招聘信息"
     }
@@ -110,16 +123,4 @@ case class CompanyEmploymentScore(s: SparkSession, sourceTable: String, tableVie
       score, total, extraScore)
   }
 
-}
-
-object CompanyEmploymentScore {
-  def main(args: Array[String]): Unit = {
-    var config = mutable.Map(
-      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
-      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
-    )
-    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-    CompanyEmploymentScore(spark, "company_employment","", "208", "start_date", "经营情况", "招聘", "0", "winhc_eci_dev").calc()
-    spark.stop()
-  }
-}
+}

+ 3 - 4
src/main/scala/com/winhc/bigdata/spark/model/CompanyEnvPunishment.scala

@@ -24,8 +24,7 @@ object CompanyEnvPunishment {
       "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-    CompanyEnvPunishment(spark, "company_env_punishment",
-      "", "411", "update_time", "经营风险", "环保处罚", "0", s"$project").calc()
+    CompanyEnvPunishment(spark, "company_env_punishment", "", "411", "update_time", "经营风险", "环保处罚", "0", s"$project").calc()
     spark.stop()
   }
 }
@@ -33,7 +32,7 @@ object CompanyEnvPunishment {
 case class CompanyEnvPunishment(s: SparkSession, sourceTable: String, tableView: String = "",
                                 flag: String, time: String, kind: String, project: String,
                                 tp: String = "0", namespace: String = ""
-                              ) extends LoggingUtils {
+                               ) extends LoggingUtils {
 
   @(transient@getter) val spark: SparkSession = s
 
@@ -107,7 +106,7 @@ case class CompanyEnvPunishment(s: SparkSession, sourceTable: String, tableView:
     val extraScore = 0f
     var ty = ""
     if (StringUtils.isNotBlank(content)) {
-      val bool1 = StrUtil.containsAny(content, "停业关闭", "暂扣", "吊销", "许可证","执照")
+      val bool1 = StrUtil.containsAny(content, "停业关闭", "暂扣", "吊销", "许可证", "执照")
       if (bool1) {
         score = 0f
         ty = "责令停产停业关闭、暂扣或吊销许可证、执照的行政处罚"

+ 5 - 7
src/main/scala/com/winhc/bigdata/spark/model/CompanyEquityInfo.scala

@@ -23,10 +23,8 @@ object CompanyEquityInfo {
       "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-    CompanyEquityInfo(spark, "company_equity_info_list",
-      "", "301", "main_id", "资产权益", "股权出质(质权人)", "0", s"$project").calc()
-    CompanyEquityInfo(spark, "company_equity_info_list",
-      "", "408", "main_id", "经营风险", "股权出质(出质人)", "0", s"$project").calc()
+    CompanyEquityInfo(spark, "company_equity_info_list", "", "301", "main_id", "资产权益", "股权出质(质权人)", "0", s"$project").calc()
+    CompanyEquityInfo(spark, "company_equity_info_list", "", "408", "main_id", "经营风险", "股权出质(出质人)", "0", s"$project").calc()
     spark.stop()
   }
 }
@@ -59,9 +57,9 @@ case class CompanyEquityInfo(s: SparkSession, sourceTable: String, tableView: St
     }
 
     var appsql1 = ""
-    if(flag.equals("301")){
+    if (flag.equals("301")) {
       appsql1 = s"AND  type = 2"
-    }else if(flag.equals("408")){
+    } else if (flag.equals("408")) {
       appsql1 = s"AND  type = 1"
     }
 
@@ -145,7 +143,7 @@ case class CompanyEquityInfo(s: SparkSession, sourceTable: String, tableView: St
     if (cnt1 == 0) {
       score = 5f
       ty = "无"
-    }else {
+    } else {
       score = 1f
       ty = "有"
     }

+ 70 - 43
src/main/scala/com/winhc/bigdata/spark/model/CompanyInfoCalculatorV2.scala

@@ -3,57 +3,69 @@ package com.winhc.bigdata.spark.model
 import java.util.Date
 
 import com.winhc.bigdata.calc.DimScoreV2
-import com.winhc.bigdata.spark.utils.SparkUtils
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, Maxcomputer2Hbase, SparkUtils}
 import org.apache.commons.lang3.StringUtils
-import org.apache.commons.logging.LogFactory
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.{Row, SparkSession}
 
 import scala.collection.mutable
 
+/**
+ * @Description:公司基本信息评分
+ * @author π
+ * @date 2020/9/14 16:52
+ */
 object CompanyInfoCalculatorV2 {
 
-  private val LOG = LogFactory.getLog(this.getClass)
-
   def main(args: Array[String]): Unit = {
 
-    if (args.length != 3) {
-      println("请配置计算资源: instances, cores, memory .")
-      System.exit(-1)
-    }
+    val project = "winhc_eci_dev"
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyInfoCalculatorV2(spark, "company", "", "", "", "", "", "0", s"$project").calc()
+    spark.stop()
+  }
 
-    var config = mutable.Map.empty[String, String]
-    val Array(instances, cores, memory) = args
+}
 
-    println(
-      s"""
-         |instances : $instances,
-         |cores : $cores,
-         |memory : $memory
-         |""".stripMargin)
-
-    config = mutable.Map("spark.executor.instances" -> instances,
-      "spark.executor.cores" -> cores,
-      "spark.executor.memory" -> memory
-    )
 
-    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+case class CompanyInfoCalculatorV2(s: SparkSession, sourceTable: String, tableView: String = "",
+                                   flag: String, time: String, kind: String, project: String,
+                                   tp: String = "0", namespace: String = "") extends LoggingUtils {
 
-    import spark._
-    import spark.implicits._
+  override protected val spark: SparkSession = s
 
+  import spark.implicits._
+
+  def calc(): Unit = {
     println("company calc start! " + new Date().toString)
 
-    val ods_company = "new_ods_company"
-    val company_score = "ads_company_score_v3"
-    val company_category = "const_company_category_code"
-    val company_stock = "ods_company_stock"
+    val adsTable = s"$namespace.ads_company" //公司基本信息表
+    val incAdsTable = s"$namespace.inc_ads_company" //增量公司基本信息表
+    val company_score = s"$namespace.ads_company_score"
+    val company_category = s"$namespace.const_company_category_code"
+    val company_stock = s"$namespace.ods_company_stock"
+
+    var ds = ""
+    //区别有无分区表
+    var appsql2 = ""
+    var tb = adsTable
+    if ("1".equals(tp)) {
+      tb = tableView
+      ds = BaseUtil.getPartion(incAdsTable, spark)
+    } else {
+      ds = BaseUtil.getPartion(adsTable, spark)
+      appsql2 = s"AND  ds = ${ds}"
+    }
 
     //所属行业
     val code2Name: Broadcast[Map[String, String]] = spark.sparkContext.broadcast(sql(
       s"""
-        |select category_code,category_str_big
-        |from $company_category
+         |select category_code,category_str_big
+         |from $company_category
       """.stripMargin).collect().map(r => {
       (r.getString(0), r.getString(1))
     }).toMap)
@@ -61,8 +73,8 @@ object CompanyInfoCalculatorV2 {
     //上市信息
     val stock: Broadcast[Map[String, String]] = spark.sparkContext.broadcast(sql(
       s"""
-        |select cid,name
-        |from $company_stock
+         |select cid,name
+         |from $company_stock
       """.stripMargin).collect().map(r => {
       (r.getLong(0).toString, "1")
     }).toMap)
@@ -83,25 +95,38 @@ object CompanyInfoCalculatorV2 {
          |        ,CAST(from_time AS STRING) from_time
          |        ,CAST(to_time AS STRING) to_time
          |        ,reg_location
-         |FROM    ${ods_company}
-         |where   cid is not null
+         |FROM    ${tb}
+         |where   cid is not null ${appsql2}
          |""".stripMargin).flatMap(r => {
       trans(stock, code2Name, r)
-    }).toDF("id", "cid", "name", "kind", "kind_code", "project", "project_code", "type", "score", "total", "extraScore")
-      .createOrReplaceTempView(s"${ods_company}_tmp_view")
+    }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type", "score", "total", "extraScore")
+      .createOrReplaceTempView(s"${sourceTable}_tmp_view")
 
-    sql(s"insert overwrite table ${company_score}  select * from ${ods_company}_tmp_view")
-    println("company calc end! " + new Date().toString)
+    sql(s"select * from ${sourceTable}_tmp_view").show(100)
 
-    spark.stop();
-  }
+    sql(s"insert overwrite table ${company_score}  partition (ds='${ds}') select * from ${sourceTable}_tmp_view")
+
+    //同步hbase
+    if ("1".equals(tp)) { //存量计算不用同步hbase
+      val dataFrame = sql(
+        s"""
+           |select
+           |CONCAT_WS('_',cid,project_code) AS rowkey,
+           |id,cid,kind,kind_code,project,project_code,type,score,total,extraScore
+           |from ${sourceTable}_tmp_view
+           |""".stripMargin)
+
+      Maxcomputer2Hbase(dataFrame, "COMPANY_SCORE").syn()
+    }
 
+    println("company calc end! " + new Date().toString)
+  }
 
   private def trans(stock: Broadcast[Map[String, String]], code2Name: Broadcast[Map[String, String]], r: Row) = {
 
     val id = r.getAs[Long]("id")
     val cid = r.getAs[Long]("cid").toString
-    val name = r.getAs[String]("name")
+    //val name = r.getAs[String]("name")
     val reg_capital = r.getAs[String]("reg_capital")
     val actual_capital_amount = r.getAs[Long]("actual_capital_amount")
     val actual_capital_currency = r.getAs[String]("actual_capital_currency")
@@ -128,7 +153,9 @@ object CompanyInfoCalculatorV2 {
     val r8 = DimScoreV2.bean2Map(DimScoreV2.addressScore(reg_location))
 
     Seq(r1, r2, r3, r4, r5, r6, r7, r8)
-      .map(m => (id, cid, name, m.get("kind"), m.get("kind_code"), m.get("project"), m.get("project_code"),
+      .map(m => (id, cid, m.get("kind"), m.get("kind_code"), m.get("project"), m.get("project_code"),
         m.get("type"), m.get("score").toFloat, m.get("total").toFloat, m.get("extraScore").toFloat))
   }
-}
+
+
+}

+ 151 - 0
src/main/scala/com/winhc/bigdata/spark/model/CompanyLandScoreV1.scala

@@ -0,0 +1,151 @@
+package com.winhc.bigdata.spark.model
+
+import java.util.Date
+import com.winhc.bigdata.calc.DimScoreV2
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, Maxcomputer2Hbase, SparkUtils}
+import org.apache.spark.sql.{Row, SparkSession}
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+
+/**
+ * @Description:地块公示,购地信息
+ * @author π
+ * @date 2020/9/3 16:52
+ */
+object CompanyLandScoreV1 {
+  def main(args: Array[String]): Unit = {
+    val Array(project) = args
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    //土地公示
+    CompanyLandScoreV1(spark, "company_land_publicity", "", "304", "update_time", "资产权益", "土地公示", "0", s"$project").calc()
+    //购地信息
+    CompanyLandScoreV1(spark, "company_land_announcement", "", "305", "update_time", "资产权益", "购地信息", "0", s"$project").calc()
+    spark.stop()
+  }
+}
+
+case class CompanyLandScoreV1(s: SparkSession, sourceTable: String, tableView: String = "",
+                              flag: String, time: String, kind: String, project: String,
+                              tp: String = "0", namespace: String = ""
+                             ) extends LoggingUtils {
+
+  @(transient@getter) val spark: SparkSession = s
+
+  import spark.implicits._
+
+  def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+    val adsTable = namespace + ".ads_" + sourceTable
+    val incAdsTable = namespace + ".inc_ads_" + sourceTable
+    val targetTable = namespace + ".ads_" + sourceTable + "_score"
+    var ds = ""
+
+    //区别有无分区表
+    var appsql2 = ""
+    var tb = adsTable
+    if ("1".equals(tp)) {
+      tb = tableView
+      ds = BaseUtil.getPartion(incAdsTable, spark)
+    } else {
+      ds = BaseUtil.getPartion(adsTable, spark)
+      appsql2 = s"AND  ds = ${ds}"
+    }
+
+    val df = sql(
+      s"""
+         |SELECT  *
+         |FROM    (
+         |        SELECT
+         |                *
+         |                ,COUNT(new_cid) OVER(PARTITION BY new_cid ) AS cnt1
+         |                ,ROW_NUMBER() OVER(PARTITION BY new_cid ORDER BY $time DESC ) AS num
+         |        FROM    $tb
+         |        WHERE   new_cid IS NOT NULL
+         |        ${appsql2}
+         |        ) a
+         |WHERE   num =1
+         |""".stripMargin)
+
+    df.map(r => {
+      trans(r, flag, kind, project)
+    }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
+      "score", "total", "extraScore")
+      .createOrReplaceTempView(s"t1_view")
+
+    sql("select * from t1_view").show(20, false)
+
+    sql(s"insert overwrite table ${targetTable} " +
+      s"partition (ds='${ds}')  select * from t1_view")
+
+    //同步hbase
+    if ("1".equals(tp)) { //存量计算不用同步hbase
+      val dataFrame = sql(
+        s"""
+           |select
+           |CONCAT_WS('_',cid,project_code) AS rowkey,
+           |id,cid,kind,kind_code,project,project_code,type,score,total,extraScore
+           |from t1_view
+           |""".stripMargin)
+
+      Maxcomputer2Hbase(dataFrame, "COMPANY_SCORE").syn()
+    }
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
+  }
+
+  def trans(r: Row, flag: String, kind: String, prpject: String) = {
+    val id = r.getAs[Long]("id")
+    val cid = r.getAs[Long]("new_cid").toString
+    val cnt1 = r.getAs[Long]("cnt1")
+
+    flag match {
+      case "304" => getLandpubScore(id, cid, cnt1, kind, prpject)
+      case "305" => getLandbuyScore(id, cid, cnt1, kind, prpject)
+    }
+  }
+
+  //地块公示
+  def getLandpubScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 15f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 7f
+      ty = "无"
+    } else if (cnt1 <= 2) {
+      score = 12f
+      ty = "≤2"
+    } else {
+      score = 15f
+      ty = ">2"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+  //购地信息
+  def getLandbuyScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 15f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 7f
+      ty = "无"
+    } else if (cnt1 <= 2) {
+      score = 12f
+      ty = "≤2"
+    } else {
+      score = 15f
+      ty = ">2"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+}
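One observation on the new CompanyLandScoreV1 (a refactoring sketch, not part of the commit): getLandpubScore and getLandbuyScore are identical line for line, so they could be collapsed into a single helper if desired, e.g.:

    // Possible consolidation of the two identical scoring functions (sketch):
    private def landCountScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
      val (score, ty) =
        if (cnt1 == 0) (7f, "无")
        else if (cnt1 <= 2) (12f, "≤2")
        else (15f, ">2")
      (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
        score, 15f, 0f)
    }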

+ 118 - 0
src/main/scala/com/winhc/bigdata/spark/model/CompanyMortgageInfo.scala

@@ -0,0 +1,118 @@
+package com.winhc.bigdata.spark.model
+
+import java.util.Date
+import cn.hutool.core.util.StrUtil
+import com.winhc.bigdata.calc.DimScoreV2
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, Maxcomputer2Hbase, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{Row, SparkSession}
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Description:动产抵押
+ * @author π
+ * @date 2020/9/3 16:52
+ */
+object CompanyMortgageInfo {
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyMortgageInfo(spark, "company_mortgage_info", "", "407", "update_time", "经营风险", "动产", "0", s"$project").calc()
+    spark.stop()
+  }
+}
+
+case class CompanyMortgageInfo(s: SparkSession, sourceTable: String, tableView: String = "",
+                               flag: String, time: String, kind: String, project: String,
+                               tp: String = "0", namespace: String = ""
+                               ) extends LoggingUtils {
+
+  @(transient@getter) val spark: SparkSession = s
+
+  import spark.implicits._
+
+  def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+    val adsTable = namespace + ".ads_" + sourceTable
+    val incAdsTable = namespace + ".inc_ads_" + sourceTable
+    val targetTable = namespace + ".ads_" + sourceTable + "_score"
+    var ds = ""
+
+    //区别有无分区表
+    var appsql2 = ""
+    var tb = adsTable
+    if ("1".equals(tp)) {
+      tb = tableView
+      ds = BaseUtil.getPartion(incAdsTable, spark)
+    } else {
+      ds = BaseUtil.getPartion(adsTable, spark)
+      appsql2 = s"AND  ds = ${ds}"
+    }
+
+    val df = sql(
+      s"""
+         |SELECT
+         |        new_cid,concat_ws(',' , collect_set(remark)) content
+         |FROM    $tb
+         |WHERE   new_cid IS NOT NULL
+         |${appsql2}
+         |GROUP BY new_cid
+         |""".stripMargin)
+
+    df.map(r => {
+      trans(r, flag, kind, project)
+    }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
+      "score", "total", "extraScore")
+      .createOrReplaceTempView(s"t1_view")
+
+    sql("select * from t1_view").show(20, false)
+
+    sql(s"insert overwrite table ${targetTable} " +
+      s"partition (ds='${ds}')  select * from t1_view")
+
+    //同步hbase
+    if ("1".equals(tp)) { //存量计算不用同步hbase
+      val dataFrame = sql(
+        s"""
+           |select
+           |CONCAT_WS('_',cid,project_code) AS rowkey,
+           |id,cid,kind,kind_code,project,project_code,type,score,total,extraScore
+           |from t1_view
+           |""".stripMargin)
+
+      Maxcomputer2Hbase(dataFrame, "COMPANY_SCORE").syn()
+    }
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
+  }
+
+  def trans(r: Row, flag: String, kind: String, prpject: String) = {
+    val id = -1
+    val cid = r.getAs[Long]("new_cid").toString
+    val content = r.getAs[String]("content")
+    getScore(id, cid, kind, prpject, content)
+  }
+
+  //动产抵押
+  def getScore(id: Long, cid: String, kind: String, project: String, content: String) = {
+    var score = 0f
+    val total = 5f
+    val extraScore = 0f
+    var ty = "抵押其他动产"
+    if (StringUtils.isNotBlank(content)) {
+      val bool1 = StrUtil.containsAny(content, "飞机", "船舶")
+      if (bool1) {
+        score = 2f
+        ty = "抵押飞机、船舶等特殊动产"
+      }
+    }
+
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+}

+ 13 - 4
src/main/scala/com/winhc/bigdata/spark/model/CompanyPunishmentInfo.scala

@@ -13,7 +13,7 @@ import scala.collection.mutable
 
 
 /**
- * @Description:行政处罚
+ * @Description:行政处罚,行政处罚信用中国得分
  * @author π
  * @date 2020/9/3 16:52
  */
@@ -25,8 +25,11 @@ object CompanyPunishmentInfo {
       "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-    CompanyPunishmentInfo(spark, "company_punishment_info",
-      "", "410", "update_time", "经营风险", "行政处罚", "0", s"$project").calc()
+
+    //行政处罚
+    CompanyPunishmentInfo(spark, "company_punishment_info", "", "410", "update_time", "经营风险", "行政处罚", "0", s"$project").calc()
+    //行政处罚信用中国
+    CompanyPunishmentInfo(spark, "company_punishment_info_creditchina", "", "410", "update_time", "经营风险", "行政处罚", "0", s"$project").calc()
     spark.stop()
   }
 }
@@ -48,6 +51,12 @@ case class CompanyPunishmentInfo(s: SparkSession, sourceTable: String, tableView
     val targetTable = namespace + ".ads_" + sourceTable + "_score"
     var ds = ""
 
+    //行政处罚内容字段
+    var content = "content"
+    if (sourceTable.contains(s"creditchina")) {
+      content = "result"
+    }
+
     //区别有无分区表
     var appsql2 = ""
     var tb = adsTable
@@ -62,7 +71,7 @@ case class CompanyPunishmentInfo(s: SparkSession, sourceTable: String, tableView
     val df = sql(
       s"""
          |SELECT
-         |        new_cid,concat_ws(',' , collect_set(content)) content
+         |        new_cid,concat_ws(',' , collect_set($content)) content
          |FROM    $tb
          |WHERE   new_cid IS NOT NULL
          |${appsql2}

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/utils/CompanyNameMappingUtil.scala

@@ -1,6 +1,6 @@
 package com.winhc.bigdata.spark.utils
 
-import cn.oyohotels.utils.HbaseUtil
+import com.winhc.bigdata.spark.utils
 import com.aliyun.odps.utils.StringUtils
 import com.winhc.bigdata.spark.jobs.CompanyNameMappingPro.{cname_bytes, current_cid_bytes, f_bytes}
 import org.apache.hadoop.hbase.CellUtil

+ 28 - 19
src/main/scala/com/winhc/bigdata/spark/utils/HbaseUtil.scala

@@ -1,22 +1,17 @@
-package cn.oyohotels.utils
-
-import java.{lang, util}
+package com.winhc.bigdata.spark.utils
 
 import com.winhc.bigdata.spark.config.HBaseConfig
 import org.apache.hadoop.hbase._
 import org.apache.hadoop.hbase.client._
-import org.apache.hadoop.hbase.filter.{BinaryPrefixComparator, CompareFilter, RowFilter}
 import org.apache.hadoop.hbase.util.Bytes
-import org.slf4j.LoggerFactory
-
-import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
 
-
+/**
+ * π hbase工具
+ */
 object HbaseUtil {
   final val FAMILY_NAME = "F"
   final val RT_NS = "default"
-  private val logger = LoggerFactory.getLogger(this.getClass)
 
   val lrs = HbaseUtil.getClass.getResource("/").getPath
 
@@ -50,19 +45,17 @@ object HbaseUtil {
 
   def getRowDataScan(tb: Table, prefix: String, family: String = FAMILY_NAME) = {
     val scan = new Scan()
-    val prefixFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(prefix.getBytes()));
-    scan.setFilter(prefixFilter)
+    //    val prefixFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(prefix.getBytes()));
+    //    scan.setFilter(prefixFilter)
+    scan.setRowPrefixFilter(prefix.getBytes())
     var ms = ListBuffer[Map[String, String]]()
     try {
       val scanner = tb.getScanner(scan)
       import scala.collection.JavaConversions._
       import collection.JavaConverters._
       for (res <- scanner) {
-        //System.out.println(res)
-//        val row: Array[Byte] = res.getRow
         var r: Map[String, String] = res.rawCells().map(c1 => (Bytes.toString(CellUtil.cloneQualifier(c1)), Bytes.toString(CellUtil.cloneValue(c1)))).toMap
-        //        println(r)
-        r += ("ROWKEY" ->Bytes.toString(res.getRow))
+        r += ("ROWKEY" -> Bytes.toString(res.getRow))
         ms.+=(r)
       }
     } catch {
@@ -71,10 +64,26 @@ object HbaseUtil {
     ms.toList
   }
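A note on the change to getRowDataScan above: Scan.setRowPrefixFilter is not just a shorter spelling of the commented-out RowFilter/BinaryPrefixComparator approach; it also derives the scan's start and stop rows from the prefix, so only the matching key range is read instead of every row being filtered server-side. The commented-out lines appear to be kept only for reference and could be removed.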
 
-  def main(args: Array[String]): Unit = {
-    val rows = getRowDataScan(getTable("COMPANY_SCORE"), "23402373")
-    for(r <- rows){
-      println(r)
+  def deleteRows(tb: Table, rowkeys: List[String], family: String = FAMILY_NAME): Unit = {
+    import org.apache.hadoop.hbase.client.Delete
+    import scala.collection.JavaConverters._
+    val deletes: ListBuffer[Delete] = ListBuffer[Delete]()
+    for (r <- rowkeys) {
+      val t: Delete = new Delete(Bytes.toBytes(r))
+      deletes += t
+      //println(t)
     }
+    tb.delete(deletes.asJava)
+  }
+
+  def main(args: Array[String]): Unit = {
+//    val rows = getRowDataScan(getTable("COMPANY_SCORE"), "23402373")
+//    for (r <- rows) {
+//      println(r)
+//    }
+
+    val rows = List[String]("4")
+    deleteRows(getTable("TEST_COMPANY"), rows)
+
   }
 }