Browse Source

增量数据更新

xufei 4 years ago
parent
commit
120168df92

+ 16 - 9
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyForCid.scala

@@ -3,6 +3,8 @@ package com.winhc.bigdata.spark.jobs
 import com.winhc.bigdata.spark.utils.{CompanyForCidUtils, SparkUtils}
 import com.winhc.bigdata.spark.utils.{CompanyForCidUtils, SparkUtils}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.SparkSession
 
 
+import scala.collection.mutable
+
 /**
 /**
  * 专利
  * 专利
  * π
  * π
@@ -10,38 +12,43 @@ import org.apache.spark.sql.SparkSession
 
 
 object CompanyForCid {
 object CompanyForCid {
   val tabMapping: Map[String, Seq[String]] =
   val tabMapping: Map[String, Seq[String]] =
-    Map("ods_company_icp" -> Seq("web_site", "new_cid"), //网站
+    Map("ods_company_icp" -> Seq("domain", "new_cid"), //网站
       "ods_company_tm" -> Seq("reg_no", "new_cid"), //商标
       "ods_company_tm" -> Seq("reg_no", "new_cid"), //商标
       "ods_company_wechat" -> Seq("public_num", "new_cid"), //微信公众号
       "ods_company_wechat" -> Seq("public_num", "new_cid"), //微信公众号
-      "ods_company_app_info" -> Seq("name", "new_cid") //产品信息
+      "ods_company_app_info" -> Seq("name", "new_cid"), //产品信息
+      "ods_company_own_tax" -> Seq("own_tax_amount","tax_category","tax_num", "new_cid"), //产品信息
+      "ods_company_mortgage_info" -> Seq("reg_date","reg_num","amount", "new_cid") //产品信息
     )
     )
 
 
   def main(args: Array[String]): Unit = {
   def main(args: Array[String]): Unit = {
-    val (sourceTable, cols) = valid(args)
+    val (space, sourceTable, cols) = valid(args)
     //    var config = mutable.Map.empty[String, String]
     //    var config = mutable.Map.empty[String, String]
-    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, null)
-    CompanyForCidUtils(spark, sourceTable, cols).calc()
+    var config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyForCidUtils(spark, space, sourceTable, cols).calc()
     spark.stop()
     spark.stop()
   }
   }
 
 
-  def valid(args: Array[String]): (String, Seq[String]) = {
+  def valid(args: Array[String]): (String, String, Seq[String]) = {
     println(args.toSeq.mkString(" "))
     println(args.toSeq.mkString(" "))
     if (args.length == 1) {
     if (args.length == 1) {
 
 
     } else if (args.length == 2) {
     } else if (args.length == 2) {
       val Array(sourceTable, cols) = args
       val Array(sourceTable, cols) = args
-      return (sourceTable, cols.split(";").toSeq)
+      return (sourceTable.split("\\.")(0), sourceTable.split("\\.")(1), cols.split(";").toSeq)
     } else {
     } else {
       println("请输入要计算的table!!!! ")
       println("请输入要计算的table!!!! ")
       sys.exit(-1)
       sys.exit(-1)
     }
     }
     val Array(sourceTable) = args
     val Array(sourceTable) = args
 
 
-    val cols: Seq[String] = tabMapping.getOrElse("ods_" + sourceTable, Seq())
+    val cols: Seq[String] = tabMapping.getOrElse(sourceTable.split("\\.")(1), Seq())
     if (cols.isEmpty) {
     if (cols.isEmpty) {
       println("输入表不存在,请配置计算规则!!!   ")
       println("输入表不存在,请配置计算规则!!!   ")
       sys.exit(-1)
       sys.exit(-1)
     }
     }
-    (sourceTable, cols)
+    (sourceTable.split("\\.")(0), sourceTable.split("\\.")(1), cols)
   }
   }
 }
 }

+ 60 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIncrForCid.scala

@@ -0,0 +1,60 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.utils.{CompanyForCidUtils, CompanyIncrForCidUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * 增量数据更新
+ * π
+ */
+
+//liscense,domain,new_cid winhc_eci_dev.inc_ods_company winhc_eci_dev.ads_company_icp winhc_eci_dev.inc_ods_company_icp winhc_eci_dev.inc_ads_company_icp
+//winhc_eci_dev.tmp_xf_inc_ods_company_icp_2
object CompanyIncrForCid {

