|
@@ -0,0 +1,324 @@
|
|
|
+package com.winhc.bigdata.spark.jobs
|
|
|
+
|
|
|
+import java.sql.Timestamp
|
|
|
+import java.util
|
|
|
+import java.util.Collections
|
|
|
+
|
|
|
+import com.alibaba.fastjson.JSON
|
|
|
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping, JsonSerializable}
|
|
|
+import com.winhc.bigdata.spark.utils.BaseUtil._
|
|
|
+import com.winhc.bigdata.spark.utils.EsRestUtils.getRestClient
|
|
|
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
|
|
|
+import org.apache.commons.lang3.StringUtils
|
|
|
+import org.apache.http.entity.ContentType
|
|
|
+import org.apache.http.nio.entity.NStringEntity
|
|
|
+import org.apache.http.util.EntityUtils
|
|
|
+import org.apache.spark.internal.Logging
|
|
|
+import org.apache.spark.sql.functions.col
|
|
|
+import org.apache.spark.sql.{Row, SparkSession}
|
|
|
+import org.elasticsearch.client.RestClient
|
|
|
+import org.json4s.jackson.Json
|
|
|
+
|
|
|
+import scala.annotation.meta.getter
|
|
|
+import scala.collection.mutable
|
|
|
+
|
|
|
/**
 * Judgment documents (court rulings).
 *
 * Field holder describing a judgment document in which the plaintiff won at first instance.
 *
 * @author lyb
 *
 * @param yg_name    plaintiff name(s)
 * @param bg_name    defendant name(s)
 * @param case_no    case number
 * @param case_amt   amount involved in the case (presumably monetary; unit not visible here)
 * @param case_stage trial stage of the case
 * @param is_success outcome flag of the case
 * @param judge_date judgment date
 */
case class YgYishenWenshuJson(
    yg_name: String,
    bg_name: String,
    case_no: String,
    case_amt: String,
    case_stage: String,
    is_success: String,
    judge_date: String
) extends JsonSerializable
|
|
|
+
|
|
|
object YgYishenWenshuJson {

  /**
   * Builds a column-name -> value map from a row and tags it as a first-instance win.
   *
   * Each requested column is read with `getAs[String]`; the fixed entry
   * `"tag" -> "一审胜诉"` ("won at first instance") is added last, so it replaces any
   * column that happens to be called `tag`.
   *
   * @param r    source row
   * @param cols names of the columns to copy into the map
   * @return the selected columns plus the `tag` entry
   */
  def apply(r: Row, cols: Seq[String]) = {
    val selected: Map[String, String] = cols.map(name => name -> r.getAs[String](name)).toMap
    selected + ("tag" -> "一审胜诉")
  }

}
|
|
|
+
|
|
|
/**
 * Spark job that turns judgment documents in which the plaintiff won at first instance
 * into case-opportunity records and writes them to two incremental ADS tables.
 *
 * @param s         Spark session used for every query
 * @param project   name of the project that owns the tables
 * @param tableName table name without prefix/suffix (not read anywhere in this class)
 */
