我喜欢把结论摆在前面,后面再做解释。
同步写法,等待结果返回: SendResult<String, String> stringStringSendResult = kafkaTemplate.send("cancan", "5", msg).get(); 异步的写法,不等待结果返回: ProducerRecord<String, String> record = new ProducerRecord<>("xixi", "5", msg); ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onFailure(@NonNull Throwable throwable) { //错误结果 } @Override public void onSuccess(SendResult<String, String> result) { //正确结果 } });有些同学在用rocketmq的时候,同步发送和异步发送一目了然,如下图
到了 springboot kafka,有些同学就迷惑了,如下图,没有很明显的说明哪一个是同步,哪一个是异步发送。
我用的是 springboot 2.2.1 kafka的依赖为:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>同步发送
@Autowired private KafkaTemplate<String, String> kafkaTemplate; //get的时候,为同步 @RequestMapping("/sendsyn") public String sendSyn() throws ExecutionException, InterruptedException { //一个400k的字符串 String msg = FileUtil.fileToBase64("test2"); long s1 = System.currentTimeMillis(); SendResult<String, String> stringStringSendResult = kafkaTemplate.send("cancan", "5", msg).get(); ProducerRecord<String, String> producerRecord = stringStringSendResult.getProducerRecord(); System.out.println(producerRecord.toString()); long s2 = System.currentTimeMillis(); System.out.println("同步耗时:"+ (s2 - s1)); //同步耗时长:312 return "同步发送"; }异步发送1
//不为get,为异步 @RequestMapping("/sendasy") public String sendAsy() throws ExecutionException, InterruptedException { //一个400k的字符串 String msg = FileUtil.fileToBase64("test2"); long s1 = System.currentTimeMillis(); ListenableFuture<SendResult<String, String>> cancan = kafkaTemplate.send("cancan", "5", msg); cancan.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onFailure(@NonNull Throwable throwable) { System.out.println("结果失败"); } @Override public void onSuccess(SendResult<String, String> result) { ProducerRecord<String, String> producerRecord = result.getProducerRecord(); System.out.println(producerRecord.toString()); long s4 = System.currentTimeMillis(); System.out.println("结果成功,耗时:"+(s4 - s1)); //结果成功,耗时:521 } }); long s2 = System.currentTimeMillis(); System.out.println("异步耗时:"+ (s2 - s1)); //异步耗时长:3 return "异步发送"; }异步发送2
//异步的其他写法 @RequestMapping("sendasy2") public String sendasy2() throws ExecutionException, InterruptedException { //一个400k的字符串 String msg = FileUtil.fileToBase64("test2"); ProducerRecord<String, String> record = new ProducerRecord<>("cancan", "5", msg); long s1 = System.currentTimeMillis(); ListenableFuture<SendResult<String, String>> cancan = kafkaTemplate.send(record); cancan.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onFailure(@NonNull Throwable throwable) { System.out.println("结果失败"); } @Override public void onSuccess(SendResult<String, String> result) { ProducerRecord<String, String> producerRecord = result.getProducerRecord(); System.out.println(producerRecord.toString()); long s4 = System.currentTimeMillis(); System.out.println("结果成功,耗时:"+(s4 - s1)); //结果成功,耗时:521 } }); long s2 = System.currentTimeMillis(); System.out.println("异步耗时:"+ (s2 - s1)); //异步耗时长:2 return "异步发送2"; }从以上看出,kafka的 send 都是异步发送,如果想同步,就调用get的方法,就可以等待结果的出现。
kafka服务端的默认只接收一条数据为 1m 大小的数据 如果想要发送大于 1m 大小的数据 需要设置 服务端配置 message.max.bytes = 20000000 默认 1000000 server可以接收的消息最大尺寸。重要的是,consumer和producer有关这个属性的设置必须同步,否则producer发布的消息对consumer来说太大
客户端配置 max.request.size = 20000000 默认 1028576 请求的最大字节数。这也是对最大记录尺寸的有效覆盖。注意:server具有自己对消息记录尺寸的覆盖,这些尺寸和这个设置不同。此项设置将会限制producer每次批量发送请求的数目,以防发出巨量的请求。