Apache Spark基础及架构

it2024-11-12  13

为什么使用Spark

1.MapReduce编程模型的局限性 (1)繁杂: 只有Map和Reduce两个操作,复杂的逻辑需要大量的样板代码 (2)处理效率低: ①Map中间结果写磁盘,Reduce写HDFS,多个Map通过HDFS交换数据 ②任务调度与启动开销大 (3)不适合迭代处理、交互式处理和流式处理

2.Spark是类Hadoop MapReduce的通用并行框架 (1)Job中间输出结果可以保存在内存,不再需要读写HDFS (2)比MapReduce平均快10倍以上

3.Spark优势 (1)速度快 ①基于内存数据处理,比MR快100个数量级以上(逻辑回归算法测试) ②基于硬盘数据处理,比MR快10个数量级以上 (2)易用性 ①支持Java、Scala、Python、R语言 ②交互式shell方便开发测试 (3)通用性 一栈式解决方案:批处理、交互式查询、实时流处理、图计算及机器学习 (4)多种运行模式 YARN、Mesos、EC2、Kubernetes、Standalone、Local

Spark架构

1.Spark技术栈 (1)Spark Core 核心组件,分布式计算引擎 (2)Spark SQL 高性能的基于Hadoop的SQL解决方案 (3)Spark Streaming 可以实现高吞吐量、具备容错机制的准实时流处理系统 (4)Spark GraphX 分布式图处理框架 (5)Spark MLlib 构建在Spark上的分布式机器学习库

2.Spark架构设计 (1)在驱动程序中,通过SparkContext主导应用的执行 (2)SparkContext可以连接不同类型的Cluster Manager(Standalone、YARN、Mesos),连接后,获得集群节点上的Executor (3)一个Worker节点默认一个Executor,可通过SPARK_WORKER_INSTANCES调整 (4)每个应用获取自己的Executor (5)每个Task处理一个RDD分区

3.Spark架构核心组件 (1)Application 建立在Spark上的用户程序,包括Driver代码和运行在集群各节点Executor中的代码 (2)Driver program 驱动程序。Application中的main函数并创建SparkContext (3)Cluster Manager 在集群(Standalone、Mesos、YARN)上获取资源的外部服务 (4)Worker Node 集群中任何可以运行Application代码的节点 (5)Executor 某个Application运行在worker节点上的一个进程 (6)Job 包含多个Task组成的并行计算,往往由Spark Action触发生成,一个Application中往往会产生多个Job (7)Stage 每个Job会被拆分成多组Task,作为一个TaskSet,其名称为Stage (8)Task 被送到某个Executor上的工作单元

SparkAPI

1.SparkContext (1)连接Driver与Spark Cluster(Workers) (2)Spark的主入口 (3)每个JVM仅能有一个活跃的SparkContext (4)应用:

import org.apache.spark.{SparkConf, SparkContext} val conf=new SparkConf().setMaster("local[2]").setAppName("HelloSpark") val sc=SparkContext.getOrCreate(conf)

2.SparkSession (1)Spark 2.0+应用程序的主入口:包含了SparkContext、SQLContext、HiveContext以及StreamingContext (2)应用:

import org.apache.spark.sql.SparkSession val spark = SparkSession.builder .master("local[2]") .appName("appName") .getOrCreate()

3.RDD(弹性分布式数据集) Spark核心,主要数据抽象 (1)概念: ①RDD是将数据项拆分为多个分区的集合,存储在集群的工作节点上的内存和磁盘中,并执行正确的操作 ②RDD是用于数据转换的接口,如map、filter等 ③RDD指向了存储在HDFS、Cassandra、HBase等、或缓存(内存、内存+磁盘、仅磁盘等)、或在故障或缓存收回时重新计算其他RDD分区中的数据。

(2)分布式(Distributed): 数据的计算并非只局限于单个节点,而是多个节点之间协同计算得到 (3)数据集(Datasets): ①RDD是只读的、分区记录的集合,每个分区分布在集群的不同节点上 ②RDD并不存储真正的数据,只是对数据和操作的描述

(4)弹性(Resilient): ①自动进行存储方式的切换 RDD优先存储内存中,内存不足将自动写入磁盘 ②基于Linage的高效容错机制 在任何时候都能进行重算,根据数据血统,可以自动从节点失败中恢复分区,各个分片之间的数据互不影响 ③Stage失败自动重试 / Task失败自动重试 ④Checkpoint和Persist checkpoint持久化到文件系统。

4.Dataset 从Spark1.6开始引入的新的抽象,特定领域对象中的强类型集合,它可以使用函数或者相关操作并行地进行转换等操作。

5.DataFrame DataFrame是特殊的Dataset

SparkRDD

