java反应式编程解释 及 示例

it2025-11-13  11

名词说明Publisher出版商(发布者)Subscriber订阅人Subscription订阅管理Processor订阅者和出版商的组合 出版商(Publisher)接口声明了一个方法subscribe()(订阅)。订阅人(Subscriber)可以通过此方法向出版商(Publisher)发起订阅。出版商(Publisher)创建数据,并将数据发送给订阅的订阅人(Subscriber)。 public interface Publisher<T> { void subscribe(Subscriber<? super T> var1); } 一旦订阅人(Subscriber)订阅成功就可以接收来自出版商(Publisher)的事件。出版商(Publisher)会调用订阅人(Subscriber)的以下方法给订阅人(Subscriber)发送信息 public interface Subscriber<T> { // 订阅时 出版商调用此方法,将订阅管理发送给订阅者 void onSubscribe(Subscription var1); // 出版商调用此方法向订阅者发送数据 void onNext(T var1); // 数据传输的 void onError(Throwable var1); // 完成时触发 void onComplete(); } 方法说明onSubscribe出版商调用此方法时会把 Subscription 存送给订阅者,订阅者可以通过Subscription 管理其订阅情况。onNext出版商发布的每个数据都会通过调用订阅者的 onNext 方法onError如果有任何错误就会触发onError 方法。onComplete如果出版商没有更多数据,则会调用订阅者的onComplete方法,表示已经结束。 public interface Subscription { void request(long var1); void cancel(); } 订阅者可以通过request方法请求出版商发送数据,当调用request时,订阅者可以传入一个long值,表示订阅者接收多少数据。避免出版商发送过多的数据给订阅者(回压)。当出版商发送完请求数量的数据后。订阅者可以再次调用request订阅。订阅者请求数据后,数据就会开始流经反应式流。出版商发布的每个数据都会通过调用订阅者的 onNext 方法 将数据递交给订阅者。如果有任何错误就会触发onError 方法。如果出版商没有更多数据,则会调用订阅者的onComplete方法,表示已经结束。订阅者可以通过cancel方法取消订阅。

Processor 是 订阅者和出版商的组合

public interface Processor<T, R> extends Subscriber<T>, Publisher<R> { }

当作为订阅者时,Processor会接收数据并以某种方式对数据进行处理。然后会将角色转换成 出版商,并将处理处理的结果发布给它的订阅者

示例:

<dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> <version>3.1.4.RELEASE</version> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <version>3.1.4.RELEASE</version> <scope>test</scope> </dependency> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>4.12</version> <scope>test</scope> </dependency> import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.Exceptions; import reactor.core.publisher.Flux; /** * @author 刘志强 * @date 2020/9/29 13:56 */ public class Test { public static void main(String[] a) { MySubscriber mySubscriber = new MySubscriber(); // Flux 实现出版商(Publisher)接口。 Flux.just("刘志强","王妍","张三") .doOnRequest(n -> {System.out.println("订阅者发起请求数据,请求长度" + n);}) .subscribe(mySubscriber); } } class MySubscriber implements Subscriber<String> { /** * 订阅管理 */ volatile Subscription subscription; /** * 每次请求条数 */ final int i = 2; /** * 一次请求中的 出版商当前是第几次给订阅者发送消息 */ int j = 0; @Override public void onSubscribe(Subscription subscription) { System.out.println("已发起订阅"); this.subscription = subscription; subscription.request(i); } @Override public void onNext(String value) { j = j + 1; System.out.println("出版商发送来了数据====" + value + "===第" + j + "条"); if (j == i) { j = 0; subscription.request(i); } } @Override public void onComplete() { System.out.println("数据已发送完"); } @Override public void onError(Throwable throwable) { System.out.println("数据发送异常"); throw Exceptions.errorCallbackNotImplemented(throwable); } }

断言

MySubscriber mySubscriber = new MySubscriber(); Flux<String> flux = Flux.just("刘志强","王妍","张三") .doOnRequest(n -> {System.out.println("订阅者发起请求数据,请求长度" + n);}); flux.subscribe(mySubscriber); StepVerifier.create(flux).expectNext("刘志强").expectNext("王妍").expectNext("张三").verifyComplete();

创建Flux

数组创建

Flux<String> fluxArr = Flux.fromArray(new String[]{"刘志强","王妍","张三"});

iterable(list set)创建

List<String> list = new ArrayList(); list.add("刘志强"); list.add("王妍"); list.add("张三"); Flux<String> fluxArr = Flux.fromIterable(list);

生成Flux数据

range(int start, int count)

// 每次调用订阅者的onNext方法时 加一 知道等于count时 Flux<Integer> flux = Flux.range(1, 6);

interval 创建定时递增的 Flux

public static void main(String[] a) throws InterruptedException { MySubscriber mySubscriber = new MySubscriber(); // Flux 实现出版商(Publisher)接口。 Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(200) .doOnRequest(n -> {System.out.println("订阅者发起请求数据,请求长度" + n);}); flux.subscribe(mySubscriber); Thread.sleep(10000); }

组合反应式类型

mergeWith 将 两个Flux 交叉合并 为一个新的 Flux

public static void main(String[] a) throws InterruptedException { MySubscriber mySubscriber = new MySubscriber(); // Flux 实现出版商(Publisher)接口。 Flux<Long> flux = Flux.interval(Duration.ofSeconds(1)).take(200); Flux<Long> flux1 = Flux.interval(Duration.ofSeconds(1)).take(200); Flux<Long> flux2 = flux.mergeWith(flux1).doOnRequest(n -> {System.out.println("订阅者发起请求数据,请求长度" + n);});; flux2.subscribe(mySubscriber); Thread.sleep(100000); }
最新回复(0)