
Add alarm flag bit

xufei committed 3 years ago
parent
commit
76f3a10723

+ 15 - 1
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_company_relation_v2.scala

@@ -57,11 +57,14 @@ case class inc_company_relation_v2(s: SparkSession,
                                   end_name: String, deleted: Int, legal_entity_type: String, topic_type: String): String =
       CompanyRelationUtils.get_relation_legal_entity(start_id, start_name, end_id, end_name, deleted.toString, legal_entity_type, topic_type)
 
+    def get_success_status(ds: String, status: String, topic_type: String): String = CompanyRelationUtils.get_success_status(ds, status, topic_type)
+
     spark.udf.register("get_company_node", get_company_node _)
     spark.udf.register("get_person_node", get_person_node _)
     spark.udf.register("get_relation_holder", get_relation_holder _)
     spark.udf.register("get_relation_staff", get_relation_staff _)
     spark.udf.register("get_relation_legal_entity", get_relation_legal_entity _)
+    spark.udf.register("get_success_status", get_success_status _)
   }
 
   def inc(): Unit = {
@@ -386,6 +389,7 @@ case class inc_company_relation_v2(s: SparkSession,
     val inc_ads_relation_staff_kafka = "winhc_ng.inc_ads_relation_staff_kafka"
     val inc_ads_relation_legal_entity_v1_kafka = "winhc_ng.inc_ads_relation_legal_entity_v1_kafka"
     val inc_ads_relation_legal_entity_v2_kafka = "winhc_ng.inc_ads_relation_legal_entity_v2_kafka"
+    val inc_ads_node_relation_success_status = "winhc_ng.inc_ads_node_relation_success_status"
    //Company nodes
     sql(
       s"""
@@ -456,6 +460,15 @@ case class inc_company_relation_v2(s: SparkSession,
          |where ds = '$ds'
          |""".stripMargin).show(20, false)
 
+    //Send the success-status flag
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_node_relation_success_status PARTITION (ds='$ds')
+         |select
+         |'$ds' key,
+         |get_success_status('$ds', '1', '100') message
+         |""".stripMargin).show(20, false)
+
    //Guard against empty partitions
     addEmptyPartitionOrSkip(inc_ads_company_node_kafka, ds)
     addEmptyPartitionOrSkip(inc_ads_relation_holder_v1_kafka, ds)
@@ -463,6 +476,7 @@ case class inc_company_relation_v2(s: SparkSession,
     addEmptyPartitionOrSkip(inc_ads_relation_staff_kafka, ds)
     addEmptyPartitionOrSkip(inc_ads_relation_legal_entity_v1_kafka, ds)
     addEmptyPartitionOrSkip(inc_ads_relation_legal_entity_v2_kafka, ds)
+    addEmptyPartitionOrSkip(inc_ads_node_relation_success_status, ds)
 
   }
 
@@ -495,7 +509,7 @@ case class inc_company_relation_v2(s: SparkSession,
          |                ) WHERE   num =1
          |     ) b
          |ON      a.human_pid = b.human_pid
-        |""".stripMargin).createOrReplaceTempView("inc_update_deleted_person_company")
+         |""".stripMargin).createOrReplaceTempView("inc_update_deleted_person_company")
 
    //Delete data
     sql(

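A minimal, self-contained sketch of the flow this file's changes add: register the status UDF and produce the single success-flag row. It assumes a local SparkSession; getSuccessStatus below is a hand-rolled stand-in for CompanyRelationUtils.get_success_status, and the table/partition names in the comments are the ones taken from the diff.

import org.apache.spark.sql.SparkSession

object SuccessStatusFlagSketch {
  // Stand-in for CompanyRelationUtils.get_success_status: serialize the three
  // fields into a small JSON message by hand (the real helper uses the
  // project's toJson() implicit).
  def getSuccessStatus(ds: String, status: String, topicType: String): String =
    s"""{"ds":"$ds","status":"$status","topic_type":"$topicType"}"""

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("success-status-flag-sketch")
      .master("local[*]")
      .getOrCreate()

    // Register the UDF under the same name the diff uses in SQL.
    spark.udf.register("get_success_status", getSuccessStatus _)

    val ds = "20220101" // example partition value

    // Build the single "success" row that the job writes into
    // winhc_ng.inc_ads_node_relation_success_status PARTITION (ds = '$ds')
    // after the relation tables have been produced.
    spark.sql(
      s"""
         |select
         |'$ds' as key,
         |get_success_status('$ds', '1', '100') as message
         |""".stripMargin).show(false)

    spark.stop()
  }
}
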
+ 4 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyRelationUtils.scala

@@ -19,6 +19,7 @@ case class relation_staff(start_id: String, start_name: String, end_id: String,
 case class relation_legal_entity(start_id: String, start_name: String, end_id: String,
                                  end_name: String, deleted: String, legal_entity_type: String, topic_type: String)
 
+case class success_status(ds: String, status: String, topic_type: String)
 
 object CompanyRelationUtils {
 
@@ -40,4 +41,7 @@ object CompanyRelationUtils {
                                 end_name: String, deleted: String, legal_entity_type: String, topic_type: String): String =
     relation_legal_entity(start_id, start_name, end_id, end_name, deleted, legal_entity_type, topic_type).toJson()
 
+  def get_success_status(ds: String, status: String, topic_type: String): String =
+    success_status(ds, status, topic_type).toJson()
+
 }
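
For reference, a sketch of the message shape get_success_status should emit. It uses json4s as an assumed stand-in for the project's toJson() helper, whose actual library is not shown in this diff; only the field-to-key mapping is taken from the new case class.

import org.json4s.DefaultFormats
import org.json4s.jackson.Serialization

object SuccessStatusMessageSketch {
  case class success_status(ds: String, status: String, topic_type: String)

  implicit val formats: DefaultFormats.type = DefaultFormats

  def main(args: Array[String]): Unit = {
    // Field-for-field JSON serialization of the new case class; the project's
    // own toJson() may use a different library, but the key/value pairs
    // written to the success-status table should match this shape.
    val msg = Serialization.write(success_status("20220101", "1", "100"))
    println(msg) // {"ds":"20220101","status":"1","topic_type":"100"}
  }
}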