146 Commity 0bc1198bc9 ... 948097f549

Autor SHA1 Wiadomość Data
  yandawei 948097f549 Merge remote-tracking branch 'origin/master' 4 lat temu
  yandawei 4034a542fc 公司动态-招聘-修改为每日新增数量 4 lat temu
  yandawei 19f756b5d3 Merge remote-tracking branch 'origin/master' 4 lat temu
  yandawei 9de873e615 公司动态-招聘 4 lat temu
  晏永年 b10f91b2ec 企业公告维度的业务日期的兜底用update_time 4 lat temu
  晏永年 e93baf59e5 动产抵押表中若公示日期为null则用登记日期 4 lat temu
  晏永年 4aa01443bd fix 4 lat temu
  晏永年 d59a98ceb8 添加企业动态:企业公告 4 lat temu
  xufei 721c97919c 司法拍卖前置增加 4 lat temu
  许家凯 3e55dbcd88 feat: 存量企业动态从存量变更信息中提取 4 lat temu
  许家凯 77c242924a fix: 企业动态唯一id采用新md5 4 lat temu
  许家凯 0a30636155 Merge remote-tracking branch 'origin/master' 4 lat temu
  许家凯 55f0189580 feat: 添加摘要计算通用程序 4 lat temu
  许家凯 5270d444db feat: 企业动态添加维一标识 4 lat temu
  许家凯 79ab0c2427 Merge remote-tracking branch 'origin/master' 4 lat temu
  许家凯 4455c0a22b fix: 过滤银行和保险机构 4 lat temu
  许家凯 e5b3a417d4 feat: 工商信息变更及动态 4 lat temu
  晏永年 17ad1dab48 针对2.3类的案源机会过滤掉银行、保险企业 4 lat temu
  晏永年 b186d36957 公示催告 4 lat temu
  许家凯 202bc037bd feat: 企业动态风险等级抽取统一映射 4 lat temu
  许家凯 b70f5857f5 Merge remote-tracking branch 'origin/master' 4 lat temu
  许家凯 c556c2fedc feat: 主要成员企业动态 4 lat temu
  许家凯 20c8424683 feat: 主要成员变更提取 4 lat temu
  许家凯 e69a151f52 fix: 主要成员增量 4 lat temu
  许家凯 675f6c0cde Merge remote-tracking branch 'origin/master' 4 lat temu
  许家凯 1dfbe8e709 feat: 变更和动态主函数入口参数调整 4 lat temu
  许家凯 b482d4862f Merge remote-tracking branch 'origin/master' 4 lat temu
  许家凯 f49bd3920e fix: phoenix加入超时配制 4 lat temu
  许家凯 d832c12dfd Merge remote-tracking branch 'origin/master' 4 lat temu
  许家凯 aea1077d14 feat: 主要成员增量和存量处理 4 lat temu
  许家凯 cfcd864234 feat: 存量cid通用spark程序加入自动建表 4 lat temu
  许家凯 45e3b9ce96 feat: 企业动态股权出质融入框架 4 lat temu
  许家凯 b0a8903811 Merge remote-tracking branch 'origin/master' 4 lat temu
  许家凯 50ac3b8230 feat: 企业变更通用程序修改 4 lat temu
  许家凯 4b1f265d00 fix: 企业变更及动态调整 4 lat temu
  许家凯 9d64faa7f4 Merge remote-tracking branch 'origin/master' 4 lat temu
  许家凯 64cc4c48b8 add:cid与new_cid的mapping表 4 lat temu
  晏永年 c3cb5f8c33 补全企业动态预备表的cname字段数据 4 lat temu
  xufei 83ec3fd3a4 增量计算补充 4 lat temu
  晏永年 09181e597e fix增量维度同步更换new_cid导致与老数据比较误判为insert数据 4 lat temu
  许家凯 ce51cfd574 Merge remote-tracking branch 'origin/master' 4 lat temu
  许家凯 da36b407bb add:股权出质 4 lat temu
  许家凯 c7ea469bc6 style 4 lat temu
  许家凯 79cd017511 Merge remote-tracking branch 'origin/master' 4 lat temu
  许家凯 daef91ca81 fix bugs 4 lat temu
  许家凯 1a0b844d32 fix bugs 4 lat temu
  许家凯 d36d594d22 添加企业动态启动参数 4 lat temu
  许家凯 abd7de490c Merge remote-tracking branch 'origin/master' 4 lat temu
  许家凯 5d1246ec18 添加企业动态 4 lat temu
  许家凯 33837bf012 Merge remote-tracking branch 'origin/master' 4 lat temu
  许家凯 8893fcc727 add 4 lat temu
  许家凯 2a7365f911 公司动态通用程序init 4 lat temu
  许家凯 1bc8911ea6 公司变更动态加字段,phoenix不输出id 4 lat temu
  许家凯 4f57a575db Merge remote-tracking branch 'origin/master' 4 lat temu
  许家凯 5d8395dfb0 企业债权关系表调参 4 lat temu
  许家凯 17e1014dc5 run 4 lat temu
  许家凯 1d475cb37f Merge remote-tracking branch 'origin/master' 4 lat temu
  许家凯 4fe19da806 企业动态,利好消息只取指定维度的动态 4 lat temu
  许家凯 77f1af0123 增量索引、企业债权关系、人员表增量 4 lat temu
  许家凯 d7bf2abd8d add 4 lat temu
  许家凯 d4ec5db2e9 fix bugs 4 lat temu
  晏永年 dd8c8ba212 生成rowkey 4 lat temu
  晏永年 861dc4bb0c fix 4 lat temu
  晏永年 71f4559997 添加人与公司表增量同步 4 lat temu
  许家凯 f9b705f3e6 Merge remote-tracking branch 'origin/master' 4 lat temu
  许家凯 b3216d1694 公司基本信息索引部分调整输出 4 lat temu
  许家凯 884acfbab6 利好消息tags加参 4 lat temu
  许家凯 8289e52153 利好消息配置 4 lat temu
  xufei da507b43fc merge 4 lat temu
  xufei bd1947c5a2 Merge remote-tracking branch 'origin/master' 4 lat temu
  xufei 49ce50b82d 地块公示,购地信息修改 4 lat temu
  许家凯 5864d5dc50 利好消息 4 lat temu
  许家凯 e96ebbe75f Merge remote-tracking branch 'origin/master' 4 lat temu
  许家凯 95bc6051a0 加入别名 4 lat temu
  许家凯 94d105bf87 加入九大类type自动推测 4 lat temu
  许家凯 30d0358c11 add 4 lat temu
  许家凯 e93c568347 Merge remote-tracking branch 'origin/master' 4 lat temu
  许家凯 6969da8e2a Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max 4 lat temu
  许家凯 0569194c60 知识产权处理程序 4 lat temu
  xufei cf0dc616b5 Merge remote-tracking branch 'origin/master' 4 lat temu
  xufei bf7f5b4208 add 4 lat temu
  xufei 5ac259c122 法院公告计算 4 lat temu
  许家凯 ea4583e4d0 Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max 4 lat temu
  许家凯 6b045b8056 全量写出索引,企业债权关系表增量 4 lat temu
  许家凯 eeca257202 加入es client 4 lat temu
  许家凯 a1a02793ba 扩充变更字段 4 lat temu
  许家凯 60c7004d74 提取动态 4 lat temu
  许家凯 d534fff6ff add 4 lat temu
  许家凯 c221b903d1 fix bug 4 lat temu
  许家凯 3ce3b62429 Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max 4 lat temu
  许家凯 1177b509e8 动态提到,pom更新 4 lat temu
  许家凯 eea45f2525 Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max 4 lat temu
  许家凯 8aeb70be1d phoenix batchsize 4 lat temu
  许家凯 7886470b9c Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max 4 lat temu
  许家凯 dc96700725 公司基本信息 4 lat temu
  许家凯 84776f7d24 Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max 4 lat temu
  许家凯 3bb5d4b7f9 add 4 lat temu
  许家凯 cf827cf81e 整理package 4 lat temu
  许家凯 883b17a97c Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max 4 lat temu
  许家凯 bd41fa12b4 摘要写出到hbase,公司基本信息更新到ads hbase和es 4 lat temu
  许家凯 6f69375a53 添加环境切换 4 lat temu
  许家凯 455d676beb Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max 4 lat temu
  许家凯 f188f2bb81 add 4 lat temu
  许家凯 aa86eb7b01 摘要 4 lat temu
  许家凯 6db783b08c 增量数据结合调度 4 lat temu
  xufei 95d3848b79 增量cids生成 4 lat temu
  yongnian b567fae904 fix 4 lat temu
  yongnian 146b9fcc49 增量单表(Type1)维度的复制方案 4 lat temu
  xufei 120168df92 增量数据更新 4 lat temu
  许家凯 044c2fef7a fix:jdbc phoenix 4 lat temu
  许家凯 e57fb79980 Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max 4 lat temu
  许家凯 0fd6f59d2d add 4 lat temu
  xufei 7e888e1f26 Merge remote-tracking branch 'origin/master' 4 lat temu
  xufei 072d10626b 增加知识产权处理逻辑 4 lat temu
  许家凯 c001fb1a34 Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max 4 lat temu
  许家凯 51612201b5 加入工具类,优化写出es 4 lat temu
  许家凯 6862604a91 写出索引到es 4 lat temu
  xufei 1cb7989763 Merge remote-tracking branch 'origin/master' 4 lat temu
  xufei 4a9c209180 新增模型字段 4 lat temu
  许家凯 acc7d7770a 加入隐式转换,写出phoenix两种方式。jdbc和in-memory 4 lat temu
  许家凯 e85368defd fix bugs 4 lat temu
  许家凯 c56a8ccbc4 Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max 4 lat temu
  许家凯 863d0e998a odpsOps方式写入phoenix 4 lat temu
  yongnian 076e6ef15d 另一种spark读取phoenix方式绕开阿里云的bug 4 lat temu
  许家凯 58bf807021 add phoenix 4 lat temu
  许家凯 663dabba20 add phoenix test 4 lat temu
  许家凯 9d73ec20da 代码和依赖分离 4 lat temu
  许家凯 86c6c195df Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max 4 lat temu
  许家凯 df5c5ab443 hbase 依赖 4 lat temu
  许家凯 0d5b27e6c1 修改groupid 精简项目 4 lat temu
  xufei c2603b0992 软件著作权 4 lat temu
  许家凯 1c365c5664 local模式下读写hbase 4 lat temu
  xufei 2ab43f0b0b 基本信息评分修改版 4 lat temu
  许家凯 3cfbd0efe5 Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max 4 lat temu
  许家凯 de4438f583 公司名称映射关系计算 4 lat temu
  许家凯 d83bc175d2 更新mongodb 4 lat temu
  许家凯 da73f5b922 修复异常 4 lat temu
  许家凯 99acb8f87b a 4 lat temu
  许家凯 157bf5a727 fix bugs 4 lat temu
  许家凯 06d991e03c Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max 4 lat temu
  许家凯 9b18b48c04 add 4 lat temu
  许家凯 6ea56f908e alter sparkUtils 4 lat temu
  许家凯 95e6684aa4 添加摘要计算 4 lat temu
  许家凯 b5c40d8963 add appName 4 lat temu
  许家凯 e6102fdbc6 alter name 4 lat temu
  xufei 0a9880c83f init commit 4 lat temu

