名词说明
Publisher出版商(发布者)Subscriber订阅人Subscription订阅管理Processor订阅者和出版商的组合
出版商(Publisher)接口声明了一个方法subscribe()(订阅)。订阅人(Subscriber)可以通过此方法向出版商(Publisher)发起订阅。出版商(Publisher)创建数据,并将数据发送给订阅的订阅人(Subscriber)。
public interface Publisher<T> {
void subscribe(Subscriber
<? super T
> var1
);
}
一旦订阅人(Subscriber)订阅成功就可以接收来自出版商(Publisher)的事件。出版商(Publisher)会调用订阅人(Subscriber)的以下方法给订阅人(Subscriber)发送信息
public interface Subscriber<T> {
void onSubscribe(Subscription var1
);
void onNext(T var1
);
void onError(Throwable var1
);
void onComplete();
}
方法说明
onSubscribe出版商调用此方法时会把 Subscription 存送给订阅者,订阅者可以通过Subscription 管理其订阅情况。onNext出版商发布的每个数据都会通过调用订阅者的 onNext 方法onError如果有任何错误就会触发onError 方法。onComplete如果出版商没有更多数据,则会调用订阅者的onComplete方法,表示已经结束。
public interface Subscription {
void request(long var1
);
void cancel();
}
订阅者可以通过request方法请求出版商发送数据,当调用request时,订阅者可以传入一个long值,表示订阅者接收多少数据。避免出版商发送过多的数据给订阅者(回压)。当出版商发送完请求数量的数据后。订阅者可以再次调用request订阅。订阅者请求数据后,数据就会开始流经反应式流。出版商发布的每个数据都会通过调用订阅者的 onNext 方法 将数据递交给订阅者。如果有任何错误就会触发onError 方法。如果出版商没有更多数据,则会调用订阅者的onComplete方法,表示已经结束。订阅者可以通过cancel方法取消订阅。
Processor 是 订阅者和出版商的组合
public interface Processor<T, R> extends Subscriber<T>, Publisher
<R> {
}
当作为订阅者时,Processor会接收数据并以某种方式对数据进行处理。然后会将角色转换成 出版商,并将处理处理的结果发布给它的订阅者
示例:
<dependency>
<groupId>io.projectreactor
</groupId>
<artifactId>reactor-core
</artifactId>
<version>3.1.4.RELEASE
</version>
</dependency>
<dependency>
<groupId>io.projectreactor
</groupId>
<artifactId>reactor-test
</artifactId>
<version>3.1.4.RELEASE
</version>
<scope>test
</scope>
</dependency>
<dependency>
<groupId>junit
</groupId>
<artifactId>junit
</artifactId>
<version>4.12
</version>
<scope>test
</scope>
</dependency>
import org
.reactivestreams
.Subscriber
;
import org
.reactivestreams
.Subscription
;
import reactor
.core
.Exceptions
;
import reactor
.core
.publisher
.Flux
;
public class Test {
public static void main(String
[] a
) {
MySubscriber mySubscriber
= new MySubscriber();
Flux
.just("刘志强","王妍","张三")
.doOnRequest(n
-> {System
.out
.println("订阅者发起请求数据,请求长度" + n
);})
.subscribe(mySubscriber
);
}
}
class MySubscriber implements Subscriber<String> {
volatile Subscription subscription
;
final int i
= 2;
int j
= 0;
@Override
public void onSubscribe(Subscription subscription
) {
System
.out
.println("已发起订阅");
this.subscription
= subscription
;
subscription
.request(i
);
}
@Override
public void onNext(String value
) {
j
= j
+ 1;
System
.out
.println("出版商发送来了数据====" + value
+ "===第" + j
+ "条");
if (j
== i
) {
j
= 0;
subscription
.request(i
);
}
}
@Override
public void onComplete() {
System
.out
.println("数据已发送完");
}
@Override
public void onError(Throwable throwable
) {
System
.out
.println("数据发送异常");
throw Exceptions
.errorCallbackNotImplemented(throwable
);
}
}
断言
MySubscriber mySubscriber
= new MySubscriber();
Flux
<String> flux
= Flux
.just("刘志强","王妍","张三")
.doOnRequest(n
-> {System
.out
.println("订阅者发起请求数据,请求长度" + n
);});
flux
.subscribe(mySubscriber
);
StepVerifier
.create(flux
).expectNext("刘志强").expectNext("王妍").expectNext("张三").verifyComplete();
创建Flux
数组创建
Flux<String> fluxArr = Flux.fromArray(new String[]{"刘志强","王妍","张三"});
iterable(list set)创建
List
<String> list
= new ArrayList();
list
.add("刘志强");
list
.add("王妍");
list
.add("张三");
Flux
<String> fluxArr
= Flux
.fromIterable(list
);
生成Flux数据
range(int start, int count)
Flux
<Integer> flux
= Flux
.range(1, 6);
interval 创建定时递增的 Flux
public static void main(String[] a) throws InterruptedException {
MySubscriber mySubscriber = new MySubscriber();
// Flux 实现出版商(Publisher)接口。
Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(200)
.doOnRequest(n -> {System.out.println("订阅者发起请求数据,请求长度" + n);});
flux.subscribe(mySubscriber);
Thread.sleep(10000);
}
组合反应式类型
mergeWith 将 两个Flux 交叉合并 为一个新的 Flux
public static void main(String[] a) throws InterruptedException {
MySubscriber mySubscriber = new MySubscriber();
// Flux 实现出版商(Publisher)接口。
Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(200);
Flux<Long> flux1 = Flux.interval(Duration.ofSeconds(1)).take(200);
Flux<Long> flux2 = flux.mergeWith(flux1).doOnRequest(n -> {System.out.println("订阅者发起请求数据,请求长度" + n);});;
flux2.subscribe(mySubscriber);
Thread.sleep(100000);
}