Procházet zdrojové kódy

利好消息tags加参

许家凯 před 4 roky
rodič
revize
884acfbab6

+ 10 - 10
src/main/scala/com/winhc/bigdata/spark/const/CaseChanceConst.scala

@@ -34,15 +34,15 @@ object CaseChanceConst {
     , "" -> "3-6" //购地信息
     , "company_land_transfer" -> "3-7" //土地转让
     ,*/
-    "company_bid_list" -> "3-3" //新增招投标
-    , "company_employment" -> "3-4" //新增招聘
-    , "company_land_publicity" -> "3-5" //地块公示
-    , "company_land_announcement" -> "3-6" //购地信息
-    , "company_land_transfer" -> "3-7" //土地转让
-    , "company_tm" -> "3-8" //知识产权-商标
-    , "company_patent_list" -> "3-9" //专利
-    , "company_certificate" -> "3-10" //资质证书   X
-    , "company_copyright_works_list" -> "3-11" //作品著作权
-    , "company_copyright_reg_list" -> "3-12" //软件著作权
+    "company_bid_list" -> "3" //新增招投标
+    , "company_employment" -> "4" //新增招聘
+    , "company_land_publicity" -> "5" //地块公示
+    , "company_land_announcement" -> "6" //购地信息
+    , "company_land_transfer" -> "7" //土地转让
+    , "company_tm" -> "8" //知识产权-商标
+    , "company_patent_list" -> "9" //专利
+    , "company_certificate" -> "10" //资质证书   X
+    , "company_copyright_works_list" -> "11" //作品著作权
+    , "company_copyright_reg_list" -> "12" //软件著作权
   )
 }

+ 45 - 19
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -3,6 +3,7 @@ package com.winhc.bigdata.spark.jobs.chance
 import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}
@@ -157,32 +158,57 @@ object ChangeExtract {
   }
 
 
-  // winhc_eci_dev company_tm rowkey 20200715 status_new
-  // winhc_eci_dev company_patent_list rowkey 20200715 lprs
+  // winhc_eci_dev company_tm rowkey 20200717 status_new
+  // winhc_eci_dev company_patent_list rowkey 20200717 lprs
   // winhc_eci_dev company_certificate rowkey 20200707 type
-  // winhc_eci_dev company_copyright_works_list rowkey 20200715 type
-  // winhc_eci_dev company_copyright_reg_list rowkey 20200715 version
+  // winhc_eci_dev company_copyright_works_list rowkey 20200717 type
+  // winhc_eci_dev company_copyright_reg_list rowkey 20200717 version
   // winhc_eci_dev company_employment rowkey 20200630 source
 
-  // winhc_eci_dev company_land_publicity rowkey 20200630 title,location,use_for
-  // winhc_eci_dev company_land_announcement rowkey 20200715 e_number,project_name
+  // winhc_eci_dev company_land_publicity rowkey 20200717 title,location,use_for
+  // winhc_eci_dev company_land_announcement rowkey 20200717 e_number,project_name
 
+  // winhc_eci_dev company_bid_list rowkey 20200717 title
+  // winhc_eci_dev company_land_transfer rowkey 20200717 num,location
 
 
   // winhc_eci_dev company cid 20200630 legal_entity_id,reg_location,business_scope,reg_status,reg_capital,emails,phones
   def main(args: Array[String]): Unit = {
-    val Array(project, tableName, rowkey, inc_ds, pf) = args
-
-    val config = EsConfig.getEsConfigMap ++ mutable.Map(
-      "spark.hadoop.odps.project.name" -> project,
-      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
-    )
-
-
-    val spark = SparkUtils.InitEnv("ChangeExtract", config)
-
-    ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(",")).calc
-    spark.stop()
+    if (args.length == 5) {
+      val Array(project, tableName, rowkey, inc_ds, pf) = args
+      val config = EsConfig.getEsConfigMap ++ mutable.Map(
+        "spark.hadoop.odps.project.name" -> project,
+        "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+      )
+      val spark = SparkUtils.InitEnv("ChangeExtract", config)
+
+
+      ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(",")).calc
+      spark.stop()
+    } else {
+      val project = "winhc_eci_dev"
+      val config = EsConfig.getEsConfigMap ++ mutable.Map(
+        "spark.hadoop.odps.project.name" -> project,
+        "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+      )
+      val spark = SparkUtils.InitEnv("ChangeExtract", config)
+      val rows =
+        """winhc_eci_dev company_tm rowkey 20200717 status_new
+          |winhc_eci_dev company_patent_list rowkey 20200717 lprs
+          |winhc_eci_dev company_copyright_works_list rowkey 20200717 type
+          |winhc_eci_dev company_copyright_reg_list rowkey 20200717 version
+          |winhc_eci_dev company_land_publicity rowkey 20200717 title,location,use_for
+          |winhc_eci_dev company_land_announcement rowkey 20200717 e_number,project_name
+          |winhc_eci_dev company_bid_list rowkey 20200717 title
+          |winhc_eci_dev company_land_transfer rowkey 20200717 num,location
+          |""".stripMargin
+      for (r <- rows.split("\r\n")) {
+        if(StringUtils.isNotEmpty(r)){
+          val Array(tmp,tableName,rowkey,inc_ds,pf) =r.split(" ")
+          ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(",")).calc
+        }
+      }
+      spark.stop()
+    }
   }
-
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/chance/eci_good_news.scala

@@ -111,7 +111,7 @@ object eci_good_news {
            |        ,null AS defendant
            |        ,rel_bg_name AS company_name
            |        ,detail_cid AS cid
-           |        ,json_add_str(detail_label,CONCAT_WS(',',get_json_kv('reg_capital',rel_bg_reg_capital),get_json_kv('province',rel_bg_province_code),get_json_kv('city',rel_bg_city_code),get_json_kv('county',rel_bg_county_code))) AS tags
+           |        ,json_add_str(detail_label,CONCAT_WS(',',get_json_kv('reg_capital',rel_bg_reg_capital),get_json_kv('province',rel_bg_province_code),get_json_kv('city',rel_bg_city_code),get_json_kv('county',rel_bg_county_code),get_json_kv('estiblish_time',rel_bg_estiblish_time),get_json_kv('category_code',rel_bg_category_code))) AS tags
            |        ,detail_rowkey AS biz_id
            |        ,get_table_type(detail_table_name) AS type
            |        ,get_chance_dynamic_type(detail_table_name) AS dynamic_type

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/udf/BaseFunc.scala

@@ -40,7 +40,7 @@ trait BaseFunc {
       if (StringUtils.isNotBlank(value)) {
         "\"" + key + "\":\"" + value + "\""
       } else {
-        "\"" + key + "\":" + value
+        "\"" + key + "\":\"\""
       }
     })
   }

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/utils/ChangeExtractUtils.scala

@@ -35,12 +35,13 @@ object ChangeExtractUtils {
     if (StringUtils.isNotBlank(value)) {
       "\"" + value + "\""
     } else {
-      null
+      "\"\""
     }
   }
 
 
   def main(args: Array[String]): Unit = {
     val name = get_ip_tags("a", null, "b", null)
+    println(name)
   }
 }