1.Spark SQL概述 (1)概念: Spark SQL 是 Spark 1.0 的新加入成员,前身是 Shark。 (2)shark的初衷: ①让Hive运行在Spark之上 ②是对Hive的改造,继承了大量Hive代码,给优化和维护带来了大量的麻烦 (3)Spark SQL覆盖了shark的功能,并且拜托了Hive的依赖 (4)Spark SQL特点: ①易整合 ②统一的数据访问方式 ③兼容Hive 2.Spark SQL架构 (1)Spark SQL是Spark的核心组件之一 (2)能够直接访问现存的Hive数据 (3)提供JDBC/ODBC接口供第三方工具借助Spark进行数据处理 (4)提供了更高层级的接口方便地处理数据 (5)支持多种操作方式:SQL、API编程 (6)支持多种外部数据源:Parquet、JSON、RDBMS等
3.Spark SQL运行原理 (1)核心: Catalyst是Spark SQL的核心。所有 SQL 操作最终都通过 Catalyst 翻译成类似的 Spark 程序代码被 Spark Core 调度执行。 (2)Catalyst优化器 对于 Spark SQL 来说,从 SQL 到 RDD 的执行需要经过两个大的阶段,分别是逻辑计划和物理计划。Catalyst 优化器,作用便是将逻辑计划转为物理计划。 ①逻辑计划
SELECT name FROM ( SELECT id, name FROM people ) p WHERE p.id = 1②优化 在投影上面查询过滤器 检查过滤是否可下压
③物理计划
1.基本组成 (1)SparkContext Spark的主入口 (2)SQLContext Spark SQL的编程入口 (3)HiveContext SQLContext的子集,包含更多功能 (4)SparkSession(Spark2.x+才有)
val spark = SparkSession.builder .master("local[*]") .appName("appName") .getOrCreate()①合并了SQLContext和HiveContext ②提供了Spark功能交互单一入口点,并允许使用DataFrame和Dataset API对Spark进行编程 (5)Dataset 特定域对象中的强类型集合
scala> spark.createDataset(1 to 3).show scala> spark.createDataset(List(("a",1),("b",2),("c",3))).show scala> spark.createDataset(sc.parallelize(List(("a",1,1),("b",2,2)))).show①createDataset()的参数可以是:Seq、Array、RDD ②上面三行代码生成的Dataset分别是: Dataset[Int]、Dataset[(String,Int)]、Dataset[(String,Int,Int)] ③Dataset=RDD+Schema,所以Dataset与RDD有大部共同的函数,如map、filter等
(6)DataFrame ①DataFrame=Dataset[Row] ②类似传统数据的二维表格 ③在RDD基础上加入了Schema(数据结构信息) ④DataFrame Schema支持嵌套数据类型(struct,map,array) ⑤提供更多类似SQL操作的API
2.使用Case Class创建Dataset (1)直接创建
case class Point(label:String,x:Double,y:Double) case class Category(id:Long,name:String) val points=Seq(Point("bar",3.0,5.6),Point("foo",-1.0,3.0)).toDS val categories=Seq(Category(1,"foo"), Category(2,"bar")).toDS points.join(categories,points("label")===categories("name")).show(2)RDD->Dataset
import spark.implicits._ case class Point(label:String,x:Double,y:Double) case class Category(id:Long,name:String) val pointsRDD=sc.parallelize(List(("bar",3.0,5.6),("foo",-1.0,3.0))) val categoriesRDD=sc.parallelize(List((1,"foo"),(2,"bar"))) val points=pointsRDD.map(line=>Point(line._1,line._2,line._3)).toDS val categories=categories.map(line=>Category(line._1,line._2)).toDS points.join(categories,points("label")===categories("name")).show3.DataFrame操作 (1)外部文件创建DataFrame ①通过结构化数据创建(csv,json等) Json
/** 将JSON文件转成DataFrame * people.json内容如下 * {"name":"Michael"} * {"name":"Andy", "age":30} * {"name":"Justin", "age":19} */ val df = spark.read.json("file:///home/hadoop/data/people.json") // 使用show方法将DataFrame的内容输出 df.showcsv
/*CSV 内容预览 id|name|age 1| darren |18 2|anne|18 3|"test"|18 4|'test2'|18 */ val result = spark.read.format("csv") .option("delimiter", "|") //字段分隔符,默认为“,” .option("header", "true") //第一行作为 Schema,而非内容 .option("quote", "'") //引号字符,默认为双引号“"” .option("nullValue", "\\N") //指定一个字符串代表 null 值 .option("inferSchema", "true") //自动推测字段类型 .load("/test-in/csv/csv_with_header.csv") result.show() result.printSchema()(2)通过RDD创建DataFrame ①通过反射获取RDD内的Schema
case class Person(name:String,age:Int) import spark.implicits._ val people=sc.textFile("file:///home/hadooop/data/people.txt") .map(_.split(",")) .map(p => Person(p(0), p(1).trim.toInt)).toDF() people.show people.createOrReplaceTempView("people") val teenagers = spark.sql( "SELECT name, age FROM people WHERE age >= 13 AND age <= 19") teenagers.show()②通过编程接口指定Schema
//方式二:通过编程接口指定Schema case class Person(name:String,age:Int) val people=sc.textFile("file:///home/hadoop/data/people.txt") // 以字符串的方式定义DataFrame的Schema信息 val schemaString = "name age" //导入所需要的类 import org.apache.spark.sql.Row import org.apache.spark.sql.types.{StructType, StructField, StringType} // 根据自定义的字符串schema信息产生DataFrame的Schema val schema = StructType(schemaString.split(" ").map(fieldName =>StructField(fieldName,StringType, true))) //将RDD转换成Row val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim)) // 将Schema作用到RDD上 val peopleDataFrame = spark.createDataFrame(rowRDD, schema) // 将DataFrame注册成临时表 peopleDataFrame.createOrReplaceTempView("people") val results = spark.sql("SELECT name FROM people") results.show(3)通过Hive创建
val spark = SparkSession.builder().enableHiveSupport().getOrCreate() val df = spark.sql("select * from toronto") df.filter($"ssn".startsWith("111")).write.saveAsTable("t1")(4)通过JDBC创建
$spark-shell --jars /opt/spark/ext_jars/mysql-connector-java-5.1.38.jar val url = "jdbc:mysql://localhost:3306/metastore" val tableName = "TBLS" // 设置连接用户、密码、数据库驱动类 val prop = new java.util.Properties prop.setProperty("user","hive") prop.setProperty("password","mypassword") prop.setProperty("driver","com.mysql.jdbc.Driver") // 取得该表数据 val jdbcDF = spark.read.jdbc(url,tableName,prop) jdbcDF.show //DF 存为新的表 jdbcDF.write.mode("append").jdbc(url,"t1",prop)(5)DataFrame API常用操作
val df = spark.read.json("file:///home/hadoop/data/people.json") // 使用printSchema方法输出DataFrame的Schema信息 df.printSchema() // 使用select方法来选择我们所需要的字段 df.select("name").show() // 使用select方法选择我们所需要的字段,并未age字段加1 df.select(df("name"), df("age") + 1).show() // 使用filter方法完成条件过滤 df.filter(df("age") > 21).show() // 使用groupBy方法进行分组,求分组后的总数 df.groupBy("age").count().show() //sql()方法执行SQL查询操作 df.createOrReplaceTempView("people") spark.sql("SELECT * FROM people").show4.Spark SQL操作外部数据源 (1)Parquet文件 是一种流行的列式存储格式,以二进制存储,文件中包含数据与元数据。 ①写文件
//Spark SQL写parquet文件 import org.apache.spark.sql.types.{StructType, StructField, StringType,ArrayType,IntegerType} val schema=StructType(Array(StructField("name",StringType), StructField("favorite_color",StringType), StructField("favorite_numbers",ArrayType(IntegerType)))) val rdd=sc.parallelize(List(("Alyssa",null,Array(3,9,15,20)),("Ben","red",null))) val rowRDD=rdd.map(p=>Row(p._1,p._2,p._3)) val df=spark.createDataFrame(rowRDD,schema) df.write.parquet("/data/users") //在该目录下生成parquet文件②读文件
//Spark SQL读parquet文件 val df=spark.read.parquet("/data/users") //该目录下存在parquet文件 df.show df.printSchema(2)Hive表 ①创建Hive表
#创建一个Hive表 hive>create table toronto(full_name string, ssn string, office_address string); hive>insert into toronto(full_name, ssn, office_address) values('John S. ', '111-222-333 ', '123 Yonge Street ');②Spark-shell操作
//集成Hive后spark-shell下可直接访问Hive表 val df=spark.table("toronto") df.printSchema df.show③IDEA操作
//IDEA中使用,需将hive-site.xml拷贝至resources val spark = SparkSession.builder() .master("local[*]") .enableHiveSupport() .getOrCreate() val df = spark.sql("select * from toronto") df.filter($"ssn".startsWith("111")).write.saveAsTable("t1")(3)RDBMS表
$spark-shell --jars /opt/spark/ext_jars/mysql-connector-java-5.1.38.jar val url = "jdbc:mysql://localhost:3306/metastore" val tableName = "TBLS" // 设置连接用户、密码、数据库驱动类 val prop = new java.util.Properties prop.setProperty("user","root") prop.setProperty("password","12345") prop.setProperty("driver","com.mysql.jdbc.Driver") // 取得该表数据 val jdbcDF = spark.read.jdbc(url,tableName,prop) jdbcDF.show //DF存为新的表 jdbcDF.write.mode("append").jdbc(url,"t1",prop)5.RDD、Dataset、DataFrame相互转换 (1)RDD ①优点: 内置很多函数操作,group,map,filter 等,方便处理结构化或非结构化数据。 面向对象编程,直接存储的 java 对象,类型转化也安全。 ②缺点:. 由于它基本和 hadoop 一样万能的,因此没有针对特殊场景的优化,比如对于结构化数据处理相对于 sql 来比非常麻烦。 .默认采用的是 java 序列号方式,序列化结果比较大,而且数据存储在 java 堆内存中,导致 gc 比较频繁。 ③RDD->Dataset
val ds = rdd.toDS()④RDD -> DataFrame
val df=rdd.toDF()(2)Dataset ①优点: Dataset 整合了 RDD 和 DataFrame 的优点,支持结构化和非结构化数据。 和 RDD 一样,支持自定义对象存储。 和 DataFrame 一样,支持结构化数据的 sql查询。 采用堆外内存存储,gc 友好。 类型转化安全,代码友好 ②缺点: 很多情况下,Dataset 的性能实际上是会比 DataFrame 要来得差的,因为 Dataset 会涉及到额外的数据格式转换成本。 ③Dataset->RDD
val rdd = ds.rdd④Dataset->DataFrame
val df = ds.toDF()(3)DataFrame ①优点: 结构化数据处理非常方便,支持 Avro, CSV, elastic search, and Cassandra 等 kv 数据,也支持HIVE tables, MySQL 等传统数据表。 有针对性的优化,由于数据结构元信息 spark已经保存,序列化时不需要带上元信息,大大的减少了序列化大小,而且数据保存在堆外内存中,减少了 gc 次数 hive 兼容,支持 hql,udf 等 ②缺点 编译时不能类型转化安全检查,运行时才能确定是否有问题 对于对象支持不友好,rdd 内部数据直接以 java 对象存储,DataFrame 内存存储的是 row对象而不能是自定义对象 ③DataFrame->Dataset
val ds = df.toJSON val ds = df.as[T]④DataFrame->RDD
val rdd = df.rdd6.Spark SQL函数 (1)内置函数(org.apache.spark.sql.functions._) ①聚合函数 count(),countDistinct(),avg(),max(),min() ②集合函数 sort_array、explode ③日期、时间函数 hour、quarter、next_day ④数学函数 asin、atan、sqrt、tan、round ⑤开窗函数 row_number ⑥字符串函数 concat、format_number、regexp_extract ⑦其他函数 isNaN、sha、randn、callUDF
(2)内置函数的使用
import org.apache.spark.sql.Row import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType} val accessLogRDD = sc.parallelize(accessLog).map(row => { val splited = row.split(",") Row(splited(0), splited(1).toInt) }) val structTypes = StructType(Array( StructField("day", StringType, true), StructField("userId", IntegerType, true) )) //根据数据以及Schema信息生成DataFrame val accessLogDF = spark.createDataFrame(accessLogRDD, structTypes) //导入Spark SQL内置的函数 import org.apache.spark.sql.functions._ //求每天所有的访问量(pv) accessLogDF.groupBy("day").agg(count("userId").as("pv")).select("day", "pv").collect.foreach(println) //求每天的去重访问量(uv) accessLogDF.groupBy("day").agg(countDistinct('userId).as("uv")).select("day", "uv").collect.foreach(println)(3)自定义函数
case class Hobbies(name: String, hobbies: String) val info = sc.textFile("/data/hobbies.txt") //需要手动导入一个隐式转换,否则RDD无法转换成DF import spark.implicits._ val hobbyDF = info.map(_.split("\t")).map(p => Hobbies(p(0), p(1))).toDF hobbyDF.show hobbyDF.createOrReplaceTempView("hobbies") //注册自定义函数,注意是匿名函数 spark.udf.register("hobby_num", (s: String) => s.split(',').size) spark.sql("select name, hobbies, hobby_num(hobbies) as hobby_num from hobbies").show7.Spark SQL CLI (1)Spark SQL CLI是在本地模式下使用Hive元存储服务和执行从命令行所输入查询语句的简便工具 (2)Spark SQL CLI无法与thrift JDBC服务器通信 (3)Spark SQL CLI等同于Hive CLI(old CLI)、Beeline CLI(new CLI) (4)启动Spark SQL CLI
./bin/spark-sql(5)示例
$spark-sql spark-sql> show databases; default spark-sql> show tables; default toronto false spark-sql> select * from toronto where ssn like '111%'; John S. 111-222-333 123 Yonge Street spark-sql> create table montreal(full_name string, ssn string, office_address string); spark-sql> insert into montreal values('Winnie K. ', '111-222-333 ', '62 John Street'); spark-sql> select t.full_name, m.ssn, t.office_address, m.office_address from toronto t inner join montreal m on t.ssn = m.ssn; John S. 111-222-333 123 Yonge Street 62 John Street8.Spark性能优化 (1)序列化 ①Java序列化,Spark默认方式
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");②Kryo序列化,比Java序列化快约10倍,但不支持所有可序列化类型
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); //向Kryo注册自定义类型 conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]));(2)SQL优化 ①使用对象数组、原始类型代替Java、Scala集合类(如HashMap) ②避免嵌套结构 ③尽量使用数字作为Key,而非字符串 ④以较大的RDD使用MEMORY_ONLY_SER ⑤加载CSV、JSON时,仅加载所需字段 ⑥仅在需要时持久化中间结果(RDD/DS/DF) ⑦避免不必要的中间结果(RDD/DS/DF)的生成 ⑧DF的执行速度比DS快约3倍
(3)性能优化 ①自定义RDD分区与spark.default.parallelism 该参数用于设置每个stage的默认task数量 ②将大变量广播出去,而不是直接使用 ③尝试处理本地数据并最小化跨工作节点的数据传输
(4)表连接(join)优化 ①包含所有表的谓词(predicate)
select * from t1 join t2 on t1.name = t2.full_name where t1.name = 'mike' and t2.full_name = 'mike'②最大的表放在第一位 ③广播最小的表 ④最小化表join的数量