行动算子用来提交Job!和转换算子不同的时,转换算子一般是懒执行的!转换算子需要行动算子触发!
测试:
@Test def testActionOperation() : Unit ={ val list = List(11, 2, 3, 4,5,5,5,5) val list2 = List((11,1), (2,1),(5,1),(5,1),(5,3)) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val rdd2: RDD[(Int, Int)] = sparkContext.makeRDD(list2, 3) // rdd.saveAsObjectFile("output") rdd2.saveAsSequenceFile("output") // rdd.saveAsSequenceFile("output") /*println(rdd.countByValue()) println(rdd2.countByValue())*/ /* println(rdd2.countByKey())*/ println(rdd.reduce(_ + _)) rdd.collect() println(rdd.count()) println(rdd.first()) println(rdd.take(5).mkString(",")) println(rdd.takeOrdered(5)(Ordering[Int].reverse).mkString(",")) /*println(rdd.aggregate(10)(_ + _, _ + _)) println(rdd.fold(10)(_ + _))*/ }foreach测试:
@Test def testForeach() : Unit ={ val list = List(11, 2, 3, 4,5,5,5,5) val rdd: RDD[Int] = sparkContext.makeRDD(list, 3) /* foreach 是Spark中RDD的算子,在Executor端执行 分布式运算! 去Task所在的EXECUTOR 看输出的结果! */ rdd.foreach(x => println(Thread.currentThread().getName+"--------->"+x)) println("---------------") /* collect 将RDD中的元素,以Array返回,返回到Driver端! foreach 是 Driver遍历Array! 单线程,顺序执行! 在Driver端查看! client: 在客户端看到 cluster: 去Driver所运行的EXECUTOR查看! */ rdd.collect().foreach(x => println(Thread.currentThread().getName+"--------->"+x)) // 每个分区,调用一次函数,用于向数据库写数据,批处理 //rdd.foreachPartition()没有返回值 }测试:
@Test def testLineage() : Unit ={ val list = List(1, 3, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val rdd1: RDD[Int] = rdd.map(x => x) val rdd2: RDD[Int] = rdd1.map(x => x) val rdd3: RDD[Int] = rdd2.map(x => x) val rdd4: RDD[Int] = rdd3.map(x => x) val rdd5: RDD[(Int, Int)] = rdd4.map(x => (x, 1)) val rdd6: RDD[(Int, Int)] = rdd5.reduceByKey(_ + _,3) val rdd7: RDD[(Int, Int)] = rdd6.map(x => x) val rdd8: RDD[(Int, Int)] = rdd7.map(x => x) println(rdd8.toDebugString) }结果:
//3代表三个分区 (3) MapPartitionsRDD[8] at map at LineageAndDependencyTest.scala:49 [] | MapPartitionsRDD[7] at map at LineageAndDependencyTest.scala:48 [] | ShuffledRDD[6] at reduceByKey at LineageAndDependencyTest.scala:46 [] /* 2 代表两个分区 | 代表同一层级 */ +-(2) MapPartitionsRDD[5] at map at LineageAndDependencyTest.scala:44 [] | MapPartitionsRDD[4] at map at LineageAndDependencyTest.scala:42 [] | MapPartitionsRDD[3] at map at LineageAndDependencyTest.scala:41 [] | MapPartitionsRDD[2] at map at LineageAndDependencyTest.scala:40 [] | MapPartitionsRDD[1] at map at LineageAndDependencyTest.scala:39 [] | ParallelCollectionRDD[0] at makeRDD at LineageAndDependencyTest.scala:37 []通过血统和血缘关系可以查看当前RDD的来龙去脉
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-luHdNTCj-1603280050875)(https://i.loli.net/2020/10/21/M3Qb9j8vJNnpuaC.png)]
窄依赖 【诸如Union,cartesian(笛卡尔积)】
1父1子: 父RDD的一个整个分区,全部经过转换后,生成RDD的一整个分区宽依赖
1父多子:父RDD的一整个分区,全部经过转换后,先shuffle分为若干个区,之后子RDD的每个分区,分别拷贝对应的分区[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-abpMTC8P-1603280050878)(https://i.loli.net/2020/10/21/IbGtiYqm2acWN6U.png)]
如何判断一个算子到底会不会shuffle 是NarrowDependency,就不会产生shuffle! union ---> RangeDependency cartesian(笛卡尔积) ---> NarrowDependency 或者看RDD生成的依赖是啥scala传入的基本数据类型都会序列化
传入闭包会发生什么事?
1.如果函数中传入的函数,产生了闭包,那么在提交job之前,会经过序列化的检查,检查使用的闭包中的变量是否可以序列化 2.如果可以序列化,此时将当前闭包变量,创建一个副本,以序列化的形式发送给每一个使用次算子的Task!序列化检查是在提交job之前
/** * Applies a function f to all elements of this RDD. */ def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f) //检查闭包变量 sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) //提交job }自定义一个类,并测试:
//定义一个类 class user() @Test def test2() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val user = new User rdd.foreach( x => { println(user) }) }报错!!!!!!!!!!
org.apache.spark.SparkException: Task not serializable 说明:算子存在闭包,闭包中使用的变量所在的类没有实现序列化接口!解决1:
//定义一个类 class user() @Test def test2() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) rdd.foreach( x => { val user = new User //将该对象放到算子里,不会报错,因为不构成闭包,就不会构成序列化等检查 println(user) }) }解决2;
--让定义的类实现序列化接口 class user() extends Serializable class user() @Test def test2() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val user = new User rdd.foreach( x => { println(user) }) } //注意: trait Serializable extends Any with java.io.Serializable 即: scala中的Serializable也是继承了java中的序列化 hadoop的Writable,spark不支持!只支持Java的java.io.Serializable解决3:
--直接使用样例类 case class user() class user() @Test def test2() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val user = new User rdd.foreach( x => { println(user) }) }自定义一个类测试:
class User1(age :Int=20){ // 过滤RDD中,小于20的元素 def fun1(rdd:RDD[Int]):RDD[Int]={ rdd.filter( x => { rdd.filter( x => x < age) }) } } @Test def test3() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val user = new User1() println(user.fun1(rdd).collect().mkString(",")) }//在调user的fun1方法的时候用到了age,age是user1的一个属性,在执行fun1时会调用filter算子,使用了user1类型的对象user中的age属性,又因为user1没有序列化,,所以报错 /* 属性不能单独存在,必须依附于对象存在 */报错!!!!!!!!!!
org.apache.spark.SparkException: Task not serializable 说明:算子存在闭包,闭包中使用的变量所在的类没有实现序列化接口! 在执行fun1,会调用filter算子,使用了User1类型的对象user中的age属性! 闭包变量: user对象 解决: 将user对象 ,创建一个副本,序列化发送给 Executor 属性必须依附于对象存在! //报错解决: ①User1 实现序列化 ②User1 不实现序列化 , 让闭包变量和 User1类脱离关系 闭包变量不能是 User1的成员。 可以使用局部变量来引用 User1中成员的值! 在算子中只使用 局部变量!解决1:
--User1 实现序列化解决2:
--User1不实现序列化,让闭包变量和User1类脱离关系 class User1(age :Int=20){ // 过滤RDD中,小于20的元素 def fun1(rdd:RDD[Int]):RDD[Int]={ rdd.filter( x => { // 20称为 变量 rdd.filter( x => x < 20) }) } } @Test def test3() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val user = new User1() println(user.fun1(rdd).collect().mkString(",")) }解决3:
--闭包变量不能是User1的成员,可以使用局部变量来引用User1中成员的值!在算子中只使用局部变量 class User1(age :Int=20){ // 过滤RDD中,小于20的元素 def fun1(rdd:RDD[Int]):RDD[Int]={ // 使用局部变量接受age的值 val myNum = age rdd.filter( x => { x < myNum /* 这时候使用的是myNum,并不是User1的属性age 相当于堆里面有一个20,User1类的属性age指向了20 我们在栈中new了一个myNum指向了20 这时候就不需要调用User1了,就不会报错了 */ }) } } @Test def test3() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val user = new User1() println(user.fun1(rdd).collect().mkString(",")) }报错!!!!!!!!!!
org.apache.spark.SparkException: Task not serializable 方法 是 类的成员! 解决思路: 闭包变量和 User3 没有关系! 使用函数 代替 方法! 也可以使用匿名函数!更简单!方法和函数的区别:
--方法和函数的区别: 1.加载时间不一样:函数是对象,定义在哪里就在哪里加载,方法是随着类的加载而加载 2.方法可以重载,函数不可以 3.方法是保存在方法区,函数是保存在堆 如果方法定义在方法里面,就是相当于时函数 方法可以转成函数: 方法名 _解决1:将方法定义在算子里,就是函数了
class User3(age :Int=20){ def myFilter(x:Int):Boolean={ x < 20 } // 过滤RDD中,小于20的元素 def fun1(rdd:RDD[Int]):RDD[Int]={ //是函数 def myFilter(x:Int):Boolean={ x < 20 } rdd.filter(myFilter) /* 这里的myFilter用的是最近的这个函数的myFilter,该函数不是User3的成员所以没关系 */ } } @Test def test4() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val user = new User3() println(user.fun1(rdd).collect().mkString(",")) }解决2:也可以使用匿名函数
class User3(age :Int=20){ def myFilter(x:Int):Boolean={ x < 20 } // 过滤RDD中,小于20的元素 def fun1(rdd:RDD[Int]):RDD[Int]={ //是函数 def myFilter(x:Int):Boolean={ x < 20 } rdd.filter(x => x<20) /* 这里的myFilter用的是最近的这个函数的myFilter,该函数不是User3的成员所以没关系 */ } } @Test def test4() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val user = new User3() println(user.fun1(rdd).collect().mkString(",")) }需要在sparkConf里设置:【kryo的设置】
val sparkContext = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("My app") // User2使用kryo进行序列化 .registerKryoClasses(Array(classOf[User2])) )源码:
/** * Use Kryo serialization and register the given set of classes with Kryo. * If called multiple times, this will append the classes from all calls together. */ def registerKryoClasses(classes: Array[Class[_]]): SparkConf = {//需要放到参数是一个数组,数组里面是一堆class类型的对象 val allClassNames = new LinkedHashSet[String]() allClassNames ++= get(KRYO_CLASSES_TO_REGISTER).map(_.trim) .filter(!_.isEmpty) allClassNames ++= classes.map(_.getName) set(KRYO_CLASSES_TO_REGISTER, allClassNames.toSeq) set(SERIALIZER, classOf[KryoSerializer].getName) this }测试:
@Test def testBroadCast() = { val list1 = List(1,2,3,4) val range = Range(1,100,1) val rdd : RDD[Int] = sparkContext.makeRDD(list1,4) val rdd2:RDD[Int] = rdd.filter(x => bc.value.contains(x)) println(rdd2.collect().mkString(",")) }改进:
@Test def testBroadCast() = { val list1 = List(1,2,3,4) val range = Range(1,100,1) val rdd : RDD[Int] = sparkContext.makeRDD(list1,4) /* 将RDD中的元素在Range中的元素过滤出来 存在闭包是,range会以副本的形式创建!之后序列化发送给每一个Task 4个分区,4个Task,每个Task都需要一个Range的副本 Range就需要复制四份,分别发给每一个Task task运行时,Range是需要放到Task所在的栈空间的 Task是在线程上运行的,每个线程都有一个方法栈,那么Range就要放到这个方法栈里 问题?Range是只读的,那么是不是可以让Range只发送一份,就是一个Executor一份,放到Executor的堆内存里面,当Executor运行每个Task时,就让每个Task去堆里面读取这一份变量,这样的好处就是打打节省了网络IO,可以用广播变量实现 */ //改进,将大的共享的只读变量range,使用广播变量的形式进行广播 val bc:Broadcast[Range] = sparkContext.broadcast(range) val rdd2:RDD[Int] = rdd.filter(x => bc.value.contains(x)) println(rdd2.collect().mkString(",")) }Driver的类的变量,属性或方法
创建多个副本以序列化的形式发给executor
自定义累加器模板:
class MyIntAccumulator extends AccumulatorV2[Int, Int] { override def isZero: Boolean = ??? override def copy(): AccumulatorV2[Int, Int] = ??? override def reset(): Unit = ??? override def add(v: Int): Unit = ??? override def merge(other: AccumulatorV2[Int, Int]): Unit = ??? override def value: Int = ??? }【用一个例子先引入】
@Test def testACC1() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) var sum:Int =0 //在Driver中定义 rdd.foreach( x => sum += x) //这里的sum是在Executor端 // 0 println(sum) //打印的还是Driver端的,要想获得Exector端累加之后的值,就需要用到累加器 }【用累加器】
@Test def testACC2() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) //创建累加器 根矩类型创建 val mysumACC: LongAccumulator = sparkContext.longAccumulator("mysum") // 调用add方法累加 rdd.foreach( x => mysumACC.add(x)) // 查看结果 value 为10 println(mysumACC.value) Thread.sleep(1000000) }用监控界面查看一下结果:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-3PzlbQLD-1603280050883)(https://i.loli.net/2020/10/21/V4BsCAQwMSIrkJD.png)]
看一下累加器原理:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-AwIsLbdZ-1603280050885)(Spark0621(3.0).assets/image-20200927233703329.png)]
自定义累加器模板,需要继承AccumulatorV2
【正常使用】
/*自定义累加器*/ class MyIntAccumulator extends AccumulatorV2[Int, Int] { // 累加的结果 private var sum :Int =0 // 当前累加器是否已经归0 override def isZero: Boolean = sum == 0 // 创建当前累加器的一个副本 override def copy(): AccumulatorV2[Int, Int] = new MyIntAccumulator // 重置归0 override def reset(): Unit = sum=0 // 累加 override def add(v: Int): Unit = sum += v // 将other中的值合并到当前累加器 override def merge(other: AccumulatorV2[Int, Int]): Unit = sum += other.value // 访问最终累加的结果 override def value: Int = sum } /*使用*/ @Test def testACC3() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) //创建累加器 val accumulator = new MyIntAccumulator //自定义的需要自己注册 sparkContext.register(accumulator,"MySumInt") // 调用add方法累加 rdd.foreach( x => accumulator.add(x)) // 查看结果 value 10 println(accumulator.value) Thread.sleep(1000000) }【累加器如果没有归0,报错:】
/*自定义累加器*/ class MyIntAccumulator extends AccumulatorV2[Int, Int] { // 累加的结果 private var sum :Int =0 // 当前累加器是否已经归0 override def isZero: Boolean = false //sum == 0 【与测试3相比此处修改】 // 创建当前累加器的一个副本 override def copy(): AccumulatorV2[Int, Int] = new MyIntAccumulator // 重置归0 override def reset(): Unit = sum=0 // 累加 override def add(v: Int): Unit = sum += v // 将other中的值合并到当前累加器 override def merge(other: AccumulatorV2[Int, Int]): Unit = sum += other.value // 访问最终累加的结果 override def value: Int = sum } /*使用*/ @Test def testACC3() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) //创建累加器 val accumulator = new MyIntAccumulator //自定义的需要自己注册 sparkContext.register(accumulator,"MySumInt") // 调用add方法累加 rdd.foreach( x => accumulator.add(x)) // 查看结果 value 10 println(accumulator.value) Thread.sleep(1000000) } 举个例子; 当前有两分区,都要执行累加的逻辑,累加器是在Driver端创建的,两个Task都需要使用这个累加器, 第一步:先拿Driver端的累加器调它的copy()方法, copy完, 第二步:再找reset方法,接着判断是否归零成功了,没问题了就发给Task,又到下一个Task,继续执行会报错,追踪一下源码:
copyAndReset must return a zero value copy atorg.apache.spark.util.AccumulatorV2.writeReplace(AccumulatorV2.scala:166)进入AccumulatorV2.scala:166
val copyAcc = copyAndReset() copyAcc当前累加器的一个副本,copyAcc调copyAndReset() 方法返回一个累加器 assert( copyAcc.isZero, //判断是否为0 "copyAndReset must return a zero value copy")进入copyAndReset() 方法:
/** * Creates a new copy of this accumulator, 【创建当前累加器的新的一个副本】 which is zero value. i.e. call `isZero` on the copy 【新的副本必须归零】 * must return true. */ def copyAndReset(): AccumulatorV2[IN, OUT] = { val copyAcc = copy() //复制一个副本 copyAcc.reset() //将当前副本归零 copyAcc }得知报错原因:
累加器如果没有归0,报错: java.lang.AssertionError: assertion failed: copyAndReset must return a zero value copy val copyAcc = copyAndReset() val copyAcc = copy() copyAcc.reset() copyAcc assert(copyAcc.isZero, "copyAndReset must return a zero value copy") 累加器在Job被提交时,经过: ①copy(): 复制一个副本对象 ②reset() : 将当前副本重置归0 ③isZero() : 判断是否已经归0 如果归0,继续执行! 归0失败,报错! Job就无法提交!【初始值为10】
/*自定义累加器*/ class MyIntAccumulator extends AccumulatorV2[Int, Int] { // 累加的结果 private var sum :Int =10 【此处将初始值改为10】 // 当前累加器是否已经归0 override def isZero: Boolean = sum == 0 // 创建当前累加器的一个副本 override def copy(): AccumulatorV2[Int, Int] = new MyIntAccumulator // 重置归0 override def reset(): Unit = sum=0 // 累加 override def add(v: Int): Unit = sum += v // 将other中的值合并到当前累加器 override def merge(other: AccumulatorV2[Int, Int]): Unit = sum += other.value // 访问最终累加的结果 override def value: Int = sum } /*使用*/ @Test def testACC3() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) //创建累加器 sum初始值默认为10 val accumulator = new MyIntAccumulator //自定义的需要自己注册 sparkContext.register(accumulator,"MySumInt") // 调用add方法累加 rdd.foreach( x => accumulator.add(x)) // 查看结果 value 10 println(accumulator.value) Thread.sleep(1000000) }结果;
20[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bTJhnMb8-1603280050886)(Spark0621(3.0).assets/image-20200928002304653.png)]
Task端确实重置为0了,但我Driver端还有值,所以合并是20
注意;
累加时,别忘记,Driver端还有一个值数据:
hi hi hi hello hello hello nice to meet you nice to meet you too goodbye goodbye /* 自定义累加单词的累加器 输入: 单词 输出: 所有单词的统计结果 Map[(单词,计数)] */ class WCAcc extends AccumulatorV2[String, mutable.Map[String,Int]] { //输入是String,输出是 // 构建一个返回值Map private var result:mutable.Map[String,Int] = mutable.Map[String,Int]() //重置归零 override def isZero: Boolean = result.isEmpty //创建副本 override def copy(): AccumulatorV2[String, mutable.Map[String, Int]] = new WCAcc //重置归零,判断是否归零 override def reset(): Unit = result.clear() //清空map //累加 override def add(v: String): Unit = { 【首先要判断要加入到该元素是否存在】 【写法就是,从结果集去该元素,如果没有就给0,如果有就加1】 val newValue: Int = result.getOrElse(v, 0) + 1 //再将新的key值V,和新的value值,加入结果集 result.put(v,newValue) } //合并 override def merge(other: AccumulatorV2[String, mutable.Map[String, Int]]): Unit = { //取出 other 中的map,要合并的map val toMergeMap: mutable.Map[String, Int] = other.value //遍历map,里面用模式匹配 for ((key , value) <- toMergeMap) { 【存在这样的情况,本分区有,而另一个分区没有的元素该怎么合并,同上】 //先从当前result中取出 key对应的值 val newValue: Int = result.getOrElse(key, 0) + value // 插入或更新 result.put(key,newValue) } } //返回值 override def value: mutable.Map[String, Int] = result }使用
/* 【 在某些累加求和的场景, 可以使用累加器替代reduceByKey,并行累加,避免shuffle! 】 */ @Test def testWordCountACC() : Unit ={ val rdd: RDD[String] = sparkContext.textFile("input/data1.txt") // RDD[单词] val rdd1: RDD[String] = rdd.flatMap(line => line.split(" ")) // 之前使用reduceByKey进行合并,现在使用累加器,对单词进行累加 val acc = new WCAcc //注册 sparkContext.register(acc,"wc") // 使用累加器累加每一个单词 rdd1.foreach( word => acc.add(word)) //打印累加的结果 println(acc.value) }测试:
//未加缓存时: @Test def testCache() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val rdd1: RDD[Int] = rdd.map(x => x) .map(x => x) .map(x => x) .map(x => x) .map(x => x) .map(x => x) .map(x => x) .map(x => x) .map(x => x) .map(x => x) .map(x => { println("进行了若干次的map操作") x }) //把血缘关系打印一下 println(rdd1.toDebugstring) //将人rdd1缓存起来 rdd1.coach() println(rdd1.collect().mkString(",")) println(rdd1.collect().mkString(",")) println(rdd1.collect().mkString(",")) println("------------------") rdd1.saveAsTextFile("output") } //结果 共打印16次 进行了若干次的map操作 //改进,将RDD1缓存起来,再看结果 只打印一次【说明第一次job提交时,会将数据缓存起来】看一下catch的源码:
def cache(): this.type = persist() def persist(): this.type = persist(StorageLevel.MEMORY_ONLY) def persist(newLevel: StorageLevel): this.type = {...} //此方法指定数据在缓存时使用什么样的形式缓存!由此可知:
def persist(newLevel: StorageLevel): 指定数据在缓存时使用什么样的形式缓存! 参考 StorageLevel(缓存级别)! cache()===>persist() ====>persist(StorageLevel.MEMORY_ONLY) StorageLevel可以通过以下参数构造: 【即,缓存级别可以通过persist来指定】 class StorageLevel private( private var _useDisk: Boolean, //是否使用磁盘来缓存数据 private var _useMemory: Boolean, //是否使用内存磁盘来缓存数据 private var _useOffHeap: Boolean, //是否使用堆外内存(JVM管理不到的,向OS申请的内存)磁盘来缓存数据 private var _deserialized: Boolean, //缓存的数据是否以序列化的字节形式存储 private var _replication: Int = 1) //指定缓存对象的副本数量 extends Externalizable 1.缓存不会改变血缘关系!原因在于缓存的不可靠性 2.在出现shuffle时,如果一个Job Map阶段产生的文件会被之后的job使用,那么这些文件会一直保留 3.可以理解为 Map阶段的输出缓存在了磁盘因为cache不可靠,那么怎么解决呢?
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gkSBva3P-1603280050887)(Spark0621(3.0).assets/image-20200928194115781.png)]
追踪一下 textFile源码:
def textFile( path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped() hadoopFile( path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minPartitions ) .map(pair =>pair._2.toString) .setName(path) }可见textFile调用了hadoopFile方法:
def hadoopFile[K, V]( path: String, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope { assertNotStopped() FileSystem.getLocal(hadoopConfiguration) val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration)) val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path) new HadoopRDD( // 【用的是mapred.xxx 】 this, confBroadcast, Some(setInputPathsFunc), inputFormatClass, keyClass, valueClass, minPartitions).setName(path) }然后可见hadoopFile方法new了一个HadoopRDD。 用的是老的mapred.xxx 默认是:mapred.TextInputFormat,更熟悉的是:mapreduce.TextInputFormat
那么mapreduce.TextInputFormat在哪呢?
在RDD下有一个NewHadoopRDD
/** * :: DeveloperApi :: * An RDD that provides core functionality for reading data stored in Hadoop (e.g., files in HDFS, * sources in HBase, or S3), using the new MapReduce API 【用的是新的mapreduce的API】(`org.apache.hadoop.mapreduce`). * * @param sc The SparkContext to associate the RDD with. * @param inputFormatClass Storage format of the data to be read. * @param keyClass Class of the key associated with the inputFormatClass. * @param valueClass Class of the value associated with the inputFormatClass. * * @note Instantiating this class directly is not recommended, please use * `org.apache.spark.SparkContext.newAPIHadoopRDD()` */new一个NewHadoopRDD需要用这样一个构造器:
@DeveloperApi class NewHadoopRDD[K, V]( sc : SparkContext, inputFormatClass: Class[_ <: InputFormat[K, V]], keyClass: Class[K], valueClass: Class[V], @transient private val _conf: Configuration) extends RDD[(K, V)](sc, Nil) with Logging11.3.1 基本介绍
读写SF文件: 写: RDD.saveAsSequenceFile() 读: sparkContext.sequenceFile11.3.2 测试
def sequenceFile[K, V]( path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = withScope { assertNotStopped() sequenceFile(path, keyClass, valueClass, defaultMinPartitions) } @Test def testSF() : Unit ={ val list = List(1, 2, 3, 4) val rdd: RDD[Int] = sparkContext.makeRDD(list, 2) val rdd1: RDD[(Int, Int)] = rdd.map(x => (x, 1)) //写 rdd1.saveAsSequenceFile("output") // 读的时候,需要指定 SF中 K-V对的类型 println( sparkContext.sequenceFile[Int, Int] ("output").collect() .mkString(",")) }jdbcRDD:
/** An RDD that executes a SQL query on a JDBC connection and reads results.For usage example, see test case JdbcRDDSuite. @param getConnection a function that returns an open Connection. The RDD takes care of closing the connection. @param sql the text of the query. The query must contain two ? placeholders for parameters used to partition the results. For example, {{{ select title, author from books where ? <= id and id <= ? }}} @param lowerBound the minimum value of the first placeholder @param upperBound the maximum value of the second placeholder The lower and upper bounds are inclusive. @param numPartitions the number of partitions. Given a lowerBound of 1, an upperBound of 20, and a numPartitions of 2, the query would be executed twice, once with (1, 10) and once with (11, 20) @param mapRow a function from a ResultSet to a single row of the desired result type(s). This should only call getInt, getString, etc; the RDD takes care of calling next. The default maps a ResultSet to an array of Object. */ 译文: 一个RDD,它在JDBC连接上执行SQL查询并读取结果。 有关用法示例,请参见测试用例JdbcRDDSuite。 @param getConnection一个返回打开的Connection的函数。RDD负责关闭连接。 @param sql查询的文本。查询必须包含两个?用于对结果进行分区的参数的占位符。 例如, {{{选择书名,作者从书中何处? <= id和id <=? }}} @param lowerBound第一个占位符的最小值 @param upperBound第二个占位符的最大值上下限包括在内。 @param numPartitions分区数。给定lowerBound为1,upperBound为20,numPartitions为2,查询将执行两次,一次使用(1,10),一次使用(11,20) @param mapRow ResultSet中的函数到所需结果类型的单行。这应该只调用getInt,getString等; RDD负责下一步呼叫。默认值将ResultSet映射到Object数组。 * / class JdbcRDD[T: ClassTag]( sc: SparkContext, //sparkContext getConnection: () => Connection, //函数用于创建Connection sql: String, //要执行的查询的sql语句,sql中需要设置两个占位符 lowerBound: Long, //下界,用来填充小的占位符 upperBound: Long, //上界,用来填充大的占位符,取上下界的区间 numPartitions: Int, //跟分区数 mapRow: (ResultSet) => T =JdbcRDD.resultSetToObjectArray _) //不能调用next(),只需要调用ResultSet.getString()...+...Result.getInt() extends RDD[T]