  /**
   * Per ODS table: (comma-separated dedup columns, incremental company table,
   * ads table, incremental ods table, target incremental ads table).
   * All table names are unqualified: the one-argument path prefixes each with
   * the project space parsed from the input. The original icp entry carried a
   * pre-qualified target ("winhc_eci_dev.inc_ads_company_icp"), which the
   * 1-arg path would double-prefix — fixed to the bare name.
   * NOTE(review): "liscense" is kept verbatim — it must match the real column name.
   */
  val tabMapping: Map[String, (String, String, String, String, String)] =
    Map("ods_company_icp" -> ("liscense,domain,new_cid", "inc_ods_company", "ads_company_icp", "inc_ods_company_icp", "inc_ads_company_icp"), // website
      "ods_company_tm" -> ("reg_no,new_cid", "", "", "", ""), // trademark
      "ods_company_wechat" -> ("public_num,new_cid", "", "", "", ""), // WeChat official account
      "ods_company_app_info" -> ("name,new_cid", "", "", "", ""), // app info
      "ods_company_own_tax" -> ("own_tax_amount,tax_category,tax_num,new_cid", "", "", "", ""), // owed tax
      "ods_company_mortgage_info" -> ("reg_date,reg_num,amount,new_cid", "", "", "", "") // mortgage info
    )

  def main(args: Array[String]): Unit = {
    val (cols, t1, t2, t3, t4) = valid(args)
    // never reassigned — was declared as a var; InitEnv mutates via the map itself
    val config = mutable.Map(
      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
    )
    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
    CompanyIncrForCidUtils(spark, t1, t2, t3, t4, cols).calc()
    spark.stop()
  }

  /**
   * Parses program arguments into (dedup columns, four fully-qualified tables).
   * - 5 args: explicit comma-separated columns and four table names.
   * - 1 arg:  "space.table"; looked up in [[tabMapping]] and qualified with space.
   * Exits the JVM on any other arity or when the table is not configured.
   */
  def valid(args: Array[String]): (Seq[String], String, String, String, String) = {
    println(args.toSeq.mkString(" "))
    args.length match {
      case 5 =>
        val Array(cols, t1, t2, t3, t4) = args
        (cols.split(",").toSeq, t1, t2, t3, t4)
      case 1 =>
        val Array(sourceTable) = args
        val parts = sourceTable.split("\\.")
        val space = parts(0) + "."
        // was getOrElse(..., null): an unknown table threw a MatchError while
        // destructuring null instead of printing the friendly message below
        val (cols, t1, t2, t3, t4) = tabMapping.getOrElse(parts(1), ("", "", "", "", ""))
        if (cols.isEmpty) {
          println("输入表不存在,请配置计算规则!!!   ")
          sys.exit(-1)
        }
        (cols.split(",").toSeq, space + t1, space + t2, space + t3, space + t4)
      case _ =>
        println("请输入要计算的table!!!! ")
        sys.exit(-1)
    }
  }
}

+ 198 - 0
src/main/scala/com/winhc/bigdata/spark/model/CompanyBidScore.scala

@@ -0,0 +1,198 @@
+package com.winhc.bigdata.spark.model
+
+import java.util.Date
+
+import com.winhc.bigdata.calc.{DimScore, DimScoreV2}
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * 招投标
+ * π
+ */
+
object CompanyBidScore {

  /** sourceTable -> (flag, time column used to pick the latest row, score kind, score project). */
  val tabMapping: Map[String, (String, String, String, String)] =
    Map("ads_company_bid_list" -> ("1", "publish_time", "资产权益", "招投标") // bid/tender
    )

  /** Entry point: expects exactly one argument, the source table name. */
  def main(args: Array[String]): Unit = {

    val (sourceTable, flag, time, kind, project) = valid(args)

    // never reassigned — was declared as a var
    val config = mutable.Map.empty[String, String]

    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)

    println(s"company ${this.getClass.getSimpleName} calc start! " + new Date().toString)

    new CompanyBidScore(spark, sourceTable, flag, time, kind, project).calc()

    println(s"company ${this.getClass.getSimpleName} calc end! " + new Date().toString)
    spark.stop()
  }

  /** Validates args and resolves scoring parameters from [[tabMapping]]; exits on bad input. */
  def valid(args: Array[String]): (String, String, String, String, String) = {
    if (args.length != 1) {
      println("请输入要计算的table!!!! ")
      sys.exit(-1)
    }
    val Array(sourceTable) = args

    val (flag, time, kind, project) = tabMapping.getOrElse(sourceTable, ("", "", "", ""))
    if (flag.isEmpty || time.isEmpty || kind.isEmpty || project.isEmpty) {
      println("输入表不存在!!!   ")
      sys.exit(-1)
    }
    (sourceTable, flag, time, kind, project)
  }
}
+
/**
 * 招投标 (bid/tender) dimension score.
 *
 * Keeps the latest bid row per company (ordered by `time` desc) together with
 * the company's total bid count, joins the company's industry name (building
 * companies are scored on a stricter tiered scale), and writes one
 * (id, cid, kind, kind_code, project, project_code, type, score, total,
 * extraScore) row per company into `${sourceTable}_score`.
 */