case class CompanyWenshuYishen(s: SparkSession, project: String, // project that owns the tables
                               tableName: String // table name (without prefix/suffix)
                               //detailCols: Seq[String] // detail columns
                              ) extends LoggingUtils with CompanyMapping with Logging with BaseFunc {

  @(transient@getter) val spark: SparkSession = s

  /**
   * Runs the job: selects first-instance civil wins from the ODS table, enriches every
   * plaintiff with company data from Elasticsearch and writes the two target tables.
   *
   * @param runOld when true the partition returned by `getFirstPartion` is processed
   *               (full/historical run); otherwise the one returned by `getPartion`
   * @return the result of the last `insert` statement
   */
  def calc(runOld: Boolean = false) = {
    import spark.implicits._

    val ods_wenshu_detail = s"${project}.ods_wenshu_detail" // incremental ODS table

    // A historical run uses the first partition; a regular run uses getPartion's choice.
    val adsListDs =
      if (runOld) getFirstPartion(ods_wenshu_detail, spark)
      else getPartion(ods_wenshu_detail, spark)

    val ads_case_chance_address = s"${project}.inc_ads_case_chance_wenshu_yg_yishen_shengsu_address" // incremental address table
    val ads_case_chance = s"${project}.inc_ads_case_chance_wenshu_yg_yishen_shengsu" // incremental plaintiff table
    val address_type = "0" // creditor 0; debtor 1

    // Plaintiff won at first instance.
    // Previous version of the query, kept for reference:
    //    val df = sql(
    //      s"""
    //         |SELECT  *
    //         |FROM    (
    //         |        SELECT  *
    //         |                ,ROW_NUMBER() OVER (PARTITION BY litigant_name ORDER BY publish_date DESC ) num
    //         |                ,md5(CLEANUP(concat_ws('',yg_name,bg_name,case_no,judge_date,court_name))) AS rowkey_business
    //         |        FROM    $inc_ads_company_tb_list a
    //         |        WHERE   ds = $adsListDs
    //         |        AND     LENGTH(litigant_name) > 4
    //         |        AND     is_success = '胜'
    //         |        AND     judge_date >= '${atMonthsBefore(1)} 00:00:00'
    //         |        AND     case_stage = '一审'
    //         |        AND     case_type = '民事案件'
    //         |        ) b
    //         |WHERE   num = 1
    //         |""".stripMargin)

    // Two fixes compared with the earlier revision:
    //  * the table is read through the project-qualified name that was also used to
    //    resolve the partition, instead of the bare "ods_wenshu_detail";
    //  * RN was computed but never used; keeping only RN = 1 drops rows that share the
    //    exact columns the rowkey is derived from.
    val df = sql(
      s"""
         |SELECT
         |  md5(CLEANUP(concat_ws('',cname,bg_name,case_no,court_name,judge_date))) AS rowkey
         |  ,case_id AS biz_id
         |  ,cname
         |  ,yg_name
         |  ,bg_name
         |  ,case_reason
         |  ,case_stage
         |  ,case_amt
         |  ,case_no
         |
         |  ,title
         |  ,ROW_NUMBER() OVER(PARTITION BY cname,bg_name,case_no,court_name,judge_date ORDER BY judge_date) RN
         |  ,court_name
         |  ,judge_date AS biz_date
         |  ,url AS link_url
         |  ,is_success
         |  ,judge_result
         |FROM $ods_wenshu_detail
         |LATERAL VIEW explode(split(yg_name, '\n')) tmpTable AS cname
         |WHERE ds = $adsListDs
         |AND judge_date >= '${atMonthsBefore(1)}'
         |AND is_success = '胜'
         |AND case_stage = '一审'
         |AND case_type = '民事案件'
         |AND length(cname) > 4
         |""".stripMargin
    ).where("RN = 1")

    println("dfnum:"+df.count())

    // trans only yields successfully converted rows and closes its own REST client.
    df.mapPartitions(iter => trans(iter))
      .toDF("rowkey", "title", "cname", "cid", "yg_name", "bg_name", "label", "business_id",
        "business_type", "business_type_name", "dynamic_content", "biz_date", "create_time", "province_code", "city_code", "county_code")
      .createOrReplaceTempView("t1")

    // Case-opportunity table.
    sql(
      s"""
         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $ads_case_chance  partition (ds=$adsListDs)
         |select
         |rowkey,title,cname,cid,label,business_id,business_type,business_type_name,dynamic_content,
         |biz_date,create_time, province_code, city_code, county_code
         |from t1
         |""".stripMargin)

    // Address/element table (the original comment called it the debtor element table;
    // address_type above is "0", i.e. creditor). Rows without a province code are skipped.
    sql(
      s"""
         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $ads_case_chance_address  partition (ds=$adsListDs)
         |select
         |md5(concat_ws('',rowkey, business_type, $address_type, province_code,city_code)) as id,
         |rowkey, business_type, $address_type, province_code,city_code,county_code,biz_date,create_time
         |from t1
         |where trim(province_code) <> ''
         |""".stripMargin)
  }

  /**
   * Converts the rows of one partition into case-opportunity tuples, looking every
   * plaintiff up in Elasticsearch.
   *
   * A row whose conversion throws is logged and skipped, so the returned iterator never
   * contains `null`. The REST client opened here is closed once the returned iterator has
   * been fully consumed.
   *
   * @param iter rows produced by the query in `calc`
   * @return lazily evaluated tuples in the column order expected by `calc`
   */
  def trans(iter: Iterator[Row]) = {
    import org.json4s.DefaultFormats

    // NOTE(review): assumes getRestClient() returns a client owned by this call, so it is
    // safe to close it when the partition is done - confirm against EsRestUtils.
    val restClient = getRestClient()
    // One serializer per partition instead of two per row.
    val jsonWriter = Json(DefaultFormats)

    // Closes the client and contributes no elements; used as the lazy tail of the result.
    def releaseClient(): Iterator[Nothing] = {
      try {
        restClient.close()
      } catch {
        case e: Exception => logWarning(s"failed to close ES rest client: ${e.getMessage}")
      }
      Iterator.empty
    }

    val converted = iter.map(r => {
      try {
        val rowkey_business = r.getAs[String]("rowkey") // case-opportunity primary key
        val title = r.getAs[String]("title") // title
        val cname = r.getAs[String]("cname") // company name
        val yg_name = r.getAs[String]("yg_name") // plaintiff

        val bg_name = r.getAs[String]("bg_name") // defendant company
        val label: String = jsonWriter.write(YgYishenWenshuJson(r, Seq("case_amt", "biz_date"))) // label list
        val business_id = r.getAs[Long]("biz_id") // business primary key id
        val business_type = "5" // dynamic type
        val business_type_name = "0" // dynamic type name
        val m1: Map[String, String] = queryCompany(restClient, cname)
        // dynamic change content
        val m2: Map[String, String] = YgYishenWenshuJson(r, Seq("yg_name",
          "bg_name", "case_reason", "case_no", "court_name", "biz_date", "link_url", "case_stage", "is_success"))
        val dynamic_content = jsonWriter.write(m1 ++ m2)
        val biz_date = r.getAs[Timestamp]("biz_date") // dynamic change time
        val create_time = atMonthsBefore(0, "yyyy-MM-dd HH:mm:ss") // creation time
        val province_code = m1.getOrElse("province_code", "") // province code
        val city_code = m1.getOrElse("city_code", "") // city code
        val county_code = m1.getOrElse("county_code", "") // county code
        val cid = m1.getOrElse("id", "") // company cid
        Option((rowkey_business, title, cname, cid, yg_name, bg_name, label, business_id, business_type,
          business_type_name, dynamic_content, biz_date, create_time, province_code, city_code, county_code))
      } catch {
        case e: Exception => {
          logWarning(r.toString())
          logError(e.getMessage, e)
          None
        }
      }
    }).collect { case Some(row) => row }

    // Iterator.++ takes its argument by name, so releaseClient() only runs after the last
    // converted row has been handed out. A task that fails midway does not reach it.
    converted ++ releaseClient()
  }

  /** Registers the SQL functions used by `calc` (calls `prepareFunctions` and `company_split`). */
  def regfun() = {
    prepareFunctions(spark)
    company_split()
  }

  /**
   * Looks a company up in the `winhc-company` index by its cleaned-up name.
   *
   * @param restClient  Elasticsearch low-level REST client to send the request with
   * @param companyName raw company name; it is passed through `BaseUtil.cleanup` first
   * @return `id`, `province_code`, `city_code`, `county_code`, `reg_capital`,
   *         `estiblish_time` and `phones` of the first hit, or an empty map when nothing
   *         matches. Values other than `phones` may be `null` when the document lacks them.
   */
  def queryCompany(restClient: RestClient, companyName: String) = {
    import scala.collection.JavaConverters._

    // Interpolating first keeps the old rendering of a null name ("null"); escaping then
    // keeps the request body valid JSON even if the name contains quotes or backslashes.
    val keyword = jsonEscape(s"${BaseUtil.cleanup(companyName)}")
    val query =
      s"""
         |{
         |  "_source": {
         |     "includes": [ "_id","province_code", "city_code","county_code","reg_capital","estiblish_time","phones"]
         |   },
         |  "query": {
         |    "term": {
         |      "cname.value.keyword": "$keyword"
         |    }
         |  },
         |  "sort": [
         |    {
         |      "company_type": {
         |        "order": "asc"
         |      }
         |    }
         |  ]
         |}
         |""".stripMargin
    val entity = new NStringEntity(query, ContentType.APPLICATION_JSON)

    val indexResponse = restClient.performRequest(
      "GET",
      "/winhc-company/company/_search",
      Collections.emptyMap[String, String](),
      entity)
    val hits = getIndexResult2(EntityUtils.toString(indexResponse.getEntity))

    hits.headOption match {
      case Some(hit) =>
        val id = hit("_id").asInstanceOf[String]
        val source: util.Map[String, Any] = hit("_source").asInstanceOf[util.Map[String, Any]]

        def text(field: String): String = source.get(field).asInstanceOf[String]

        // A document without phones used to raise a NullPointerException here, which made
        // trans drop the whole row; it now simply yields an empty phone list.
        val phones = source.get("phones") match {
          case null => ""
          case numbers: util.List[_] => numbers.asScala.mkString(",")
          case other => other.toString
        }

        Map(
          "id" -> id,
          "province_code" -> text("province_code"),
          "city_code" -> text("city_code"),
          "county_code" -> text("county_code"),
          "reg_capital" -> text("reg_capital"),
          "estiblish_time" -> text("estiblish_time"),
          "phones" -> phones
        )
      case None =>
        Map.empty[String, String]
    }
  }

  /**
   * Extracts the `hits.hits` array of an Elasticsearch search response.
   *
   * @param json raw response body
   * @return one Scala map view per hit, in response order
   */
  def getIndexResult2(json: String) = {
    import scala.collection.JavaConverters._
    JSON.parseObject(json).getJSONObject("hits").getJSONArray("hits").toArray()
      .map(hit => hit.asInstanceOf[util.Map[String, Any]].asScala)
      .toList
  }

  /** Escapes `"`, `\` and control characters so `raw` can sit inside a JSON string literal. */
  private def jsonEscape(raw: String): String = {
    val out = new StringBuilder(raw.length + 8)
    raw.foreach {
      case '"' => out.append("\\\"")
      case '\\' => out.append("\\\\")
      case c if c < ' ' => out.append("\\u%04x".format(c.toInt))
      case c => out.append(c)
    }
    out.toString
  }

}
|
|
|
+
|
|
|
/**
 * Entry point of the "plaintiff won at first instance" judgment-document job.
 *
 * The settings are fixed in code; command-line arguments are currently ignored.
 */
