DataFrame与RDD互操作

it2025-06-27  8

DataFrame与RDD互操作之一: 反射方式

  使用反射来推断包含了特定数据类型的RDD的元数据信息

  使用DataFrame API或者sql方式编程

代码如下

import org.apache.spark.sql.SparkSession object DataFrameRDDApp { def main(args: Array[String]): Unit = { //初始化 val sparkSession = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate() //RDD => DataFrame val rdd = sparkSession.sparkContext.textFile("G:\\people.txt") //需要导入隐式转换 import sparkSession.implicits._ val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt,line(1),line(2).toInt)).toDF() infoDF.show() infoDF.filter(infoDF.col("age")>10).show   //将DataFrame转化成sql表 infoDF.createOrReplaceTempView("infos")   //利用spark.sql 进行api操作 sparkSession.sql("select * from infos where age > 10").show() sparkSession.close() } //采用反射的方式,获取字段的类型和名字 使用反射来推断包含了特定数据类型的RDD的元数据信息 case class Info(id : Int,name: String,age:Int) }

操作文件如下

1,zhangsan,9 2,lish,14 3,zhangwu,17

DataFrame与RDD互操作之二: 编程方式

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} object DataFrameRDDApp { def main(args: Array[String]): Unit = { //初始化 val sparkSession = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate() program(sparkSession) sparkSession.close() } def program(sparkSession: SparkSession): Unit = { //RDD => DataFrame val rdd = sparkSession.sparkContext.textFile("G:\\people.txt") val infoRDD = rdd.map(_.split(",")).map(line => Row(line(0).toInt,line(1),line(2).toInt)) val structType = StructType(Array(StructField("id",IntegerType,true), StructField("name", StringType,true), StructField("age",IntegerType,true))) val people = sparkSession.createDataFrame(infoRDD,structType) people.printSchema()   //通过df的api方式进行操作 people.filter(people.col("age")>8).show people.show() } }

DataFrame 和RDD互操作的两种方式

1反射 case class  前提:事先知道你的字段、字段类型(推荐)

2编程 row            前提: 事先不知道你的字段类型

例子:对日志文件进行即席查询

1 上传文件到hdfs上,这里随便上传了一个datanode的日志文件

hdfs dfs -put /var/log/hadoop-hdfs/hadoop-cmf-hdfs-DATANODE-udap69a166.log.out /test

2 将文件加载成rdd

scala> val logrdd = sc.textFile("hdfs://hadoop1/test/hadoop-cmf-hdfs-DATANODE-udap69a166.log.out")

3 对原来的rdd进行map操作 这里直接用一列

import org.apache.spark.sql.Row val masterrdd = logrdd.map(line => Row(line))

4 定义schema信息

  加载隐式转换 import spark.implicits._val schemaString = "line"val filed = schemaString.split(" ").map(fieldName => StructField(fieldName ,StringType,nullable = true))val schema = StructType(filed)val masterDF = spark.createDataFrame(masterrdd,schema)

 masterDF.createOrReplaceTempView("logs")

 spark.sql("select * from logs").show

这个地方会报错

error: not found: value StructField

解决方法:stackoverflow上 给出的解决方法是导入相应的类型

import org.apache.spark.sql.types._
最新回复(0)