case class CompanyBidScore(s: SparkSession, sourceTable: String,
                           flag: String, time: String, kind: String, project: String
                          ) extends LoggingUtils {

  @(transient@getter) val spark: SparkSession = s

  import spark.implicits._

  def calc(): Unit = {

    val ods_company = "new_ods_company"
    val company_category = "const_company_category_code"

    //    // industry lookup via a broadcast map + UDF — kept for reference
    //    val code2Name: Broadcast[Map[String, String]] = spark.sparkContext.broadcast(sql(
    //      s"""
    //         |select category_code,category_str_big
    //         |from $company_category
    //      """.stripMargin).collect().map(r => {
    //      (r.getString(0), r.getString(1))
    //    }).toMap)
    //
    //    spark.udf.register("industry_name", (code: String) => {
    //      code2Name.value.getOrElse(code, null)
    //    })
    //
    //    val industry = sql(
    //      s"""
    //         |select category_code,cast(cid as string) as ncid,
    //         |       industry_name(category_code) AS industry_name
    //         |from $ods_company where cid is not null
    //         |""".stripMargin)
    //
    //    industry.show(100)
    //
    //    industry.createOrReplaceTempView("t1")

    // company -> industry name (left join on category code); registered as view t1
    // (was bound to an unused val `industry2`)
    sql(
      s"""
         |select a.category_code,cast(a.cid as string) as ncid,
         |       b.category_str_big AS industry_name
         |from $ods_company  a
         |left join const_company_category_code b on a.category_code = b.category_code
         |where cid is not null
         |""".stripMargin).createOrReplaceTempView("t1")

    // NOTE(review): check whether the online table is partitioned; if so,
    // restore the partition filter below:
    //     ds = '${BaseUtil.getPartion(sourceTable, spark)}' AND

    // latest bid row per company plus total bid count cnt1; registered as view t2
    // (was bound to an unused val `df` of type Unit)
    sql(
      s"""
         |SELECT  *
         |FROM    (
         |        SELECT
         |                *
         |                ,COUNT(ncid) OVER(PARTITION BY ncid ) AS cnt1
         |                ,row_number() OVER(PARTITION BY ncid ORDER BY $time DESC ) AS num
         |        FROM    $sourceTable
         |        WHERE
         |
         |             ncid IS NOT NULL
         |        ) a
         |WHERE   num =1
         |""".stripMargin).createOrReplaceTempView("t2")
    //      .join(industry, Seq("ncid"), "left")
    //      .select("cid", "id", "cnt1", "industry_name", "ncid")

    val df2 = sql(
      """
        |select t2.*,t1.industry_name,category_code from t2 left join t1 on t2.ncid = t1.ncid
        |""".stripMargin)
    df2.show(100)

    df2.map(r => {
      trans(r, flag, kind, project)
    }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
      "score", "total", "extraScore")
      .createOrReplaceTempView(s"${sourceTable}_tmp_view")

    sql(s"select * from ${sourceTable}_tmp_view").show(10)
    sql(s"insert overwrite table ${sourceTable}_score  select * from ${sourceTable}_tmp_view")
  }

  /**
   * Maps one joined row to a score tuple; `flag` selects the scoring rule.
   * NOTE(review): parameter name "prpject" is a typo but is kept — it is part
   * of the public signature and renaming could break named-argument callers.
   * Only flag "1" is configured in tabMapping; any other flag is a MatchError.
   */
  def trans(r: Row, flag: String, kind: String, prpject: String) = {
    val id = r.getAs[Long]("id")
    val cid = r.getAs[Long]("ncid").toString
    val cnt1 = r.getAs[Long]("cnt1")
    val industry_name = r.getAs[String]("industry_name")
    flag match {
      case "1" => tenderScore(id, cid, cnt1, kind, prpject, industry_name)
    }
  }

  /**
   * 招投标 scoring. Non-building companies: 12 (no bids) / 20 (any bids).
   * Building companies ("建筑" in industry_name): tiered by bid count; more
   * than 100 bids yields 25 even though total stays 20f ("+5" bonus by design).
   */
  def tenderScore(id: Long, cid: String, cnt1: Long, kind: String, project: String, industry_name: String) = {
    var score = 0f
    val total = 20f
    val extraScore = 0f
    // building-industry companies are judged on the stricter tiered scale
    val isBuilderCompany = StringUtils.isNotBlank(industry_name) && industry_name.contains("建筑")
    var ty = ""
    if (!isBuilderCompany) {
      if (cnt1 == 0) {
        score = 12f
        ty = "无(非建筑类企业)"
      } else {
        score = 20f
        ty = "有(非建筑类企业)"
      }
    } else {
      if (cnt1 == 0) {
        score = 5f
        ty = "无(建筑类企业)"
      } else if (cnt1 <= 3) {
        score = 12f
        ty = "招投标数量≤3"
      } else if (cnt1 <= 10) {
        score = 15f
        ty = "招投标数量>3,≤10"
      } else if (cnt1 <= 50) {
        score = 17f
        ty = "招投标数量>10,≤50"
      } else if (cnt1 <= 100) {
        score = 20f
        ty = "招投标数量>50"
      } else {
        score = 25f
        ty = "招投标数量>100,该项分数上加+5"
      }
    }

    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
      score, total, extraScore)
  }

}

