五种客户端
Producer负责发送消息Consumer负责消费消息Streams高效的将输入流转换到输出流Connect从一些源系统或应用程序中拉取数据到KafkaAdmin负责管理和检测Topic和broker以及其他Kafka对象
依赖
<dependency>
<groupId>org.apache.kafka
</groupId>
<artifactId>kafka-clients
</artifactId>
<version>2.4.1
</version>
</dependency>
Admin客户端操作
创建AdminClient
Properties properties
= new Properties();
properties
.setProperty(AdminClientConfig
.BOOTSTRAP_SERVERS_CONFIG
, "192.168.17.99:9092");
AdminClient adminClient
= AdminClient
.create(properties
);
创建Topic
ArrayList
<NewTopic> newTopics
= new ArrayList<>();
Short sr
= 1;
newTopics
.add(new NewTopic("shanguoyu", 1, sr
));
CreateTopicsResult topics
= adminClient
.createTopics(newTopics
);
查看所有Topic信息
ListTopicsResult listTopicsResult
= adminClient
.listTopics();
KafkaFuture
<Set
<String>> names
= listTopicsResult
.names();
names
.get().forEach(System
.out
::println
);
删除Topic
DeleteTopicsResult deleteTopicsResult
= adminClient
.deleteTopics(Arrays
.asList("shanguoyu"));
查看Topic详情
DescribeTopicsResult describeTopics
= adminClient
.describeTopics(Arrays
.asList("shanguoyu"));
Map
<String, TopicDescription> stringTopicDescriptionMap
= describeTopics
.all().get();
stringTopicDescriptionMap
.entrySet().forEach(System
.out
::println
);
查看Topic配置信息
ConfigResource configResource
= new ConfigResource(ConfigResource
.Type
.TOPIC
, "shanguoyu");
DescribeConfigsResult describeConfigsResult
= adminClient
.describeConfigs(Arrays
.asList(configResource
));
Map
<ConfigResource, Config> configResourceConfigMap
= describeConfigsResult
.all().get();
configResourceConfigMap
.entrySet().forEach(System
.out
::println
);
修改Topic配置信息
ConfigResource configResource
= new ConfigResource(Type
.TOPIC
, "shanguoyu");
Config config
= new Config(Arrays
.asList(new ConfigEntry("preallocate", "true")));
Map
<ConfigResource, Config> configResourceConfigMap
= new HashMap<>();
configResourceConfigMap
.put(configResource
, config
);
AlterConfigsResult alterConfigsResult
= adminClient
.alterConfigs(configResourceConfigMap
);
增加分区
Map
<String, NewPartitions> stringNewPartitionsMap
=new HashMap<>();
NewPartitions newPartitions
= NewPartitions
.increaseTo(3);
stringNewPartitionsMap
.put("shanguoyu",newPartitions
);
CreatePartitionsResult partitions
= adminClient
.createPartitions(stringNewPartitionsMap
);
Producer客户端操作
异步消息无callback
Properties properties
= new Properties();
properties
.put(ProducerConfig
.BOOTSTRAP_SERVERS_CONFIG
, "192.168.17.99:9092");
properties
.put(ProducerConfig
.KEY_SERIALIZER_CLASS_CONFIG
, "org.apache.kafka.common.serialization.StringSerializer");
properties
.put(ProducerConfig
.VALUE_SERIALIZER_CLASS_CONFIG
, "org.apache.kafka.common.serialization.StringSerializer");
try (Producer producer
= new KafkaProducer<>(properties
)) {
ProducerRecord
<String, String> record
= new ProducerRecord<String, String>("shanguoyu", "key", "value");
producer
.send(record
);
}
异步消息有callback
Properties properties
= new Properties();
properties
.put(ProducerConfig
.BOOTSTRAP_SERVERS_CONFIG
, "192.168.17.99:9092");
properties
.put(ProducerConfig
.KEY_SERIALIZER_CLASS_CONFIG
, "org.apache.kafka.common.serialization.StringSerializer");
properties
.put(ProducerConfig
.VALUE_SERIALIZER_CLASS_CONFIG
, "org.apache.kafka.common.serialization.StringSerializer");
try (Producer producer
= new KafkaProducer<>(properties
)) {
ProducerRecord
<String, String> record
= new ProducerRecord<String, String>("shanguoyu", "key", "value");
producer
.send(record
,(metadata
,exception
)->{
System
.out
.println(metadata
.partition());
System
.out
.println(metadata
.offset());
System
.out
.println(metadata
.topic());
});
}
自定义分区策略
properties
.put(ProducerConfig
.PARTITIONER_CLASS_CONFIG
, "cn.shanguoyu.kafka.kafkaapi.producer.PartitionerLB");
指定压缩算法
properties
.put(ProducerConfig
.COMPRESSION_TYPE_CONFIG
, "gzip");
Consumer
自动提交offset的消费方式
Properties properties
= new Properties();
properties
.setProperty(ConsumerConfig
.BOOTSTRAP_SERVERS_CONFIG
, "192.168.17.99:9092");
properties
.setProperty(ConsumerConfig
.GROUP_ID_CONFIG
, "test");
properties
.setProperty(ConsumerConfig
.ENABLE_AUTO_COMMIT_CONFIG
, "true");
properties
.setProperty(ConsumerConfig
.AUTO_COMMIT_INTERVAL_MS_CONFIG
, "1000");
properties
.setProperty(ConsumerConfig
.KEY_DESERIALIZER_CLASS_CONFIG
, "org.apache.kafka.common.serialization.StringDeserializer");
properties
.setProperty(ConsumerConfig
.VALUE_DESERIALIZER_CLASS_CONFIG
, "org.apache.kafka.common.serialization.StringDeserializer");
try (Consumer
<String, String> consumer
= new KafkaConsumer<>(properties
)) {
consumer
.subscribe(Arrays
.asList("shanguoyu"));
while (true) {
ConsumerRecords
<String, String> consumerRecords
= consumer
.poll(Duration
.ofMillis(10000));
for (ConsumerRecord
<String, String> consumerRecord
: consumerRecords
) {
System
.out
.println("topic = " + consumerRecord
.topic());
System
.out
.println("partition = " + consumerRecord
.partition());
System
.out
.println("offset = " + consumerRecord
.offset());
System
.out
.println("key = " + consumerRecord
.key());
System
.out
.println("value = " + consumerRecord
.value());
System
.out
.println("===============================================");
}
}
}
手动提交offset的消费方式
Properties properties
= new Properties();
properties
.setProperty(ConsumerConfig
.BOOTSTRAP_SERVERS_CONFIG
, "192.168.17.99:9092");
properties
.setProperty(ConsumerConfig
.GROUP_ID_CONFIG
, "test");
properties
.setProperty(ConsumerConfig
.ENABLE_AUTO_COMMIT_CONFIG
, "false");
properties
.setProperty(ConsumerConfig
.KEY_DESERIALIZER_CLASS_CONFIG
, "org.apache.kafka.common.serialization.StringDeserializer");
properties
.setProperty(ConsumerConfig
.VALUE_DESERIALIZER_CLASS_CONFIG
, "org.apache.kafka.common.serialization.StringDeserializer");
try (Consumer
<String, String> consumer
= new KafkaConsumer<>(properties
)) {
consumer
.subscribe(Arrays
.asList("shanguoyu"));
while (true) {
ConsumerRecords
<String, String> consumerRecords
= consumer
.poll(Duration
.ofMillis(10000));
for (ConsumerRecord
<String, String> consumerRecord
: consumerRecords
) {
System
.out
.println("topic = " + consumerRecord
.topic());
System
.out
.println("partition = " + consumerRecord
.partition());
System
.out
.println("offset = " + consumerRecord
.offset());
System
.out
.println("key = " + consumerRecord
.key());
System
.out
.println("value = " + consumerRecord
.value());
System
.out
.println("===============================================");
}
consumer
.commitAsync();
}
}
按照分区获取记录并且每个分区提交一次offset
Properties properties
= new Properties();
properties
.setProperty(ConsumerConfig
.BOOTSTRAP_SERVERS_CONFIG
, "192.168.17.99:9092");
properties
.setProperty(ConsumerConfig
.GROUP_ID_CONFIG
, "test");
properties
.setProperty(ConsumerConfig
.ENABLE_AUTO_COMMIT_CONFIG
, "false");
properties
.setProperty(ConsumerConfig
.KEY_DESERIALIZER_CLASS_CONFIG
, "org.apache.kafka.common.serialization.StringDeserializer");
properties
.setProperty(ConsumerConfig
.VALUE_DESERIALIZER_CLASS_CONFIG
, "org.apache.kafka.common.serialization.StringDeserializer");
try (Consumer
<String, String> consumer
= new KafkaConsumer<>(properties
)) {
consumer
.subscribe(Arrays
.asList("shanguoyu"));
while (true) {
ConsumerRecords
<String, String> consumerRecords
= consumer
.poll(Duration
.ofMillis(10000));
Set
<TopicPartition> partitions
= consumerRecords
.partitions();
for (TopicPartition partition
: partitions
) {
List
<ConsumerRecord
<String, String>> records
= consumerRecords
.records(partition
);
records
.forEach(record
-> {
System
.out
.println("topic = " + record
.topic());
System
.out
.println("partition = " + record
.partition());
System
.out
.println("offset = " + record
.offset());
System
.out
.println("key = " + record
.key());
System
.out
.println("value = " + record
.value());
});
Map
<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap
= new HashMap<>();
long offset
= records
.get(records
.size() - 1).offset();
offsetAndMetadataMap
.put(partition
, new OffsetAndMetadata(offset
+ 1));
consumer
.commitSync(offsetAndMetadataMap
);
}
}
}
手动提交offset并订阅某个Topic下的分区
Properties properties
= new Properties();
properties
.setProperty(ConsumerConfig
.BOOTSTRAP_SERVERS_CONFIG
, "192.168.17.99:9092");
properties
.setProperty(ConsumerConfig
.GROUP_ID_CONFIG
, "test");
properties
.setProperty(ConsumerConfig
.ENABLE_AUTO_COMMIT_CONFIG
, "false");
properties
.setProperty(ConsumerConfig
.KEY_DESERIALIZER_CLASS_CONFIG
, "org.apache.kafka.common.serialization.StringDeserializer");
properties
.setProperty(ConsumerConfig
.VALUE_DESERIALIZER_CLASS_CONFIG
, "org.apache.kafka.common.serialization.StringDeserializer");
try (Consumer
<String, String> consumer
= new KafkaConsumer<>(properties
)) {
TopicPartition p0
=new TopicPartition("shanguoyu",0 );
TopicPartition p1
=new TopicPartition("shanguoyu",1 );
TopicPartition p2
=new TopicPartition("shanguoyu",2 );
consumer
.assign(Arrays
.asList(p0
));
while (true) {
ConsumerRecords
<String, String> consumerRecords
= consumer
.poll(Duration
.ofMillis(10000));
Set
<TopicPartition> partitions
= consumerRecords
.partitions();
for (TopicPartition partition
: partitions
) {
List
<ConsumerRecord
<String, String>> records
= consumerRecords
.records(partition
);
records
.forEach(record
-> {
System
.out
.println("topic = " + record
.topic());
System
.out
.println("partition = " + record
.partition());
System
.out
.println("offset = " + record
.offset());
System
.out
.println("key = " + record
.key());
System
.out
.println("value = " + record
.value());
System
.out
.println("=================================");
});
Map
<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap
= new HashMap<>();
long offset
= records
.get(records
.size() - 1).offset();
offsetAndMetadataMap
.put(partition
, new OffsetAndMetadata(offset
+ 1));
consumer
.commitSync(offsetAndMetadataMap
);
}
}
}
手动指定Consumer Offset位置及手动提交
Properties properties
= new Properties();
properties
.setProperty(ConsumerConfig
.BOOTSTRAP_SERVERS_CONFIG
, "192.168.17.99:9092");
properties
.setProperty(ConsumerConfig
.GROUP_ID_CONFIG
, "test");
properties
.setProperty(ConsumerConfig
.ENABLE_AUTO_COMMIT_CONFIG
, "false");
properties
.setProperty(ConsumerConfig
.KEY_DESERIALIZER_CLASS_CONFIG
, "org.apache.kafka.common.serialization.StringDeserializer");
properties
.setProperty(ConsumerConfig
.VALUE_DESERIALIZER_CLASS_CONFIG
, "org.apache.kafka.common.serialization.StringDeserializer");
try (Consumer
<String, String> consumer
= new KafkaConsumer<>(properties
)) {
TopicPartition p0
=new TopicPartition("shanguoyu",0 );
TopicPartition p1
=new TopicPartition("shanguoyu",1 );
TopicPartition p2
=new TopicPartition("shanguoyu",2 );
consumer
.assign(Arrays
.asList(p0
));
consumer
.seek(p0
, 0);
while (true) {
ConsumerRecords
<String, String> consumerRecords
= consumer
.poll(Duration
.ofMillis(10000));
Set
<TopicPartition> partitions
= consumerRecords
.partitions();
for (TopicPartition partition
: partitions
) {
List
<ConsumerRecord
<String, String>> records
= consumerRecords
.records(partition
);
records
.forEach(record
-> {
System
.out
.println("topic = " + record
.topic());
System
.out
.println("partition = " + record
.partition());
System
.out
.println("offset = " + record
.offset());
System
.out
.println("key = " + record
.key());
System
.out
.println("value = " + record
.value());
System
.out
.println("=================================");
});
Map
<TopicPartition, OffsetAndMetadata> offsetAndMetadataMap
= new HashMap<>();
long offset
= records
.get(records
.size() - 1).offset();
offsetAndMetadataMap
.put(partition
, new OffsetAndMetadata(offset
+ 1));
consumer
.commitSync(offsetAndMetadataMap
);
}
}
}