Forráskód Böngészése

公司动态-招聘-修改为每日新增数量

yandawei 4 éve
szülő
commit
4034a542fc

+ 7 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -236,7 +236,13 @@ object CompanyDynamic {
     startArgs.filter(e => {
       ts.contains(e.tableName)
     }).foreach(e => {
-      cd.calc(e.tableName, e.bName)
+      if(e.tableName == "company_employment"){
+        val cdf = CompanyDynamicForDayCount(spark, project, ds)
+        cdf.calc(e.tableName, e.bName)
+      }else{
+        cd.calc(e.tableName, e.bName)
+      }
+
     })
     spark.stop()
   }

+ 134 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicForDayCount.scala

@@ -0,0 +1,134 @@
+package com.winhc.bigdata.spark.jobs.dynamic
+
+import java.util
+import java.util.Date
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamic.{env, targetTab}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.LoggingUtils
+import com.winhc.bigdata.spark.utils.ReflectUtils.getClazz
+import org.apache.commons.lang3.time.DateFormatUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.immutable.ListMap
+import scala.collection.mutable
+
+case class CompanyDynamicForDayCount(s: SparkSession,
+                                     project: String, //表所在工程名
+                                     ds: String //此维度主键
+
+                                    ) extends LoggingUtils with Logging {
+  @(transient@getter) val spark: SparkSession = s
+
+
+  def init(): Unit = {
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS ${getEnvProjectName(env, project)}.$targetTab
+         |(
+         |    cid  STRING COMMENT '公司id'
+         |    ,cname  STRING COMMENT '公司name'
+         |    ,info_type STRING COMMENT '变更分类,大类'
+         |    ,rta_desc STRING COMMENT '变更信息描述,变更标题'
+         |    ,change_content STRING COMMENT '变更内容'
+         |    ,change_time STRING COMMENT '变更时间'
+         |    ,biz_id STRING COMMENT '业务id,数据行id'
+         |    ,sub_info_type STRING COMMENT '变更小类,表名'
+         |    ,info_risk_level STRING COMMENT '变更风险等级'
+         |    ,winhc_suggest STRING COMMENT '提示信息'
+         |    ,create_time STRING COMMENT '创建时间'
+         |)
+         |COMMENT '企业动态输出表'
+         |PARTITIONED BY (ds STRING COMMENT '分区',tn STRING COMMENT '表名')
+         |LIFECYCLE 30
+         |""".stripMargin)
+  }
+
+  //表名(不加前后辍)
+  def calc(tableName: String
+           , bName: Int = 0 //是否补充cname字段
+          ): Unit = {
+    val handle = getClazz[CompanyDynamicHandle](s"com.winhc.bigdata.spark.jobs.dynamic.tables.$tableName")
+
+    val types = handle.org_type()
+    val colsExclusiveSome = spark.table(s"${project}.ads_change_extract").columns.filter(s => {
+      !s.equals("cid") && !s.equals("data") && !s.equals("old_data") && !s.equals("ds") && !s.equals("tn")
+    }).seq
+    val rdd = sql(
+      bName match {
+        //默认:无需补全cname字段
+        case 0 =>
+          s"""
+             |SELECT  cid,count(1) AS cnt,biz_date,null AS cname
+             |FROM    ${project}.ads_change_extract
+             |WHERE   ds = '$ds'
+             |AND     tn = '$tableName'
+             |AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
+             |GROUP BY CID
+             |""".stripMargin
+        //需根据cid补全cname字段数据
+        case 1 =>
+          s"""
+             |SELECT A.cid as cid,count(1) AS cnt,B.cname AS cname
+             |FROM(
+             |  SELECT  *
+             |  FROM    ${project}.ads_change_extract
+             |  WHERE   ds = '$ds'
+             |  AND     tn = '$tableName'
+             |  AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
+             |) AS A
+             |LEFT JOIN (
+             |    SELECT cid,cname FROM  $project.base_company_mapping
+             |    WHERE ds = '${getLastPartitionsOrElse(project + ".base_company_mapping", "0")}'
+             |) AS B
+             |ON A.cid = B.cid
+             |GROUP BY A.CID,B.cname
+             |""".stripMargin
+      })
+      .rdd.flatMap(r => {
+      val cid = r.getAs[String]("cid")
+      val biz_date = ds.substring(0,4)+"-"+ds.substring(4,6)+"-"+ds.substring(6) +" 00:00:00"
+      val cnt = r.getAs[Long]("cnt")
+      val cname = r.getAs[String]("cname")
+      val new_map = Map("cnt" -> (cnt+""))
+
+      val result = handle.handle(cid+biz_date, biz_date, cid, null, null, new_map, cname)
+      if (result == null) {
+        None
+      }
+      else {
+        result.map(res => Row(CompanyDynamicHandleUtils.getDynamicId(cid, res._4, res._7, res._6), res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, res._9, res._10, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")))
+      }
+    })
+
+    val schema = getSchema(ListMap(
+      "id" -> StringType
+      , "cid" -> StringType
+      , "cname" -> StringType
+      , "info_type" -> StringType
+      , "rta_desc" -> StringType
+      , "change_content" -> StringType
+      , "change_time" -> StringType
+      , "biz_id" -> StringType
+      , "sub_info_type" -> StringType
+      , "info_risk_level" -> StringType
+      , "winhc_suggest" -> StringType
+      , "create_time" -> StringType
+    ))
+    spark.createDataFrame(rdd, schema)
+      .createOrReplaceTempView("company_dynamic_tmp")
+
+    val cols = getColumns(s"$project.$targetTab").filter(!_.equals("ds")).filter(!_.equals("tn"))
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${getEnvProjectName(env, project)}.$targetTab PARTITION(ds='$ds',tn='$tableName')
+         |SELECT ${cols.mkString(",")}
+         |FROM
+         |    company_dynamic_tmp
+         |""".stripMargin)
+  }
+}

+ 4 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_employment.scala

@@ -15,7 +15,7 @@ case class company_employment() extends CompanyDynamicHandle {
     * @param new_map
     * @return
     */
-  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map.getOrElse("name", null)
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = "新增"+new_map.getOrElse("cnt", "")+"条招聘信息"
 
 
   /**
@@ -34,5 +34,7 @@ case class company_employment() extends CompanyDynamicHandle {
     * @param new_map
     * @return
     */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq("city->地区", "description->招聘描述", "source->招聘来源", "employ_num->招聘人数", "start_date->职位发布日期", "ori_salary->薪资", "education->学历", "experience->工作经验", "title->职位"))
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq("city->地区", "description->招聘描述", "source->招聘来源", "employ_num->招聘人数", "start_date->职位发布日期", "ori_salary->薪资", "education->学历", "experience->工作经验", "title->职位"))
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
+
 }