|
@@ -1,20 +1,30 @@
|
|
|
package com.winhc.bigdata.spark.jobs
|
|
|
|
|
|
import java.util.Date
|
|
|
-
|
|
|
-import com.winhc.bigdata.spark.utils.SparkUtils
|
|
|
+import com.winhc.bigdata.spark.utils.{BaseUtil, SparkUtils}
|
|
|
import org.apache.spark.sql.{Row, SparkSession}
|
|
|
import com.winhc.bigdata.calc.DimScoreV2
|
|
|
-import org.apache.spark.internal.Logging
|
|
|
-import com.winhc.bigdata.spark.utils.BaseUtil
|
|
|
import scala.collection.mutable
|
|
|
|
|
|
/**
|
|
|
- * 法院公告
|
|
|
+ * 法院公告,开庭公告,立案信息
|
|
|
*/
|
|
|
-object CompanyCourtAnnouncement extends Logging {
|
|
|
+object CompanyCourtAnnouncement {
|
|
|
+
|
|
|
+ val tabMapping = Map("ads_company_court_announcement_list" -> ("1", "publish_date", "法律风险", "法院公告"), //法院公告
|
|
|
+ "ads_company_court_open_announcement_list" -> ("2", "start_date", "法律风险", "开庭公告"), //开庭公告
|
|
|
+ "ads_company_court_register_list" -> ("3", "filing_date", "法律风险", "立案信息") //立案信息
|
|
|
+ )
|
|
|
+
|
|
|
def main(args: Array[String]): Unit = {
|
|
|
|
|
|
+ if (args.length != 1) {
|
|
|
+ println("请输入要计算的table!!!! ")
|
|
|
+ sys.exit(-1)
|
|
|
+ }
|
|
|
+
|
|
|
+ val sourceTable = args(0)
|
|
|
+
|
|
|
var config = mutable.Map.empty[String, String]
|
|
|
val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
|
|
|
import spark.implicits._
|
|
@@ -22,8 +32,11 @@ object CompanyCourtAnnouncement extends Logging {
|
|
|
import org.apache.spark.sql.functions._
|
|
|
println(s"company ${this.getClass.getSimpleName} calc start! " + new Date().toString)
|
|
|
|
|
|
- val company_court_announcement = "ads_company_court_announcement_list"
|
|
|
- val company_court_announcement_score = "company_court_announcement_score"
|
|
|
+ val (flag, time, kind, project) = tabMapping.getOrElse(sourceTable, ("", "", "", ""))
|
|
|
+ if (flag.isEmpty || time.isEmpty || kind.isEmpty || project.isEmpty) {
|
|
|
+ println("输入表不存在!!! ")
|
|
|
+ sys.exit(-1)
|
|
|
+ }
|
|
|
|
|
|
val df = sql(
|
|
|
s"""
|
|
@@ -32,32 +45,43 @@ object CompanyCourtAnnouncement extends Logging {
|
|
|
| SELECT *
|
|
|
| ,sum(CASE WHEN party_role = 'y' THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid) AS cnt1
|
|
|
| ,sum(CASE WHEN party_role = 'n' THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid) AS cnt2
|
|
|
- | ,row_number() OVER(PARTITION BY new_cid ORDER BY publish_date DESC) AS num
|
|
|
- | FROM $company_court_announcement
|
|
|
- | WHERE ds = '${BaseUtil.getPartion(company_court_announcement, spark)}' and new_cid is not null
|
|
|
- | and publish_date >= '${BaseUtil.atMonthsBefore(3)}'
|
|
|
+ | ,row_number() OVER(PARTITION BY new_cid ORDER BY $time DESC) AS num
|
|
|
+ | FROM $sourceTable
|
|
|
+ | WHERE ds = '${BaseUtil.getPartion(sourceTable, spark)}' and new_cid is not null
|
|
|
+ | and $time >= '${BaseUtil.atMonthsBefore(3)}'
|
|
|
| ) a
|
|
|
|WHERE num = 1
|
|
|
|""".stripMargin)
|
|
|
|
|
|
df.map(r => {
|
|
|
- trans(r)
|
|
|
+ trans(r, flag, kind, project)
|
|
|
}).toDF("id", "cid", "name", "kind", "kind_code", "project", "project_code", "type",
|
|
|
"score", "total", "extraScore")
|
|
|
- .createOrReplaceTempView(s"${company_court_announcement}_tmp_view")
|
|
|
+ .createOrReplaceTempView(s"${sourceTable}_tmp_view")
|
|
|
+
|
|
|
+ sql(s"select * from ${sourceTable}_tmp_view").show(100)
|
|
|
|
|
|
- sql(s"insert overwrite table ${company_court_announcement_score} select * from ${company_court_announcement}_tmp_view")
|
|
|
+ sql(s"insert overwrite table ${sourceTable}_score select * from ${sourceTable}_tmp_view")
|
|
|
|
|
|
println(s"company ${this.getClass.getSimpleName} calc end! " + new Date().toString)
|
|
|
spark.stop()
|
|
|
}
|
|
|
|
|
|
- def trans(r: Row) = {
|
|
|
+ def trans(r: Row, flag: String, kind: String, prpject: String) = {
|
|
|
val id = r.getAs[Long]("id")
|
|
|
val cid = r.getAs[Long]("new_cid").toString
|
|
|
val name = r.getAs[String]("new_cname")
|
|
|
val cnt1 = r.getAs[Long]("cnt1")
|
|
|
val cnt2 = r.getAs[Long]("cnt2")
|
|
|
+ flag match {
|
|
|
+ case "1" => getInfoAnnouncement(id, cid, name, cnt1, cnt2, kind, prpject)
|
|
|
+ case "2" => getInfoOpenAnnouncement(id, cid, name, cnt1, cnt2, kind, prpject)
|
|
|
+ case "3" => getInforegister(id, cid, name, cnt1, cnt2, kind, prpject)
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ //法院公告
|
|
|
+ def getInfoAnnouncement(id: Long, cid: String, name: String, cnt1: Long, cnt2: Long, kind: String, project: String) = {
|
|
|
var score = 0f
|
|
|
val total = 5f
|
|
|
val extraScore = 0f
|
|
@@ -75,8 +99,52 @@ object CompanyCourtAnnouncement extends Logging {
|
|
|
score = 0f
|
|
|
ty = "近3个月有法院公告,作为被告人/被告/被上诉人/被申请人的数量大于作为公诉人/原告/上诉人/申请人的数量"
|
|
|
}
|
|
|
- val kind = "法律风险"
|
|
|
- val project = "法院公告"
|
|
|
+ (id, cid, name, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
|
|
|
+ score, total, extraScore)
|
|
|
+ }
|
|
|
+
|
|
|
+ //开庭公告
|
|
|
+ def getInfoOpenAnnouncement(id: Long, cid: String, name: String, cnt1: Long, cnt2: Long, kind: String, project: String) = {
|
|
|
+ var score = 0f
|
|
|
+ val total = 5f
|
|
|
+ val extraScore = 0f
|
|
|
+ var ty = ""
|
|
|
+ if (cnt1 == 0 && cnt2 == 0) {
|
|
|
+ score = 5f
|
|
|
+ ty = "近3个月无开庭公告"
|
|
|
+ } else if (cnt1 > 0 && cnt2 == 0) {
|
|
|
+ score = 5f
|
|
|
+ ty = "近3个月有开庭公告,均为原告"
|
|
|
+ } else if (cnt1 > cnt2) {
|
|
|
+ score = 3f
|
|
|
+ ty = "近3个月有开庭公告,作为原告的数量大于作为被告的数量"
|
|
|
+ } else if (cnt1 <= cnt2) {
|
|
|
+ score = 0f
|
|
|
+ ty = "近3个月有开庭公告,作为被告的数量大于作为原告的数量"
|
|
|
+ }
|
|
|
+ (id, cid, name, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
|
|
|
+ score, total, extraScore)
|
|
|
+ }
|
|
|
+
|
|
|
+ //立案信息
|
|
|
+ def getInforegister(id: Long, cid: String, name: String, cnt1: Long, cnt2: Long, kind: String, project: String) = {
|
|
|
+ var score = 0f
|
|
|
+ val total = 5f
|
|
|
+ val extraScore = 0f
|
|
|
+ var ty = ""
|
|
|
+ if (cnt1 == 0 && cnt2 == 0) {
|
|
|
+ score = 5f
|
|
|
+ ty = "近3个月无立案信息"
|
|
|
+ } else if (cnt1 > 0 && cnt2 == 0) {
|
|
|
+ score = 5f
|
|
|
+ ty = "近3个月有立案信息,均为公诉人/原告/上诉人/申请人"
|
|
|
+ } else if (cnt1 > cnt2) {
|
|
|
+ score = 3f
|
|
|
+ ty = "近3个月有立案信息,作为公诉人/原告/上诉人/申请人的数量大于作为被告人/被告/被上诉人/被申请人的数量"
|
|
|
+ } else if (cnt1 <= cnt2) {
|
|
|
+ score = 0f
|
|
|
+ ty = "近3个月有立案信息,作为被告人/被告/被上诉人/被申请人的数量大于作为公诉人/原告/上诉人/申请人的数量"
|
|
|
+ }
|
|
|
(id, cid, name, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
|
|
|
score, total, extraScore)
|
|
|
}
|