设置SparkConf的时候不能设置为local,会报错,应当设置成local[N],N>1。这是因为需要一个核接收数据,另一个核处理数据,如果只分配一个线程处理,这个线程会被用来接收数据,就没有办法处理接收到的数据
Spark Streaming快速入门
使用命令nc -lv 端口号进行测试
1.创建StreamingContext对象 同Spark初始化需要创建SparkContext对象一样,使用Spark Streaming就需要创建StreamingContext对象。创建StreamingContext对象所需的参数与SparkContext基本一致,包括指明Master,设定名称(如NetworkWordCount)。需要注意的是参数Seconds(1),Spark Streaming需要指定处理数据的时间间隔,如上例所示的1s,那么Spark Streaming会以1s为时间窗口进行数据处理。此参数需要根据用户的需求和集群的处理能力进行适当的设置;
2.创建InputDStream如同Storm的Spout,Spark Streaming需要指明数据源。如上例所示的socketTextStream,Spark Streaming以socket连接作为数据源读取数据。当然Spark Streaming支持多种不同的数据源,包括Kafka、 Flume、HDFS/S3、Kinesis和Twitter等数据源;
3.操作DStream对于从数据源得到的DStream,用户可以在其基础上进行各种操作,如上例所示的操作就是一个典型的WordCount执行流程:对于当前时间窗口内从数据源得到的数据首先进行分割,然后利用Map和ReduceByKey方法进行计算,当然最后还有使用print()方法输出结果;
4.启动Spark Streaming之前所作的所有步骤只是创建了执行流程,程序没有真正连接上数据源,也没有对数据进行任何操作,只是设定好了所有的执行计划,当ssc.start()启动后程序才真正进行所有预期的操作。
⽂件数据流:能够读取所有HDFS API兼容的⽂件系统⽂件,通过fileStream⽅法进⾏读取,Spark Streaming 将会监控 dataDirectory ⽬录并不断处理移动进来的⽂件,记住目前不⽀持嵌套目录。(如果监控的文件夹中又进来一个文件夹,那么该文件夹里面的数据不会被监控)
1)⽂件需要有相同的数据格式; 2)⽂件进⼊ dataDirectory 的⽅式需要通过移动或者重命名来实现; 3)⼀旦⽂件移动进⽬录,则不能再修改,即便修改了也不会读取新数据;
ps:这个流只需要了解就⾏,效果不是很明显,本地的话也有可能会明显⼀下,所以了解即可我们多⽤于flume来完成⽂件的监控
如果需要访问hdfs文件系统,那么需要将core-site.xml文件添加进resources中,在textFileSystem中直接从hdfs根目录开始写,会自动读取hdfs中的文件
import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 将文件作为输入源。 * 可以监控一个文件夹,SparkStreaming会检测这个文件夹中的内容发生变化。 * * ps: * 这个流,只需要了解即可。 这个效果不明显。 * */ object _01_InputFile { def main(args: Array[String]): Unit = { val ssc: StreamingContext = new StreamingContext(new SparkConf().setMaster("local[2]").setAppName("input"), Seconds(3)) // 作为SparkStreaming监控的目录 // val path: String = "file:///C:\\Users\\luds\\Desktop\\data" // 监控这个目录,并生成DStream val dirStream: DStream[String] = ssc.textFileStream(path) // 处理数据 val resDStream: DStream[(String, Int)] = dirStream.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _) // 输出数据 resDStream.print() ssc.start() ssc.awaitTermination() } }oneAtATime – Whether only one RDD should be consumed from the queue in every interva 是否在每个间隔时间内只有一个RDD能够从队列中取出并被消费
import org.apache.spark.SparkConf import org.apache.spark.rdd.RDD import org.apache.spark.streaming.dstream.{DStream, InputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} import scala.collection.mutable object _02_RddQueue { def main(args: Array[String]): Unit = { val ssc: StreamingContext = new StreamingContext(new SparkConf().setMaster("local[2]").setAppName("rddQueue"), Seconds(3)) // 1. 创建一个RDD的队列 val queue: mutable.Queue[RDD[Int]] = new mutable.Queue[RDD[Int]]() // 2. 初始化DStream, 监控一个队列 // oneAtTime表示在每个间隔中是否每个rdd只能消费一次 val rddStream: InputDStream[Int] = ssc.queueStream(queue, oneAtATime = false) // 3. 数据处理: 统计每一个数字出现的次数 val resStream: DStream[(Int, Int)] = rddStream.map((_, 1)).reduceByKey(_ + _) // 4. 输出统计结果 resStream.print() // 先开启作业 ssc.start() // 5. 往队列中,添加RDD for (i <- 0 to 5) { val rdd: RDD[Int] = ssc.sparkContext.parallelize(1 to 500) queue += rdd // 添加到队列 Thread.sleep(2500) //让每一个批次都有数据处理 } ssc.awaitTermination() } }⾃定义数据源其实就是需要继承Receiver,并实现onStart、onStop⽅法来⾃定义数据源采集。
import java.io.{BufferedReader, InputStreamReader} import java.net.Socket import org.apache.spark.SparkConf import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.streaming.receiver.Receiver /** * 自定义的输入源 */ object _03_CustomReceiverTest { def main(args: Array[String]): Unit = { val ssc: StreamingContext = new StreamingContext(new SparkConf().setMaster("local[2]").setAppName("custom"), Seconds(3)) // 从一个自定义的输入源读取数据,创建DStream val dStream: ReceiverInputDStream[String] = ssc.receiverStream(new CustomReceiver("qianfeng01", 6666)) // 处理数据 val res: DStream[(String, Int)] = dStream.flatMap(_.split("\\s+")).map((_, 1)).reduceByKey(_ + _) res.print() res.saveAsTextFiles("C:\\Users\\luds\\Desktop\\output\\myLog_", "log") ssc.start() ssc.awaitTermination() } } /** * 自定义的输入源,需要继承自 org.apache.spark.streaming.receiver.Receiver * 在继承的时候,需要设置父类Receiver的主构造器 * 并重写 onStart() 和 onStop() 方法 * 该类含有store()、isStopped()方法 */ class CustomReceiver(host: String, port: Int) extends Receiver[String](StorageLevel.MEMORY_AND_DISK_SER_2) { // 启动的时候调用的 // 读取数据,并将数据发送给Spark override def onStart(): Unit = { new Thread() { override def run(): Unit = { receive() } }.start() } def receive(): Unit = { val socket: Socket = new Socket(host, port) // 实例化一个流,用来读取Socket中的数据 val reader: BufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream, "UTF8")) // 读取数据 var line: String = reader.readLine() // 如果receiver还没有停止,就需要继续读取 while (line != null && !isStopped()) { // 将接收到的数据块存储到spark内存汇总 store(line) line = reader.readLine() } reader.close() socket.close() } // 停止的时候调用 override def onStop(): Unit = { } }(1)Transformations (2)Output Operations (3)特殊的原语 updateStateByKey ---- ⽤于记录历史记录 transform window
这里只说明特殊的原语用法,因为前面两个都在之前的学习中学习过
updateStateByKey() 的结果会是⼀个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。 updateStateByKey操作使得我们可以在⽤新信息进⾏更新时保持任意的状态。为使⽤这个功能,你需要做下⾯两步:
定义状态,状态可以是⼀个任意的数据类型。定义状态更新函数,⽤此函数阐明如何使⽤之前的状态和来⾃输⼊流的新值对状态进⾏更新 使⽤updateStateByKey需要对检查点⽬录进⾏配置,会使⽤检查点来保存状态该原语只能保存5个批次的数据,保存的文件夹会随时间不断刷新
package day18_SparkStreaming import org.apache.spark.{HashPartitioner, SparkConf} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} /** * 使用 updateStateByKey 实现wordcount * 要将每一个批次中处理的结果累加到一起 * * updateStateByKey: 将之前的批次处理的结果,存储在磁盘上的 * */ object _04_UpdateStateByKeyTest { def main(args: Array[String]): Unit = { // 写入hdfs文件系统 // System.setProperty("HADOOP_USER_NAME", "root") val ssc: StreamingContext = new StreamingContext(new SparkConf().setMaster("local[2]").setAppName("wc"), Seconds(3)) // 1. 如果要使用updateStateByKey的话,需要指定一个临时的文件保存目录 // SparkStreaming在处理的过程中,会将临时的数据,保存在这个目录下。 // 保存的就是之前批次的处理结果,历史记录。 // 这里,只会保留最近的5个批次的数据。 ssc.checkpoint("C:\\Users\\luds\\Desktop\\ck") // ssc.checkpoint("hdfs://host01:9000/2020-10-21/20-25") // 2. 监听数据,创建DStream val stream: ReceiverInputDStream[String] = ssc.socketTextStream("host01", 6666) // 3. 数据处理 val mappedStream: DStream[(String, Int)] = stream.flatMap(_.split("\\s+")).map((_, 1)) // val res: DStream[(String, Int)] = mappedStream.updateStateByKey(updateState) val res: DStream[(String, Int)] = mappedStream.updateStateByKey(updateState2 _, new HashPartitioner(ssc.sparkContext.defaultParallelism), rememberPartitioner = true) res.print() ssc.start() ssc.awaitTermination() } // (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)] // Iterator中的参数 // String : Key // Seq[Int] : 这个Key对应的出现的次数 // Option[Int] : 这个批次中这个Key对应的次数 def updateState2(iterator: Iterator[(String, Seq[Int], Option[Int])]): Iterator[(String, Int)] = { iterator.map(tuple => { (tuple._1, tuple._2.sum + tuple._3.getOrElse(0)) }) } /** * seq: Seq[V] => 历史记录中的,之前的数据累加的结果。 Seq(1, 1, 1, 1, 1, 1, 1, 1...) * value: Option[S] => 当前批次的结果 */ def updateState(seq: Seq[Int], value: Option[Int]): Option[Int] = { Some(seq.sum + value.getOrElse(0)) } }测试:nc -lv 6666
mapWithState只返回变化后的key的值,这样做的好处是,可以只是关⼼那些已经发⽣的变化的key,对于没有数据输⼊,则不会返回那些没有变化的key的数据。这样的话,即使数据量很⼤,checkpoint也不会像updateStateByKey那样,占⽤太多的存储,效率⽐较⾼(再⽣产环境中建议使⽤这个) 注意:
mapWithState是1.6版本之后推出的;必须设置checkpoint来储存历史数据;mapWithState和updateStateByKey的区别 : 它们是类似的,都是有状态DStream操作, 区别在于, updateStateByKey是输出增量数据,随着时间的增加, 输出的数据越来越多,这样会影响计算的效率, 对CPU和内存压⼒较⼤。⽽mapWithState则输出本批次数据,也含有状态更新;(mapWithState会在缓存中维护数据的状态)checkpoint的数据会分散存储在不同的分区中, 在进⾏状态更新时, ⾸先会对当前 key 做 hash , 再到对应的分区中去更新状态 , 这种⽅式⼤⼤提⾼了效率。需要用到的函数和参数类型
Params: spec – Specification of this transformation Type parameters: StateType – Class type of the state data MappedType – Class type of the mapped data def mapWithState[StateType: ClassTag, MappedType: ClassTag]( spec: StateSpec[K, V, StateType, MappedType] ): MapWithStateDStream[K, V, StateType, MappedType] = {} Params: mappingFunction – The function applied on every data item to manage the associated state and generate the mapped data Type parameters: ValueType – Class of the values StateType – Class of the states data MappedType – Class of the mapped data def function[KeyType, ValueType, StateType, MappedType]( mappingFunction: (KeyType, Option[ValueType], State[StateType]) => MappedType ): StateSpec[KeyType, ValueType, StateType, MappedType] = {}测试代码:
import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, MapWithStateDStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, State, StateSpec, StreamingContext} object _03_MapWithStateTest { def main(args: Array[String]): Unit = { val ssc: StreamingContext = new StreamingContext(new SparkConf().setMaster("local[2]").setAppName("map"), Seconds(3)) // 设置一个文件存储的路径 ssc.checkpoint("file:///C:/Users/luds/Desktop/ck") // 监控nc的输入 val stream: ReceiverInputDStream[String] = ssc.socketTextStream("host01", 6666) // 简单的处理 val mappedStream: DStream[(String, Int)] = stream.flatMap(_.split("\\s+")).map((_, 1)) val res: MapWithStateDStream[String, Int, Int, (String, Int)] = mappedStream.mapWithState(StateSpec.function(mapping _)) res.stateSnapshots().print() ssc.start() ssc.awaitTermination() } // (KeyType, Option[ValueType], State[StateType]) => MappedType def mapping(key: String, value: Option[Int], state: State[Int]): (String, Int) = { // 累加新的单词数量 val sum: Int = value.getOrElse(0) + state.getOption().getOrElse(0) // 更新state的值 state.update(sum) // 将这个键,对应的新的值返回 (key, sum) } }测试:nc -lv 6666
有两种方式,分别是Receiver方式(已过时)和Driver方式
HasOffeSetRang类型存储的是所有的offset 手动管理offset
自动提交offset点击这里
import java.util import org.apache.kafka.clients.consumer.ConsumerRecord import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.InputDStream import org.apache.spark.streaming.kafka010.{ConsumerStrategies, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange} import org.apache.spark.streaming.{Seconds, StreamingContext} import redis.clients.jedis.{HostAndPort, JedisCluster, JedisPoolConfig} /** * SparkStreaming对接Kafka,完成手动的offset提交 * * 在程序中维护一个Map,存储每一个tpoic的每一个分区,消费到了哪一个offset。 * 每当有消费者在消费的时候,获取到消费的最新的offset,并更新到Map中。 * 需要将Map中的数据,在Redis中存储一份。 * */ object _02_Offset { def main(args: Array[String]): Unit = { val ssc: StreamingContext = new StreamingContext(new SparkConf().setMaster("local[2]").setAppName("offset"), Seconds(3)) // 获取Jedis连接对象 val jedis: JedisCluster = RedisConn.getJedis // 准备消费者策略需要的数据 // 0. 将消费者组单独列出来 val groupid: String = "streaming2KafkaGroup" // 1. 准备一个所有的topic val topics: Array[String] = Array("streaming2kafka") // 2. 配置消费的属性 val configuration: Map[String, Object] = Map[String, Object]( "bootstrap.servers" -> "host01:9092,host03:9092,host04:9092", "key.deserializer" -> classOf[StringDeserializer], //反射方式反序列化 "value.deserializer" -> classOf[StringDeserializer], "enable.auto.commit" -> "true", //关闭offset自动提交 "auto.offset.reset" -> "earliest", //offset设置到最新的位置,从最新的位置开始消费 "group.id" -> "streaming2KafkaGroup" //设置消费者组id ) // 从Redis中查询之前的数据 var offsets: Map[TopicPartition, Long] = Offset(jedis, groupid) // 3. 创建DStream // 在消费的时候,分为两种情况: 第一次消费和非第一次消费 val kafkaStream: InputDStream[ConsumerRecord[String, String]] = if (offsets.isEmpty) { // offsets中没有记录之前的offset,说明是第一次消费 KafkaUtils.createDirectStream( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](topics, configuration) ) } else { KafkaUtils.createDirectStream( ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Assign[String, String](offsets.keys, configuration, offsets) ) } // 4. 处理消息,消费消息 kafkaStream.foreachRDD(rdd => { // 模拟消费数据: 直接输出到控制台 rdd.map(_.value).foreach(println) // 获取最新的offset,并将其同步给Redis val ranges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges for (or <- ranges) { jedis.hset(groupid, or.topic + "_" + or.partition, or.untilOffset.toString) } }) ssc.start() ssc.awaitTermination() } } object RedisConn { // 设置主节点信息 val hosts:util.Set[HostAndPort] = { val set: util.HashSet[HostAndPort] = new util.HashSet[HostAndPort]() set.add(new HostAndPort("192.168.10.101", 7001)) set.add(new HostAndPort("192.168.10.101", 7002)) set.add(new HostAndPort("192.168.10.101", 7003)) set } // 配置连接池的信息 val config: JedisPoolConfig = new JedisPoolConfig config.setMaxTotal(20) config.setMaxIdle(10) // 连接Redis集群的对象 val cluster: JedisCluster = new JedisCluster(hosts, 10000, config) def getJedis: JedisCluster = cluster } object Offset { def apply(jedis: JedisCluster, groupid: String): Map[TopicPartition, Long] = { // 1. 实例化一个新的Map,用来存储从Redis查询到的数据 var offsets: Map[TopicPartition, Long] = Map[TopicPartition, Long]() // 2. 从Redis查询数据 val topicsAndPartitionsAndOffsets: util.Map[String, String] = jedis.hgetAll(groupid) // 3. 引入一个转换包 import scala.collection.JavaConversions._ // 4. 将Map转成Scala的List集合,方便遍历 val list: List[(String, String)] = topicsAndPartitionsAndOffsets.toList // 5. 遍历集合,存入offsetMap中 for ((topicAndPartition, offset) <- list) { // 按照下划线,切分,得到topic和partition val arr: Array[String] = topicAndPartition.split("_") offsets += (new TopicPartition(arr(0), arr(1).toInt) -> offset.toLong) } offsets } }这块需要关注窗口的大小和步长
案例一
import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext} /** * window的使用: * 案例: 热搜,统计过去的20秒内,所有的词条搜索量,每隔10秒更新一次 * */ object WindowUsage { def main(args: Array[String]): Unit = { // 批次的间隔, 可以设置为毫秒, 使用Milliseconds即可 val ssc: StreamingContext = new StreamingContext(new SparkConf().setMaster("local[2]").setAppName("window"), Milliseconds(5000)) // 监测nc的输入, 获取一个DStream对象 val stream: ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01", 6666) // wordcount val res: DStream[(String, Int)] = stream .window(Seconds(20), Seconds(10)) // 设置一个窗口的大小为20秒,滑动的间隔为10秒 .flatMap(_.split("\\s+")) .map((_, 1)) .reduceByKey(_ + _) // 需求进阶: 需要让每一个window中的次品搜索量,降序排序取top3 // res.transform(rdd => {}) // res.foreachRDD(rdd => rdd.sortBy(-_._2).take(3)) ssc.start() ssc.awaitTermination() } }案例二
import org.apache.spark.SparkConf import org.apache.spark.streaming.{Milliseconds, Seconds, StreamingContext} import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} object WindowUsage2 { def main(args: Array[String]): Unit = { // 批次的间隔, 可以设置为毫秒, 使用Milliseconds即可 val ssc: StreamingContext = new StreamingContext(new SparkConf().setMaster("local[2]").setAppName("window"), Milliseconds(5000)) // 监测nc的输入, 获取一个DStream对象 val stream: ReceiverInputDStream[String] = ssc.socketTextStream("qianfeng01", 6666) // wordcount // val res: DStream[(String, Int)] = stream // .flatMap(_.split("\\s+")) // .map((_, 1)) // .window(Seconds(20), Seconds(10)) // 设置一个窗口的大小为20秒,滑动的间隔为10秒 // .reduceByKey(_ + _) val res: DStream[(String, Int)] = stream .flatMap(_.split("\\s+")) .map((_, 1)) .reduceByKeyAndWindow((x: Int, y: Int) => x + y , Seconds(20), Seconds(10)) res.print() ssc.start() ssc.awaitTermination() } }