代码地址:https://github.com/luslin1711/kafka_demo/tree/master/kafka_demo_06
消费者群组里的消费者共同读取主题的分区。一个新的消费者加入群组时,它读取的是原来由其他消费者读取的消息。当一个消费者被关闭或者发生崩溃时,它就离开群组,原本由它读取的分区将由群组里的其他消费者读取。在主题发生变化时,如添加了新的分区,也会发生分区重分配。
分区的所有权从一个消费者转移到另一个消费者,这被称为再均衡。再均衡非常重要,它为消费者群组带来了高可用性和伸缩性。不过在正常情况下,我们不希望发生这样的行为。在再均衡期间,消费者无法读取消息,造成整个群组一小段时间的不可用。另外,当分区被重新分配给另一个消费者时,消费者当前的读取状态会丢失,它有可能还需要去刷新缓存,在它重新恢复状态之前会拖慢应用程序。
在为消费者分配新的分区或移除旧的分区时,可以通过消费者API执行一些应用程序代码:在调用subscribe()方法时传入一个ConsumerRebalanceListener。
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

/**
 * Rebalance listener that synchronously commits the offsets of records
 * processed so far right before partitions are revoked, so that whichever
 * consumer takes over a partition resumes from the last processed record
 * instead of reprocessing (or skipping) messages.
 */
public class DefaultRebalanceListener implements ConsumerRebalanceListener {

    /** Next offset to commit for each partition this consumer has processed. */
    private final Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

    // Parameterized wildcard type instead of the raw Consumer type;
    // commitSync does not depend on the key/value types.
    private final org.apache.kafka.clients.consumer.Consumer<?, ?> consumer;

    public DefaultRebalanceListener(org.apache.kafka.clients.consumer.Consumer<?, ?> consumer) {
        this.consumer = consumer;
    }

    /** Records the next offset to commit for the given partition. */
    public void put(TopicPartition partition, OffsetAndMetadata offsetAndMetadata) {
        currentOffsets.put(partition, offsetAndMetadata);
    }

    /** Exposes the tracked offsets so the owner can commit them on shutdown. */
    public Map<TopicPartition, OffsetAndMetadata> getCurrentOffsets() {
        return currentOffsets;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Must be a synchronous commit: the rebalance proceeds as soon as this
        // callback returns, so an async commit could be lost.
        System.out.println("Lost partitions in rebalance. Committing current offsets: " + currentOffsets);
        consumer.commitSync(currentOffsets);
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        System.out.println("get partitions :" + partitions.toString());
    }
}
Consumer
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.OffsetAndMetadata; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class Consumer implements Runnable{ protected String groupId; protected String topic; protected Properties properties; public Consumer(String groupId, String topic) { this.groupId = groupId; this.topic = topic; this.properties = getProperties(); } protected Properties getProperties() { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", groupId); // 组id,同一组下共享offset props.put("enable.auto.commit", "false"); // 关闭自动提交 props.put("fetch.min.bytes", "512"); // 指定从服务器获取的最小字节数 props.put("fetch.max.wait.ms", "100"); // 等待时间。在服务器数据不足fetch.min.bytes时, 达到时间也会返回 props.put("auto.offset.reset", "latest"); // 在offset 失效的情况下,earliest表示从起始位置开始读取分区记录。 latest 表示从最新记录读取(在消费者启动后) return props; } public void run() { KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new StringDeserializer()); DefaultRebalanceListener rebalanceListener = new DefaultRebalanceListener(consumer); try { consumer.subscribe(Collections.singletonList(topic), rebalanceListener); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); if (records.isEmpty()) continue; System.out.println("recordsLen: " + records.count()); for ( ConsumerRecord <String, String> record : records ) { System.out.println("threadId: " + Thread.currentThread().getName() +" ,groupId: " + groupId +", partition: " + record.partition()); rebalanceListener.put(new TopicPartition(record.topic(), record.partition()), new 
OffsetAndMetadata(record.offset() + 1, "no medata")); } consumer.commitAsync(); } } catch (WakeupException E) { // 忽略异常,关闭消费者 } catch (Exception e) { e.printStackTrace(); }finally { try { consumer.commitSync(rebalanceListener.getCurrentOffsets()); } finally { consumer.close(); } } } public static void main(String[] args) { new Consumer("06", "topic06").run(); } }在启动或其他客户端退出时:
get partitions :[topic06-1, topic06-0] Lost partitions in reblance. Committing current offsets: {topic06-1=OffsetAndMetadata{offset=17, leaderEpoch=null, metadata='no medata'}, topic06-0=OffsetAndMetadata{offset=31, leaderEpoch=null, metadata='no medata'}}