使用了 RxJava2 有一段時(shí)間了,深深感受到了其“牛逼”之處。下面,就從 RxJava2 的基礎(chǔ)開始,一步步與大家分享一下這個(gè)強(qiáng)大的異步庫的用法!RxJava 是 一個(gè)在 Java VM 上使用可觀測(cè)的序列來組成異步的、基于事件的程序的庫,也就是用于實(shí)現(xiàn)異步操作的庫。 一、RxJava2 基礎(chǔ)RxJava可以濃縮為異步兩個(gè)字,其核心的東西不外乎兩個(gè), Observables(被觀察者) 和 Observable(觀察者)。Observables可以發(fā)出一系列的 事件(例如網(wǎng)絡(luò)請(qǐng)求、復(fù)雜計(jì)算、數(shù)據(jù)庫操作、文件讀取等),事件執(zhí)行結(jié)束后交給Observable 的回調(diào)處理。 1.RxJava2 的觀察者模式觀察者模式是對(duì)象的行為模式,也叫做發(fā)布-訂閱(Publish/Subscribe)模式、模型-視圖(Model/View)模式、源-監(jiān)聽器(Source/Listener)模式或從屬者(Dependents)模式。 什么是觀察者模式?舉個(gè)栗子,Android中View的點(diǎn)擊監(jiān)聽器的實(shí)現(xiàn),View是被觀察者,OnClickListener對(duì)象是觀察者,Activity要如何知道View被點(diǎn)擊了?那就是派一個(gè)OnClickListener對(duì)象,入駐View,與View達(dá)成一個(gè)訂閱關(guān)系,一旦View被點(diǎn)擊了,就通過OnClickListener對(duì)象的OnClick方法傳達(dá)給Activity。采用觀察者模式可以避免去輪詢檢查,節(jié)約有限的cpu資源。 RxJava 作為一個(gè)工具庫,使用的便是通用形式的觀察者模式: 
普通事件:onNext(),相當(dāng)于 onClick()、onEvent();特殊事件:onCompleted() 和 onError() 如圖所示,RxJava 的基本概念分別為:Observable(被觀察者,事件源),Observer(觀察者,訂閱者),subscribe (訂閱)、事件;不同的是,RxJava 把多個(gè)事件看做一個(gè)隊(duì)列,并對(duì)每個(gè)事件單獨(dú)處理。在一個(gè)隊(duì)列中 onCompleted() 和 onError(),只有一個(gè)會(huì)被調(diào)用。如果調(diào)用了 onCompleted() 就說明隊(duì)列執(zhí)行完畢,沒有出現(xiàn)異常,否則調(diào)用 onError() 方法并終止隊(duì)列。 2.RxJava2 響應(yīng)式編程結(jié)構(gòu)什么是響應(yīng)式編程?舉個(gè)栗子,a = b + c; 這句代碼將b+c的值賦給a,而之后如果b和c的值改變了不會(huì)影響到a,然而,對(duì)于響應(yīng)式編程,之后b和c的值的改變也動(dòng)態(tài)影響著a,意味著a會(huì)隨著b和c的變化而變化。 響應(yīng)式編程的組成為Observable/Operator/Subscriber,RxJava在響應(yīng)式編程中的基本流程如下: 這個(gè)流程,可以簡單的理解為:Observable -> Operator1 -> Operator2 -> Operator3 -> Subscriber Observable發(fā)出一系列事件,他是事件的產(chǎn)生者; Subscriber負(fù)責(zé)處理事件,他是事件的消費(fèi)者; Operator是對(duì)Observable發(fā)出的事件進(jìn)行修改和變換; 若事件從產(chǎn)生到消費(fèi)不需要其他處理,則可以省略掉中間的Operator,從而流程變?yōu)镺bsevable -> Subscriber; Subscriber通常在主線程執(zhí)行,所以原則上不要去處理太多的事務(wù),而這些復(fù)雜的處理則交給Operator;
3.創(chuàng)建一個(gè)完整的 RxJava2 調(diào)用首先需要添加 RxJava2 在 Android 中的 Gradle 依賴: compile 'io.reactivex.rxjava2:rxandroid:2.0.1' compile 'io.reactivex.rxjava2:rxjava:2.0.8'
RxJava2 可以通過下面這幾種方法創(chuàng)建被觀察者: // 發(fā)送對(duì)應(yīng)的方法 Observable.create(new ObservableOnSubscribeString>() { // 默認(rèn)在主線程里執(zhí)行該方法 @Override public void subscribe(@NonNull ObservableEmitterString> e) throws Exception { e.onNext('Hello'); e.onNext('World'); // 結(jié)束標(biāo)識(shí) e.onComplete(); } }); // 發(fā)送多個(gè)數(shù)據(jù) Observable.just('Hello', 'World'); // 發(fā)送數(shù)組 Observable.fromArray('Hello', 'World'); // 發(fā)送一個(gè)數(shù)據(jù) Observable.fromCallable(new CallableString>() { @Override public String call() throws Exception { return 'Hello'; } });
RxJava2 支持鏈?zhǔn)骄幊?,下來我們?chuàng)建被觀察者,然后創(chuàng)建觀察者并訂閱: // 創(chuàng)建被觀察者 Observable.just('Hello', 'World') // 將被觀察者切換到子線程 .subscribeOn(Schedulers.io()) // 將觀察者切換到主線程 .observeOn(AndroidSchedulers.mainThread()) // 創(chuàng)建觀察者并訂閱 .subscribe(new ObserverString>() { // Disposable 相當(dāng)于RxJava1.x中的 Subscription,用于解除訂閱 private Disposable disposable; @Override public void onSubscribe(Disposable d) { disposable = d; } @Override public void onNext(String s) { Log.i('JAVA', '被觀察者向觀察者發(fā)送的數(shù)據(jù):' + s); if (s == '-1') { // '-1' 時(shí)為異常數(shù)據(jù),解除訂閱 disposable.dispose(); } } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
一旦 Observer 訂閱了 Observable,Observable 就會(huì)調(diào)用 Observer 的 onNext()、onCompleted()、onError() 等方法。至此一個(gè)完整的 RxJava 調(diào)用就完成了。看一下輸出的Log: I/JAVA: 被觀察者向觀察者發(fā)送的數(shù)據(jù):Hello I/JAVA: 被觀察者向觀察者發(fā)送的數(shù)據(jù):World
若喜歡簡潔、定制服務(wù),那么可以實(shí)現(xiàn)的方法跟上面的實(shí)現(xiàn)方法是對(duì)應(yīng)起來的,大家看參數(shù)就知道哪個(gè)對(duì)應(yīng)哪個(gè)了,你可以通過new Consumer(不需要實(shí)現(xiàn)的方法你可以不寫,看上去更簡潔),Consumer就是消費(fèi)者的意思,可以理解為消費(fèi)了 onNext 等事件: Observable.just('Hello', 'World') .subscribe(new ConsumerString>() { @Override public void accept(@NonNull String s) throws Exception { Log.i('JAVA', '被觀察者向觀察者發(fā)送的數(shù)據(jù):' + s); } }, new Consumer() { @Override public void accept(@NonNull Throwable throwable) throws Exception { } }, new Action() { @Override public void run() throws Exception { } }, new Consumer() { @Override public void accept(@NonNull Disposable disposable) throws Exception { } });
4.RxJava2 的操作符RxJava中提供了大量不同種類,不同場(chǎng)景的Operators(操作符),RxJava的強(qiáng)大性就來自于它所定義的操作符。主要分類: 
其中有一些高頻使用的操作符如下:

5.RxJava2 線程調(diào)度器調(diào)度器 Scheduler 用于控制操作符和被觀察者事件所執(zhí)行的線程,不同的調(diào)度器對(duì)應(yīng)不同的線程。RxJava提供了5種調(diào)度器: 
可以使用 subscribeOn() 和 ObserveOn() 操作符進(jìn)行線程調(diào)度,讓 Observable 在一個(gè)特定的調(diào)度器上執(zhí)行。subscribeOn() 指定 subscribe() 所發(fā)生的線程,事件產(chǎn)生的線程。ObserveOn() 指定 Observer 所運(yùn)行在的線程,事件消費(fèi)的線程。 6.RxJava2 模擬發(fā)送驗(yàn)證碼倒計(jì)時(shí)功能public void onCodeClick() { final long count = 60; // 設(shè)置60秒 Observable.interval(0, 1, TimeUnit.SECONDS) .take(count + 1) .map(new Function() { @Override public Long apply(@NonNull Long aLong) throws Exception { return count - aLong; // 由于是倒計(jì)時(shí),需要將倒計(jì)時(shí)的數(shù)字反過來 } }) .observeOn(AndroidSchedulers.mainThread()) .doOnSubscribe(new Consumer() { @Override public void accept(@NonNull Disposable disposable) throws Exception { button.setEnabled(false); button.setTextColor(Color.GRAY); } }) .subscribe(new Observer() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Long aLong) { button.setText(aLong + '秒后重發(fā)'); } @Override public void onError(Throwable e) { } @Override public void onComplete() { button.setEnabled(true); button.setTextColor(Color.RED); button.setText('發(fā)送驗(yàn)證碼'); } }); }
二、RxJava2 系列框架
三、RxJava2 與 Retrofit 的使用RxJava 與 Retrofit 的使用,更像我們的 AsyncTask,通過網(wǎng)絡(luò)獲取數(shù)據(jù)然后通過 Handler 更新UI。首先需要導(dǎo)入依賴: compile 'com.squareup.retrofit2:retrofit:2.2.0' compile 'com.squareup.retrofit2:converter-gson:2.2.0' compile 'com.squareup.retrofit2:adapter-rxjava2:2.2.0'
1.模擬用戶登陸獲取用戶數(shù)據(jù) 模擬用戶登陸獲取用戶數(shù)據(jù)
1.Bean對(duì)象: public class UserParam { private String param1; private String param2; public UserParam(String param1, String param2) { this.param1 = param1; this.param2 = param2; } // 省略了 getter setter } public class NetBean { private FormBean form; // 省略了 getter setter public static class FormBean { private String username; private String password; // 省略了 getter setter } } public class UserBean { private String username; private String password; public UserBean(String username, String password) { this.username = username; this.password = password; } // 省略了 getter setter }
2.ApiService,這里返回Observable對(duì)象,也就是我們RxJava的被觀察者 public interface ApiService { @FormUrlEncoded @POST('/post') Observable getUserInfo(@Field('username')String username, @Field('password')String password); }
3.RxJava + Retrofit 的實(shí)現(xiàn) // 構(gòu)建Retrofit ApiService apiService = new Retrofit.Builder() .baseUrl('http:///') .addConverterFactory(GsonConverterFactory.create()) // RxJava2與Gson混用 .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // RxJava2與Retrofit混用 .build() .create(ApiService.class);
// 構(gòu)建RxJava UserParam param = new UserParam('zhangsan', '123'); // 發(fā)送param參數(shù) Observable.just(param) // flatMap方法是用于數(shù)據(jù)格式轉(zhuǎn)換的方法,參數(shù)一表示原數(shù)據(jù), // 參數(shù)二表示轉(zhuǎn)換的數(shù)據(jù),那么就是通過發(fā)送網(wǎng)絡(luò)參數(shù),轉(zhuǎn)換成網(wǎng)絡(luò)返回的數(shù)據(jù),調(diào)用Retrofit .flatMap(new Function>() { @Override public ObservableSource apply(@NonNull UserParam userParam) throws Exception { // 1.發(fā)送網(wǎng)絡(luò)請(qǐng)求,獲取NetBean return apiService.getUserInfo(userParam.getParam1(), userParam.getParam2()); } }) .flatMap(new Function>() { @Override public ObservableSource apply(@NonNull NetBean netBean) throws Exception { UserBean user = new UserBean(netBean.getForm().getUsername(), netBean.getForm().getPassword()); // 2.轉(zhuǎn)換NetBean數(shù)據(jù)為我們需要的UserBean數(shù)據(jù) return Observable.just(user); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer() { @Override public void accept(@NonNull UserBean userBean) throws Exception { Log.i('JAVA', '' + '用戶名:' + userBean.getUsername() + ', 密碼:' + userBean.getPassword()); } });
2.模擬合并本地與服務(wù)器購物車列表這個(gè)案例其實(shí)就是用戶添加購物車的時(shí)候,首先會(huì)在本地存儲(chǔ)一份,然后發(fā)現(xiàn)如果沒有網(wǎng)絡(luò),那么沒辦法提交到服務(wù)器上,只能等下一次有網(wǎng)絡(luò)的時(shí)候采用本地?cái)?shù)據(jù)庫和服務(wù)器數(shù)據(jù)的合并來實(shí)現(xiàn)上傳到服務(wù)器。  模擬合并本地與服務(wù)器購物車列表
首先需要準(zhǔn)備 Retrofit 對(duì)象和獲取本地?cái)?shù)據(jù)、網(wǎng)絡(luò)數(shù)據(jù)的方法: private ApiService apiService; @Override protected void onCreate(@Nullable Bundle savedInstanceState) { super.onCreate(savedInstanceState); // 省略 // 構(gòu)建Retrofit apiService = new Retrofit.Builder() .baseUrl('http:///') .addConverterFactory(GsonConverterFactory.create()) // RxJava2與Gson混用 .addCallAdapterFactory(RxJava2CallAdapterFactory.create()) // RxJava2與Retrofit混用 .build() .create(ApiService.class); } /** * 獲取本地?cái)?shù)據(jù) */ private Observable<>String>> getDataForLocal() { ListString> list = new ArrayList<>(); list.add('購物車的商品1'); list.add('購物車的商品2'); return Observable.just(list);} /** * 獲取網(wǎng)絡(luò)數(shù)據(jù) */ private Observable<>String>> getDataForNet() { return Observable.just('shopName') // flatMap方法是用于數(shù)據(jù)格式轉(zhuǎn)換的方法,參數(shù)一表示原數(shù)據(jù), // 參數(shù)二表示轉(zhuǎn)換的數(shù)據(jù),那么就是通過發(fā)送網(wǎng)絡(luò)參數(shù),轉(zhuǎn)換成網(wǎng)絡(luò)返回的數(shù)據(jù),調(diào)用Retrofit .flatMap(new FunctionString, ObservableSource>() { @Override public ObservableSource apply(@NonNull String s) throws Exception { // 1.發(fā)送網(wǎng)絡(luò)請(qǐng)求,獲取數(shù)據(jù) return apiService.getCartList(s); } }).flatMap(new Function<>String>>>() { @Override public ObservableSource<>String>> apply(@NonNull NetBean netBean) throws Exception { // String shop = netBean.get_$Args257().getShopName(); String shop = '購物車的商品3'; ListString> list = new ArrayList<>(); list.add(shop); // 2.轉(zhuǎn)換NetBean數(shù)據(jù)為我們需要的List數(shù)據(jù) return Observable.just(list); } }).subscribeOn(Schedulers.io()); }
然后我們就可以創(chuàng)建被觀察者并訂閱了,來完成合并本地與服務(wù)器購物車列表操作: // merge操作符: 將兩個(gè)ObservableSource合并為一個(gè)ObservableSource Observable.merge(getDataForLocal(), getDataForNet()) .subscribe(new Observer<>String>>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(ListString> strings) { for (String str: strings) { Log.i('JAVA', str); } } @Override public void onError(Throwable e) { } @Override public void onComplete() { Log.i('JAVA', 'onComplete'); } });
最后的打印結(jié)果是: I/JAVA: 購物車的商品1 I/JAVA: 購物車的商品2 I/JAVA: 購物車的商品3 I/JAVA: onComplete
四、RxJava2 與 RxBinding 的使用1.優(yōu)化搜索請(qǐng)求當(dāng)我們?cè)?EditText 打字時(shí)搜索的時(shí)候,可能用戶會(huì)打字很會(huì)快,那么我們就沒有必要一直發(fā)送網(wǎng)絡(luò)請(qǐng)求,請(qǐng)求搜索結(jié)果,我們可以通過當(dāng)用戶打字停止后的延時(shí)500毫秒再發(fā)送搜索請(qǐng)求: // RxTextView.textChanges(edittext): Rxbinding用法 RxTextView.textChanges(editText) // 表示延時(shí)多少秒后執(zhí)行,當(dāng)你敲完字之后停下來的半秒就會(huì)執(zhí)行下面語句 .debounce(500, TimeUnit.MILLISECONDS) // 數(shù)據(jù)轉(zhuǎn)換 flatMap: 當(dāng)同時(shí)多個(gè)數(shù)據(jù)請(qǐng)求訪問的時(shí)候,前面的網(wǎng)絡(luò)數(shù)據(jù)會(huì)覆蓋后面的網(wǎng)絡(luò)數(shù)據(jù) // 數(shù)據(jù)轉(zhuǎn)換 switchMap: 當(dāng)同時(shí)多個(gè)網(wǎng)絡(luò)請(qǐng)求訪問的時(shí)候,會(huì)以最后一個(gè)發(fā)送請(qǐng)求為準(zhǔn),前面網(wǎng)絡(luò)數(shù)據(jù)會(huì)被最后一個(gè)覆蓋 .switchMap(new Function<>String>>>() { @Override public ObservableSource<>String>> apply( @NonNull CharSequence charSequence) throws Exception { // 網(wǎng)絡(luò)請(qǐng)求操作,獲取我們需要的數(shù)據(jù) ListString> list = new ArrayListString>(); list.add('2017'); list.add('2018'); return Observable.just(list); } }) .subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Consumer<>String>>() { @Override public void accept(@NonNull ListString> strings) throws Exception { // 更新UI Log.i('JAVA', strings.toString()); } });
2.優(yōu)化點(diǎn)擊請(qǐng)求當(dāng)用戶一直點(diǎn)擊一個(gè)按鈕的時(shí)候,我們不應(yīng)該一直調(diào)用訪問網(wǎng)絡(luò)請(qǐng)求,而是 1秒內(nèi),只執(zhí)行一次網(wǎng)絡(luò)請(qǐng)求。 RxView.clicks(button).throttleFirst(1, TimeUnit.SECONDS) .subscribe(new ObserverObject>() { @Override public void onSubscribe(Disposable d) { } @Override public void onNext(Object o) { Log.i('JAVA', 'onClick'); } @Override public void onError(Throwable e) { } @Override public void onComplete() { } });
五、RxJava2 踩過的一些坑1.未解除訂閱而引起的內(nèi)存泄漏舉個(gè)例子,對(duì)于前面常用操作符 interval 做周期性操作的例子,并沒有使之停下來的,沒有去控制訂閱的生命周期,這樣,就有可能引發(fā)內(nèi)存泄漏。所以,在 Activity 的 onDestroy() 方法執(zhí)行的時(shí)候或者不需要繼續(xù)執(zhí)行的時(shí)候應(yīng)該解除訂閱。
|