object CompanyWenshuYgYishen {

  /**
   * Starts a Spark session, registers the SQL functions and runs `CompanyWenshuYishen.calc`.
   * The session is stopped even when the job fails.
   *
   * @param args ignored (argument parsing is disabled, see the commented block below)
   */
  def main(args: Array[String]): Unit = {
    val project = "winhc_eci_dev"
    val table = ""
    val runOld = false

    // Argument parsing is disabled; previous implementation kept for reference:
    //    if (args.length == 2) {
    //      val Array(project1, table1) = args
    //      project = project1
    //      table = table1
    //    } else if (args.length == 3) {
    //      val Array(project1, table1, remain) = args
    //      project = project1
    //      table = table1
    //      if (remain.equals("1"))
    //        runOld = true
    //    } else {
    //      println("please set project,table...")
    //      sys.exit(-1)
    //    }
    //
    //    println(
    //      s"""
    //         |project: $project| table: $table| runOld: $runOld
    //         |""".stripMargin)

    // The ODPS project follows `project` so the two settings cannot drift apart.
    val config = mutable.Map(
      "spark.hadoop.odps.project.name" -> project,
      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
    )
    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)

    try {
      val companyWenshu = CompanyWenshuYishen(spark, project, table)
      companyWenshu.regfun()
      // Pre-check for incremental runs is disabled; previous implementation:
      //    if (!runOld) {
      //      val flag = announcement.preCalc()
      //      // return when the increment has no update
      //      if (!flag) return
      //    }
      companyWenshu.calc(runOld)
    } finally {
      // Release the session even when regfun/calc throws.
      spark.stop()
    }
  }

}
|
|
|
+
|
|
|
+
|
|
|
+
|