代码地址:https://github.com/luslin1711/kafka_demo/tree/master/kafka_demo_07
一般情况下,针对偶尔出现的提交失败,不进行重试不会有太大问题,因为如果提交失败是因为临时原因导致的,那么后续的提交总会有成功的。但如果这是在关闭消费者前的最后一次提交,就要确保能够提交成功
因此,在消费者关闭前一般会组合使用commitAsync() 与commitSync()
public void run() { KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new StringDeserializer()); try { consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); if (records.isEmpty()) continue; System.out.println("recordsLen: " + records.count()); for ( ConsumerRecord <String, String> record : records ) { System.out.println("threadId: " + Thread.currentThread().getName() +" ,groupId: " + groupId +", partition: " + record.partition()); } consumer.commitAsync(); } } catch (WakeupException E) { // 忽略异常,关闭消费者 } catch (Exception e) { e.printStackTrace(); }finally { try { consumer.commitSync(); } finally { consumer.close(); } } }如果一切正常,我们使用consumer.commitAsync()方法来提交。这样速度更快,而且即使这次提交失败,下次提交也可能会成功
如果直接关闭消费者,那就没有所谓的“下一次提交”了。使用consumer.commitSync()方法会一直重试,直到提交成功或者发生无法恢复的错误
提交偏移量的频率与处理消息批次的频率是一样的。但如果想要更频繁地提交该怎么办?如果poll方法返回了一大批数据,为了避免再均衡导致重复处理整批消息,我们希望在批次处理到中间时就提交一次。这种需求无法通过无参的commitAsync()实现,因为它只会提交本次poll返回的最后一个偏移量,而此时该批次里的消息还没有全部处理完。
可以在commitAsync()时传入希望提交的分区和偏移量的map
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Properties; public class Consumer implements Runnable{ protected String groupId; protected String topic; protected Properties properties; private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); public Consumer(String groupId, String topic) { this.groupId = groupId; this.topic = topic; this.properties = getProperties(); } protected Properties getProperties() { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", groupId); // 组id,同一组下共享offset props.put("enable.auto.commit", "false"); // 关闭自动提交 props.put("fetch.min.bytes", "512"); // 指定从服务器获取的最小字节数 props.put("fetch.max.wait.ms", "100"); // 等待时间。在服务器数据不足fetch.min.bytes时, 达到时间也会返回 props.put("auto.offset.reset", "latest"); // 在offset 失效的情况下,earliest表示从起始位置开始读取分区记录。 latest 表示从最新记录读取(在消费者启动后) return props; } public void run() { KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new StringDeserializer()); int count = 0; try { consumer.subscribe(Collections.singletonList(topic)); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); if (records.isEmpty()) continue; System.out.println("recordsLen: " + records.count()); for ( ConsumerRecord <String, String> record : records ) { System.out.println("threadId: " + Thread.currentThread().getName() +" ,groupId: " + groupId +", partition: " + record.partition()); count++; currentOffsets.put(new 
TopicPartition(record.topic(), record.partition()), new OffsetAndMetadata(record.offset() + 1, "no medata")); if (count % 10 == 0) { consumer.commitAsync(currentOffsets, null); } } consumer.commitAsync(); } } catch (WakeupException E) { // 忽略异常,关闭消费者 } catch (Exception e) { e.printStackTrace(); }finally { try { consumer.commitSync(); } finally { consumer.close(); } } } public static void main(String[] args) { new Consumer("06", "topic06").run(); } }如果想从分区的起始位置开始读取消息,或者直接跳到分区的末尾读取消息,可以使用 seekToBeginning(Collection< TopicPartition> partitions) 和 seekToEnd(Collection< TopicPartition> partitions)
如果将偏移量保存到数据库中,而不是 Kafka 里:在消费者处理完一条记录、把处理结果存入数据库的同时,把偏移量也一并保存(理想情况下放在同一个事务里),那么就不会出现重复处理数据的问题
模拟dbsource
package com.luslin.demo.kafka.dbsource;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * In-memory stand-in for an external database that keeps the last saved
 * offset per partition (partition number -> offset).
 */
public class DBsource {
    Map<Integer, Long> source = new ConcurrentHashMap<>();

    /** Persists the offset for a single partition. */
    public void save(Integer partition, Long offset) {
        source.put(partition, offset);
    }

    /** Persists every partition/offset pair contained in a commit map. */
    public void save(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
        for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : currentOffsets.entrySet()) {
            save(entry.getKey().partition(), entry.getValue().offset());
        }
    }

    /** Returns the stored offset for the partition, or null if none was saved. */
    public Long search(Integer partition) {
        return source.get(partition);
    }
}
DefaultRebalanceListener
package com.luslin.demo.kafka.consumer; import com.luslin.demo.kafka.dbsource.DBsource; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import java.util.Collection; import java.util.HashMap; import java.util.Map; public class DefaultRebalanceListener implements ConsumerRebalanceListener { private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); private org.apache.kafka.clients.consumer.Consumer consumer; private DBsource dBsource = new DBsource(); public DefaultRebalanceListener(org.apache.kafka.clients.consumer.Consumer consumer) { this.consumer = consumer; } public void put(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) { currentOffsets.put(partition, offsetAndMetadata); } public Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets() { return currentOffsets; } @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { System.out.println("Lost partitions in reblance. Committing current offsets: " + currentOffsets); consumer.commitSync(currentOffsets); dBsource.save(currentOffsets); } @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { System.out.println("get partitions :" + partitions.toString()); Long offset; for (TopicPartition partition: partitions) { offset = dBsource.search(partition.partition()); if (offset != null) { consumer.seek(partition, offset); } } } }consumer
package com.luslin.demo.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * Demo consumer that stores offsets both in Kafka and in an external
 * "database" (via DefaultRebalanceListener): on revocation the listener
 * commits and saves progress; on assignment it seeks to the DB-stored offset.
 */
