代码地址:https://github.com/luslin1711/kafka_demo/tree/master/kafka_demo_01
使用docker-compose 文件进行项目部署
version: '3' services: zookeeper: image: wurstmeister/zookeeper ports: - "2181:2181" kafka1: image: wurstmeister/kafka depends_on: [ zookeeper ] ports: - "9092:9092" environment: KAFKA_BROKER_ID: 1 KAFKA_ADVERTISED_HOST_NAME: 192.168.1.88 KAFKA_ADVERTISED_PORT: 9092 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.88:9092 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - /data/product/zj_bigdata/data/kafka1/docker.sock:/var/run/docker.sock kafka2: image: wurstmeister/kafka depends_on: [ zookeeper ] ports: - "9093:9092" environment: KAFKA_BROKER_ID: 2 KAFKA_ADVERTISED_HOST_NAME: 192.168.1.88 KAFKA_ADVERTISED_PORT: 9093 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.88:9093 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - /data/product/zj_bigdata/data/kafka2/docker.sock:/var/run/docker.sock kafka3: image: wurstmeister/kafka depends_on: [ zookeeper ] ports: - "9094:9092" environment: KAFKA_BROKER_ID: 3 KAFKA_ADVERTISED_HOST_NAME: 192.168.1.88 KAFKA_ADVERTISED_PORT: 9094 KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.1.88:9094 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 volumes: - /data/product/zj_bigdata/data/kafka3/docker.sock:/var/run/docker.sock kafka-manager: image: sheepkiller/kafka-manager:latest ports: - "9000:9000" links: - zookeeper - kafka1 - kafka2 - kafka3 environment: ZK_HOSTS: zookeeper:2181 APPLICATION_SECRET: letmein KM_ARGS: -Djava.net.preferIPv4Stack=true项目中,开启了zookeeper服务, 端口号2181, 开启了3个kafka服务, 端口号9002, 9003, 9004. 开启了kafka-manager服务在9000端口
使用kafka-manager或命令行工具创建my-topic主题,并将分区设置为9个。
Partition分布:
broker Partition 1 0,2,5,8 2 3,6 3 1,4,7kafka 服务版本 2.6.0
使用 gradle 或maven 将依赖导入项目 ‘org.apache.kafka:kafka_2.13:2.6.0’
package com.luslin.demo.kafka.producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.serialization.StringSerializer; import java.util.Properties; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; public class Producer { public static void main(String[] args) throws ExecutionException, InterruptedException { Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092, localhost:9093"); props.put("acks", "all"); props.put("enable.idempotence", "true"); props.put("retries", 5); props.put("max.in.flight.requests.per.connection", 1); KafkaProducer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer()); for (int i = 0; i < 100; i++) { Future<RecordMetadata> recordMetadataFuture = producer.send(new ProducerRecord<String, String>("topic01", Integer.toString(i), Integer.toString(i))); RecordMetadata recordMetadata = recordMetadataFuture.get(); // 调用Future对象get()方法, 同步发送数据,获取返回结果。 不调用 System.out.println("offset: " + recordMetadata.offset() + ", partition: " + recordMetadata.partition()); } producer.close(); } }说明:
1 Properties 用于设置服务配置。全部配置可在http://kafka.apache.org/documentation/#producerconfigs中查询。
bootstrap.servers 用于配置kafka服务地址。 只是启动时去查询集群中可用地址。不需要将全部地址写入。 这里多写是为了防止某个broker不可用 acks 指定有多少分区副本接收到消息才认为消息写入成功。 0 表示不会等待服务器确认,也就是说如果这时发生错误,生产者不会知道。 1 表示首领节点接收到消息就认为消息发送成功。但如果首领节点接收到消息,还没有同步到其他副本,并立刻宕机。那么消息仍然会丢失。 3 all 或 -1 表示全部副本接收到消息才认为消息发送成功 enable.idempotence 表示开启幂等性,确保相同的消息只会本写入一次。开启需要retries > 0 and max.in.flight.requests.per.connection <= 5 and ack = all retries 表示失败重试次数 max.in.flight.requests.per.connection 表示每个连接同时可以发送的请求个数。 key.serializer 与 value.serializer 可在props中定义, 也可传入KafkaProducer的构造方法中。 传入构造方法中的Serializer会覆盖掉Properties中设置的参数。 表示key 与value的序列化类一般情况下,在同一个分区中的消息顺序是按照发送顺序写入的,但是如果retries > 0, 并且消息写入失败。就有可能发送乱序现象。如果对消息的顺序有严格要求,可以将max.in.flight.requests.per.connection 设置为1, 这会减少吞吐量,增加阻塞时间。但是能够确保消息写入顺序