它是spark中用于处理结构化数据的一个模块
功能跟hive类似,最早shark, 为了解决hive速度慢的问题
Spark SQL 和 Spark Core的关系(RDD)与 hive 和 MapReduce关系类似
Spark SQL 优势
代码少
spark SQL操作数据的两种方式
通过sql语句自带了DataFrame的API速度快(对比直接写RDD代码)
SparkSQL API 转换成RDD的时候会做执行优化(catalyst优化器、钨丝计划、代码生成器)优化引擎转化的RDD代码比自己写的效率更高DataFrame 在操作结构化数据的时候
引入了schema(表结构)off-heap 突破虚拟机的限制,能够使用操作系统层面上的内存解决了RDD序列化、反序列化开销大,频繁创建和销毁对象造成大量的GC(Garbage Collection) 垃圾回收机制的缺点丢失了RDD编译时进行类型检查,具有面向对象编程的特点的优点RDD是分布式的对象的集合,Spark并不知道对象的详细模式信息;DataFrame相当于是一个带着schema(提供由列组成的详细模式信息)的RDD
DataFrame还引入了off-heap,意味着可以使用JVM堆以外的内存
RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合
解决了RDD序列化、反序列化开销大,频繁创建和销毁对象造成大量的GC(Garbage Collection) 垃圾回收机制的缺点
丢失了RDD编译时进行类型检查,具有面向对象编程的特点的优点
DataFrame提供比RDD更丰富的算子,能够提升执行效率,减少数据读取以及执行计划的优化
通过DataFrame API或SQL处理数据,会自动经过Spark 优化器Catalyst的优化,即使你写的程序或SQL不仅高效,也可以运行的很快。
启动spark集群
cd ~/bigdata/spark/sbin ./start-master.sh - h 192.168.19.137 ./start-slave.sh spark://192.168.19.137 source active py365 jupyter notebook --ip 0.0.0.0 --allow-root配置环境 python和java解释器
import os JAVA_HOME = '/root/bigdata/jdk' PYSPARK_PYTHON = "/miniconda2/envs/py365/bin/python" os.environ["JAVA_HOME"] = JAVA_HOME os.environ["PYSPARK_PYTHON"] = PYSPARK_PYTHON os.environ["PYSPARK_DRIVER_PYTHON"] = PYSPARK_PYTHON from pyspark import SparkContext,SparkConf from pyspark.sql import SparkSession创建DataFrame需要一个spark session 与创建RDD需要spark context
SPARK_APP_NAME = "dataframetest" SPARK_URL = "spark://192.168.19.137:7077" conf = SparkConf() # 创建spark config对象 config = ( ("spark.app.name", SPARK_APP_NAME), # 设置启动的spark的app名称,没有提供,将随机产生一个名称 ("spark.executor.memory", "6g"), # 设置该app启动时占用的内存用量,默认1g ("spark.master", SPARK_URL), # spark master的地址 ("spark.executor.cores", "4"), # 设置spark executor使用的CPU核心数 conf.setAll(config) # 利用config对象,创建spark session spark = SparkSession.builder.config(conf=conf).getOrCreate()增加或替换一列
df.withColumn('列名',数值).show()删除一列
df.drop('列名').show()统计信息
df.describe().show() #计算某一列的描述信息 df.describe('cls').show()提取部分列
df.select('列名','列名').show()基本统计功能
df.select('列名').distinct().count()分组统计
# 分组统计 groupby(colname).agg({'col':'fun','col2':'fun2'}) df.groupby('列名').agg({'列名':'mean','列名':'max'}).show() # avg(), count(), countDistinct(), first(), kurtosis(), # max(), mean(), min(), skewness(), stddev(), stddev_pop(), # stddev_samp(), sum(), sumDistinct(), var_pop(), var_samp() and variance()自定义方法 并重命名
# 自定义的汇总方法 import pyspark.sql.functions as fn #调用函数并起一个别名 df.agg(fn.count('SepalWidth').alias('width_count'),fn.countDistinct('cls').alias('distinct_cls_count')).show()拆分数据集
trainDF, testDF = df.randomSplit([0.6, 0.4])采样数据
#第一个参数withReplacement:是否有放回的采样 #第二个参数fraction:采样比例 #第三个参数seed:随机种子 sdf = df.sample(False,0.2,100)查看两个数据集在类别上的差异
#查看两个数据集在类别上的差异 subtract,确保训练数据集覆盖了所有分类 diff_in_train_test = testDF.select('cls').subtract(trainDF.select('cls')) diff_in_train_test.distinct().count()交叉表
df.crosstab('cls','SepalLength').show()udf
udf:自定义函数
创建udf,udf函数需要两个参数: 1. Function 2. Return type (in my case StringType()) 在RDD中可以直接定义函数,交给rdd的transformatioins方法进行执行 在DataFrame中需要通过udf将自定义函数封装成udf函数再交给DataFrame进行调用执行详见链接