SparkCore——中篇(转换算子)

it2025-01-12  7

文章目录

接上章节 六、核心编程------RDD 【编程模型之一】6.4 RDD的转换算子分类6.4.1 单个RDD可以调用的算子6.4.1.1 map6.4.1.2 mapPartitions6.4.1.3 mapPartitionsWithIndex6.4.1.4 flatMap扁平化6.4.1.5 glom6.4.1.6 groupby分组6.4.1.7 shuffle(很重要!)6.4.1.8 defaultPartitioner(this)(很重要,非常重要)一个分区器中有分区数,和分区方法6.4.1.9 sample抽样6.4.1.10 distinct(去重)6.4.1.11 Partitioner6.4.1.12 依赖关系6.4.1.13 coalesce(扩大或减少分区)6.4.1.14 repartition(重新分区)6.4.1.15 ClassTag6.4.1.16 filter过滤6.4.1.17 sortBy6.4.1.18 pipe 6.4.2 双Value类型6.4.2.1 intersection(交)会产生shuffle!最终交集后RDD的分区数取决于上游RDD最大的分区数6.4.2.2 union(并)没shuffle6.4.2.3 substract(差)产生shuffle!使用的是前面RDD的分区器和分区数作为最终RDD的分区数和分区器!6.4.2.4 cartesian(笛卡尔积)不会产生shuffle! 运算后的RDD的分区数=所有上游RDD分区数的乘积。6.4.2.5 zip(拉链)两个RDD的分区数和分区中元素的个数必须相同!6.4.2.6 zipWithIndex(只需要一个集合)6.4.2.7 zipPartitions要求两个RDD的分区数必须是一样的!,拉链的规则可以自己定义 6.4.3 key-value类型6.4.3.1 reduceByKey 有shuffle相同key的values合并6.4.3.2 groupByKey 有shuffle直接按照K对 V进行分组!6.4.3.3 aggregateByKey 通过K将V转化成U进行聚合6.4.3.4 foldByKey aggregateByKey的简化版!aggrregateByKey和foldByKey两都是柯里化的方法,有多个参数列表6.4.3.5 combineByKey 也是基于K进行合并6.4.3.6 4个算子的区别6.4.3.7 partitionBy 有shuffle6.4.3.8 自定义Partitioner,自定义类,继承Partitioner类!实现其中的getPartition()6.4.3.9 mapValues 没有shuffle 对V进行操作常用算子6.4.3.10 SortByKey 默认是 RangePartitioner,有shuffle!根据key进行排序!6.4.3.11 连接 全部shuffle逻辑练习:6.4.3.12 cogroup

接上章节

问:Partitione什么时候调用?跟getPartitions有什么联系?

答:

1.集合转换成RDD时,RDD是有分区的集合,RDD会调用getPartitions进行分区 2.当RDD调用了有shuffle的算子时,因为会重新分区,就需要用到Partitione(分区器),调用该分区器的getpartiitons方法分区。

六、核心编程------RDD 【编程模型之一】

6.4 RDD的转换算子分类

一个RDD就可以调用的转换算子!

K-V类型的RDD 可以调用的转换算子!

两个RDD才能调用的算子!

6.4.1 单个RDD可以调用的算子

6.4.1.1 map

def map[U: ClassTag](f: T => U): RDD[U] = withScope { // 当f函数存在闭包时,将闭包进行清理,确保使用的闭包变量可以被序列化,才能发给task val cleanF = sc.clean(f) // iter 使用scala集合中的迭代器,调用 map方法,迭代集合中的元素,每个元素都调用 f new MapPartitionsRDD[U, T](this, (_, _, iter) => iter.map(cleanF)) }

MapPartitionsRDD

map算子,不会改变之前RDD的分区数,也不会改变元素的分区!

map的特点是1对1

6.4.1.2 mapPartitions

源码:

以分区为单位,每个分区都调用一次用户自定义的函数,源码如下:

def mapPartitions[U: ClassTag]( f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U] = withScope { val cleanedF = sc.clean(f) new MapPartitionsRDD( this, (_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter), preservesPartitioning) }

走完withScope走sc.clean

private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = { ClosureCleaner.clean(f, checkSerializable) f }

此方法走了ClosureCleaner,这玩意叫闭包清理者,看函数是否存在闭包,存在的话,需要保证闭包变量是否能够序列化

一直clean,clean会出现下面代码

if (checkSerializable) { ensureSerializable(func) } } private def ensureSerializable(func: AnyRef): Unit = { try { if (SparkEnv.get != null) { SparkEnv.get.closureSerializer.newInstance().serialize(func) } } catch { case ex: Exception => throw new SparkException("Task not serializable", ex) } }

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-WkYFjvMO-1603275423493)(Spark0621(2.0).assets/image-20200924191503808.png)]

以上写法有可能汇报一个错:org.apache.spark.SparkException: Task not serializable,

原因是:在mapPartitions算子中自定义的函数,产生了闭包,但是闭包用到的变量无法被序列化!

由源码得知正确的该这么写:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-APhQfbDa-1603275423498)(Spark0621(2.0).assets/image-20200924191539508.png)]

闭包:一个函数方法中的内容,用到了另一个,方法栈的变量

MapPartitionsRDD //分区整体调用一次函数 mapPartitions: (_: TaskContext, _: Int, iter: Iterator[T]) => cleanedF(iter) // 分区中的每一个元素,调用一次f(函数) map: (this, (_, _, iter) => iter.map(cleanF)

区别:

/* mapPartitions 和 map的区别 fun 是用户定义的函数 mapPartitions: fun(iter) 将集合整体作为参数,调用fun,有几个 iter(分区集合),调用几次 fun fun调用的次数:分区的个数 批处理! 场景: 将RDD中的数据写入数据库,此时使用mapPartitions就高效! 有几个分区,就创建几次连接,更高效! map: iter.map(fun) 将分区中的每个元素,都调用一次fun , fun调用的次数:元素的个数 */

①mapPartitions(批处理) 和 map(1对1)

②某些场景下,只能使用mapPartitions

​ 将一个分区的数据写入到数据库中!

③ map(1对1) ,只能对每个元素都进行map处理

​ mapPartitions(批处理),可以对一个分区的数据进行任何类型的处理,例如filter等其他算子都行!

​ 返回的记录可以和之前输入的记录个数不同!

一个细节上的差异:

@Test def testMapDiffMapPartitions() : Unit ={ val list = List(1, 2, 3, 4) // 4 val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) // map 是1对1 处理, 输入 N个,输出也是N个 val rdd1: RDD[Int] = rdd.map(x => x + 1) // 输入N个,输出可以根据逻辑灵活调整 val rdd2: RDD[Nothing] = rdd.mapPartitions(iter => Nil.iterator) println(rdd1.count()) //4 // 统计RDD中元素的个数 println(rdd2.count()) // 0 } 1.map 是11 处理, 输入 N个,输出也是N个 2.输入N个,输出可以根据逻辑灵活调整

6.4.1.3 mapPartitionsWithIndex

mapPartitions 为每个分区的数据提供了一个对应分区的索引号。

//mapPartitionsWithIndex // 在使用MapPartitions,可以额外使用到分区的索引 //小功能:获取每个数据分区的最大值,输出时同时输出索引号 (分区索引号,最大值) @Test def testMapPartitionsWithIndex() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) rdd.mapPartitionsWithIndex{ /* index: 分区的索引号 iter: 分区的迭代器 iter.max: 使用迭代器求集合的最大值 */ case (index , iter) => List((index,iter.max)).iterator }.saveAsTextFile("output") }

6.4.1.4 flatMap扁平化

扁平化

扁平化之前,保证要扁平化的集合中的元素的嵌套层数一致!

测试:

/* flatMap: 扁平化之前,保证要扁平化的集合中的元素的嵌套层数一致! 小功能:将List(List(1,2),3,List(4,5))进行扁平化操作 */ @Test def testFlatMap() : Unit ={ val rdd: RDD[Any] = sparkContext.makeRDD(List(List(1, 2), 3, List(4, 5)), 2) rdd.flatMap{ case x:List[_] => x case x:Int => List[Int](x) }.saveAsTextFile("output") }

6.4.1.5 glom

glom将一个分区的数据合并到一个 Array中,返回一个新的RDD

def glom(): RDD[Array[T]] = withScope { new MapPartitionsRDD[Array[T], T](this, (_, _, iter) => Iterator(iter.toArray)) }

测试:

/* glom: 将每个分区的所有元素,封装到一个Array中,再将Array放入RDD */ @Test def testGlom() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) rdd.glom(). map(array => array.mkString(",")) .saveAsTextFile("output") } //小功能:计算所有分区最大值求和(分区内取最大值,分区间最大值求和) @Test def testGlomExec() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) println(rdd.glom(). map(array => array.max) .sum()) }

6.4.1.6 groupby分组

源码:

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope { groupBy[K](f, defaultPartitioner(this)) } /* 将T类型,转成K类型,分完组之后,将K类型,装成K,再把K对应的T类型做成一个集合 groupby : 分组 T : 1,2,3,4 f(T=>K) : a,c,c,a 分组: RDD( (a,{1,4}) , (c,{2,3}) ) CompactBuffer: ArrayBuffer,比ArrayBuffer高效! */

测试

只要一个算子的参数列表中,可以传入numPartitions 或 Partitioner,算子一定会产生shuffle!(重新分区一定会产生shuffle) 将List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组 小功能:从服务器日志数据apache.log中获取每个时间段(不考虑日期)访问量。 小功能:WordCount */ @Test 将RDD中的元素通过一个函数进行转换,将转换后的类型作为KEY,进行分组! def testGroupBy() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val rdd1: RDD[(Boolean, Iterable[Int])] = rdd.groupBy(x => x > 3) rdd1.saveAsTextFile("output") } // 将List("Hello", "hive", "hbase", "Hadoop")根据单词首写字母进行分组 // 只要一个算子的参数列表中,可以传入numPartitions 或 Partitioner,算子一定会产生shuffle!(重新分区一定会产生shuffle) // 什么是shuffle? //https://spark.apache.org/docs/latest/rdd-programming-guide.html#shuffle-operations @Test def testGroupByExec1() : Unit ={ val rdd: RDD[String] = sparkContext.makeRDD(List("Hello", "hive", "hbase", "Hadoop"), 2) rdd.groupBy(word => word(0)).saveAsTextFile("output") } //小功能:从服务器日志数据apache.log中获取每个时间(小时)段(不考虑日期)访问量。 @Test def testGroupByExec2() : Unit ={ val rdd: RDD[String] = sparkContext.textFile("input/apache.log") // (时间段,同一时间段的数据集合) val rdd1: RDD[(String, Iterable[String])] = rdd.groupBy(line => line.split(":")(1)) rdd1.map{ case (time,iter) => (time,iter.size) }.saveAsTextFile("output") }

