@@ -90,19 +90,19 @@ case class CompanyIncSummary(s: SparkSession,
          |SELECT ${cidField} as cid
          |       ,COUNT(1) as num
          |FROM inc_tmp_view
+         |where $cidField is not null
          |GROUP BY $cidField
+         |having count(1) >0
          |""".stripMargin)
       .select(Seq("cid", "num").map(column => col(column).cast("string")): _*)
       .rdd
-      .filter(r => {
-        r.get(1) != null && !"0".equals(r.getString(1))
-      }).map(row => {
-        val id = row.getString(0)
-        val num = row.getString(1)
-        val put = new Put(Bytes.toBytes(id))
-        put.addColumn(f_bytes, name_bytes, Bytes.toBytes(num))
-        (new ImmutableBytesWritable, put)
-      }).filter(_ != null).saveAsHadoopDataset(jobConf)
+      .map(row => {
+        val id = row.getString(0)
+        val num = row.getString(1)
+        val put = new Put(Bytes.toBytes(id))
+        put.addColumn(f_bytes, name_bytes, Bytes.toBytes(num))
+        (new ImmutableBytesWritable, put)
+      }).filter(_ != null).saveAsHadoopDataset(jobConf)
   }
 
   private def getCastCols(name: String, pre: String): String = {
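
The net effect of this hunk is to push the null filtering out of the RDD stage and into the SQL itself: rows with a null $cidField are now dropped by the new where clause, and because COUNT(1) over a GROUP BY group is always at least 1, the old RDD-level guard r.get(1) != null && !"0".equals(r.getString(1)) becomes redundant. The retained .filter(_ != null) is a vestigial safeguard, since the preceding map always yields a tuple. Below is a minimal, self-contained sketch of the filtering equivalence, assuming a local SparkSession; the view name inc_tmp_view and column cid follow the diff, while the sample rows and object name are invented for illustration.

// Sketch only: shows why the removed RDD filter is now redundant.
// inc_tmp_view / cid follow the diff above; the sample data is invented.
import org.apache.spark.sql.SparkSession

object IncSummaryFilterSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("inc-summary-filter-sketch")
      .getOrCreate()
    import spark.implicits._

    // Stand-in for inc_tmp_view: a nullable cid column including a null row.
    Seq(Some("c1"), Some("c1"), None, Some("c2"))
      .toDF("cid")
      .createOrReplaceTempView("inc_tmp_view")

    val summary = spark.sql(
      """
        |SELECT cid, COUNT(1) AS num
        |FROM inc_tmp_view
        |WHERE cid IS NOT NULL
        |GROUP BY cid
        |HAVING COUNT(1) > 0
        |""".stripMargin)

    // The null-keyed row never reaches the RDD stage, and num is never
    // null or "0" for a surviving group, so no row-level guard is needed
    // before building the HBase Puts.
    summary.show() // c1 -> 2, c2 -> 1; the null row is gone

    spark.stop()
  }
}

The HBase write path itself is unchanged: each surviving row still becomes a Put keyed by cid, with num stored under the f_bytes/name_bytes column, and is written out via saveAsHadoopDataset(jobConf).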