+package com.winhc.bigdata.spark.jobs
+import java.util
+import java.util.Collections
+import com.alibaba.fastjson.JSON
+import com.winhc.bigdata.spark.const.EnvConst
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping, JsonSerializable}
+import com.winhc.bigdata.spark.utils.BaseUtil._
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.http.HttpHost
+import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
+import org.apache.http.entity.ContentType
+import org.apache.http.impl.client.BasicCredentialsProvider
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.{Row, SparkSession}
+import org.elasticsearch.client.{RestClient, RestClientBuilder}
+import org.json4s.jackson.Json
+import scala.annotation.meta.getter
+import scala.collection.mutable
+ * @Description: 法院公告
+ * @author π
+ * @date 2020/7/1014:33
+ */
+case class CourtAnnouncementV2(
+ plaintiff_name: String,
+ litigant_name: String,
+ case_no: String,
+ publish_date: String
+ ) extends JsonSerializable
+object CourtAnnouncementV2 {
+ def apply(r: Row, cols: Seq[String]) = {
+ val res: Map[String, String] = cols.map(c => {
+ (c, r.getAs[String](c))
+ }).toMap
+ res
+ }
+case class CompanyCourtAnnouncementV2(s: SparkSession, project: String, //表所在工程名
+ tableName: String //表名(不加前后辍)
+ //detailCols: Seq[String] // 详情字段
+ ) extends LoggingUtils with CompanyMapping with Logging with BaseFunc {
+ @(transient@getter) val spark: SparkSession = s
+ def calc(runOld: Boolean = false) = {
+ import spark.implicits._
+ val inc_ads_company_tb_list = s"${project}.inc_ads_${tableName}_list_v1" //增量ads_list表
+ var adsListDs = getPartion(inc_ads_company_tb_list, spark)
+ //跑存量取第一个分区
+ if (runOld) {
+ adsListDs = getFirstPartion(inc_ads_company_tb_list, spark)
+ }
+ val ads_eci_debtor_relation = s"winhc_eci.ads_eci_debtor_relation_v2" //债权全量表
+ val debtorRelationDs = getPartion(ads_eci_debtor_relation, spark)
+ //结果表导入生产表
+ val ads_address = s"winhc_eci.inc_ads_${tableName}_address" //增量地址表
+ val ads_yg_bg = s"winhc_eci.inc_ads_${tableName}_bg_yg" //增量原被告-原告表
+ //被告
+ val df = sql(
+ s"""
+ |FROM (
+ | SELECT *
+ | ,ROW_NUMBER() OVER (PARTITION BY litigant_name ORDER BY publish_date DESC ) num
+ | ,md5(CLEANUP(concat_ws('',plaintiff,litigant,announcement_type,publish_date,case_no,litigant_name))) AS rowkey_business
+ | FROM $inc_ads_company_tb_list a
+ | WHERE ds = $adsListDs
+ | AND LENGTH(litigant_name) > 4
+ | AND announcement_type = '起诉状副本及开庭传票'
+ | AND publish_date >= '${atMonthsBefore(1)}'
+ | ) b
+ |WHERE num = 1
+ |""".stripMargin)
+ val all_cols: Seq[String] = spark.table(inc_ads_company_tb_list).schema.map(_.name).filter(s => {
+ !s.equals("ds")
+ })
+ val cols_md5 = all_cols ++ Seq("rowkey_business")
+ df.select(cols_md5.map(column => col(column).cast("string")): _*).mapPartitions(iter => {
+ trans(iter)
+ //restClient.close()
+ }).filter(_ != null)
+ .toDF("rowkey", "title", "plaintiff", "litigant", "company_name", "company_id", "label", "business_id",
+ "business_type", "business_type_name", "dynamic_content", "publish_date", "create_time", "province_code", "city_code", "county_code")
+ .createOrReplaceTempView("t1")
+ //案源机会表
+ sql(
+ s"""
+ |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $ads_yg_bg partition (ds=$adsListDs)
+ |select
+ |'0' as flag,
+ |rowkey,title,plaintiff,litigant,company_name,company_id,label,business_id,business_type,business_type_name,dynamic_content,
+ |publish_date,create_time
+ |from t1
+ |""".stripMargin)
+ //债务人要素表
+ sql(
+ s"""
+ |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $ads_address partition (ds=$adsListDs)
+ |select
+ |md5(concat_ws('',rowkey,1,business_type,province_code,city_code)) as id,
+ |'0' as flag,
+ |rowkey,1 as address_type,province_code,city_code,county_code,business_type,publish_date,create_time
+ |from t1
+ |where trim(province_code) <> ''
+ |""".stripMargin)
+ //原告
+ sql(
+ s"""
+ |FROM (
+ | SELECT *
+ | ,ROW_NUMBER() OVER (PARTITION BY plaintiff_name ORDER BY publish_date DESC) num
+ | ,md5(CLEANUP(concat_ws('',plaintiff,litigant,announcement_type,publish_date,case_no,plaintiff_name))) AS rowkey_business
+ | FROM $inc_ads_company_tb_list
+ | WHERE ds = $adsListDs
+ | AND announcement_type = '起诉状副本及开庭传票'
+ | AND LENGTH(plaintiff_name) > 4
+ | AND plaintiff_name not like '%银行%'
+ | AND plaintiff_name not like '%保险%'
+ | ) x
+ |WHERE num = 1 AND publish_date >= '${atMonthsBefore(3)}'
+ |""".stripMargin).cache().createOrReplaceTempView("announcement")
+ sql(
+ s"""
+ |SELECT d.*,bg_cid,bg_city_name
+ |FROM announcement d
+ |JOIN (
+ | SELECT bg_name,bg_cid,bg_city_name
+ | FROM $ads_eci_debtor_relation
+ | WHERE ds = $debtorRelationDs
+ | AND deleted = 0
+ | group by bg_name,bg_cid,bg_city_name
+ | ) e
+ |ON cleanup(d.plaintiff_name) = cleanup(e.bg_name)
+ |""".stripMargin).map(r => {
+ trans2(r)
+ })
+ //.filter(_ != null)
+ .toDF("rowkey", "title", "plaintiff", "litigant", "company_name", "company_id", "label", "business_id",
+ "business_type", "business_type_name", "dynamic_content", "publish_date", "create_time")
+ .createOrReplaceTempView("t2")
+ sql(
+ s"""
+ |insert into table $ads_yg_bg partition (ds=$adsListDs)
+ |select
+ |'1' as flag,
+ |rowkey,title,plaintiff,litigant,company_name,company_id,label,business_id,business_type,business_type_name,dynamic_content,
+ |publish_date,create_time
+ |from t2
+ |""".stripMargin)
+ //地址表
+ sql(
+ s"""
+ |SELECT d.*
+ | ,bg_name
+ | ,bg_cid
+ | ,bg_reg_status
+ | ,bg_province_code
+ | ,bg_city_code
+ | ,bg_county_code
+ | ,bg_reg_location
+ | ,bg_estiblish_time
+ | ,bg_category_code
+ | ,bg_reg_capital
+ | ,bg_phones
+ | ,bg_emails
+ | ,yg_name
+ | ,yg_cid
+ | ,yg_reg_status
+ | ,yg_province_code
+ | ,yg_city_code
+ | ,yg_county_code
+ | ,yg_reg_location
+ | ,yg_estiblish_time
+ | ,yg_category_code
+ | ,yg_reg_capital
+ | ,yg_phones
+ | ,yg_emails
+ | ,rowkey_business
+ |FROM announcement d
+ |JOIN (
+ | SELECT *
+ | FROM $ads_eci_debtor_relation
+ | WHERE ds = $debtorRelationDs
+ | AND deleted = 0
+ | ) e
+ |ON cleanup(d.plaintiff_name) = cleanup(e.bg_name)
+ |""".stripMargin).createOrReplaceTempView("t3")
+ sql(
+ s"""
+ |insert into table $ads_address partition (ds=$adsListDs)
+ | md5(concat_ws('',rowkey_business,address_type,business_type,province_code,city_code)) as id
+ | ,'1' as flag
+ | ,rowkey_business
+ | ,address_type
+ | ,province_code
+ | ,city_code
+ | ,county_code
+ | ,business_type
+ | ,publish_date
+ | ,create_time
+ |FROM (
+ | SELECT *
+ | ,ROW_NUMBER() OVER (PARTITION BY rowkey_business,province_code,city_code,address_type ORDER BY publish_date DESC) num
+ | FROM (
+ | SELECT rowkey_business
+ | ,1 AS address_type
+ | ,bg_province_code AS province_code
+ | ,bg_city_code AS city_code
+ | ,bg_county_code AS county_code
+ | ,"7" AS business_type
+ | ,publish_date
+ | ,substr(from_unixtime(unix_timestamp()),1,10) AS create_time
+ | FROM t3
+ | SELECT rowkey_business
+ | ,0 AS address_type
+ | ,yg_province_code AS province_code
+ | ,yg_city_code AS city_code
+ | ,yg_county_code AS county_code
+ | ,"7" AS business_type
+ | ,publish_date
+ | ,substr(from_unixtime(unix_timestamp()),1,10) AS create_time
+ | FROM t3
+ | ) a
+ | ) b
+ |WHERE num = 1 AND trim(province_code) <> ''
+ |""".stripMargin)
+ }
+ def trans(iter: Iterator[Row]) = {
+ val restClient = EsQueryV2.getClient("es.eci.nodes")
+ val df = iter.map(r => {
+ try {
+ import org.json4s.DefaultFormats
+ val rowkey_business = r.getAs[String]("rowkey_business") //案源机会主键
+ val title = "" //标题
+ val plaintiff = r.getAs[String]("plaintiff") //原告
+ val litigant = r.getAs[String]("litigant") //当事人
+ val litigant_name = r.getAs[String]("litigant_name") //被告企业
+ val business_id = r.getAs[String]("rowkey") //业务主键id
+ val business_type = "8" //动态类型
+ val business_type_name = "0" //动态类型name
+ val m1: Map[String, String] = EsQueryV2.queryCompany(restClient, litigant_name)
+ //标签列表
+ val label: String = Json(DefaultFormats).write(
+ CourtAnnouncementV2(r, Seq("announcement_type", "publish_date")) ++ Map("city_name" -> m1("city_name"))
+ )
+ //动态变更内容
+ val m2: Map[String, String] =CourtAnnouncementV2(r, Seq("plaintiff",
+ "litigant", "announcement_type", "court_name", "publish_date", "content"))
+ val dynamic_content = Json(DefaultFormats).write(m1 ++: m2)
+ val publish_date = r.getAs[String]("publish_date") //动态变更时间
+ val create_time = atMonthsBefore(0, "yyyy-MM-dd HH:mm:ss") //创建时间
+ val province_code = m1.getOrElse("province_code", "") //省code
+ val city_code = m1.getOrElse("city_code", "") //市code
+ val county_code = m1.getOrElse("county_code", "") //区code
+ val litigant_cid = m1.getOrElse("id", "") //企业cid
+ (rowkey_business, title, plaintiff, litigant, litigant_name, litigant_cid, label, business_id, business_type,
+ business_type_name, dynamic_content, publish_date, create_time, province_code, city_code, county_code)
+ } catch {
+ case e: Exception => {
+ logWarning(r.toString())
+ logError(e.getMessage, e)
+ null
+ }
+ }
+ })
+ df
+ }
+ def trans2(r: Row) = {
+ import org.json4s.DefaultFormats
+ val rowkey_business = r.getAs[String]("rowkey_business") //案源机会主键
+ val title = "" //标题
+ val plaintiff = r.getAs[String]("plaintiff") //原告
+ val litigant = r.getAs[String]("litigant") //当事人
+ val plaintiff_name = r.getAs[String]("plaintiff_name") //原告企业
+ val plaintiff_cid = r.getAs[String]("bg_cid") //原告企业
+ val city_name = r.getAs[String]("bg_city_name") //原告企业
+ val label: String = Json(DefaultFormats).write(
+ CourtAnnouncementV2(r, Seq("announcement_type", "publish_date")) ++ Map("city_name" -> city_name)
+ ) //标签列表
+ val business_id = r.getAs[String]("rowkey") //业务主键id
+ val business_type = "7" //动态类型
+ val business_type_name = "0" //动态类型name
+ //动态变更内容
+ val m2: Map[String, String] =CourtAnnouncementV2(r, Seq("plaintiff",
+ "litigant", "announcement_type", "court_name", "publish_date", "content"))
+ val dynamic_content = Json(DefaultFormats).write(m2)
+ val publish_date = r.getAs[String]("publish_date") //动态变更时间
+ val create_time = atMonthsBefore(0, "yyyy-MM-dd HH:mm:ss") //创建时间
+ (rowkey_business, title, plaintiff, litigant, plaintiff_name, plaintiff_cid, label, business_id, business_type,
+ business_type_name, dynamic_content, publish_date, create_time)
+ }
+ def regfun() = {
+ prepareFunctions(spark)
+ company_split()
+ }
+object CompanyCourtAnnouncementV2 {
+ def main(args: Array[String]): Unit = {
+ var project = ""
+ var table = ""
+ var runOld = false
+ if (args.length == 2) {
+ val Array(project1, table1) = args
+ project = project1
+ table = table1
+ } else if (args.length == 3) {
+ val Array(project1, table1, remain) = args
+ project = project1
+ table = table1
+ if (remain.equals("1"))
+ runOld = true
+ } else {
+ println("please set project,table...")
+ sys.exit(-1)
+ }
+ println(
+ s"""
+ |project: $project| table: $table| runOld: $runOld
+ |""".stripMargin)
+ val config = mutable.Map(
+ "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+ "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+ )
+ val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+ val announcement = CompanyCourtAnnouncementV2(spark, project, table)
+ announcement.regfun()
+ announcement.calc()
+ spark.stop()
+ }
+object EsQueryV2 {
+ def main(args: Array[String]): Unit = {
+ val restClient = getClient("es.eci.nodes")
+ val id = queryCompany(restClient, "华为技术有限公司")
+ println(id)
+ restClient.close()
+ }
+ def getClient(nodes: String): RestClient = {
+ val credentialsProvider = new BasicCredentialsProvider();
+ credentialsProvider.setCredentials(AuthScope.ANY,
+ new UsernamePasswordCredentials("elastic", "elastic_168"))
+ val restClient = RestClient.builder(new HttpHost(EnvConst.getEnv().getValue(nodes), 9200))
+ .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
+ override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
+ }).build()
+ restClient
+ }
+ def queryCompany(restClient: RestClient, companyName: String) = {
+ val query =
+ s"""
+ |{
+ | "_source": {
+ | "includes": [
+ | "_id",
+ | "province_code",
+ | "city_code",
+ | "county_code",
+ | "reg_capital",
+ | "estiblish_time",
+ | "phones",
+ | "category_first_code",
+ | "category_second_code",
+ | "category_third_code"
+ | ]
+ | },
+ | "query": {
+ | "term": {
+ | "cname.value.keyword": "${BaseUtil.cleanup(companyName)}"
+ | }
+ | },
+ | "sort": [
+ | {
+ | "company_type": {
+ | "order": "asc"
+ | }
+ | }
+ | ]
+ |}
+ |""".stripMargin
+ val entity = new NStringEntity(query, ContentType.APPLICATION_JSON)
+ val indexResponse = restClient.performRequest(
+ "GET",
+ "/winhc-company-v8/company/_search",
+ Collections.emptyMap[String, String](),
+ entity)
+ val en = indexResponse.getEntity
+ val res = EntityUtils.toString(en)
+ import scala.collection.JavaConverters._
+ val list = getIndexResult2(res)
+ if (list.nonEmpty) {
+ val id = list.head("_id").asInstanceOf[String]
+ val source: util.Map[String, Any] = list.head("_source").asInstanceOf[util.Map[String, Any]]
+ val province_code = source.get("province_code").asInstanceOf[String]
+ val city_code = source.get("city_code").asInstanceOf[String]
+ val county_code = source.get("county_code").asInstanceOf[String]
+ val reg_capital = source.get("reg_capital").asInstanceOf[String]
+ val estiblish_time = source.get("estiblish_time").asInstanceOf[String]
+ val phones = source.get("phones").asInstanceOf[util.List[String]].asScala.mkString(",")
+ val category_first = source.get("category_first").asInstanceOf[String]
+ val category_second = source.get("category_second").asInstanceOf[String]
+ val category_third = source.get("category_third").asInstanceOf[String]
+ Map(
+ "id" -> id,
+ "province_code" -> province_code,
+ "city_code" -> city_code,
+ "county_code" -> county_code,
+ "reg_capital" -> reg_capital,
+ "estiblish_time" -> estiblish_time,
+ "phones" -> phones,
+ "category_first" -> category_first,
+ "category_second" -> category_second,
+ "category_third" -> category_third
+ )
+ } else {
+ Map.empty[String, String]
+ }
+ }
+ def getIndexResult2(json: String) = {
+ import scala.collection.JavaConverters._
+ JSON.parseObject(json).getJSONObject("hits").getJSONArray("hits").toArray().map(m => m.asInstanceOf[util.Map[String, Any]]).map(_.asScala).toList
+ }