6.4.1.7 shuffle(很重要!)

shuffle:

在Hadoop的MR中,shuffle意思为混洗,目的是为了在MapTask和ReduceTask传递数据! 在传递数据时,会对数据进行分区,排序等操作! 当Job有reduce阶段时,才会有shuffle! shuffle最典型的特征:就是会跨Executor,跨机器(machines)传输数据,

以reducebykey举例子解释shuffle:

读之前RDD的所有分区来找到所有key,他们相同key对应的values,之后呢,将相同key的values跨分区的拷贝到一起,然后计算每个key的最终结果,这样一个过程称之为shuffle(all-to-all的过程,就是找所有分区的所有value,然后将所有value开分区拷贝到一起,就叫shuffle)

Spark总shuffle的概念:

​ shuffle是Spark的一种用于重新分配数据的机制,为了方便跨分区对数据进行不同的分组。

Spark中的shuffle:

1.spark中只有特定的算子会触发shuffle,shuffle会在不同的分区间重新分配数据! 2.如果出现了shuffle,会造成需要跨机器和executor传输数据,这样会到底低效和额外的资源消耗!

和Hadoop的shuffle不同的时,数据分到哪些区是确定的,但是在区内的顺序不一定有序! --Hadoop 的shuffle : MapTask : map------- sort -------- merge ReduceTask: copy ---- sort ----- reduce shuffle : sort ------ merge ----- copy ---- sort --Spark的shuffle: 第一种表现: MapTask端也不排序,ReduceTask一定不排序! 第二种表现: MapTask端可以排序,ReduceTask一定不排序!

问:为什么spark的reduceTask端可以不排序,haoop的reduceTask端一定要排序?(hadoop的shuffle和hadoop的shuffle区别)

hadoop的rduceTask端如果不排序,后面reduce就运行不了 spark的就可以。 --为什么呢? --因为reduce是一次取一组。两种框架做合并的方式不一样,hadoop是用迭代的方式,通过key进行比较,key相同的作为一组进行迭代来计算,spark就不一样,他的reduceTask的时候,相同key的一定会放到同一个分区,放到同一个分区后会重新组织,把相同key的value组织到一起

如果希望shuffle后的数据有序,可以以下操作:

a) 调用mapPartitions,对每个分区的数据,进行手动排序! b)repartitionAndSortWithinPartitions(重新分区,并且每个分区重新排序) c)sortBy(会创建一个全局有序的RDD)

③什么操作会导致shuffle

a)重新分区的算子 : reparition, coalese b) xxxBykey类型的算子,除了 count(统计)ByKey c) join类型的算子,例如[`cogroup`](http://spark.apache.org/docs/latest/rdd-programming-guide.html#CogroupLink) and [`join`](http://spark.apache.org/docs/latest/rdd-programming-guide.html#JoinLink).

1.在Spark中,shuffle会带来性能消耗,主要涉及 磁盘IO,网络IO,对象的序列化和反序列化! 2.在Spark中,基于MR中提出的MapTask和ReduceTask概念, spark也将shuffle中组织数据的task称为maptask, 将聚合数据的task称为reduceTask! maptask和spark的map算子无关,reducetask和reduce算子无关! (不是说maptask是执行map的task) 3.Spark的shuffle,mapTask将所有的数据先缓存到内存(堆内存,不溢写就不排序,排序时溢写前的操作),如果内存不足,参考Hadoop2.0 sort-based是shuffle的形式,将数据排序后,基于分区,溢写到磁盘!每个MapTask会生成一个结果文件,这个结果文件中,有多个分区! --**Hadoop 2.0的shuffle形式,称为 sort-based类型的shuffle!** 4.ReduceTask读取相关分区的数据块,再进行ByKey操作! mapTask端在组织数据时,如果内存不够,导致磁盘溢写,触发GC! 5.Spark的shuffle同时会在磁盘上产生大量的溢写的临时文件,这个临时文件会一直保留,直到后续的RDD完全不需要使用!此举是为了避免在数据容错时,重新计算RDD,重新产生shuffle文件! (容错机制,会将前一次shuffle产生的数据保留) 6.长时间运行的Spark的Job,如果在shuffle中产生大量的临时的磁盘文件,会占用大量的本地磁盘空间,可以通过spark.local.dir设置本地数据保存的目录!(如果不可避免的产生shuffle后也能够优化,怎么优化呢?调整参数,具体看Shuffle Behavior)

总结:

Spark的shuffle和Hadoop的shuffle目的都是为了 在不同的task交换数据!

Spark的shuffle借鉴了hadoop的shuffle,但是在细节上略有不同 hadoop的shuffle: 在到达ReduceTask端时,会进行排序! 在Spark中,数据在ReduceTask端一定不排序,在MapTask端,可以根据设置进行排序或不排!

shuffle会消耗性能,因此能避免就避免,避免不了,采取一些优化的策略!

​ https://spark.apache.org/docs/latest/configuration.html

6.4.1.8 defaultPartitioner(this)(很重要,非常重要)

一个分区器中有分区数,和分区方法

/** * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. * * If spark.default.parallelism is set, we'll use the value of SparkContext defaultParallelism * as the default partitions number, otherwise we'll use the max number of upstream partitions. * * When available, we choose the partitioner from rdds with maximum number of partitions. If this * partitioner is eligible (number of partitions within an order of maximum number of partitions * in rdds), or has partition number higher than or equal to default partitions number - we use * this partitioner. * * Otherwise, we'll use a new HashPartitioner with the default partitions number. * * Unless spark.default.parallelism is set, the number of partitions will be the same as the * number of partitions in the largest upstream RDD, as this should be least likely to cause * out-of-memory errors. * * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD. */ 译文: / ** 选择一个分区,以用于多个RDD之间的类似cogroup的操作。 //【确定分几个区,就是分区数】 如果设置了spark.default.parallelism(默认分区数),我们将使用SparkContext defaultParallelism 的值作为默认分区号,否则,我们将使用上游分区的最大数量。 //【然后需要new 一个分区器,new 一个分区器的时候要传一个分区数,然后需要传一个类型的分区器】 如果可用(上游RDD中都有分区器),我们从rdds中选择具有最大分区数的分区器。 //【同时你new的分区器传的分区数,要大于你的默认并行度才合格】 如果此分区程序是合格的(在rdds中,分区数量在最大分区数的顺序内), 或者 分区号大于或等于默认分区号 -我们使用此分区器。 否则,//RDD分区数 < 默认并行度 //【意思就是RDD的分区数比你默认并行度的分区数小】 我们将使用具有默认分区号的新HashPartitioner。 //【意思是这时候我们new 的分区器的分区数就是使用的默认并行度的分区数】 除非设置了spark.default.parallelism,否则分区数将与最大上游RDD中的分区数相同,因为这最有可能导致内存不足错误。 我们使用两个方法参数(rdd,其他)来强制调用者传递至少1个RDD。 def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = { // 将所有的RDD放入一个Seq中 val rdds = (Seq(rdd) ++ others) // 过滤出 分区数>0的 有分区器 的RDD val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0)) // 如果 存在符合上述条件的RDD val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) { // 取出 分区数最大的那个RDD Some(hasPartitioner.maxBy(_.partitions.length)) } else { None } // 默认分区数 val defaultNumPartitions = if //如果 设置了默认并行度,选择默认并行度作为分区数 (rdd.context.conf.contains("spark.default.parallelism")) { rdd.context.defaultParallelism } else { //没有设置默认并行度,取所有上游RDD中最大分区数作为默认分区数 rdds.map(_.partitions.length).max } // If the existing max partitioner is an eligible one, or its partitions number is larger // than or equal to the default number of partitions, use the existing partitioner. // 如果分区数最大的RDD存在 且 此RDD的分区器 合格 if (hasMaxPartitioner.nonEmpty && (isEligiblePartitioner(hasMaxPartitioner.get, rdds) || // 或 此RDD的分区数 >= 默认分区数 defaultNumPartitions <= hasMaxPartitioner.get.getNumPartitions)) { // 上游分区数最大 RDD的分区器 hasMaxPartitioner.get.partitioner.get } else { // Hash分区器,使用默认分区数 new HashPartitioner(defaultNumPartitions) } } private def isEligiblePartitioner( hasMaxPartitioner: RDD[_], rdds: Seq[RDD[_]]): Boolean = { val maxPartitions = rdds.map(_.partitions.length).max log10(maxPartitions) - log10(hasMaxPartitioner.getNumPartitions) < 1 }

defaultPartitioner: 为类似cogroup类型的算子,提供一个默认分区器!

如何确认默认分区数:

​ 如果设置了spark.default.parallelism,那么默认分区数=默认并行度!

​ 如果没有设置,取上游RDD最大分区数!

​ 默认分区数,不一定就是默认分区器的分区数!

如何确认分区器:

​ 从上游 RDD中取有分区器且分区器的分区数最大的那个RDD 的分区器

【就是取有分区器,且分区数最大的RDD的分区器】

​ a) 有 =>

--【如果有这样的分区器,接下就判断能不能用,怎么判断呢?】 (从分区数,和分区器合不合格判断)

​ (如果 此分区器的 分区数 >= 默认分区数 ,

--条件一: 【看看这个分区器的分区数是不是大于默认分区数】 (默认分区数,如果设置了默认并行度,就是默认并行度,如果没有设置,就是这个RDD的分区数) --条件二: 【或者,看看这个分区器是否合格】 (怎样才是合格呢?就是上游最大的分区数比这个分区器的分区数小)

​ 或

​ 此分区器合格)。

