Sfoglia il codice sorgente

增量公司基本信息索引更新

许家凯 4 anni fa
parent
commit
c3ef636484

+ 8 - 1
src/main/scala/com/winhc/bigdata/spark/implicits/CompanyIndexSave2EsHelper.scala

@@ -85,7 +85,14 @@ object CompanyIndexSave2EsHelper {
     val city_code = c._2
     val county_code = c._3
     val et = map("estiblish_time")
-    val time = if (StringUtils.isNotBlank(et)) et else null
+    var time = if (StringUtils.isNotBlank(et)){
+      if (et.contains(" ")) {
+        et.split(" ")(0)
+      }else{
+        et
+      }
+    }  else null
+
 
     val doc = CompanyDoc(
       cname = getCompanyName(map("name"))

+ 4 - 13
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIncCompany2Es.scala

@@ -136,20 +136,11 @@ object CompanyIncCompany2Es {
         }
         (new ImmutableBytesWritable, put)
       }).filter(_ != null)
-//        .saveAsNewAPIHadoopDataset(jobConf)
-              .saveAsHadoopDataset(jobConf)
+        .saveAsHadoopDataset(jobConf)
 
       //写出到es
-      import com.winhc.bigdata.spark.utils.CompanyEsUtils.getEsDoc
-      import org.elasticsearch.spark._
-      stringDf.map(r => {
-        val cid = r.getAs[String]("cid")
-        val cname = r.getAs[String]("name")
-        val history_names = r.getAs[String]("history_names")
-        val current_cid = r.getAs[String]("current_cid")
-        val company_type = r.getAs[String]("company_type")
-        getEsDoc(cid, cname, history_names, current_cid, company_type)
-      }).rdd.saveToEsWithMeta("winhc-company/company")
+      import com.winhc.bigdata.spark.implicits.CompanyIndexSave2EsHelper._
+      stringDf.companyIndexSave2Es()
 
     }
   }
@@ -164,7 +155,7 @@ object CompanyIncCompany2Es {
 
     val config = EsConfig.getEsConfigMap ++ mutable.Map(
       "spark.hadoop.odps.project.name" -> project,
-      "spark.hadoop.odps.spark.local.partition.amt" -> "2"
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
     )
 
     val spark = SparkUtils.InitEnv("company2Es", config)