kafka的设计初衷是希望作为一个统一的信息收集平台,能够实时的收集反馈信息,并需要能够支撑较大的数据量,且具备良好的容错能力。
这就需要kafka具有以下特性:
1)高吞吐量,低延迟
每秒处理消息的数。我们希望消息引擎的吞吐量越大越好。
延迟:客户端发起请求与服务端处理请求并发送响应给客户端之间的这一段时间。
顺序读写
零拷贝:在Linux kernel2.2 之后出现了一种叫做"零拷贝(zero-copy)"系统调用机制,就是跳过“用户缓冲区”的拷贝,建立一个磁盘空间和内存的直接映射,数据不再复制到“用户态缓冲区”。
分区:
kafka中的topic中的内容可以被分为多分partition存在,每个partition又分为多个段segment,所以每次操作都是针对一小部分做操作,很轻便,并且增加并行操作的能力
批量发送:
kafka允许进行批量发送消息,producter发送消息的时候,可以将消息缓存在本地,等到了固定条件发送到kafka。等消息条数到固定条数 ,一段时间发送一次。
数据压缩: Kafka还支持对消息集合进行压缩,Producer可以通过GZIP或Snappy格式对消息集合进行压缩 压缩的好处就是减少传输的数据量,减轻对网络传输的压力
2)消息持久化
解耦消息发送与消息消费
实现灵活的消息处理: 已经处理过的消息在未来的某个时间需要重新处理。
负载均衡和故障转移Kafka提供了智能的leader选举算法,可以在集群的所有机器上以均等的机会分散各个partition的leader,从而整体上实现了负载均衡。
当服务器意外终止时,整个集群可以快速检测到,并立即将该服务器上的应用或服务转移到其他服务器上。Kafka启动后每个节点会议会话的形式将自己注册到zookeeper服务器上。一旦服务出现问题,则会话超时,集群会选举出另一台服务器来代替他的工作。
伸缩性分布式系统中增加额外的计算资源时吞吐量提升的能力。
例如:一个cpu的处理能力是U,我们希望两个是2U(即线性扩容)
阻碍线性扩容的很常见的因素是状态保存。
Kafka解决这个问题的方法:每台kafka服务器上的状态统一交由zookeeper管理,扩展kafka集群只需要一步, 启动新的kafka服务器。
Broker:Kafka节点,一个Kafka节点就是一个broker,多个broker可以组成一个Kafka集群。
Topic:一类消息,消息存放的目录即主题,例如page view日志、click日志等都可以以topic的形式存在,Kafka集群能够同时负责多个topic的分发。
Partition:topic物理上的分组,一个topic可以分为多个partition,每个partition是一个有序的队列
Segment:partition物理上由多个segment组成,每个Segment存着message信息
Producer : 生产message发送到topic
Consumer : 订阅topic消费message, consumer作为一个线程来消费
Consumer Group:一个Consumer Group包含多个consumer, 这个是预先在配置文件中配置好的。各个consumer(consumer 线程)可以组成一个组(Consumer group ),partition中的每个message只能被组(Consumer group ) 中的一个consumer(consumer 线程 )消费,如果一个message可以被多个consumer(consumer 线程 ) 消费的话,那么这些consumer必须在不同的组。(它不能像MQ那样可以多个线程作为consumer去处理message,这是因为多个线程去消费一个Queue中的数据的时候,由于要保证不能多个线程拿同一条message,所以就需要行级别悲观所(for update),这就导致了consume的性能下降,吞吐量不够。而kafka为了保证吞吐量,只允许一个consumer线程去访问一个partition。如果觉得效率不高的时候,可以加partition的数量来横向扩展,那么再加新的consumer thread去消费。这样没有锁竞争,充分发挥了横向的扩展性,吞吐量极高。这也就形成了分布式消费的概念。)
Broker主要参数配置:
参数描述listenersbroker监听列表。格式是[协议]://[主机名]:[端口], [[协议]://[主机名]:[端口]]。该参数主要用于客户端连接broker使用,可以认为是broker端开放给clients的监听端口。log.retention.{hours|minutes|ms}日志留存时间,默认只保留最近7天的数据。num.partitionsPartition数量zookeeper.connection.timeout.msBroker与zookeeper之间的超时时间zookeeper.connectZookeeper 地址列表log.roll.hours这个参数会在日志segment没有达到log.segment.bytes设置的大小,也会强制新建一个segment 会被 topic创建时的指定参数覆盖log.segment.bytestopic的分区是以一堆segment文件存储的,这个控制每个segment的大小,会被topic创建时的指定参数覆盖num.io.threadsbroker处理磁盘IO 的线程数num.network.threadsbroker 处理消息的最大线程数,一般情况下不需要去修改message.max.bytes消息体的最大大小,单位是字节log.dirskafka数据的存放地址,多个地址的话用逗号分割 /tmp/kafka-logs-1,/tmp/kafka-logs-2broker.id每一个broker在集群中的唯一标示,要求是正数。在改变IP地址,不改变broker.id的话不会影响consumersConsumer参数配置:
参数参数描述socket.timeout.mssocket的超时时间。如果消费者没有在 session.timeout.ms 指定的时间内发送心跳给群组协调器,就被认为已经死亡,协调器就会触发再均衡,把它的分区分配给群组里的其他消费者 。该属性与 heartbeat.interval.ms紧密相关。heartbeat.interval.ms 指定了poll()方法向协调器 发送心跳的频 率, session.timeout.ms 则指定了消费者可以多久不发送心跳。所以, 一般需要同时修改这两个属性, heartbeat.interval.ms 必须比 session.timeout.ms 小, 一般是 session.timeout.ms 的三分之一auto.commit.enable是否在消费消息后将offset自动提交auto.commit.interval.ms自动提交的时间间隔auto.offset.reset该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下(因消费者长时间失效,包含偏移量的记录已经过时井被删除)该作何处理。它的默认值是latest, 意 思是说,在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者 启动之 后生成的记录)。另一个值是 earliest,意思是说,在偏移量无效的情况下,消费者将从 起始位置读取分区的记录fetch.min.bytes该属性指定了消费者从服务器获取记录的最小字节数。 broker 在收到消费者的数据请求时, 如果可用的数据量小于 fetch.min.bytes指定的大小,那么它会等到有足够的可用数据时才把它返回给消费者fetch.max.wait.ms我们通过 fetch.min.bytes 告诉 Kafka,等到有足够的数据时才把它返回给消费者。而 fetch.max.wait.ms则用于指定 broker的等待时间,默认是 500ms。如果没有足够的数据流入 Kafka,消费者获取最小数据量的要求就得不到满足,最终导致500ms的延迟max.parition.fetch.bytes该属性指定了服务器从每个分区里返回给消费者的最大字节数。它的默认值是 1MB,也 就是说, KafkaConsumer.poll() 方法从每个分区里返回的记录最多不超过 max.parition.fetch.bytes 指定的字节。如果一个主题有 20个分区和 5 个消费者,那么每个消费者需要至少 4MB 的可用内存来接收记录。在为消费者分配内存时,可以给它们多分配一些,因 为如果群组里有消费者发生崩溃,剩下的消费者需要处理更多的分区。 max.parition.fetch.bytes 的值必须比 broker能够接收的最大消息的字节数(通过 max.message.size属 性配置 )大, 否则消费者可能无法读取这些消息,导致消费者一直挂起重试。在设置该属性时,另一个需要考虑的因素是消费者处理数据的时间。 消费者需要频繁调用 poll() 方法来避免会话过期和发生分区再均衡,如果单次调用 poll() 返回的数据太多,消费者需要更多的时间来处理,可能无法及时进行下一个轮询来避免会话过期。如果出现这种情况, 可以把 max.parition.fetch.bytes 值改小 ,或者延长会话过期时间。Producer参数配置:
参数参数描述acksacks 参数指定了必须要有多少个分区副本收到消息,生产者才会认为消息写入是成功的retries生产者从服务器收到的错误有可能是临时性的错误(比如分区找不到首领)。在这种情况下, retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试并返回错误。retry.backoff.ms每次失败重试的间隔时间max.block.ms配置控制了KafkaProducer.send()并将KafkaProducer.partitionsFor()被阻塞多长时间。由于缓冲区已满或元数据不可用,这些方法可能会被阻塞止bootstrap.servers用于建立到Kafka集群的初始连接的主机/端口对列表。该列表应该以表格的形式出现host1:port1,host2:port2,...batch.size只要有多个记录被发送到同一个分区,生产者就会尝试将记录一起分成更少的请求。这有助于客户端和服务器的性能。该配置以字节为单位控制默认的批量大小linger.ms批量延迟的上限:一旦我们得到batch.size值得记录的分区,它将被立即发送而不管这个设置如何,但是如果我们为这个分区累积的字节数少于这个数字,我们将在指定的时间内“等待”,等待更多的记录出现。request.timeout.ms指定了生产者在发送数据时等待服务器返回响应的时间metadata.fetch.timeout.ms指定了生产者在获取元数据(比如目标分区的首领是谁)时等待服务器返回响应的时间。如果等待响应超时,那么生产者要么重试发送数据,要么返回 一个错误 (抛出异常或执行回调)。创建topic
sh kafka-topics.sh --create --topic test --partitions 12 --zookeeper 22.188.144.194:2182, 22.188.144.195:2182, 22.188.147.120:2183
删除topic(server配置中的delete.topic.enable设置为ture,执行以下命令才会删除,否则删除不生效)
sh kafka-topics.sh --delete --topic test --zookeeper 22.188.144.194:2181,22.188.144.195:2182,22.188.147.120:2183
查看topic列表
sh kafka-topics.sh --list --zookeeper 22.188.144.194:2181,22.188.144.195:2182,22.188.147.120:2183
查具体的topic
sh kafka-topics.sh --describe --topic --topic test --zookeeper 22.188.144.194:2181,22.188.144.195:2182,22.188.147.120:2183
5)修改topic的partition数量(注意:只能改大,不能改小)
sh kafka-topics.sh --alter --partitions 12 --topic test --zookeeper 22.188.144.194:2181,22.188.144.195:2182,22.188.147.120:2183
6) 发送消息
sh kafka-console-producer.sh --broker-list 22.188.144.194:50005 --topic test
消费消息sh kafka-console-consumer.sh --zookeeper 22.188.126.86:2181 --from-beginning --topic test