springboot kafka2.x 生产者同步发送异步发送说明

it2025-03-30  16

文章目录

一、前言1.1 结论2.1 对比 二、案例2.1 依赖2.2 直接摆出案例2.3 小结,kafka的 send 都是异步发送,调用get()实现同步 三、题外话3.1 message.max.bytes3.2 max.request.size3.3 文件转base64的类

一、前言

1.1 结论

我喜欢把结论摆在前面,后面再做解释。

同步写法,等待结果返回: SendResult<String, String> stringStringSendResult = kafkaTemplate.send("cancan", "5", msg).get(); 异步的写法,不等待结果返回: ProducerRecord<String, String> record = new ProducerRecord<>("xixi", "5", msg); ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(record); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onFailure(@NonNull Throwable throwable) { //错误结果 } @Override public void onSuccess(SendResult<String, String> result) { //正确结果 } });

2.1 对比

有些同学在用rocketmq的时候,同步发送和异步发送一目了然,如下图

到了 springboot kafka,有些同学就迷惑了,如下图,没有很明显的说明哪一个是同步,哪一个是异步发送。

二、案例

2.1 依赖

我用的是 springboot 2.2.1 kafka的依赖为:

<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency>

2.2 直接摆出案例

同步发送

@Autowired private KafkaTemplate<String, String> kafkaTemplate; //get的时候,为同步 @RequestMapping("/sendsyn") public String sendSyn() throws ExecutionException, InterruptedException { //一个400k的字符串 String msg = FileUtil.fileToBase64("test2"); long s1 = System.currentTimeMillis(); SendResult<String, String> stringStringSendResult = kafkaTemplate.send("cancan", "5", msg).get(); ProducerRecord<String, String> producerRecord = stringStringSendResult.getProducerRecord(); System.out.println(producerRecord.toString()); long s2 = System.currentTimeMillis(); System.out.println("同步耗时:"+ (s2 - s1)); //同步耗时长:312 return "同步发送"; }

异步发送1

//不为get,为异步 @RequestMapping("/sendasy") public String sendAsy() throws ExecutionException, InterruptedException { //一个400k的字符串 String msg = FileUtil.fileToBase64("test2"); long s1 = System.currentTimeMillis(); ListenableFuture<SendResult<String, String>> cancan = kafkaTemplate.send("cancan", "5", msg); cancan.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onFailure(@NonNull Throwable throwable) { System.out.println("结果失败"); } @Override public void onSuccess(SendResult<String, String> result) { ProducerRecord<String, String> producerRecord = result.getProducerRecord(); System.out.println(producerRecord.toString()); long s4 = System.currentTimeMillis(); System.out.println("结果成功,耗时:"+(s4 - s1)); //结果成功,耗时:521 } }); long s2 = System.currentTimeMillis(); System.out.println("异步耗时:"+ (s2 - s1)); //异步耗时长:3 return "异步发送"; }

异步发送2

//异步的其他写法 @RequestMapping("sendasy2") public String sendasy2() throws ExecutionException, InterruptedException { //一个400k的字符串 String msg = FileUtil.fileToBase64("test2"); ProducerRecord<String, String> record = new ProducerRecord<>("cancan", "5", msg); long s1 = System.currentTimeMillis(); ListenableFuture<SendResult<String, String>> cancan = kafkaTemplate.send(record); cancan.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onFailure(@NonNull Throwable throwable) { System.out.println("结果失败"); } @Override public void onSuccess(SendResult<String, String> result) { ProducerRecord<String, String> producerRecord = result.getProducerRecord(); System.out.println(producerRecord.toString()); long s4 = System.currentTimeMillis(); System.out.println("结果成功,耗时:"+(s4 - s1)); //结果成功,耗时:521 } }); long s2 = System.currentTimeMillis(); System.out.println("异步耗时:"+ (s2 - s1)); //异步耗时长:2 return "异步发送2"; }

2.3 小结,kafka的 send 都是异步发送,调用get()实现同步

从以上看出,kafka的 send 都是异步发送,如果想同步,就调用get的方法,就可以等待结果的出现。

三、题外话

3.1 message.max.bytes

kafka服务端的默认只接收一条数据为 1m 大小的数据 如果想要发送大于 1m 大小的数据 需要设置 服务端配置 message.max.bytes = 20000000 默认 1000000 server可以接收的消息最大尺寸。重要的是,consumer和producer有关这个属性的设置必须同步,否则producer发布的消息对consumer来说太大

3.2 max.request.size

客户端配置 max.request.size = 20000000 默认 1028576 请求的最大字节数。这也是对最大记录尺寸的有效覆盖。注意:server具有自己对消息记录尺寸的覆盖,这些尺寸和这个设置不同。此项设置将会限制producer每次批量发送请求的数目,以防发出巨量的请求。

3.3 文件转base64的类

package com.cat.demo.kafkaspringboot.utils; import org.apache.tomcat.util.codec.binary.Base64; import java.io.*; //需要的jar包名commons-codec-1.6.jar /** * 文件和Base64之间的相互转化工具类 * @author rmk */ public class FileUtil { /** * * @param path 文件全路径(加文件名) * @return String * @description 将文件转base64字符串 * @date 2019年11月24日 * @author rmk */ public static String fileToBase64(String path) { String base64 = null; InputStream in = null; try { File file = new File(path); in = new FileInputStream(file); byte[] bytes = new byte[(int) file.length()]; in.read(bytes); base64 = new String(Base64.encodeBase64(bytes),"UTF-8"); System.out.println("将文件["+path+"]转base64字符串:"+base64); } catch (Exception e) { e.printStackTrace(); } finally { if (in != null) { try { in.close(); } catch (IOException e) { e.printStackTrace(); } } } return base64; } /** * @param outFilePath 输出文件路径, base64 base64文件编码字符串, outFileName 输出文件名 * @return String * @description BASE64解码成File文件 * @date 2019年11月24日 * @author rmk */ public static void base64ToFile(String outFilePath,String base64, String outFileName) { System.out.println("BASE64:["+base64+"]解码成File文件["+outFilePath+"\\"+outFileName+"]"); File file = null; //创建文件目录 String filePath=outFilePath; File dir=new File(filePath); if (!dir.exists() && !dir.isDirectory()) { dir.mkdirs(); } BufferedOutputStream bos = null; java.io.FileOutputStream fos = null; try { byte[] bytes = Base64.decodeBase64(base64); file=new File(filePath+"/"+outFileName); fos = new java.io.FileOutputStream(file); bos = new BufferedOutputStream(fos); bos.write(bytes); } catch (Exception e) { e.printStackTrace(); } finally { if (bos != null) { try { bos.close(); } catch (IOException e) { e.printStackTrace(); } } if (fos != null) { try { fos.close(); } catch (IOException e) { e.printStackTrace(); } } } } }
最新回复(0)