kafka-demo 1 快速开始

it2023-03-28  72

1 快速开始

代码地址:https://github.com/luslin1711/kafka_demo/tree/master/kafka_demo_01

一、kafka集群部署

使用docker-compose 文件进行项目部署

version: '3' services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka1: image: wurstmeister/kafka depends_on: [ zookeeper ] ports: - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ADVERTISED_HOST_NAME: 192.168.1.88 KAFKA_ADVERTISED_PORT: 9092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.88:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - /data/product/zj_bigdata/data/kafka1/docker.sock:/var/run/docker.sock kafka2: image: wurstmeister/kafka depends_on: [ zookeeper ] ports: - "9093:9092" environment: KAFKA_BROKER_ID: 2 KAFKA_ADVERTISED_HOST_NAME: 192.168.1.88 KAFKA_ADVERTISED_PORT: 9093 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.88:9093 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - /data/product/zj_bigdata/data/kafka2/docker.sock:/var/run/docker.sock kafka3: image: wurstmeister/kafka depends_on: [ zookeeper ] ports: - "9094:9092" environment: KAFKA_BROKER_ID: 3 KAFKA_ADVERTISED_HOST_NAME: 192.168.1.88 KAFKA_ADVERTISED_PORT: 9094 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.88:9094 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - /data/product/zj_bigdata/data/kafka3/docker.sock:/var/run/docker.sock kafka-manager: image: sheepkiller/kafka-manager:latest ports: - "9000:9000" links: - zookeeper - kafka1 - kafka2 - kafka3 environment: ZK_HOSTS: zookeeper:2181 APPLICATION_SECRET: letmein KM_ARGS: -Djava.net.preferIPv4Stack=true

项目中,开启了zookeeper服务, 端口号2181, 开启了3个kafka服务, 端口号9002, 9003, 9004. 开启了kafka-manager服务在9000端口

使用kafka-manager或命令行工具创建my-topic主题,并将分区设置为9个。

Partition分布:

broker Partition 1 0,2,5,8 2 3,6 3 1,4,7

kafka 服务版本 2.6.0

二、 生产者

使用 gradle 或maven 将依赖导入项目 ‘org.apache.kafka:kafka_2.13:2.6.0’

package com.luslin.demo.kafka.producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class Producer { public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092, localhost:9093"); props.put("acks", "all"); props.put("enable.idempotence", "true"); props.put("retries", 5); props.put("max.in.flight.requests.per.connection", 1); KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); for (int i = 0; i < 100; i++) { Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<String, String>("topic01", Integer.toString(i), Integer.toString(i))); RecordMetadata recordMetadata = recordMetadataFuture.get(); // 调用Future对象get()方法, 同步发送数据,获取返回结果。 不调用 System.out.println("offset: " + recordMetadata.offset() + ", partition: " + recordMetadata.partition()); } producer.close(); } }

说明:

1 Properties 用于设置服务配置。全部配置可在http://kafka.apache.org/documentation/#producerconfigs中查询。

bootstrap.servers 用于配置kafka服务地址。 只是启动时去查询集群中可用地址。不需要将全部地址写入。 这里多写是为了防止某个broker不可用 acks 指定有多少分区副本接收到消息才认为消息写入成功。 0 表示不会等待服务器确认,也就是说如果这时发生错误,生产者不会知道。 1 表示首领节点接收到消息就认为消息发送成功。但如果首领节点接收到消息,还没有同步到其他副本,并立刻宕机。那么消息仍然会丢失。 3 all 或 -1 表示全部副本接收到消息才认为消息发送成功 enable.idempotence 表示开启幂等性,确保相同的消息只会本写入一次。开启需要retries > 0 and max.in.flight.requests.per.connection <= 5 and ack = all retries 表示失败重试次数 max.in.flight.requests.per.connection 表示每个连接同时可以发送的请求个数。 key.serializer 与 value.serializer 可在props中定义, 也可传入KafkaProducer的构造方法中。 传入构造方法中的Serializer会覆盖掉Properties中设置的参数。 表示key 与value的序列化类

一般情况下,在同一个分区中的消息顺序是按照发送顺序写入的,但是如果retries > 0, 并且消息写入失败。就有可能发送乱序现象。如果对消息的顺序有严格要求,可以将max.in.flight.requests.per.connection 设置为1, 这会减少吞吐量,增加阻塞时间。但是能够确保消息写入顺序

三、消费者

package com.luslin.demo.kafka.consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.serialization.StringDeserializer; import java.time.Duration; import java.util.Collections; import java.util.Properties; public class Consumer { public static void main(String[] args) { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "consumers01"); // 组id,同一组下共享offset props.put("enable.auto.commit", "false"); // 关闭自动提交 props.put("fetch.min.bytes", "512"); // 指定从服务器获取的最小字节数 props.put("fetch.max.wait.ms", "100"); // 等待时间。在服务器数据不足fetch.min.bytes时, 达到时间也会返回 props.put("auto.offset.reset", "earliest"); // 在offset 失效的情况下,earliest表示从起始位置开始读取分区记录。 latest 表示从最新记录读取(在消费者启动后) KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer()); try { consumer.subscribe(Collections.singletonList("topic01")); while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100)); if (records.isEmpty()) continue; System.out.println("recordsLen: " + records.count()); for ( ConsumerRecord <String, String> record : records ) { System.out.println("partition: " + record.partition() + " key: " + record.key() + " offset: " + record.offset() ); } consumer.commitAsync(); } } finally { consumer.close(); } }
最新回复(0)