浏览代码

fix: 股权出质、年报

- 股权出质兼容重跑
- 年报fix bug
- HBaseHelper自动转换大写
许家凯 4 年之前
父节点
当前提交
d7b8a64eac

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/implicits/DataFrame2HBaseHelper.scala

@@ -26,7 +26,7 @@ object DataFrame2HBaseHelper {
         for (f <- fields) {
           val v = row.getAs[String](f.toLowerCase)
           if (v != null) {
-            put.addColumn(f_bytes, Bytes.toBytes(f), Bytes.toBytes(v))
+            put.addColumn(f_bytes, Bytes.toBytes(f.toUpperCase()), Bytes.toBytes(v))
           }
         }
         (new ImmutableBytesWritable, put)

+ 60 - 20
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyAnnualReport.scala

@@ -3,7 +3,7 @@ package com.winhc.bigdata.spark.jobs
 import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
-import com.winhc.bigdata.spark.utils.{DataTypeUtils, LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{CompanyIncSummary, DataTypeUtils, LoggingUtils, SparkUtils}
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
@@ -146,14 +146,19 @@ object CompanyAnnualReport {
 
       val inc_ods_end_ds = getLastPartitionsOrElse(s"$project.inc_ods_company_annual_report", "0")
       val ads_end_ds = getLastPartitionsOrElse(s"$project.ads_company_annual_report", "0")
+      var inc_ads_end_ds = getLastPartitionsOrElse(s"$project.inc_ads_company_annual_report", "0")
 
+      if (inc_ods_end_ds.equals(inc_ads_end_ds)) {
+        println("inc_ods_end_ds = inc_ads_end_ds ! ")
+        inc_ads_end_ds = getSecondLastPartitionOrElse(s"$project.inc_ads_company_annual_report", "0")
+      }
       val columns = getColumns(s"$project.ads_company_annual_report").diff(Seq("ds", "rowkey", "new_cid", "cid"))
 
       sql(
         s"""
            |SELECT  cid,current_cid as new_cid
            |FROM    $project.inc_ods_company
-           |WHERE   ds > ${ads_end_ds}
+           |WHERE   ds > ${inc_ads_end_ds}
            |AND     cid IS NOT NULL
            |AND     current_cid IS NOT NULL
            |GROUP BY cid,current_cid
@@ -180,12 +185,12 @@ object CompanyAnnualReport {
            |                FROM    (
            |                        SELECT  *
            |                        FROM    $project.inc_ods_company_annual_report
-           |                        WHERE   ds > $ads_end_ds and cid is not null
+           |                        WHERE   ds > $inc_ads_end_ds and cid is not null
            |                        ) a
            |                LEFT JOIN (
            |                        select *
            |                        from $project.base_company_mapping
-           |                        where ds =${getLastPartitionsOrElse(s"$project.base_company_mapping", "0")}
+           |                        where ds = ${getLastPartitionsOrElse(s"$project.base_company_mapping", "0")}
            |                ) b
            |                ON      a.cid = b.cid
            |                UNION ALL
@@ -205,7 +210,7 @@ object CompanyAnnualReport {
            |                                new_cid AS cid
            |                                ,${columns.mkString(",")}
            |                        FROM    $project.inc_ads_company_annual_report
-           |                        WHERE   ds > ${ads_end_ds}
+           |                        WHERE   ds > ${inc_ads_end_ds}
            |                     ) b
            |                ON      a.cid = b.cid
            |                ) c
@@ -215,6 +220,26 @@ object CompanyAnnualReport {
 
       //todo 只写入hbase
 
+      val writCols = getColumns("winhc_eci_dev.inc_ads_company_annual_report").diff(Seq(
+        "ds"
+        , "id"
+        , "new_cid"
+        , "cid"
+      ))
+
+      import com.winhc.bigdata.spark.implicits.DataFrame2HBaseHelper._
+      sql(
+        s"""
+           |SELECT  new_cid as cid,${writCols.mkString(",")}
+           |FROM    winhc_eci_dev.inc_ads_company_annual_report
+           |WHERE   ds = '$inc_ods_end_ds'
+           |""".stripMargin)
+        .save2HBase("COMPANY_ANNUAL_REPORT"
+          , "rowkey"
+          , "cid" +: writCols)
+
+      CompanyIncSummary(spark, project, "company_annual_report", "new_cid", Seq("rowkey")).calc
+
     }
 
 
@@ -287,8 +312,13 @@ object CompanyAnnualReport {
       utils.getMainTmpTab("main_table_tmp")
 
 
-      val inc_ads_end_ds = getLastPartitionsOrElse(s"$project.inc_ads_$project", "0")
-      val inc_ods_end_ds = getLastPartitionsOrElse(s"$project.inc_ods_$project", "0")
+      var inc_ads_end_ds = getLastPartitionsOrElse(s"$project.inc_ads_$tableName", "0")
+      val inc_ods_end_ds = getLastPartitionsOrElse(s"$project.inc_ods_$tableName", "0")
+
+      if (inc_ads_end_ds.equals(inc_ods_end_ds)) {
+        println("inc_ads_end_ds = inc_ods_end_ds !")
+        inc_ads_end_ds = getSecondLastPartitionOrElse(s"$project.inc_ads_$tableName", "0")
+      }
 
       sql(
         s"""
@@ -325,8 +355,24 @@ object CompanyAnnualReport {
            |WHERE   t4.num = 1
            |""".stripMargin)
 
-      //todo 只写入hbase
+      val writeCols = getColumns(s"winhc_eci_dev.inc_ads_$tableName").diff(Seq(
+        "ds"
+        , "id"
+        , "new_cid"
+        , "cid"
+      ))
 
+      //todo 只写入hbase
+      import com.winhc.bigdata.spark.implicits.DataFrame2HBaseHelper._
+      sql(
+        s"""
+           |SELECT  new_cid as cid,${writeCols.mkString(",")}
+           |FROM    winhc_eci_dev.inc_ads_$tableName
+           |WHERE   ds = '$inc_ods_end_ds'
+           |""".stripMargin)
+        .save2HBase(tableName.toUpperCase
+          , "rowkey"
+          , "cid" +: writeCols)
     }
   }
 
@@ -352,32 +398,26 @@ object CompanyAnnualReport {
       , "company_annual_report_change" -> "change_item,change_time" //年报-变更
       , "company_annual_report_equity_change" -> "investor_name,change_time" //年报-股权变更
       , "company_annual_report_holder" -> "investor_name" //年报-股东
-      , "company_annual_report_out_guarantee" -> "id" //年报-对外担保
+      //      , "company_annual_report_out_guarantee" -> "id" //年报-对外担保
       , "company_annual_report_webinfo" -> "website" //年报-网站
       , "company_annual_report_social_security" -> "" //年报-社保 (不采取去重,和main_id一对一)
     )
 
-    //    for (elem <- sublist_map) {
-    //      CompanyAnnualReportHandle(spark, project).sublist_all(elem._1, elem._2.split(","))
-    //    }
-
-    //    for (e <- sublist_map) {
-    //      CompanyAnnualReportHandle(spark, project).sublist_all(e._1, e._2.split(","))
-    //    }
-
-    val all_flag = true
+    val all_flag = false
 
     if (all_flag) {
       //存量
-      CompanyAnnualReportHandle(spark, project).main_table_all()
+      //      CompanyAnnualReportHandle(spark, project).main_table_all()
       for (elem <- sublist_map) {
+        println("xjk:" + elem._1)
         CompanyAnnualReportHandle(spark, project).sublist_all(elem._1, elem._2.split(","))
       }
     } else {
       //增量
       CompanyAnnualReportHandle(spark, project).main_table_inc()
       for (e <- sublist_map) {
-        CompanyAnnualReportHandle(spark, project).sublist_all(e._1, e._2.split(","))
+        println("xjk:" + e._1)
+        CompanyAnnualReportHandle(spark, project).sublist_inc(e._1, e._2.split(","))
       }
     }
     spark.stop()

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/inc_company_equity_info.scala

@@ -30,12 +30,12 @@ object inc_company_equity_info {
       cleanup()
       val cols = getColumns(s"winhc_eci_dev.ads_company_equity_info").diff(Seq("ds", "rowkey", "id"))
 
-      val startPart = getLastPartitionsOrElse(s"winhc_eci_dev.inc_ads_company_equity_info", "0")
+      var startPart = getLastPartitionsOrElse(s"winhc_eci_dev.inc_ads_company_equity_info", "0")
       val endPart = getLastPartitionsOrElse(s"winhc_eci_dev.inc_ods_company_equity_info", BaseUtil.getYesterday())
 
       if (startPart.equals(endPart)) {
         println("start partition = end partition!")
-        sys.exit(-1)
+        startPart = getSecondLastPartitionOrElse(s"winhc_eci_dev.inc_ads_company_equity_info", "0")
       }
 
       sql(

+ 9 - 0
src/main/scala/com/winhc/bigdata/spark/utils/LoggingUtils.scala

@@ -121,6 +121,15 @@ trait LoggingUtils extends Logging {
     sql(sql_s).collect.toList.map(_.getString(0).split("=")(1)).seq
   }
 
+  def getSecondLastPartitionOrElse(t: String, default: String): String = {
+    val ps = getPartitions(t)
+    if (ps.length >= 2) {
+      ps(ps.length - 2)
+    } else {
+      default
+    }
+  }
+
   def getLastPartitionsOrElse(t: String, default: String): String = {
     val ps = getPartitions(t)
     if (ps.nonEmpty) {