Spark Sql 和DataFrame总结

it2024-10-15  36

Spark Sql 和DataFrame总结

1. Spark SQL概述2. DataFrame2.1 DataFrame概述2.2 DataFrame vs RDD 区别2.3 Pandas DataFrame vs Spark DataFrame 3. DataFrame 操作3.1 创建DataFrame3.1.1 从RDD创建DataFrame3.1.2 从CSV文件创建DataFrame3.1.3连接数据库3.1.4 读取json数据 3.2 DataFrame操作3.3 综合案例 4. 用DataFrame对数据进行去重、缺失值处理、异常值处理

1. Spark SQL概述

它是spark中用于处理结构化数据的一个模块

功能跟hive类似,最早shark, 为了解决hive速度慢的问题

Spark SQL 和 Spark Core的关系(RDD)与 hive 和 MapReduce关系类似

Spark SQL 优势

代码少

spark SQL操作数据的两种方式

通过sql语句自带了DataFrame的API

速度快(对比直接写RDD代码)

SparkSQL API 转换成RDD的时候会做执行优化(catalyst优化器、钨丝计划、代码生成器)优化引擎转化的RDD代码比自己写的效率更高

DataFrame 在操作结构化数据的时候

引入了schema(表结构)off-heap 突破虚拟机的限制,能够使用操作系统层面上的内存解决了RDD序列化、反序列化开销大,频繁创建和销毁对象造成大量的GC(Garbage Collection) 垃圾回收机制的缺点丢失了RDD编译时进行类型检查,具有面向对象编程的特点的优点

2. DataFrame

2.1 DataFrame概述

是RDD为基础的分布式数据集,类似于传统关系型数据库的二维表,dataframe记录了对应列的名称和类型DataFrame是一个分布式的行集合dataset[ROW]基于RDD的 Immuatable 不可变的 只能生成新的RDDLazy Evaluations transformation 延迟执行action 执行了action之后transformation才会触发 Distributed 分布式dataframe和dataset统一,dataframe只是**dataset[ROW]**的类型别名。由于Python是弱类型语言,只能使用DataFrame

2.2 DataFrame vs RDD 区别

RDD是分布式的对象的集合,Spark并不知道对象的详细模式信息;DataFrame相当于是一个带着schema(提供由列组成的详细模式信息)的RDD

DataFrame还引入了off-heap,意味着可以使用JVM堆以外的内存

RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合

解决了RDD序列化、反序列化开销大,频繁创建和销毁对象造成大量的GC(Garbage Collection) 垃圾回收机制的缺点

丢失了RDD编译时进行类型检查,具有面向对象编程的特点的优点

DataFrame提供比RDD更丰富的算子,能够提升执行效率,减少数据读取以及执行计划的优化

通过DataFrame API或SQL处理数据,会自动经过Spark 优化器Catalyst的优化,即使你写的程序或SQL不仅高效,也可以运行的很快。

2.3 Pandas DataFrame vs Spark DataFrame

单机 VS 分布式集群并行计算Spark DataFrame 延迟执行Spark DataFrame 不可变Pandas DataFrame API 更丰富

3. DataFrame 操作

3.1 创建DataFrame

启动spark集群

cd ~/bigdata/spark/sbin ./start-master.sh - h 192.168.19.137 ./start-slave.sh spark://192.168.19.137 source active py365 jupyter notebook --ip 0.0.0.0 --allow-root

配置环境 python和java解释器

import os JAVA_HOME = '/root/bigdata/jdk' PYSPARK_PYTHON = "/miniconda2/envs/py365/bin/python" os.environ["JAVA_HOME"] = JAVA_HOME os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON from pyspark import SparkContext,SparkConf from pyspark.sql import SparkSession

创建DataFrame需要一个spark session 与创建RDD需要spark context

SPARK_APP_NAME = "dataframetest" SPARK_URL = "spark://192.168.19.137:7077" conf = SparkConf() # 创建spark config对象 config = ( ("spark.app.name", SPARK_APP_NAME), # 设置启动的spark的app名称,没有提供,将随机产生一个名称 ("spark.executor.memory", "6g"), # 设置该app启动时占用的内存用量,默认1g ("spark.master", SPARK_URL), # spark master的地址 ("spark.executor.cores", "4"), # 设置spark executor使用的CPU核心数 conf.setAll(config) # 利用config对象,创建spark session spark = SparkSession.builder.config(conf=conf).getOrCreate()

3.1.1 从RDD创建DataFrame