-- 【以上条件满足一条就可以】

​ 就使用此分区器!

​ 合格的判断条件:

log10(上游最大分区数) - log10(此分区器的分区数) < 1

​ b) 其他情况 =>

--那么哪些其他情况呢?

​ new HashPartitioner(默认分区数 )

​ 其他情况: 不存在有分区器的RDD

--①【就是上游RDD中分区数最大的那个RDD没有分区器】

​ 有分区器RDD的分区器不合格 且 有分区器的RDD的分区数 < 默认分区数

--②【就是上游有分区器,且分区数最大的那个RDD不合格】 (不合格意思就是,该RDD的分区数小于默认分区数)

6.4.1.9 sample抽样

sample : 抽样。 当Job发生了数据倾斜时,希望通过抽样了解当前数据中,是否有一些 可能造成倾斜的数据!

源码:

/** * Return a sampled subset of this RDD. * * @param withReplacement can elements be sampled multiple times (replaced when sampled out) * @param fraction expected size of the sample as a fraction of this RDD's size * without replacement: probability that each element is chosen; fraction must be [0, 1] * with replacement: expected number of times each element is chosen; fraction must be greater * than or equal to 0 * @param seed seed for the random number generator * * @note This is NOT guaranteed to provide exactly the fraction of the count * of the given [[RDD]]. */ /* *返回此RDD的采样子集。 * @param withReplacement可以对元素进行多次采样(采样后将被替换) * @param分数期望的样本大小,是该RDD大小的一部分 *不需替换:选择每个元素的可能性;分数必须为[0,1] *并带有替换:选择每个元素的预期次数;分数必须大于或等于0 *随机数生成器的@param种子种子 * @note这不能保证精确提供给定[[RDD]]的计数*的分数。 */ def sample( withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T] = { require(fraction >= 0, s"Fraction must be nonnegative, but got ${fraction}") withScope { require(fraction >= 0.0, "Negative fraction value: " + fraction) if (withReplacement) { new PartitionwiseSampledRDD[T, T](this, new PoissonSampler[T](fraction), true, seed) } else { new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](fraction), true, seed) } } }

总结:

/* withReplacement: Boolean, 是否允许重复抽取元素 true: 允许重复抽 false: 不允许重复抽 fraction: Double 期望抽样的数据的个数 / 数据集总个数 withReplacement= true fraction >=0 withReplacement=false 0<=fraction<=1 seed: Long = Utils.random.nextLong 控制每次抽样生成的结果是否一致,seed一致,每次抽的结果都一样! */ def sample( withReplacement: Boolean, fraction: Double, seed: Long = Utils.random.nextLong): RDD[T]

测试:

/* withReplacement: Boolean, 是否允许重复抽取元素 true: 允许重复抽 false: 不允许重复抽 fraction: Double 期望抽样的数据的个数 / 数据集总个数 withReplacement= true fraction >=0 withReplacement=false 0<=fraction<=1 seed: Long = Utils.random.nextLong 控制每次抽样生成的结果是否一致,seed一致,每次抽的结果都一样! */ @Test def testSample() : Unit ={ val list = List(1, 2, 3, 4,5,6,7,8,9) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) /* println(rdd.sample(true, 2, 1).collect().mkString(",")) println(rdd.sample(true, 2, 1).collect().mkString(","))*/ println(rdd.sample(true, 2).collect().mkString(",")) println(rdd.sample(true, 2).collect().mkString(",")) }

6.4.1.10 distinct(去重)

问?distinct是否一定有shuffle

// 如果当前RDD有分区器,且分区的数量和 distinct(numPartions)相同,就不会产生shuffle! /* 什么样的RDD有分区器: ①K-V类型的RDD会有分区器 ②执行了类似 corgrop这种操作时,下游RDD会自动生成分区器 如果RDD已经有了分区器,那么代表RDD已经使用所带的分区器对数据进行了分区! key相同的已经在一个区内,不需要再分组,没有shuffle! 去重为什么会有shuffle: 去重是利用分区去重,需要先将key相同的数据,分到一组! 分组时产生了shuffle! */ case Some(_) if numPartitions == partitions.length => // 返回的是MapPartitionsRDD,没有shuffle mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true) // 有shuffle case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)

去重!有可能会产生shuffle,也有可能没有shuffle!

有shuffle,原理基于reduceByKey进行去重!

可以使用groupBy去重,之后只取分组后的key部分!

