06.Java整合Kafka

it2025-11-06  13

五种客户端

- Producer:负责发送消息
- Consumer:负责消费消息
- Streams:高效地将输入流转换到输出流
- Connect:从一些源系统或应用程序中拉取数据到Kafka
- Admin:负责管理和检查Topic、broker以及其他Kafka对象

依赖

<!-- This dependency bundles the Admin, Producer and Consumer clients. -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.4.1</version>
</dependency>

Admin客户端操作

创建AdminClient

// Broker address the admin client uses for its initial connection.
String bootstrapServers = "192.168.17.99:9092";

Properties properties = new Properties();
properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);

// The client created here is the "adminClient" used by the admin snippets below.
AdminClient adminClient = AdminClient.create(properties);

创建Topic

ArrayList<NewTopic> newTopics = new ArrayList<>(); Short sr = 1; //参数1:Topic名称,参数2:分区数量,参数3:副本数量 newTopics.add(new NewTopic("shanguoyu", 1, sr)); //创建Topic CreateTopicsResult topics = adminClient.createTopics(newTopics);

查看所有Topic信息

ListTopicsResult listTopicsResult = adminClient.listTopics(); //获取所有Topic的名称返回的是Future KafkaFuture<Set<String>> names = listTopicsResult.names(); //打印出所有Topic的名称 names.get().forEach(System.out::println);

删除Topic

DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList("shanguoyu"));

查看Topic详情

// Request the description of the topic and block until it is available.
Map<String, TopicDescription> descriptionsByTopic =
        adminClient.describeTopics(Arrays.asList("shanguoyu")).all().get();
// Print each "topic name = description" entry.
for (Map.Entry<String, TopicDescription> entry : descriptionsByTopic.entrySet()) {
    System.out.println(entry);
}

查看Topic配置信息

// Identify the resource whose configuration is wanted: the topic "shanguoyu".
ConfigResource topicResource = new ConfigResource(ConfigResource.Type.TOPIC, "shanguoyu");
// Fetch the configuration and block until the answer has arrived.
Map<ConfigResource, Config> configsByResource =
        adminClient.describeConfigs(Arrays.asList(topicResource)).all().get();
// Print each "resource = config" entry.
for (Map.Entry<ConfigResource, Config> entry : configsByResource.entrySet()) {
    System.out.println(entry);
}

修改Topic配置信息

// Identify the topic whose configuration is changed. The type is written as
// ConfigResource.Type so it matches the "describe config" snippet and needs no
// extra import of the nested enum.
ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, "shanguoyu");
// The new configuration: turn on "preallocate" for this topic.
Config config = new Config(Arrays.asList(new ConfigEntry("preallocate", "true")));
Map<ConfigResource, Config> configResourceConfigMap = new HashMap<>();
configResourceConfigMap.put(configResource, config);
// The author notes that the newer API is unfriendly to single-node setups,
// hence the older alterConfigs() call.
// NOTE(review): alterConfigs() is deprecated since Kafka 2.3 (presumably the
// "newer API" is incrementalAlterConfigs()) - revisit on multi-broker clusters.
AlterConfigsResult alterConfigsResult = adminClient.alterConfigs(configResourceConfigMap);
// Wait for the broker's answer so the change is applied before the snippet
// ends and failures are raised here instead of being silently dropped.
alterConfigsResult.all().get();

增加分区

// Map each topic name to its new total partition count.
Map<String, NewPartitions> stringNewPartitionsMap = new HashMap<>();
// increaseTo(3) asks for 3 partitions in total for the topic.
NewPartitions newPartitions = NewPartitions.increaseTo(3);
stringNewPartitionsMap.put("shanguoyu", newPartitions);
// createPartitions() only submits the request; the result wraps futures.
CreatePartitionsResult partitions = adminClient.createPartitions(stringNewPartitionsMap);
// Wait for the broker's answer so the partitions exist when the snippet ends
// and failures are raised here instead of being silently dropped.
partitions.all().get();