1.特性 (1)一系列的分区(分片)信息,每个任务处理一个分区 RDD由很多partition构成,在spark中,计算式,有多少partition就对应有多少task来执行 (2)每个分区上都有compute函数,计算该分区中的数据 对RDD做计算,相当于对RDD的每个split或partition做计算 (3)RDD之间有一系列的依赖 RDD之间有依赖关系,可溯源 (4)分区器决定数据(key-value)分配至哪个分区 如果RDD里面存的数据是Key-Value形式,则可传递到一个自定义的partition进行重新分区, 比如可以按key的hash值进行分区 (5)优先位置列表,将计算任务分派到其所在处理数据块的存储位置(移动数据不如移动计算) 最优的位置去计算,也就是数据的本地性

2.RDD的创建 (1)使用集合创建RDD

val rdd=sc.parallelize(List(1,2,3,4,5,6)) rdd.count rdd.partitions.size //设置分区数 val rdd=sc.parallelize(List(1,2,3,4,5,6),5) rdd.partitions.size val rdd=sc.makeRDD(List(1,2,3,4,5,6))

①Spark默认会根据集群的情况来设置分区的数量,也可以通过parallelize()第二参数来指定 ②Spark会为每一个分区运行一个任务进行处理

(2)通过加载文件产生RDD ①加载“file://……”时,以local运行仅需一份本地文件,以Spark集群方式运行,应保证每个节点均有该文件的本地副本

//文件中的一行文本作为RDD的一个元素 val distFile=sc.textFile("file:///home/hadoop/data/hello.txt") distFile.count val distHDFSFile=sc.textFile("hdfs://hadoop000:9000/hello.txt")

②支持目录、压缩文件以及通配符

//1.Spark默认访问HDFS //2.Spark默认为HDFS文件的每一个数据块创建一个分区,也可以通过textFile()第二个参数指定,但只能比数据块数量多 sc.textFile("/my/directory") sc.textFile("/my/directory/*.txt") sc.textFile("/my/directory/*.gz")

(3)创建PairRDD park 为包含键值对类型的 RDD 提供了一些专有的操作,比如:reduceByKey()、groupByKey()……也可以通过键值对集合创建PairRDD:sc.parallelize(List((1,2),(1,3))) ①SparkContext.wholeTextFiles():可以针对一个目录中的大量小文件返回<filename,fileContent>作为PairRDD ②普通RDD:org.apache.spark.rdd.RDD[data_type] ③PairRDD:org.apache.spark.rdd.RDD[(key_type,value_type)]

(4)其他创建RDD的方法 ①SparkContext.sequenceFileK,V Hadoop SequenceFile的读写支持 ②SparkContext.hadoopRDD()、newAPIHadoopRDD() 从Hadoop接口API创建 ③SparkContext.objectFile() RDD.saveAsObjectFile()的逆操作 (5)RDD创建方式最佳实践: ①测试环境 使用内存集合创建RDD 使用本地文件创建RDD ②生产环境 使用HDFS文件创建RDD

3.RDD分区 分区是被拆分并发送到节点的RDD的不同块之一。 (1)我们拥有的分区越多,得到的并行性就越强 (2)每个分区都是被分发到不同Worker Node的候选者 (3)每个分区对应一个Task

4.RDD操作 分为lazy与non-lazy两种 (1)Transformation(lazy):也称转换操作、转换算子 对于转换操作,RDD的所有转换都不会直接计算结果 ①仅记录作用于RDD上的操作 ②当遇到动作算子(Action)时才会进行真正计算 (2)Actions(non-lazy):立即执行,也称动作操作、动作算子 ①本质上动作算子通过SparkContext执行提交作业操作,触发RDD DAG(有向无环图)的执行 ②所有的动作算子都是急迫型(non-lazy),RDD遇到Action就会立即计算

5.RDD转换算子 (1)map ①对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD ②任何原RDD中的元素在新RDD中都有且只有一个元素与之对应 ③输入分区与输出分区一一对应

/将原RDD中每个元素都乘以2来产生一个新的RDD val a=sc.parallelize(1 to 9) val b=a.map(x=>x*2) a.collect b.collect //map把普通RDD变成PairRDD val a=sc.parallelize(List("dog","tiger","lion","cat","panther")) val b=a.map(x=>(x,1)) b.collect

(2)filter 对元素进行过滤,对每个元素应用指定函数,返回值为true的元素保留在新的RDD中

val a=sc.parallelize(1 to 10) a.filter(_%2==0).collect a.filter(_<4).collect //map&filter val rdd=sc.parallelize(List(1 to 6)) val mapRdd=rdd.map(_*2) mapRdd.collect val filterRdd=mapRdd.filter(_>5) filterRdd.collect

(3)mapValues 原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素,仅适用于PairRDD

val a=sc.parallelize(List("dog","tiger","lion","cat","panther","eagle")) val b=a.map(x=>(x.length,x)) b.mapValues("x"+_+"x").collect //Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))

(4)distinct 去重

val dis = sc.parallelize(List(1,2,3,4,5,6,7,8,9,9,2,6)) dis.distinct.collect dis.distinct(2).partitions.length

(5)sortBy 返回一个新的RDD,输入元素经过func函数计算后,按照指定的方式进行排序。(默认方式为false,升序;true是降序)

