1.MapReduce编程模型的局限性 (1)繁杂: 只有Map和Reduce两个操作,复杂的逻辑需要大量的样板代码 (2)处理效率低: ①Map中间结果写磁盘,Reduce写HDFS,多个Map通过HDFS交换数据 ②任务调度与启动开销大 (3)不适合迭代处理、交互式处理和流式处理
2.Spark是类Hadoop MapReduce的通用并行框架 (1)Job中间输出结果可以保存在内存,不再需要读写HDFS (2)比MapReduce平均快10倍以上
3.Spark优势 (1)速度快 ①基于内存数据处理,比MR快100个数量级以上(逻辑回归算法测试) ②基于硬盘数据处理,比MR快10个数量级以上 (2)易用性 ①支持Java、Scala、Python、R语言 ②交互式shell方便开发测试 (3)通用性 一栈式解决方案:批处理、交互式查询、实时流处理、图计算及机器学习 (4)多种运行模式 YARN、Mesos、EC2、Kubernetes、Standalone、Local
1.Spark技术栈 (1)Spark Core 核心组件,分布式计算引擎 (2)Spark SQL 高性能的基于Hadoop的SQL解决方案 (3)Spark Streaming 可以实现高吞吐量、具备容错机制的准实时流处理系统 (4)Spark GraphX 分布式图处理框架 (5)Spark MLlib 构建在Spark上的分布式机器学习库
2.Spark架构设计 (1)在驱动程序中,通过SparkContext主导应用的执行 (2)SparkContext可以连接不同类型的Cluster Manager(Standalone、YARN、Mesos),连接后,获得集群节点上的Executor (3)一个Worker节点默认一个Executor,可通过SPARK_WORKER_INSTANCES调整 (4)每个应用获取自己的Executor (5)每个Task处理一个RDD分区
3.Spark架构核心组件 (1)Application 建立在Spark上的用户程序,包括Driver代码和运行在集群各节点Executor中的代码 (2)Driver program 驱动程序。Application中的main函数并创建SparkContext (3)Cluster Manager 在集群(Standalone、Mesos、YARN)上获取资源的外部服务 (4)Worker Node 集群中任何可以运行Application代码的节点 (5)Executor 某个Application运行在worker节点上的一个进程 (6)Job 包含多个Task组成的并行计算,往往由Spark Action触发生成,一个Application中往往会产生多个Job (7)Stage 每个Job会被拆分成多组Task,作为一个TaskSet,其名称为Stage (8)Task 被送到某个Executor上的工作单元
1.SparkContext (1)连接Driver与Spark Cluster(Workers) (2)Spark的主入口 (3)每个JVM仅能有一个活跃的SparkContext (4)应用:
import org.apache.spark.{SparkConf, SparkContext} val conf=new SparkConf().setMaster("local[2]").setAppName("HelloSpark") val sc=SparkContext.getOrCreate(conf)2.SparkSession (1)Spark 2.0+应用程序的主入口:包含了SparkContext、SQLContext、HiveContext以及StreamingContext (2)应用:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder .master("local[2]") .appName("appName") .getOrCreate()3.RDD(弹性分布式数据集) Spark核心,主要数据抽象 (1)概念: ①RDD是将数据项拆分为多个分区的集合,存储在集群的工作节点上的内存和磁盘中,并执行正确的操作 ②RDD是用于数据转换的接口,如map、filter等 ③RDD指向了存储在HDFS、Cassandra、HBase等、或缓存(内存、内存+磁盘、仅磁盘等)、或在故障或缓存收回时重新计算其他RDD分区中的数据。
(2)分布式(Distributed): 数据的计算并非只局限于单个节点,而是多个节点之间协同计算得到 (3)数据集(Datasets): ①RDD是只读的、分区记录的集合,每个分区分布在集群的不同节点上 ②RDD并不存储真正的数据,只是对数据和操作的描述
(4)弹性(Resilient): ①自动进行存储方式的切换 RDD优先存储内存中,内存不足将自动写入磁盘 ②基于Linage的高效容错机制 在任何时候都能进行重算,根据数据血统,可以自动从节点失败中恢复分区,各个分片之间的数据互不影响 ③Stage失败自动重试 / Task失败自动重试 ④Checkpoint和Persist checkpoint持久化到文件系统。
4.Dataset 从Spark1.6开始引入的新的抽象,特定领域对象中的强类型集合,它可以使用函数或者相关操作并行地进行转换等操作。
5.DataFrame DataFrame是特殊的Dataset
1.特性 (1)一系列的分区(分片)信息,每个任务处理一个分区 RDD由很多partition构成,在spark中,计算式,有多少partition就对应有多少task来执行 (2)每个分区上都有compute函数,计算该分区中的数据 对RDD做计算,相当于对RDD的每个split或partition做计算 (3)RDD之间有一系列的依赖 RDD之间有依赖关系,可溯源 (4)分区器决定数据(key-value)分配至哪个分区 如果RDD里面存的数据是Key-Value形式,则可传递到一个自定义的partition进行重新分区, 比如可以按key的hash值进行分区 (5)优先位置列表,将计算任务分派到其所在处理数据块的存储位置(移动数据不如移动计算) 最优的位置去计算,也就是数据的本地性
2.RDD的创建 (1)使用集合创建RDD
val rdd=sc.parallelize(List(1,2,3,4,5,6)) rdd.count rdd.partitions.size //设置分区数 val rdd=sc.parallelize(List(1,2,3,4,5,6),5) rdd.partitions.size val rdd=sc.makeRDD(List(1,2,3,4,5,6))①Spark默认会根据集群的情况来设置分区的数量,也可以通过parallelize()第二参数来指定 ②Spark会为每一个分区运行一个任务进行处理
(2)通过加载文件产生RDD ①加载“file://……”时,以local运行仅需一份本地文件,以Spark集群方式运行,应保证每个节点均有该文件的本地副本
//文件中的一行文本作为RDD的一个元素 val distFile=sc.textFile("file:///home/hadoop/data/hello.txt") distFile.count val distHDFSFile=sc.textFile("hdfs://hadoop000:9000/hello.txt")②支持目录、压缩文件以及通配符
//1.Spark默认访问HDFS //2.Spark默认为HDFS文件的每一个数据块创建一个分区,也可以通过textFile()第二个参数指定,但只能比数据块数量多 sc.textFile("/my/directory") sc.textFile("/my/directory/*.txt") sc.textFile("/my/directory/*.gz")(3)创建PairRDD park 为包含键值对类型的 RDD 提供了一些专有的操作,比如:reduceByKey()、groupByKey()……也可以通过键值对集合创建PairRDD:sc.parallelize(List((1,2),(1,3))) ①SparkContext.wholeTextFiles():可以针对一个目录中的大量小文件返回<filename,fileContent>作为PairRDD ②普通RDD:org.apache.spark.rdd.RDD[data_type] ③PairRDD:org.apache.spark.rdd.RDD[(key_type,value_type)]
(4)其他创建RDD的方法 ①SparkContext.sequenceFileK,V Hadoop SequenceFile的读写支持 ②SparkContext.hadoopRDD()、newAPIHadoopRDD() 从Hadoop接口API创建 ③SparkContext.objectFile() RDD.saveAsObjectFile()的逆操作 (5)RDD创建方式最佳实践: ①测试环境 使用内存集合创建RDD 使用本地文件创建RDD ②生产环境 使用HDFS文件创建RDD
3.RDD分区 分区是被拆分并发送到节点的RDD的不同块之一。 (1)我们拥有的分区越多,得到的并行性就越强 (2)每个分区都是被分发到不同Worker Node的候选者 (3)每个分区对应一个Task
4.RDD操作 分为lazy与non-lazy两种 (1)Transformation(lazy):也称转换操作、转换算子 对于转换操作,RDD的所有转换都不会直接计算结果 ①仅记录作用于RDD上的操作 ②当遇到动作算子(Action)时才会进行真正计算 (2)Actions(non-lazy):立即执行,也称动作操作、动作算子 ①本质上动作算子通过SparkContext执行提交作业操作,触发RDD DAG(有向无环图)的执行 ②所有的动作算子都是急迫型(non-lazy),RDD遇到Action就会立即计算
5.RDD转换算子 (1)map ①对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD ②任何原RDD中的元素在新RDD中都有且只有一个元素与之对应 ③输入分区与输出分区一一对应
/将原RDD中每个元素都乘以2来产生一个新的RDD val a=sc.parallelize(1 to 9) val b=a.map(x=>x*2) a.collect b.collect //map把普通RDD变成PairRDD val a=sc.parallelize(List("dog","tiger","lion","cat","panther")) val b=a.map(x=>(x,1)) b.collect(2)filter 对元素进行过滤,对每个元素应用指定函数,返回值为true的元素保留在新的RDD中
val a=sc.parallelize(1 to 10) a.filter(_%2==0).collect a.filter(_<4).collect //map&filter val rdd=sc.parallelize(List(1 to 6)) val mapRdd=rdd.map(_*2) mapRdd.collect val filterRdd=mapRdd.filter(_>5) filterRdd.collect(3)mapValues 原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素,仅适用于PairRDD
val a=sc.parallelize(List("dog","tiger","lion","cat","panther","eagle")) val b=a.map(x=>(x.length,x)) b.mapValues("x"+_+"x").collect //Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))(4)distinct 去重
val dis = sc.parallelize(List(1,2,3,4,5,6,7,8,9,9,2,6)) dis.distinct.collect dis.distinct(2).partitions.length(5)sortBy 返回一个新的RDD,输入元素经过func函数计算后,按照指定的方式进行排序。(默认方式为false,升序;true是降序)
val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)) val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x,true) val rdd3 = rdd2.filter(_>10) val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x+"",true) val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x.toString,true)(6)flatMap 类似于map,但是每一个输入元素可以被映射为0或多个输出元素,类似于先map,然后再flatten。
val rdd4 = sc.parallelize(Array("a b c", "d e f", "h i j")) rdd4.flatMap(_.split(' ')).collect ------------------------------------------------------------------ val rdd5 = sc.parallelize(List(List("a b c", "a b b"),List("e f g", "a f g"), List("h i j", "a a b"))) rdd5.flatMap(_.flatMap(_.split(" "))).collect(7)union 求并集,注意类型要一致
val u1 = sc.parallelize(1 to 3) val u2 = sc.parallelize(3 to 4) u1.union(u2).collect (u1 ++ u2).collect(8)intersection 求交集
u1.intersection(u2).collect(9)join,leftOuterJoin,rightOuterJoin 两个需要连接的 RDD 进行 cogroup函数操作,将相同 key 的数据能够放到一个分区,在 cogroup 操作之后形成的新 RDD 对每个key 下的元素进行笛卡尔积的操作,返回的结果再展平,对应 key 下的所有元组形成一个集合。最后返回 RDD[(K, (V, W))]。
val j1 = sc.parallelize(List("abe", "abby", "apple")).map(a => (a, 1)) val j2 = sc.parallelize(List("apple", "beatty", "beatrice")).map(a => (a, 1)) j1.join(j2).collect j1.leftOuterJoin(j2).collect j1.rightOuterJoin(j2).collect(10)groupByKey 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD----只针对数据是对偶元组的
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3))) val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7))) val rdd3 = rdd1 union rdd2 val rdd4 = rdd3.groupByKey.collect rdd4: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(8, 1)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3)), (jerry,CompactBuffer(9, 2)))(11)groupBy 传入一个参数的函数,按照传入的参数为key,返回一个新的RDD[(K, Iterable[T])],value是所有可以相同的传入数据组成的迭代器。
scala> val rdd1=sc.parallelize(List(("a",1,2),("b",1,1),("a",4,5))) rdd1: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[47] at parallelize at <console>:24 scala> rdd1.groupBy(_._1).collect res18: Array[(String, Iterable[(String, Int, Int)])] = Array((a,CompactBuffer((a,1,2), (a,4,5))), (b,CompactBuffer((b,1,1))))(12)reduceByKey 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置。
val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3))) val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7))) val rdd3 = rdd1 union rdd2 val rdd6 = rdd3.reduceByKey(_+_).collect rdd6: Array[(String, Int)] = Array((tom,9), (shuke,7), (kitty,3), (jerry,11))6.RDD动作算子 (1)count 返回的是数据集中的元素的个数
val rdd=sc.parallelize(List(1,2,3,4,5,6)) rdd.count(2)collect 以Array返回RDD的所有元素。一般在过滤或者处理足够小的结果的时候使用
val rdd=sc.parallelize(List(1,2,3,4,5,6)) rdd.collect(3)take 返回前n个元素
val rdd=sc.parallelize(List(1,2,3,4,5,6)) rdd.take(3)(4)first 返回RDD第一个元素
val rdd=sc.parallelize(List(1,2,3,4,5,6)) rdd.first(5)reduce 根据指定函数,对RDD中的元素进行两两计算,返回计算结果
val a=sc.parallelize(1 to 100) a.reduce((x,y)=>x+y) a.reduce(_+_) //与上面等价 val b=sc.parallelize(Array(("A",0), ("A",2), ("B",1), ("B",2), ("C",1))) b.reduce((x,y)=>{(x._1+y._1,x._2+y._2)}) //(AABBC,6)(6)foreach 对RDD中的每个元素都使用指定函数,无返回值
val rdd=sc.parallelize(1 to 100) rdd.foreach(println)(7)lookup 用于PairRDD,返回K对应的所有V值
val rdd=sc.parallelize(List(('a',1), ('a',2), ('b',3), ('c',4))) rdd.lookup('a') //输出WrappedArray(1, 2)(8)max,min 最值:返回最大值、最小值
val y=sc.parallelize(10 to 30) y.max //求最大值 y.min //求最小值(9)saveAsTextFile 保存RDD数据至文件系统
val rdd=sc.parallelize(1 to 10,2) rdd.saveAsTextFile("hdfs://hadoop000:8020/data/rddsave/")