Explorar el Código

Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max

# Conflicts:
#	src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala
许家凯 hace 5 años
padre
commit
c001fb1a34

+ 6 - 0
src/main/java/com/winhc/bigdata/calc/DimScoreV2.java

@@ -287,6 +287,12 @@ public class DimScoreV2 {
         put("4","经营风险");
         put("5","法律风险");
 
+        put("基本情况","1");
+        put("经营情况","2");
+        put("资产权益","3");
+        put("经营风险","4");
+        put("法律风险","5");
+
         //小类code
         //基本情况
         put("注册资本","101");

+ 182 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyCourtAnnouncement.scala

@@ -0,0 +1,182 @@
+package com.winhc.bigdata.spark.jobs
+
+import java.util.Date
+
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.{Row, SparkSession}
+import com.winhc.bigdata.calc.DimScoreV2
+import com.winhc.bigdata.spark.jobs.CompanyCourtAnnouncement.tabMapping
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * 法院公告,开庭公告,立案信息
+ */
+
+object CompanyCourtAnnouncement {
+
+  val tabMapping: Map[String, (String, String, String, String)] =
+    Map("ads_company_court_announcement_list" -> ("1", "publish_date", "法律风险", "法院公告"), //法院公告
+      "ads_company_court_open_announcement_list" -> ("2", "start_date", "法律风险", "开庭公告"), //开庭公告
+      "ads_company_court_register_list" -> ("3", "filing_date", "法律风险", "立案信息") //立案信息
+    )
+
+  def main(args: Array[String]): Unit = {
+
+    val (sourceTable, flag, time, kind, project) = valid(args)
+
+    var config = mutable.Map.empty[String, String]
+
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+
+    new CompanyCourtAnnouncement(spark, sourceTable, flag, time, kind, project).calc()
+    spark.stop()
+
+  }
+
+  def valid(args: Array[String]) = {
+    if (args.length != 1) {
+      println("请输入要计算的table!!!! ")
+      sys.exit(-1)
+    }
+    val sourceTable = args(0)
+
+    val (flag, time, kind, project) = tabMapping.getOrElse(sourceTable, ("", "", "", ""))
+    if (flag.isEmpty || time.isEmpty || kind.isEmpty || project.isEmpty) {
+      println("输入表不存在!!!   ")
+      sys.exit(-1)
+    }
+    (sourceTable, flag, time, kind, project)
+  }
+}
+
+case class CompanyCourtAnnouncement(s: SparkSession, sourceTable: String,
+                                    flag: String, time: String, kind: String, project: String
+                                   ) extends LoggingUtils {
+
+  @(transient@getter) val spark: SparkSession = s
+
+  import spark.implicits._
+  import spark._
+  import org.apache.spark.sql.functions._
+
+  def calc(): Unit = {
+    println(s"company ${this.getClass.getSimpleName} calc start! " + new Date().toString)
+
+    val df = sql(
+      s"""
+         |SELECT  *
+         |FROM    (
+         |            SELECT  *
+         |                    ,sum(CASE  WHEN party_role = 'y' THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid) AS cnt1
+         |                    ,sum(CASE  WHEN party_role = 'n' THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid) AS cnt2
+         |                    ,row_number() OVER(PARTITION BY new_cid ORDER BY $time DESC) AS num
+         |            FROM    $sourceTable
+         |            WHERE   ds = '${BaseUtil.getPartion(sourceTable, spark)}' and new_cid is not null
+         |                    and $time >= '${BaseUtil.atMonthsBefore(3)}'
+         |        ) a
+         |WHERE   num = 1
+         |""".stripMargin)
+
+    df.map(r => {
+      trans(r, flag, kind, project)
+    }).toDF("id", "cid", "name", "kind", "kind_code", "project", "project_code", "type",
+      "score", "total", "extraScore")
+      .createOrReplaceTempView(s"${sourceTable}_tmp_view")
+
+    logger.info(
+      s"""
+         |- - - - - - - - - - - - - - - - - - - - - - - - -
+         |${showString(sql(s"select * from ${sourceTable}_tmp_view"))}
+         |- - - - - - - - - - - - - - - - - - - - - - - - -
+       """.stripMargin)
+
+    sql(s"insert overwrite table ${sourceTable}_score  select * from ${sourceTable}_tmp_view")
+
+    println(s"company ${this.getClass.getSimpleName} calc end! " + new Date().toString)
+  }
+
+
+  def trans(r: Row, flag: String, kind: String, prpject: String) = {
+    val id = r.getAs[Long]("id")
+    val cid = r.getAs[Long]("new_cid").toString
+    val name = r.getAs[String]("new_cname")
+    val cnt1 = r.getAs[Long]("cnt1")
+    val cnt2 = r.getAs[Long]("cnt2")
+    flag match {
+      case "1" => getInfoAnnouncement(id, cid, name, cnt1, cnt2, kind, prpject)
+      case "2" => getInfoOpenAnnouncement(id, cid, name, cnt1, cnt2, kind, prpject)
+      case "3" => getInforegister(id, cid, name, cnt1, cnt2, kind, prpject)
+    }
+  }
+
+  //法院公告
+  def getInfoAnnouncement(id: Long, cid: String, name: String, cnt1: Long, cnt2: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 5f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0 && cnt2 == 0) {
+      score = 5f
+      ty = "近3个月无法院公告"
+    } else if (cnt1 > 0 && cnt2 == 0) {
+      score = 5f
+      ty = "近3个月有法院公告,均为公诉人/原告/上诉人/申请人"
+    } else if (cnt1 > cnt2) {
+      score = 3f
+      ty = "近3个月有法院公告,作为公诉人/原告/上诉人/申请人的数量大于作为被告人/被告/被上诉人/被申请人的数量"
+    } else if (cnt1 <= cnt2) {
+      score = 0f
+      ty = "近3个月有法院公告,作为被告人/被告/被上诉人/被申请人的数量大于作为公诉人/原告/上诉人/申请人的数量"
+    }
+    (id, cid, name, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+  //开庭公告
+  def getInfoOpenAnnouncement(id: Long, cid: String, name: String, cnt1: Long, cnt2: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 5f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0 && cnt2 == 0) {
+      score = 5f
+      ty = "近3个月无开庭公告"
+    } else if (cnt1 > 0 && cnt2 == 0) {
+      score = 5f
+      ty = "近3个月有开庭公告,均为原告"
+    } else if (cnt1 > cnt2) {
+      score = 3f
+      ty = "近3个月有开庭公告,作为原告的数量大于作为被告的数量"
+    } else if (cnt1 <= cnt2) {
+      score = 0f
+      ty = "近3个月有开庭公告,作为被告的数量大于作为原告的数量"
+    }
+    (id, cid, name, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+  //立案信息
+  def getInforegister(id: Long, cid: String, name: String, cnt1: Long, cnt2: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 5f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0 && cnt2 == 0) {
+      score = 5f
+      ty = "近3个月无立案信息"
+    } else if (cnt1 > 0 && cnt2 == 0) {
+      score = 5f
+      ty = "近3个月有立案信息,均为公诉人/原告/上诉人/申请人"
+    } else if (cnt1 > cnt2) {
+      score = 3f
+      ty = "近3个月有立案信息,作为公诉人/原告/上诉人/申请人的数量大于作为被告人/被告/被上诉人/被申请人的数量"
+    } else if (cnt1 <= cnt2) {
+      score = 0f
+      ty = "近3个月有立案信息,作为被告人/被告/被上诉人/被申请人的数量大于作为公诉人/原告/上诉人/申请人的数量"
+    }
+    (id, cid, name, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+}

+ 17 - 0
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -1,6 +1,9 @@
 package com.winhc.bigdata.spark.utils
 
 import scala.collection.mutable
+import java.util.{Calendar, Date, Locale}
+import org.apache.commons.lang3.time.DateFormatUtils
+import org.apache.spark.sql.SparkSession
 
 /**
  * @Author: XuJiakai
@@ -46,4 +49,18 @@ object BaseUtil {
       "spark.executor.memory" -> memory
     )
   }
+  def getPartion(t: String, @transient spark: SparkSession) = {
+    import spark.implicits._
+    import spark._
+    val sql_s = s"show partitions " + t
+    sql(sql_s).collect.toList.map(_.getString(0).split("=")(1)).last
+  }
+
+  def atMonthsBefore(n: Int, pattern: String = "yyyy-MM-dd"): String = {
+    val c = Calendar.getInstance(Locale.CHINA)
+    c.setTimeInMillis(new Date().getTime)
+    c.add(Calendar.MONTH, -1 * n)
+    DateFormatUtils.format(c.getTime.getTime, pattern)
+  }
+
 }

+ 59 - 0
src/main/scala/com/winhc/bigdata/spark/utils/HbaseUtil.scala

@@ -0,0 +1,59 @@
+package cn.oyohotels.utils
+
+import java.util
+import com.alibaba.dcm.DnsCacheManipulator
+import org.apache.hadoop.hbase.client.Delete
+import org.apache.hadoop.hbase._
+import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.mapreduce.{TableInputFormat, TableMapReduceUtil, TableOutputFormat}
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
+import org.slf4j.LoggerFactory
+
+
+object HbaseUtil {
+  final val FAMILY_NAME = "F"
+  final val RT_NS = "default"
+  private val logger = LoggerFactory.getLogger(this.getClass)
+
+  val lrs = HbaseUtil.getClass.getResource("/").getPath
+
+  lazy val conf = {
+    val myConf = HBaseConfiguration.create()
+    myConf.set("hbase.zookeeper.property.clientPort", "2181")
+    myConf.set("hbase.zookeeper.quorum", "hb-proxy-pub-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com")
+    DnsCacheManipulator.setDnsCache("hb-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com", "47.101.251.157")
+    myConf
+  }
+
+  lazy val conn = {
+    //本地开启
+    ConnectionFactory.createConnection(conf)
+  }
+
+  def getTable(tbName: String, ns: String = RT_NS) = {
+    conn.getTable(TableName.valueOf(ns, tbName))
+  }
+
+  def getRowData(tb: Table, rowkey: String, family: String = FAMILY_NAME) = {
+    var m = Map.empty[String, String]
+    try {
+
+      val g = new Get(Bytes.toBytes(rowkey))
+      // val g = new Get(rowkey.getBytes)
+      val r = tb.get(g).rawCells()
+      m = r.map(c1 => (Bytes.toString(CellUtil.cloneQualifier(c1)), Bytes.toString(CellUtil.cloneValue(c1)))).toMap
+    } catch {
+      case e: Throwable => e.printStackTrace()
+    }
+    m
+  }
+
+  def main(args: Array[String]): Unit = {
+    val row = getRowData(getTable("COMPANY_SCORE_V3"), "100008680_101")
+    println(row)
+  }
+}

+ 112 - 0
src/main/scala/com/winhc/bigdata/spark/utils/LoggingUtils.scala

@@ -0,0 +1,112 @@
+package com.winhc.bigdata.spark.utils
+
+import java.io.PrintWriter
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.log4j.Logger
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+import scala.annotation.meta.getter
+
+/**
+ * π
+ */
+trait LoggingUtils {
+  protected var sqlNo = 1
+
+  @transient protected[this] val logger: Logger = Logger.getLogger(this.getClass)
+
+  @(transient@getter) protected val spark: SparkSession
+
+  def sql(sqlString: String): DataFrame = {
+    logger.info(
+      s"""
+         |- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+         |Job[${this.getClass.getSimpleName}].SQL[No$sqlNo.]
+         |
+         |$sqlString
+         |- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+       """.stripMargin
+    )
+    sqlNo += 1
+    spark.sql(sqlString)
+  }
+
+  def showString(dataset: DataFrame, _numRows: Int = 20, truncate: Int = -1): String = {
+    val numRows = _numRows.max(0)
+    val takeResult = dataset.take(numRows + 1)
+    val hasMoreData = takeResult.length > numRows
+    val data = takeResult.take(numRows)
+
+    // For array values, replace Seq and Array with square brackets
+    // For cells that are beyond `truncate` characters, replace it with the
+    // first `truncate-3` and "..."
+    val rows: Seq[Seq[String]] = dataset.schema.fieldNames.toSeq +: data.map { row =>
+      row.toSeq.map { cell =>
+        val str = cell match {
+          case null => "null"
+          case binary: Array[Byte] => binary.map("%02X".format(_)).mkString("[", " ", "]")
+          case array: Array[_] => array.mkString("[", ", ", "]")
+          case seq: Seq[_] => seq.mkString("[", ", ", "]")
+          case _ => cell.toString
+        }
+        if (truncate > 0 && str.length > truncate) {
+          // do not show ellipses for strings shorter than 4 characters.
+          if (truncate < 4) str.substring(0, truncate)
+          else str.substring(0, truncate - 3) + "..."
+        } else {
+          str
+        }
+      }: Seq[String]
+    }
+
+    val sb = new StringBuilder
+    val numCols = dataset.schema.fieldNames.length
+
+    // Initialise the width of each column to a minimum value of '3'
+    val colWidths = Array.fill(numCols)(3)
+
+    // Compute the width of each column
+    for (row <- rows) {
+      for ((cell, i) <- row.zipWithIndex) {
+        colWidths(i) = math.max(colWidths(i), cell.length)
+      }
+    }
+
+    // Create SeparateLine
+    val sep: String = colWidths.map("-" * _).addString(sb, "+", "+", "+\n").toString()
+
+    // column names
+    rows.head.zipWithIndex.map { case (cell, i) =>
+      if (truncate > 0) {
+        StringUtils.leftPad(cell, colWidths(i))
+      } else {
+        StringUtils.rightPad(cell, colWidths(i))
+      }
+    }.addString(sb, "|", "|", "|\n")
+
+    sb.append(sep)
+
+    // data
+    rows.tail.map {
+      _.zipWithIndex.map { case (cell, i) =>
+        if (truncate > 0) {
+          StringUtils.leftPad(cell.toString, colWidths(i))
+        } else {
+          StringUtils.rightPad(cell.toString, colWidths(i))
+        }
+      }.addString(sb, "|", "|", "|\n")
+    }
+
+    sb.append(sep)
+
+    // For Data that has more than "numRows" records
+    if (hasMoreData) {
+      val rowsString = if (numRows == 1) "row" else "rows"
+      sb.append(s"only showing top $numRows $rowsString\n")
+    }
+
+    sb.toString()
+  }
+
+}