RxJava与Retrofit的碰撞

it2024-11-20  17

RxJava

RxJava说到底,它就是一个实现异步操作的库,但是在Android开发中,我们有现成的AsyncTask 或Handler来实现异步,为什么还需要它呢?使用RxJava是有好处的,它就好在逻辑简洁,好在那把什么复杂逻辑都能串成一条线的简洁 先附上git地址:https://github.com/ReactiveX/RxAndroid

原理浅析

RxJava 的异步实现,是通过一种扩展的观察者模式来实现的

RxJava 的观察者模式: Observable (被观察者)、 Observer (观察者)、 subscribe(订阅)、事件。被观察者和观察者通过subscribe()方法实现订阅关系,从而被观察者可以在需要的时候发出事件来通知观察者,与传统观察者模式不同, RxJava 的事件回调方法除了普通事件onNext()之外,还定义了两个特殊的事件:onCompleted() 和 onError() ,其中onCompleted表示事件队列完结,RxJava不仅把每个事件单独处理,还会把它们看做一个队列,当不会再有新的 onNext() 发出时,需要触发onCompleted() 方法作为标志,而onError表示事件队列异常,在事件处理过程中出异常时,onError()会被触发,同时队列自动终止,不允许再有事件发出

基本实现

方式一 (1)创建 Observer,它决定事件触发的时候将有怎样的行为

Observer<Integer> observer = new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { //观察者接收事件前,默认最先调用复写onSubscribe } @Override public void onNext(@NonNull Integer s) { //当被观察者生产Next事件和观察者接收到时,会调用该复写方法进行响应 } @Override public void onError(@NonNull Throwable e) { //当被观察者生产Error事件和观察者接收到时,会调用该复写方法进行响应 } @Override public void onComplete() { //当被观察者生产Complete事件和观察者接收到时,会调用该复写方法 进行响应 } };

(2) 创建Observable,它决定什么时候触发事件以及触发怎样的事件

Observable observable = Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable { //通过ObservableEmitter类对象产生事件并通知观察者 emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } });

(3)通过订阅(Subscribe)连接观察者和被观察者

observable.subscribe(observer);

方式二:基于事件流的链式调用

Observable.create(new ObservableOnSubscribe<Integer>() { @Override public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Throwable { emitter.onNext(1); emitter.onNext(2); emitter.onNext(3); emitter.onComplete(); } }).subscribe(new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull Integer integer) { Log.d("jay", integer.toString()); } @Override public void onError(@NonNull Throwable e) { } @Override public void onComplete() { } });

方式三:简便式的观察者模式

Observable.just("hello world").subscribe(new Consumer<String>() { @Override public void accept(String s) throws Throwable { Log.d("jay", s); } });

(4)可采用 Disposable.dispose() 切断观察者与被观察者之间的连接

private Disposable disposable; Observer<Integer> observer = new Observer<Integer>() { @Override public void onSubscribe(@NonNull Disposable d) { //观察者接收事件前,默认最先调用复写onSubscribe disposable = d; } @Override public void onNext(@NonNull Integer s) { //当被观察者生产Next事件和观察者接收到时,会调用该复写方法进行响应 if (s == 2) { disposable.dispose(); } Log.d("jay",s.toString()); } @Override public void onError(@NonNull Throwable e) { //当被观察者生产Error事件和观察者接收到时,会调用该复写方法进行响应 } @Override public void onComplete() { //当被观察者生产Complete事件和观察者接收到时,会调用该复写方法 进行响应 } };

retrofit

retrofit基于okhttp封装的网络请求框架,网络请求的工作本质上是 OkHttp 完成,而retrofit 仅负责网络请求接口的封装,我觉得retrofit最大的特点就是解耦 先附上github地址:https://github.com/square/retrofit (1)依赖

implementation "com.squareup.retrofit2:retrofit:2.9.0" implementation 'com.squareup.retrofit2:converter-gson:2.9.0'

(2)创建网络请求接口

public interface HttpList { @FormUrlEncoded @POST(HttpUrl.JOUR_LIST) Call<JourBean> httpsend(@Field("companyid") String companyid, @Field("index") String index, @Field("size") String size, @Field("keyword") String keyword, @Field("status") String status, @Field("classno") String classno, @Field("checkdate") String checkdate); }

(3)请求网络

//baseUrl不能为空,且强制要求必需以斜杠结尾 Retrofit retrofit = new Retrofit.Builder() .baseUrl(HttpUrl.BaseApi) .addConverterFactory(GsonConverterFactory.create()) .callbackExecutor(Executors.newSingleThreadExecutor()) .build(); HttpList httpList = retrofit.create(HttpList.class); Call<JourBean> call = httpList.httpsend(companyid, index, size, keyword, status, classno, checkdate); call.enqueue(new Callback<JourBean>() { @Override public void onResponse(Call<JourBean> call, Response<JourBean> response) { JourBean bean = response.body(); } @Override public void onFailure(Call<JourBean> call, Throwable t) { } });

RxJava和Retrofit的结合使用

(1)加入依赖

implementation "com.squareup.retrofit2:retrofit:2.9.0" implementation 'com.squareup.retrofit2:converter-gson:2.9.0' implementation 'com.squareup.retrofit2:adapter-rxjava2:2.5.0' implementation 'io.reactivex.rxjava2:rxjava:2.2.9' implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'

(2)修改网络请求接口

public interface HttpList { @FormUrlEncoded @POST(HttpUrl.JOUR_LIST) Observable<JourBean> httpsend(@Field("companyid") String companyid, @Field("index") String index, @Field("size") String size, @Field("keyword") String keyword, @Field("status") String status, @Field("classno") String classno, @Field("checkdate") String checkdate); }

(3)RxJava+Retrofit结合式请求网络

Retrofit retrofit = new Retrofit.Builder() .baseUrl(HttpUrl.BaseApi) .addConverterFactory(GsonConverterFactory.create()) .callbackExecutor(Executors.newSingleThreadExecutor()) .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) .build(); HttpList httpList = retrofit.create(HttpList.class); httpList.httpsend(companyid, index, size, keyword, status, classno, checkdate) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<JourBean>() { @Override public void onSubscribe(@NonNull Disposable d) { } @Override public void onNext(@NonNull JourBean jourBean) { Log.d("jayonNext", jourBean.toString()); } @Override public void onError(@NonNull Throwable e) { Log.d("jayonError", e.getMessage()); } @Override public void onComplete() { Log.d("jay", "请求完成"); } });
最新回复(0)