Kafka consumption problems generally fall into two cases:
a. Data is produced faster than the consumer can keep up, so records back up and can be lost (they may age out of the topic before they are read).
b. Data is produced very slowly, the consumer goes a long time without seeing a single record, and the poll times out.
Solution (only for case b):
a. SparkConf setting: increase the timeout allowed when a poll returns no data (tune the value to your workload):

val conf: SparkConf = new SparkConf()
  .setAppName(this.getClass.getName)
  .set("spark.streaming.kafka.consumer.poll.ms", "1000000")
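For context, a small sketch (not from the original post) of how to read back the effective value: as far as I know, when spark.streaming.kafka.consumer.poll.ms is not set, the spark-streaming-kafka-0-10 connector falls back to spark.network.timeout (120 s by default), which is why polls seem to hang for about two minutes before failing. Treat the fallback as an assumption and verify it against your Spark version.

// Assumption: mirrors how the connector resolves its poll timeout.
val pollTimeoutMs = conf.getLong(
  "spark.streaming.kafka.consumer.poll.ms",
  conf.getTimeAsSeconds("spark.network.timeout", "120s") * 1000L)
println(s"effective poll timeout: $pollTimeoutMs ms")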
b. Kafka consumer setting reconnect.backoff.ms: the amount of time to wait before attempting to reconnect to a given host. It prevents the client from reconnecting to a host in a tight loop, and the backoff applies to all requests the consumer sends to the broker.

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> properties.getProperty("kafka.host"),
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> groupId,
  "auto.offset.reset" -> "latest",
  "reconnect.backoff.ms" -> "0",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
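An optional variant, not from the original post: the same map written with ConsumerConfig constants instead of raw strings, which guards against typos in the keys (all other names come from the example above).

import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> properties.getProperty("kafka.host"),
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  ConsumerConfig.GROUP_ID_CONFIG -> groupId,
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG -> "0",
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)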
With both settings in place, the problem is completely solved!
// 1. Build the StreamingContext
val conf: SparkConf = new SparkConf()
  .setAppName(LiveNowRankSinkEs.getClass.getName)
  .set("spark.streaming.kafka.consumer.poll.ms", "1000000")
//  .setMaster("local[*]")
val sc: SparkContext = new SparkContext(conf)
val ssc: StreamingContext = new StreamingContext(sc, Seconds(Integer.parseInt(args(0))))
// val ssc: StreamingContext = new StreamingContext(sc, Seconds(30))

// 2. Kafka parameters
val topicName = properties.getProperty("kafka.live.now.rank.topic")
val groupId = properties.getProperty("kafka.live.now.rank.group.id")
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> properties.getProperty("kafka.host"),
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> groupId,
  "auto.offset.reset" -> "latest",
  "reconnect.backoff.ms" -> "0",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
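A minimal continuation sketch, not part of the original post: build the direct stream from ssc, topicName and kafkaParams above and commit offsets by hand, since enable.auto.commit is false. It assumes spark-streaming-kafka-0-10; the println is just a placeholder for the real sink.

import org.apache.spark.streaming.kafka010._

// 3. Create the direct stream from the parameters above
val stream = KafkaUtils.createDirectStream[String, String](
  ssc,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, String](Array(topicName), kafkaParams))

stream.foreachRDD { rdd =>
  // Capture the offset ranges before any shuffle/repartition
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreach(record => println(record.value())) // placeholder: replace with the real sink
  // Commit offsets back to Kafka once the batch has been processed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

ssc.start()
ssc.awaitTermination()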