+ 45 - 13
src/main/scala/com/winhc/bigdata/spark/model/CompanyCourtAnnouncement.scala

@@ -18,7 +18,8 @@ object CompanyCourtAnnouncement {
   val tabMapping: Map[String, (String, String, String, String)] =
   val tabMapping: Map[String, (String, String, String, String)] =
     Map("ads_company_court_announcement_list" -> ("1", "publish_date", "法律风险", "法院公告"), //法院公告
     Map("ads_company_court_announcement_list" -> ("1", "publish_date", "法律风险", "法院公告"), //法院公告
       "ads_company_court_open_announcement_list" -> ("2", "start_date", "法律风险", "开庭公告"), //开庭公告
       "ads_company_court_open_announcement_list" -> ("2", "start_date", "法律风险", "开庭公告"), //开庭公告
-      "ads_company_court_register_list" -> ("3", "filing_date", "法律风险", "立案信息") //立案信息
+      "ads_company_court_register_list" -> ("3", "filing_date", "法律风险", "立案信息"), //立案信息
+      "ads_company_lawsuit_list" -> ("4", "judge_time", "法律风险", "裁判文书") //裁判文书
     )
     )
 
 
   def main(args: Array[String]): Unit = {
   def main(args: Array[String]): Unit = {
@@ -61,6 +62,11 @@ case class CompanyCourtAnnouncement(s: SparkSession, sourceTable: String,
   def calc(): Unit = {
   def calc(): Unit = {
     println(s"company ${this.getClass.getSimpleName} calc start! " + new Date().toString)
     println(s"company ${this.getClass.getSimpleName} calc start! " + new Date().toString)
 
 
+    var sqlapp = ""
+    if (!"4".equals(flag)) {
+      sqlapp = s"and $time >= '${BaseUtil.atMonthsBefore(3)}'"
+    }
+
     val df = sql(
     val df = sql(
       s"""
       s"""
          |SELECT  *
          |SELECT  *
@@ -71,14 +77,14 @@ case class CompanyCourtAnnouncement(s: SparkSession, sourceTable: String,
          |                    ,row_number() OVER(PARTITION BY new_cid ORDER BY $time DESC) AS num
          |                    ,row_number() OVER(PARTITION BY new_cid ORDER BY $time DESC) AS num
          |            FROM    $sourceTable
          |            FROM    $sourceTable
          |            WHERE   ds = '${BaseUtil.getPartion(sourceTable, spark)}' and new_cid is not null
          |            WHERE   ds = '${BaseUtil.getPartion(sourceTable, spark)}' and new_cid is not null
-         |                    and $time >= '${BaseUtil.atMonthsBefore(3)}'
+         |                    ${sqlapp}
          |        ) a
          |        ) a
          |WHERE   num = 1
          |WHERE   num = 1
          |""".stripMargin)
          |""".stripMargin)
 
 
     df.map(r => {
     df.map(r => {
       trans(r, flag, kind, project)
       trans(r, flag, kind, project)
-    }).toDF("id", "cid", "name", "kind", "kind_code", "project", "project_code", "type",
+    }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
       "score", "total", "extraScore")
       "score", "total", "extraScore")
       .createOrReplaceTempView(s"${sourceTable}_tmp_view")
       .createOrReplaceTempView(s"${sourceTable}_tmp_view")
 
 
@@ -98,18 +104,18 @@ case class CompanyCourtAnnouncement(s: SparkSession, sourceTable: String,
   def trans(r: Row, flag: String, kind: String, prpject: String) = {
   def trans(r: Row, flag: String, kind: String, prpject: String) = {
     val id = r.getAs[Long]("id")
     val id = r.getAs[Long]("id")
     val cid = r.getAs[Long]("new_cid").toString
     val cid = r.getAs[Long]("new_cid").toString
-    val name = r.getAs[String]("new_cname")
     val cnt1 = r.getAs[Long]("cnt1")
     val cnt1 = r.getAs[Long]("cnt1")
     val cnt2 = r.getAs[Long]("cnt2")
     val cnt2 = r.getAs[Long]("cnt2")
     flag match {
     flag match {
-      case "1" => getInfoAnnouncement(id, cid, name, cnt1, cnt2, kind, prpject)
-      case "2" => getInfoOpenAnnouncement(id, cid, name, cnt1, cnt2, kind, prpject)
-      case "3" => getInforegister(id, cid, name, cnt1, cnt2, kind, prpject)
+      case "1" => getInfoAnnouncement(id, cid, cnt1, cnt2, kind, prpject)
+      case "2" => getInfoOpenAnnouncement(id, cid, cnt1, cnt2, kind, prpject)
+      case "3" => getInforegister(id, cid, cnt1, cnt2, kind, prpject)
+      case "4" => getRefereeScore(id, cid, cnt1, cnt2, kind, prpject)
     }
     }
   }
   }
 
 
   //法院公告
   //法院公告
-  def getInfoAnnouncement(id: Long, cid: String, name: String, cnt1: Long, cnt2: Long, kind: String, project: String) = {
+  def getInfoAnnouncement(id: Long, cid: String, cnt1: Long, cnt2: Long, kind: String, project: String) = {
     var score = 0f
     var score = 0f
     val total = 5f
     val total = 5f
     val extraScore = 0f
     val extraScore = 0f
@@ -127,12 +133,12 @@ case class CompanyCourtAnnouncement(s: SparkSession, sourceTable: String,
       score = 0f
       score = 0f
       ty = "近3个月有法院公告,作为被告人/被告/被上诉人/被申请人的数量大于作为公诉人/原告/上诉人/申请人的数量"
       ty = "近3个月有法院公告,作为被告人/被告/被上诉人/被申请人的数量大于作为公诉人/原告/上诉人/申请人的数量"
     }
     }
-    (id, cid, name, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
       score, total, extraScore)
       score, total, extraScore)
   }
   }
 
 
   //开庭公告
   //开庭公告
-  def getInfoOpenAnnouncement(id: Long, cid: String, name: String, cnt1: Long, cnt2: Long, kind: String, project: String) = {
+  def getInfoOpenAnnouncement(id: Long, cid: String, cnt1: Long, cnt2: Long, kind: String, project: String) = {
     var score = 0f
     var score = 0f
     val total = 5f
     val total = 5f
     val extraScore = 0f
     val extraScore = 0f
@@ -150,12 +156,12 @@ case class CompanyCourtAnnouncement(s: SparkSession, sourceTable: String,
       score = 0f
       score = 0f
       ty = "近3个月有开庭公告,作为被告的数量大于作为原告的数量"
       ty = "近3个月有开庭公告,作为被告的数量大于作为原告的数量"
     }
     }
-    (id, cid, name, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
       score, total, extraScore)
       score, total, extraScore)
   }
   }
 
 
   //立案信息
   //立案信息
-  def getInforegister(id: Long, cid: String, name: String, cnt1: Long, cnt2: Long, kind: String, project: String) = {
+  def getInforegister(id: Long, cid: String, cnt1: Long, cnt2: Long, kind: String, project: String) = {
     var score = 0f
     var score = 0f
     val total = 5f
     val total = 5f
     val extraScore = 0f
     val extraScore = 0f
@@ -173,7 +179,33 @@ case class CompanyCourtAnnouncement(s: SparkSession, sourceTable: String,
       score = 0f
       score = 0f
       ty = "近3个月有立案信息,作为被告人/被告/被上诉人/被申请人的数量大于作为公诉人/原告/上诉人/申请人的数量"
       ty = "近3个月有立案信息,作为被告人/被告/被上诉人/被申请人的数量大于作为公诉人/原告/上诉人/申请人的数量"
     }
     }
-    (id, cid, name, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+  //裁判文书
+  def getRefereeScore(id: Long, cid: String, cnt1: Long, cnt2: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 8f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0 && cnt2 == 0) {
+      score = 8f
+      ty = "无涉诉情况"
+    } else if (cnt1 > 0 && cnt2 == 0) {
+      score = 8f
+      ty = "有涉诉情况,均为原告"
+    } else if (cnt1 == 0 && cnt2 > 0) {
+      score = 0f
+      ty = "有涉诉情况,均为被告"
+    } else if (cnt1 > cnt2) {
+      score = 5f
+      ty = "有涉诉情况,作为原告的案件数量大于作为被告的案件数量"
+    } else if (cnt1 <= cnt2) {
+      score = 0f
+      ty = "有涉诉情况,作为被告的案件数量大于作为原告的案件数量"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
       score, total, extraScore)
       score, total, extraScore)
   }
   }
 }
 }