Producer客户端操作

异步消息无callback

//构建连接参数 Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.17.99:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //创建连接 try (Producer producer = new KafkaProducer<>(properties)) { //创建消息对象 ProducerRecord<String, String> record = new ProducerRecord<String, String>("shanguoyu", "key", "value"); //发送消息 producer.send(record); }

异步消息有callback

//构建连接参数 Properties properties = new Properties(); properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.17.99:9092"); properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); //创建连接 try (Producer producer = new KafkaProducer<>(properties)) { //创建消息对象 ProducerRecord<String, String> record = new ProducerRecord<String, String>("shanguoyu", "key", "value"); //发送消息 producer.send(record,(metadata,exception)->{ System.out.println(metadata.partition()); System.out.println(metadata.offset()); System.out.println(metadata.topic()); }); }

自定义分区策略

// Register a custom partitioner by its fully qualified class name; "properties"
// is the producer configuration built before the KafkaProducer is created.
properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "cn.shanguoyu.kafka.kafkaapi.producer.PartitionerLB");

指定压缩算法

// Select gzip as the producer's compression type; "properties" is the producer
// configuration built before the KafkaProducer is created.
properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");

Consumer客户端操作

自动提交offset的消费方式

//构建连接参数 Properties properties = new Properties(); properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.17.99:9092"); //消费组ID properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test"); //自动提交 建议为false properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true"); //自动提交间隔 properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000"); properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); //创建消费者连接 try (Consumer<String, String> consumer = new KafkaConsumer<>(properties)) { //订阅主题 consumer.subscribe(Arrays.asList("shanguoyu")); while (true) { ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(10000)); for (ConsumerRecord<String, String> consumerRecord : consumerRecords) { System.out.println("topic = " + consumerRecord.topic()); System.out.println("partition = " + consumerRecord.partition()); System.out.println("offset = " + consumerRecord.offset()); System.out.println("key = " + consumerRecord.key()); System.out.println("value = " + consumerRecord.value()); System.out.println("==============================================="); } } }

手动提交offset的消费方式

// Build the connection settings.
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.17.99:9092");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
// Disable auto-commit: offsets are committed explicitly below.
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// Create the consumer; try-with-resources closes it if the loop is ever left.
try (Consumer<String, String> consumer = new KafkaConsumer<>(properties)) {
    // Subscribe to the topic.
    consumer.subscribe(Arrays.asList("shanguoyu"));
    while (true) {
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(10000));
        for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
            System.out.println("topic = " + consumerRecord.topic());
            System.out.println("partition = " + consumerRecord.partition());
            System.out.println("offset = " + consumerRecord.offset());
            System.out.println("key = " + consumerRecord.key());
            System.out.println("value = " + consumerRecord.value());
            System.out.println("===============================================");
        }
        // Commit the offsets of the batch just processed without blocking the
        // loop. The callback makes a failed commit visible; without it the
        // failure would be dropped silently.
        consumer.commitAsync((offsets, exception) -> {
            if (exception != null) {
                System.err.println("offset commit failed for " + offsets + ": " + exception);
            }
        });
    }
}

按照分区获取记录并且每个分区提交一次offset

// Both key and value are plain strings, so one deserializer class serves both.
String stringDeserializer = "org.apache.kafka.common.serialization.StringDeserializer";

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.17.99:9092");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
// Disable auto-commit: offsets are committed explicitly, once per partition.
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, stringDeserializer);
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, stringDeserializer);

