Spring Cloud Stream(消息驱动) 详细

it2025-05-28  15

Stream

前言:

我们需要创建三个模块:分别是生产者8801,消费者8802,消费者8803

开始:

1.创建新的模块

2.导入依赖

<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--开启热部署--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!--eureka--> <!--添加eureka-client的依赖--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <!--添加新的依赖 stream-rabbit--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies>

3.写yml

server: port: 8801 #端口号 spring: application: name: cloud-stream-provider cloud: stream: binders: #在此处要绑定rabbitmq的信息 defaultRabbit: #表示定义的名称,用于binding的整合 type: rabbit #消息组件的类型 environment: spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: #服务的整合处理 output: #名字是通道的名称 destination: studyExchange #表示使用exchange名称定义 content-type: application/json #设置消息类型 本次为json 文本则设置成 text/plain binder: defaultRabbit #设定绑定消息的服务的具体设置 eureka: client: service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 #设置心跳间隔时间秒 默认是30秒 lease-expiration-duration-in-seconds: 5 #设置超时时间5秒 默认90秒 instance-id: send-8801.com #在信息列表显示主机名称 prefer-ip-address: true #访问的路径变为ip地址

4.写启动类

import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class StreamRabbitMQMain8801 { public static void main(String[] args) { SpringApplication.run(StreamRabbitMQMain8801.class,args); } }

5.写业务代码

service

public interface IMessageService { public String send(); }

impl

import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.integration.support.MessageBuilder; import org.springframework.messaging.MessageChannel; //特别注意千万,千万别导错包 import javax.annotation.Resource; import java.util.UUID; @EnableBinding(Source.class) // public class IMessageServiceImpl implements IMessageService { @Resource private MessageChannel output; //发送消息的管道 @Override public String send() { String serail = UUID.randomUUID().toString(); //MessageBuilder.withPayload(serail).build() 构造一个消息 //import org.springframework.integration.support.MessageBuilder; 注意不要导错包 output.send(MessageBuilder.withPayload(serail).build()); System.out.println("----"+serail+"-----"); return null; } }

controller

import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import javax.annotation.Resource; @RestController public class IMessageController { @Resource private IMessageService iMessageService; @GetMapping(value = "/sendMessage") private String sendMessage(){ return iMessageService.send(); } }

消费者

步骤和上面一样 ,只不过yml 和业务类不一样 ,消费者只负责接收消息

1.创建消费者模块

2.导入依赖

<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency> <!--开启热部署--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <scope>runtime</scope> <optional>true</optional> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <optional>true</optional> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!--eureka--> <!--添加eureka-client的依赖--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <!--添加新的依赖 stream-rabbit--> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies>

写yml

server: port: 8802 spring: application: name: cloud-stream-consumer cloud: stream: binders: # 在此处配置要绑定的rabbitmq的服务信息; defaultRabbit: # 表示定义的名称,用于于binding整合 type: rabbit # 消息组件类型 environment: # 设置rabbitmq的相关的环境配置 spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 服务的整合处理 千万要注意缩进 intput: # 这个名字是一个通道的名称 接收消息 input 生产者 output 发送消息 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain” binder: defaultRabbit #在这里会爆红不过不影响 具体为什么爆红 目前我也不知啊0.0 eureka: client: # 客户端进行Eureka注册的配置 service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒) lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒) instance-id: receive-8802.com # 在信息列表时显示主机名称 prefer-ip-address: true # 访问的路径变为IP地址

写启动类

import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class StreamRabbitMQMain8802 { public static void main(String[] args) { SpringApplication.run(StreamRabbitMQMain8802.class,args); } }

写业务

controller

import org.springframework.beans.factory.annotation.Value; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.messaging.Message; import org.springframework.stereotype.Component; //注意在这千万别dao错包 @Component @EnableBinding(Sink.class) public class StreamRabbitController { @Value("${server.port}") private String serverPort; //stream的监听者接受消息的 @StreamListener(Sink.INPUT) public void getMessage(Message<String> message){ //消息构造者构造消息withPayload(消息) 我们的是消费者 接受消息 getPayload System.out.println("接受到的消息是:"+message.getPayload()+"\t"+"端口号是:"+serverPort ); } }

测试一下

注意:0.首先检查yml的缩进问题

​ 1.如果没有接收成功的话,查看代码

​ 2.确保代码逻辑没有错误

​ 3.查看包的问题 有没有导错包

​ 4.查看是否启动rabbitmq

启动我们的eureka注册中心 和我们那的消费者 和提供者 8801:发送消息

8801:提供者控制台

----8fb5a487-923a-4e74-b31a-b1e2d244035a----- ----039dc608-34c9-4768-b3df-d7901b880987-----

8802:消费者控制台

接受到的消息是:8fb5a487-923a-4e74-b31a-b1e2d244035a 端口号是:8802 接受到的消息是:039dc608-34c9-4768-b3df-d7901b880987 端口号是:8802

在进行创建一个消费者模块 模仿8803

注意 :只需要修改yaml中的端口和注册eureka的Id

创建好新的模块以后我们来在此发送消息;再次查看8801、8802、8803 的控制台信息

8801:提供者控制台

----02b1157d-4b39-449e-9531-cdd46a0caa9e----- ----1add1c81-481a-4587-9471-c1cc89a3b553----- ----587ab6a8-48f4-48e5-b2cc-f5f77720506e----- ----c3b513df-cb78-4c4d-b4f1-7fde829b6354-----

8802:消费者控制台

接受到的消息是:02b1157d-4b39-449e-9531-cdd46a0caa9e 端口号是:8802 接受到的消息是:1add1c81-481a-4587-9471-c1cc89a3b553 端口号是:8802 接受到的消息是:587ab6a8-48f4-48e5-b2cc-f5f77720506e 端口号是:8802 接受到的消息是:c3b513df-cb78-4c4d-b4f1-7fde829b6354 端口号是:8802

8803:消费者控制台

接受到消息:02b1157d-4b39-449e-9531-cdd46a0caa9e 8803 接受到消息:1add1c81-481a-4587-9471-c1cc89a3b553 8803 接受到消息:587ab6a8-48f4-48e5-b2cc-f5f77720506e 8803 接受到消息:c3b513df-cb78-4c4d-b4f1-7fde829b6354 8803

问题:

通过控制台可以看出我们提供者发送的消息,在消费者8802,8803,都接收到了,说明被重复消费了

原因

是因为默认分组的group是不同的

原理

不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费

解决方案

自定义分组修改yml 文件 使我们的消费者都在自己定义的group组下面

在8802、8803的yml中添加

bindings: # 服务的整合处理 input: # 这个名字是一个通道的名称 destination: studyExchange # 表示要使用的Exchange名称定义 content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain” binder: defaultRabbit group: RabbitChao1 #这个就是组名 注意缩进

注意:缩进问题

添加之后

此时我们的消费者都在一个组里面了

再次测试

8801:提供者控制台

----c76e7234-47ad-4345-ab40-9fe4aa49cab4----- ----b3d3908d-d131-4637-aa52-c1f455bb4747----- ----1abbea9a-28a3-41f0-95e6-dcbcf011fcaf-----

8802:消费者控制台

接受到的消息是:c76e7234-47ad-4345-ab40-9fe4aa49cab4 端口号是:8802 接受到的消息是:1abbea9a-28a3-41f0-95e6-dcbcf011fcaf 端口号是:8802

8803:消费者控制台

接受到消息:b3d3908d-d131-4637-aa52-c1f455bb4747 8803

测试得出:

我们通过进行同一的分组之后,两个消费者并没有出现重复的消息;

最新回复(0)