开窗函数 row_number() 先按某个字段分组(partition by),再在每个分组内按另一个字段排序(order by),并为组内每一行依次编号 1、2、3……;在外层查询中用"编号 <= N"过滤,就能取出每组排序后的前 N 条记录,相当于分组 TopN。
/**
 * Demonstrates per-group Top-N selection with the `row_number()` window function.
 *
 * A small in-memory dataset of (id, name, age) rows is registered as the temp
 * view `table`. For each N in 1..3, the rows are partitioned by `id`, ordered by
 * `age` descending and numbered with `row_number()`. Only rows whose number is
 * <= N are kept and printed.
 *
 * NOTE(review): this excerpt shows no imports. It needs
 * `org.apache.spark.sql.{Row, SparkSession}` and `org.apache.spark.sql.types._`
 * in scope. Confirm they exist at the top of the real file.
 */
object RowNumberWindowFunction {

  /**
   * Builds the SQL that keeps the top `n` rows (by `age` descending) of each `id`
   * group in the `table` view.
   *
   * @param n maximum number of rows to keep per `id` group
   * @return the Spark SQL query text
   */
  private def topNQuery(n: Int): String =
    "select id, name, age from (select id, name, age," +
      s"row_number() over(partition by id order by age desc) top from table)t where top <= $n"

  def main(args: Array[String]): Unit = {
    // Each call is kept on the same line as its receiver.
    // A newline before an argument list `(` would be read as a statement break
    // by Scala's semicolon inference.
    val spark = SparkSession
      .builder()
      .appName("RowNumberWindowFunction")
      .master("local[2]")
      // NOTE(review): Hive support is not needed for a temp view, and it
      // requires Hive classes on the classpath. Consider removing it.
      .enableHiveSupport()
      .getOrCreate()

    try {
      val array = Array("1,Hadoop,12", "6,Spark,6", "3,Solr,15", "3,HBase,8", "6,Hive,16", "6,TensorFlow,26")

      // Parse each "id,name,age" CSV string into a Row matching `structType` below.
      val rdd = spark.sparkContext
        .parallelize(array)
        .map(_.split(","))
        .map(item => Row(item(0), item(1), item(2).toInt))

      val structType = new StructType(Array(
        StructField("id", StringType, true),
        StructField("name", StringType, true),
        StructField("age", IntegerType, true)
      ))

      val df = spark.createDataFrame(rdd, structType)
      df.show()
      df.createOrReplaceTempView("table")

      // Same query for N = 1, 2, 3. row_number() breaks ties within a group
      // arbitrarily, so equal ages could be ordered either way.
      for (n <- 1 to 3) {
        spark.sql(topNQuery(n)).show()
      }
    } finally {
      // Release the local Spark context even if a query fails.
      spark.stop()
    }
  }
}
原始数据
+---+----------+---+
| id|      name|age|
+---+----------+---+
|  1|    Hadoop| 12|
|  6|     Spark|  6|
|  3|      Solr| 15|
|  3|     HBase|  8|
|  6|      Hive| 16|
|  6|TensorFlow| 26|
+---+----------+---+
top1
+---+----------+---+
| id|      name|age|
+---+----------+---+
|  3|      Solr| 15|
|  6|TensorFlow| 26|
|  1|    Hadoop| 12|
+---+----------+---+
top2
+---+----------+---+
| id|      name|age|
+---+----------+---+
|  3|      Solr| 15|
|  3|     HBase|  8|
|  6|TensorFlow| 26|
|  6|      Hive| 16|
|  1|    Hadoop| 12|
+---+----------+---+
top3
+---+----------+---+
| id|      name|age|
+---+----------+---+
|  3|      Solr| 15|
|  3|     HBase|  8|
|  6|TensorFlow| 26|
|  6|      Hive| 16|
|  6|     Spark|  6|
|  1|    Hadoop| 12|
+---+----------+---+
转载请注明原文地址: https://lol.8miu.com/read-12910.html