// Create the consumer; try-with-resources closes it if the loop is ever left.
try (Consumer<String, String> consumer = new KafkaConsumer<>(properties)) {
    // Subscribe to the topic.
    consumer.subscribe(Arrays.asList("shanguoyu"));
    for (;;) {
        // Fetch the next batch, waiting up to 10 seconds.
        ConsumerRecords<String, String> batch = consumer.poll(Duration.ofSeconds(10));
        // Walk the batch partition by partition.
        for (TopicPartition partition : batch.partitions()) {
            List<ConsumerRecord<String, String>> partitionRecords = batch.records(partition);
            for (ConsumerRecord<String, String> message : partitionRecords) {
                System.out.println("topic = " + message.topic());
                System.out.println("partition = " + message.partition());
                System.out.println("offset = " + message.offset());
                System.out.println("key = " + message.key());
                System.out.println("value = " + message.value());
            }
            // Offset of the last record handled for this partition; the value
            // committed is that offset plus one.
            long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
            Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
            offsetsToCommit.put(partition, new OffsetAndMetadata(lastOffset + 1));
            // Commit synchronously for this partition only.
            consumer.commitSync(offsetsToCommit);
        }
    }
}

手动提交offset并手动指定(assign)消费某个Topic下的分区

// Build the connection settings.
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.17.99:9092");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
// Disable auto-commit: offsets are committed explicitly, once per partition.
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// Create the consumer; try-with-resources closes it if the loop is ever left.
try (Consumer<String, String> consumer = new KafkaConsumer<>(properties)) {
    // Partition 0 of the topic. Other partitions (1, 2, ...) would be built the
    // same way; only the one actually assigned is declared here.
    TopicPartition p0 = new TopicPartition("shanguoyu", 0);
    // Consume from partition 0 only.
    consumer.assign(Arrays.asList(p0));
    while (true) {
        // Fetch the next batch of records.
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(10000));
        // Partitions that have records in this batch.
        Set<TopicPartition> partitions = consumerRecords.partitions();
        for (TopicPartition partition : partitions) {
            // Records of this partition only.
            List<ConsumerRecord<String, String>> records = consumerRecords.records(partition);
            records.forEach(record -> {
                System.out.println("topic = " + record.topic());
                System.out.println("partition = " + record.partition());
                System.out.println("offset = " + record.offset());
                System.out.println("key = " + record.key());
                System.out.println("value = " + record.value());
                System.out.println("=================================");
            });
            Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>();
            // Offset of the last record in this partition's batch; the value
            // committed is that offset plus one.
            long offset = records.get(records.size() - 1).offset();
            offsetAndMetadataMap.put(partition, new OffsetAndMetadata(offset + 1));
            consumer.commitSync(offsetAndMetadataMap);
        }
    }
}

手动指定Consumer Offset位置及手动提交

// Build the connection settings.
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.17.99:9092");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
// Disable auto-commit: offsets are committed explicitly, once per partition.
properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
// Create the consumer; try-with-resources closes it if the loop is ever left.
try (Consumer<String, String> consumer = new KafkaConsumer<>(properties)) {
    // Partition 0 of the topic. Other partitions (1, 2, ...) would be built the
    // same way; only the one actually assigned is declared here.
    TopicPartition p0 = new TopicPartition("shanguoyu", 0);
    // Consume from partition 0 only.
    consumer.assign(Arrays.asList(p0));
    // Start reading partition 0 at offset 0, whatever was committed earlier.
    consumer.seek(p0, 0);
    while (true) {
        // Fetch the next batch of records.
        ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofMillis(10000));
        // Partitions that have records in this batch.
        Set<TopicPartition> partitions = consumerRecords.partitions();
        for (TopicPartition partition : partitions) {
            // Records of this partition only.
            List<ConsumerRecord<String, String>> records = consumerRecords.records(partition);
            records.forEach(record -> {
                System.out.println("topic = " + record.topic());
                System.out.println("partition = " + record.partition());
                System.out.println("offset = " + record.offset());
                System.out.println("key = " + record.key());
                System.out.println("value = " + record.value());
                System.out.println("=================================");
            });
            Map<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap = new HashMap<>();
            // Offset of the last record in this partition's batch; the value
            // committed is that offset plus one.
            long offset = records.get(records.size() - 1).offset();
            offsetAndMetadataMap.put(partition, new OffsetAndMetadata(offset + 1));
            consumer.commitSync(offsetAndMetadataMap);
        }
    }
}
最新回复(0)