val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)) val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x,true) val rdd3 = rdd2.filter(_>10) val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x+"",true) val rdd2 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10)).map(_*2).sortBy(x=>x.toString,true)

(6)flatMap 类似于map,但是每一个输入元素可以被映射为0或多个输出元素,类似于先map,然后再flatten。

val rdd4 = sc.parallelize(Array("a b c", "d e f", "h i j")) rdd4.flatMap(_.split(' ')).collect ------------------------------------------------------------------ val rdd5 = sc.parallelize(List(List("a b c", "a b b"),List("e f g", "a f g"), List("h i j", "a a b"))) rdd5.flatMap(_.flatMap(_.split(" "))).collect

(7)union 求并集,注意类型要一致

val u1 = sc.parallelize(1 to 3) val u2 = sc.parallelize(3 to 4) u1.union(u2).collect (u1 ++ u2).collect

(8)intersection 求交集

u1.intersection(u2).collect

(9)join,leftOuterJoin,rightOuterJoin 两个需要连接的 RDD 进行 cogroup函数操作,将相同 key 的数据能够放到一个分区,在 cogroup 操作之后形成的新 RDD 对每个key 下的元素进行笛卡尔积的操作,返回的结果再展平,对应 key 下的所有元组形成一个集合。最后返回 RDD[(K, (V, W))]。

val j1 = sc.parallelize(List("abe", "abby", "apple")).map(a => (a, 1)) val j2 = sc.parallelize(List("apple", "beatty", "beatrice")).map(a => (a, 1)) j1.join(j2).collect j1.leftOuterJoin(j2).collect j1.rightOuterJoin(j2).collect

(10)groupByKey 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD----只针对数据是对偶元组的

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3))) val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7))) val rdd3 = rdd1 union rdd2 val rdd4 = rdd3.groupByKey.collect rdd4: Array[(String, Iterable[Int])] = Array((tom,CompactBuffer(8, 1)), (shuke,CompactBuffer(7)), (kitty,CompactBuffer(3)), (jerry,CompactBuffer(9, 2)))

(11)groupBy 传入一个参数的函数,按照传入的参数为key,返回一个新的RDD[(K, Iterable[T])],value是所有可以相同的传入数据组成的迭代器。

scala> val rdd1=sc.parallelize(List(("a",1,2),("b",1,1),("a",4,5))) rdd1: org.apache.spark.rdd.RDD[(String, Int, Int)] = ParallelCollectionRDD[47] at parallelize at <console>:24 scala> rdd1.groupBy(_._1).collect res18: Array[(String, Iterable[(String, Int, Int)])] = Array((a,CompactBuffer((a,1,2), (a,4,5))), (b,CompactBuffer((b,1,1))))

(12)reduceByKey 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置。

val rdd1 = sc.parallelize(List(("tom", 1), ("jerry", 2), ("kitty", 3))) val rdd2 = sc.parallelize(List(("jerry", 9), ("tom", 8), ("shuke", 7))) val rdd3 = rdd1 union rdd2 val rdd6 = rdd3.reduceByKey(_+_).collect rdd6: Array[(String, Int)] = Array((tom,9), (shuke,7), (kitty,3), (jerry,11))

6.RDD动作算子 (1)count 返回的是数据集中的元素的个数

val rdd=sc.parallelize(List(1,2,3,4,5,6)) rdd.count

(2)collect 以Array返回RDD的所有元素。一般在过滤或者处理足够小的结果的时候使用

val rdd=sc.parallelize(List(1,2,3,4,5,6)) rdd.collect

(3)take 返回前n个元素

val rdd=sc.parallelize(List(1,2,3,4,5,6)) rdd.take(3)

(4)first 返回RDD第一个元素

val rdd=sc.parallelize(List(1,2,3,4,5,6)) rdd.first

(5)reduce 根据指定函数,对RDD中的元素进行两两计算,返回计算结果

val a=sc.parallelize(1 to 100) a.reduce((x,y)=>x+y) a.reduce(_+_) //与上面等价 val b=sc.parallelize(Array(("A",0), ("A",2), ("B",1), ("B",2), ("C",1))) b.reduce((x,y)=>{(x._1+y._1,x._2+y._2)}) //(AABBC,6)

(6)foreach 对RDD中的每个元素都使用指定函数,无返回值

val rdd=sc.parallelize(1 to 100) rdd.foreach(println)

(7)lookup 用于PairRDD,返回K对应的所有V值

val rdd=sc.parallelize(List(('a',1), ('a',2), ('b',3), ('c',4))) rdd.lookup('a') //输出WrappedArray(1, 2)

(8)max,min 最值:返回最大值、最小值

val y=sc.parallelize(10 to 30) y.max //求最大值 y.min //求最小值

(9)saveAsTextFile 保存RDD数据至文件系统

val rdd=sc.parallelize(1 to 10,2) rdd.saveAsTextFile("hdfs://hadoop000:8020/data/rddsave/")
最新回复(0)