spark 累加器

it2023-10-13  68

累加器

累加器:分布式共享只写变量。(Task和Task之间不能读数据) 累加器用来对信息进行聚合,通常在向Spark传递函数时,比如使用map()函数或者用 filter()传条件时,可以使用驱动器程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。如果我们想实现所有分片处理时更新共享变量的功能,那么累加器可以实现我们想要的效果。

系统累加器

package com.bupt.day06 import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD import org.apache.spark.util.LongAccumulator /** * @author yangkun * @date 2020/10/20 19:56 * @version 1.0 */ object accumulatorDemo { def main(args: Array[String]): Unit = { //创建SparkConf并设置App名称 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]") //创建SparkContext,该对象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) //创建一个RDD, 单值RDD,直接并对数据进行求和,不存在shuffle过程 /*val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4)) println(rdd.sum()) println(rdd.reduce(_ + _)) */ val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a",1),("a",2),("a",3),("a",4)),2) /* //存在shuffle,效率较低 val resRDD: RDD[(String, Int)] = rdd.reduceByKey(_+_) resRDD.map(_._2).collect().foreach(println) */ /* //如果定义一个普通的变量,那么在Driver定义,Excutor会创建变量的副本,算子都是对副本进行操作,Driver端的变量不会更新 var sum:Int = 0 rdd.foreach{ case (word,count)=>{ sum += count } } println(sum) */ //如果需要通过Excutor,对Driver端定义的变量进行更新,需要定义为累加器 //累加器和普通的变量相比,会将Excutor端的结果,收集到Driver端进行汇总 //创建累加器 //val sum: Accumulator[Int] = sc.accumulator(10) //过时了 val sum: LongAccumulator = sc.longAccumulator("myAcc") rdd.foreach{ case (word,count)=>{ //sum +=count sum.add(count) println("****" + sum.value) } } println(sum.value) // 关闭连接 sc.stop() } }

注意: (1)工作节点上的任务不能相互访问累加器的值。从这些任务的角度来看,累加器是一个只写变量。 (2)对于要在行动操作中使用的累加器,Spark只会把每个任务对各累加器的修改应用一次。因此,如果想要一个无论在失败还是重复计算时都绝对可靠的累加器,我们必须把它放在foreach()这样的行动操作中。转化操作中累加器可能会发生不止一次更新。

自定义累加器

自定义累加器类型的功能在1.X版本中就已经提供了,但是使用起来比较麻烦,在2.0版本后,累加器的易用性有了较大的改进,而且官方还提供了一个新的抽象类:AccumulatorV2来提供更加友好的自定义类型累加器的实现方式。 1)自定义累加器步骤 (1)继承AccumulatorV2,设定输入、输出泛型 (2)重写方法 2)需求:自定义累加器,统计集合中首字母为“H”单词出现的次数。 List(“Hello”, “Hello”, “Hello”, “Hello”, “Hello”, “Spark”, “Spark”)

package com.atguigu.spark.day06 import org.apache.spark.rdd.RDD import org.apache.spark.util.{AccumulatorV2, LongAccumulator} import org.apache.spark.{SparkConf, SparkContext} import scala.collection.mutable /** * Author: Felix * Date: 2020/5/15 * Desc: 累加器 * 自定义累加器,统计出RDD中,所有以"H"开头的单词以及出现次数(word,count) */ object Spark07_Accumulator { def main(args: Array[String]): Unit = { //创建SparkConf并设置App名称 val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[*]") //创建SparkContext,该对象是提交Spark App的入口 val sc: SparkContext = new SparkContext(conf) val rdd: RDD[String] = sc.makeRDD(List("Hello", "Hello", "HaHa", "Hello", "HaHa", "Spark", "Spark")) //创建累加器对象 val myAcc = new MyAccumulator //注册累加器 sc.register(myAcc) //使用累加器 rdd.foreach{ word=>{ myAcc.add(word) } } //输出累加器结果 println(myAcc.value) // 关闭连接 sc.stop() } } //定义一个类,继承AccumulatorV2 //泛型累加器输入和输出数据的类型 class MyAccumulator extends AccumulatorV2[String,mutable.Map[String,Int]]{ //定义一个集合,集合单词以及出现次数 var map = mutable.Map[String,Int]() //是否为初始状态 override def isZero: Boolean = map.isEmpty //拷贝 override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = { val newAcc = new MyAccumulator newAcc.map = this.map newAcc } //重置 override def reset(): Unit = map.clear() //向累加器中添加元素 override def add(elem: String): Unit = { if(elem.startsWith("H")){ //向可变集合中添加或者更新元素 map(elem) = map.getOrElse(elem,0) + 1 } } //合并 override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = { //当前Excutor的map var map1 = map //另一个Excutor的map var map2 = other.value map = map1.foldLeft(map2) { //mm表示map2,kv表示map1中的每一个元素 (mm, kv) => { //指定合并规则 val k: String = kv._1 val v: Int = kv._2 //根据map1中元素的key,到map2中获取value mm(k) = mm.getOrElse(k, 0) + v mm } } } //获取累加器的值 override def value: mutable.Map[String, Int] = map } package day08 import org.apache.spark.rdd.RDD import org.apache.spark.util.AccumulatorV2 import org.apache.spark.{SparkConf, SparkContext} /** * @author yangkun * @date 2020/10/28 11:15 * @version 1.0 * 求平均年龄----累加器方式实现 */ object Spark04_accumulatorgetAvgAge { def main(args: Array[String]): Unit = { //创建SparkConf配置文件对象 val conf: SparkConf = new SparkConf().setAppName("SparkSQL04_Demo").setMaster("local[*]") val sc: SparkContext = new SparkContext(conf) val rdd: RDD[(String, Int)] = sc.makeRDD(List(("yangkun",21),("ll",32),("aan",20))) //创建累加器对象 val my = new myAcc //注册累加器 sc.register(my) //使用累加器 rdd.foreach{ case (name,age) => { my.add(age) } } //获得累加器的值 println(my.value) } } class myAcc extends AccumulatorV2[Int,Double]{ var ageSum = 0 var countSum = 0 override def isZero: Boolean = ageSum == 0 && countSum == 0 override def copy(): AccumulatorV2[Int, Double] = { var newAcc = new myAcc newAcc.ageSum = this.ageSum newAcc.countSum = this.countSum newAcc } override def reset(): Unit = { ageSum = 0 countSum = 0 } override def add(v: Int): Unit = { this.countSum += 1 this.ageSum += v } override def merge(other: AccumulatorV2[Int, Double]): Unit = { other match { case ot:myAcc => { this.ageSum += ot.ageSum this.countSum += ot.countSum } case _ => } } override def value: Double = ageSum.toDouble / countSum }
最新回复(0)