+ 42 - 14
src/main/scala/com/winhc/bigdata/spark/model/CompanyIntellectualsScore.scala

@@ -22,15 +22,17 @@ object CompanyIntellectualsScore {
       "ads_company_copyright_works_list" -> ("2", "finish_time", "资产权益", "著作权"), //作品著作权
       "ads_company_copyright_works_list" -> ("2", "finish_time", "资产权益", "著作权"), //作品著作权
       "ads_company_patent_list" -> ("3;4", "pub_date", "资产权益", "实用新型、外观设计专利;发明专利"), //专利
       "ads_company_patent_list" -> ("3;4", "pub_date", "资产权益", "实用新型、外观设计专利;发明专利"), //专利
       "ads_company_icp" -> ("5", "examine_date", "资产权益", "网站"), //网站
       "ads_company_icp" -> ("5", "examine_date", "资产权益", "网站"), //网站
-      "ads_company_tm" -> ("6", "app_date", "资产权益", "商标") //商标
+      "ads_company_tm" -> ("6", "app_date", "资产权益", "商标"), //商标
+      "ads_company_land_announcement" -> ("7", "commit_time", "资产权益", "购地信息") //购地信息
     )
     )
 
 
   def main(args: Array[String]): Unit = {
   def main(args: Array[String]): Unit = {
 
 
     val (sourceTable, flag, time, kind, project) = valid(args)
     val (sourceTable, flag, time, kind, project) = valid(args)
 
 
-    var config = mutable.Map.empty[String, String]
-
+    val config = mutable.Map.empty[String, String]
+    config.+=("spark.hadoop.odps.project.name"->"winhc_eci_dev")
+    println(config)
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
 
 
     println(s"company ${this.getClass.getSimpleName} calc start! " + new Date().toString)
     println(s"company ${this.getClass.getSimpleName} calc start! " + new Date().toString)
@@ -55,8 +57,9 @@ object CompanyIntellectualsScore {
       sys.exit(-1)
       sys.exit(-1)
     }
     }
     val Array(sourceTable) = args
     val Array(sourceTable) = args
+    println(sourceTable.substring(sourceTable.indexOf(".")+1))
 
 
-    val (flag, time, kind, project) = tabMapping.getOrElse(sourceTable, ("", "", "", ""))
+    val (flag, time, kind, project) = tabMapping.getOrElse(sourceTable.substring(sourceTable.indexOf(".")+1), ("", "", "", ""))
     if (flag.isEmpty || time.isEmpty || kind.isEmpty || project.isEmpty) {
     if (flag.isEmpty || time.isEmpty || kind.isEmpty || project.isEmpty) {
       println("输入表不存在!!!   ")
       println("输入表不存在!!!   ")
       sys.exit(-1)
       sys.exit(-1)
@@ -74,6 +77,9 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String,
   import spark.implicits._
   import spark.implicits._
 
 
   def calc(): Unit = {
   def calc(): Unit = {
+    val targetTable = "ads_company_total_score"
+    val ds = BaseUtil.getPartion(sourceTable, spark)
+
     var appsql = ""
     var appsql = ""
     var apptab = ""
     var apptab = ""
     if ("3".equals(flag)) { //实用新型、外观设计专利
     if ("3".equals(flag)) { //实用新型、外观设计专利
@@ -93,7 +99,7 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String,
          |                ,COUNT(new_cid) OVER(PARTITION BY new_cid ) AS cnt1
          |                ,COUNT(new_cid) OVER(PARTITION BY new_cid ) AS cnt1
          |                ,row_number() OVER(PARTITION BY new_cid ORDER BY $time DESC ) AS num
          |                ,row_number() OVER(PARTITION BY new_cid ORDER BY $time DESC ) AS num
          |        FROM    $sourceTable
          |        FROM    $sourceTable
-         |        WHERE   ds = '${BaseUtil.getPartion(sourceTable, spark)}'
+         |        WHERE   ds = ${ds}
          |        AND     new_cid IS NOT NULL ${appsql}
          |        AND     new_cid IS NOT NULL ${appsql}
          |        ) a
          |        ) a
          |WHERE   num =1
          |WHERE   num =1
@@ -103,16 +109,17 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String,
       trans(r, flag, kind, project)
       trans(r, flag, kind, project)
     }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
     }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
       "score", "total", "extraScore")
       "score", "total", "extraScore")
-      .createOrReplaceTempView(s"${sourceTable}_tmp_view")
+      .createOrReplaceTempView(s"t1_view")
 
 
-    logger.info(
-      s"""
-         |- - - - - - - - - - - - - - - - - - - - - - - - -
-         |${showString(sql(s"select * from ${sourceTable}_tmp_view"))}
-         |- - - - - - - - - - - - - - - - - - - - - - - - -
-       """.stripMargin)
+//    logger.info(
+//      s"""
+//         |- - - - - - - - - - - - - - - - - - - - - - - - -
+//         |${showString(sql(s"select * from t1_view"))}
+//         |- - - - - - - - - - - - - - - - - - - - - - - - -
+//       """.stripMargin)
 
 
-    sql(s"insert overwrite table ${sourceTable}${apptab}_score  select * from ${sourceTable}_tmp_view")
+    sql(s"insert overwrite table ${targetTable} " +
+      s"partition (tb='${sourceTable.substring(sourceTable.indexOf(".")+5)}${apptab}' , ds='${ds}')  select * from t1_view")
   }
   }
 
 
   def trans(r: Row, flag: String, kind: String, prpject: String) = {
   def trans(r: Row, flag: String, kind: String, prpject: String) = {
@@ -126,6 +133,7 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String,
       case "4" => inventScore(id, cid, cnt1, kind, prpject)
       case "4" => inventScore(id, cid, cnt1, kind, prpject)
       case "5" => webSiteScore(id, cid, cnt1, kind, prpject)
       case "5" => webSiteScore(id, cid, cnt1, kind, prpject)
       case "6" => tradeMarkScore(id, cid, cnt1, kind, prpject)
       case "6" => tradeMarkScore(id, cid, cnt1, kind, prpject)
+      case "7" => immovableScore(id, cid, cnt1, kind, prpject)
     }
     }
   }
   }
 
 