+ 9 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -274,13 +274,17 @@ object CompanyDynamic {
     } else {
       val ts = tableNames.split(",").toSet
 
-      startArgs.filter(e => {
-        ts.contains(e.tableName)
-      }).foreach(e => {
+    startArgs.filter(e => {
+      ts.contains(e.tableName)
+    }).foreach(e => {
+      if(e.tableName == "company_employment"){
+        val cdf = CompanyDynamicForDayCount(spark, project, ds)
+        cdf.calc(e.tableName, e.bName)
+      }else{
         cd.calc(e.tableName, e.bName)
-      })
-    }*/
+      }
 
+    })*/
     spark.stop()
   }
 }

+ 134 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicForDayCount.scala

@@ -0,0 +1,134 @@
+package com.winhc.bigdata.spark.jobs.dynamic
+
+import java.util
+import java.util.Date
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamic.{env, targetTab}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.LoggingUtils
+import com.winhc.bigdata.spark.utils.ReflectUtils.getClazz
+import org.apache.commons.lang3.time.DateFormatUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.immutable.ListMap
+import scala.collection.mutable
+
+/**
+ * Spark job that emits one company-dynamic record per company for a given day,
+ * carrying the number of matching rows found in `ads_change_extract` for that
+ * partition rather than one record per changed row.
+ *
+ * @param s       active Spark session
+ * @param project name of the project the tables live in
+ * @param ds      partition to process; read as `yyyyMMdd` when the business date is built
+ */
+case class CompanyDynamicForDayCount(s: SparkSession,
+                                     project: String,
+                                     ds: String
+                                    ) extends LoggingUtils with Logging {
+  @(transient@getter) val spark: SparkSession = s
+
+  /** Creates the partitioned company-dynamic output table if it does not exist yet. */
+  def init(): Unit = {
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS ${getEnvProjectName(env, project)}.$targetTab
+         |(
+         |    cid  STRING COMMENT '公司id'
+         |    ,cname  STRING COMMENT '公司name'
+         |    ,info_type STRING COMMENT '变更分类,大类'
+         |    ,rta_desc STRING COMMENT '变更信息描述,变更标题'
+         |    ,change_content STRING COMMENT '变更内容'
+         |    ,change_time STRING COMMENT '变更时间'
+         |    ,biz_id STRING COMMENT '业务id,数据行id'
+         |    ,sub_info_type STRING COMMENT '变更小类,表名'
+         |    ,info_risk_level STRING COMMENT '变更风险等级'
+         |    ,winhc_suggest STRING COMMENT '提示信息'
+         |    ,create_time STRING COMMENT '创建时间'
+         |)
+         |COMMENT '企业动态输出表'
+         |PARTITIONED BY (ds STRING COMMENT '分区',tn STRING COMMENT '表名')
+         |LIFECYCLE 30
+         |""".stripMargin)
+  }
+
+  /**
+   * Counts the partition's change rows per company for `tableName`, turns each count
+   * into dynamic records through the table's handler, and writes them to the
+   * `(ds, tn)` partition of the output table.
+   *
+   * @param tableName table name without prefix or suffix; also selects the handler class
+   * @param bName     0 (default) leaves `cname` null, 1 fills it from `base_company_mapping`
+   * @throws IllegalArgumentException if `bName` is neither 0 nor 1
+   */
+  def calc(tableName: String
+           , bName: Int = 0
+          ): Unit = {
+    val handle = getClazz[CompanyDynamicHandle](s"com.winhc.bigdata.spark.jobs.dynamic.tables.$tableName")
+
+    // Quoted, comma-separated change types accepted by the handler; shared by both queries.
+    val typeList = handle.org_type().map("'" + _ + "'").mkString(",")
+
+    // Loop-invariant: every record of this run is dated midnight of the partition day.
+    // Computing it here also keeps `this` out of the RDD closure below.
+    val bizDate = ds.substring(0, 4) + "-" + ds.substring(4, 6) + "-" + ds.substring(6) + " 00:00:00"
+
+    val countSql = bName match {
+      // Default: cname does not need to be filled in.
+      // Only grouped or aggregated columns may be selected, so biz_date is not read here.
+      case 0 =>
+        s"""
+           |SELECT  cid,count(1) AS cnt,null AS cname
+           |FROM    ${project}.ads_change_extract
+           |WHERE   ds = '$ds'
+           |AND     tn = '$tableName'
+           |AND     TYPE in ($typeList)
+           |GROUP BY CID
+           |""".stripMargin
+      // cname has to be filled in by joining on cid.
+      case 1 =>
+        s"""
+           |SELECT A.cid as cid,count(1) AS cnt,B.cname AS cname
+           |FROM(
+           |  SELECT  *
+           |  FROM    ${project}.ads_change_extract
+           |  WHERE   ds = '$ds'
+           |  AND     tn = '$tableName'
+           |  AND     TYPE in ($typeList)
+           |) AS A
+           |LEFT JOIN (
+           |    SELECT cid,cname FROM  $project.base_company_mapping
+           |    WHERE ds = '${getLastPartitionsOrElse(project + ".base_company_mapping", "0")}'
+           |) AS B
+           |ON A.cid = B.cid
+           |GROUP BY A.CID,B.cname
+           |""".stripMargin
+      case other =>
+        throw new IllegalArgumentException(s"unsupported bName: $other (expected 0 or 1)")
+    }
+
+    val rdd = sql(countSql).rdd.flatMap(r => {
+      val cid = r.getAs[String]("cid")
+      val cnt = r.getAs[Long]("cnt")
+      val cname = r.getAs[String]("cname")
+      // The handler only sees the count; there is no per-row payload in this mode.
+      val new_map = Map("cnt" -> cnt.toString)
+
+      val result = handle.handle(cid + bizDate, bizDate, cid, null, null, new_map, cname)
+      if (result == null) {
+        None
+      }
+      else {
+        result.map(res => Row(CompanyDynamicHandleUtils.getDynamicId(cid, res._4, res._7, res._6), res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, res._9, res._10, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")))
+      }
+    })
+
+    // Field order must match the Row built above.
+    val schema = getSchema(ListMap(
+      "id" -> StringType
+      , "cid" -> StringType
+      , "cname" -> StringType
+      , "info_type" -> StringType
+      , "rta_desc" -> StringType
+      , "change_content" -> StringType
+      , "change_time" -> StringType
+      , "biz_id" -> StringType
+      , "sub_info_type" -> StringType
+      , "info_risk_level" -> StringType
+      , "winhc_suggest" -> StringType
+      , "create_time" -> StringType
+    ))
+    spark.createDataFrame(rdd, schema)
+      .createOrReplaceTempView("company_dynamic_tmp")
+
+    // Partition columns are supplied by the PARTITION clause, not by the select list.
+    val cols = getColumns(s"$project.$targetTab").filter(!_.equals("ds")).filter(!_.equals("tn"))
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${getEnvProjectName(env, project)}.$targetTab PARTITION(ds='$ds',tn='$tableName')
+         |SELECT ${cols.mkString(",")}
+         |FROM
+         |    company_dynamic_tmp
+         |""".stripMargin)
+  }
+}

+ 4 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_employment.scala

@@ -15,7 +15,7 @@ case class company_employment() extends CompanyDynamicHandle {
     * @param new_map
     * @return
     */
-  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map.getOrElse("name", null)
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    // The title reports how many postings were added; a missing "cnt" renders as an empty count.
+    val count = new_map.getOrElse("cnt", "")
+    s"新增${count}条招聘信息"
+  }
 
 
   /**
@@ -34,5 +34,7 @@ case class company_employment() extends CompanyDynamicHandle {
     * @param new_map
     * @return
     */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq("city->地区", "description->招聘描述", "source->招聘来源", "employ_num->招聘人数", "start_date->职位发布日期", "ori_salary->薪资", "education->学历", "experience->工作经验", "title->职位"))
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq("city->地区", "description->招聘描述", "source->招聘来源", "employ_num->招聘人数", "start_date->职位发布日期", "ori_salary->薪资", "education->学历", "experience->工作经验", "title->职位"))
+  // Always returns an empty string; the per-posting JSON detail built by the previous implementation is no longer emitted.
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
+
 }