Stream
前言:
我们需要创建三个模块:分别是生产者8801,消费者8802,消费者8803
开始:
1.创建新的模块
2.导入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-starter-web
</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-starter-actuator
</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-devtools
</artifactId>
<scope>runtime
</scope>
<optional>true
</optional>
</dependency>
<dependency>
<groupId>org.projectlombok
</groupId>
<artifactId>lombok
</artifactId>
<optional>true
</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-starter-test
</artifactId>
<scope>test
</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud
</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client
</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud
</groupId>
<artifactId>spring-cloud-starter-stream-rabbit
</artifactId>
</dependency>
</dependencies>
3.写yml
server:
port: 8801
spring:
application:
name: cloud
-stream
-provider
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings:
output:
destination: studyExchange
content-type: application/json
binder: defaultRabbit
eureka:
client:
service-url:
defaultZone: http
://localhost
:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2
lease-expiration-duration-in-seconds: 5
instance-id: send
-8801.com
prefer-ip-address: true
4.写启动类
import org
.springframework
.boot
.SpringApplication
;
import org
.springframework
.boot
.autoconfigure
.SpringBootApplication
;
@SpringBootApplication
public class StreamRabbitMQMain8801 {
public static void main(String
[] args
) {
SpringApplication
.run(StreamRabbitMQMain8801
.class,args
);
}
}
5.写业务代码
service
public interface IMessageService {
public String
send();
}
impl
import org
.springframework
.cloud
.stream
.annotation
.EnableBinding
;
import org
.springframework
.cloud
.stream
.messaging
.Source
;
import org
.springframework
.integration
.support
.MessageBuilder
;
import org
.springframework
.messaging
.MessageChannel
;
import javax
.annotation
.Resource
;
import java
.util
.UUID
;
@EnableBinding(Source
.class)
public class IMessageServiceImpl implements IMessageService {
@Resource
private MessageChannel output
;
@Override
public String
send() {
String serail
= UUID
.randomUUID().toString();
output
.send(MessageBuilder
.withPayload(serail
).build());
System
.out
.println("----"+serail
+"-----");
return null
;
}
}
controller
import org
.springframework
.web
.bind
.annotation
.GetMapping
;
import org
.springframework
.web
.bind
.annotation
.RestController
;
import javax
.annotation
.Resource
;
@RestController
public class IMessageController {
@Resource
private IMessageService iMessageService
;
@GetMapping(value
= "/sendMessage")
private String
sendMessage(){
return iMessageService
.send();
}
}
消费者
步骤和上面一样 ,只不过yml 和业务类不一样 ,消费者只负责接收消息
1.创建消费者模块
2.导入依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-starter-web
</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-starter-actuator
</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-devtools
</artifactId>
<scope>runtime
</scope>
<optional>true
</optional>
</dependency>
<dependency>
<groupId>org.projectlombok
</groupId>
<artifactId>lombok
</artifactId>
<optional>true
</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot
</groupId>
<artifactId>spring-boot-starter-test
</artifactId>
<scope>test
</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud
</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client
</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud
</groupId>
<artifactId>spring-cloud-starter-stream-rabbit
</artifactId>
</dependency>
</dependencies>
写yml
server:
port: 8802
spring:
application:
name: cloud
-stream
-consumer
cloud:
stream:
binders:
defaultRabbit:
type: rabbit
environment:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings:
intput:
destination: studyExchange
content-type: application/json
binder: defaultRabbit
eureka:
client:
service-url:
defaultZone: http
://localhost
:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2
lease-expiration-duration-in-seconds: 5
instance-id: receive
-8802.com
prefer-ip-address: true
写启动类
import org
.springframework
.boot
.SpringApplication
;
import org
.springframework
.boot
.autoconfigure
.SpringBootApplication
;
@SpringBootApplication
public class StreamRabbitMQMain8802 {
public static void main(String
[] args
) {
SpringApplication
.run(StreamRabbitMQMain8802
.class,args
);
}
}
写业务
controller
import org
.springframework
.beans
.factory
.annotation
.Value
;
import org
.springframework
.cloud
.stream
.annotation
.EnableBinding
;
import org
.springframework
.cloud
.stream
.annotation
.StreamListener
;
import org
.springframework
.cloud
.stream
.messaging
.Sink
;
import org
.springframework
.messaging
.Message
;
import org
.springframework
.stereotype
.Component
;
@Component
@EnableBinding(Sink
.class)
public class StreamRabbitController {
@Value("${server.port}")
private String serverPort
;
@StreamListener(Sink
.INPUT
)
public void getMessage(Message
<String> message
){
System
.out
.println("接受到的消息是:"+message
.getPayload()+"\t"+"端口号是:"+serverPort
);
}
}
测试一下
注意:0.首先检查yml的缩进问题
1.如果没有接收成功的话,查看代码
2.确保代码逻辑没有错误
3.查看包的问题 有没有导错包
4.查看是否启动rabbitmq
启动我们的eureka注册中心 和我们那的消费者 和提供者 8801:发送消息
8801:提供者控制台
----8fb5a487
-923a
-4e74-b31a
-b1e2d244035a
-----
----039dc608
-34c9
-4768-b3df
-d7901b880987
-----
8802:消费者控制台
接受到的消息是
:8fb5a487
-923a
-4e74-b31a
-b1e2d244035a 端口号是
:8802
接受到的消息是
:039dc608
-34c9
-4768-b3df
-d7901b880987 端口号是
:8802
在进行创建一个消费者模块 模仿8803
注意 :只需要修改yaml中的端口和注册eureka的Id
创建好新的模块以后我们来在此发送消息;再次查看8801、8802、8803 的控制台信息
8801:提供者控制台
----02b1157d
-4b39
-449e-9531-cdd46a0caa9e
-----
----1add1c81
-481a
-4587-9471-c1cc89a3b553
-----
----587ab6a8
-48f4-48e5-b2cc
-f5f77720506e
-----
----c3b513df
-cb78
-4c4d
-b4f1
-7fde829b6354
-----
8802:消费者控制台
接受到的消息是
:02b1157d
-4b39
-449e-9531-cdd46a0caa9e 端口号是
:8802
接受到的消息是
:1add1c81
-481a
-4587-9471-c1cc89a3b553 端口号是
:8802
接受到的消息是
:587ab6a8
-48f4-48e5-b2cc
-f5f77720506e 端口号是
:8802
接受到的消息是
:c3b513df
-cb78
-4c4d
-b4f1
-7fde829b6354 端口号是
:8802
8803:消费者控制台
接受到消息:
02b1157d
-4b39
-449e-9531-cdd46a0caa9e
8803
接受到消息:
1add1c81
-481a
-4587-9471-c1cc89a3b553
8803
接受到消息:
587ab6a8
-48f4-48e5-b2cc
-f5f77720506e
8803
接受到消息:c3b513df
-cb78
-4c4d
-b4f1
-7fde829b6354
8803
问题:
通过控制台可以看出我们提供者发送的消息,在消费者8802,8803,都接收到了,说明被重复消费了
原因
是因为默认分组的group是不同的
原理
不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费
解决方案
自定义分组修改yml 文件 使我们的消费者都在自己定义的group组下面
在8802、8803的yml中添加
bindings:
input:
destination: studyExchange
content-type: application/json
binder: defaultRabbit
group: RabbitChao1
注意:缩进问题
添加之后
此时我们的消费者都在一个组里面了
再次测试
8801:提供者控制台
----c76e7234
-47ad
-4345-ab40
-9fe4aa49cab4
-----
----b3d3908d
-d131
-4637-aa52
-c1f455bb4747
-----
----1abbea9a
-28a3
-41f0-95e6-dcbcf011fcaf
-----
8802:消费者控制台
接受到的消息是
:c76e7234
-47ad
-4345-ab40
-9fe4aa49cab4 端口号是
:8802
接受到的消息是
:1abbea9a
-28a3
-41f0-95e6-dcbcf011fcaf 端口号是
:8802
8803:消费者控制台
接受到消息:b3d3908d
-d131
-4637-aa52
-c1f455bb4747
8803
测试得出:
我们通过进行同一的分组之后,两个消费者并没有出现重复的消息;