Spark: 快速统一的分析引擎!
--统一性作何解? 统一: a) 涵盖了所有的数据分析场景(离线,实时,sql,图计算,ML) b) 可以对接多种数据源( orc,parquet,mysql,hbase,es,hdfs)DAG: 有向无环图!
Hadoop 中计算的引擎 MR :不支持DAG!
分析一个复杂的任务:
Job1(Mapper------Reducer) <-- Job2(Mapper------Reducer) Hive : select * from (select * from t1 join t2 on xxx group by xxx order by xxx) t3 join t4 group by xxx order by xxx 翻译为N个Job 运算! 根本原因在于Hadoop的MR不支持DAG! Spark: Spark支持DAG运算,可以将复杂的任务,通过划分stage(阶段)的方式,整合到一个Job中! select * from (select * from t1 join t2 on xxx group by xxx order by xxx) t3 join t4 group by xxx order by xxx 使用spark执行,只需要启动一个Job!将支持DAG的运算引擎,称为第二代大数据计算引擎: Spark ,Tez
结论:
--Spark比hadoop快的原因 a) 基于内存运算,减少落盘的IO操作。 需要更强大的硬件支持! b) 支持DAG,只需要一个Job就可以完成复杂的运算DAG是Spark区分Hadoop MR的根本特征!
OOP------> Java,Python,C++
①new SparkConf
注意: 在一个JVM中,只能创建一个SparkContext!
可以调用Stop,在创建新的SparkContext之前!
安装:将spark-3.0.0-bin-hadoop3.2.tgz解压到非中文无空格目录!
测试:
spark-shell:交互环境
println(sc.textFile("/home/atguigu/input") .flatMap(_.split(" ")) .map((_, 1)) .reduceByKey(_ + _) .collect().mkString(",")) spark-submit:用来提交jar包
# spark-submit [options] jar包 jar包中类的参数 /opt/module/spark-local/bin/spark-submit --master local #通过本地服务将job提交到集群上 --class com.atguigu.spark.WordCount1 #调用jar包的哪个类 /home/atguigu/wc.jar /home/atguigu/input
配置需要在哪些机器启动worker!
①在一台机器安装spark ②编辑$SPARK_HOME/conf/slaves
#在哪些机器启动worker hadoop102 hadoop103 hadoop104告诉worker,Master在哪个进程,通过哪个端口号可以和master通信
③编辑 $SPARK_HOME/conf/spark-env.sh
#告诉每个worker,集群中的master在哪个机器 SPARK_MASTER_HOST=hadoop103 ##告诉每个worker,集群中的master绑定的rpc端口号 SPARK_MASTER_PORT=7077④分发到集群
⑤启动
# 在当前机器启动master,必须在master配置的那台机器启动 sbin/start-all.sh⑥查看是否启动
hadoop103:8080spark-shell:交互环境(是一个RPEL环境,能够立刻出结果,一般用于调试)
启动:bin/spark-shell --master spark://hadoop103:7077
println(sc.textFile("hdfs://hadoop102:8020/input") .flatMap(_.split(" ")) .map((_, 1)) .reduceByKey(_ + _,3) .reduceByKey(_ + _,2) .collect().mkString(","))spark-submit:用来提交jar包
用法: spark-submit [options] jar包 jar包中类的参数
/opt/module/spark-local/bin/spark-submit --master spark://hadoop103:7077 #通过Hadoop103的7077端口将job提交到集群上 --class com.atguigu.spark.WordCount1 #调用jar包的哪个类,要执行程序的主类 /home/atguigu/wc.jar #程序主类所在的jar /home/atguigu/input =故障处理=
区别:
①Driver程序运行位置的不同会影响结果的查看!
client: 将某些算子的结果收集到client端!
要求client端Driver不能中止!
cluster(生产): 需要在Driver运行的executor上查看日志!
②client: jar包只需要在client端有!
cluster: 保证jar包可以在集群的任意台worker都可以读到!
===================================================================================================================================================================================================================================================================================================================================================================================================================================================
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-84reymjU-1603267550451)(Spark0621.assets/image-20200922002641049.png)]
其中将结果打印这一步的collect()方法:(将RDD中的元素转成数组,在driver中打包成array)
Return an array that contains all of the elements in this RDD.解释为:该方法返回一个数组,数组里面包含了RDD里面所有的元素
举个例子:
result是最后运算的结果,所有RDD相关的都在executor上运行。executor都在集群里面, result运行结束之后,要运行将结果打印出来那段代码,这段代码在driver里面 (sparkcontext在哪driver就在哪) 在driver端打印一段数组前提是啥? 前提是result这个RDD肯定是在一个exector上,即executor上有这个result 现在有一个driver,我们说了根据模式不一样,driver所在位置不一样 --如果是client模式, 就在客户端,那么客户端就需要将result的结果打印,就需要将result的结果发给我才能打印,此时这个结果打印在客户端[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-XCiemgIn-1603267550454)(Spark0621.assets/image-20200922004259681.png)]
--如果是集群模式 则result在某个集群上打印,但客户端在client(假设为103)上提交,如果此时该客户端上执行了一个sparksubmint命令,此时103就是客户端,但是driver有可能在102,此时在客户端无法看见结果,只有client模式才能看见,这个时候你就需要找,找driver运行的exeecutor才能看见结果此时找到集群,Hadoop103:8080找到102的8081上
====================================================================================================================================================================================================================================================================================================================================================
①配置 SPARK_HOME/conf/spark-defaults.conf 类比(mapred-site.xml)
影响的是启动的spark应用sql
spark.eventLog.enabled true #日志保存目录需要手动创建 spark.eventLog.dir hdfs://hadoop102:8020/sparklogs ②配置 SPARK_HOME/conf/spark-env.sh**
影响的是spark的历史服务进程
export SPARK_HISTORY_OPTS=" -Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/sparklogs -Dspark.history.retainedApplications=30" ③分发以上文件,重启集群
④启动历史服务进程
sbin/start-historyserver.shYARN: 集群!
Spark: 相对于YARN来说,就是一个客户端,提交Job到YARN上运行!
MR ON YARN提交流程:
①Driver 请求 RM,申请一个applicatiion
②RM接受请求,此时,RM会首先准备一个Container运行 ApplicationMaster
③ ApplicationMaster启动完成后,向RM进行注册,向RM申请资源运行
其他任务
Hadoop MR : 申请Container运行MapTask,ReduceTask
Spark: 申请Container运行 Executor
只需要选择任意一台可以连接上YARN的机器,安装Spark即可!
确认下YARN的 yarn-site.xml配置中
<!--是否启动一个线程检查每个任务正使用的物理内存量,如果任务超出分配值,则直接将其杀掉,默认是true --> <property> <name>yarn.nodemanager.pmem-check-enabled</name> <value>false</value> </property> <!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true --> <property> <name>yarn.nodemanager.vmem-check-enabled</name> <value>false</value> </property> <property> <name>yarn.log.server.url</name> <value>http://hadoop102:19888/jobhistory/logs</value> </property>分发 yarn-site.xml,重启YARN!
编辑 SPARK_HOME/conf/spark-env.sh
YARN_CONF_DIR=/opt/module/hadoop/etc/hadoop export SPARK_HISTORY_OPTS=" -Dspark.history.ui.port=18080 -Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/sparklogs -Dspark.history.retainedApplications=30"编辑 SPARK_HOME/conf/spark-defaults.conf
spark.eventLog.enabled true #需要手动创建 spark.eventLog.dir hdfs://hadoop102:8020/sparklogs #spark历史服务的地址和端口 spark.yarn.historyServer.address=hadoop103:18080 spark.history.ui.port=18080spark-shell --master yarn: 交互式环境
println(sc.textFile("hdfs://hadoop102:8020/input") .flatMap(_.split(" ")) .map((_, 1)) .reduceByKey(_ + _,3) .collect().mkString(","))spark-submit: 用来提交jar包
用法: spark-submit [options] jar包 jar包中类的参数
spark-on-yarn client模式:
/opt/module/spark-yarn/bin/spark-submit --class com.atguigu.spark.WordCount1 /home/atguigu/wc.jar hdfs://hadoop102:8020/inputspark-on-yarn cluster模式:
/opt/module/spark-yarn/bin/spark-submit --deploy-mode cluster --class com.atguigu.spark.WordCount1 /home/atguigu/wc.jar hdfs://hadoop102:8020/inputAM:Application master
yarn上每提交一个job,就要启自己的application master,只允许启动一个master则只能有一个job
解决方法一:直接杀死前一个进程
解决方法二:修改一下配置
cd /opt/module/hadoop/etc/hadoop/ vim capacity-scheduler.xml <property> <name>yarn.scheduler.capacity.maximum-am-resource-percent</name> <value>"0.5"</value> <description> Maximum percent of resources in the cluster which can be used to run application masters i.e. controls number of concurrent running applications. </description> </property> xsync capacity-scheduler.xml 重启动yarn调度和运行是两回事
https://www.processon.com/view/link/5f60e3fb6376894e32710fd4
https://www.processon.com/view/link/5f4876d46376890e62f62889
运行spark-submit命令【当前是client模式,在client端运行driver】 启动Driver 向ResourceManager提交应用,同时启动ApplicationMaster(ExecutorLaunchar)【在yarn上运行需要appmaster】 ExecutorLaunchar启动之后,需要向ResourceMAnager注册,告诉ResourceMAnager我启动好了 之后ExecutorLaunchar向NodeManager申请container【container是由nodemanager分配的】 于是就和nodemanager通信,告诉nodemanager把container准备好,我要启动了 于是nodemanager就启动了YarnCoarseGraindExecutorBackend(Executor)【executor是负责task运算的,但task谁发给我呢,driver发给我】 于是通知driver我启动好了,然后driver就给我new了一个executor【这是一个对象,这个对象负责运算task】 此时又向driver发申请,申请给我发任务 driver向executor发送任务(offer) 然后executor这个线程接收到task,会启动许多个线程,来运行task ----运行框架
--目前明白几个东西: 1.Sparkcontext所在的程序称为driver程序 2.有一个集群,driver程序把job提交到集群上后,会在集群上启动executor(类似于maptask,reducertask) 3.executor这个进程会有很多线程算RDD的算子提交模式
--Standalone模式 集群是master和worker,在worker上启executor --YARN模式 在nodemanager上准备一个container,在container上其executor*driver会监控每个计算节点(Executor),如果计算时某个节点任务报错了,他会将这个出错订单任务继续发到别的计算节点(Executor)上继续运算*
https://www.processon.com/view/link/5f699d387d9c087da1bbf254
分区数增多一定会shuffer,shuffer不一定分区增加,就数据需要重新分区的时候会shuffer,即分区之间交换数据也会shuffer 比如有两个行动算子,提交了两个job任务, --提交的job任务通过DAGScheduler划分出不同的阶段, 以job1为例,这里job1这个任务有四个算子,其中第三个算子运算的时候会进行数据的重新分配,产生新的分区,RDD3到RDD4就是shuffle阶段,这个job1就被分成了两个阶段(stage0和stage1),其中RDD1,RDD2,RDD3在第一个阶段,每个阶段会有任务,至于这个任务是怎么运算的,(以task0为例,task0只算o号阶段0号分区的数据,只算Rdd3的0号分区的数据,调用的是RDD3的compute方法,compute计算的话就是需要数据和逻辑,数据就是0号区的数据,RDD3的0号分区数据是从哪来的呢,是从上一个算子的0号分区来的,这些RDD之间通过依赖关系找父RDD,那么最原始的数据从哪里来的呢,我们调用了textFile这个算子,从hdfs上拿到了数据,RDD0的0号分区调用computer,compute通过计算0号分区的数据,得到了RDD2的0分区的数据,再拿RDD3的compute方法的计算逻辑得到我自己的0号分区的数据数据)stage0的最后一个算子有两个分区,就意味着这个阶段每个算子都有两个分区,每个分区由一个task任务进行计算,stage1有三个分区,便有三个task进行计算,job一共有5个task。 --通过TaskScheduler进行调度,通过Task的本地化级别,负载均衡,黑名单等综合考虑调度 --将Task任务发送给集群上的Executor 假设这里有两个Executor进程,其中Executor1收到了三个Task任务,Executor2收到了两个Task任务,每个Task会启动一个线程,这里Executor1启动三个线程,Executor2启动两个线程,每个线程启动的顺序是不一样的,Executor先收到哪个Task,就先启动那个任务对应的线程, 又stage0和stage1这两个阶段之间会有shuffer,stage0上的task这里类比为maptask,stage1上的task这里类比为reducetask,maptask需要将结果溢写到磁盘(至于怎么写的,就涉及到下一个阶段),这里的下一个阶段就是stage1这个阶段,这个阶段有三个分区,就需要将stage0上的task重新分配成三个分区,接着就是reducer阶段,这里就是stage1上的Task0任务找到stage0上的task重新分配成的三个分区的第一个分区,重新分配结束之后,最后的结果就有三个区,RDD4有一个collect方法,将分成的三个区的数据收集到一个数组上,在driver上打印出来。MapReduce: 有shuffle
MapTask: context.write(k,v)------>分区------>排序-------->溢写…
combine
分区的目的:①分类,将相同的数据分到一个区
每个ReduceTask都会处理一个分区的数据
②并行运算
Kafka: 有Topic,有Partition
分区的目的: ①方便主题的扩展
②分区多,同时运行多个消费者线程并行消费,提升消费速率
Spark: 分区。 RDD(数据的集合),对集合进行分区!
分区的目的: ①分类(使用合适的分区器)
②并行运算(主要目的)
======================================================================================================================================================================================================================================================================================================================================================================================================
客户端:就是命令用来干嘛的。在哪提交job哪就是客户端服务端:就是你那块集群master:standalone模式的两个进程之一,在集群上worker:standalone模式的两个进程之一,在集群上driver:叫做驱动,在哪创建SparkContext --因为job要提交(driver主要完成job提交的) --就要有一个集群通信,sparkcontext就是维持一个集群的连接,无它就不能提交, --所以sparkcontext在哪那就是driverexecutor:计算器,计算者。是一个进程,负责Job核心运算逻辑的运行!
一个负责计算的进程
类似于hadoop中的
MapTask – > 运行mapper里面的map方法
ReduceTask --> 运行rducer里面的reduce方法
map方法和reduce方法都是编程模型
分布式运算:
--executor是一个进程,负责运行spark的编程模型,spark的编程模型RDD --所有调用RDD的,都在exector端运行,Exector端在集群里面 --所以客户端可以任意机器,例如102,103,104,甚至110,只要110能够联上功能集群, --此时RDD里面的这堆算子就在集群里面算,这叫分布式运算 7.stage:阶段。一个Job的stage的数量= shuffle算子的个数+1。 只要遇到会产生shuffle的算子,就会产生新的阶段! 阶段划分的意义: 同一个阶段的每个分区的数据,可以交给1个Task进行处理!
===============================================================================================================================================================================================================================================================================================================================================
转换算子(方法):
将一个RDD 转换为 另一个RDD 方法传入RDD,返回RDD,就是转换算子!行动算子:
将一个RDD提交为一个Job 方法传入RDD,返回值不是RDD,通常都是行动算子! 只有行动算子会提交Job!RDD: RDD在Driver中封装的是计算逻辑,而不是数据!
--以下几点 1.使用RDD编程后,调用了行动算子后,此时会提交一个job,在提交job时,会划分Stage,划分之后,将每个阶段的每个分区使用一个Task进行计算处理! 2.Task封装的就是计算逻辑! 3.Driver将Task发送给Executor执行,Driver只将计算逻辑发送给Executor!Executor在执行计算逻辑时。此时发现,我们需要读取某个数据,举例(textFile) 4.RDD真正被创建是在Executor端! 5.移动计算而不是移动数据,如果读取的数据时HDFS上的数据时,此时HDFS上的数据以block的形式存储在DataNode所在的机器! 6.如果当前Task要计算那片数据,恰好就是对应的那块数据,那块数据在102,当前job在102,104启动了Executor,当前Task发送给102更合适!省略数据的网络传输,直接从本地读取块信息!===============================================================================================================================================================================================================================================================================================================================================
RDD的getpartition方法可以设置分区,
computer函数是在Task调用,executor里的task
===============================================================================================================================================================================================================================================================================================================================================
RDD怎么来: ①new
直接new
调用SparkContext提供的方法
②由一个RDD转换而来
源码:在SparkContext.scala这个文件下
/** Distribute a local Scala collection to form an RDD. * * This method is identical to `parallelize`. * @param seq Scala collection to distribute * @param numSlices number of partitions to divide the collection into * @return RDD representing distributed collection */ 译文: /*分发本地Scala集合以形成RDD。 *此方法与`parallelize`相同。 * @param(参数一) seq Scala集合以分发 * @param(参数二) numSlices将集合划分为的分区数 * @return(返回值) RDD代表分布式集合* / // 返回ParallelCollectionRDD def makeRDD[T: ClassTag]( seq: Seq[T], //数据来源,一个集合 numSlices: Int = defaultParallelism) : RDD[T] = withScope { parallelize(seq, numSlices) //将集合划分到这些分区数上 }从集合中创建RDD
val rdd1: RDD[Int] = sparkContext.makeRDD(list) //等价 val rdd2: RDD[Int] = sparkContext.parallelize(list)设置分区数可以有以下方法:
①:makeRDD的时候传参:在会产生shuffle的算子后面传【自己制定分区数】
例如:.reduceByKey(_ + _,"numPartitions:3") //见双引号部分②:直接修改默认并行度:【配置默认并行度】
例如: val conf: SparkConf = new SparkConf() .setMaster("local[*]") .setAppName("My app") .set("spark.default.parallelism","需要设置的数量")③:以上都没配就看当前cpu申请的总的核数,setMaster(“local[*]”)设置
例如: val conf: SparkConf = new SparkConf() .setMaster("local[*]") //这里[]里可以设置cpu数 .setAppName("My app") .set("spark.default.parallelism","需要设置的数量")numSlices:控制集合创建几个分区
由makeRDD方法知道,如果不设置numSlices,会有一个默认的并行度(defaultParallelism)
--并行和并发的区别: 1.并行:多个线程可以同时进行 两个CPU每个cpu一个线程 2.并发:两个线程抢同一个资源提交一个job的最大并行度是啥呢?
job在提交时会运行一个executor,executor这个进程在container上运行,container容器里面有cpu和内存资源, 假设一个executor申请的每个container都有2个cpu,一共起了10个executor,就有20个cpu,那么最大并行度就是20,意味着可以有20个task同时运行Executor的个数 X Executor申请的cpu的个数 = 最大并行度
疑问?
怎么决定job的提交到底申请几个executor呢
/** Default level of parallelism to use when not given by user (e.g. parallelize and makeRDD). 用户未指定时要使用的默认并行度 */ def defaultParallelism: Int = { assertNotStopped() taskScheduler.defaultParallelism //任务调度器的参数 }输入命令spark-submit 后面是可以写参数的,最大并行度如下图
本地提交的默认最大并行度如何找?如下==============================================================
进入defaultParallelism方法如下:
// Get the default level of parallelism to use in the cluster, as a hint for sizing jobs. def defaultParallelism(): Int //是抽象的,那么就要找实现类,它只有一个实现类可见defaultParallelism是抽象的,那么就要找实现类,它只有一个实现类TaskSchedulerImpl,找该类的defaultParallelism方法,如下:
override def defaultParallelism(): Int = backend.defaultParallelism()它调用了defaultParallelism(),此方法在SchedulerBackend下,代码见下:
def defaultParallelism(): Int可见它又是一个抽象的,SchedulerBackend有以下实现类:
SchedulerBackend
--> CoarseGrainedSchedulerBackend
--> StandaloneSchedulerBackend 【如果往Standalone上提交找这个类】
--> LocalSchedulerBackend 【如果往本地提交找这个类】
至于在哪提交,查看setMaster()参数,我们这里是本地,
继续查看LocalSchedulerBackend这个类的LocalSchedulerBackend这个方法,如下:
override def defaultParallelism(): Int = scheduler.conf.getInt("spark.default.parallelism", totalCores)综上:=====================================================================================================
在本地提交时,defaultParallelism(默认并行度)由以下参数决定:
override def defaultParalleism():Int = // 默认的 SparkConf中没有设置spark.default.parallelism scheduler.conf.getInt("spark.default.parallelism",totalCores)可见我们可以设置最大并行度:
val conf: SparkConf = new SparkConf() .setMaster("local[*]") .setAppName("My app") .set("spark.default.parallelism","需要设置的数量") //这最后一行这块是设置默认最大并行度的默认defaultParallelism=totalCores是不配的,默认等于一
val conf: SparkConf = new SparkConf() .setMaster("local[*]") //上面这行local[]里面填cpu个数,填的是几,那么total就代表是几,就是默认最大并行度是几 .setAppName("My app") .set("spark.default.parallelism","需要设置的数量")默认defaultParallelism=totalCores(当前本地集群可以用的总核数),目的为了最大限度并行运行!
standalone / YARN模式, totalCores是Job申请的总的核数!
本地集群总的核数取决于 : Local[N]
local: 1核
local[2]: 2核
local[*] : 所有核,多少个线程就有多少个cpu
makeRDD方法:
第二个参数numSlices决定分区数,可见numSlices走的是parallelize方法
def makeRDD[T: ClassTag]( seq: Seq[T], //数据来源,一个集合 numSlices: Int = defaultParallelism): RDD[T] = withScope { parallelize(seq, numSlices) //将集合划分到这些分区数上 }parallelize方法:
该方法new 了一个ParallelCollectionRDD(并行的集合RDD)
def parallelize[T: ClassTag]( seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = withScope { assertNotStopped() new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]()) }此时创建的RDD就是ParallelCollectionRDD类型
那么找分区策略的话就找这个RDD对应的分区策略
找ParallelCollectionRDD.scala这个文件的getPartitions方法
override def getPartitions: Array[Partition] = { val slices = ParallelCollectionRDD.slice(data, numSlices).toArray slices.indices.map(i => new ParallelCollectionPartition(id, i, slices(i))).toArray } /* 这个方法又调用了ParallelCollectionRDD的slice方法,传了两个参数,一个数据,一个分区数,然后toArray得到slices,slices又调用indices.map方法,将slices又作为一个数组返回了 */我们再看slice方法
/** * Slice a collection into numSlices sub-collections. One extra thing we do here is to treat Range * collections specially, encoding the slices as other Ranges to minimize memory cost. This makes * it efficient to run Spark over RDDs representing large sets of numbers. And if the collection * is an inclusive Range, we use inclusive range for the last slice. */ ①先看这: def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = { // seq:就是传入的数据 // numSlices:就是分区数 // 检查分区数是否合法 if (numSlices < 1) { throw new IllegalArgumentException("Positive number of partitions required") } // Sequences need to be sliced at the same set of index positions for operations // like RDD.zip() to behave as expected /*length:5 numSlices:2 */ def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = { /* (0 until numSlices) = [0,2) 0: (0,2) 1: (2,5) */ (0 until numSlices).iterator.map { i => val start = ((i * length) / numSlices).toInt val end = (((i + 1) * length) / numSlices).toInt (start, end) } } ②再看 seq match { //这里是seq为range case r: Range => positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) => // If the range is inclusive, use inclusive range for the last slice if (r.isInclusive && index == numSlices - 1) { new Range.Inclusive(r.start + start * r.step, r.end, r.step) } else { new Range(r.start + start * r.step, r.start + end * r.step, r.step) } }.toSeq.asInstanceOf[Seq[Seq[T]]] //适用于Long,Double,BigInteger等范围 case nr: NumericRange[_] => // For ranges of Long, Double, BigInteger, etc val slices = new ArrayBuffer[Seq[T]](numSlices) var r = nr for ((start, end) <- positions(nr.length, numSlices)) { val sliceSize = end - start slices += r.take(sliceSize).asInstanceOf[Seq[T]] r = r.drop(sliceSize) } slices =======================================看下面代码======================================= ..我们是list.... 对range类型进行特殊处理 // 非Range,按此方法处理 case _ => //Array(1,2,3,4,5) val array = seq.toArray // To prevent O(n^2) operations for List etc将list转成array // 返回 {(0,2),(2,5)} // positions(array.length, numSlices) //这里的positions方法在上面 .map { case (start, end) => // Array(1,2,3,4,5).slice // (0,2) => (1,2) // (2,5) => (3,4,5) array.slice(start, end).toSeq }.toSeq }**总结: **
ParallelCollectionRDD在对集合中的元素进行分区时,大致是平均分。如果不能整除,后面的分区会多分!
源码在SparkContext.scala这个文件下
/** * Read a text file from HDFS, a local file system (available on all nodes), or any * Hadoop-supported file system URI, and return it as an RDD of Strings. * The text files must be encoded as UTF-8. * * @param path path to the text file on a supported file system * @param minPartitions suggested minimum number of partitions for the resulting RDD * @return RDD of lines of the text file */ 译文 /* *从HDFS,本地文件系统(在所有节点上都可用)或任何Hadoop支持的文件系统URI中读取文本文件,并将其作为字符串的 RDD 返回。文本文件必须编码为UTF-8。 * @param path --> 所支持文件系统上文本文件的路径 * @param minPartitions --> 建议生成的RDD的最小分区数 * @返回文本文件行的RDD */ def textFile( path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() hadoopFile( path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions ).map(pair => pair._2.toString).setName(path) }从外部存储创建RDD
@Test def testHadoopRDD() = { val rdd:RDD[String] = sparkContext.textFile("input",4) rdd.saveAsTextFile("output") }defaultMinPartitions:
// 使用defaultParallelism(默认集群的核数) 和 2取最小 def defaultMinPartitions: Int = math.min(defaultParallelism, 2)defaultMinPartitions和minPartitions 不是最终 分区数,但是会影响最终分区数!
最终分区数,取决于切片数!
源码创建如下:
def textFile( path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() hadoopFile( path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions ).map(pair => pair._2.toString).setName(path) } 由源码可知,textFile方法,需要传两个参数,一个是数据存储路径,另一个就是建议生成的RDD的最小分区数,调用的是hadoopFile这个方法,进入hadoopFile方法,如下
minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope { assertNotStopped() // This is a hack to enforce loading hdfs-site.xml. // See SPARK-11227 for details. FileSystem.getLocal(hadoopConfiguration) // A Hadoop configuration can be about 10 KiB, which is pretty big, so broadcast it. val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration)) val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) new HadoopRDD( this, confBroadcast, Some(setInputPathsFunc), inputFormatClass, keyClass, valueClass, minPartitions).setName(path) }可知该方法就是new了一个 HadoopRDD,具体的分区策略。需要看HadoopRDD是怎么运行的,进入HadoopRDD,找到getPartitions这个方法,代码如下
override def getPartitions: Array[Partition] = { val jobConf = getJobConf() // add the credentials here as this can be called before SparkContext initialized SparkHadoopUtil.get.addCredentials(jobConf) try { /* ①调用输入格式 getInputFormat做两件事: 第一个切片 第二个RecordReder,将每段数据封装成K,V (org.apache.hadoop.mapred.TextInputFormat)进行切片,切片时, minPartitions=2 */ val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions) /* ②是否过滤空切片后的切片集合,要么是原先的切片 */ val inputSplits = if (ignoreEmptySplits) { allInputSplits.filter(_.getLength > 0) } else { allInputSplits } /* ③如果切的是1片,且是针对文件的切片,做特殊处理 FileSplit:对接的数据源不同。生成的切片就不一样,对接的是文本 或者文件数据,生成的切片都叫FileSplit */ if (inputSplits.length == 1 && inputSplits(0).isInstanceOf[FileSplit]) { val fileSplit = inputSplits(0).asInstanceOf[FileSplit] val path = fileSplit.getPath if (fileSplit.getLength > conf.get(IO_WARNING_LARGEFILETHRESHOLD)) { val codecFactory = new CompressionCodecFactory(jobConf) if (Utils.isFileSplittable(path, codecFactory)) { logWarning(s"Loading one large file ${path.toString} with only one partition, " + s"we can increase partition numbers for improving performance.") } else { logWarning(s"Loading one large unsplittable file ${path.toString} with only one " + s"partition, because the file is compressed by unsplittable compression codec.") } } } /* ④分区数=过滤后的切片数 inputSplits有多少我的长度就有多少 */ val array = new Array[Partition](inputSplits.size) for (i <- 0 until inputSplits.size) { array(i) = new HadoopPartition(id, i, inputSplits(i)) } array } catch { case e: InvalidInputException if ignoreMissingFiles => logWarning(s"${jobConf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" + s" partitions returned from this path.", e) Array.empty[Partition] } 切的片数,等于我的分区数,看看ignoreEmptySplits这个值是true还是false,点击ignoreEmptySplits,如下:
private val ignoreEmptySplits = sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS)点击HADOOP_RDD_IGNORE_EMPTY_SPLITS,代码如下:
private[spark] val HADOOP_RDD_IGNORE_EMPTY_SPLITS = ConfigBuilder("spark.hadoopRDD.ignoreEmptySplits") .internal() .doc("When true, HadoopRDD/NewHadoopRDD will not create partitions for empty input splits.") .version("2.3.0") .booleanConf .createWithDefault(false)我们看到值默认为false
所以源代码中
/* ②是否过滤空切片后的切片集合,要么是原先的切片 */ val inputSplits = if (ignoreEmptySplits) { allInputSplits.filter(_.getLength > 0) } else { allInputSplits }ignoreEmptySplits默认为空,所以allInputSplits被返回了
综上结论·:在HadoopRDD中切片数等于分区数
接下来看具体时间怎么切的
首先看看HadoopRDD介绍
/** * :: DeveloperApi :: * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS, * sources in HBase, or S3), using the older MapReduce API (`org.apache.hadoop.mapred`). * * @param sc The SparkContext to associate the RDD with. * @param broadcastedConf A general Hadoop Configuration, or a subclass of it. If the enclosed * variable references an instance of JobConf, then that JobConf will be used for the Hadoop job. * Otherwise, a new JobConf will be created on each slave using the enclosed Configuration. * @param initLocalJobConfFuncOpt Optional closure used to initialize any JobConf that HadoopRDD * creates. * @param inputFormatClass Storage format of the data to be read. * @param keyClass Class of the key associated with the inputFormatClass. * @param valueClass Class of the value associated with the inputFormatClass. * @param minPartitions Minimum number of HadoopRDD partitions (Hadoop Splits) to generate. * * @note Instantiating this class directly is not recommended, please use * `org.apache.spark.SparkContext.hadoopRDD()` */译文:
DeveloperApi :: *一个RDD,它使用较旧的MapReduce API(`org.apache.hadoop.mapred`)提供核心功能,以读取Hadoop中存储的数据(例如HDFS中的文件,HBase或S3中的源)。 )。 /*HadoopRDD用的是老的包(org.apache.hadoop.mapred)下的输入格式*/ * * @param sc与RDD关联的SparkContext。 * @param broadcastedConf常规Hadoop配置或其子类。如果附带的 *变量引用JobConf的实例,则该JobConf将用于Hadoop作业。 *否则,将使用随附的配置在每个从站上创建一个新的JobConf。 * @param initLocalJobConfFuncOpt可选闭包,用于初始化HadoopRDD创建的任何JobConf。 * @param inputFormatClass要读取的数据的存储格式。 * @param keyClass与inputFormatClass关联的键的类。 * @param valueClass与inputFormatClass关联的值的类。 * @param minPartitions要生成的HadoopRDD分区(Hadoop拆分)的最小数量。 * @note不建议直接实例化此类,请使用 *`org.apache.spark.SparkContext.hadoopRDD()`调试
val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions) //此处打断点第一次进去,进的是getInputFormat,代码如下
protected def getInputFormat(conf: JobConf): InputFormat[K, V] = { val newInputFormat = ReflectionUtils.newInstance(inputFormatClass.asInstanceOf[Class[_]], conf) .asInstanceOf[InputFormat[K, V]] newInputFormat match { case c: Configurable => c.setConf(conf) case _ => } newInputFormat }出来,再进,进去的是getSplits,代码如下:
package org.apache.hadoop.mapred; /** Splits files returned by {@link #listStatus(JobConf)} when * they're too big.*/ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { StopWatch sw = new StopWatch().start(); FileStatus[] files = listStatus(job);发现他是老的包下面的(即是:HadoopRDD调用的是mapred包下旧的API)
默认的输入格式:
TextInputFormat exterends FileInputFormat,
它并没有重写getSplit方法,
因此TextInputFormat 依然使用父类也就是FileInputFormat的getSplit方法进行切片
切片(getSplits在FilInputFormat文件下)完整源码如下:
/*11111111111111111111*/ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { //①统计当前切片的数据的总大小 long totalSize = 0; // compute total size for (FileStatus file: files) { // check we have valid files if (file.isDirectory()) { throw new IOException("Not a file: "+ file.getPath()); } totalSize += file.getLen(); }断点处=================================
// ②计算 goalsize(期望每片大小) long goalSize = totalSize / ("numSplits" == 0 ? 1 : numSplits);问?其中的numsplits从哪来的,见getSplits方法传的参数的红色部分
public InputSplit[] getSplits(JobConf job, int "numSplits") throws IOException {那么numSplits又是谁传过去的呢
来到它的父方法HadoopRDD文件里的getPartitions方法
val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)可见minPartitions就是numSplits,那么在HadoopRDD文件minPartitions又是哪来的呢?本文件的头部,如下
class HadoopRDD[K, V]( sc: SparkContext, broadcastedConf: Broadcast[SerializableConfiguration], initLocalJobConfFuncOpt: Option[JobConf => Unit], inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int)可见最后一个属性就是minPartitions,构造HadoopRDD的时候已经将minPartitions传过来了
那么是怎么构造的呢?
进入textFile方法,如下
def textFile( path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions).map(pair => pair._2.toString).setName(path) }发现minPartitions等于defaultMinPartitions
/* minPartitions: Int = defaultMinPartitions (不传时默认为2) defaultMinPartitions = math.min(defaultParallelism, 2) numSplits = minPartitions */回到断点处===============
/*222222222222222222*/ // ②计算 goalsize(期望每片大小) long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits); // 默认为1,调节 org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 改变minSize我们知道numSplits默认为2,就不是0,就走后边,
接着有统计一个minSize值,minSize怎么算呢,如下
/*3333333333333333333*/ long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input. FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize); //minSplitSize默认为1接着new了一个数组,数组里放的是每一个切片,接着for循环,取出每一个文件,如下:切片以文件为单位切
/*44444444444444444444444444444444*/ // generate splits ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits); NetworkTopology clusterMap = new NetworkTopology(); // 切片以文件为单位切片 for (FileStatus file: files) { Path path = file.getPath(); long length = file.getLen();如果文件不为空,源码如下:
判断文件长度是否为0,不为0,判断嫩肤不能切,能切再计算块大小
/*555555555555555555555555555*/ if (length != 0) { FileSystem fs = path.getFileSystem(job); BlockLocation[] blkLocations; if (file instanceof LocatedFileStatus) { blkLocations = ((LocatedFileStatus) file).getBlockLocations(); } else { blkLocations = fs.getFileBlockLocations(file, 0, length); } if (isSplitable(fs, path)) { // 获取文件的块大小,块大小在上传文件时,指定,如果不指定,默认 128M long blockSize = file.getBlockSize(); // 计算片大小 一般等于 blockSize() long splitSize = computeSplitSize(goalSize, minSize, blockSize); /* computeSplitSize方法如下: protected long computeSplitSize(long goalSize, long minSize, long blockSize) { // 在处理大数据时,一般情况下,blockSize作为片大小 //那么goalSize受什么影响呢,numSplits和totalSize, //同时numSplits又受minPartitions影响 return Math.max(minSize, Math.min(goalSize, blockSize)); } */ long bytesRemaining = length; // 循环切片 while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length-bytesRemaining, splitSize, clusterMap); splits.add(makeSplit(path, length-bytesRemaining, splitSize, splitHosts[0], splitHosts[1])); bytesRemaining -= splitSize; } // 剩余部分 <=片大小1.1倍,整体作为1片 if (bytesRemaining != 0) { String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap); splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1])); } } else { //如果文件不可切,整个文件作为1片 // 不是压缩格式,都可切,如果是压缩格式,继续判断是否是可切的压缩格式(bzip2,lzo) // 看文件的后缀,如果文件后缀是 .deflate, .gzip , .snappy都不可切 // 否则都可切 String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap); splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1])); } }这里看一下isSplitable方法:
protected boolean isSplitable(FileSystem fs, Path filename) { return true; } protected boolean isSplitable(FileSystem fs, Path file) { final CompressionCodec codec = compressionCodecs.getCodec(file); if (null == codec) { return true; } return codec instanceof SplittableCompressionCodec; }看getCodec方法
/** * Find the relevant compression codec for the given file based on its * filename suffix. * @param file the filename to check * @return the codec object * 根据文件名后缀找到给定文件的相关压缩编解码器。 * @param文件要检查的文件名 * @返回编解码器对象* / */ public CompressionCodec getCodec(Path file) { CompressionCodec result = null; if (codecs != null) { String filename = file.getName(); String reversedFilename = new StringBuilder(filename).reverse().toString(); SortedMap<String, CompressionCodec> subMap = codecs.headMap(reversedFilename); if (!subMap.isEmpty()) { String potentialSuffix = subMap.lastKey(); if (reversedFilename.startsWith(potentialSuffix)) { result = codecs.get(potentialSuffix); } } } return result; }回去,所以不是压缩格式都能切
如果文件长度为空,源码如下:
/*6666666666666666666666666*/ else { // 文件长度为0,创建一个空切片 //Create empty hosts array for zero length files //这个splits就是上面new的数组 splits.add(makeSplit(path, 0, length, new String[0])); /* 这一块makeSplit(path, 0, length, new String[0])就相当于new一个FileSplit对象 */ }怎么new一个FileSplit对象呢?点击makeSplit,进入该方法,在FileInputFormat文件下
/** * A factory that makes the split for this class. It can be overridden * by sub-classes to make sub-types */ protected FileSplit makeSplit(Path file, long start, long length, String[] hosts) { return new FileSplit(file, start, length, hosts); }说明文件为空也是切一片
/*7777777777777777777777777777777*/ sw.stop(); if (LOG.isDebugEnabled()) { LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS)); } return splits.toArray(new FileSplit[splits.size()]); } long splitSize = computeSplitSize(goalSize, minSize, blockSize); // 在处理大数据时,一般情况下,blockSize作为片大小 return Math.max(minSize, Math.min(goalSize, blockSize));