|
@@ -0,0 +1,528 @@
|
|
|
+package com.winhc.bigdata.spark.jobs
|
|
|
+
|
|
|
+import java.util.Collections
|
|
|
+
|
|
|
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping, JsonSerializable}
|
|
|
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
|
|
|
+import com.winhc.bigdata.spark.utils.BaseUtil._
|
|
|
+import org.apache.spark.sql.{Row, SparkSession}
|
|
|
+
|
|
|
+import scala.annotation.meta.getter
|
|
|
+import scala.collection.mutable
|
|
|
+import com.winhc.bigdata.spark.utils.EsRestUtils.{getIndexResult, getRestClient}
|
|
|
+import org.apache.commons.lang3.StringUtils
|
|
|
+import org.apache.http.entity.ContentType
|
|
|
+import org.apache.http.nio.entity.NStringEntity
|
|
|
+import org.apache.http.util.EntityUtils
|
|
|
+import org.apache.spark.internal.Logging
|
|
|
+import org.apache.spark.sql.functions.col
|
|
|
+import org.elasticsearch.client.RestClient
|
|
|
+import org.json4s.jackson.Json
|
|
|
+
|
|
|
+/**
|
|
|
+ * @Description: 法院公告
|
|
|
+ * @author π
|
|
|
+ * @date 2020/7/10 14:33
|
|
|
+ */
|
|
|
+
|
|
|
+// Court-announcement record: plaintiff company, defendant company, case number and publish date.
+// NOTE(review): in the code visible here `CourtAnnouncement(row, cols)` always resolves to the
+// companion's `apply(Row, Seq[String])`; this four-field constructor is not called directly.
+case class CourtAnnouncement(plaintiff_name: String,
+                             litigant_name: String,
+                             case_no: String,
+                             publish_date: String) extends JsonSerializable
|
|
|
+
|
|
|
+object CourtAnnouncement {
+
+  /**
+   * Projects the named columns of a row into a column-name -> value map.
+   *
+   * @param r    row to read from; every requested column is fetched with `getAs[String]`
+   * @param cols names of the columns to extract
+   * @return map from each requested column name to that column's value in `r`
+   */
+  def apply(r: Row, cols: Seq[String]): Map[String, String] =
+    cols.map(name => name -> r.getAs[String](name)).toMap
+
+}
|
|
|
+// Spark job over the court-announcement tables of one project: regfun() does the function set-up, preCalc() fills the incremental ads / ads_list partitions, calc() derives the bg_yg and address rows from them.
|
|
|
+case class CompanyCourtAnnouncement(s: SparkSession, project: String, // name of the project that holds the tables
|
|
|
+ tableName: String // base table name (without prefix or suffix)
|
|
|
+ //detailCols: Seq[String] // detail columns (parameter currently disabled)
|
|
|
+ ) extends LoggingUtils with CompanyMapping with Logging with BaseFunc {
|
|
|
+
|
|
|
+ @(transient@getter) val spark: SparkSession = s // session used by every query in this class; the transient annotation is targeted at the getter
|
|
|
+
|
|
|
+  /**
+   * Produces the case-lead and address rows for the newest partition of the incremental list table.
+   *
+   * Defendant pass: the latest matching announcement per `litigant_name` is converted by `trans`
+   * and written with flag '0' to the `_bg_yg` and `_address` tables (overwriting the partition,
+   * or appending when `isWindows` is true).
+   * Plaintiff pass: the latest matching announcement per `plaintiff_name` is joined to the
+   * debtor-relation table on the cleaned-up company name and appended with flag '1' to the same
+   * two tables.
+   */
+  def calc() = {
+    import spark.implicits._
+
+    // Source tables and the partition each one is read at.
+    val inc_ads_company_tb_list = s"${project}.inc_ads_${tableName}_list" // incremental ads_list table
+    val adsListDs = getPartion(inc_ads_company_tb_list, spark)
+    val ads_eci_debtor_relation = s"${project}.ads_eci_debtor_relation" // full debtor-relation table
+    val debtorRelationDs = getPartion(ads_eci_debtor_relation, spark)
+
+    // Target tables.
+    val ads_address = s"${project}.inc_ads_${tableName}_address" // incremental address table
+    val ads_yg_bg = s"${project}.inc_ads_${tableName}_bg_yg" // incremental plaintiff/defendant table
+
+    // The first write into each target replaces the partition, except on Windows where it appends.
+    val firstWriteMode = if (isWindows) "INTO" else "OVERWRITE"
+
+    // ---- Defendants: newest announcement per defendant company ----
+    val latestPerDefendant = sql(
+      s"""
+         |SELECT *
+         |FROM (
+         | SELECT *
+         | ,ROW_NUMBER() OVER (PARTITION BY litigant_name ORDER BY publish_date DESC ) num
+         | ,md5(CLEANUP(concat_ws('',plaintiff,litigant,announcement_type,publish_date,case_no,litigant_name))) AS rowkey_business
+         | FROM $inc_ads_company_tb_list a
+         | WHERE ds = $adsListDs
+         | AND LENGTH(litigant_name) > 4
+         | AND announcement_type = '起诉状副本及开庭传票'
+         | ) b
+         |WHERE num = 1
+         |""".stripMargin)
+
+    // Every list-table column except the partition column, plus the computed business rowkey.
+    val sourceColumns: Seq[String] =
+      spark.table(inc_ads_company_tb_list).schema.map(_.name).filter(name => !name.equals("ds"))
+    val selectedColumns = sourceColumns ++ Seq("rowkey_business")
+    val asStrings = selectedColumns.map(name => col(name).cast("string"))
+
+    // NOTE: trans returns a lazy iterator, so nothing it creates may be closed here
+    // before the partition has actually been consumed.
+    latestPerDefendant
+      .select(asStrings: _*)
+      .mapPartitions(rows => trans(rows))
+      .filter(_ != null)
+      .toDF("rowkey", "title", "plaintiff", "litigant", "company_name", "company_id", "label", "business_id",
+        "business_type", "business_type_name", "dynamic_content", "publish_date", "create_time", "province_code", "city_code", "county_code")
+      .createOrReplaceTempView("t1")
+
+    // Case-lead table, defendant rows (flag '0').
+    sql(
+      s"""
+         |insert $firstWriteMode table $ads_yg_bg partition (ds=$adsListDs)
+         |select
+         |'0' as flag,
+         |rowkey,title,plaintiff,litigant,company_name,company_id,label,business_id,business_type,business_type_name,dynamic_content,
+         |publish_date,create_time
+         |from t1
+         |""".stripMargin)
+
+    // Debtor-element (address) table, defendant rows that carry a province code.
+    sql(
+      s"""
+         |insert $firstWriteMode table $ads_address partition (ds=$adsListDs)
+         |select
+         |'0' as flag,
+         |rowkey,1 as address_type,province_code,city_code,county_code,business_type,publish_date,create_time
+         |from t1
+         |where trim(province_code) <> ''
+         |""".stripMargin)
+
+    // ---- Plaintiffs: newest announcement per plaintiff company, matched to known debtors ----
+    val plaintiffMatches = sql(
+      s"""
+         |SELECT d.*,bg_cid
+         |FROM (
+         | SELECT *
+         | FROM (
+         | SELECT *
+         | ,ROW_NUMBER() OVER (PARTITION BY plaintiff_name ORDER BY publish_date DESC) num
+         | ,md5(CLEANUP(concat_ws('',plaintiff,litigant,announcement_type,publish_date,case_no,plaintiff_name))) AS rowkey_business
+         | FROM $inc_ads_company_tb_list
+         | WHERE ds = $adsListDs
+         | AND announcement_type = '起诉状副本及开庭传票'
+         | AND LENGTH(plaintiff_name) > 4
+         | ) x
+         | WHERE num = 1
+         | ) d
+         |JOIN (
+         | SELECT bg_name,bg_cid
+         | FROM $ads_eci_debtor_relation
+         | WHERE ds = $debtorRelationDs
+         | AND deleted = 0
+         | group by bg_name,bg_cid
+         | ) e
+         |ON cleanup(d.plaintiff_name) = cleanup(e.bg_name)
+         |""".stripMargin)
+
+    // No null filtering on this side: trans2 does not catch conversion failures.
+    plaintiffMatches
+      .map(r => trans2(r))
+      .toDF("rowkey", "title", "plaintiff", "litigant", "company_name", "company_id", "label", "business_id",
+        "business_type", "business_type_name", "dynamic_content", "publish_date", "create_time")
+      .createOrReplaceTempView("t2")
+
+    // Case-lead table, plaintiff rows (flag '1'); always appended after the defendant write above.
+    sql(
+      s"""
+         |insert into table $ads_yg_bg partition (ds=$adsListDs)
+         |select
+         |'1' as flag,
+         |rowkey,title,plaintiff,litigant,company_name,company_id,label,business_id,business_type,business_type_name,dynamic_content,
+         |publish_date,create_time
+         |from t2
+         |""".stripMargin)
+
+    // Same plaintiff selection, this time keeping the bg_* / yg_* attributes of the debtor relation.
+    sql(
+      s"""
+         |SELECT d.*
+         | ,bg_name
+         | ,bg_cid
+         | ,bg_reg_status
+         | ,bg_province_code
+         | ,bg_city_code
+         | ,bg_county_code
+         | ,bg_reg_location
+         | ,bg_estiblish_time
+         | ,bg_category_code
+         | ,bg_reg_capital
+         | ,bg_phones
+         | ,bg_emails
+         | ,yg_name
+         | ,yg_cid
+         | ,yg_reg_status
+         | ,yg_province_code
+         | ,yg_city_code
+         | ,yg_county_code
+         | ,yg_reg_location
+         | ,yg_estiblish_time
+         | ,yg_category_code
+         | ,yg_reg_capital
+         | ,yg_phones
+         | ,yg_emails
+         | ,rowkey_business
+         |FROM (
+         | SELECT *
+         | FROM (
+         | SELECT *
+         | ,ROW_NUMBER() OVER (PARTITION BY plaintiff_name ORDER BY publish_date DESC) num
+         | ,md5(CLEANUP(concat_ws('',plaintiff,litigant,announcement_type,publish_date,case_no,plaintiff_name))) AS rowkey_business
+         | FROM $inc_ads_company_tb_list
+         | WHERE ds = $adsListDs
+         | AND announcement_type = '起诉状副本及开庭传票'
+         | AND LENGTH(plaintiff_name) > 4
+         | ) x
+         | WHERE num = 1
+         | ) d
+         |JOIN (
+         | SELECT *
+         | FROM $ads_eci_debtor_relation
+         | WHERE ds = $debtorRelationDs
+         | AND deleted = 0
+         | ) e
+         |ON cleanup(d.plaintiff_name) = cleanup(e.bg_name)
+         |""".stripMargin).createOrReplaceTempView("t3")
+
+    // Address table, plaintiff rows: one row per distinct (rowkey, region, address_type) built from
+    // the bg_* codes (address_type 1) and the yg_* codes (address_type 0).
+    sql(
+      s"""
+         |insert into table $ads_address partition (ds=$adsListDs)
+         |SELECT
+         | '1' as flag
+         | ,rowkey_business
+         | ,address_type
+         | ,province_code
+         | ,city_code
+         | ,county_code
+         | ,business_type
+         | ,publish_date
+         | ,create_time
+         |FROM (
+         | SELECT *
+         | ,ROW_NUMBER() OVER (PARTITION BY rowkey_business,province_code,city_code,county_code,address_type ORDER BY publish_date DESC) num
+         | FROM (
+         | SELECT rowkey_business
+         | ,1 AS address_type
+         | ,bg_province_code AS province_code
+         | ,bg_city_code AS city_code
+         | ,bg_county_code AS county_code
+         | ,"7" AS business_type
+         | ,publish_date
+         | ,substr(from_unixtime(unix_timestamp()),1,10) AS create_time
+         | FROM t3
+         | UNION ALL
+         | SELECT rowkey_business
+         | ,0 AS address_type
+         | ,yg_province_code AS province_code
+         | ,yg_city_code AS city_code
+         | ,yg_county_code AS county_code
+         | ,"7" AS business_type
+         | ,publish_date
+         | ,substr(from_unixtime(unix_timestamp()),1,10) AS create_time
+         | FROM t3
+         | ) a
+         | ) b
+         |WHERE num = 1 AND trim(province_code) <> ''
+         |""".stripMargin)
+  }
|
|
|
+
|
|
|
+  /**
+   * Converts the defendant-side rows of one partition into the 16-field tuples consumed by calc().
+   *
+   * For every row the defendant company (`litigant_name`) is looked up through `queryCompany`;
+   * the attributes found are merged into `dynamic_content` and also supply the region codes and
+   * the company id (empty strings when the lookup returns nothing).
+   *
+   * A row whose conversion throws is logged and dropped. It is deliberately NOT emitted as `null`:
+   * Spark's encoder for tuples rejects a null top-level object, so a single bad row would fail the
+   * task before any downstream null filter could remove it.
+   *
+   * @param iter rows of one partition; the columns read below must be present as strings
+   * @return lazy iterator over the successfully converted rows; the REST client opened here is
+   *         closed when that iterator reports it has no more elements
+   */
+  def trans(iter: Iterator[Row]) = {
+    import org.json4s.DefaultFormats
+
+    val restClient = getRestClient()
+    val json = Json(DefaultFormats) // built once per partition rather than twice per row
+
+    val converted = iter.flatMap(r => {
+      try {
+        val rowkey_business = r.getAs[String]("rowkey_business") // case-lead primary key
+        val title = "" // title
+        val plaintiff = r.getAs[String]("plaintiff") // plaintiff
+        val litigant = r.getAs[String]("litigant") // litigant
+        val litigant_name = r.getAs[String]("litigant_name") // defendant company
+        val label: String = json.write(CourtAnnouncement(r, Seq("announcement_type", "publish_date"))) // label list
+        val business_id = r.getAs[String]("rowkey") // business primary key
+        val business_type = "8" // dynamic type
+        val business_type_name = "0" // dynamic type name
+        val m1: Map[String, String] = queryCompany(restClient, litigant_name)
+        // content of the dynamic change: company attributes plus the announcement fields
+        val m2: Map[String, String] = CourtAnnouncement(r, Seq("plaintiff",
+          "litigant", "announcement_type", "court_name", "publish_date", "content", "litigant_name"))
+        val dynamic_content = json.write(m1 ++: m2)
+        val publish_date = r.getAs[String]("publish_date") // time of the dynamic change
+        val create_time = atMonthsBefore(0, "yyyy-MM-dd HH:mm:ss") // creation time
+
+        val province_code = m1.getOrElse("province_code", "") // province code
+        val city_code = m1.getOrElse("city_code", "") // city code
+        val county_code = m1.getOrElse("county_code", "") // county code
+        val litigant_cid = m1.getOrElse("id", "") // company id
+        Some((rowkey_business, title, plaintiff, litigant, litigant_name, litigant_cid, label, business_id, business_type,
+          business_type_name, dynamic_content, publish_date, create_time, province_code, city_code, county_code))
+      } catch {
+        case e: Exception =>
+          logWarning(r.toString())
+          logError(e.getMessage, e)
+          None
+      }
+    })
+
+    closeWhenExhausted(converted, restClient)
+  }
+
+  /**
+   * Wraps `rows` so that `client` is closed the first time `hasNext` returns false.
+   * The rows are produced lazily, so the client has to stay open until the consumer is done.
+   * NOTE(review): assumes getRestClient() hands out a client owned by the caller (not a shared
+   * instance) - confirm against EsRestUtils.
+   */
+  private def closeWhenExhausted[T](rows: Iterator[T], client: RestClient): Iterator[T] = new Iterator[T] {
+    private var open = true
+
+    override def hasNext: Boolean = {
+      val more = rows.hasNext
+      if (!more && open) {
+        open = false
+        try {
+          client.close()
+        } catch {
+          // a failed close must not fail a partition whose rows were all produced
+          case e: Exception => logError(e.getMessage, e)
+        }
+      }
+      more
+    }
+
+    override def next(): T = rows.next()
+  }
|
|
|
+
|
|
|
+  /**
+   * Converts one plaintiff-side row (already joined with `bg_cid`) into the 13-field tuple that
+   * calc() names with toDF.
+   *
+   * There is no error handling here: an exception raised for any row propagates to the caller.
+   *
+   * @param r row exposing the string columns read below
+   * @return (rowkey, title, plaintiff, litigant, company name, company id, label, business id,
+   *         business type, business type name, dynamic content, publish date, create time)
+   */
+  def trans2(r: Row) = {
+    import org.json4s.DefaultFormats
+
+    // Reads one column of the row as a string.
+    def field(name: String): String = r.getAs[String](name)
+
+    // Serialises the given columns of the row as a JSON object.
+    def jsonOf(cols: Seq[String]): String = Json(DefaultFormats).write(CourtAnnouncement(r, cols))
+
+    (
+      field("rowkey_business"), // case-lead primary key
+      "", // title
+      field("plaintiff"), // plaintiff
+      field("litigant"), // litigant
+      field("plaintiff_name"), // plaintiff company
+      field("bg_cid"), // id of the plaintiff company
+      jsonOf(Seq("announcement_type", "publish_date")), // label list
+      field("rowkey"), // business primary key
+      "7", // dynamic type
+      "0", // dynamic type name
+      jsonOf(Seq("plaintiff",
+        "litigant", "announcement_type", "court_name", "publish_date", "content", "litigant_name")), // content of the dynamic change
+      field("publish_date"), // time of the dynamic change
+      atMonthsBefore(0, "yyyy-MM-dd HH:mm:ss") // creation time
+    )
+  }
|
|
|
+
|
|
|
+ def regfun() = { // Set-up step that main() runs before preCalc() and calc().
|
|
|
+ prepareFunctions(spark) // presumably registers the SQL helper functions the queries in this class rely on (defined outside this file) - TODO confirm
|
|
|
+ company_split() // presumably registers the `company_split` function that preCalc() uses in its LATERAL VIEW clauses - TODO confirm
|
|
|
+ }
|
|
|
+
|
|
|
+  /**
+   * Prepares the incremental tables that calc() reads.
+   *
+   *  1. De-duplicates the incremental ods rows from the run partition onwards, parses `case_no`
+   *     out of `content`, and writes them to the newest ods partition of the incremental ads table.
+   *  2. Keeps only the rows whose rowkey is absent from both the stock ads table and the earlier
+   *     incremental ads partitions.
+   *  3. Explodes those rows per defendant company and per plaintiff company into the ads_list table.
+   *
+   * @throws NoSuchElementException if the incremental ods table has no partitions
+   */
+  def preCalc() = {
+
+    val inc_ods_company_tb = s"${project}.inc_ods_$tableName" // incremental ods table
+    val inc_ads_company_tb = s"${project}.inc_ads_$tableName" // incremental ads table
+    val inc_ads_company_tb_list = s"${project}.inc_ads_${tableName}_list" // incremental ads_list table
+
+    val ads_company_tb = s"${project}.ads_$tableName" // stock ads table
+
+    // columns of the incremental ads table, minus the partition column and the rowkey computed below
+    val columns: Seq[String] = spark.table(inc_ads_company_tb).schema.map(_.name)
+      .filterNot(name => name.equals("ds") || name.equals("rowkey"))
+
+    // newest partition of the incremental ads table
+    val lastDsIncAds = getPartion(inc_ads_company_tb, spark)
+
+    // newest partition of the stock ads table
+    val remainDs = getPartion(ads_company_tb, spark)
+
+    val list = sql(s"show partitions $inc_ods_company_tb").collect.toList.map(_.getString(0).split("=")(1))
+    // Fail with a message that names the table instead of the bare "head of empty list".
+    if (list.isEmpty) {
+      throw new NoSuchElementException(s"$inc_ods_company_tb has no partitions, nothing to process")
+    }
+    // first partition of the incremental ods table
+    val firstDsIncOds = list.head
+    // last partition of the incremental ods table; also the partition that gets written
+    val lastDsIncOds = list.last
+
+    // partition to start from: the first ods partition on the first run,
+    // otherwise the day after the newest incremental ads partition
+    val runDs =
+      if (StringUtils.isBlank(lastDsIncAds)) firstDsIncOds
+      else BaseUtil.atDaysAfter(1, lastDsIncAds)
+
+    println(
+      s"""
+         |remainDs:$remainDs
+         |lastDsIncOds:$lastDsIncOds
+         |lastDsIncAds:$lastDsIncAds
+         |runDs:$runDs
+         |firstDsIncOds:$firstDsIncOds
+         |""".stripMargin)
+
+    // de-duplicate the increment and parse case_no
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_tb PARTITION(ds =$lastDsIncOds)
+         |SELECT md5(CLEANUP(concat_ws('',plaintiff,litigant,announcement_type,publish_date,case_no))) AS rowkey
+         | ,${columns.mkString(",")}
+         |FROM (
+         | SELECT ROW_NUMBER() OVER(PARTITION BY CLEANUP(concat_ws('',replace_char(plaintiff),replace_char(litigant),coalesce(announcement_type,''),coalesce(publish_date,''),case_no(content))) ORDER BY update_time DESC) num
+         | ,id
+         | ,cids
+         | ,bltn_no
+         | ,bltn_status
+         | ,announcement_type_code
+         | ,coalesce(announcement_type,'') AS announcement_type
+         | ,case_no(content) AS case_no
+         | ,content
+         | ,court_name
+         | ,deal_grade
+         | ,judge
+         | ,judge_phone
+         | ,replace_char(plaintiff) AS plaintiff
+         | ,replace_char(litigant) AS litigant
+         | ,province
+         | ,publish_date
+         | ,publish_page
+         | ,appro_type
+         | ,source
+         | ,create_time
+         | ,update_time
+         | ,deleted
+         | FROM $inc_ods_company_tb a
+         | WHERE ds >= $runDs
+         | AND trim(litigant) <> ''
+         | AND trim(plaintiff) <> ''
+         | AND litigant IS NOT NULL
+         | AND plaintiff IS NOT NULL
+         | ) x
+         |WHERE num = 1
+         |""".stripMargin)
+
+    // rows of the new partition whose rowkey has not been seen before
+    sql(
+      s"""
+         |SELECT a.*
+         |FROM (
+         | SELECT *
+         | FROM $inc_ads_company_tb
+         | WHERE ds>=$lastDsIncOds
+         | ) a
+         |LEFT JOIN (
+         | SELECT rowkey
+         | FROM (
+         | SELECT *
+         | FROM $ads_company_tb
+         | WHERE ds = '$remainDs'
+         | UNION ALL
+         | SELECT *
+         | FROM $inc_ads_company_tb
+         | WHERE ds < $lastDsIncOds
+         | ) c
+         | GROUP BY rowkey
+         | ) b
+         |ON a.rowkey = b.rowkey
+         |WHERE b.rowkey IS NULL
+         |""".stripMargin).createOrReplaceTempView("incr")
+
+    // one output row per (defendant company, plaintiff company) pair of every new announcement
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_tb_list PARTITION (ds =$lastDsIncOds)
+         |SELECT
+         | rowkey
+         | ,plaintiff_name
+         | ,litigant_name
+         | ,${columns.mkString(",")}
+         | from (
+         |SELECT rowkey
+         | ,plaintiff_name
+         | ,litigant_name
+         | ,${columns.mkString(",")}
+         |FROM incr a
+         |LATERAL VIEW explode(company_split(litigant)) b AS litigant_name
+         |LATERAL VIEW explode(company_split(plaintiff)) c AS plaintiff_name
+         |WHERE trim(b.litigant_name) <> '' and trim(c.plaintiff_name) <> ''
+         |)c
+         |""".stripMargin)
+  }
|
|
|
+
|
|
|
+  /**
+   * Looks a company up by its cleaned-up name with an exact `term` query against
+   * `/winhc-company/company/_search` and returns selected attributes of the first hit.
+   *
+   * @param restClient  client used to send the request; it is not closed here
+   * @param companyName company name; passed through `BaseUtil.cleanup` before being queried
+   * @return map with the keys `id`, `province_code`, `city_code`, `county_code`, `reg_capital`,
+   *         `estiblish_time` and `phones` (comma-joined), or an empty map when there is no hit.
+   *         A field that is absent or null in the hit's `_source` is returned as an empty string
+   *         instead of aborting the whole lookup.
+   */
+  def queryCompany(restClient: RestClient, companyName: String): Map[String, String] = {
+    // The name ends up inside a JSON string literal: escape the characters that would end or
+    // corrupt it. This is a no-op whenever cleanup has already stripped them.
+    val cname = s"${BaseUtil.cleanup(companyName)}"
+      .replace("\\", "\\\\")
+      .replace("\"", "\\\"")
+    val query =
+      s"""
+         |{
+         | "_source": {
+         | "includes": [ "_id","province_code", "city_code","county_code","reg_capital","estiblish_time","phones"]
+         | },
+         | "query": {
+         | "term": {
+         | "cname.value.keyword": "$cname"
+         | }
+         | }
+         |}
+         |""".stripMargin
+    val entity = new NStringEntity(query, ContentType.APPLICATION_JSON)
+
+    val indexResponse = restClient.performRequest(
+      "GET",
+      "/winhc-company/company/_search",
+      Collections.emptyMap[String, String](),
+      entity)
+
+    val res = EntityUtils.toString(indexResponse.getEntity)
+
+    getIndexResult(res).headOption match {
+      case Some(hit) =>
+        val id = hit("_id").asInstanceOf[String]
+        val source = hit("_source").asInstanceOf[Map[String, Any]]
+
+        // Text value of a source field; empty when the field is missing or null.
+        def text(key: String): String = source.get(key) match {
+          case Some(value) if value != null => value.toString
+          case _ => ""
+        }
+
+        val phones = source.get("phones") match {
+          case Some(values: Seq[_]) => values.mkString(",")
+          case Some(value) if value != null => value.toString
+          case _ => ""
+        }
+
+        Map(
+          "id" -> id,
+          "province_code" -> text("province_code"),
+          "city_code" -> text("city_code"),
+          "county_code" -> text("county_code"),
+          "reg_capital" -> text("reg_capital"),
+          "estiblish_time" -> text("estiblish_time"),
+          "phones" -> phones
+        )
+      case None =>
+        Map.empty[String, String]
+    }
+  }
|
|
|
+
|
|
|
+
|
|
|
+}
|
|
|
+
|
|
|
+object CompanyCourtAnnouncement {
+
+  /**
+   * Entry point: runs regfun(), preCalc() and calc() for the `company_court_announcement` tables
+   * of the `winhc_eci_dev` project.
+   *
+   * @param args command-line arguments (not used)
+   */
+  def main(args: Array[String]): Unit = {
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+
+    val project = "winhc_eci_dev"
+    val table = "company_court_announcement"
+
+    // Stop the session even when one of the steps throws; the exception still propagates.
+    try {
+      val announcement = CompanyCourtAnnouncement(spark, project, table)
+      announcement.regfun()
+      announcement.preCalc()
+      announcement.calc()
+    } finally {
+      spark.stop()
+    }
+  }
+
+}
|
|
|
+
|
|
|
+
|
|
|
+
|