Spark 开窗函数 row_number()

it2024-01-02  62

/**
 * Per-group top-N with the Spark SQL window function `row_number()`.
 *
 * `row_number()` does not select rows by itself: it numbers the rows of each window
 * partition 1, 2, 3, ... in the order given by the window's ORDER BY. Here rows are
 * partitioned by `id` and ordered by `age` descending, so keeping `rn <= n` returns the
 * `n` rows with the largest `age` in every `id` group, i.e. a "group-wise top-N" query.
 *
 * `row_number()` gives tied rows distinct, arbitrarily ordered numbers; use `rank()` or
 * `dense_rank()` instead if rows with equal `age` must be kept or dropped together.
 */
object RowNumberWindowFunction {

  import org.apache.spark.sql.{Row, SparkSession}
  import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

  // Name of the temporary view holding the sample data. `table` (the name used before) is
  // an SQL keyword (reserved in Spark's ANSI mode when reserved keywords are enforced), so
  // a plain identifier is used instead.
  private val ViewName = "tech_items"

  // Sample records in "id,name,age" form.
  private val SampleRecords: Seq[String] = Seq(
    "1,Hadoop,12", "6,Spark,6", "3,Solr,15", "3,HBase,8", "6,Hive,16", "6,TensorFlow,26"
  )

  private val SampleSchema: StructType = StructType(Seq(
    StructField("id", StringType, nullable = true),
    StructField("name", StringType, nullable = true),
    StructField("age", IntegerType, nullable = true)
  ))

  /**
   * Builds the SQL that keeps the `n` rows with the largest `age` in every `id` group.
   *
   * The outer ORDER BY only makes the printed result deterministic; without it the row
   * order shown by `show()` depends on how the window's shuffle laid out the partitions.
   *
   * @param n number of rows to keep per group; must be positive
   * @return a Spark SQL query over the view named by `ViewName`
   * @throws IllegalArgumentException if `n` is not positive
   */
  private def topNQuery(n: Int): String = {
    require(n > 0, s"n must be positive, got $n")
    s"""SELECT id, name, age
       |FROM (
       |  SELECT id, name, age,
       |         row_number() OVER (PARTITION BY id ORDER BY age DESC) AS rn
       |  FROM $ViewName
       |) t
       |WHERE rn <= $n
       |ORDER BY id, age DESC""".stripMargin
  }

  /** Builds the sample DataFrame, prints it, then prints the top-1, top-2 and top-3 rows per `id`. */
  def main(args: Array[String]): Unit = {
    // No enableHiveSupport(): a SparkSession evaluates window functions over temporary
    // views without Hive, and enableHiveSupport() fails when the spark-hive classes are
    // not on the classpath.
    val spark = SparkSession.builder()
      .appName("RowNumberWindowFunction")
      .master("local[2]")
      .getOrCreate()

    try {
      val rows = spark.sparkContext.parallelize(SampleRecords).map { record =>
        val fields = record.split(",")
        Row(fields(0), fields(1), fields(2).toInt)
      }
      val df = spark.createDataFrame(rows, SampleSchema)
      df.show()
      df.createOrReplaceTempView(ViewName)

      for (n <- 1 to 3) {
        println(s"top$n")
        spark.sql(topNQuery(n)).show()
      }
    } finally {
      // Always stop the session so the SparkContext is released, even if a query fails.
      spark.stop()
    }
  }
}

/*
 * Expected console output.
 *
 * Sample data (df.show()):
 * +---+----------+---+
 * | id|      name|age|
 * +---+----------+---+
 * |  1|    Hadoop| 12|
 * |  6|     Spark|  6|
 * |  3|      Solr| 15|
 * |  3|     HBase|  8|
 * |  6|      Hive| 16|
 * |  6|TensorFlow| 26|
 * +---+----------+---+
 *
 * top1
 * +---+----------+---+
 * | id|      name|age|
 * +---+----------+---+
 * |  1|    Hadoop| 12|
 * |  3|      Solr| 15|
 * |  6|TensorFlow| 26|
 * +---+----------+---+
 *
 * top2
 * +---+----------+---+
 * | id|      name|age|
 * +---+----------+---+
 * |  1|    Hadoop| 12|
 * |  3|      Solr| 15|
 * |  3|     HBase|  8|
 * |  6|TensorFlow| 26|
 * |  6|      Hive| 16|
 * +---+----------+---+
 *
 * top3
 * +---+----------+---+
 * | id|      name|age|
 * +---+----------+---+
 * |  1|    Hadoop| 12|
 * |  3|      Solr| 15|
 * |  3|     HBase|  8|
 * |  6|TensorFlow| 26|
 * |  6|      Hive| 16|
 * |  6|     Spark|  6|
 * +---+----------+---+
 */
最新回复(0)