public class Consumer implements Runnable {
    protected String groupId;
    protected String topic;
    protected Properties properties;
    // partition -> next offset to consume (last processed offset + 1)
    private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    public Consumer(String groupId, String topic) {
        this.groupId = groupId;
        this.topic = topic;
        this.properties = getProperties();
    }

    protected Properties getProperties() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", groupId);           // consumers in the same group share committed offsets
        props.put("enable.auto.commit", "false"); // offsets are committed manually in run()
        props.put("fetch.min.bytes", "512");      // minimum bytes the broker accumulates before answering a fetch
        props.put("fetch.max.wait.ms", "100");    // broker answers after this delay even if fetch.min.bytes is not reached
        props.put("auto.offset.reset", "latest"); // with no valid offset: "earliest" = start of partition, "latest" = only records produced after startup
        return props;
    }

    public void run() {
        KafkaConsumer<String, String> consumer =
                new KafkaConsumer<>(properties, new StringDeserializer(), new StringDeserializer());
        // Keep a reference to the listener: it needs to see our processed
        // offsets, otherwise it commits an empty map (and saves nothing to
        // the DB) when partitions are revoked.
        DefaultRebalanceListener listener = new DefaultRebalanceListener(consumer);
        int count = 0;
        try {
            consumer.subscribe(Collections.singletonList(topic), listener);
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                if (records.isEmpty()) continue;
                System.out.println("recordsLen: " + records.count());
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println("offset: " + record.offset() + ", partition: " + record.partition());
                    count++;
                    // Commit contract: store the NEXT offset to read, i.e. current + 1.
                    TopicPartition partition = new TopicPartition(record.topic(), record.partition());
                    OffsetAndMetadata next = new OffsetAndMetadata(record.offset() + 1, "no metadata");
                    currentOffsets.put(partition, next);
                    listener.put(partition, next); // share progress with the rebalance listener
                    if (count % 10 == 0) {
                        // Mid-batch async commit of exactly the offsets processed so far.
                        consumer.commitAsync(currentOffsets, null);
                    }
                }
                consumer.commitAsync();
            }
        } catch (WakeupException e) {
            // Expected on shutdown via consumer.wakeup(); proceed to the final commit.
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                // Commit only what was actually processed; the no-arg commitSync()
                // would commit the whole last polled batch even if an exception
                // left part of it unprocessed.
                consumer.commitSync(currentOffsets);
            } finally {
                consumer.close();
            }
        }
    }

    public static void main(String[] args) {
        new Consumer("07", "topic07").run();
    }
}