CompanyIncCompany2Es.scala

package com.winhc.bigdata.spark.jobs

import com.winhc.bigdata.spark.config.{EsConfig, HBaseConfig}
import com.winhc.bigdata.spark.const.BaseConst
import com.winhc.bigdata.spark.udf.BaseFunc
import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

import scala.annotation.meta.getter
import scala.collection.mutable
/**
 * @Author: XuJiakai
 * @Date: 2020/6/29 10:28
 * @Description: Write incremental company base-info data to ads and ES
 */
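// Usage sketch (assumed invocation; the jar and project names below are hypothetical,
// only the two-argument contract <project> <bizDate> comes from main()):
//
//   spark-submit \
//     --class com.winhc.bigdata.spark.jobs.CompanyIncCompany2Es \
//     winhc-bigdata-spark.jar \
//     winhc_eci_dev \
//     2020-06-29
//
// bizDate may be passed as yyyy-MM-dd or yyyyMMdd; dashes are stripped and the
// 8-digit result is only used as the end partition when inc_ods has no partitions yet.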
object CompanyIncCompany2Es {
  val outFields = Seq(
    "CID"
    , "BASE"
    , "NAME"
    , "NAME_EN"
    , "NAME_ALIAS"
    , "HISTORY_NAMES"
    , "LEGAL_ENTITY_ID"
    , "LEGAL_ENTITY_TYPE"
    , "REG_NUMBER"
    , "COMPANY_ORG_TYPE"
    , "REG_LOCATION"
    , "ESTIBLISH_TIME"
    , "FROM_TIME"
    , "TO_TIME"
    , "BUSINESS_SCOPE"
    , "REG_INSTITUTE"
    , "APPROVED_TIME"
    , "REG_STATUS"
    , "REG_CAPITAL"
    , "ORG_NUMBER"
    , "ORG_APPROVED_INSTITUTE"
    , "CURRENT_CID"
    , "PARENT_CID"
    , "COMPANY_TYPE"
    , "CREDIT_CODE"
    , "SCORE"
    , "CATEGORY_CODE"
    , "LAT"
    , "LNG"
    , "AREA_CODE"
    , "REG_CAPITAL_AMOUNT"
    , "REG_CAPITAL_CURRENCY"
    , "ACTUAL_CAPITAL_AMOUNT"
    , "ACTUAL_CAPITAL_CURRENCY"
    , "REG_STATUS_STD"
    , "SOCIAL_SECURITY_STAFF_NUM"
    , "CANCEL_DATE"
    , "CANCEL_REASON"
    , "REVOKE_DATE"
    , "REVOKE_REASON"
    , "EMAILS"
    , "PHONES"
    , "WECHAT_PUBLIC_NUM"
    , "LOGO"
    , "CRAWLED_TIME"
    , "CREATE_TIME"
    , "UPDATE_TIME"
    , "DELETED"
  )
  val outFields_Human = Seq(
    "NEW_CID"
    , "CID"
    , "ID"
    , "COMPANY_NAME"
    , "HUMAN_NAME"
    , "HID"
    , "HUMAN_PID"
    , "STATUS"
    , "CREATE_TIME"
    , "UPDATE_TIME"
    , "DELETED"
  )
  case class Company2Es(s: SparkSession, project: String, bizDate: String) extends LoggingUtils with BaseFunc {
    @(transient@getter) val spark: SparkSession = s

    def calc(): Unit = {
      val code = code2Name()
      val partition = bizDate.replaceAll("\\-", "")
      if (partition.length != 8) {
        println("biz date is error!")
        sys.exit(-99)
      }
      val inc_ods_partitions = BaseUtil.getPartitions(s"${project}.inc_ods_company", spark)
      val end_partition = if (inc_ods_partitions.isEmpty) partition else inc_ods_partitions.last

      val inc_ads_partitions = BaseUtil.getPartitions(s"${project}.inc_ads_company", spark)
      val start_partition = if (inc_ads_partitions.isEmpty) "0" else inc_ads_partitions.last

      if (start_partition.equals(end_partition)) {
        println("start_partition == end_partition")
        sys.exit(-999)
      }

      val companyCols = spark.table(s"${project}.ads_company").columns
        .filter(!_.equals("ds"))
        .seq

      // read the incremental data, keeping only the latest record per cid (deduplication)
      val df = sql(
        s"""
           |SELECT ${companyCols.mkString(",")}
           |FROM (
           |    SELECT a.*
           |           ,row_number() OVER (PARTITION BY a.cid ORDER BY update_time DESC) c
           |    FROM (
           |        SELECT *
           |        FROM $project.inc_ods_company
           |        WHERE ds > $start_partition and ds <= $end_partition and cid is not null
           |    ) as a
           |) AS tmp
           |WHERE tmp.c = 1
           |""".stripMargin)
      df.cache().createOrReplaceTempView("tmp_company_inc")

      // write to the ads table
      sql(
        s"""
           |INSERT ${if (BaseUtil.isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.inc_ads_company PARTITION(ds='$end_partition')
           |SELECT ${companyCols.mkString(",")}
           |FROM
           |    tmp_company_inc
           |""".stripMargin)

      // write to HBase
      import org.apache.spark.sql.functions.col
      val jobConf = HBaseConfig.HBaseOutputJobConf("COMPANY")
      val stringDf = df.select(companyCols.map(column => col(column).cast("string")): _*)
      stringDf.rdd.map(row => {
        val id = row.getAs[String]("cid")
        val put = new Put(Bytes.toBytes(id))
        for (f <- outFields) {
          val v = row.getAs[String](f.toLowerCase)
          if (v != null) {
            put.addColumn(BaseConst.F_BYTES, Bytes.toBytes(f), Bytes.toBytes(v))
          }
        }
        (new ImmutableBytesWritable, put)
      }).filter(_ != null)
        .saveAsHadoopDataset(jobConf)

      // write to ES
      import com.winhc.bigdata.spark.implicits.CompanyIndexSave2EsHelper._
      stringDf.companyIndexSave2Es(code._1, code._2)
    }
  }
  case class Company_Human_Relation2HBase(s: SparkSession, project: String, bizDate: String) extends LoggingUtils {
    @(transient@getter) val spark: SparkSession = s

    def calc(): Unit = {
      val partition = bizDate.replaceAll("\\-", "")
      if (partition.length != 8) {
        println("biz date is error!")
        sys.exit(-99)
      }
      val inc_ods_partitions = BaseUtil.getPartitions(s"${project}.inc_ods_company_human_relation", spark)
      val end_partition = if (inc_ods_partitions.isEmpty) partition else inc_ods_partitions.last

      val inc_ads_partitions = BaseUtil.getPartitions(s"${project}.inc_ads_company_human_relation", spark)
      val start_partition = if (inc_ads_partitions.isEmpty) "0" else inc_ads_partitions.last

      if (start_partition.equals(end_partition)) {
        println("start_partition == end_partition")
        sys.exit(-999)
      }

      val companyCols = spark.table(s"${project}.ads_company_human_relation").columns
        .filter(!_.equals("ds"))
        .seq

      // read the incremental data, keeping only the latest record per (cid, hid, human_pid)
      val df = sql(
        s"""
           |SELECT ${companyCols.mkString(",")}
           |FROM (
           |    SELECT CONCAT_WS("_",new_cid,hid) AS rowkey,a.*
           |           ,row_number() OVER (PARTITION BY a.cid,a.hid,a.human_pid ORDER BY update_time DESC) c
           |    FROM (
           |        SELECT *,cid as new_cid
           |        FROM $project.inc_ods_company_human_relation
           |        WHERE ds > $start_partition and ds <= $end_partition and cid is not null and hid is not null
           |    ) as a
           |) AS tmp
           |WHERE tmp.c = 1
           |""".stripMargin)
      df.cache().createOrReplaceTempView("tmp_company_human_relation_inc")

      // write to the ads table
      sql(
        s"""
           |INSERT ${if (BaseUtil.isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.inc_ads_company_human_relation PARTITION(ds='$end_partition')
           |SELECT ${companyCols.mkString(",")}
           |FROM
           |    tmp_company_human_relation_inc
           |""".stripMargin)

      // write to HBase
      import org.apache.spark.sql.functions.col
      val jobConf = HBaseConfig.HBaseOutputJobConf("COMPANY_HUMAN_RELATION")
      val stringDf = df.select(companyCols.map(column => col(column).cast("string")): _*)
      stringDf.rdd.map(row => {
        val id = row.getAs[String]("rowkey")
        val put = new Put(Bytes.toBytes(id))
        for (f <- outFields_Human) {
          val v = row.getAs[String](f.toLowerCase)
          if (v != null) {
            put.addColumn(BaseConst.F_BYTES, Bytes.toBytes(f), Bytes.toBytes(v))
          }
        }
        (new ImmutableBytesWritable, put)
      }).filter(_ != null)
        .saveAsHadoopDataset(jobConf)
    }
  }
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      println("please enter project and bizDate!")
      sys.exit(-99)
    }
    val Array(project, bizDate) = args

    val config = EsConfig.getEsConfigMap ++ mutable.Map(
      "spark.hadoop.odps.project.name" -> project,
      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
    )
    val spark = SparkUtils.InitEnv("company2Es", config)

    Company2Es(spark, project, bizDate).calc()
    Company_Human_Relation2HBase(spark, project, bizDate).calc()
    spark.stop()
  }
}