@@ -255,7 +263,7 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String,
     } else if (cnt1 <= 100) {
     } else if (cnt1 <= 100) {
       score = 4f
       score = 4f
       ty = "授予数量>8"
       ty = "授予数量>8"
-    }  else {
+    } else {
       score = 9f
       score = 9f
       ty = "商标数量>100,该项分数上加5分"
       ty = "商标数量>100,该项分数上加5分"
     }
     }
@@ -263,4 +271,24 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String,
       score, total, extraScore)
       score, total, extraScore)
   }
   }
 
 
+  //购地信息
+  def immovableScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 15f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 7f
+      ty = "无"
+    } else if (cnt1 <= 2) {
+      score = 12f
+      ty = "≤2"
+    } else {
+      score = 15f
+      ty = ">2"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
 }
 }

+ 6 - 0
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -19,6 +19,12 @@ object BaseUtil {
     sql(sql_s).collect.toList.map(_.getString(0).split("=")(1)).last
     sql(sql_s).collect.toList.map(_.getString(0).split("=")(1)).last
   }
   }
 
 
+  def getFirstPartion(t: String, @transient spark: SparkSession) = {
+    import spark._
+    val sql_s = s"show partitions " + t
+    sql(sql_s).collect.toList.map(_.getString(0).split("=")(1)).head
+  }
+
   def atMonthsBefore(n: Int, pattern: String = "yyyy-MM-dd"): String = {
   def atMonthsBefore(n: Int, pattern: String = "yyyy-MM-dd"): String = {
     val c = Calendar.getInstance(Locale.CHINA)
     val c = Calendar.getInstance(Locale.CHINA)
     c.setTimeInMillis(new Date().getTime)
     c.setTimeInMillis(new Date().getTime)

+ 87 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidUtils.scala

@@ -0,0 +1,87 @@
+package com.winhc.bigdata.spark.utils
+
+import java.util.Date
+
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+
+/**
+ * π
+ * 增量cid转换
+ */
+
/**
 * Incremental cid remapping (增量cid转换).
 *
 * Builds a cid -> current_cid mapping from the incremental company table and
 * uses it to (a) re-emit previously published rows under their new cid
 * (flag "0") and (b) map fresh incremental rows onto their latest cid
 * (flag "1"), deduplicating on `cols` by latest update_time, then overwrites
 * the `ds = firstDs` partition of the target table.
 *
 * @param inc_ods_company           incremental company table (cid -> current_cid source)
 * @param ads_company_tb            published ads table
 * @param inc_ods_company_tb        incremental ods table with fresh rows
 * @param target_inc_ods_company_tb target incremental ads table (written)
 * @param cols                      columns used for deduplication
 */
case class CompanyIncrForCidUtils(s: SparkSession,
                                  inc_ods_company: String,
                                  ads_company_tb: String,
                                  inc_ods_company_tb: String,
                                  target_inc_ods_company_tb: String,
                                  cols: Seq[String]) extends LoggingUtils {
  @(transient@getter) val spark: SparkSession = s

  def calc(): Unit = {
    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)

    //val lastDs = BaseUtil.getPartion("winhc_eci_dev.ads_company_icp", spark)
    // was hard-coded to "winhc_eci_dev.inc_ods_company"; use the constructor
    // parameter so the utility works for any project space / table
    val firstDs = BaseUtil.getFirstPartion(inc_ods_company, spark)

    // data columns of the ads table, excluding partition and key columns
    // (lambda param renamed from `s`, which shadowed the SparkSession field)
    val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(c => {
      !c.equals("ds") && !c.equals("cid") && !c.equals("new_cid") && !c.equals("rowkey")
    })

    // cid -> latest cid (current_cid) mapping, cached for the two joins below
    sql(
      s"""
         |SELECT  cid,current_cid as new_cid
         |FROM    ${inc_ods_company}
         |WHERE   ds >= ${firstDs}
         |AND     cid IS NOT NULL
         |AND     current_cid IS NOT NULL
         |GROUP BY cid,current_cid
         |""".stripMargin).cache().createOrReplaceTempView("mapping")

    // flag "0": already-published rows whose cid changed, re-keyed to new_cid;
    // flag "1": fresh incremental rows mapped to their latest cid (or kept as-is);
    // dedup on `cols` keeping the most recently updated row
    sql(
      s"""
         |INSERT OVERWRITE TABLE ${target_inc_ods_company_tb} PARTITION(ds=$firstDs)
         |SELECT  CONCAT_WS('_',new_cid,id) AS rowkey
         |        ,flag
         |        ,new_cid
         |        ,cid
         |        ,${columns.mkString(",")}
         |FROM    (
         |            SELECT  "0" AS flag
         |                    ,a.new_cid
         |                    ,b.cid
         |                    ,${columns.mkString(",")}
         |                    ,ROW_NUMBER() OVER (PARTITION BY ${cols.mkString(",")} ORDER BY update_time DESC ) num
         |            FROM    mapping a
         |            JOIN    (
         |                        SELECT  new_cid AS cid
         |                                ,${columns.mkString(",")}
         |                        FROM    ${ads_company_tb}
         |                        WHERE   ds >= ${firstDs}
         |                        UNION ALL
         |                        SELECT  new_cid AS cid
         |                                ,${columns.mkString(",")}
         |                        FROM    ${target_inc_ods_company_tb}
         |                        WHERE   ds >= ${firstDs}
         |                    ) b
         |            ON      a.cid = b.cid
         |            UNION ALL
         |            SELECT  "1" AS flag
         |                    ,coalesce(b.new_cid,a.cid) new_cid
         |                    ,a.cid
         |                    ,${columns.mkString(",")}
         |                    ,ROW_NUMBER() OVER (PARTITION BY ${cols.mkString(",")} ORDER BY update_time DESC ) num
         |            FROM    ${inc_ods_company_tb} a
         |            LEFT JOIN mapping b
         |            ON      a.cid = b.cid
         |            WHERE   a.ds >= ${firstDs}
         |            AND     a.cid IS NOT NULL
         |        ) d
         |WHERE   num = 1
         |""".stripMargin)

    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
  }
}