SparkCore——下篇

it2025-02-18  3

接上章节

文章目录

接上章节 六、核心编程------RDD 【编程模型之一】6.5 行动算子6.5.1 区别6.5.2 常见的行动算子 七、 RDD依赖关系7.1 RDD血缘关系 【toDebugString】7.2 RDD依赖关系 【dependencies】 八、RDD序列化8.1 序列化8.1.1 闭包8.1.2 测试18.1.3 测试28.1.4 测试38.1.5 测试4 8.2 kryo8.2.1 引入8.2.2 测试: 九、共享变量 【编程模型】9.1、广播变量 【编程模型之二,它很重要!!!】9.1.1 概念9.1.2 使用 9.2、累加器【编程模型之三,它很重要!!!】9.2.1 概念9.2.2 源码9.2.3 测试9.2.3.1 累加器原理测试1:测试2: 9.2.3.2 自定义累加器9.2.3.2.1 自定义一个Int类型的累加器测试3:测试4:测试5: 9.2.3.2.2 自定义单词统计累加器 9.3 总结 十、RDD持久化10.1 cache10.1.1 缓存引入10.1.2 缓存基本介绍10.1.3 缓存的使用10.1.4 总结 10.2 checkpoint10.2.1 基本介绍10.2.2 测试10.2.3 总结 十一、RDD文件读取不同数据源的数据11.1 RDD读写文本文件【重要!!!】11.1.1 基本介绍11.1.2 源码11.1.3 测试: 11.2 RDD读写对象文件11.2.1 基本介绍11.2.2 测试 11.3 RDD读写Sf文件11.3 RDD读写MySql数据库文件 【重要!!!】11.3.1 基本介绍11.3.2 源码11.3.3 测试11.3.3.1读11.3.3.2 写 11.4 RDD读写HBase的文件11.4.1 基本介绍11.4.2 读11.4.3 写

六、核心编程------RDD 【编程模型之一】

6.5 行动算子

6.5.1 区别

行动算子用来提交Job!和转换算子不同的时,转换算子一般是懒执行的!转换算子需要行动算子触发!

6.5.2 常见的行动算子

算子解释备注reduce将RDD的所有的元素使用一个函数进行归约,返回归约后的结果collect将RDD的所有元素使用Array进行返回,收集到Driver慎用!如果RDD元素过多,Driver端可能会OOM!count统计RDD中元素的个数take(n:Int)取前N个元素慎用!如果取RDD元素过多,Driver端可能会OOM!takeOrdered取排序后的前N个元素慎用!如果取RDD元素过多,Driver端可能会OOM!first返回第一个元素aggregate和aggregateByKey算子类似,不同的是zeroValue在分区间聚合时也会参与运算fold简化版的aggregate,分区内和分区间的运算逻辑一样countByValue统计相同元素的个数countByKey针对RDD[(K,V)]类型的RDD,统计相同key对应的K-V的个数foreach遍历集合中的每一个元素,对元素执行函数特殊场景(向数据库写入数据),应该使用foreachPartitionsave相关将RDD中的数据保存为指定格式的文件保存为SequnceFile文件,必须是RDD[(K,V)]特殊情况new RangeParitioner时,也会触发Job的提交! 常见行动算子 2.reduce : 使用指定的函数对RDD中的每个元素,两两聚合,得到最终结果! 3.collect: 将RDD中的元素,放入数组中! 慎用! 必须是Driver端内存大,生成的数组必须小时,才可以用!否则报错Driver端OOM! 4.count: 返回RDD中元素的数量 5.first: 返回RDD中的第一个元素! 0号区的第一个元素! 6.take: 取前N 慎用! 必须是Driver端内存大,生成的数组必须小时,才可以用!否则报错Driver端OOM! 7.takeOrdered: 排序取前N 慎用! 必须是Driver端内存大,生成的数组必须小时,才可以用!否则报错Driver端OOM! 8.aggregate: 和aggregateByKey的区别在于 ,zeroValue不仅在分区内参与运算,还在分区间参与运算! 9.fold: fold是aggregate的简化版 分区内和分区间的运算逻辑一样 zeroValue的类型和 RDD中元素的类型一致 10.countByKey: 只有 RDD[(K,V)]类型RDD才可以调用,统计相同key对应的value的个数 11.countByValue: 统计RDD中相同元素的个数 12.save相关 : saveAsTextFile 将RDD中的元素以文本文件格式保存 saveAsObjectFile 将RDD中的元素以对象文件格式保存 saveAsSequenceFile: 只有RDD[(K,V)]类型RDD才可以调用 将K-V对,保存为SequenceFile

测试:

@Test def testActionOperation() : Unit ={ val list = List(11, 2, 3, 4,5,5,5,5) val list2 = List((11,1), (2,1),(5,1),(5,1),(5,3)) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val rdd2: RDD[(Int, Int)] = sparkContext.makeRDD(list2, 3) // rdd.saveAsObjectFile("output") rdd2.saveAsSequenceFile("output") // rdd.saveAsSequenceFile("output") /*println(rdd.countByValue()) println(rdd2.countByValue())*/ /* println(rdd2.countByKey())*/ println(rdd.reduce(_ + _)) rdd.collect() println(rdd.count()) println(rdd.first()) println(rdd.take(5).mkString(",")) println(rdd.takeOrdered(5)(Ordering[Int].reverse).mkString(",")) /*println(rdd.aggregate(10)(_ + _, _ + _)) println(rdd.fold(10)(_ + _))*/ }

foreach测试:

@Test def testForeach() : Unit ={ val list = List(11, 2, 3, 4,5,5,5,5) val rdd: RDD[Int] = sparkContext.makeRDD(list, 3) /* foreach 是Spark中RDD的算子,在Executor端执行 分布式运算! 去Task所在的EXECUTOR 看输出的结果! */ rdd.foreach(x => println(Thread.currentThread().getName+"--------->"+x)) println("---------------") /* collect 将RDD中的元素,以Array返回,返回到Driver端! foreach 是 Driver遍历Array! 单线程,顺序执行! 在Driver端查看! client: 在客户端看到 cluster: 去Driver所运行的EXECUTOR查看! */ rdd.collect().foreach(x => println(Thread.currentThread().getName+"--------->"+x)) // 每个分区,调用一次函数,用于向数据库写数据,批处理 //rdd.foreachPartition()没有返回值 }

七、 RDD依赖关系

7.1 RDD血缘关系 【toDebugString】

1.血缘关系 血统(Lineage): 沿袭 龙的传人 爱新觉罗的传人 toDebugString()

测试:

@Test def testLineage() : Unit ={ val list = List(1, 3, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val rdd1: RDD[Int] = rdd.map(x => x) val rdd2: RDD[Int] = rdd1.map(x => x) val rdd3: RDD[Int] = rdd2.map(x => x) val rdd4: RDD[Int] = rdd3.map(x => x) val rdd5: RDD[(Int, Int)] = rdd4.map(x => (x, 1)) val rdd6: RDD[(Int, Int)] = rdd5.reduceByKey(_ + _,3) val rdd7: RDD[(Int, Int)] = rdd6.map(x => x) val rdd8: RDD[(Int, Int)] = rdd7.map(x => x) println(rdd8.toDebugString) }

结果:

//3代表三个分区 (3) MapPartitionsRDD[8] at map at LineageAndDependencyTest.scala:49 [] | MapPartitionsRDD[7] at map at LineageAndDependencyTest.scala:48 [] | ShuffledRDD[6] at reduceByKey at LineageAndDependencyTest.scala:46 [] /* 2 代表两个分区 | 代表同一层级 */ +-(2) MapPartitionsRDD[5] at map at LineageAndDependencyTest.scala:44 [] | MapPartitionsRDD[4] at map at LineageAndDependencyTest.scala:42 [] | MapPartitionsRDD[3] at map at LineageAndDependencyTest.scala:41 [] | MapPartitionsRDD[2] at map at LineageAndDependencyTest.scala:40 [] | MapPartitionsRDD[1] at map at LineageAndDependencyTest.scala:39 [] | ParallelCollectionRDD[0] at makeRDD at LineageAndDependencyTest.scala:37 []

通过血统和血缘关系可以查看当前RDD的来龙去脉

7.2 RDD依赖关系 【dependencies】

源码: /** * Get the list of dependencies of this RDD, taking into account whether the * RDD is checkpointed or not. */ final def dependencies: Seq[Dependency[_]] = { checkpointRDD.map(r => List(new OneToOneDependency(r))).getOrElse { if (dependencies_ == null) { stateLock.synchronized { if (dependencies_ == null) { dependencies_ = getDependencies } } } dependencies_ } } 测试: @Test def testDependency() : Unit ={ val list = List(1, 3, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) //List() 不依赖任何的父RDD 没父RDD println(rdd.dependencies) val rdd1: RDD[(Int, Int)] = rdd.map(x => (x, 1)) //List(org.apache.spark.OneToOneDependency@20a05b32) val dependencies: Seq[Dependency[_]] = rdd1.dependencies //ParallelCollectionRDD[0] println(dependencies(0).rdd) //println(rdd1.dependencies) val rdd2: RDD[(Int, Int)] = rdd1.reduceByKey(_ + _) //List(org.apache.spark.ShuffleDependency@a451491) println(rdd2.dependencies) } 结论: 依赖: 直接的关系! 每个人都依赖于父母! RDD.dependencies: 当前RDD和 上一级(父RDD)之间的依赖关系! 11子: 父RDD的一个整个分区,全部经过转换后,生成子RDD的以整个分区! ①:OneToOneDependency: 子RDD的一个分区,来源于父RDD的一个整体分区! extends NarrowDependency(窄依赖) RangeDependency OneToOneDependency ②:ShuffleDependency(shuffle依赖或宽依赖): 子RDD的一个分区,来源于父RDD的分区shuffle后的分区! 1父多子: 父RDD的一个整个分区,全部经过转换后,先shuffle分为若干个区,之后 子RDD的每个分区,分别拷贝对应的分区! Dependency : 依赖关系的顶级父类! 描述的是当前RDD的分区,和父RDD之间的关系! def rdd: RDD[T] : 当前RDD依赖的父RDD 作用: ①找爸爸 ②根据Dependency确定是窄依赖还是宽依赖 ③根据依赖的类型,可以划分阶段 如果是ShuffleDependency,就产生一个新的阶段!

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-luHdNTCj-1603280050875)(https://i.loli.net/2020/10/21/M3Qb9j8vJNnpuaC.png)]

窄依赖 【诸如Union,cartesian(笛卡尔积)】

11子: 父RDD的一个整个分区,全部经过转换后,生成RDD的一整个分区

宽依赖

1父多子:父RDD的一整个分区,全部经过转换后,先shuffle分为若干个区,之后子RDD的每个分区,分别拷贝对应的分区

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-abpMTC8P-1603280050878)(https://i.loli.net/2020/10/21/IbGtiYqm2acWN6U.png)]

如何判断一个算子到底会不会shuffle 是NarrowDependency,就不会产生shuffle! union ---> RangeDependency cartesian(笛卡尔积) ---> NarrowDependency 或者看RDD生成的依赖是啥

八、RDD序列化

8.1 序列化

8.1.1 闭包

闭包的概念 1、一个持有外部环境变量的函数就是闭包 2、理解闭包通常一下几个关键点 1、函数 2、自由变量 3、环境 举个例子 var a:Int=0 val b = rdd.foreach( x=>println(sum)) 函数b捕获了外部作用域(环境)中的变量sum,因此形成了闭包。而由于sum不属于函数b,在概念上称自由变量

8.1.2 测试1

@Test def test1() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) // 在Driver端定义 var sum:Int =0 rdd.foreach( x => sum += x) println(sum) }

scala传入的基本数据类型都会序列化

传入闭包会发生什么事?

1.如果函数中传入的函数,产生了闭包,那么在提交job之前,会经过序列化的检查,检查使用的闭包中的变量是否可以序列化 2.如果可以序列化,此时将当前闭包变量,创建一个副本,以序列化的形式发送给每一个使用次算子的Task!

序列化检查是在提交job之前

/** * Applies a function f to all elements of this RDD. */ def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f) //检查闭包变量 sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) //提交job }

8.1.3 测试2

自定义一个类,并测试:

//定义一个类 class user() @Test def test2() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val user = new User rdd.foreach( x => { println(user) }) }

报错!!!!!!!!!!

org.apache.spark.SparkException: Task not serializable 说明:算子存在闭包,闭包中使用的变量所在的类没有实现序列化接口!

解决1:

//定义一个类 class user() @Test def test2() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) rdd.foreach( x => { val user = new User //将该对象放到算子里,不会报错,因为不构成闭包,就不会构成序列化等检查 println(user) }) }

解决2;

--让定义的类实现序列化接口 class user() extends Serializable class user() @Test def test2() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val user = new User rdd.foreach( x => { println(user) }) } //注意: trait Serializable extends Any with java.io.Serializable 即: scala中的Serializable也是继承了java中的序列化 hadoop的Writable,spark不支持!只支持Java的java.io.Serializable

解决3:

--直接使用样例类 case class user() class user() @Test def test2() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val user = new User rdd.foreach( x => { println(user) }) }

8.1.4 测试3

自定义一个类测试:

class User1(age :Int=20){ // 过滤RDD中,小于20的元素 def fun1(rdd:RDD[Int]):RDD[Int]={ rdd.filter( x => { rdd.filter( x => x < age) }) } } @Test def test3() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val user = new User1() println(user.fun1(rdd).collect().mkString(",")) }//在调user的fun1方法的时候用到了age,age是user1的一个属性,在执行fun1时会调用filter算子,使用了user1类型的对象user中的age属性,又因为user1没有序列化,,所以报错 /* 属性不能单独存在,必须依附于对象存在 */

报错!!!!!!!!!!

org.apache.spark.SparkException: Task not serializable 说明:算子存在闭包,闭包中使用的变量所在的类没有实现序列化接口! 在执行fun1,会调用filter算子,使用了User1类型的对象user中的age属性! 闭包变量: user对象 解决: 将user对象 ,创建一个副本,序列化发送给 Executor 属性必须依附于对象存在! //报错解决: ①User1 实现序列化 ②User1 不实现序列化 , 让闭包变量和 User1类脱离关系 闭包变量不能是 User1的成员。 可以使用局部变量来引用 User1中成员的值! 在算子中只使用 局部变量!

解决1:

--User1 实现序列化

解决2:

--User1不实现序列化,让闭包变量和User1类脱离关系 class User1(age :Int=20){ // 过滤RDD中,小于20的元素 def fun1(rdd:RDD[Int]):RDD[Int]={ rdd.filter( x => { // 20称为 变量 rdd.filter( x => x < 20) }) } } @Test def test3() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val user = new User1() println(user.fun1(rdd).collect().mkString(",")) }

解决3:

--闭包变量不能是User1的成员,可以使用局部变量来引用User1中成员的值!在算子中只使用局部变量 class User1(age :Int=20){ // 过滤RDD中,小于20的元素 def fun1(rdd:RDD[Int]):RDD[Int]={ // 使用局部变量接受age的值 val myNum = age rdd.filter( x => { x < myNum /* 这时候使用的是myNum,并不是User1的属性age 相当于堆里面有一个20,User1类的属性age指向了20 我们在栈中new了一个myNum指向了20 这时候就不需要调用User1了,就不会报错了 */ }) } } @Test def test3() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val user = new User1() println(user.fun1(rdd).collect().mkString(",")) }

8.1.5 测试4

// test4使用 class User3(age :Int=20){ def myFilter(x:Int):Boolean={ x < 20 } // 过滤RDD中,小于20的元素 def fun1(rdd:RDD[Int]):RDD[Int]={ rdd.filter(myFilter) } } @Test def test4() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val user = new User3() println(user.fun1(rdd).collect().mkString(",")) } /* 算子用到了myFilter方法,他是User3的方法是类的成员,因为该类没有序列化所以报错 */

报错!!!!!!!!!!

org.apache.spark.SparkException: Task not serializable 方法 是 类的成员! 解决思路: 闭包变量和 User3 没有关系! 使用函数 代替 方法! 也可以使用匿名函数!更简单!

方法和函数的区别:

--方法和函数的区别: 1.加载时间不一样:函数是对象,定义在哪里就在哪里加载,方法是随着类的加载而加载 2.方法可以重载,函数不可以 3.方法是保存在方法区,函数是保存在堆 如果方法定义在方法里面,就是相当于时函数 方法可以转成函数: 方法名 _

解决1:将方法定义在算子里,就是函数了

class User3(age :Int=20){ def myFilter(x:Int):Boolean={ x < 20 } // 过滤RDD中,小于20的元素 def fun1(rdd:RDD[Int]):RDD[Int]={ //是函数 def myFilter(x:Int):Boolean={ x < 20 } rdd.filter(myFilter) /* 这里的myFilter用的是最近的这个函数的myFilter,该函数不是User3的成员所以没关系 */ } } @Test def test4() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val user = new User3() println(user.fun1(rdd).collect().mkString(",")) }

解决2:也可以使用匿名函数

class User3(age :Int=20){ def myFilter(x:Int):Boolean={ x < 20 } // 过滤RDD中,小于20的元素 def fun1(rdd:RDD[Int]):RDD[Int]={ //是函数 def myFilter(x:Int):Boolean={ x < 20 } rdd.filter(x => x<20) /* 这里的myFilter用的是最近的这个函数的myFilter,该函数不是User3的成员所以没关系 */ } } @Test def test4() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val user = new User3() println(user.fun1(rdd).collect().mkString(",")) }

8.2 kryo

8.2.1 引入

java.io.Serializable 在JDK 1.1 设计 2000年之前,没有大数据! 没有考虑到数据量大的话,使用Serializable序列化的数据,会保存很多额外的无用信息! 在大数据处理中,只关心数据的值,不关心数据的子父继承关系,数据类所在的包的关系等等这些信息 Spark推荐使用kryo!kryo可以节省十倍空间!在网络中传输效率提高十倍 java.io.Serializable:658.0 B 体会kryo: 告诉spark,哪些类需要使用kyro序列化! kryo: 302.0 B

8.2.2 测试:

//样例类 case class User2(age:Int=10,name:String= "fhaowiehfaoiwhfoiua;whfeiawofh oi;aweh foiawhfoikjauwhfoi;auwhfeoiu;awehfaoiu;wehfaoiewu;fh") @Test def testKryo() : Unit ={ val list = List( User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2(), User2() ) val rdd: RDD[User2] = sparkContext.makeRDD(list, 2) // 使用带shuffle的算子测试序列化 val rdd1: RDD[(User2, Iterable[User2])] = rdd.groupBy(x => x) rdd1.collect() Thread.sleep(100000000) }

需要在sparkConf里设置:【kryo的设置】

val sparkContext = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("My app") // User2使用kryo进行序列化 .registerKryoClasses(Array(classOf[User2])) )

源码:

/** * Use Kryo serialization and register the given set of classes with Kryo. * If called multiple times, this will append the classes from all calls together. */ def registerKryoClasses(classes: Array[Class[_]]): SparkConf = {//需要放到参数是一个数组,数组里面是一堆class类型的对象 val allClassNames = new LinkedHashSet[String]() allClassNames ++= get(KRYO_CLASSES_TO_REGISTER).map(_.trim) .filter(!_.isEmpty) allClassNames ++= classes.map(_.getName) set(KRYO_CLASSES_TO_REGISTER, allClassNames.toSeq) set(SERIALIZER, classOf[KryoSerializer].getName) this }

九、共享变量 【编程模型】

9.1、广播变量 【编程模型之二,它很重要!!!】

9.1.1 概念

广播变量和累加器 都是编程模型,都是使用SparkContext进行创建! --广播变量: 应用的领域是在集群间 共享一个大的只读变量! --原理: 将广播变量发送给每个Executor而不是以副本的形式,传输给每个Task!

测试:

@Test def testBroadCast() = { val list1 = List(1,2,3,4) val range = Range(1,100,1) val rdd : RDD[Int] = sparkContext.makeRDD(list1,4) val rdd2:RDD[Int] = rdd.filter(x => bc.value.contains(x)) println(rdd2.collect().mkString(",")) }

改进:

@Test def testBroadCast() = { val list1 = List(1,2,3,4) val range = Range(1,100,1) val rdd : RDD[Int] = sparkContext.makeRDD(list1,4) /* 将RDD中的元素在Range中的元素过滤出来 存在闭包是,range会以副本的形式创建!之后序列化发送给每一个Task 4个分区,4个Task,每个Task都需要一个Range的副本 Range就需要复制四份,分别发给每一个Task task运行时,Range是需要放到Task所在的栈空间的 Task是在线程上运行的,每个线程都有一个方法栈,那么Range就要放到这个方法栈里 问题?Range是只读的,那么是不是可以让Range只发送一份,就是一个Executor一份,放到Executor的堆内存里面,当Executor运行每个Task时,就让每个Task去堆里面读取这一份变量,这样的好处就是打打节省了网络IO,可以用广播变量实现 */ //改进,将大的共享的只读变量range,使用广播变量的形式进行广播 val bc:Broadcast[Range] = sparkContext.broadcast(range) val rdd2:RDD[Int] = rdd.filter(x => bc.value.contains(x)) println(rdd2.collect().mkString(",")) }

Driver的类的变量,属性或方法

创建多个副本以序列化的形式发给executor

9.1.2 使用

创建:val bc = sc.broadcast(v) 访问v: bc.value 注意: 一旦创建了广播变量,不能修改,不能再使用v,而是使用bc! 在executor端,可以使用unpersist()释放变量!释放后,如果后续的代码又使用了bc,Driver会重新广播!可以使用destroy永久释放变量! val bc:Broadcast[Range] = sparkContext.broadcast(range) val rdd2:RDD[Int] = rdd.filter(x => bc.value.contains(x)) //创建了广播变量,不能修改,不能在使用v,而是使用bc

9.2、累加器【编程模型之三,它很重要!!!】

9.2.1 概念

完成计数(MR中Counter),还可以完成累加的操作! spark默认提供了数值类型的累加器!用户可以自定义! 累加器可以启名字,在webUI界面可以查看累加器的累加过程! SparkContext.longAccumulator() : 创建Long类型的累加器 SparkContext.DoubleAccumulator() : 创建Double类型的累加器 累加:累加器 . add 早task端只能累加,不能读! 读: 累加器.value 在Driver端调用 用户可以通过继承 AccumulatorV2, 创建自己的累加器类型! 在继承时,必须重写父类中的某些方法: reset : 将累加器重置归0 add : 累加value到累加器上 merge : 合并两个累加器的值到一个累加器 自定义累加器,需要注册: sc . register (累加器对象, “ 累加器名称”) spark能保证在累加时不会多加也不会少加

9.2.2 源码

/** * The base class for accumulators, that can accumulate inputs of type `IN`, and produce output of * type `OUT`. * * `OUT` should be a type that can be read atomically (e.g., Int, Long), or thread-safely * (e.g., synchronized collections) because it will be read from other threads. */ 译文: / ** *累加器的基类,可以累加“ IN”类型的输入,并产生“ OUT”类型的输出。 * *`OUT`应该是可以自动读取的类型(例如IntLong)或线程安全的类型*(例如同步的集合),因为它将从其他线程读取。 * / abstract class AccumulatorV2[IN, OUT] /* IN: add(x) 累加的x的类型 OUT: value的返回值的类型,输出的类型 */ extends Serializable { private[spark] var metadata: AccumulatorMetadata = _ private[this] var atDriverSide = true private[spark] def register( sc: SparkContext, name: Option[String] = None, countFailedValues: Boolean = false): Unit = { if (this.metadata != null) { throw new IllegalStateException("Cannot register an Accumulator twice.") } this.metadata = AccumulatorMetadata(AccumulatorContext.newId(), name, countFailedValues) AccumulatorContext.register(this) sc.cleaner.foreach(_.registerAccumulatorForCleanup(this)) }

自定义累加器模板:

class MyIntAccumulator extends AccumulatorV2[Int, Int] { override def isZero: Boolean = ??? override def copy(): AccumulatorV2[Int, Int] = ??? override def reset(): Unit = ??? override def add(v: Int): Unit = ??? override def merge(other: AccumulatorV2[Int, Int]): Unit = ??? override def value: Int = ??? }

9.2.3 测试

9.2.3.1 累加器原理

测试1:

【用一个例子先引入】

@Test def testACC1() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) var sum:Int =0 //在Driver中定义 rdd.foreach( x => sum += x) //这里的sum是在Executor端 // 0 println(sum) //打印的还是Driver端的,要想获得Exector端累加之后的值,就需要用到累加器 }
测试2:

【用累加器】

@Test def testACC2() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) //创建累加器 根矩类型创建 val mysumACC: LongAccumulator = sparkContext.longAccumulator("mysum") // 调用add方法累加 rdd.foreach( x => mysumACC.add(x)) // 查看结果 value 为10 println(mysumACC.value) Thread.sleep(1000000) }

用监控界面查看一下结果:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3PzlbQLD-1603280050883)(https://i.loli.net/2020/10/21/V4BsCAQwMSIrkJD.png)]

看一下累加器原理:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AwIsLbdZ-1603280050885)(Spark0621(3.0).assets/image-20200927233703329.png)]

9.2.3.2 自定义累加器

9.2.3.2.1 自定义一个Int类型的累加器
/* IN: add(x) 累加的x的类型 OUT: value的返回值的类型,输出的类型 */

自定义累加器模板,需要继承AccumulatorV2

测试3:

【正常使用】

/*自定义累加器*/ class MyIntAccumulator extends AccumulatorV2[Int, Int] { // 累加的结果 private var sum :Int =0 // 当前累加器是否已经归0 override def isZero: Boolean = sum == 0 // 创建当前累加器的一个副本 override def copy(): AccumulatorV2[Int, Int] = new MyIntAccumulator // 重置归0 override def reset(): Unit = sum=0 // 累加 override def add(v: Int): Unit = sum += v // 将other中的值合并到当前累加器 override def merge(other: AccumulatorV2[Int, Int]): Unit = sum += other.value // 访问最终累加的结果 override def value: Int = sum } /*使用*/ @Test def testACC3() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) //创建累加器 val accumulator = new MyIntAccumulator //自定义的需要自己注册 sparkContext.register(accumulator,"MySumInt") // 调用add方法累加 rdd.foreach( x => accumulator.add(x)) // 查看结果 value 10 println(accumulator.value) Thread.sleep(1000000) }
测试4:

【累加器如果没有归0,报错:】

/*自定义累加器*/ class MyIntAccumulator extends AccumulatorV2[Int, Int] { // 累加的结果 private var sum :Int =0 // 当前累加器是否已经归0 override def isZero: Boolean = false //sum == 0 【与测试3相比此处修改】 // 创建当前累加器的一个副本 override def copy(): AccumulatorV2[Int, Int] = new MyIntAccumulator // 重置归0 override def reset(): Unit = sum=0 // 累加 override def add(v: Int): Unit = sum += v // 将other中的值合并到当前累加器 override def merge(other: AccumulatorV2[Int, Int]): Unit = sum += other.value // 访问最终累加的结果 override def value: Int = sum } /*使用*/ @Test def testACC3() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) //创建累加器 val accumulator = new MyIntAccumulator //自定义的需要自己注册 sparkContext.register(accumulator,"MySumInt") // 调用add方法累加 rdd.foreach( x => accumulator.add(x)) // 查看结果 value 10 println(accumulator.value) Thread.sleep(1000000) } 举个例子; 当前有两分区,都要执行累加的逻辑,累加器是在Driver端创建的,两个Task都需要使用这个累加器, 第一步:先拿Driver端的累加器调它的copy()方法, copy完, 第二步:再找reset方法,接着判断是否归零成功了,没问题了就发给Task,又到下一个Task,继续执行

会报错,追踪一下源码:

copyAndReset must return a zero value copy atorg.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:166)

进入AccumulatorV2.scala:166

val copyAcc = copyAndReset() copyAcc当前累加器的一个副本,copyAcc调copyAndReset() 方法返回一个累加器 assert( copyAcc.isZero, //判断是否为0 "copyAndReset must return a zero value copy")

进入copyAndReset() 方法:

/** * Creates a new copy of this accumulator, 【创建当前累加器的新的一个副本】 which is zero value. i.e. call `isZero` on the copy 【新的副本必须归零】 * must return true. */ def copyAndReset(): AccumulatorV2[IN, OUT] = { val copyAcc = copy() //复制一个副本 copyAcc.reset() //将当前副本归零 copyAcc }

得知报错原因:

累加器如果没有归0,报错: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy val copyAcc = copyAndReset() val copyAcc = copy() copyAcc.reset() copyAcc assert(copyAcc.isZero, "copyAndReset must return a zero value copy") 累加器在Job被提交时,经过: ①copy(): 复制一个副本对象 ②reset() : 将当前副本重置归0 ③isZero() : 判断是否已经归0 如果归0,继续执行! 归0失败,报错! Job就无法提交!
测试5:

【初始值为10】

/*自定义累加器*/ class MyIntAccumulator extends AccumulatorV2[Int, Int] { // 累加的结果 private var sum :Int =10 【此处将初始值改为10// 当前累加器是否已经归0 override def isZero: Boolean = sum == 0 // 创建当前累加器的一个副本 override def copy(): AccumulatorV2[Int, Int] = new MyIntAccumulator // 重置归0 override def reset(): Unit = sum=0 // 累加 override def add(v: Int): Unit = sum += v // 将other中的值合并到当前累加器 override def merge(other: AccumulatorV2[Int, Int]): Unit = sum += other.value // 访问最终累加的结果 override def value: Int = sum } /*使用*/ @Test def testACC3() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) //创建累加器 sum初始值默认为10 val accumulator = new MyIntAccumulator //自定义的需要自己注册 sparkContext.register(accumulator,"MySumInt") // 调用add方法累加 rdd.foreach( x => accumulator.add(x)) // 查看结果 value 10 println(accumulator.value) Thread.sleep(1000000) }

结果;

20

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bTJhnMb8-1603280050886)(Spark0621(3.0).assets/image-20200928002304653.png)]

Task端确实重置为0了,但我Driver端还有值,所以合并是20

注意;

累加时,别忘记,Driver端还有一个值
9.2.3.2.2 自定义单词统计累加器

数据:

hi hi hi hello hello hello nice to meet you nice to meet you too goodbye goodbye /* 自定义累加单词的累加器 输入: 单词 输出: 所有单词的统计结果 Map[(单词,计数)] */ class WCAcc extends AccumulatorV2[String, mutable.Map[String,Int]] { //输入是String,输出是 // 构建一个返回值Map private var result:mutable.Map[String,Int] = mutable.Map[String,Int]() //重置归零 override def isZero: Boolean = result.isEmpty //创建副本 override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = new WCAcc //重置归零,判断是否归零 override def reset(): Unit = result.clear() //清空map //累加 override def add(v: String): Unit = { 【首先要判断要加入到该元素是否存在】 【写法就是,从结果集去该元素,如果没有就给0,如果有就加1val newValue: Int = result.getOrElse(v, 0) + 1 //再将新的key值V,和新的value值,加入结果集 result.put(v,newValue) } //合并 override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = { //取出 other 中的map,要合并的map val toMergeMap: mutable.Map[String, Int] = other.value //遍历map,里面用模式匹配 for ((key , value) <- toMergeMap) { 【存在这样的情况,本分区有,而另一个分区没有的元素该怎么合并,同上】 //先从当前result中取出 key对应的值 val newValue: Int = result.getOrElse(key, 0) + value // 插入或更新 result.put(key,newValue) } } //返回值 override def value: mutable.Map[String, Int] = result }

使用

/* 【 在某些累加求和的场景, 可以使用累加器替代reduceByKey,并行累加,避免shuffle! 】 */ @Test def testWordCountACC() : Unit ={ val rdd: RDD[String] = sparkContext.textFile("input/data1.txt") // RDD[单词] val rdd1: RDD[String] = rdd.flatMap(line => line.split(" ")) // 之前使用reduceByKey进行合并,现在使用累加器,对单词进行累加 val acc = new WCAcc //注册 sparkContext.register(acc,"wc") // 使用累加器累加每一个单词 rdd1.foreach( word => acc.add(word)) //打印累加的结果 println(acc.value) }

9.3 总结

--一、什么是累加器? 分布式共享只写变量,使用累加器完成数据的累加 1.分布式:每个Executor都拥有这个累加器 2.共享:Driver中的变量原封不动的被Executor拥有一份副本 3.只写:同一个Executor中可以对这个变量进行改造,其它的executor不能读取。 --二、累加器用来解决什么问题? 1.想通过没有shuffle过程的算子来实现数据的累加,"所谓累加器,一般作用就是累加(可以是数值的累加,也可以是数据的累加)",我们实现的方式是:在driver代码中,声明一个变量"类似一个容器",来进行接收累加的结果,但是发现,当前情况,driver端的变量传递给execuotor以后,并在executor进行计算,该变量无法返回到driver 原因是: a.driver端能够传递给到executor,是因为存在闭包的原因 b.executor不能传递过来是因为没有闭包的原因 因此:我们使用了累加器的方式,将上述声明的变量封装成累加器的方式,使得executor端计算的累加结果能够传回到driver端。 --三、累加器实现过程 累加器用来把executor端变量信息聚合到Driver端,在Driver程序中定义的变量,在Executor端的每个Task都会得到这个变量的一份新的副本,每个Task更新这些副本值后,传回Driver端进行merge

十、RDD持久化

10.1 cache

10.1.1 缓存引入

SSM Mybatis ORM映射框架! object relational mapping : 对象和关系映射 一份数据在Java程序中,必须以属性的形式保存在对象模型中! 数据: jack,20,male 建模: JavaBean Person{ String name; int age; String gender; } Java中数据的表示 Person p=new Person("jack",20,"male"); 关系型数据库 建模 : 表 create table person( varchar(20) name, int age, varchar(20) gender ) DB中数据的表示: 一行记录 name age gender jack 20 male 场景: 用户访问一个web页面 --------> 在页面注册 ----------> 向后台的Java应用发送了一个请求 -------> Java应用程序处理请求 ------->从Http协议的报文中取出用户在页面填写的信息 jack,20,male ----- Person p=new Person("jack",20,"male"); -------> java程序中,将信息持久化到数据库中 ------JDBC 封装SQL // 麻烦 ------ORM 框架 xxxx.save(p) // 自动帮你封装SQL,自动执行数据的插入和更新,查询等 增删改: 对象 ----映射---> 表中记录 查询: 表中的记录 --- 映射----> 对象

10.1.2 缓存基本介绍

--缓存: 1.用来加快对重复元素的查询效率 2.为了加快查询效率,缓存多使用内存作为存储介质! (优秀的缓存产品: ehcache , redis , memcache) --缓存都会有淘汰策略: 1.LRU(Spark):less recent use 最近最少使用 2.TTL : 基于过期时间的淘汰策略,每个数据在进入缓存时,都可以指定一个过期时间 A (ttl = 2周) B (ttl = 1天) 3.Random : 随机策略

10.1.3 缓存的使用

--使用: 1.将需要缓存的数据,存入缓存 2.使用时,从缓存中取出数据

测试:

//未加缓存时: @Test def testCache() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val rdd1: RDD[Int] = rdd.map(x => x) .map(x => x) .map(x => x) .map(x => x) .map(x => x) .map(x => x) .map(x => x) .map(x => x) .map(x => x) .map(x => x) .map(x => { println("进行了若干次的map操作") x }) //把血缘关系打印一下 println(rdd1.toDebugstring) //将人rdd1缓存起来 rdd1.coach() println(rdd1.collect().mkString(",")) println(rdd1.collect().mkString(",")) println(rdd1.collect().mkString(",")) println("------------------") rdd1.saveAsTextFile("output") } //结果 共打印16次 进行了若干次的map操作 //改进,将RDD1缓存起来,再看结果 只打印一次【说明第一次job提交时,会将数据缓存起来】

看一下catch的源码:

def cache(): this.type = persist() def persist(): this.type = persist(StorageLevel.MEMORY_ONLY) def persist(newLevel: StorageLevel): this.type = {...} //此方法指定数据在缓存时使用什么样的形式缓存!

由此可知:

def persist(newLevel: StorageLevel): 指定数据在缓存时使用什么样的形式缓存! 参考 StorageLevel(缓存级别)! cache()===>persist() ====>persist(StorageLevel.MEMORY_ONLY) StorageLevel可以通过以下参数构造: 【即,缓存级别可以通过persist来指定】 class StorageLevel private( private var _useDisk: Boolean, //是否使用磁盘来缓存数据 private var _useMemory: Boolean, //是否使用内存磁盘来缓存数据 private var _useOffHeap: Boolean, //是否使用堆外内存(JVM管理不到的,向OS申请的内存)磁盘来缓存数据 private var _deserialized: Boolean, //缓存的数据是否以序列化的字节形式存储 private var _replication: Int = 1) //指定缓存对象的副本数量 extends Externalizable 1.缓存不会改变血缘关系!原因在于缓存的不可靠性 2.在出现shuffle时,如果一个Job Map阶段产生的文件会被之后的job使用,那么这些文件会一直保留 3.可以理解为 Map阶段的输出缓存在了磁盘

10.1.4 总结

--cache的作用: 加速查询效率,缓存多食用内存作为存储设备! 在spark中,缓存可以用来缓存已经计算好的RDD,避免相同RDD的重复计算! --使用: rdd.cache() 等价于==> rdd.persist() 等价于==> rdd.persist(Storage.MEMORY_ONLY) rdd.persist ( 缓存的级别 ) --①-- 缓存会在第一个job提交时,触发,之后相同的job如果会基于缓存的RDD进行计算,优先从缓存中读取RDD! --②-- 缓存多使用内存作为存储设备,内存是有限的,因此缓存多有淘汰策略,在Spark中默认使用LRU(less recent use) 算法淘汰缓存中的数据! --③-- 缓存因为不可靠性,所以并不会影响血缘关系! --④-- 带有shuffle的算子。会自动将MapTask阶段溢写到磁盘文件,一直保存。如果有相同的job使用到了这个文件,此时这个Map阶段可以被跳过,可以理解为使用到了缓存

因为cache不可靠,那么怎么解决呢?

10.2 checkpoint

10.2.1 基本介绍

chockpoint解决缓存不可靠的问题 cache为了效率,优先使用内存作为缓存介质,内存不可靠! chockpoint 可以将要缓存的数据或状态,持久化到文件系统,(以文件形式存储到磁盘)! 【即使程序蹦了,重启之后他依旧可以在磁盘文件系统中读取该数据】 RDD可以以文件形式保存CK目录中,CK目录可以通过SparkContext#setCheckpointDir设置! 在集群中运行Job时,CK目录必须是HDFS上的路径! RDD如果通过CK的方式持久化,会切断和父RDD的血缘关系! 【为什么会切断,因为它是可靠的】 --问:切断了,怎么找血缘关系呢? --答:切断了你要读取数据,就从一个叫ReliableCheckpointRDD中转换,这个RDD是MapPartitionsRDD的一种 --建议: 先将缓存的RDD保存到cache中,这样就可以避免checkpoint的job重复计算RDD!

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gkSBva3P-1603280050887)(Spark0621(3.0).assets/image-20200928194115781.png)]

10.2.2 测试

/** * Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint * directory set with `SparkContext#setCheckpointDir` and all references to its parent * RDDs will be removed. This function must be called before any job has been * executed on this RDD. It is strongly recommended that this RDD is persisted in * memory, otherwise saving it on a file will require recomputation. */ 译文: 将此RDD标记为检查点。 它将被保存到由`SparkContext#setCheckpointDir` 设置的checkpoint目录内的文件中, //并且所有对其父RDD的引用都将被删除。【重点】 在此RDD上执行任何作业之前,必须先调用此函数。 //强烈建议将此RDD保留在内存中,否则将其保存在文件中将需要重新计算。 【即,chockpoint之后,最好缓存一下】 def checkpoint(): Unit = RDDCheckpointData.synchronized @Test def testCk() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val rdd1: RDD[Int] = rdd.map(x => x) .map(x => x) .map(x => x) .map(x => x) .map(x => x) .map(x => x) .map(x => x) .map(x => x) .map(x => x) .map(x => x) .map(x => { println("进行了若干次的map操作") x }) // 设置CK目录 sparkContext.setCheckpointDir("ck") // 持久化 //【也会提交一个Job,将rdd1持久化到ck目录中,由行动算子触发提交】 rdd1.checkpoint() //持久化之后最好缓存一下,这样就不会重复计算了,就是下面第一个行动算子collect只触发一个自己的job rdd1.cache() println(rdd1.collect().mkString(",")) //如果前面chockpoint之后没有缓存,这里就相当于在collect时,触发了两个job println("---------------- 血缘关系-------------") println(rdd1.toDebugString) println("------------------") // 从ck目录中读取rdd1 rdd1.saveAsTextFile("output") Thread.sleep(10000000) }

10.2.3 总结

checkpoint是为了解决cache的不可靠性! 设置了chechpoint的目录之后,job可以将当前RDD的数据,或job的状态持久化到文件系统中 作用: ①:文件系统中读取已经缓存的RDD ②当job失败,需要重试时,从文件系统中获取之前job运行的状态,基于状态恢复运行 【】 使用: SparkContext.setCheckpointDir(如果是在集群中运行,必须使用HDFS上的路径) rdd.checkpoint() checkpoint会在第一个job提交时,额外提交一个job进行计算,如果要避免重复计算,可以和cache结合使用 coach和checkpoint都需要行动算子触发,所以将他们定义在行动算子前运算都OK。

十一、RDD文件读取不同数据源的数据

11.1 RDD读写文本文件【重要!!!】

11.1.1 基本介绍

读: SparkContext.textFile() new HadoopRDD: 用的是mapred.xxx 默认是:mapred.TextInputFormat 更熟悉的是: mapreduce.TextInputFormat NewHadoopRDD: mapreduce.TextInputFormat 写: RDD.saveAsTextFile()

11.1.2 源码

追踪一下 textFile源码:

def textFile( path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() hadoopFile( path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions ) .map(pair =>pair._2.toString) .setName(path) }

可见textFile调用了hadoopFile方法:

def hadoopFile[K, V]( path: String, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope { assertNotStopped() FileSystem.getLocal(hadoopConfiguration) val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration)) val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) new HadoopRDD( // 【用的是mapred.xxx 】 this, confBroadcast, Some(setInputPathsFunc), inputFormatClass, keyClass, valueClass, minPartitions).setName(path) }

然后可见hadoopFile方法new了一个HadoopRDD。 用的是老的mapred.xxx 默认是:mapred.TextInputFormat,更熟悉的是:mapreduce.TextInputFormat

那么mapreduce.TextInputFormat在哪呢?

在RDD下有一个NewHadoopRDD

/** * :: DeveloperApi :: * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS, * sources in HBase, or S3), using the new MapReduce API 【用的是新的mapreduce的API】(`org.apache.hadoop.mapreduce`). * * @param sc The SparkContext to associate the RDD with. * @param inputFormatClass Storage format of the data to be read. * @param keyClass Class of the key associated with the inputFormatClass. * @param valueClass Class of the value associated with the inputFormatClass. * * @note Instantiating this class directly is not recommended, please use * `org.apache.spark.SparkContext.newAPIHadoopRDD()` */

new一个NewHadoopRDD需要用这样一个构造器:

@DeveloperApi class NewHadoopRDD[K, V]( sc : SparkContext, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], @transient private val _conf: Configuration) extends RDD[(K, V)](sc, Nil) with Logging

11.1.3 测试:

@Test def testTextFile() : Unit ={ //val rdd: RDD[String] = sparkContext.textFile("input/data1.txt") //val inputFormat = new TextInputFormat[LongWritable, Text]() val conf = new Configuration() val job: Job = Job.getInstance(conf) FileInputFormat.setInputPaths(job,"input/data1.txt") // TextInputFormat 的 createRecordReader 返回的 RR的泛型 val rdd = new NewHadoopRDD[LongWritable, Text] ( sparkContext, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], job.getConfiguration ) rdd.saveAsTextFile("output") }

11.2 RDD读写对象文件

11.2.1 基本介绍

读写对象文件: 写: RDD.saveAsObjectFile() 读: sparkContext.objectFile

11.2.2 测试

@Test def test() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) //写 rdd.saveAsObjectFile("output") //读 指定 对象文件中保存的数据的类型 val rdd1: RDD[Int] = sparkContext.objectFile[Int]("output") println(rdd1.collect().mkString(",")) }

11.3 RDD读写Sf文件

11.3.1 基本介绍

读写SF文件: 写: RDD.saveAsSequenceFile() 读: sparkContext.sequenceFile

11.3.2 测试

def sequenceFile[K, V]( path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = withScope { assertNotStopped() sequenceFile(path, keyClass, valueClass, defaultMinPartitions) } @Test def testSF() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val rdd1: RDD[(Int, Int)] = rdd.map(x => (x, 1)) //写 rdd1.saveAsSequenceFile("output") // 读的时候,需要指定 SF中 K-V对的类型 println( sparkContext.sequenceFile[Int, Int] ("output").collect() .mkString(",")) }

11.3 RDD读写MySql数据库文件 【重要!!!】

11.3.1 基本介绍

//一个RDD,它在JDBC连接上执行SQL查询并读取结果。 读: new JDBCRDD() 写: RDD.foreachePartition()

11.3.2 源码

jdbcRDD:

/** An RDD that executes a SQL query on a JDBC connection and reads results.For usage example, see test case JdbcRDDSuite. @param getConnection a function that returns an open Connection. The RDD takes care of closing the connection. @param sql the text of the query. The query must contain two ? placeholders for parameters used to partition the results. For example, {{{ select title, author from books where ? <= id and id <= ? }}} @param lowerBound the minimum value of the first placeholder @param upperBound the maximum value of the second placeholder The lower and upper bounds are inclusive. @param numPartitions the number of partitions. Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2, the query would be executed twice, once with (1, 10) and once with (11, 20) @param mapRow a function from a ResultSet to a single row of the desired result type(s). This should only call getInt, getString, etc; the RDD takes care of calling next. The default maps a ResultSet to an array of Object. */ 译文: 一个RDD,它在JDBC连接上执行SQL查询并读取结果。 有关用法示例,请参见测试用例JdbcRDDSuite。 @param getConnection一个返回打开的Connection的函数。RDD负责关闭连接。 @param sql查询的文本。查询必须包含两个?用于对结果进行分区的参数的占位符。 例如, {{{选择书名,作者从书中何处? <= id和id <=}}} @param lowerBound第一个占位符的最小值 @param upperBound第二个占位符的最大值上下限包括在内。 @param numPartitions分区数。给定lowerBound为1,upperBound为20,numPartitions为2,查询将执行两次,一次使用(110),一次使用(1120@param mapRow ResultSet中的函数到所需结果类型的单行。这应该只调用getInt,getString等; RDD负责下一步呼叫。默认值将ResultSet映射到Object数组。 * / class JdbcRDD[T: ClassTag]( sc: SparkContext, //sparkContext getConnection: () => Connection, //函数用于创建Connection sql: String, //要执行的查询的sql语句,sql中需要设置两个占位符 lowerBound: Long, //下界,用来填充小的占位符 upperBound: Long, //上界,用来填充大的占位符,取上下界的区间 numPartitions: Int, //跟分区数 mapRow: (ResultSet) => T =JdbcRDD.resultSetToObjectArray _) //不能调用next(),只需要调用ResultSet.getString()...+...Result.getInt() extends RDD[T]

11.3.3 测试

11.3.3.1读

case class User(id:Int,name:String,age:Int,sex:String) @Test def testJDBCRead() : Unit ={ val rdd = new JdbcRDD[User](sparkContext, () => DriverManager.getConnection("jdbc:mysql://localhost:3306/demon2", "root", "123321"), "select * from student where id >= ? and id <= ?", 2, 4, 2, (rs: ResultSet) => { User(rs.getInt("id"), rs.getString("name"), rs.getInt("age"),rs.getString("sex")) } ) rdd.saveAsTextFile("output") }

11.3.3.2 写

case class User(id:Int,name:String,age:Int,sex:String) @Test def testJDBCWrite():Unit = { val list = List(User(0,"ee",24,"男"),User(0,"ff","25","女")) val rdd:RDD[User] = sparkcontext.makeRDD(list,2) //将RDD中的数据写入mysql rdd.foreachPartition(iter => { //批处理,一个分区只需要创建一个Connection val connection: Connection = DriverManager.getConnection("jdbc:mysql//localhost:3306/demon2","root","123321") //准备sql val sql = """ | insert into student(name,age,sex) | value(?,?,?) | |""".stripMargin val ps:PreparedStatement = connection.prepareStatement(sql) //iter是当前分区的迭代器,对于迭代到的每个元素都写出 iter.foreach(user => { ps.setString(1,user.name) ps.setInt(2,user.age) ps.setString(3,user.sex) //执行语句 ps.executorUpdate() }) //分区全部写完,释放资源 ps.close() connection.close() }) }

11.4 RDD读写HBase的文件

11.4.1 基本介绍

读HBase:NewHadoopRDD 核心:指定一个输入格式 找读写HBase的输入格式: 读:TableInputFormat RR读取的K-V: ImmutableBytesWritable : RowKey对应的Byte[] Resule : rowkey对应的一行结果! 写: TableOutputFormat HBase核心API: Result : 一行结果! rawCells : 获取一行所有的单元格 Connection : 和集群的一个连接。创建 Admin,Table Admin : 执行DDL语言,例如建表,建库,删表 Table: 执行DML语言,增删改查 增: Table.put(Put) 删: Table.delete(Delete) 查: 查 单行 Table.get(Get) 查 多行 Table.scan(Scan) CellUtil: CellUtil.cloneXXX : 取出指定的字段的值 Bytes: 将基本数据类型和 Byte[] 进行转换 如何读写HBase: 连接HBase所在的zk地址

11.4.2 读

@Test def tetHBaseRDD() : Unit ={ // 默认只会读取 core-site.xml,hdfs-site.xml,mapred-site.xml,yarn-site.xml , 4 个xxx-default.xml // val configuration = new Configuration() // 不仅读取 hadoop的配置文件,还读取Hbase的配置文件 val conf: Configuration = HBaseConfiguration.create // 指定读取哪张表 conf.set(TableInputFormat.INPUT_TABLE,"t1") // 指定扫描的起始行 conf.set(TableInputFormat.SCAN_ROW_START,"r2") conf.set(TableInputFormat.SCAN_ROW_STOP,"r4") val rdd = new NewHadoopRDD[ImmutableBytesWritable, Result] ( sparkContext, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result], conf ) // 遍历Result,取出每行的单元格 rdd.foreach{ case (rowkey , result) => { //取出所有的单元格 val cells: Array[Cell] = result.rawCells() for (cell <- cells) { println( "rowkey:" + Bytes.toString(CellUtil.cloneRow(cell)) + "列名:" + Bytes.toString(CellUtil.cloneFamily(cell)) + ":" + Bytes.toString(CellUtil.cloneQualifier(cell)) + "value:" + Bytes.toString(CellUtil.cloneValue(cell)) ) } } } }

11.4.3 写

写hbase :saveAsNewAPIHadoopDataset,只有 RDD[k,v] 写: 使用TableOutputFormat [KEY, 自定义Mutation ] key: rowkey ImmutableBytesWritable value : Put @Test def testHBaseWrite() : Unit ={ val conf: Configuration = HBaseConfiguration.create // 向哪个表写 conf.set(TableOutputFormat.OUTPUT_TABLE,"t1") val job: Job = Job.getInstance(conf) // 设置用哪个输出格式写 job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]]) // 输出格式写的 key-value的类型 job.setOutputKeyClass(classOf[ImmutableBytesWritable]) job.setOutputValueClass(classOf[Put]) val list = List( ("r5", "cf1", "age", "20"), ("r5", "cf1", "name", "jack"), ("r6", "cf1", "age", "30"), ("r6", "cf1", "name", "tom") ) val rdd: RDD[(String, String, String, String)] = sparkContext.makeRDD(list, 2) // 封装为K-V结构 , K-V必须是 TableOutputFormat 写出的 K-V val rdd1: RDD[(ImmutableBytesWritable, Put)] = rdd.map { case (rowkey, cf, cq, value) => { // 写出的key val outKey = new ImmutableBytesWritable(Bytes.toBytes(rowkey)) val put = new Put(Bytes.toBytes(rowkey)) // 设置要写出的内容 添加一列 put.addColumn( Bytes.toBytes(cf), Bytes.toBytes(cq), Bytes.toBytes(value) ) (outKey, put) } } //这里必须是job.getConfiguration rdd1.saveAsNewAPIHadoopDataset(job.getConfiguration) }
最新回复(0)