@@ -1,12 +1,21 @@
 package com.winhc.bigdata.spark.model

-import java.util.Date
-
-import com.winhc.bigdata.calc.{DimScore, DimScoreV2}
-import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import java.util
+import java.util.{Collections, Date}
+
+import com.alibaba.fastjson.JSON
+import com.winhc.bigdata.calc.DimScoreV2
+import com.winhc.bigdata.spark.jobs.EsQuery
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.EsRestUtils.getRestClient
+import com.winhc.bigdata.spark.utils.{BaseUtil, EsRestUtils, LoggingUtils, Maxcomputer2Hbase, SparkUtils}
 import org.apache.commons.lang3.StringUtils
-import org.apache.spark.broadcast.Broadcast
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SparkSession}
+import org.elasticsearch.client.RestClient

 import scala.annotation.meta.getter
 import scala.collection.mutable
@@ -19,91 +28,79 @@ import scala.collection.mutable
 object CompanyBidScore {

   val tabMapping: Map[String, (String, String, String, String)] =
-    Map("ads_company_bid_list" -> ("1", "publish_time", "资产权益", "招投标") // bidding and tendering
+    Map("company_bid_list" -> ("306", "publish_time", "资产权益", "招投标") // bidding and tendering
     )

   def main(args: Array[String]): Unit = {

-    val (sourceTable, flag, time, kind, project) = valid(args)
+    val (namespace, sourceTable, flag, time, kind, project) = valid(args)

-    var config = mutable.Map.empty[String, String]
+    var config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )

     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)

     println(s"company ${this.getClass.getSimpleName} calc start! " + new Date().toString)
+    spark.sql(
+      """
+        |select "24416401" as new_cid,1111L as id,'2020-07-18' as publish_time
+        |""".stripMargin).createOrReplaceTempView("inc_view")
+
+    new CompanyBidScore(spark, sourceTable, "inc_view", flag, time, kind, project, "1", namespace).calc()
+//    new CompanyBidScore(spark, sourceTable, "", flag, time, kind, project, "0", namespace).calc()

-    new CompanyBidScore(spark, sourceTable, flag, time, kind, project).calc()

     println(s"company ${this.getClass.getSimpleName} calc end! " + new Date().toString)
     spark.stop()
   }

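As wired above, main always exercises the incremental path against a stubbed one-row inc_view. A sketch of what the two constructor calls mean, based on the tp checks later in calc:

    // tp = "1": incremental run — read the passed-in view, enrich each row via ES,
    // and sync the resulting scores to HBase afterwards.
    new CompanyBidScore(spark, sourceTable, "inc_view", flag, time, kind, project, "1", namespace).calc()

    // tp = "0" (the default): stock run — read the partitioned ads table directly
    // and join industry names from ads_company instead of calling ES.
    new CompanyBidScore(spark, sourceTable, "", flag, time, kind, project, "0", namespace).calc()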
   def valid(args: Array[String]) = {
-    if (args.length != 1) {
-      println("请输入要计算的table!!!! ")
+    println(args.mkString(", "))
+    if (args.length != 2) {
+      println("please enter namespace, table!!!! ")
       sys.exit(-1)
     }
-    val Array(sourceTable) = args
+    val Array(namespace, sourceTable) = args

     val (flag, time, kind, project) = tabMapping.getOrElse(sourceTable, ("", "", "", ""))
     if (flag.isEmpty || time.isEmpty || kind.isEmpty || project.isEmpty) {
-      println("输入表不存在!!! ")
+      println("table not found!!! ")
       sys.exit(-1)
     }
-    (sourceTable, flag, time, kind, project)
+    (namespace, sourceTable, flag, time, kind, project)
   }
 }

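valid now takes the namespace first and keys tabMapping by the bare table name (the ads_/inc_ads_ prefixes are added later in calc). A minimal sketch with the values from this diff:

    val args = Array("winhc_eci_dev", "company_bid_list") // namespace, table
    val Array(namespace, sourceTable) = args
    // resolves to ("306", "publish_time", "资产权益", "招投标")
    val (flag, time, kind, project) =
      CompanyBidScore.tabMapping.getOrElse(sourceTable, ("", "", "", ""))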
-case class CompanyBidScore(s: SparkSession, sourceTable: String,
-                           flag: String, time: String, kind: String, project: String
-                          ) extends LoggingUtils {
+case class CompanyBidScore(s: SparkSession, sourceTable: String, tableView: String,
+                           flag: String, time: String, kind: String, project: String, tp: String = "0", namespace: String
+                          ) extends LoggingUtils with Logging {

   @(transient@getter) val spark: SparkSession = s

   import spark.implicits._

-  def calc(): Unit = {
-
-    val ods_company = "new_ods_company"
-    val company_category = "const_company_category_code"
-
-    // // industry category
-    // val code2Name: Broadcast[Map[String, String]] = spark.sparkContext.broadcast(sql(
-    //   s"""
-    //      |select category_code,category_str_big
-    //      |from $company_category
-    //   """.stripMargin).collect().map(r => {
-    //   (r.getString(0), r.getString(1))
-    // }).toMap)
-    //
-    // spark.udf.register("industry_name", (code: String) => {
-    //   code2Name.value.getOrElse(code, null)
-    // })
-    //
-    // val industry = sql(
-    //   s"""
-    //      |select category_code,cast(cid as string) as ncid,
-    //      |       industry_name(category_code) AS industry_name
-    //      |from $ods_company where cid is not null
-    //      |""".stripMargin)
-    //
-    // industry.show(100)
-    //
-    //
-    // industry.createOrReplaceTempView("t1")
-
-    val industry2 = sql(
-      s"""
-         |select a.category_code,cast(a.cid as string) as ncid,
-         |       b.category_str_big AS industry_name
-         |from $ods_company a
-         |left join const_company_category_code b on a.category_code = b.category_code
-         |where cid is not null
-         |""".stripMargin)
-    industry2.createOrReplaceTempView("t1")
+  def calc() = {
+
+    val ads_company = s"$namespace.ads_company"
+    val company_category = s"$namespace.const_company_category_code"
+    val ads_company_tb = s"$namespace.ads_$sourceTable"
+    val inc_ads_company_tb = s"$namespace.inc_ads_$sourceTable"

-    // note: check whether the online table is partitioned
-    // ds = '${BaseUtil.getPartion(sourceTable, spark)}' AND
+    val adsCompanyPar = BaseUtil.getPartion(ads_company, spark)
+//    val adsPar = BaseUtil.getPartion(ads_company_tb, spark)
+
+    var ds = ""
+    var appsql2 = ""
+    var tb = ads_company_tb
+    if ("1".equals(tp)) {
+      tb = tableView
+      ds = BaseUtil.getPartion(inc_ads_company_tb, spark)
+    } else {
+      ds = BaseUtil.getPartion(ads_company_tb, spark)
+      appsql2 = s"AND ds = ${ds}"
+    }

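The two branches only differ in where the score rows are read from and whether the read is pinned to a partition. A sketch of the intent, assuming BaseUtil.getPartion returns a table's latest ds partition value (e.g. "20200717"); partitionInputs is a hypothetical helper mirroring the if/else above:

    def partitionInputs(tp: String): (String, String, String) = {
      if ("1".equals(tp)) {
        // incremental: read the temp view; ds only decides the output partition
        (tableView, BaseUtil.getPartion(inc_ads_company_tb, spark), "")
      } else {
        // stock: read the ads table and pin the scan to its newest partition
        val ds = BaseUtil.getPartion(ads_company_tb, spark)
        (ads_company_tb, ds, s"AND ds = $ds")
      }
    }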
     val df = sql(
       s"""
@@ -111,45 +108,100 @@ case class CompanyBidScore(s: SparkSession, sourceTable: String,
          |FROM (
          | SELECT
          | *
-         | ,COUNT(ncid) OVER(PARTITION BY ncid ) AS cnt1
-         | ,row_number() OVER(PARTITION BY ncid ORDER BY $time DESC ) AS num
-         | FROM $sourceTable
-         | WHERE
-         |
-         | ncid IS NOT NULL
+         | ,COUNT(new_cid) OVER(PARTITION BY new_cid ) AS cnt1
+         | ,ROW_NUMBER() OVER(PARTITION BY new_cid ORDER BY $time DESC ) AS num
+         | FROM $tb
+         | WHERE new_cid IS NOT NULL
+         | ${appsql2}
          | ) a
          |WHERE num =1
-         |""".stripMargin).createOrReplaceTempView("t2")
-    //      .join(industry, Seq("ncid"), "left")
-    //      .select("cid", "id", "cnt1", "industry_name", "ncid")
+         |""".stripMargin)

-    val df2 = sql(
-      """
-        |select t2.*,t1.industry_name,category_code from t2 left join t1 on t2.ncid = t1.ncid
-        |""".stripMargin)
-    df2.show(100)
+    df.createOrReplaceTempView("t2")
+
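The window query computes a per-company bid count (cnt1) and keeps only the newest record per new_cid. The same dedup step in the DataFrame API, a sketch assuming an input frame raw with new_cid and publish_time columns:

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, count, row_number}

    val byCid = Window.partitionBy("new_cid")
    val deduped = raw
      .withColumn("cnt1", count("new_cid").over(byCid)) // total bids per company
      .withColumn("num", row_number().over(byCid.orderBy(col("publish_time").desc)))
      .where(col("num") === 1) // newest bid only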
+    if (tp.equals("0")) {
+      sql(
+        s"""
+           |select a.category_code,cast(a.cid as string) as new_cid,
+           |       b.category_str_big AS industry_name
+           |from $ads_company a
+           |left join $company_category b on a.category_code = b.category_code
+           |where a.cid is not null and a.ds=${adsCompanyPar}
+           |""".stripMargin).createOrReplaceTempView("t1")
+
+      sql(
+        """
+          |select t2.*,t1.industry_name,category_code from t2 left join t1 on t2.new_cid = t1.new_cid
+          |""".stripMargin).map(r => {
+        trans(r, flag, kind, project)
+      }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
+        "score", "total", "extraScore")
+        .createOrReplaceTempView(s"tmp_view")

-    df2.map(r => {
-      trans(r, flag, kind, project)
-    }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
-      "score", "total", "extraScore")
-      .createOrReplaceTempView(s"${sourceTable}_tmp_view")
+    } else {
+      df.mapPartitions(iter => {
+        trans2(iter, flag, kind, project)
+      }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
+        "score", "total", "extraScore")
+        .createOrReplaceTempView(s"tmp_view")
+    }

+    sql(
+      s"""
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table ${ads_company_tb}_score partition(ds=$ds)
+         |select id,cid,kind,kind_code,project,project_code,type,score,total,extraScore
+         |from tmp_view
+         |""".stripMargin)

-    sql(s"select * from ${sourceTable}_tmp_view").show(10)
-    sql(s"insert overwrite table ${sourceTable}_score select * from ${sourceTable}_tmp_view")
+    // sync to HBase
+    if ("1".equals(tp)) { // stock runs do not need the HBase sync
+      val dataFrame = sql(
+        s"""
+           |select
+           |CONCAT_WS('_',cid,project_code) AS rowkey,
+           |id,cid,kind,kind_code,project,project_code,type,score,total,extraScore
+           |from tmp_view
+           |""".stripMargin)
+      Maxcomputer2Hbase(dataFrame, "COMPANY_SCORE").syn()
+    }
   }

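The rowkey concatenates cid and project_code, so each company holds exactly one HBase row per scoring dimension and re-runs overwrite in place. The same key built in the DataFrame API, a sketch over a hypothetical scores frame:

    import org.apache.spark.sql.functions.{col, concat_ws}

    // e.g. cid "24416401" + project_code "306" -> rowkey "24416401_306"
    val withRowkey = scores.withColumn("rowkey", concat_ws("_", col("cid"), col("project_code")))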
+  // stock (full) logic
   def trans(r: Row, flag: String, kind: String, project: String) = {
     val id = r.getAs[Long]("id")
-    val cid = r.getAs[Long]("ncid").toString
+    val cid = r.getAs[String]("new_cid")
     val cnt1 = r.getAs[Long]("cnt1")
     val industry_name = r.getAs[String]("industry_name")
     flag match {
-      case "1" => tenderScore(id, cid, cnt1, kind, project, industry_name)
+      case "306" => tenderScore(id, cid, cnt1, kind, project, industry_name)
     }
   }

+  // incremental logic
+  def trans2(iter: Iterator[Row], flag: String, kind: String, project: String) = {
+    val restClient = getRestClient()
+    val df = iter.map(r => {
+      try {
+        val id = r.getAs[Long]("id")
+        val cid = r.getAs[String]("new_cid")
+        val cnt1 = r.getAs[Long]("cnt1")
+        val m1 = EsQuery2.queryCompanyForCid(restClient, cid)
+        val industry_name = m1("category_first")
+        flag match {
+          case "306" => tenderScore(id, cid, cnt1, kind, project, industry_name)
+        }
+      } catch {
+        case e: Exception => {
+          logWarning(r.toString())
+          logError(e.getMessage, e)
+          null
+        }
+      }
+    })
+    df.filter(_ != null) // drop rows whose enrichment failed so toDF never sees nulls
+  }
+
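trans2 runs inside mapPartitions so that the ES RestClient, which is not serializable, is created once per partition and reused for every row rather than once per row. The general pattern, a self-contained sketch where Lookup is a hypothetical stand-in for the client:

    class Lookup { def enrich(s: String): String = s.toUpperCase }

    val out = spark.sparkContext.parallelize(Seq("a", "b"), 2).mapPartitions { rows =>
      val client = new Lookup() // one instance per partition
      rows.map(client.enrich)   // reused across the partition's rows
    }

Note that, as in trans2 above, nothing closes the client once the partition's iterator is exhausted.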
   // bidding and tendering
   def tenderScore(id: Long, cid: String, cnt1: Long, kind: String, project: String, industry_name: String) = {
     var score = 0f
@@ -196,3 +248,62 @@ case class CompanyBidScore(s: SparkSession, sourceTable: String,
   }

 }
+
+object EsQuery2 {
+
+  def main(args: Array[String]): Unit = {
+    val client = EsRestUtils.getRestClient()
+    val map = queryCompanyForCid(client, "23537076")
+    println(map)
+  }
+
+  def queryCompanyForCid(restClient: RestClient, cid: String) = {
+    val query = ""
+    val entity = new NStringEntity(query, ContentType.APPLICATION_JSON)
+
+    val indexResponse = restClient.performRequest(
+      "GET",
+      s"/winhc-company/company/_search/?q=_id:$cid",
+      Collections.emptyMap[String, String](),
+      entity)
+    val en = indexResponse.getEntity
+    val res = EntityUtils.toString(en)
+    import scala.collection.JavaConverters._
+    val list = getIndexResult2(res)
+    if (list.nonEmpty) {
+      val id = list.head("_id").asInstanceOf[String]
+      val source: util.Map[String, Any] = list.head("_source").asInstanceOf[util.Map[String, Any]]
+      val province_code = source.get("province_code").asInstanceOf[String]
+      val city_code = source.get("city_code").asInstanceOf[String]
+      val county_code = source.get("county_code").asInstanceOf[String]
+      val reg_capital = source.get("reg_capital").asInstanceOf[String]
+      val category_first = source.get("category_first").asInstanceOf[String]
+      val category_second = source.get("category_second").asInstanceOf[String]
+      val category_third = source.get("category_third").asInstanceOf[String]
+      val estiblish_time = source.get("estiblish_time").asInstanceOf[String]
+      val phones = source.get("phones").asInstanceOf[util.List[String]].asScala.mkString(",")
+
+      Map(
+        "id" -> id,
+        "province_code" -> province_code,
+        "city_code" -> city_code,
+        "county_code" -> county_code,
+        "reg_capital" -> reg_capital,
+        "estiblish_time" -> estiblish_time,
+        "phones" -> phones,
+        "category_first" -> category_first,
+        "category_second" -> category_second,
+        "category_third" -> category_third
+      )
+    } else {
+      Map.empty[String, String]
+    }
+  }
+
+  def getIndexResult2(json: String) = {
+    import scala.collection.JavaConverters._
+    JSON.parseObject(json).getJSONObject("hits").getJSONArray("hits").toArray().map(m => m.asInstanceOf[util.Map[String, Any]]).map(_.asScala).toList
+  }
+}
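queryCompanyForCid sends a GET with an empty JSON body and puts the actual match in the q=_id:<cid> query string; getIndexResult2 then walks hits.hits of the standard search response. A self-contained check of that parsing with an abridged sample response:

    // abridged ES search response shape assumed by getIndexResult2
    val sample =
      """{"hits":{"hits":[{"_id":"23537076","_source":{"category_first":"制造业"}}]}}"""
    val hits = EsQuery2.getIndexResult2(sample)
    println(hits.head("_id"))     // 23537076
    println(hits.head("_source")) // {"category_first":"制造业"}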