
Incremental calculation bug fix

xufei 4 years ago
parent commit 709aaae3fe

+ 15 - 13
src/main/scala/com/winhc/bigdata/spark/model/CompanyIntellectualsScore.scala

@@ -34,8 +34,6 @@ object CompanyIntellectualsScore {
       "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
       "spark.hadoop.odps.spark.local.partition.amt" -> "10"
     )
-    config.+=("spark.hadoop.odps.project.name" -> "winhc_eci_dev")
-    println(config)
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
 
     println(s"company ${this.getClass.getSimpleName} calc start! " + new Date().toString)
@@ -57,14 +55,14 @@ object CompanyIntellectualsScore {
 
   def valid(args: Array[String]) = {
     if (args.length != 2) {
-      println("请输入要计算的 工作空间,table !!!! ")
+      println("please insert namespace,table !!!! ")
       sys.exit(-1)
     }
     val Array(namespace, sourceTable) = args
 
     val (flag, time, kind, project) = tabMapping.getOrElse(sourceTable, ("", "", "", ""))
     if (flag.isEmpty || time.isEmpty || kind.isEmpty || project.isEmpty) {
-      println("输入表不存在!!!   ")
+      println("table not find!!!   ")
       sys.exit(-1)
     }
 
@@ -75,7 +73,7 @@ object CompanyIntellectualsScore {
   def start(spark: SparkSession, namespace: String, sourceTable: String, tableView: String): Unit = {
     val (flag, time, kind, project) = tabMapping.getOrElse(sourceTable, ("", "", "", ""))
     if (flag.isEmpty || time.isEmpty || kind.isEmpty || project.isEmpty) {
-      println("输入模型计算表不存在!!! ")
+      println("model table not find !!! ")
       sys.exit(-1)
     }
 
@@ -100,6 +98,7 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String, table
   import spark.implicits._
 
   def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
     //val targetTable = "ads_company_total_score"
     val adsTable = namespace + "ads_" + sourceTable
     val incAdsTable = namespace + "inc_ads_" + sourceTable
@@ -150,12 +149,12 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String, table
       "score", "total", "extraScore")
       .createOrReplaceTempView(s"t1_view")
 
-    logger.info(
-      s"""
-         |- - - - - - - - - - - - - - - - - - - - - - - - -
-         |${showString(sql(s"select * from t1_view"))}
-         |- - - - - - - - - - - - - - - - - - - - - - - - -
-               """.stripMargin)
+    //    logger.info(
+    //      s"""
+    //         |- - - - - - - - - - - - - - - - - - - - - - - - -
+    //         |${showString(sql(s"select * from t1_view"))}
+    //         |- - - - - - - - - - - - - - - - - - - - - - - - -
+    //               """.stripMargin)
 
     sql(s"insert overwrite table ${targetTable}${apptab} " +
       s"partition (ds='${ds}')  select * from t1_view")
@@ -163,13 +162,16 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String, table
     val dataFrame = sql(
       s"""
          |select
-         |CONCAT_WS('_',cid,id) AS rowkey,
+         |CONCAT_WS('_',cid,project_code) AS rowkey,
          |id,cid,kind,kind_code,project,project_code,type,score,total,extraScore
          |from t1_view
          |""".stripMargin)
 
     //sync to hbase
-    Maxcomputer2Hbase(dataFrame,"COMPANY_SCORE").syn()
+    if ("1".equals(tp)) { //存量计算不用同步hbase
+      Maxcomputer2Hbase(dataFrame, "COMPANY_SCORE").syn()
+    }
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
   }
 
   def trans(r: Row, flag: String, kind: String, prpject: String) = {
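For context, here is a minimal sketch of what the two functional changes in this file amount to: the HBase rowkey is now built from cid and project_code instead of cid and id, and the sync only runs for incremental jobs. The tp flag ("1" marking an incremental run), the t1_view temp view and the Maxcomputer2Hbase helper are taken from this repo; everything else below is illustrative, not the committed code.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, concat_ws}
import com.winhc.bigdata.spark.utils.Maxcomputer2Hbase

// Sketch only: mirrors the rowkey and conditional-sync changes above.
def syncCompanyScore(spark: SparkSession, tp: String): Unit = {
  val scored: DataFrame = spark.table("t1_view")
    // cid + project_code gives a stable rowkey, so recomputing a project
    // overwrites the previous score row instead of adding a new one
    .withColumn("rowkey", concat_ws("_", col("cid"), col("project_code")))

  if ("1".equals(tp)) { // full (stock) runs skip the HBase sync
    Maxcomputer2Hbase(scored, "COMPANY_SCORE").syn()
  }
}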

+ 3 - 2
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncSummary.scala

@@ -35,11 +35,12 @@ case class CompanyIncSummary(s: SparkSession,
 
 
     val ads_table_cols = spark.table(ads_table).columns.filter(l => {
-      !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag")
+      !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag") && !l.equals("new_cids") && !l.equals("cids")
     }).toList.sorted
 
+
     val inc_ads_table_cols = spark.table(inc_ads_table).columns.filter(l => {
-      !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag")
+      !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag") && !l.equals("new_cids") && !l.equals("cids")
     }).toList.sorted
 
     val new_cols = (ads_table_cols ::: inc_ads_table_cols).distinct.sorted
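A self-contained sketch of the column alignment this hunk tightens: both the full and the incremental table drop their meta columns (now also new_cids and cids), and the sorted, de-duplicated union becomes the single column list used for both sides. The table names here are placeholders.

import org.apache.spark.sql.SparkSession

// Sketch of the alignment above: strip meta columns on both sides,
// then take the sorted union as the common select list.
def alignedCols(spark: SparkSession, adsTable: String, incAdsTable: String): Seq[String] = {
  val meta = Set("ds", "rowkey", "flag", "new_cids", "cids")
  val adsCols    = spark.table(adsTable).columns.filterNot(meta.contains).toList.sorted
  val incAdsCols = spark.table(incAdsTable).columns.filterNot(meta.contains).toList.sorted
  (adsCols ::: incAdsCols).distinct.sorted
}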

+ 2 - 4
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidUtils.scala

@@ -19,14 +19,12 @@ case class CompanyIncrForCidUtils(s: SparkSession,
   @(transient@getter) val spark: SparkSession = s
 
   def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
     val inc_ods_company = s"${project}.inc_ods_company" //daily company base-info increment
     val ads_company_tb = s"${project}.ads_${tableName}" //full ads table
     val inc_ods_company_tb = s"${project}.inc_ods_$tableName" //incremental ods table
     val inc_ads_company_tb = s"${project}.inc_ads_$tableName" //incremental ads table
 
-
-    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
-
     //latest partition of the full ads table
     val remainDs = BaseUtil.getPartion(ads_company_tb, spark)
 
@@ -57,7 +55,7 @@ case class CompanyIncrForCidUtils(s: SparkSession,
 
     //if the last partitions of incremental ods and incremental ads are equal, bail out
     if (lastDsIncOds.equals(lastDsIncAds)) {
-      println("增量ods和增量ads最后一个分区相等,跳出")
+      println("inc_ods equals inc_ads ds ,please delete last ds !!!")
       sys.exit(-1)
     }
 

+ 2 - 3
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

@@ -21,6 +21,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
   @(transient@getter) val spark: SparkSession = s
 
   def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
     val inc_ods_company = s"${project}.inc_ods_company" //daily company base-info increment
     val ads_company_tb = s"${project}.ads_$mainTableName" //full ads main-table data
     val ads_company_tb_list = s"${project}.ads_$sublistTableName" //full sub-table data, used to read table columns
@@ -32,8 +33,6 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
       !s.equals("ds") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids") && !s.equals("new_cids")
     }).seq
 
-    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
-
     //latest partition of the full ads table
     val remainDs = BaseUtil.getPartion(ads_company_tb, spark)
 
@@ -64,7 +63,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
 
     //if the last partitions of incremental ods and incremental ads are equal, bail out
     if (lastDsIncOds.equals(lastDsIncAds)) {
-      println("增量ods和增量ads最后一个分区相等,跳出")
+      println("inc_ods equals inc_ads ds ,please delete last ds !!!")
       sys.exit(-1)
     }
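The guard changed in both CompanyIncrForCidUtils and CompanyIncrForCidsUtils can be read as the sketch below, assuming BaseUtil.getPartion(table, spark) returns the latest partition value as it does elsewhere in this repo: when the incremental ods and incremental ads tables already share the same last partition, that increment has been published and the job bails out.

import com.winhc.bigdata.spark.utils.BaseUtil
import org.apache.spark.sql.SparkSession

// Sketch of the shared guard: stop when the latest inc_ods partition
// has already been written to inc_ads (delete that partition to recompute).
def requireNewIncrement(spark: SparkSession, incOdsTable: String, incAdsTable: String): Unit = {
  val lastDsIncOds = BaseUtil.getPartion(incOdsTable, spark)
  val lastDsIncAds = BaseUtil.getPartion(incAdsTable, spark)
  if (lastDsIncOds.equals(lastDsIncAds)) {
    println("inc_ods equals inc_ads ds ,please delete last ds !!!")
    sys.exit(-1)
  }
}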
 

+ 8 - 0
src/main/scala/com/winhc/bigdata/spark/utils/LoggingUtils.scala

@@ -28,6 +28,14 @@ trait LoggingUtils {
          |- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
        """.stripMargin
     )
+    println(
+      s"""
+        |- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+        |Job[${this.getClass.getSimpleName}].SQL[No$sqlNo.]
+        |
+        |$sqlString
+        |- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+        |""".stripMargin)
     sqlNo += 1
     spark.sql(sqlString)
   }
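The added println mirrors the existing logger.info call, presumably so the numbered SQL also shows up on stdout and not only in the log4j output. A minimal standalone version of the pattern, using the same Job[...].SQL[No...] banner as the trait above:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Echo every statement with a sequence number before handing it to spark.sql.
trait SqlEcho {
  protected def spark: SparkSession
  private var sqlNo = 1

  def sql(sqlString: String): DataFrame = {
    println(
      s"""
         |- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
         |Job[${this.getClass.getSimpleName}].SQL[No$sqlNo.]
         |
         |$sqlString
         |- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
         |""".stripMargin)
    sqlNo += 1
    spark.sql(sqlString)
  }
}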

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/utils/MaxComputer2Phoenix.scala

@@ -21,7 +21,8 @@ case class MaxComputer2Phoenix(spark: SparkSession,
 
     println(s"${htable} phoenix syn start! " + new Date().toString)
 
-    val resTable = s"TEST_${htable}"
+//    val resTable = s"TEST_${htable}"
+    val resTable = s"${htable}"
 
     println("------------" + resTable + "---------")
 

+ 5 - 2
src/main/scala/com/winhc/bigdata/spark/utils/Maxcomputer2Hbase.scala

@@ -1,5 +1,7 @@
 package com.winhc.bigdata.spark.utils
 
+import java.util.Date
+
 import com.winhc.bigdata.spark.config.HBaseConfig
 import org.apache.hadoop.hbase.client.Put
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
@@ -19,12 +21,13 @@ case class Maxcomputer2Hbase(dataFrame: DataFrame
   lazy val f_bytes: Array[Byte] = Bytes.toBytes("F")
 
   def syn(): Unit = {
+    println(s"${hbaseTable} HBASE syn start! " + new Date().toString)
     val jobConf = HBaseConfig.HBaseOutputJobConf(hbaseTable.toUpperCase)
     import org.apache.spark.sql.functions.col
     //cast df columns to string
     val columns: Array[String] = dataFrame.columns
     val df2: DataFrame = dataFrame.select(columns.map(column => col(column).cast("string")): _*)
-    df2.printSchema()
+    //df2.printSchema()
     df2.rdd.map(row => {
       try {
         val rowkey = row.getAs[String]("rowkey")
@@ -42,7 +45,7 @@ case class Maxcomputer2Hbase(dataFrame: DataFrame
         }
       }
     }).filter(_ != null).saveAsHadoopDataset(jobConf)
-
+    println(s"${hbaseTable} HBASE syn start! " + new Date().toString)
   }
 
 }
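For reference, the write path that syn() wraps can be sketched as below. HBaseConfig.HBaseOutputJobConf comes from this repo; the rest uses plain Spark and HBase client APIs: every column is cast to string, "rowkey" becomes the HBase row key, and the remaining columns land in column family "F".

import com.winhc.bigdata.spark.config.HBaseConfig
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Sketch of the DataFrame -> HBase path: string-cast all columns, key each Put
// by the "rowkey" column, and write the rest into column family "F".
def writeToHbase(dataFrame: DataFrame, hbaseTable: String): Unit = {
  val jobConf = HBaseConfig.HBaseOutputJobConf(hbaseTable.toUpperCase)
  val fBytes = Bytes.toBytes("F")
  val columns = dataFrame.columns
  val asString = dataFrame.select(columns.map(c => col(c).cast("string")): _*)

  asString.rdd.map { row =>
    val put = new Put(Bytes.toBytes(row.getAs[String]("rowkey")))
    columns.filter(_ != "rowkey").foreach { c =>
      val v = row.getAs[String](c)
      if (v != null) put.addColumn(fBytes, Bytes.toBytes(c), Bytes.toBytes(v))
    }
    (new ImmutableBytesWritable(), put)
  }.saveAsHadoopDataset(jobConf)
}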