使用反射来推断包含了特定数据类型的RDD的元数据信息
使用DataFrame API或者sql方式编程
代码如下
import org.apache.spark.sql.SparkSession object DataFrameRDDApp { def main(args: Array[String]): Unit = { //初始化 val sparkSession = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate() //RDD => DataFrame val rdd = sparkSession.sparkContext.textFile("G:\\people.txt") //需要导入隐式转换 import sparkSession.implicits._ val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt,line(1),line(2).toInt)).toDF() infoDF.show() infoDF.filter(infoDF.col("age")>10).show //将DataFrame转化成sql表 infoDF.createOrReplaceTempView("infos") //利用spark.sql 进行api操作 sparkSession.sql("select * from infos where age > 10").show() sparkSession.close() } //采用反射的方式,获取字段的类型和名字 使用反射来推断包含了特定数据类型的RDD的元数据信息 case class Info(id : Int,name: String,age:Int) }操作文件如下
1,zhangsan,9 2,lish,14 3,zhangwu,17DataFrame 和RDD互操作的两种方式
1反射 case class 前提:事先知道你的字段、字段类型(推荐)
2编程 row 前提: 事先不知道你的字段类型
例子:对日志文件进行即席查询
1 上传文件到hdfs上,这里随便上传了一个datanode的日志文件
hdfs dfs -put /var/log/hadoop-hdfs/hadoop-cmf-hdfs-DATANODE-udap69a166.log.out /test2 将文件加载成rdd
scala> val logrdd = sc.textFile("hdfs://hadoop1/test/hadoop-cmf-hdfs-DATANODE-udap69a166.log.out")3 对原来的rdd进行map操作 这里直接用一列
import org.apache.spark.sql.Row val masterrdd = logrdd.map(line => Row(line))4 定义schema信息
加载隐式转换 import spark.implicits._val schemaString = "line"val filed = schemaString.split(" ").map(fieldName => StructField(fieldName ,StringType,nullable = true))val schema = StructType(filed)val masterDF = spark.createDataFrame(masterrdd,schema)masterDF.createOrReplaceTempView("logs")
spark.sql("select * from logs").show
这个地方会报错
error: not found: value StructField解决方法:stackoverflow上 给出的解决方法是导入相应的类型
import org.apache.spark.sql.types._