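Kafka's idempotence only guarantees atomic, duplicate-free delivery of a single record to one partition. To guarantee integrity across multiple records (and multiple partitions), you need to enable Kafka transactions.
Kafka 0.11.0.0 introduced transactions alongside idempotence. Kafka transactions usually fall into two kinds: producer-only transactions, and consumer & producer transactions. By default a consumer reads at the read_uncommitted level, which means it may read data from transactions that failed. So once producer transactions are enabled, the user also needs to set the consumer's transaction isolation level.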
isolation.level = read_uncommitted (default)
This option takes one of two values, read_committed or read_uncommitted. If transactions are in use, the consumer must set its isolation level to read_committed.
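As a minimal sketch of the consumer-side setting described above, the helper below builds consumer properties with isolation.level set to read_committed. It uses plain string config keys so that it compiles without the Kafka client on the classpath; the class name, method name, and the broker address passed in main are hypothetical placeholders, not part of the original setup.

```java
import java.util.Properties;

// Hypothetical helper for illustration only.
final class ReadCommittedConsumerConfig {

    // Builds consumer settings that hide records from open or aborted transactions.
    static Properties build(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // The default is read_uncommitted; read_committed returns only
        // committed transactional records plus non-transactional records.
        props.put("isolation.level", "read_committed");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("node05kafka:9092", "g2");
        System.out.println(props.getProperty("isolation.level"));
    }
}
```

The resulting Properties object could then be passed to a KafkaConsumer constructor in the usual way.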
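To enable producer transactions, you only need to set the transactional.id property. Once transactions are enabled, idempotence is enabled on the producer by default. The value of "transactional.id" must be unique, however: at any given moment only one producer with a given "transactional.id" can be active, and any other producer using the same id will be shut down (fenced).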
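The uniqueness rule can be pictured with a toy, in-memory model. In Kafka, the broker-side transaction coordinator bumps an epoch for a transactional.id each time a producer with that id calls initTransactions(), and a producer still holding an older epoch is fenced. The class below only simulates that idea with made-up names (FencingSketch, register, isFenced, "order-service-tx"); it is not Kafka code and does not talk to a broker.

```java
import java.util.HashMap;
import java.util.Map;

// Toy simulation of transactional.id fencing; all names are hypothetical.
final class FencingSketch {

    // Latest epoch handed out per transactional.id.
    private final Map<String, Integer> latestEpoch = new HashMap<>();

    // Simulates a producer registering its id: the epoch starts at 0
    // and is bumped by one on every later registration of the same id.
    int register(String transactionalId) {
        int next = latestEpoch.containsKey(transactionalId)
                ? latestEpoch.get(transactionalId) + 1
                : 0;
        latestEpoch.put(transactionalId, next);
        return next;
    }

    // A producer is fenced once a newer producer has registered the same id.
    boolean isFenced(String transactionalId, int epoch) {
        Integer latest = latestEpoch.get(transactionalId);
        return latest != null && epoch < latest;
    }

    public static void main(String[] args) {
        FencingSketch coordinator = new FencingSketch();
        int first = coordinator.register("order-service-tx");
        int second = coordinator.register("order-service-tx");
        System.out.println(coordinator.isFenced("order-service-tx", first));  // true
        System.out.println(coordinator.isFenced("order-service-tx", second)); // false
    }
}
```

One consequence worth keeping in mind: a transactional.id that includes a fresh random UUID on every run, as the demo producer in this post does, never collides with an earlier instance, so nothing is ever fenced. A stable id per logical producer is the more typical choice when fencing of stale instances is wanted.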
producer
package com.kafka.transaction;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.UUID;

public class ProducerTransaction {

    public static void main(String[] args) {
        KafkaProducer<String, String> producer = getProducer();
        // 1. Initialize transactions
        producer.initTransactions();
        // 2. Begin the transaction
        producer.beginTransaction();
        try {
            for (int i = 0; i < 10; i++) {
                if (i == 8) {
                    // Deliberately throw an ArithmeticException to simulate a failure
                    int j = 10 / 0;
                }
                ProducerRecord<String, String> record =
                        new ProducerRecord<>("topic01", "transaction" + i, "error test" + i);
                producer.send(record);
                // Flush the buffered data to the brokers
                producer.flush();
            }
            // 3. Commit the transaction
            producer.commitTransaction();
        } catch (Exception e) {
            System.err.println("error message:" + e.getMessage());
            // 4. Abort the transaction
            producer.abortTransaction();
        } finally {
            producer.close();
        }
    }

    public static KafkaProducer<String, String> getProducer() {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "node05kafka:9092,node06kafka:9092,node07kafka:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Maximum number of unacknowledged requests per connection;
        // if > 1 with retries enabled, records may be reordered
        properties.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        // Unique transactional id; setting it enables transactions
        properties.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction_id" + UUID.randomUUID().toString());
        // Batch buffer size in bytes
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG, 1024);
        // How long to wait (ms) before sending a batch that is not yet full
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        // Enable idempotence and the retry mechanism
        properties.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
        // Acknowledgement level
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        // Number of retries; defaults to 2147483647, so this may be omitted
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        return new KafkaProducer<>(properties);
    }
}

Note that the consumer must also set its isolation level in order to read only correct data: ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed". There are two levels, read_uncommitted and read_committed; the default is read_uncommitted.
consumer
package com.kafka.transaction;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerTransaction {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "node05kafka:9092,node06kafka:9092,node07kafka:9092");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "g2");
        /*
         * With transactions in use, the consumer sets an isolation level:
         *   read_committed:   read only committed records
         *   read_uncommitted: read records including uncommitted ones
         * Switch the value to "read_uncommitted" to reproduce the second
         * consumer output shown below.
         */
        properties.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Arrays.asList("topic01"));
        while (true) {
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            // Print every record in the batch, not just the first one
            for (ConsumerRecord<String, String> record : consumerRecords) {
                String topic = record.topic();
                long timestamp = record.timestamp();
                int partition = record.partition();
                long offset = record.offset();
                String key = record.key();
                String value = record.value();
                System.out.println(key + "\t" + value + "\t" + offset + "\t" + partition + "\t" + topic + "\t" + timestamp);
            }
        }
    }
}

producer console output
INFO 2020-10-21 16:41:46 org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1, transactionalId=transaction_id7af06238-55ec-4b49-8446-23362ecbb083] ProducerId set to -1 with epoch -1
INFO 2020-10-21 16:41:46 org.apache.kafka.clients.Metadata - Cluster ID: IPQxx1YvSm2lsAXHgX5mVg
INFO 2020-10-21 16:41:47 org.apache.kafka.clients.producer.internals.TransactionManager - [Producer clientId=producer-1, transactionalId=transaction_id7af06238-55ec-4b49-8446-23362ecbb083] ProducerId set to 4000 with epoch 0
error message:/ by zero
INFO 2020-10-21 16:41:48 org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1, transactionalId=transaction_id7af06238-55ec-4b49-8446-23362ecbb083] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.

consumer console output at the two isolation levels
read_committed: no output, because the producer run is a deliberate failure test and its transaction was aborted
read_uncommitted:
transaction0    error test0    70    2    topic01    1603269707202
transaction1    error test1    84    1    topic01    1603269707335
transaction2    error test2    85    1    topic01    1603269707371
transaction3    error test3    86    1    topic01    1603269707390
transaction4    error test4    71    2    topic01    1603269707403
transaction5    error test5    72    2    topic01    1603269707429
transaction6    error test6    73    2    topic01    1603269707446
transaction7    error test7    40    0    topic01    1603269707459