from pyspark.sql import SparkSession from pyspark.sql import Row spark = SparkSession.builder.appName('test').getOrCreate() # DataFrame -> RDD sc = spark.sparkContext l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)] rdd = sc.parallelize(l) #为数据添加列名 注意row对象 people = rdd.map(lambda x: Row(name=x[0], age=int(x[1]))) #RDD->DataFrame schemaPeople = spark.createDataFrame(people) df = spark.createDataFrame(Row对象的RDD)

3.1.2 从CSV文件创建DataFrame

df = spark.read.format('csv').option('header','true').load('/iris.csv') #显示数据结构 df.printSchema() #显示前10条数据 df.show(10) #统计总量 df.count() #列名 df.columns

3.1.3连接数据库

jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/db_name").option("dbtable","table_name").option("user","xxx").option("password","xxx").load()

3.1.4 读取json数据

from pyspark.sql import SparkSession from pyspark.sql.types import * jsonString = [ """{ "id" : "01001", "city" : "AGAWAM", "pop" : 15338, "state" : "MA" }""", """{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }""" ] spark = SparkSession.builder.appName('json_demo').getOrCreate() sc = spark.sparkContext jsonRDD = sc.parallelize(jsonString) #定义结构类型 不带嵌套 #StructType:schema的整体结构,表示JSON的对象结构 #XXXStype:指的是某一列的数据类型 jsonSchema = StructType() \ .add("id", StringType(),True) \ .add("city", StringType()) \ .add("pop" , LongType()) \ .add("state",StringType()) # 带嵌套 ArrayType jsonSchema = StructType([ StructField("id", StringType(), True), StructField("city", StringType(), True), StructField("loc" , ArrayType(DoubleType())), StructField("pop", LongType(), True), StructField("state", StringType(), True) ]) # 读取表结构 reader = spark.read.schema(jsonSchema) jsonDF = reader.json(jsonRDD) jsonDF.printSchema() jsonDF.show() jsonDF.filter(jsonDF.pop>4000).show(10)

3.2 DataFrame操作

增加或替换一列

df.withColumn('列名',数值).show()

删除一列

df.drop('列名').show()

统计信息

df.describe().show() #计算某一列的描述信息 df.describe('cls').show()

提取部分列

df.select('列名','列名').show()

基本统计功能

df.select('列名').distinct().count()

分组统计

# 分组统计 groupby(colname).agg({'col':'fun','col2':'fun2'}) df.groupby('列名').agg({'列名':'mean','列名':'max'}).show() # avg(), count(), countDistinct(), first(), kurtosis(), # max(), mean(), min(), skewness(), stddev(), stddev_pop(), # stddev_samp(), sum(), sumDistinct(), var_pop(), var_samp() and variance()

自定义方法 并重命名

# 自定义的汇总方法 import pyspark.sql.functions as fn #调用函数并起一个别名 df.agg(fn.count('SepalWidth').alias('width_count'),fn.countDistinct('cls').alias('distinct_cls_count')).show()

拆分数据集

trainDF, testDF = df.randomSplit([0.6, 0.4])

采样数据

#第一个参数withReplacement:是否有放回的采样 #第二个参数fraction:采样比例 #第三个参数seed:随机种子 sdf = df.sample(False,0.2,100)

查看两个数据集在类别上的差异

#查看两个数据集在类别上的差异 subtract,确保训练数据集覆盖了所有分类 diff_in_train_test = testDF.select('cls').subtract(trainDF.select('cls')) diff_in_train_test.distinct().count()

交叉表

df.crosstab('cls','SepalLength').show()

udf

udf:自定义函数

创建udf,udf函数需要两个参数: 1. Function 2. Return type (in my case StringType()) 在RDD中可以直接定义函数,交给rdd的transformatioins方法进行执行 在DataFrame中需要通过udf将自定义函数封装成udf函数再交给DataFrame进行调用执行

3.3 综合案例

#================== 综合案例 + udf================ # 测试数据集中有些类别在训练集中是不存在的,找到这些数据集做后续处理 from pyspark.sql.types import StringType from pyspark.sql.functions import udf trainDF,testDF = df.randomSplit([0.99,0.01]) diff_in_train_test = trainDF.select('cls').subtract(testDF.select('cls')).distinct().show() #首先找到这些类,整理到一个列表 not_exist_cls = trainDF.select('cls').subtract(testDF.select('cls')).distinct().rdd.map(lambda x :x[0]).collect() #定义一个方法,用于检测 def should_remove(x): if x in not_exist_cls: return -1 else : return x check = udf(should_remove,StringType()) resultDF = trainDF.withColumn('New_cls',check(trainDF['cls'])).filter('New_cls <> -1') resultDF.show()

4. 用DataFrame对数据进行去重、缺失值处理、异常值处理

详见链接

最新回复(0)