
Merge remote-tracking branch 'origin/master'

许家凯 4 years ago
parent
commit
4f57a575db

+ 9 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyCourtAnnouncement.scala

@@ -63,8 +63,13 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
     }
     val ads_eci_debtor_relation = s"${project}.ads_eci_debtor_relation" // full creditor (debt claim) table
     val debtorRelationDs = getPartion(ads_eci_debtor_relation, spark)
-    val ads_address = s"${project}.inc_ads_${tableName}_address" // incremental address table
-    val ads_yg_bg = s"${project}.inc_ads_${tableName}_bg_yg" // incremental plaintiff/defendant (plaintiff) table
+
+    // write the result tables into the production tables
+//    val ads_address = s"${project}.inc_ads_${tableName}_address" // incremental address table
+//    val ads_yg_bg = s"${project}.inc_ads_${tableName}_bg_yg" // incremental plaintiff/defendant (plaintiff) table
+
+    val ads_address = s"winhc_eci.inc_ads_${tableName}_address" // incremental address table
+    val ads_yg_bg = s"winhc_eci.inc_ads_${tableName}_bg_yg" // incremental plaintiff/defendant (plaintiff) table
 
     // defendant
     val df = sql(
@@ -458,7 +463,6 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
   }
 
 
-
 }
 
 object CompanyCourtAnnouncement {
@@ -499,7 +503,7 @@ object CompanyCourtAnnouncement {
     if (!runOld) {
       val flag = announcement.preCalc()
       // return when the increment has no updates
-      if(!flag) return
+      if (!flag) return
     }
     announcement.calc(runOld)
     spark.stop()
@@ -507,7 +511,7 @@ object CompanyCourtAnnouncement {
 
 }
 
-object EsQuery{
+object EsQuery {
   def queryCompany(restClient: RestClient, companyName: String) = {
     val query =
       s"""

+ 324 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyWenshuYgYishen.scala

@@ -0,0 +1,324 @@
+package com.winhc.bigdata.spark.jobs
+
+import java.sql.Timestamp
+import java.util
+import java.util.Collections
+
+import com.alibaba.fastjson.JSON
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping, JsonSerializable}
+import com.winhc.bigdata.spark.utils.BaseUtil._
+import com.winhc.bigdata.spark.utils.EsRestUtils.getRestClient
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.{Row, SparkSession}
+import org.elasticsearch.client.RestClient
+import org.json4s.jackson.Json
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Description: court judgment documents (裁判文书)
+ * @author lyb
+ * @date
+ */
+
+case class YgYishenWenshuJson(
+                       yg_name: String,
+                       bg_name: String,
+                       case_no: String,
+                       case_amt: String,
+                       case_stage: String,
+                       is_success: String,
+                       judge_date: String
+                            ) extends JsonSerializable
+
+object YgYishenWenshuJson {
+  def apply(r: Row, cols: Seq[String]): Map[String, String] = {
+    val res: Map[String, String] = cols.map(c => (c, r.getAs[String](c))).toMap
+    Map("tag" -> "一审胜诉") ++ res
+  }
+
+}
+
+case class CompanyWenshuYishen(s: SparkSession, project: String, // project where the tables live
+                         tableName: String // table name (without prefix/suffix)
+                         //detailCols: Seq[String] // detail columns
+                                   ) extends LoggingUtils with CompanyMapping with Logging with BaseFunc {
+
+  @(transient@getter) val spark: SparkSession = s
+
+  def calc(runOld: Boolean = false) = {
+    import spark.implicits._
+
+    val ods_wenshu_detail = s"${project}.ods_wenshu_detail" // incremental ODS table
+
+    var adsListDs = getPartion(ods_wenshu_detail, spark)
+    // when running the historical (full) data, use the first partition
+    if (runOld) {
+      adsListDs = getFirstPartion(ods_wenshu_detail, spark)
+    }
+    val ads_case_chance_address = s"${project}.inc_ads_case_chance_wenshu_yg_yishen_shengsu_address" // incremental address table
+    val ads_case_chance = s"${project}.inc_ads_case_chance_wenshu_yg_yishen_shengsu" // incremental plaintiff/defendant (plaintiff) table
+    val address_type = "0" // creditor 0; debtor 1
+    // plaintiff wins at first instance
+//    val df = sql(
+//      s"""
+//         |SELECT  *
+//         |FROM    (
+//         |            SELECT  *
+//         |                    ,ROW_NUMBER() OVER (PARTITION BY litigant_name ORDER BY publish_date DESC ) num
+//         |                    ,md5(CLEANUP(concat_ws('',yg_name,bg_name,case_no,judge_date,court_name))) AS rowkey_business
+//         |            FROM    $inc_ads_company_tb_list a
+//         |            WHERE   ds = $adsListDs
+//         |            AND     LENGTH(litigant_name) > 4
+//         |            AND     is_success = '胜'
+//         |            AND     judge_date >= '${atMonthsBefore(1)} 00:00:00'
+//         |            AND     case_stage = '一审'
+//         |            AND     case_type = '民事案件'
+//         |        ) b
+//         |WHERE   num = 1
+//         |""".stripMargin)
+
+
+    var df = sql(
+     s"""
+       |    SELECT
+       |    md5(CLEANUP(concat_ws('',cname,bg_name,case_no,court_name,judge_date))) AS rowkey
+       |    ,case_id AS biz_id
+       |    ,cname
+       |    ,yg_name
+       |    ,bg_name
+       |    ,case_reason
+       |    ,case_stage
+       |    ,concat(case_amt, '万元') as case_amt
+       |    ,case_no
+       |    ,date_format(judge_date, 'yyyy-MM-dd') as judge_date
+       |    ,title
+       |    ,ROW_NUMBER() OVER(PARTITION BY cname,bg_name,case_no,court_name,judge_date ORDER BY judge_date) RN
+       |    ,court_name
+       |    ,judge_date AS biz_date
+       |    ,url AS link_url
+       |    ,is_success
+       |    ,judge_result
+       |    FROM    $ods_wenshu_detail
+       |      LATERAL VIEW explode(split(yg_name, '\n')) tmpTable AS cname
+       |    WHERE   ds = $adsListDs
+       |    AND     judge_date >= '${atMonthsBefore(1)}'
+       |    AND     is_success = '胜'
+       |    AND     case_stage = '一审'
+       |    AND     case_type = '民事案件'
+       |    AND     length(cname) > 4
+        |""".stripMargin
+    )
+
+
+    println("dfnum:"+df.count())
+
+    df.mapPartitions(iter => {
+      trans(iter)
+      //restClient.close()
+    }).filter(_ != null)
+      .toDF("rowkey", "title", "cname", "cid", "yg_name", "bg_name",  "label", "business_id",
+        "business_type", "business_type_name", "dynamic_content", "biz_date", "create_time", "province_code", "city_code", "county_code")
+      .createOrReplaceTempView("t1")
+
+    // case opportunity table
+    sql(
+      s"""
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $ads_case_chance  partition (ds=$adsListDs)
+         |select
+         |rowkey,title, yg_name, bg_name, cname,cid,label,business_id,business_type,business_type_name,dynamic_content,
+         |biz_date,create_time
+         |from t1
+         |""".stripMargin)
+
+    // debtor element (address) table
+    sql(
+      s"""
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $ads_case_chance_address  partition (ds=$adsListDs)
+         |select
+         |md5(concat_ws('',rowkey, business_type, $address_type, province_code,city_code)) as id,
+         |rowkey, business_type, $address_type, province_code,city_code,county_code,biz_date,create_time
+         |from t1
+         |where trim(province_code) <> ''
+         |""".stripMargin)
+
+
+  }
+
+
+
+
+
+
+  def trans(iter: Iterator[Row]) = {
+    val restClient = getRestClient()
+    val df = iter.map(r => {
+      try {
+        import org.json4s.DefaultFormats
+        val rowkey_business = r.getAs[String]("rowkey") // case opportunity primary key
+        val title = r.getAs[String]("title") // title
+        val cname = r.getAs[String]("cname") // company name
+        val yg_name = r.getAs[String]("yg_name") // plaintiff
+
+        val bg_name = r.getAs[String]("bg_name") // defendant company
+        val label: String = Json(DefaultFormats).write(YgYishenWenshuJson(r, Seq("case_amt", "judge_date"))) // label list
+        val business_id = r.getAs[Long]("biz_id") // business primary key id
+        val business_type = "5" // dynamic type
+        val business_type_name = "0" // dynamic type name
+        val m1: Map[String, String] = queryCompany(restClient, cname)
+        // dynamic change content
+        val m2: Map[String, String] = YgYishenWenshuJson(r, Seq(
+          "case_reason", "case_no", "court_name", "judge_date", "link_url", "case_stage", "judge_result"))
+        val dynamic_content = Json(DefaultFormats).write(m1 ++: m2)
+        val biz_date = r.getAs[Timestamp]("biz_date") // dynamic change time
+        val create_time = atMonthsBefore(0, "yyyy-MM-dd HH:mm:ss") // creation time
+        val province_code = m1.getOrElse("province_code", "") // province code
+        val city_code = m1.getOrElse("city_code", "") // city code
+        val county_code = m1.getOrElse("county_code", "") // county code
+        val cid = m1.getOrElse("id", "") // company cid
+        (rowkey_business, title, cname, cid, yg_name, bg_name, label, business_id, business_type,
+          business_type_name, dynamic_content, biz_date, create_time, province_code, city_code, county_code)
+      } catch {
+        case e: Exception => {
+          logWarning(r.toString())
+          logError(e.getMessage, e)
+          null
+        }
+      }
+    })
+    df
+  }
+
+
+
+  def regfun() = {
+    prepareFunctions(spark)
+    company_split()
+  }
+
+
+  def queryCompany(restClient: RestClient, companyName: String) = {
+    val query =
+      s"""
+         |{
+         |  "_source": {
+         |     "includes": [ "_id","province_code", "city_code","county_code","reg_capital","estiblish_time","phones"]
+         |   },
+         |  "query": {
+         |    "term": {
+         |      "cname.value.keyword": "${BaseUtil.cleanup(companyName)}"
+         |    }
+         |  },
+         |  "sort": [
+         |    {
+         |      "company_type": {
+         |        "order": "asc"
+         |      }
+         |    }
+         |  ]
+         |}
+         |""".stripMargin
+    val entity = new NStringEntity(query, ContentType.APPLICATION_JSON)
+
+    val indexResponse = restClient.performRequest(
+      "GET",
+      "/winhc-company/company/_search",
+      Collections.emptyMap[String, String](),
+      entity)
+    val en = indexResponse.getEntity
+    val res = EntityUtils.toString(en)
+    import scala.collection.JavaConverters._
+    val list = getIndexResult2(res)
+    if (list.nonEmpty) {
+      val id = list.head("_id").asInstanceOf[String]
+      val source: util.Map[String, Any] = list.head("_source").asInstanceOf[util.Map[String, Any]]
+      val province_code = source.get("province_code").asInstanceOf[String]
+      val city_code = source.get("city_code").asInstanceOf[String]
+      val county_code = source.get("county_code").asInstanceOf[String]
+      val reg_capital = source.get("reg_capital").asInstanceOf[String]
+      val estiblish_time = source.get("estiblish_time").asInstanceOf[String]
+      val phones = source.get("phones").asInstanceOf[util.List[String]].asScala.mkString(",")
+
+      Map(
+        "id" -> id,
+        "province_code" -> province_code,
+        "city_code" -> city_code,
+        "county_code" -> county_code,
+        "reg_capital" -> reg_capital,
+        "estiblish_time" -> estiblish_time,
+        "phones" -> phones
+      )
+    } else {
+      Map.empty[String, String]
+    }
+  }
+
+  def getIndexResult2(json: String) = {
+    import scala.collection.JavaConverters._
+    JSON.parseObject(json).getJSONObject("hits").getJSONArray("hits").toArray().map(m => m.asInstanceOf[util.Map[String, Any]]).map(_.asScala).toList
+  }
+
+}
+
+object CompanyWenshuYgYishen {
+  def main(args: Array[String]): Unit = {
+    var project = "winhc_eci_dev"
+    var table = ""
+    var runOld = false
+
+    if (args.length == 2) {
+      val Array(project1, table1) = args
+      project = project1
+      table = table1
+    } else if (args.length == 3) {
+      val Array(project1, table1, remain) = args
+      project = project1
+      table = table1
+      if (remain.equals("1"))
+        runOld = true
+    } else {
+      println("please set project,table...")
+      sys.exit(-1)
+    }
+
+    println(
+      s"""
+         |project: $project| table: $table| runOld: $runOld
+         |""".stripMargin)
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+
+    val companyWenshu = CompanyWenshuYishen(spark, project, table)
+    companyWenshu.regfun()
+    // whether to run the full (historical) data
+    if (!runOld) {
+//      val flag = announcement.preCalc()
+      // return when the increment has no updates
+//      if(!flag) return
+    }
+    companyWenshu.calc(runOld)
+    spark.stop()
+
+
+  }
+
+
+
+}
+
+
+
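
A minimal driver sketch for the new job, mirroring the `main` method above; the table-name argument and the trimmed-down config are assumptions (`calc()` reads `ods_wenshu_detail` directly):

    // Hedged sketch: drive CompanyWenshuYishen directly; literal values below are assumptions.
    import scala.collection.mutable
    val config = mutable.Map("spark.hadoop.odps.project.name" -> "winhc_eci_dev")
    val spark = SparkUtils.InitEnv("CompanyWenshuYgYishen", config)
    val job = CompanyWenshuYishen(spark, "winhc_eci_dev", "wenshu_detail")
    job.regfun()             // register the UDFs used by the SQL (prepareFunctions, company_split)
    job.calc(runOld = false) // incremental run over the latest ods_wenshu_detail partition
    spark.stop()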

+ 313 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyWenshuYgZhongben.scala

@@ -0,0 +1,313 @@
+package com.winhc.bigdata.spark.jobs
+
+import java.sql.Timestamp
+import java.util
+import java.util.Collections
+
+import com.alibaba.fastjson.JSON
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping, JsonSerializable}
+import com.winhc.bigdata.spark.utils.BaseUtil._
+import com.winhc.bigdata.spark.utils.EsRestUtils.getRestClient
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Row, SparkSession}
+import org.elasticsearch.client.RestClient
+import org.json4s.jackson.Json
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Description: case opportunities - court judgments where the plaintiff's enforcement was terminated for this round (终本 cases)
+ * @author lyb
+ * @date
+ */
+
+case class WenshuJson(
+                       yg_name: String,
+                       bg_name: String,
+                       case_no: String,
+                       case_amt: String,
+                       case_stage: String,
+                       is_success: String,
+                       judge_date: String
+                            ) extends JsonSerializable
+
+object WenshuJson {
+  def apply(r: Row, cols: Seq[String]): Map[String, String] = {
+    val res: Map[String, String] = cols.map(c => (c, r.getAs[String](c))).toMap
+    Map("tag" -> "终结本次执行") ++ res
+  }
+
+}
+
+case class CompanyWenshu(s: SparkSession, project: String, // project where the tables live
+                         tableName: String // table name (without prefix/suffix)
+                         //detailCols: Seq[String] // detail columns
+                                   ) extends LoggingUtils with CompanyMapping with Logging with BaseFunc {
+
+  @(transient@getter) val spark: SparkSession = s
+
+  def calc(runOld: Boolean = false) = {
+    import spark.implicits._
+
+    val ods_wenshu_detail = s"${project}.ods_wenshu_detail" // incremental ODS table
+
+    var adsListDs = getPartion(ods_wenshu_detail, spark)
+    // when running the historical (full) data, use the first partition
+    if (runOld) {
+      adsListDs = getFirstPartion(ods_wenshu_detail, spark)
+    }
+    val ads_case_chance_address = s"${project}.inc_ads_case_chance_wenshu_yg_zhongben_address" // incremental address table
+    val ads_case_chance = s"${project}.inc_ads_case_chance_wenshu_yg_zhongben" // incremental plaintiff/defendant (plaintiff) table
+    val address_type = "0" // creditor 0; debtor 1
+    // plaintiff wins at first instance
+//    val df = sql(
+//      s"""
+//         |SELECT  *
+//         |FROM    (
+//         |            SELECT  *
+//         |                    ,ROW_NUMBER() OVER (PARTITION BY litigant_name ORDER BY publish_date DESC ) num
+//         |                    ,md5(CLEANUP(concat_ws('',yg_name,bg_name,case_no,judge_date,court_name))) AS rowkey_business
+//         |            FROM    $inc_ads_company_tb_list a
+//         |            WHERE   ds = $adsListDs
+//         |            AND     LENGTH(litigant_name) > 4
+//         |            AND     is_success = '胜'
+//         |            AND     judge_date >= '${atMonthsBefore(1)}'
+//         |            AND     case_stage = '一审'
+//         |            AND     case_type = '执行案件'
+//         |        ) b
+//         |WHERE   num = 1
+//         |""".stripMargin)
+
+
+    var df = sql(
+     s"""
+       |    SELECT
+       |    md5(CLEANUP(concat_ws('',cname,bg_name,case_no,court_name,judge_date))) AS rowkey
+       |    ,case_id AS biz_id
+       |    ,cname
+       |    ,yg_name
+       |    ,bg_name
+       |    ,case_reason
+       |    ,case_stage
+       |    ,concat(case_amt, '万元') as case_amt
+       |    ,case_no
+       |    ,date_format(judge_date, 'yyyy-MM-dd') as judge_date
+       |    ,title
+       |    ,ROW_NUMBER() OVER(PARTITION BY cname,bg_name,case_no,court_name,judge_date ORDER BY judge_date) RN
+       |    ,court_name
+       |    ,judge_date AS biz_date
+       |    ,url AS link_url
+       |    ,is_success
+       |    FROM    $ods_wenshu_detail
+       |      LATERAL VIEW explode(split(yg_name, '\n')) tmpTable AS cname
+       |    WHERE   ds = $adsListDs
+       |    AND     judge_date >= '${atMonthsBefore(1)} 00:00:00'
+       |    AND     judge_result like '%终结本次执行程序%'
+       |    AND     length(cname) > 4
+        |""".stripMargin
+    )
+
+    println("dfnum:"+df.count())
+
+    df.mapPartitions(iter => {
+      trans(iter)
+      //restClient.close()
+    }).filter(_ != null)
+      .toDF("rowkey", "title", "cname", "cid", "yg_name", "bg_name",  "label", "business_id",
+        "business_type", "business_type_name", "dynamic_content", "biz_date", "create_time", "province_code", "city_code", "county_code")
+      .createOrReplaceTempView("tmp_ads_chance_zhongben")
+
+    // case opportunity table
+    sql(
+      s"""
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $ads_case_chance  partition (ds=$adsListDs)
+         |select
+         |rowkey,title,yg_name, bg_name, cname,cid,label,business_id,business_type,business_type_name,dynamic_content,
+         |biz_date,create_time
+         |from tmp_ads_chance_zhongben
+         |""".stripMargin)
+
+    // debtor element (address) table
+    sql(
+      s"""
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $ads_case_chance_address  partition (ds=$adsListDs)
+         |select
+         |md5(concat_ws('',rowkey, business_type, $address_type, province_code,city_code)) as id,
+         |rowkey, business_type, $address_type, province_code,city_code,county_code,biz_date,create_time
+         |from tmp_ads_chance_zhongben
+         |where trim(province_code) <> ''
+         |""".stripMargin)
+
+
+  }
+
+  def trans(iter: Iterator[Row]) = {
+    val restClient = getRestClient()
+    val df = iter.map(r => {
+      try {
+        import org.json4s.DefaultFormats
+        val rowkey_business = r.getAs[String]("rowkey") // case opportunity primary key
+        val title = r.getAs[String]("title") // title
+        val cname = r.getAs[String]("cname") // company name
+        val yg_name = r.getAs[String]("yg_name") // plaintiff
+
+        val bg_name = r.getAs[String]("bg_name") // defendant company
+        val label: String = Json(DefaultFormats).write(WenshuJson(r, Seq("case_amt", "judge_date"))) // label list
+        val business_id = r.getAs[Long]("biz_id") // business primary key id
+        val business_type = "6" // dynamic type
+        val business_type_name = "0" // dynamic type name
+        val m1: Map[String, String] = queryCompany(restClient, cname)
+        // dynamic change content
+        val m2: Map[String, String] = WenshuJson(r, Seq("case_reason", "case_no", "court_name", "judge_date", "link_url", "case_stage", "is_success"))
+        val dynamic_content = Json(DefaultFormats).write(m1 ++: m2)
+        val biz_date = r.getAs[Timestamp]("biz_date") // dynamic change time
+        val create_time = atMonthsBefore(0, "yyyy-MM-dd HH:mm:ss") // creation time
+        val province_code = m1.getOrElse("province_code", "") // province code
+        val city_code = m1.getOrElse("city_code", "") // city code
+        val county_code = m1.getOrElse("county_code", "") // county code
+        val cid = m1.getOrElse("id", "") // company cid
+        (rowkey_business, title, cname, cid, yg_name, bg_name, label, business_id, business_type,
+          business_type_name, dynamic_content, biz_date, create_time, province_code, city_code, county_code)
+      } catch {
+        case e: Exception => {
+          logWarning(r.toString())
+          logError(e.getMessage, e)
+          null
+        }
+      }
+    })
+    df
+  }
+
+
+
+  def regfun() = {
+    prepareFunctions(spark)
+    company_split()
+  }
+
+
+  def queryCompany(restClient: RestClient, companyName: String) = {
+    val query =
+      s"""
+         |{
+         |  "_source": {
+         |     "includes": [ "_id","province_code", "city_code","county_code","reg_capital","estiblish_time","phones"]
+         |   },
+         |  "query": {
+         |    "term": {
+         |      "cname.value.keyword": "${BaseUtil.cleanup(companyName)}"
+         |    }
+         |  },
+         |  "sort": [
+         |    {
+         |      "company_type": {
+         |        "order": "asc"
+         |      }
+         |    }
+         |  ]
+         |}
+         |""".stripMargin
+    val entity = new NStringEntity(query, ContentType.APPLICATION_JSON)
+
+    val indexResponse = restClient.performRequest(
+      "GET",
+      "/winhc-company/company/_search",
+      Collections.emptyMap[String, String](),
+      entity)
+    val en = indexResponse.getEntity
+    val res = EntityUtils.toString(en)
+    import scala.collection.JavaConverters._
+    val list = getIndexResult2(res)
+    if (list.nonEmpty) {
+      val id = list.head("_id").asInstanceOf[String]
+      val source: util.Map[String, Any] = list.head("_source").asInstanceOf[util.Map[String, Any]]
+      val province_code = source.get("province_code").asInstanceOf[String]
+      val city_code = source.get("city_code").asInstanceOf[String]
+      val county_code = source.get("county_code").asInstanceOf[String]
+      val reg_capital = source.get("reg_capital").asInstanceOf[String]
+      val estiblish_time = source.get("estiblish_time").asInstanceOf[String]
+      val phones = source.get("phones").asInstanceOf[util.List[String]].asScala.mkString(",")
+
+      Map(
+        "id" -> id,
+        "province_code" -> province_code,
+        "city_code" -> city_code,
+        "county_code" -> county_code,
+        "reg_capital" -> reg_capital,
+        "estiblish_time" -> estiblish_time,
+        "phones" -> phones
+      )
+    } else {
+      Map.empty[String, String]
+    }
+  }
+
+  def getIndexResult2(json: String) = {
+    import scala.collection.JavaConverters._
+    JSON.parseObject(json).getJSONObject("hits").getJSONArray("hits").toArray().map(m => m.asInstanceOf[util.Map[String, Any]]).map(_.asScala).toList
+  }
+
+}
+
+object CompanyWenshuYgZhongben {
+  def main(args: Array[String]): Unit = {
+    var project = "winhc_eci_dev"
+    var table = ""
+    var runOld = false
+
+    if (args.length == 2) {
+      val Array(project1, table1) = args
+      project = project1
+      table = table1
+    } else if (args.length == 3) {
+      val Array(project1, table1, remain) = args
+      project = project1
+      table = table1
+      if (remain.equals("1"))
+        runOld = true
+    } else {
+      println("please set project,table...")
+      sys.exit(-1)
+    }
+
+    println(
+      s"""
+         |project: $project| table: $table| runOld: $runOld
+         |""".stripMargin)
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+
+    val companyWenshu = CompanyWenshu(spark, project, table)
+    companyWenshu.regfun()
+    // whether to run the full (historical) data
+    if (!runOld) {
+//      val flag = announcement.preCalc()
+      // return when the increment has no updates
+//      if(!flag) return
+    }
+    companyWenshu.calc(runOld)
+    spark.stop()
+
+
+  }
+
+
+
+
+
+
+}
+
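
As with the Yishen job, a small invocation sketch for the Zhongben entry point; the argument values are assumptions, and the table argument is parsed but not read by `calc()`:

    // Hedged sketch of the argument handling in CompanyWenshuYgZhongben.main.
    CompanyWenshuYgZhongben.main(Array("winhc_eci_dev", "wenshu_detail"))      // incremental run over the latest partition
    CompanyWenshuYgZhongben.main(Array("winhc_eci_dev", "wenshu_detail", "1")) // third arg "1" => runOld = true, first partition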