测试1: /* distinct: 会产生shuffle 如果不用该算子,你有什么办法实现数据去重? group by */ @Test def testDistinct() : Unit ={ val list = List(1, 2, 3, 4,4,4,4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) println(rdd) // rdd2.distinct(4).saveAsTextFile("output") // val keys: RDD[Int] = rdd.groupBy(x => x).keys //keys.saveAsTextFile("output") } 探究源码,回答一个问题 distinct : 是否一定会有shuffle?

进入distinct方法,如下:

/** * Return a new RDD containing the distinct elements in this RDD. */ def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { def removeDuplicatesInPartition(partition: Iterator[T]): Iterator[T] = { // Create an instance of external append only map which ignores values. val map = new ExternalAppendOnlyMap[T, Null, Null]( createCombiner = _ => null, mergeValue = (a, b) => a, mergeCombiners = (a, b) => a) map.insertAll(partition.map(_ -> null)) map.iterator.map(_._1) } partitioner match { case Some(_) if numPartitions == partitions.length => mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true) case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1) } }

由此可知mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)不会产生shuffle,看看究竟怎么运行的

可见它在一个分区器里模式匹配,我们进入到Partitioner里,代码如下:

/** Optionally overridden by subclasses to specify how they are partitioned. */ @transient val partitioner: Option[Partitioner] = None

可见partitioner是RDD的一个属性默认为none,默认RDD是没有分区的

partitioner类型是Option,代表可有可无,再点击[Partitioner],如下:

6.4.1.11 Partitioner

/** * An object that defines how the elements in a key-value pair RDD are partitioned by key. * Maps each key to a partition ID, from 0 to `numPartitions - 1`. * * Note that, partitioner must be deterministic, i.e. it must return the same partition id given * the same partition key. */ /* *一个对象,它定义键-值对RDD中的元素如何通过键进行分区。 *将每个键映射到一个分区ID,从0到`numPartitions-1`。 *请注意,分区程序必须是确定性的,即,在给定相同的分区键时,它必须返回相同的分区ID。 */ abstract class Partitioner extends Serializable { def numPartitions: Int //数据所在总分区数 def getPartition(key: Any): Int //根据K把元素分到哪个区 } //可见分区器是抽象的,有一个属性,一个是抽象方法

这个抽象提供了三个实现类

spark默认提供两个分区器:

HashPartitioner: 调用元素的hashCode方法,基于hash值进行分区!

RangeParitioner: 局限:针对数据必须是可以排序的类型!

​ 将数据,进行采样,采样(水塘抽样)后确定一个范围(边界),将RDD中的每个元素划分到边界中!

​ 采样产生的边界,会尽可能保证RDD的元素在划分到边界后,尽可能均匀!

RangePartitioner源码:

/** * A [[org.apache.spark.Partitioner]] that partitions sortable records by range into roughly equal ranges. The ranges are determined by sampling the content of the RDD passed in. 【将可排序记录按范围划分为大致相等的范围。范围是通过对传入的RDD的内容进行采样来确定的。解析如下图】 * @note The actual number of partitions created by the RangePartitioner might not be the same * as the `partitions` parameter, in the case where the number of sampled records is less than * the value of `partitions`. 【在采样记录数小于*分区值的情况下,RangePartitioner创建的分区的实际数量可能与* partitions参数不同。】 */ class RangePartitioner[K : Ordering : ClassTag, V]( partitions: Int, rdd: RDD[_ <: Product2[K, V]], private var ascending: Boolean = true, val samplePointsPerPartitionHint: Int = 20) extends Partitioner { // A constructor declared in order to maintain backward compatibility for Java, when we add the // 4th constructor parameter samplePointsPerPartitionHint. See SPARK-22160. // This is added to make sure from a bytecode point of view, there is still a 3-arg ctor. def this(partitions: Int, rdd: RDD[_ <: Product2[K, V]], ascending: Boolean) = { this(partitions, rdd, ascending, samplePointsPerPartitionHint = 20) }

RangeParitioner 对比 HashPartitioner的优势: 一定程序上,可以避免数据倾斜!

以下是HashPartitioner源码:

class HashPartitioner(partitions: Int) extends Partitioner { require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.") // 总的分区数通过构造器指定 def numPartitions: Int = partitions //分区方法 def getPartition(key: Any): Int = key match { // null类型 0号区 case null => 0 // 不管 hashcode % numPartitions 的值为正数或负数 ,结果都控制在 [0,numPartitions) case _ => Utils.nonNegativeMod(key.hashCode, numPartitions) //K的hash值%分区数 } //进入nonNegativeMod /* /* * Calculates 'x' modulo 'mod', takes to consideration sign of x, * i.e. if 'x' is negative, than 'x' % 'mod' is negative too * so function return (x % mod) + mod in that case. * 计算'x'模'mod',考虑x的符号, * 即,如果'x'为负,则'x'%'mod'也为负 * 因此函数return(x%mod)+ mod in这种情况。 def nonNegativeMod(x: Int, mod: Int): Int = { val rawMod = x % mod rawMod + (if (rawMod < 0) mod else 0) } */ /* 当前分区器和其他类型的对象比较时: 如果比较的也是一个分区器,且是HashPartitioner,只比较分区数,分区数一致,就相等,否则就不等! */ override def equals(other: Any): Boolean = other match { //equals判断当前对象是否是和别的相等 case h: HashPartitioner => //如果比较的都是hash分区器(HashPartitioner) h.numPartitions == numPartitions //只比分区数 case _ => false //否则就是false ------见测试2 } override def hashCode: Int = numPartitions } 测试2 @Test def testHashParitioner() : Unit ={ val partitioner1 = new HashPartitioner(3) val partitioner2 = new HashPartitioner(3) val partitioner3 = new HashPartitioner(2) println(partitioner1 == 1) // false println(partitioner1 == partitioner2) // true println(partitioner1 == partitioner3) // false }

下面我们看看diatinct方法里的这段代码到底是啥意思:

case Some(_) if numPartitions == partitions.length => // 返回的是MapPartitionsRDD,没有shuffle mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true) // 有shufflecase _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1) /* 有分区器走1,没有走2,当前测试一没有分区器,走2,一定有shuffle 如果不想让他走shuffle,因为分区器是val类型,不能直接给,我们可以知道很多算子里面可以给他传分区器,见测试3 如果有分区器,还要比较numPartitions == partitions.length 是否相等 即ditinct重新分的区数,与分区器当前的长度是否一样 如果distinct不传就默认是partitions.length 【即是当前RDD有几个分区,就是: rdd.distinct( ) 等价于 rdd.distinct( 当前rdd的分区数)】 测试3的reduceByKey,可以设置重载的分区数 那么没有shuffle,有分区器进去后是怎么去重的呢·? */ 测试3 @Test def testDistinct() : Unit ={ val list = List(1, 2, 3, 4,4,4,4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) println(rdd.partitioner) //None val rdd1: RDD[(Int, Int)] = rdd.map(x => (x, 1)) println(rdd1.partitioner) //None val rdd2: RDD[(Int, Int)] = rdd1.reduceByKey(_ + _,3) //相当于传一个分区器 println(rdd2.partitioner) //Some(org.apache.spark.HashPartitioner@3) // rdd.distinct( ) 等价于 rdd.distinct( 当前rdd的分区数) rdd2.distinct(4).saveAsTextFile("output") /* 记住KV类型的RDD,可以有分区器也可以没有,需要就有,不需要就没有 */

有分区器没有shuffle是怎么去重的呢?

--在分区里面把重复的分区去掉,因为我有分区器 ----有分区器相同K就一定分到同一个区

结论: 因此,结论: // 如果当前RDD有分区器,且分区的数量和 distinct(numPartions)相同,就不会产生shuffle! /* 什么样的RDD有分区器: ①K-V类型的RDD会有分区器 ②执行了类似 corgrop这种操作时,下游RDD会自动生成分区器 如果RDD已经有了分区器,那么代表RDD已经使用所带的分区器对数据进行了分区! key相同的已经在一个区内,不需要再分组,没有shuffle! 去重为什么会有shuffle: 去重是利用分区去重,需要先将key相同的数据,分到一组! 分组时产生了shuffle! */

去重!有可能会产生shuffle,也有可能没有shuffle!

有shuffle,原理基于reduceByKey进行去重!

可以使用groupBy去重,之后只取分组后的key部分!

6.4.1.12 依赖关系

narrow dependency: 窄依赖!

wide(shuffle) dependency: 宽(shuffle)依赖! 宽依赖会造成shuffle!会造成阶段的划分!

6.4.1.13 coalesce(扩大或减少分区)

测试:

/* coalesce: 默认允许将多的分区合并到少的分区,不会产生shuffle! 用coalesce将少的分区,扩大到多大 分区,必须coalesce(x,true) 我想要扩大分区,怎么办? 扩大分区: reparition,提高并行度 减少分区: coalesce,减少并行度,节省资源! 一般和filter结合使用! */ @Test 4个缩小到2def testCoalesce() : Unit ={ val list = List(1, 2, 3, 4,4,4,4,6) val rdd: RDD[Int] = sparkContext.makeRDD(list, 4) rdd.coalesce(2,true).saveAsTextFile("output") } /* 分区剧减的时候,为了节省资源,可以给他将shuffle设置成true */ @Test 4个扩大到10个 则还是4个个分区 def testCoalesce() : Unit ={ val list = List(1, 2, 3, 4,4,4,4,6) val rdd: RDD[Int] = sparkContext.makeRDD(list, 4) //rdd.coalesce(100,true).saveAsTextFile("output") rdd.coalesce(10,true).saveAsTextFile("output") }

源码注释:

/** * Return a new RDD that is reduced into `numPartitions` partitions. * * This results in a narrow dependency, e.g. if you go from 1000 partitions * to 100 partitions, there will not be a shuffle, instead each of the 100 * new partitions will claim 10 of the current partitions. If a larger number * of partitions is requested, it will stay at the current number of partitions. * * However, if you're doing a drastic coalesce, e.g. to numPartitions = 1, * this may result in your computation taking place on fewer nodes than * you like (e.g. one node in the case of numPartitions = 1). To avoid this, * you can pass shuffle = true. This will add a shuffle step, but means the * current upstream partitions will be executed in parallel (per whatever * the current partitioning is). * * @note With shuffle = true, you can actually coalesce to a larger number * of partitions. This is useful if you have a small number of partitions, * say 100, potentially with a few partitions being abnormally large. Calling * coalesce(1000, shuffle = true) will result in 1000 partitions with the * data distributed using a hash partitioner. The optional partition coalescer * passed in must be serializable. */ 译文: / ** *返回缩小为numPartitions分区的新RDD。 * *这会导致狭窄的依存关系,例如如果您从1000个分区 *扩展到100个分区,则不会进行混洗,而是100*新分区中的每一个将占用当前分区中的10个。如果请求的分区数量更多 *,它将保持在当前分区数量。 * *但是,如果您要进行剧烈的合并,例如到numPartitions = 1*可能导致您的计算在少于 *的节点上进行(例如,在numPartitions = 1的情况下为一个节点)。为了避免这种情况, *您可以传递shuffle = true。这将增加一个随机播放步骤,但是意味着 *当前的上游分区将并行执行(无论当前分区是什么)。 * * @note如果shuffle = true,则实际上可以合并到更大数量的分区。如果您的分区数量很少(例如100),并且可能有几个分区异常大,这将很有用。调用 * coalesce(1000,shuffle = true)将导致1000个分区,其中 *数据是使用哈希分区程序分配的。传入的可选分区合并器 *必须可序列化。 * /

coalesce:

def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty) (implicit ord: Ordering[T] = null) : RDD[T]

coalesce: 可以将一个RDD重新划分到若干个分区中!是一个重新分区的算子!

​ 默认coalesce只会产生窄依赖!默认只支持将多的分区合并到少的分区!

​ 如果将少的分区,扩充到多的分区,此时,coalesce产生的新的分区器依旧为old的分区数,不会变化!

​ 如果需要将少的分区,合并到多的分区,可以传入 shuffle=true,此时会产生shuffle!

6.4.1.14 repartition(重新分区)

repartition: 重新分区的算子!一定有shuffle!

​ 如果是将多的分区,核减到少的分区,建议使用collase,否则使用repartition

repartition:

/ * * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, * which can avoid performing a shuffle. 译文 *如果要减少此RDD中的分区数量,请​​考虑使用`coalesce`,*可以避免执行随机播放。 * / */ def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope { coalesce(numPartitions, shuffle = true) }

6.4.1.15 ClassTag

(用来标记,在编译时被擦除的泛型类型,反射用的)

泛型为了在编译时,检查方法传入的参数类型是否复合指定的泛型(类型检查)

泛型在编译后,泛型会被擦除。 如果在Java代码中,泛型会统一使用Object代替,如果scala会使用Any代替!

在运行时,不知道泛型的类型!

针对Array类型,实例化一个数组时,必须知道当前的数组是个什么类型的数组!

java: String []

scala: Array[String]

Array和泛型共同组成数组的类型! 在使用一些反射框架实例化一个数组对象时,必须指定数组中的泛型!

需要ClassTag,来标记Array类型在擦除前,其中的泛型类型!

6.4.1.16 filter过滤

def filter(f: T => Boolean): RDD[T] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[T, T]( this, (_, _, iter) => iter.filter(cleanF), preservesPartitioning = true) }

将RDD中的元素通过传入的函数进行计算,返回true的元素可以保留!

测试:

/* filter 小功能:从服务器日志数据apache.log中获取2015年5月17日的请求路径 */ @Test def testFilter() : Unit ={ val rdd: RDD[String] = sparkContext.textFile("input/apache.log") rdd .filter(line => line.contains("17/05/2015")) .map(line => line.split(" ")(6)) .saveAsTextFile("output") }

6.4.1.17 sortBy

源码:可见sortBy是一个柯里化的函数

def sortBy[K]( //第一个参数列表可以自己传参数,第二个参数是隐式的东西,可以不传 f: (T) => K,//传一个函数,将T类型转成K类型 ascending: Boolean = true, //ascending相当于升序,降序将参数设置成false numPartitions: Int = this.partitions.length) //numPartitions一定产生shuffle,代码可见他调用了sortBykey (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope { this.keyBy[K](f) .sortByKey(ascending, numPartitions) .values }

测试:

@Test def testSortBy() : Unit ={ val list = List(11, 22, 3, 4,4,4,4,6) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) rdd.sortBy(x => x,false,numPartitions=1).saveAsTextFile("output") rdd.sortBy(x => x,false,numPartitions=1)(Ordering.Int,ClassTag(classOf[Int])) .saveAsTextFile("output") }

问:自定义的类型怎么排序呢?

测试:

样例类:

case class Person(name:String,age:Int) extends Ordered[Person] { // 实现排序的逻辑 按照名称降序排,名称相同的,按照年龄升序排 override def compare(that: Person): Int = { // 按照名称降序排 var result: Int = -this.name.compareTo(that.name) if (result == 0){ //名称相同的,按照年龄升序排 result = this.age.compareTo(that.age) } result } } case class Person1(name:String,age:Int) /* 排序需要有比较器,常见 的 AnyVal类型和String类型,Tuple,Scala默认提供了隐式的比较器,不需要提供! 自定义的类型,需要提供排序的比较器! 解决: ①要排什么样的数据,就把数据封装为Tuple ②让Person extends Comparable(java) Comparable : 实现了此接口的类是一个可以排序的类 Scala中 Person extends Ordered, 实现compare方法! ③提供一个Comparator[Person] 提供一个比较器,可以比较Person类型的数据 Scala : Ordering 更灵活,非侵入! Person 很清白,不需要实现任何接口(特质) */ @Test def testSortByPerson() : Unit ={ val list = List(Person("jack1", 21), Person("marry", 30), Person("tom", 20), Person("jack1", 20)) val rdd: RDD[Person] = sparkContext.makeRDD(list, 2) // 按照名称升序排,名称相同的,按照年龄升序排 //rdd.sortBy(p => (p.name,p.age)).coalesce(1).saveAsTextFile("output") // 按照名称降序排,名称相同的,按照年龄降序排 //rdd.sortBy(p => (p.name,p.age),false).coalesce(1).saveAsTextFile("output") //按照名称升序排,名称相同的,按照年龄降序排 /* rdd.sortBy(p => (p.name,p.age)) ( Ordering.Tuple2[String,Int] (Ordering.String,Ordering.Int.reverse), ClassTag(classOf[Tuple2[String,Int]]) ) .coalesce(1).saveAsTextFile("output") */ //按照名称降序排,名称相同的,按照年龄升序排 /* rdd.sortBy(p => (p.name,p.age),false) ( Ordering.Tuple2[String,Int] (Ordering.String,Ordering.Int.reverse), ClassTag(classOf[Tuple2[String,Int]]) ) .coalesce(1).saveAsTextFile("output")*/ rdd.sortBy(p => p).coalesce(1).saveAsTextFile("output") } @Test def testSortBy2() : Unit ={ val list = List(Person1("jack1", 21), Person1("marry", 30), Person1("tom", 20), Person1("jack1", 20)) val rdd: RDD[Person1] = sparkContext.makeRDD(list, 2) // 定义可以排序Person1的比较器 val myOrdering: Ordering[Person1] = new Ordering[Person1]() { // 比较的逻辑 override def compare(x: Person1, y: Person1): Int = { // 按照名称降序排 var result: Int = -x.name.compareTo(y.name) if (result == 0) { //名称相同的,按照年龄升序排 result = x.age.compareTo(y.age) } result } } rdd.sortBy(p=>p)(myOrdering,ClassTag(classOf[Person1]) //此段为记住此类的泛型,因为编译后泛型会被擦除) .coalesce(1).saveAsTextFile("output") }

使用函数将RDD中的元素进行转换,之后基于转换的类型进行比较排序,排序后,再返回对应的转换之前的元素!

本质利用了sortByKey,有shuffle!

sortByKey在排序后,将结果进行重新分区时,使用RangePartitioner!

对于自定义类型进行排序:

​ ①将自定义类型实现Ordered接口

​ ②提供对自定义类型排序的Ordering

6.4.1.18 pipe

pipe允许一个shell脚本来处理RDD中的元素!

在shell脚本中,可以使用READ函数读取RDD中的每一个数据,使用echo将处理后的结果输出!

每个分区都会调用一次脚本!

6.4.2 双Value类型

6.4.2.1 intersection(交)

会产生shuffle!最终交集后RDD的分区数取决于上游RDD最大的分区数

测试:

/* Intersection算子 取共有的! 有shuffle! */ @Test def testIntersection() : Unit ={ val list1 = List(1, 2, 3, 4) val list2 = List(5, 2, 4,6) val rdd1: RDD[Int] = sparkContext.makeRDD(list1, 3) println(rdd1.partitioner) val rdd2: RDD[Int] = sparkContext.makeRDD(list2, 2) println(rdd2.partitioner) // val rdd3: RDD[(Int, (Iterable[Null], Iterable[Null]))] = rdd1.map(v => (v, null)).cogroup(rdd2.map(v => (v, null))) val rdd3: RDD[Int] = rdd1.intersection(rdd2)//结果有三个区 //Some(org.apache.spark.HashPartitioner@4) // println(rdd3.partitioner) rdd3.saveAsTextFile("output") }

源码:

Intersection: /** * Return the intersection of this RDD and another one. The output will not contain any duplicate * elements, even if the input RDDs did. * * @note This method performs a shuffle internally. 【在内部产生shuffle】 */ def intersection(other: RDD[T]): RDD[T] = withScope { this.map(v => (v, null)).cogroup(other.map(v => (v, null))) .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty } .keys } cogroup: /** * For each key k in `this` or `other`, return a resulting RDD that contains a tuple with the * list of values for that key in `this` as well as `other`. */ def cogroup[W](other: RDD[(K, W)])//other是一个RDD : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope { cogroup(other, defaultPartitioner(self, other))//第一个参数传的分区器,说明此方法用完后会返回一个分区器 } defaultPartitioner:(见6.4.1.8) def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = { val rdds = (Seq(rdd) ++ others) val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0)) val hasMaxPartitioner: Option[RDD[_]] = if (hasPartitioner.nonEmpty) { Some(hasPartitioner.maxBy(_.partitions.length)) } else { None }

6.4.2.2 union(并)

没shuffle

两个RDD取并集(并不是数学上的并集,元素允许重复)。没有shuffle!

最终并集后的RDD的分区数,取决于所有RDD分区数的总和!

union 等价于 ++

测试:

/* union算子: 并集 A :{1,2,3,4,5} B : {2,3,4,5,6} 数学上的并集: {1,2,3,4,5,6} 集合上的并集: {1,2,3,4,5,2,3,4,5,6} 没shuffle union 等价于 ++(Spark唯一一个可以使用符号作为方法名的算子) */ @Test def testUnion() : Unit ={ val list1 = List(1, 2, 3, 4) val list2 = List(5, 2, 4,6) val rdd1: RDD[Int] = sparkContext.makeRDD(list1, 3) val rdd2: RDD[Int] = sparkContext.makeRDD(list2, 2) (rdd1 ++ rdd2).saveAsTextFile("output") //rdd1.union(rdd2).saveAsTextFile("output") //把所有分区放一块,最后有5个分区 Thread.sleep(10000000) }

6.4.2.3 substract(差)

产生shuffle!使用的是前面RDD的分区器和分区数作为最终RDD的分区数和分区器!

两个RDD取差集,产生shuffle!取差集合时,会使用当前RDD的分区器和分区数!

测试:

/* subtract算子: 差集 list1 差集运算 list2: 1,3 list2 差集运算 list1: 5 subtract 调用 subtractByKey,会有shuffle! 使用的是前面RDD的分区器和分区数作为最终RDD的分区数和分区器! */ @Test def testSubtract() : Unit ={ val list1 = List(1, 2, 3, 4) val list2 = List(5, 4, 4,2) val rdd1: RDD[Int] = sparkContext.makeRDD(list1, 3) val rdd2: RDD[Int] = sparkContext.makeRDD(list2, 2) rdd2.subtract(rdd1).saveAsTextFile("output") }

源码:

/** * Return an RDD with the elements from `this` that are not in `other`. * * Uses `this` partitioner/partition size 【 *使用`this`分区/分区大小】, because even if `other` is huge, the resulting * RDD will be &lt;= us. */ def subtract(other: RDD[T]): RDD[T] = withScope { subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length))) } /** * Return an RDD with the elements from `this` that are not in `other`. */ def subtract( other: RDD[T], p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope { if (partitioner == Some(p)) { // Our partitioner knows how to handle T (which, since we have a partitioner, is // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples val p2 = new Partitioner() { override def numPartitions: Int = p.numPartitions override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1) } // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies // anyway, and when calling .keys, will not have a partitioner set, even though // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be // partitioned by the right/real keys (e.g. p). this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys } else { this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys //可见有subtractByKey,有ByKey类型的算子 } }

6.4.2.4 cartesian(笛卡尔积)

不会产生shuffle! 运算后的RDD的分区数=所有上游RDD分区数的乘积。

/* cartesian 算子 笛卡尔积 A : m个数据 B : n个数据 A 笛卡尔集 B : m * n 生成的RDD的分区数 = 两个RDD的分区数乘积 有没有shuffle?没有,他只是将分区数拷贝了一份 不需要分区间的数据交换,不需要将一个上游分区的数据,重新分区到多个下游分区! */ @Test def testCartesian() : Unit ={ val list1 = List(1, 2, 3, 4) val list2 = List(5, 4, 4,2) val rdd1: RDD[Int] = sparkContext.makeRDD(list1, 3) val rdd2: RDD[Int] = sparkContext.makeRDD(list2, 2) rdd1.cartesian(rdd2).saveAsTextFile("output") Thread.sleep(10000000) }

6.4.2.5 zip(拉链)

两个RDD的分区数和分区中元素的个数必须相同!

源码:

/** * Zips this RDD with another one, returning key-value pairs with the first element in each RDD, * second element in each RDD, etc. Assumes that the two RDDs have the *same number of * partitions* and the *same number of elements in each partition* (e.g. one was made through * a map on the other). */ / ** *用另一个RDD替换该RDD,并返回键值对以及每个RDD中的第一个元素, *每个RDD中的第二个元素,依此类推。假定两个RDD的*分区数量和每个分区 *中的元素数量相同(例如,一个分区是通过 *另一个映射创建的)。 * / def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope { zipPartitions(other, preservesPartitioning = false) { (thisIter, otherIter) => new Iterator[(T, U)] { def hasNext: Boolean = (thisIter.hasNext, otherIter.hasNext) match { case (true, true) => true case (false, false) => false case _ => throw new SparkException("Can only zip RDDs with " + "same number of elements in each partition") } def next(): (T, U) = (thisIter.next(), otherIter.next()) } } }

将两个RDD,相同分区,相同位置的元素进行拉链操作,返回一个(x,y)

要求: 两个RDD的分区数和分区中元素的个数必须相同!

Zip算子: 两个RDD相同分区,相同索引位置的元素进行拉链! 1.如果两个RDD数据类型不一致怎么办? 不怎么办!该怎么办就怎么办!正常办! 2.如果两个RDD数据分区不一致怎么办? 抛异常 java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions: List(3, 2) 3.如果两个RDD分区数据数量不一致怎么办? 抛异常SparkException("Can only zip RDDs with " + "same number of elements in each partition")

6.4.2.6 zipWithIndex(只需要一个集合)

当前RDD的每个元素和对应的index进行拉链,返回(ele,index)

/* zipWithIndex算子 */ @Test def testZipWithIndexInSpark() : Unit ={ val list1 = List(1, 2, 3, 4,5,6) val rdd1: RDD[Int] = sparkContext.makeRDD(list1, 2) rdd1.zipWithIndex().saveAsTextFile("output") } /* Scala集合中的zipWithIndex: 每个元素和自己的索引进行拉链! 拉完: (ele,index) */ @Test def testZipWithIndex() : Unit ={ val list1 = List(1, 2, 3, 4) val list2 = List("a", "b", "c", "d","e") println(list1.zip(list2)) println(list1.zipWithIndex) } /* Scala集合中的ZipAll */ @Test def testZipAll() : Unit ={ val list1 = List(1, 2, 3, 4,5,6,7,8) val list2 = List("a", "b", "c","d","e","f") println(list1.zip(list2)) println(list1.zipAll(list2,100,"z")) }

6.4.2.7 zipPartitions

要求两个RDD的分区数必须是一样的!,拉链的规则可以自己定义

两个RDD进行拉链,在拉链时,可以返回任意类型的元素的迭代器!更加灵活!

/* zipPartitions算子: 更灵活,拉链的规则可以自己定义,只要返回拉完的集合的迭代器即可! 要求两个RDD的分区数必须是一样的! def zipPartitions[B: ClassTag, V: ClassTag] (rdd2: RDD[B]) // 要拉链的RDD (f: (Iterator[T], Iterator[B]) => Iterator[V]): RDD[V] */ @Test def testzipPartitionsInSpark() : Unit ={ val list1 = List(1, 2, 3, 4,5,6,7) val list2 = List("a", "b", "c","d","e","f") val rdd1: RDD[Int] = sparkContext.makeRDD(list1, 2) val rdd2: RDD[String] = sparkContext.makeRDD(list2, 2) rdd1.zipPartitions(rdd2)((iter1,iter2) => iter1.zipAll(iter2,0,"null")) .saveAsTextFile("output") }

6.4.3 key-value类型

key-value类型的算子,必须要求是KV类型需要经过隐式转换将RDD[(k,v)]转换为 PairRDDFunctions,才可以调用以下算子

/* 如何调用PairRDDFunctions中的方法? ParallelCollectionRDD => PairRDDFunctions scala : ①动态混入 ②隐式转换 将 ParallelCollectionRDD 隐式转换为 PairRDDFunctions 有隐式转换函数 哪里找? 在ParallelCollectionRDD.scala中找 去父类或实现的特质的源文件中找 包对象中 调用: implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)]) (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] 将当前的RDD 转换为 RDD[K-V] */ @Test def testHowToUsePairRDDFunctions() : Unit ={ val list = List(1, 2, 3, 4) // rdd :new ParallelCollectionRDD val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) // RDD[(K,V)] val rdd1: RDD[(Int, Int)] = rdd.map(x => (x, 1)) // RDD[(K,V)] 隐式转换为 PairRDDFunctions,调用PairRDDFunctions.reduceByKey rdd1.reduceByKey(_+_) }

6.4.3.1 reduceByKey 有shuffle

相同key的values合并

源码:

/** * Merge the values for each key using an associative and commutative reduce function. This will * also perform the merging locally on each mapper before sending results to a reducer, similarly * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/ * parallelism level. 译文: 使用关联和可交换的归约函数合并每个键的值。这还将*在将结果发送给reducer之前,在每个Mapper上本地执行合并,类似于*与MapReduce中的“ combiner”。输出将使用现有的partitioner / * parallelism级别进行哈希分区。 */ def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope { reduceByKey(defaultPartitioner(self), func) }

会在Map端进行局部合并(类似Combiner)

combiner是在shuffle阶段运行,combin是在reducer阶段

注意:合并后的类型必须和之前value的类型一致!

测试:

/* ReduceByKey算子 有在MapTask端局部合并的效果,类似MR中的Combiner! */ @Test def testReduceByKey() : Unit ={ val list = List((1, 1), (2, 1), (3, 1), (4, 1)) val rdd: RDD[(Int, Int)] = sparkContext.makeRDD(list, 2) rdd.reduceByKey(_+_) }

6.4.3.2 groupByKey 有shuffle

直接按照K对 V进行分组!

根据key进行分组!

/* groupByKey算子 groupBy和groupBykey的区别: ① groupBy 本质上调用的还是 groupByKey groupBy: 当前RDD可以是任意类型 RDD[T] =====> f(T => K) ====> 根据K,将对应的T进行分组! ② groupByKey 必须要求是 RDD[(K,V)] ,直接按照K对 V进行分组! reduceByKey和groupByKey的区别? ① reduceByKey: 将 相同key的values合并 在shuffle时,会有map端聚合 ② groupByKey : 将K-V对按照Key进行分组 */

测试:

@Test def testGroupByKey() : Unit ={ val list = List((1, 1), (1, 2), (2, 1), (2, 2)) val rdd: RDD[(Int, Int)] = sparkContext.makeRDD(list, 2) rdd.groupByKey().saveAsTextFile("output") //可以不传参,不传参就是上游最大的分区数,可以传分区数,也可以传分区器 /* ① def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] ② def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] ③ def groupByKey(): RDD[(K, Iterable[V])] = self.withScope { groupByKey(defaultPartitioner(self)) } */ }

源码:

/** * Group the values for each key in the RDD into a single sequence. Hash-partitions the * resulting RDD with the existing partitioner/parallelism level. The ordering of elements * within each group is not guaranteed, and may even differ each time the resulting RDD is * evaluated. * * @note This operation may be very expensive. If you are grouping in order to perform an * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey` * or `PairRDDFunctions.reduceByKey` will provide much better performance. */ 译文: / ** *将RDD中每个键的值分组为单个序列。用现有的分区器/并行度级别对*生成的RDD进行哈希分区。不能保证每个组中元素*的顺序,甚至在每次对生成的RDD进行评估时都可能会有所不同。 * * @note此操作可能非常昂贵。如果要分组以便对每个键执行*聚合(例如求和或平均值),则使用`PairRDDFunctions.aggregateByKey` *或`PairRDDFunctions.reduceByKey`将提供更好的性能。 * / def groupByKey(): RDD[(K, Iterable[V])] = self.withScope { groupByKey(defaultPartitioner(self)) }

6.4.3.3 aggregateByKey 通过K将V转化成U进行聚合

源码:

/** * Aggregate the values of each key, using given combine functions and a neutral "zero value". * This function can return a different result type, U, than the type of the values in this RDD, * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's, * as in scala.TraversableOnce. The former operation is used for merging values within a * partition, and the latter is used for merging values between partitions. To avoid memory * allocation, both of these functions are allowed to modify and return their first argument * instead of creating a new U. */ 译文: / ** *使用给定的Combine函数和中性的“零值”汇总每个键的值。 *此函数可以返回与该RDD中的值类型不同的结果类型U *V。因此,我们需要一个将V合并为U的操作,以及一个将两个U合并的操作* .TraversableOnce。前一个操作用于合并*分区中的值,而后一个操作用于合并分区之间的值。为了避免分配内存,这两个函数都可以修改并返回其第一个参数*而不是创建新的U。* / def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = self.withScope { aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp) } def aggregateByKey[U: ClassTag](zeroValue: U, partitioner: Partitioner)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] = self.withScope { // Serialize the zero value to a byte array so that we can get a new clone of it on each key val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue) val zeroArray = new Array[Byte](zeroBuffer.limit) zeroBuffer.get(zeroArray) lazy val cachedSerializer = SparkEnv.get.serializer.newInstance() val createZero = () => cachedSerializer.deserialize[U](ByteBuffer.wrap(zeroArray)) // We will clean the combiner closure later in `combineByKey` val cleanedSeqOp = self.context.clean(seqOp) combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner) }

解释:

/* zeroValue: 零值,初始值,用来将 相同key的value 聚合到 zeroValue上 seqOp: 分区内,将相同key的values 聚合到 zeroValue上 combOp: 在多个分区间,将将相同key的合并后的zeroValue进行聚合,返回最终的zeroValue */ def aggregateByKey[U: ClassTag] (zeroValue: U) (seqOp: (U, V) => U,combOp: (U, U) => U)

测试:

/* aggregateByKey 初体验 def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)] 目的将 每个key对应的values的值,合并到一个 zeroValue上,将运算后的zeroValue 和key 作为结果返回! seqOp :(U, V) => U 在分区内运算,将分区内,相同key对应的values合并到 zeroValue combOp: (U, U) => U 在分区间运算,将每个分区生成的 K-U类型,相同k的 U类型的 zeroValue 再进行合并,得到最终结果! aggregateByKey 模拟 reduceByKey(_+_) */ @Test def testAggregateByKey() : Unit ={ val list = List((1, 1), (1, 2), (2, 1), (1,3),(2,2),(2, 2)) val rdd: RDD[(Int, Int)] = sparkContext.makeRDD(list, 2) rdd.aggregateByKey(0)(_+_,_+_).saveAsTextFile("output") } 【结果两个分区,一个(1,6)一个(2,5)】

练习一:

/* aggregateByKey 练习1 : 取出每个分区内相同key的最大值然后分区间相加 分区内: 0: (1, 1), (1, 2), (2, 1) => (1,{1,2}) => f(U,V) => (1,2),(2,1) 1: (1,3),(2,2),(2, 2) => (1,3),(2,2) 分区间相加: (1,5),(2,3) */ @Test def testAggregateByKeyExec1() : Unit ={ val list = List((1, 1), (1, 2), (2, 1), (1,3),(2,2),(2, 2)) val rdd: RDD[(Int, Int)] = sparkContext.makeRDD(list, 2) rdd.aggregateByKey(Int.MinValue)((zeroValue,value) => zeroValue.max(value),_+_).saveAsTextFile("output") }

练习二:

/* aggregateByKey 练习2 :分区内同时求最大和最小,分区间合并 分区内: 0: (1, 1), (1, 2), (2, 1) => (1,(2,1)) ,(2,(1,1)) 1: (1,3),(2,2),(2, 2) => (1,(3,3)),(2,(2,2)) 分区间相加: (1,(5,4)) (2,(3,3)) */ @Test def testAggregateByKeyExec2() : Unit ={ val list = List((1, 1), (1, 2), (2, 1), (1,3),(2,2),(2, 2)) val rdd: RDD[(Int, Int)] = sparkContext.makeRDD(list, 2) rdd.aggregateByKey((Int.MinValue,Int.MaxValue))( { case ((zero1,zero2),value ) => (zero1.max(value), zero2.min(value)) }, { case ( (max1,min1) , (max2 , min2)) => ( max1+max2,min1+min2 ) } ).saveAsTextFile("output") }

练习三:

/* aggregateByKey 练习3: 求每个key对应的平均值 平均值: sum / count 分区内: 统计 (key,(sum,count)) 0: (1, 1), (1, 2), (2, 1) => (1,(3,2)) ,(2,(1,1)) 1: (1,3),(2,2),(2, 2) => (1,(3,1)),(2,(4,2)) 分区间: (1,(3,2)) ,(1,(3,1)) => (1,(6,3)) => (1,2) (2,(1,1)),(2,(4,2)) => (2,(5,3)) => (2,1.6) */ @Test def testAggregateByKeyExec3() : Unit ={ val list = List((1, 1), (1, 2), (2, 1), (1,3),(2,2),(2, 2)) val rdd: RDD[(Int, Int)] = sparkContext.makeRDD(list, 2) rdd.aggregateByKey((0.0,0))( { case ((sum:Double,count:Int),value ) => (sum + value, count+1) }, { case ( (sum1,count1) , (sum2 , count2)) => ( sum1+sum2,count1+count2 ) } ).map{ case (key,(sum,count)) => (key,sum /count) } .saveAsTextFile("output") }

6.4.3.4 foldByKey aggregateByKey的简化版!

源码:

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope { foldByKey(zeroValue, defaultPartitioner(self))(func) } def foldByKey( zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)] = self.withScope { // Serialize the zero value to a byte array so that we can get a new clone of it on each key val zeroBuffer = SparkEnv.get.serializer.newInstance().serialize(zeroValue) val zeroArray = new Array[Byte](zeroBuffer.limit) zeroBuffer.get(zeroArray) // When deserializing, use a lazy val to create just one instance of the serializer per task lazy val cachedSerializer = SparkEnv.get.serializer.newInstance() val createZero = () => cachedSerializer.deserialize[V](ByteBuffer.wrap(zeroArray)) val cleanedFunc = self.context.clean(func) combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v), cleanedFunc, cleanedFunc, partitioner) }

foldByKey是aggregateByKey的简化版!

​ foldByKey在运算时,分区内和分区间合并的函数都是一样的!

如果aggregateByKey,分区内和分区间运行的函数一致,且zeroValue和value的类型一致,可以简化使用foldByKey!

测试:

/* foldByKey算子: val cleanedFunc = self.context.clean(func) combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v), cleanedFunc, cleanedFunc, partitioner) aggregateByKeys算子: combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner) foldByKey 是简化版的 aggregateByKeys算子 要求 cleanedSeqOp 和 combOp 必须是一样的! 要求 zeroValue类型必须和 value类型是一致的! */ @Test def testFoldByKey() : Unit ={ val list = List((1, 1), (1, 2), (2, 1), (1,3),(2,2),(2, 2)) val rdd: RDD[(Int, Int)] = sparkContext.makeRDD(list, 2) //可以简化为下述写法 rdd.aggregateByKey(0)(_+_,_+_) rdd.foldByKey(0)(_+_).saveAsTextFile("output") }

aggrregateByKey和foldByKey两都是柯里化的方法,有多个参数列表

6.4.3.5 combineByKey 也是基于K进行合并

源码:

/* createCombiner: 将每个key的第一个value,经过函数运算,运算后,将结果zeroValue。 mergeValue: 在分区内运行,C是 zeroValue类型,将剩余(除去第一个value)的value合并到 zevoValue上 mergeCombiners: 在分区间,将相同key的zeroValue合并 */ def combineByKey[C]( createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope { combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null) }

createCombiner: V => C

1.aggregateByKey 是一个柯里化的方法,第一个参数列表直接传入 zeroValue 2.combineByKey 通过createCombiner函数,生成 zeroValue,将相同key的第一个value,传入运算,将返回值作为zeroValue,之后,这个运算过的value不会再参与运算!

mergeValue: (C, V) => C

--和aggregateByKey的seqop一样: 分区内合并 , 将相同keyvalues合并到 zeroValue

mergeCombiners: (C, C) => C)

--和aggregateByKey的combOp一样: 分区间合并 , 将相同key的zeroValue合并到 最终zeroValue

combineByKeyWithClassTag的简化版本,简化在没有提供ClassTag

combineByKey: 是为了向后兼容。兼容foldByKey类型的算子!

测试:

/* CombineByKey算子: def combineByKey[C]( // Combiner = zeroValue createCombiner: V => C, // aggregateByKey 是一个柯里化的方法,第一个参数列表直接传入 zeroValue // 通过createCombiner函数,生成 zeroValue,将相同key的第一个value,传入运算,将返回值作为zeroValue // 之后,这个运算过的value不会再 参与运算! mergeValue: (C, V) => C, // 和aggregateByKey的seqop一样,分区内合并 , 将相同key的values合并到 zeroValue mergeCombiners: (C, C) => C // 和aggregateByKey的combOp一样,分区间合并 , 将相同key的zeroValue合并到 最终zeroValue ): RDD[(K, C)] 0: (1, 1), (1, 2), (2, 1) createCombiner: 1===>11 , 2=====>11 mergeValue: (1,13),(2,11) 1: (1,3),(2,2),(2, 2) createCombiner: 1===>13 , 2=====>12 mergeValue: (1,13),(2,14) mergeCombiners: (1,26) (2,25) */ @Test def testCombineByKey() : Unit ={ val list = List((1, 1), (1, 2), (2, 1), (1,3),(2,2),(2, 2)) val rdd: RDD[(Int, Int)] = sparkContext.makeRDD(list, 2) val result: RDD[(Int, Int)] =rdd.combineByKey[Int]( (value:Int) => value+10, (zeroValue:Int,value:Int) => zeroValue+value, (zeroValue1:Int,zeroValue2:Int) => zeroValue1+zeroValue2) //val result: RDD[(Int, Int)] = rdd.combineByKey[Int](_ + 10, _ + _, _ + _) result.saveAsTextFile("output") }

练习:

/* CombineByKey算子练习: 将数据List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98))求每个key的平均值 */ @Test def testCombineByKeyExec() : Unit ={ val list = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 94), ("b", 97)) val rdd: RDD[(String, Int)] = sparkContext.makeRDD(list, 2) // 有问题 /*val result: RDD[(String, (Double, Int))] = rdd.combineByKey[(Double, Int)]((value: Int) => (value.toDouble, 1), mergeValue= { case ((sum: Double, count: Int), value: Int) => (sum + value, count + 1) }, mergeCombiners= { case ((sum1: Double, count1: Int), (sum2: Double, count2: Int)) => (sum1 + sum2, count1 + count2) } )*/ val result: RDD[(String, (Double, Int))] = rdd.combineByKey[(Double, Int)]((value: Int) => (value.toDouble, 1), (zeroValue:(Double, Int), value:Int) => (zeroValue._1 + value, zeroValue._2 + 1), (zerovalue1:(Double, Int), zerovalue2:(Double, Int)) => (zerovalue1._1 + zerovalue2._1, zerovalue1._2 + zerovalue2._2)) result.map{ case (key,(sum,count)) => (key,sum / count) } .saveAsTextFile("output") }

6.4.3.6 4个算子的区别

​ --ReduceByKey,CombineByKey,AggretageByKey,FoldByKey的联系和转换

combineByKey: combineByKeyWithClassTag: mapSideCombine: Boolean = true CombineByKey: combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)key的第一个value,通过转换,转换为zeroValue可以和value类型不一致! 分区内和分区间运算的逻辑不一致! AggretageByKey: AggretageByKey: combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v), cleanedSeqOp, combOp, partitioner) 允许自定义zeroValue,zeroValue可以和value不一致! 分区内和分区间运算的逻辑不一致! FoldByKey: FoldByKey:combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v), cleanedFunc, cleanedFunc, partitioner) 允许自定义zeroValue,zeroValue必须和value一致! 分区内和分区间运算的逻辑一致! ReduceByKey: ReduceByKey(最常用) : combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner) zeroValue 的类型和 Value的类型一致! 取key的第一个Value作为zeroValue 分区内和分区间运算的逻辑一致! ReduceByKey,CombineByKey,AggretageByKey,FoldByKey 都会在Map端聚合!类似MR中的combiner!

测试:

/* 用函数计算zeroValue 分区内计算 分区间计算 */ rdd.combineByKey(v => v + 10, (zero: Int, v) => zero + v, (zero1: Int, zero2: Int) => zero1 + zero2) //如果不需要使用函数计算zeroValue,而是直接传入,此时就可以简化为 rdd.aggregateByKey(10)(_ + _, _ + _) //如果aggregateByKey的分区内和分区间运算函数一致,且zeroValue和value同一类型,此时可以简化为 rdd.foldByKey(0)(_ + _) // 如果zeroValue为0,可以简化为reduceByKey rdd.reduceByKey(_ + _) //四个算子本质都调用了,只不过传入的参数进行了不同程度的处理 // 以上四个算子,都可以在map端聚合! combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null) // 根据需要用,常用是reduceByKey @Test def test4OperationTransform() : Unit ={ val list = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)) val rdd: RDD[(String, Int)] = sparkContext.makeRDD(list, 2) val result: RDD[(String, Int)] =rdd.combineByKey[Int]( (value:Int) => value, (zeroValue:Int,value:Int) => zeroValue+value, (zeroValue1:Int,zeroValue2:Int) => zeroValue1+zeroValue2) // 等价于 rdd.aggregateByKey(0)(_+_,_+_) //等价于 rdd.foldByKey(0)(_+_) //等价于 rdd.reduceByKey(_+_) }

6.4.3.7 partitionBy 有shuffle

使用指定的分区器对RDD进行重新分区!

/* partitionBy算子 有shuffle */ @Test def testPartitionBy() : Unit ={ val list = List(1, 2, 3, 4,5,6,7,8,9,10,2,3,4,2,3,4,2,3,4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val rdd1: RDD[(Int, Int)] = rdd.map(x => (x, 1)) val rangeP = new RangePartitioner[Int, Int](3, rdd1) // 数据使用 rangeP 重新分区 rdd1.partitionBy(rangeP).saveAsTextFile("output") }

6.4.3.8 自定义Partitioner,自定义类,继承Partitioner类!实现其中的getPartition()

//自定义一个类继承Partitioner class MyPartitioner extends Partitioner { override def numPartitions: Int = 2 // 分区逻辑 0-20岁分到0号区,20岁以上到1号区 非Person类型,分到0号区 override def getPartition(key: Any): Int = { // 将Any转为Person if (!key.isInstanceOf[Person]){ 0 }else{ val person: Person = key.asInstanceOf[Person] if (person.age <=20){ 0 }else{ 1 } } } } /* 自定义分区器的使用 */ @Test def testCustomPartioner() : Unit ={ val list = List(Person("jack",20),Person("jack",20),Person("jack",20),Person("jack",20),Person("jack1",21),Person("jack1",21)) val rdd: RDD[Person] = sparkContext.makeRDD(list, 2) //val rdd2: RDD[(Int, Person)] = rdd.map(p => (p.age, p)) //rdd2.partitionBy(new HashPartitioner(2)) val rdd3: RDD[(Person, Int)] = rdd.map(p => (p, 0)) rdd3.partitionBy(new MyPartitioner).saveAsTextFile("output") }

6.4.3.9 mapValues 没有shuffle 对V进行操作

将K-V类型RDD相同key的所有values经过函数运算后,再和Key组成新的k-v类型的RDD

/* MapValues算子 RDD[(K,V)] ===> def mapValues[U](f: V => U): RDD[(K, U)] 将 Key对应的 value 使用 函数进行运算, 之后将运算后的结果和Key再组成新的K-V对返回 没有shuffle */ @Test def testMapValues() : Unit ={ val list = List((1, 1), (2, 1), (3, 1), (4, 1)) val rdd: RDD[(Int, Int)] = sparkContext.makeRDD(list, 2) val result: RDD[(Int, String)] = rdd.mapValues(x => "hello:" + x) result.saveAsTextFile("output") }

常用算子

* * RDD: 低阶编程API * 推荐: SQL(简单,学习成本低,受市场欢迎) * * select * xxx,count(*),sum(*),max(*) distinct * subString(x) 一进一出 map * from xxx join xxxx join * where xxx filter * group by xxx reduceByKey * having xxx * order by xxx sortBy sortByKey * limit take(n) *

6.4.3.10 SortByKey 默认是 RangePartitioner,有shuffle!

根据key进行排序!

基本数据类型排序测试:

/* SortByKey算子: SortByKey算子在哪? 如何调用OrderedRDDFunctions中的方法? implicit def rddToOrderedRDDFunctions[K : Ordering : ClassTag, V: ClassTag](rdd: RDD[(K, V)]) : OrderedRDDFunctions[K, V, (K, V)] = { new OrderedRDDFunctions[K, V, (K, V)](rdd) 默认是 RangePartitioner,有shuffle! } */ @Test def testSortByKey() : Unit ={ val list = List(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)) val rdd: RDD[(String, Int)] = sparkContext.makeRDD(list, 2) // RDD[(K, V)] ===> rddToOrderedRDDFunctions ==> OrderedRDDFunctions.sortByKey rdd.sortByKey(false,1).saveAsTextFile("output") }

​ 针对自定义的类型进行排序!可以提供一个隐式的自定义类型的排序器Ordering[T],自定义类型测试:

/* SortByKey算子:排序自定义Bean Person(name:String,age:Int) */ @Test def testSortByKey2() : Unit ={ val list = List(Person("jack", 20), Person("jack", 21), Person("marry", 21), Person("tom", 23)) val rdd: RDD[Person] = sparkContext.makeRDD(list, 2) // 希望按照什么排序,就作为key val rdd1: RDD[(Int, Person)] = rdd.map(p => (p.age, p)) rdd1.sortByKey(false,1).saveAsTextFile("output") }

按照名称降序,年龄升序 ,提供一个Ordering

//比较器 @Test def testSortByKey3() : Unit ={ implicit val myOrdering: Ordering[Person] = new Ordering[Person]() { // 比较的逻辑 override def compare(x: Person, y: Person): Int = { // 按照名称降序排 var result: Int = -x.name.compareTo(y.name) if (result == 0) { //名称相同的,按照年龄升序排 result = x.age.compareTo(y.age) } result } } val list = List(Person("jack", 20), Person("jack", 21), Person("marry", 21), Person("tom", 23)) val rdd: RDD[Person] = sparkContext.makeRDD(list, 2) // 希望按照什么排序,就作为key val rdd1: RDD[(Person, Int)] = rdd.map(p => (p, 0)) // rdd1.sortByKey(true,1).saveAsTextFile("output") // 名称升序,年龄降序 rdd1.sortByKey(false,1).saveAsTextFile("output") }

冥界(隐式)召唤: 从当前代码的域中,获取一个隐式的指定类型的变量

当前类定义了一个隐式变量 implicit val num:Int=10 @Test def testImplicitly() : Unit ={ 方法属于这个类里 // 方法里获取一个 Int类型的值(Int类型很多,我不知道有没有) val i: Int = implicitly[Int] println(i)//有的话就是10 }

源码:

/** * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling * `collect` or `save` on the resulting RDD will return or output an ordered list of records * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in * order of the keys). */ // TODO: this currently doesn't work on P other than Tuple2! def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)] = self.withScope { val part = new RangePartitioner(numPartitions, self, ascending) //默认是RangePartitioner new ShuffledRDD[K, V, V](self, part)//一定有suffle .setKeyOrdering(if (ascending) ordering else ordering.reverse)//设置了一个比比较器 } //进入ordering,可见语法是冥界召唤 private val ordering = implicitly[Ordering[K]] 如果要设比较器,就要将比较器设置成隐式的,后续调用的时候用冥界召唤 因为sortByKey只能传两个参数,ordering就不能传 sortBy就不一样他是一个柯里化的方法,他可以将你的比较器通过参数传进去 sortByKey就可以通过冥界召唤的方式让你传进去

6.4.3.11 连接 全部shuffle

Join: 根据两个RDD key,对value进行交集运算!

LeftOuterJoin: 类似 left join,取左侧RDD的全部和右侧key有关联的部分!

RightOuterJoin: 类似 right join,取右侧RDD的全部和左侧key有关联的部分!

FullOuterJoin: 取左右RDD的全部,及有关联的部分!

​ 如果关联上,使用Some,如果关联不上使用None标识!

测试:

/* Join操作的算子 全部有shuffle! Join: 内连接 取左右结果集的交集 leftJoin: 左外连接 取左侧结果集的全部 和 与右侧结果集的交集部分 rightjoin: 右外连接 取右侧结果集的全部 和 与左侧结果集的交集部分 full join: 满外连接 取左右侧结果集的全部 和 二者交集部分 将key作为关联的键 */ @Test def testJoin() : Unit ={ val list1 = List(("a", 88), ("b", 95),("c",20),("b",93)) val list2 = List(("a", 2), ("b", 2),("d",30),("a",30)) val rdd1: RDD[(String, Int)] = sparkContext.makeRDD(list1, 2) val rdd2: RDD[(String, Int)] = sparkContext.makeRDD(list2, 2) // rdd1.join(rdd2).coalesce(1).saveAsTextFile("output") //rdd1.leftOuterJoin(rdd2).coalesce(1).saveAsTextFile("output") //rdd1.rightOuterJoin(rdd2).coalesce(1).saveAsTextFile("output") rdd1.fullOuterJoin(rdd2).coalesce(1).saveAsTextFile("output") }

leftOuterJoin:

rightOuterJoin

fullOuterJoin

逻辑练习:

agent.log:时间戳,省份,城市,用户,广告,中间字段使用空格分隔 示例: 1516609143867 6 7 64 16 统计出每一个省份 广告被点击数量Top3 广告 结果 <= 省份 * 3 逻辑分析: 数据的粒度: 每一条数据代表的含义(意义) 某省份某城市的某用户 在某时间 点击了某广告, 封装数据: 1.--1516609143867 6 7 64 16 => (( 省份,广告 ) ,1) map 2.--(( 省份,广告 ) ,1) => (( 省份,广告 ) ,N) reduceByKey 3.--(( 省份,广告 ) ,N) => (省份,(广告 ,N)) map 4.--(省份,(广告 ,N)) => {(省份1, {(广告1 ,N),(广告2 ,N),(广3 ,N)... }) , ...,(省份2, {(广告1 ,N),(广告2 ,N),(广3 ,N)... }) } groupByKey 5.--(省份1, {(广告1 ,N),(广告2 ,N),(广3 ,N)... }) => {(广告1 ,N),(广告2 ,N),(广3 ,N)... } ==> 按照N排序后取前三,再和Key组合为 K-V对 mapValues @Test def testExec() : Unit ={ //读数据 val result: RDD[(String, List[(String, Int)])] = sparkContext.textFile("input/agent.log") .map(line => { val words: Array[String] = line.split(" ") ((words(1), words(4)), 1) }) .reduceByKey(_ + _) .map { case ((province, ads), count) => (province, (ads, count)) } .groupByKey() .mapValues(iter => { iter.toList.sortBy(-_._2).take(3) }) result.coalesce(1).saveAsTextFile("output") }

6.4.3.12 cogroup

将左右两侧RDD中所有相同key的value进行聚合,返回每个key及对应values的集合!

将两侧的RDD根据key进行聚合,返回左右两侧RDD相同的values集合的RDD!

最新回复(0)