05.Kafka配置文件详解

it2024-04-20  54

server.properties

#此服务器的代理ID,默认为-1 broker.id=-1 #外部连接者要通过什么协议访问指定主机名和端口开放的 Kafka 服务,格式为:协议://ip:port,默认协议为PLAINTEXT,可以将主机名指定为0.0.0.0以绑定到所有接口listeners=PLAINTEXT://IP:9092 # 这组监听器是 Broker 用于对外发布的,主要用于外网访问,格式为:协议://ip:port advertised.listeners=PLAINTEXT://192.168.17.99:9092 # 连接到ZK格式为: ip:port,ip:port zookeeper.connect=zoo1:2181 # 连接到ZK的超时时间 zookeeper.connection.timeout.ms=6000 # 使用Docker部署时要用到 ip为宿主机IP advertised.host.name=ip #过期配置不推荐使用 advertised.port #过期配置不推荐使用 host.name #是否自动创建Topic,默认为true建议改为false auto.create.topics.enable=true #是否允许定期进行 Leader 选举,这个设置不是选leader而是强行卸任当前leader在换一个,换leader是比较耗费资源的#默认为true建议改为false auto.leader.rebalance.enable=true #用于各种后台处理任务的线程数 background.threads=10 #启用删除主题。如果关闭此配置,则通过管理工具删除主题将无效,默认为true delete.topic.enable=true #保留日志数据的目录(log.dirs属性的补充) log.dir=/tmp/kafka-logs #保留日志数据的目录。如果未设置,则使用log.dir中的值 log.dirs=null #是否允许Unclean Leader 选举,建议显式设置为false,只要保存数据多的副本才能参与选举,如果保存数据多的副本都挂了那么还要不要进行副本leader选举,如果是false就是不会让保存数据少的副本竞选leader,如果设置为true可能产生数据丢失 unclean.leader.election.enable #是否允许定期进行 Leader 选举,建议设置为false,因为他不是选leader而是强行卸任当前leader在换一个,换leader是比较耗费资源的 auto.leader.rebalance.enable #这是个“三兄弟”,都是控制一条消息数据被保存多长时间。从优先级上来说 ms 设置最高、minutes 次之、hour 最低,例子:log.retention.hour=168表示默认保存 7 天的数据 log.retention.{hour|minutes|ms} #这是指定 Broker 为消息保存的总磁盘容量大小,默认为-1表示存多少都可以, log.retention.bytes # 控制 Broker 能够接收的最大消息大小,默认1000012比较少还不到1M建议调大一些 message.max.bytes #规定了该 Topic 消息被保存的时长。默认是 7 天,即该 Topic 只保存最近 7 天的消息。一旦设置了这个值,它会覆盖掉 Broker 端的全局参数值 retention.ms #规定了要为该 Topic 预留多大的磁盘空间。和全局参数作用相似,这个值通常在多租户的Kafka 集群中会有用武之地。当前默认值是 -1,表示可以无限使用磁盘空间。 retention.bytes #创建Topic时指定消息保存时间与Topic容量为5M bin/kafka-topics.sh--bootstrap-serverlocalhost:9092--create--topictransaction--partitions1--replication-factor1--configretention.ms=15552000000--configmax.message.bytes=5242880 #修改Topic消息保存时长与容量大小 bin/kafka-configs.sh--zookeeperlocalhost:2181--entity-typetopics--entity-nametransaction--alter--add-configmax.message.bytes=10485760 #环境变量设置堆大小 KAFKA_HEAP_OPTS #环境变量指定GC参数 KAFKA_JVM_PERFORMANCE_OPTS

producer.properties

bootstrap.servers=localhost:9092 #所有生成的数据指定压缩和解码器:none,gzip,snappy,lz4,zstd ,在CPU资源足够的情况下建议使用zstd因为能省网络资源,在吞吐量方面:LZ4 > Snappy > zstd 和 GZIP;而在压缩比方面,zstd > LZ4 > GZIP > Snappy compression.type=none #自定义分区策略类的全限定名,默认分区策略为轮询(根据key 的hash),自定义类需要实现org.apache.kafka.clients.producer.Partitioner接口并实现方法完成自定义分区策略,以下两行代码实现了同一key进入同一分区的策略 #List<PartitionInfo> partitions = cluster.partitionsForTopic(topic); #return Math.abs(key.hashCode()) % partitions.size(); partitioner.class= #客户端等待请求响应的最大时间 request.timeout.ms= #关键字的序列化类。如果没给与这项,默认情况是和消息一致 key.serializer= #消息的序列化类。如果没给与这项,默认情况是和消息一致 value.serializer= #默认为1 #N个broker成功地接收到一条消息并写入到日志文件后然后该消息才会被认为是已提交 #0指的是不等broker的回应始终认为消息已提交 #1指的是领导者完成操作消息就被认为是已提交 #all指的是领导者与副本必须都操作完成才能算是已提交等同 -1 acks=1 #生产者可以用来缓冲等待发送到服务器的记录的内存总字节数。如果记录的发送速度超过了将记录发送到服务器的速度,则生产者将对其进行阻止,max.block.ms然后引发异常 buffer.memory= #设置大于 0 的值将使客户端重新发送任何数据,一旦这些数据发送失败。注意,这些重试与客户端接收到发送错误时的重试没有什么不同。允许重试将潜在的改变数据的顺序,如果这两个消息记录都是发送到同一个 partition,则第一个消息失败第二个发送成功,则第二条消息会比第一条消息出现要早。 retries= #默认为16384,缓冲 batch.size=16384 #请求的最大字节数默认1028576,可以调大一些 max.request.size=1028576

consumer.properties

bootstrap.servers=localhost:9092 #默认为null,一个唯一的字符串,用于标识此使用者所属的使用者组。如果使用者使用通过使用组管理功能subscribe(topic)或基于Kafka的偏移量管理策略,则需要此属性。 group.id=test-consumer-group #key的反序列化类 key.deserializer #value的反序列化类 value.deserializer #当Kafka中没有初始偏移量或服务器上不再存在当前偏移量时(例如因为该数据已被删除),该怎么办 #earliest: 将偏移量自动重置为最早的偏移量 #latest: 自动将偏移量重置为最新偏移量 #none: 如果未找到消费者组的先前偏移量,则向消费者抛出异常 #anything else: 向消费者抛出异常 auto.offset.reset #如果为true,则将在后台定期提交使用者的偏移量。 enable.auto.commit=true
最新回复(0)