
Modify patent classification logic; modify incremental cids handling

xufei, 4 years ago
parent commit 1c00d79851

+ 20 - 15
src/main/scala/com/winhc/bigdata/spark/model/CompanyIntellectualsScore.scala

@@ -30,7 +30,10 @@ object CompanyIntellectualsScore {
 
     val (sourceTable, flag, time, kind, project) = valid(args)
 
-    val config = mutable.Map.empty[String, String]
+    var config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+    )
     config.+=("spark.hadoop.odps.project.name" -> "winhc_eci_dev")
     println(config)
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
@@ -83,11 +86,13 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String,
 
     var appsql = ""
     var apptab = ""
+
+    // Patent types: 1 (A) published invention application, 2 (B,C) granted invention, 3 (U,Y) utility model, 4 (D,S) design
     if ("3".equals(flag)) { //实用新型、外观设计专利
-      appsql = "AND substring(pub_number, 7,1) in ('2','3')"
+      appsql = "AND SUBSTR(pub_number, -1) in ('U','Y','D','S')"
       apptab = "_s"
    } else if ("4".equals(flag)) { // invention patents
-      appsql = "AND substring(pub_number, 7,1) not in ('2','3')"
+      appsql = "AND SUBSTR(pub_number, -1) in ('A','B','C')"
       apptab = "_f"
     }
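
The new predicates key off the last character of pub_number (the kind-code letter) instead of a fixed position. As a rough, standalone illustration of that split, a minimal Spark SQL sketch could look like the following; the `patents` temp view and the sample publication numbers are hypothetical and not part of this project:

    // Minimal sketch (hypothetical data): classify patents by the kind-code
    // letter at the end of pub_number, mirroring the predicates in this diff.
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("PubNumberKindSketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    Seq("CN108123456A", "CN108123456B", "CN207654321U", "CN305432109S")
      .toDF("pub_number")
      .createOrReplaceTempView("patents")

    spark.sql(
      """
        |SELECT pub_number,
        |       CASE WHEN SUBSTR(pub_number, -1) IN ('U','Y','D','S') THEN 'utility_model_or_design'
        |            WHEN SUBSTR(pub_number, -1) IN ('A','B','C')     THEN 'invention'
        |            ELSE 'unknown'
        |       END AS patent_group
        |FROM patents
        |""".stripMargin).show(false)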
 
@@ -109,18 +114,18 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String,
     df.map(r => {
       trans(r, flag, kind, project)
     }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
-      "score", "total", "extraScore")
-      .createOrReplaceTempView(s"t1_view")
-
-    //    logger.info(
-    //      s"""
-    //         |- - - - - - - - - - - - - - - - - - - - - - - - -
-    //         |${showString(sql(s"select * from t1_view"))}
-    //         |- - - - - - - - - - - - - - - - - - - - - - - - -
-    //       """.stripMargin)
-
-    sql(s"insert overwrite table ${targetTable}${apptab} " +
-      s"partition (ds='${ds}')  select * from t1_view")
+      "score", "total", "extraScore").show(10)
+//      .createOrReplaceTempView(s"t1_view")
+
+//        logger.info(
+//          s"""
+//             |- - - - - - - - - - - - - - - - - - - - - - - - -
+//             |${showString(sql(s"select * from t1_view"))}
+//             |- - - - - - - - - - - - - - - - - - - - - - - - -
+//           """.stripMargin)
+
+//    sql(s"insert overwrite table ${targetTable}${apptab} " +
+//      s"partition (ds='${ds}')  select * from t1_view")
   }
 
   def trans(r: Row, flag: String, kind: String, prpject: String) = {

+ 3 - 5
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

@@ -29,17 +29,16 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
 
 
     val sublistTableFieldName = spark.table(ads_company_tb_list).columns.filter(s => {
-      !s.equals("ds") && !s.equals("new_cid") && !s.equals("rowkey")
+      !s.equals("ds") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids") && !s.equals("new_cids")
     }).seq
 
-
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
 
     val firstDs = BaseUtil.getFirstPartion("winhc_eci_dev.inc_ods_company", spark)
 
    // table columns
     val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(s => {
-      !s.equals("ds") && !s.equals("cid") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids")
+      !s.equals("ds") && !s.equals("cid") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids") && !s.equals("new_cids")
     })
 
    // mapping relationships
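
The column filters above drop the partition and key fields (ds, rowkey, cid/new_cid, cids/new_cids) so they are not spliced into the incremental SQL twice. A minimal standalone sketch of that pattern, with a hypothetical table name standing in for the real ads table:

    // Sketch only: collect the non-key columns of a table and splice them into
    // a SELECT, as the ${columns.mkString(",")} interpolation in the query below does.
    val excluded = Set("ds", "rowkey", "cid", "new_cid", "cids", "new_cids")
    val columns: Seq[String] = spark.table("winhc_eci_dev.some_ads_table") // hypothetical table
      .schema.map(_.name)
      .filterNot(excluded.contains)
    val selectSql = s"SELECT new_cid AS cid, ${columns.mkString(",")} FROM winhc_eci_dev.some_ads_table"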
@@ -105,7 +104,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |                        UNION ALL
          |                        SELECT  new_cid AS cid
          |                                ,${columns.mkString(",")}
-         |                        FROM    ${ads_company_tb}
+         |                        FROM    ${ads_company_tb_list}
          |                        WHERE   ds >= ${firstDs}
          |                    ) b
          |            ON      a.cid = b.cid
@@ -113,7 +112,6 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |WHERE   num = 1
          |""".stripMargin)
 
-
    // Deduplicate the main table by id and write it to the target table
     sql(
       s"""