Explorar o código

关系提取,change初始化

xufei hai 4 anos
pai
achega
abfa2d0081

+ 203 - 0
src/main/scala/com/winhc/bigdata/spark/ng/change/NgChangeExtract.scala

@@ -0,0 +1,203 @@
+package com.winhc.bigdata.spark.ng.change
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils._
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Description: 筛选出数据的具体变更
+ */
+object NgChangeExtract {
+
+  /**
+   * Splits the rows collected for one key into the row flagged "0" and the row flagged "1".
+   *
+   * Rows are indexed by their "change_flag" value; when several rows carry the same flag,
+   * the one iterated last wins.
+   *
+   * @param iterable rows of a single key; every row must contain a "change_flag" entry
+   * @return (row flagged "0", row flagged "1"); a side is null when no row has that flag
+   */
+  def getDoubleDataMap(iterable: Iterable[Map[String, String]]): (Map[String, String], Map[String, String]) = {
+    val byFlag = iterable.foldLeft(Map.empty[String, Map[String, String]]) { (acc, row) =>
+      acc + (row("change_flag") -> row)
+    }
+    val flaggedZero = byFlag.get("0").orNull
+    val flaggedOne = byFlag.get("1").orNull
+    (flaggedZero, flaggedOne)
+  }
+
+  case class ChangeExtractHandle(s: SparkSession,
+                                 project: String, //表所在工程名
+                                 tableName1: String, //表名(不加前后辍)
+                                 primaryKey: String, //此维度主键
+                                 inc_ds: String, //需要计算的分区
+                                 primaryFields: Seq[String] //主要字段,该字段任意一个不同 则认为发生变化
+                                ) extends LoggingUtils with Logging {
+    @(transient@getter) val spark: SparkSession = s
+
+    val target_eci_change_extract = "ads_change_extract"
+
+    val updateTimeMapping = Map(
+      "wenshu_detail_combine" -> "update_date", //文书排序时间
+      "company_equity_info_list" -> "reg_date" //文书排序时间
+    )
+    //不同name映射table
+    val tabMapping =
+      Map("company_holder_v2" -> "company_holder" //胜诉案件
+      )
+
+    /** Resolves a table name through `tabMapping`; names without a mapping are returned unchanged. */
+    def trans(s: String): String = tabMapping.getOrElse(s, s)
+
+    /**
+     * Extracts the changes of one table for partition `inc_ds` and writes them to
+     * `$project.ads_change_extract` (partition ds / tn).
+     *
+     * Rows of the current increment partition are paired, per primary key, with the most
+     * recent earlier row of the same key; each pair is passed to the table's
+     * NgCompanyChangeHandle, which returns a change record or null when there is nothing
+     * to report.
+     */
+    def calc(): Unit = {
+      val tableName = trans(tableName1)
+      // Fields compared by the handler: the primary fields without the primary key itself.
+      val cols = primaryFields.filter(!_.equals(primaryKey)).seq
+
+      // Partition value without dashes, as used in the ds predicates below.
+      val ds = inc_ds.replace("-", "")
+
+      // Only columns present in both the ads_ and the inc_ads_ table are read.
+      val intersectCols = getColumns(s"$project.ads_$tableName").toSet & getColumns(s"$project.inc_ads_$tableName").toSet
+
+      val otherAllCols = intersectCols.filter(!primaryKey.equals(_)).toSeq
+      val all_cols = primaryKey +: otherAllCols :+ "change_flag"
+
+      // NOTE(review): presumably the latest partition of the ads_ table, "0" when none exists - confirm in LoggingUtils.
+      val lastDs_ads_all = getLastPartitionsOrElse(s"$project.ads_$tableName", "0")
+
+      // The handler class is looked up by the unmapped table name (tableName1).
+      val handle = ReflectUtils.getClazz[NgCompanyChangeHandle](s"com.winhc.bigdata.spark.ng.change.table.$tableName1", cols)
+
+      val update_time = BaseUtil.nowDate()
+
+      // change_flag '0': rows of the current increment partition.
+      // change_flag '1': for every key in that partition, the top-ranked row (ds DESC, then the
+      // table's update-time column DESC) among the ads_ partition lastDs_ads_all and the
+      // inc_ads_ partitions strictly between lastDs_ads_all and ds.
+      val df = sql(
+        s"""
+           |SELECT  $primaryKey,${otherAllCols.mkString(",")},'0' as change_flag
+           |FROM    $project.inc_ads_$tableName
+           |WHERE   ds = $ds
+           |UNION ALL
+           |SELECT  t2.$primaryKey,${otherAllCols.map("t2." + _).mkString(",")},'1' as change_flag
+           |FROM    (
+           |            SELECT  DISTINCT ${primaryKey}
+           |            FROM    $project.inc_ads_$tableName
+           |            WHERE   ds = $ds
+           |        ) AS t1
+           |JOIN    (
+           |             SELECT  tmp.*
+           |             FROM    (
+           |                         SELECT  a.*
+           |                                 ,row_number() OVER (PARTITION BY a.${primaryKey} ORDER BY ds DESC, ${updateTimeMapping.getOrElse(tableName, "update_time")} DESC) c
+           |                         FROM    (
+           |                                     SELECT  ${intersectCols.mkString(",")},ds
+           |                                     FROM    $project.ads_$tableName
+           |                                     WHERE   ds = $lastDs_ads_all
+           |                                     UNION ALL
+           |                                     SELECT  ${intersectCols.mkString(",")},ds
+           |                                     FROM    $project.inc_ads_$tableName
+           |                                     WHERE   ds > $lastDs_ads_all and ds < $ds
+           |                                 ) AS a
+           |                     ) AS tmp
+           |             WHERE   tmp.c = 1
+           |        ) AS t2
+           |ON      t1.${primaryKey} = t2.${primaryKey}
+           |""".stripMargin)
+
+
+      // Every column is cast to string, rows are grouped by primary key, and each group is
+      // turned into one output Row; groups that yield no change become null and are dropped.
+      val rdd =
+        df.select(all_cols.map(column => col(column).cast("string")): _*)
+          .rdd.map(r => {
+          (r.getAs[String](primaryKey), all_cols.map(f => (f, r.getAs[String](f))).toMap)
+        }).groupByKey()
+          .map(x => {
+            val rowkey = x._1
+            val map_list = x._2
+            val m = getDoubleDataMap(map_list)
+
+            // m._1 is the row flagged '0' (current partition), m._2 the row flagged '1' (earlier data).
+            val new_map = m._1
+            val old_map = m._2
+            if (new_map == null && old_map == null) {
+              null
+            } else if (old_map == null) {
+              // No earlier row: handled as an insert, using the first row of the group.
+              val res = handle.handle(rowkey, null, map_list.head)
+              if (res == null) {
+                null
+              } else {
+                Row(res._1, res._2, tableName, res._3, res._4, res._5, res._6, res._7, res._8, update_time, res._9)
+              }
+            } else if (new_map == null) {
+              null
+            } else {
+              // Both versions exist: the handler decides whether this is an update.
+              val res = handle.handle(rowkey, old_map, new_map)
+              if (res == null) {
+                null
+              } else {
+                Row(res._1, res._2, tableName, res._3, res._4, res._5, res._6, res._7, res._8, update_time, res._9)
+              }
+            }
+          }).filter(_ != null)
+
+      val schema = StructType(Array(
+        StructField("rowkey", StringType), // primary key of the table row
+        StructField("company_id", StringType), // company id
+        StructField("table_name", StringType), // table name
+        StructField("type", StringType), // change type: insert / update
+        StructField("data", MapType(StringType, StringType)), // data after the change
+        StructField("fields", StringType), // for an update, the fields that changed
+        StructField("title", StringType), // display title, e.g. "new land announcement added"
+        StructField("label", StringType), // 1 = general change, 2 = risk change
+        StructField("biz_time", StringType), // business time
+        StructField("update_time", StringType), // processing time
+        StructField("old_data", MapType(StringType, StringType)) // data before the change
+      ))
+
+      spark.createDataFrame(rdd, schema)
+        .createOrReplaceTempView(s"tmp_change_extract_view_$tableName1")
+
+      // INSERT INTO when isWindows is true, INSERT OVERWRITE otherwise.
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.$target_eci_change_extract PARTITION(ds='$ds',tn='$tableName1')
+           |SELECT *
+           |FROM
+           |    tmp_change_extract_view_$tableName1
+           |""".stripMargin)
+    }
+  }
+
+
+  // Tables this job knows how to process; main runs all of them for "all",
+  // otherwise only those whose tableName was requested.
+  private val startArgs = Seq(
+    Args(tableName = "company_holder", primaryFields = "percent,deleted")
+    , Args(tableName = "company_staff", primaryFields = "staff_type,deleted")
+    , Args(tableName = "company", primaryKey ="company_id", primaryFields = "name,cate_third_code,county_code,reg_capital_amount")
+  )
+
+
+  // Per-table job settings. primaryFields is a comma-separated list that main splits
+  // before handing it to ChangeExtractHandle.
+  private case class Args(project: String = "winhc_ng"
+                          , tableName: String
+                          , primaryKey: String = "rowkey"
+                          , primaryFields: String)
+
+
+  /**
+   * Entry point. Expects exactly two arguments:
+   *   1. tableName - "all", or a comma-separated list of table names from `startArgs`
+   *   2. inc_ds    - the partition to process
+   *
+   * Prints a usage hint and exits with -1 on a wrong argument count.
+   */
+  def main(args: Array[String]): Unit = {
+    // Destructuring `args` directly fails with an opaque MatchError on a wrong argument
+    // count, so validate first (same convention as inc_company_relation.main).
+    if (args.length != 2) {
+      println("please set tableName inc_ds.")
+      sys.exit(-1)
+    }
+    val Array(tableName, inc_ds) = args
+
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_ng",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark = SparkUtils.InitEnv("NgChangeExtract", config)
+
+    try {
+      // "all" selects every configured table, otherwise only the requested ones.
+      val selected =
+        if (tableName.equals("all")) {
+          startArgs
+        } else {
+          val wanted = tableName.split(",").toSet
+          startArgs.filter(a => wanted.contains(a.tableName))
+        }
+
+      // One named task per table, handed to AsyncExtract.startAndWait.
+      val tasks = selected.map(e => (e.tableName, () => {
+        ChangeExtractHandle(spark, e.project, e.tableName, e.primaryKey, inc_ds, e.primaryFields.split(",")).calc()
+        true
+      }))
+
+      AsyncExtract.startAndWait(spark, tasks)
+    } finally {
+      // Stop the session even when a task fails.
+      spark.stop()
+    }
+  }
+
+}

+ 75 - 0
src/main/scala/com/winhc/bigdata/spark/ng/change/NgCompanyChangeHandle.scala

@@ -0,0 +1,75 @@
+package com.winhc.bigdata.spark.ng.change
+
+import com.winhc.bigdata.spark.utils.BaseUtil.cleanup
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.internal.Logging
+
+import scala.annotation.meta.{getter, setter}
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/9 16:44
+ * @Description: Base contract for per-table change handlers: classifies a row as insert or
+ *               update and assembles the resulting change record.
+ */
+
+trait NgCompanyChangeHandle extends Serializable with Logging {
+  // Fields compared between the old and the new row to decide whether the row changed.
+  @getter
+  @setter
+  protected val equCols: Seq[String]
+
+  /**
+   * Builds the change record for one row.
+   *
+   * @param rowkey primary key of the row
+   * @param oldMap previous version of the row, or null when there is none (insert)
+   * @param newMap current version of the row
+   * @return (rowkey, company id, type "insert" or "update", new data, changed fields
+   *         (null for inserts), title, label, business time, old data (null for inserts));
+   *         null when the business time is null or when none of `equCols` changed
+   */
+  def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, String, Map[String, String], String, String, String, String, Map[String, String]) = {
+    // Evaluated once and reused for both the null check and the record.
+    val bizTime = getBizTime(newMap)
+    if (bizTime == null) {
+      null
+    } else if (oldMap == null) {
+      (rowkey, getCompanyId(rowkey, newMap), "insert", newMap, null, getInsertTitle(newMap), getLabel(oldMap, newMap), bizTime, null)
+    } else {
+      val (unchanged, changedFields) = getEquAndFields(oldMap, newMap)
+      if (unchanged) {
+        null
+      } else {
+        (rowkey, getCompanyId(rowkey, newMap), "update", newMap,
+          changedFields
+          , getUpdateTitle(newMap), getLabel(oldMap, newMap), bizTime, oldMap)
+      }
+    }
+  }
+
+  /** Company id of the row; by default the part of `rowkey` before the first "_". */
+  def getCompanyId(rowkey: String, newMap: Map[String, String]): String = rowkey.split("_")(0)
+
+  /** Title used for an update record. */
+  def getUpdateTitle(newMap: Map[String, String]): String
+
+  /** Title used for an insert record. */
+  def getInsertTitle(newMap: Map[String, String]): String
+
+  /** Change label of the record. */
+  def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String
+
+  /** Business time of the row; a null result makes `handle` return null. */
+  def getBizTime(newMap: Map[String, String]): String
+
+  /**
+   * Compares the old and the new row on every field in `equCols` (after `cleanup`).
+   *
+   * @return (true, null) when all fields are equal - including when `equCols` is empty -
+   *         otherwise (false, comma-separated names of the differing fields)
+   */
+  def getEquAndFields(oldMap: Map[String, String], newMap: Map[String, String]): (Boolean, String) = {
+    // Objects.equals tolerates a null result from cleanup instead of throwing;
+    // NOTE(review): whether cleanup itself accepts null input is not visible here.
+    val changed = equCols.filterNot(f => java.util.Objects.equals(cleanup(newMap(f)), cleanup(oldMap(f))))
+    if (changed.isEmpty) {
+      (true, null)
+    } else {
+      (false, changed.mkString(","))
+    }
+  }
+
+
+  /** Returns `callBack` when `value` is not blank, null otherwise. */
+  protected def getValueOrNull(value: String, callBack: String): String = {
+    if (StringUtils.isNotBlank(value)) {
+      callBack
+    } else {
+      null
+    }
+  }
+}

+ 26 - 0
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company.scala

@@ -0,0 +1,26 @@
+
+package com.winhc.bigdata.spark.ng.change.table
+
+import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11
+ * @Description: Change handler for company basic information
+ */
+
+case class company(equCols: Seq[String]) extends NgCompanyChangeHandle with Serializable  {
+  /** The rowkey of this table is the company id itself. */
+  override def getCompanyId(rowkey: String, newMap: Map[String, String]): String = rowkey
+
+  /** Business time is the row's "update_time" value. */
+  override def getBizTime(newMap: Map[String, String]): String = newMap("update_time")
+
+  /** No label is produced for this table. */
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = {
+    ""
+  }
+
+  /** No title is produced for inserts. */
+  override def getInsertTitle(newMap: Map[String, String]): String = {
+    ""
+  }
+
+  /** No title is produced for updates. */
+  override def getUpdateTitle(newMap: Map[String, String]): String = {
+    ""
+  }
+}

+ 24 - 0
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_holder.scala

@@ -0,0 +1,24 @@
+
+package com.winhc.bigdata.spark.ng.change.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
+import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, DateUtils}
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11
+ * @Description: Change handler for shareholders
+ */
+
+case class company_holder(equCols: Seq[String]) extends NgCompanyChangeHandle with Serializable  {
+  /** Business time is the row's "update_time" value. */
+  override def getBizTime(newMap: Map[String, String]): String = newMap("update_time")
+
+  /** No label is produced for this table. */
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = {
+    ""
+  }
+
+  /** No title is produced for inserts. */
+  override def getInsertTitle(newMap: Map[String, String]): String = {
+    ""
+  }
+
+  /** No title is produced for updates. */
+  override def getUpdateTitle(newMap: Map[String, String]): String = {
+    ""
+  }
+}

+ 22 - 0
src/main/scala/com/winhc/bigdata/spark/ng/change/table/company_staff.scala

@@ -0,0 +1,22 @@
+
+package com.winhc.bigdata.spark.ng.change.table
+
+import com.winhc.bigdata.spark.ng.change.NgCompanyChangeHandle
+
+/**
+ * @Author: π
+ * @Date: 2020/8/11
+ * @Description: Change handler for key staff members
+ */
+
+case class company_staff(equCols: Seq[String]) extends NgCompanyChangeHandle with Serializable  {
+  /** Business time is the row's "update_time" value. */
+  override def getBizTime(newMap: Map[String, String]): String = newMap("update_time")
+
+  /** No label is produced for this table. */
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = {
+    ""
+  }
+
+  /** No title is produced for inserts. */
+  override def getInsertTitle(newMap: Map[String, String]): String = {
+    ""
+  }
+
+  /** No title is produced for updates. */
+  override def getUpdateTitle(newMap: Map[String, String]): String = {
+    ""
+  }
+}

+ 446 - 0
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_company_relation.scala

@@ -0,0 +1,446 @@
+package com.winhc.bigdata.spark.ng.relation
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{CompanyIndexUtils, CompanyRelationUtils, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @author: π
+ * @date: 2021/01/10 09:36
+ */
+case class inc_company_relation(s: SparkSession,
+                                project: String, //表所在工程名
+                                ds: String //分区
+                               ) extends LoggingUtils with BaseFunc with CompanyMapping {
+  @(transient@getter) val spark: SparkSession = s
+
+  val inc_ads_company = "winhc_ng.inc_ads_company"
+  val ads_company = "winhc_ng.ads_company"
+  val inc_ads_company_legal_entity = "winhc_ng.inc_ads_company_legal_entity"
+  val inc_ads_company_holder = "winhc_ng.inc_ads_company_holder"
+  val inc_ads_company_human_relation = "winhc_ng.inc_ads_company_human_relation"
+  val inc_ads_company_staff = "winhc_ng.inc_ads_company_staff"
+
+  val inc_ads_company_node = "winhc_ng.inc_ads_company_node"
+  val inc_ads_relation_holder = "winhc_ng.inc_ads_relation_holder"
+  val inc_ads_relation_staff = "winhc_ng.inc_ads_relation_staff"
+  val inc_ads_relation_legal_entity = "winhc_ng.inc_ads_relation_legal_entity"
+
+  /**
+   * Registers the SQL functions used by this job: whatever `prepareFunctions` provides, plus
+   * four UDFs that delegate to the same-named builders in CompanyRelationUtils.
+   */
+  def register_fun(): Unit = {
+    prepareFunctions(spark)
+
+    // (id, name, deleted, topic_type) => message
+    val companyNode: (String, String, String, String) => String =
+      (id, name, deleted, topicType) =>
+        CompanyRelationUtils.get_company_node(id, name, deleted, topicType)
+
+    // (start_id, start_name, end_id, end_name, percent, deleted, holder_type, topic_type) => message
+    val relationHolder: (String, String, String, String, Double, Int, Int, String) => String =
+      (startId, startName, endId, endName, percent, deleted, holderType, topicType) =>
+        CompanyRelationUtils.get_relation_holder(startId, startName, endId, endName, percent, deleted, holderType, topicType)
+
+    // (start_id, start_name, end_id, end_name, staff_type, deleted, topic_type) => message
+    val relationStaff: (String, String, String, String, String, Int, String) => String =
+      (startId, startName, endId, endName, staffType, deleted, topicType) =>
+        CompanyRelationUtils.get_relation_staff(startId, startName, endId, endName, staffType, deleted, topicType)
+
+    // (start_id, start_name, end_id, end_name, deleted, legal_entity_type, topic_type) => message
+    val relationLegalEntity: (String, String, String, String, Int, String, String) => String =
+      (startId, startName, endId, endName, deleted, legalEntityType, topicType) =>
+        CompanyRelationUtils.get_relation_legal_entity(startId, startName, endId, endName, deleted, legalEntityType, topicType)
+
+    val udfs = spark.udf
+    udfs.register("get_company_node", companyNode)
+    udfs.register("get_relation_holder", relationHolder)
+    udfs.register("get_relation_staff", relationStaff)
+    udfs.register("get_relation_legal_entity", relationLegalEntity)
+  }
+
+  /**
+   * Builds the incremental node and relation tables for partition `ds`.
+   *
+   * In order: company nodes, legal-entity rows, a temp view of earlier person rows,
+   * new person rows, then the shareholder, staff and legal-entity relation tables.
+   * Every write is INSERT INTO when isWindows is true and INSERT OVERWRITE otherwise.
+   */
+  def inc(): Unit = {
+
+    // Company nodes: (company_id, name) pairs of this partition that do not appear in any
+    // earlier increment partition nor in the full table.
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_node PARTITION(ds = '$ds')
+         |SELECT  a.company_id id
+         |        ,name_cleanup(a.name) name
+         |        ,'企业'AS LABEL
+         |        ,'0' deleted
+         |FROM    (
+         |            SELECT  company_id
+         |                    ,name
+         |            FROM    $inc_ads_company
+         |            WHERE   ds = '$ds' AND deleted <> 9
+         |        ) a
+         |LEFT JOIN (
+         |              SELECT  company_id,name
+         |              FROM    (
+         |                          SELECT  company_id,name
+         |                                  ,ROW_NUMBER() OVER(PARTITION BY company_id,name ORDER BY ds DESC,update_time DESC) num
+         |                          FROM    (
+         |                                      SELECT  company_id
+         |                                              ,name
+         |                                              ,update_time
+         |                                              ,ds
+         |                                      FROM    $inc_ads_company
+         |                                      WHERE   ds < '$ds' AND deleted <> 9
+         |                                      UNION ALL
+         |                                      SELECT  company_id
+         |                                              ,name
+         |                                              ,update_time
+         |                                              ,ds
+         |                                      FROM    $ads_company
+         |                                      WHERE   ds > '0' AND deleted <> 9
+         |                                  )
+         |                      )
+         |              WHERE   num = 1
+         |          ) b
+         |ON      a.company_id = b.company_id
+         |AND     a.name = b.name
+         |WHERE   b.company_id IS NULL
+         |""".stripMargin)
+
+    // Incremental legal-entity table (added and removed legal entities).
+    // First branch: (company_id, legal_entity_name) pairs of this partition not seen in earlier
+    // partitions, written with deleted = 0. Second branch: the latest earlier row of a company
+    // whose legal_entity_name differs from the current one, written with deleted = 1.
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_legal_entity PARTITION(ds= '$ds')
+         |SELECT  c.*
+         |FROM    (
+         |            SELECT  CONCAT_WS('_',company_id,hash(legal_entity_name)) AS rowkey
+         |                    ,company_id
+         |                    ,name AS company_name
+         |                    ,legal_entity_id
+         |                    ,legal_entity_name
+         |                    ,legal_entity_type
+         |                    ,create_time
+         |                    ,update_time
+         |                    ,0 AS deleted
+         |            FROM    $inc_ads_company
+         |            WHERE   ds = '$ds'
+         |            AND     legal_entity_id IS NOT NULL
+         |            AND     length(trim(legal_entity_name)) > 0
+         |            AND     deleted <> 9
+         |        ) c
+         |LEFT JOIN (
+         |              SELECT  company_id
+         |                      ,legal_entity_name
+         |              FROM    (
+         |                          SELECT  company_id
+         |                                  ,legal_entity_name
+         |                                  ,ROW_NUMBER() OVER(PARTITION BY company_id,legal_entity_name ORDER BY ds DESC,update_time DESC) num
+         |                          FROM    $inc_ads_company_legal_entity
+         |                          WHERE   ds < '$ds'
+         |                      )
+         |              WHERE   num = 1
+         |          ) d
+         |ON      c.company_id = d.company_id
+         |AND     c.legal_entity_name = d.legal_entity_name
+         |WHERE   d.company_id IS NULL
+         |UNION ALL
+         |SELECT  b.*
+         |FROM    (
+         |            SELECT  company_id
+         |                    ,legal_entity_name
+         |            FROM    (
+         |                        SELECT  company_id
+         |                                ,legal_entity_name
+         |                                ,ROW_NUMBER() OVER(PARTITION BY company_id ORDER BY ds DESC,update_time DESC) num
+         |                        FROM    $inc_ads_company
+         |                        WHERE   ds = '$ds'
+         |                        AND     legal_entity_id IS NOT NULL
+         |                        AND     length(trim(legal_entity_name)) > 0
+         |                        AND     deleted <> 9
+         |                    )
+         |            WHERE   num = 1
+         |        ) a
+         |JOIN    (
+         |            SELECT  rowkey
+         |                    ,company_id
+         |                    ,company_name
+         |                    ,legal_entity_id
+         |                    ,legal_entity_name
+         |                    ,legal_entity_type
+         |                    ,create_time
+         |                    ,update_time
+         |                    ,deleted
+         |            FROM    (
+         |                        SELECT  rowkey
+         |                                ,company_id
+         |                                ,company_name
+         |                                ,legal_entity_id
+         |                                ,legal_entity_name
+         |                                ,legal_entity_type
+         |                                ,create_time
+         |                                ,update_time
+         |                                ,1 AS deleted
+         |                                ,ROW_NUMBER() OVER(PARTITION BY company_id ORDER BY ds DESC,update_time DESC) num
+         |                        FROM    $inc_ads_company_legal_entity
+         |                        WHERE   ds <  '$ds'
+         |                    )
+         |            WHERE   num = 1
+         |        ) b
+         |ON      a.company_id = b.company_id
+         |AND     a.legal_entity_name <> b.legal_entity_name
+         |""".stripMargin)
+
+    // De-duplicated view of persons from earlier partitions (ds < current ds only):
+    // one row per (company_id, human_name).
+    sql(
+      s"""
+         |SELECT  company_id,human_name,human_pid
+         |FROM    (
+         |          SELECT  company_id,human_name,human_pid
+         |                            ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
+         |           FROM    $inc_ads_company_human_relation
+         |           WHERE   ds < '$ds'
+         |          )
+         |WHERE   num = 1
+         |""".stripMargin).createOrReplaceTempView("company_human_relation_old")
+
+    // New person rows: shareholders, staff and legal entities of this partition whose
+    // (company_id, human_name) is absent from company_human_relation_old; each gets a
+    // freshly generated human_pid.
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_company_human_relation PARTITION(ds= '$ds')
+         |SELECT  CONCAT_WS('_',a.company_id,hash(a.human_name)) AS rowkey
+         |        ,a.company_id
+         |        ,a.company_name
+         |        ,a.human_name
+         |        ,hash(a.human_name) AS hid
+         |        ,concat('p',md5(uuid())) human_pid
+         |        ,a.STATUS
+         |        ,a.create_time
+         |        ,a.update_time
+         |        ,a.deleted
+         |FROM    (
+         |            SELECT  *
+         |            FROM    (
+         |                        SELECT  company_id,company_name,human_name,hid,status,create_time,update_time,0 as deleted
+         |                                ,ROW_NUMBER() OVER (PARTITION BY company_id,human_name ORDER BY ds desc,update_time desc) num
+         |                        FROM    (
+         |                                    --股东
+         |                                    SELECT  company_id,company_name,holder_name human_name,holder_id hid,2 as status,create_time,update_time,0 as deleted,ds
+         |                                    FROM    $inc_ads_company_holder
+         |                                    WHERE   ds = '$ds'
+         |                                    UNION ALL
+         |                                    --主要成员
+         |                                    SELECT  company_id,company_name,staff_name human_name,hid,2 as status,create_time,update_time,0 as deleted,ds
+         |                                    FROM    $inc_ads_company_staff
+         |                                    WHERE   ds = '$ds'
+         |                                    UNION ALL
+         |                                    --法人
+         |                                    SELECT  company_id,company_name,legal_entity_name human_name,legal_entity_id as hid,2 as status,create_time,update_time,0 as deleted,ds
+         |                                    FROM    $inc_ads_company_legal_entity
+         |                                    WHERE   ds = '$ds'
+         |                                )
+         |                    )
+         |            WHERE   num = 1
+         |        ) a
+         |LEFT JOIN (
+         |SELECT * FROM company_human_relation_old
+         |) b
+         |ON      a.company_id = b.company_id
+         |AND     a.human_name = b.human_name
+         |WHERE   b.company_id IS NULL
+         |""".stripMargin)
+
+    // Shareholder relations: holder_type = 1 rows take their start id from
+    // company_human_relation_old, holder_type = 2 rows use holder_id directly.
+    // NOTE(review): the view only holds persons from earlier partitions, so persons first
+    // inserted in this partition find no match here - confirm this is intended.
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"}  table $inc_ads_relation_holder  PARTITION (ds= '$ds')
+         |SELECT
+         |start_id,start_name,end_id,end_name,percent,deleted,holder_type,label
+         |FROM (
+         |SELECT coalesce(c.human_pid,'') start_id,name_cleanup(c.human_name) start_name,a.company_id end_id,name_cleanup(a.company_name) end_name,percent,deleted,holder_type,label
+         |from (
+         |SELECT * ,
+         |    '投资' as LABEL
+         | FROM    $inc_ads_company_holder
+         | WHERE   ds = '$ds'
+         |and holder_type = 1
+         |)a
+         |JOIN
+         |(
+         |SELECT * FROM company_human_relation_old
+         |) c
+         |ON a.company_id = c.company_id and a.holder_name = c.human_name
+         |UNION ALL
+         |SELECT holder_id start_id,name_cleanup(holder_name) start_name,a.company_id end_id,name_cleanup(company_name) end_name,percent,deleted,holder_type,label
+         |from (
+         |SELECT * ,
+         |    '投资' as LABEL
+         | FROM    $inc_ads_company_holder
+         |WHERE   ds = '$ds'
+         |and holder_type = 2
+         |)a
+         |)
+         |where start_id <> end_id and start_id is not null and end_id is not null
+         |""".stripMargin)
+
+    // Key staff relations: staff rows of this partition joined to company_human_relation_old.
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"}  table $inc_ads_relation_staff  PARTITION (ds= '$ds')
+         |SELECT
+         |start_id,start_name,end_id,end_name,staff_type,deleted,label
+         |FROM (
+         |SELECT c.human_pid start_id,name_cleanup(c.human_name) start_name,a.company_id end_id,name_cleanup(a.company_name) end_name,name_cleanup(staff_type) staff_type,deleted,label
+         |from (
+         |SELECT * ,
+         |    '高管' as LABEL
+         | FROM    $inc_ads_company_staff
+         |WHERE   ds = '$ds'
+         |)a
+         |JOIN
+         |(
+         |SELECT * FROM company_human_relation_old
+         |) c
+         |ON a.company_id = c.company_id and a.staff_name   = c.human_name
+         |)
+         |where start_id <> end_id and start_id is not null and end_id is not null
+         |""".stripMargin)
+
+    // Legal-entity relations: legal_entity_type = 1 rows take their start id from
+    // company_human_relation_old, legal_entity_type = 2 rows use legal_entity_id directly.
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_relation_legal_entity   PARTITION (ds='$ds')
+         |SELECT
+         |start_id,start_name,end_id,end_name,deleted,legal_entity_type,label
+         |FROM (
+         |SELECT c.human_pid start_id,name_cleanup(c.human_name) start_name,a.company_id end_id,name_cleanup(a.company_name) end_name,deleted,legal_entity_type,label
+         |from (
+         |SELECT * ,
+         |    '法人' as LABEL
+         | FROM    $inc_ads_company_legal_entity
+         |WHERE   ds = '$ds'
+         |and legal_entity_type = 1
+         |)a
+         |JOIN
+         |(
+         |SELECT * FROM company_human_relation_old
+         |) c
+         |ON a.company_id = c.company_id and a.legal_entity_name = c.human_name
+         |UNION ALL
+         |SELECT legal_entity_id start_id,name_cleanup(legal_entity_name) start_name,a.company_id end_id,name_cleanup(company_name) end_name,deleted,legal_entity_type,label
+         |from (
+         |SELECT * ,
+         |    '法人' as LABEL
+         | FROM    $inc_ads_company_legal_entity
+         |WHERE   ds = '$ds'
+         |and legal_entity_type = 2
+         |)a
+         |)
+         |where start_id <> end_id and start_id is not null and end_id is not null
+         |""".stripMargin)
+
+  }
+
+  /**
+   * Fills the `*_kafka` staging tables for partition `ds`: every node or relation row is
+   * turned into a (key, message) pair by the matching get_* UDF, with a distinct topic-type
+   * literal ('1' to '6') per table.
+   */
+  def sendKafkaPre(): Unit = {
+    val inc_ads_company_node_kafka = "winhc_ng.inc_ads_company_node_kafka"
+    val inc_ads_relation_holder_v1_kafka = "winhc_ng.inc_ads_relation_holder_v1_kafka"
+    val inc_ads_relation_holder_v2_kafka = "winhc_ng.inc_ads_relation_holder_v2_kafka"
+    val inc_ads_relation_staff_kafka = "winhc_ng.inc_ads_relation_staff_kafka"
+    val inc_ads_relation_legal_entity_v1_kafka = "winhc_ng.inc_ads_relation_legal_entity_v1_kafka"
+    val inc_ads_relation_legal_entity_v2_kafka = "winhc_ng.inc_ads_relation_legal_entity_v2_kafka"
+    // Company nodes
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_company_node_kafka  PARTITION (ds='$ds')
+         |select
+         |id key,
+         |get_company_node(id, name, deleted, '1') message
+         |from $inc_ads_company_node
+         |where ds = '$ds'
+         |""".stripMargin).show(20, false)
+
+    // Shareholder relations (person -> company)
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_relation_holder_v1_kafka  PARTITION (ds='$ds')
+         |select
+         |end_id key,
+         |get_relation_holder(start_id, start_name, end_id, end_name, percent, deleted, holder_type, '2') message
+         |from $inc_ads_relation_holder
+         |where ds = '$ds'
+         |and holder_type = 1
+         |""".stripMargin).show(20, false)
+
+    // Shareholder relations (company -> company)
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_relation_holder_v2_kafka  PARTITION (ds='$ds')
+         |select
+         |end_id key,
+         |get_relation_holder(start_id, start_name, end_id, end_name, percent, deleted, holder_type, '3') message
+         |from $inc_ads_relation_holder
+         |where ds = '$ds'
+         |and holder_type = 2
+         |""".stripMargin).show(20, false)
+
+    // Legal-entity relations (person -> company)
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_relation_legal_entity_v1_kafka  PARTITION (ds='$ds')
+         |select
+         |end_id key,
+         |get_relation_legal_entity(start_id, start_name, end_id, end_name, deleted, legal_entity_type, '4') message
+         |from $inc_ads_relation_legal_entity
+         |where ds = '$ds'
+         |and legal_entity_type = '1'
+         |""".stripMargin).show(20, false)
+
+    // Legal-entity relations (company -> company)
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_relation_legal_entity_v2_kafka  PARTITION (ds='$ds')
+         |select
+         |end_id key,
+         |get_relation_legal_entity(start_id, start_name, end_id, end_name, deleted, legal_entity_type, '5') message
+         |from $inc_ads_relation_legal_entity
+         |where ds = '$ds'
+         |and legal_entity_type = '2'
+         |""".stripMargin).show(20, false)
+
+    // Key staff relations (person -> company)
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_relation_staff_kafka  PARTITION (ds='$ds')
+         |select
+         |end_id key,
+         |get_relation_staff(start_id, start_name, end_id, end_name, staff_type, deleted, '6') message
+         |from $inc_ads_relation_staff
+         |where ds = '$ds'
+         |""".stripMargin).show(20, false)
+
+    // Guard against empty partitions.
+    // NOTE(review): presumably creates the partition when the insert produced none - confirm in LoggingUtils.
+    addEmptyPartitionOrSkip(inc_ads_company_node_kafka, ds)
+    addEmptyPartitionOrSkip(inc_ads_relation_holder_v1_kafka, ds)
+    addEmptyPartitionOrSkip(inc_ads_relation_holder_v2_kafka, ds)
+    addEmptyPartitionOrSkip(inc_ads_relation_staff_kafka, ds)
+    addEmptyPartitionOrSkip(inc_ads_relation_legal_entity_v1_kafka, ds)
+    addEmptyPartitionOrSkip(inc_ads_relation_legal_entity_v2_kafka, ds)
+
+  }
+
+}
+
+object inc_company_relation {
+  /**
+   * Entry point. Expects exactly two arguments, `project` and `ds`; with any other count it
+   * prints a hint and exits with -1. Otherwise it registers the UDFs, builds the incremental
+   * tables, fills the kafka staging tables and stops the session.
+   */
+  def main(args: Array[String]): Unit = {
+    val argumentCount = args.size
+    if (argumentCount != 2) {
+      println("please set project ds.")
+      sys.exit(-1)
+    }
+    val project = args(0)
+    val ds = args(1)
+
+    val overrides = mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, EsConfig.getEsConfigMap ++ overrides)
+
+    val job = inc_company_relation(s = spark, project = project, ds = ds)
+    job.register_fun()
+    job.inc()
+    job.sendKafkaPre()
+
+    spark.stop()
+  }
+}
+

+ 4 - 0
src/main/scala/com/winhc/bigdata/spark/udf/CompanyMapping.scala

@@ -63,6 +63,10 @@ trait CompanyMapping {
       compareName(yg, bg)
     })
 
+    spark.udf.register("name_cleanup", (name: String) => {
+      nameCleanup(name)
+    })
+
   }
 
   def prepare(spark: SparkSession): Unit = {