RxJava2 - 基础扫盲&响应式编程

响应式编程是一种面向数据流和变化传播的编程范式,数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。

最基本的代码流程和概念

接口
Observable 可被观察的
Observer 观察者
1
2
3
public interface ObservableSource<T> {
void subscribe(Observer<? super T> var1);
}
1
2
3
4
5
6
7
8
9
public interface Observer<T> {
void onSubscribe(Disposable var1);
void onNext(T var1);
void onError(Throwable var1);
void onComplete();
}

Observable 通过 subscribe 方法订阅 Observer,通常来说我们把 Observable 叫成 事件流 或者 数据流

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Observable.just("Hello"
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
System.out.println("map: " + s);
return s.hashCode();
}
})
.subscribe(new DisposableObserver<Integer>() {
@Override
public void onNext(Integer o) {
System.out.println("onNext: " + o);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
});

通过 just 方法 生成一个 Observable,然后使用 map 方法 转换成 Integer,最终使用 subscribe 订阅一个 Observer

执行结果:

1
2
3
map: Hello
onNext: 69609650
onComplete

操作符(Operators)

数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。

那么操作符就是完成这件事的。

操作符可分成下面几大类:

  1. 创建型
  2. 转换型
  3. 过滤型
  4. 组合型
  5. 异常处理型
  6. 工具型
  7. 条件判断型
  8. 数学集合型
  9. Backpressure
  10. 连接型
  11. 运算转换型

有没有被吓到,RxJava 定义了丰富的操作符,光是类型就有 11 种。不过不用担心常用的其实也就那么几种,拿上面的示例举例:

just 方法 是一个 创建型操作符,map 方法 则是一个 转换型操作符,具体所有的操作符可以先查阅官方文档,本篇作为基础扫盲不做过多介绍,后面专门开篇文章详解。

调度器(Scheduler)

调度器可以修改数据流执行的线程:

ObserveOn 操作符 定义 Observer 在哪个线程
SubscribeOn 操作符 定义 Observable 在哪个线程

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
Observable.just("Hello"
.subscribeOn(Schedulers.io())
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
System.out.println("map: " + s + " - " + Thread.currentThread().getName());
return s.hashCode();
}
})
.observeOn(Schedulers.newThread())
.subscribe(new DisposableObserver<Integer>() {
@Override
public void onNext(Integer o) {
System.out.println("onNext: " + o + " - " + Thread.currentThread().getName());
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
System.out.println("onComplete" + " - " + Thread.currentThread().getName());
}
});

执行结果:

1
2
3
map: Hello - RxCachedThreadScheduler-1
onNext: 69609650 - RxNewThreadScheduler-1
onComplete - RxNewThreadScheduler-1

这里需要提到的是,subscribeOnobserveOn 同样也是操作符,他们属于工具型操作符。

Single

Single 有点类似 Observable,但它不会重复发出多个事件流,而是只发出一个 成功 或者 失败。

1
2
3
public interface SingleSource<T> {
void subscribe(SingleObserver<? super T> var1);
}
1
2
3
4
5
6
7
public interface SingleObserver<T> {
void onSubscribe(Disposable var1);
void onSuccess(T var1);
void onError(Throwable var1);
}

这里大家可以看到出 Single 的接口设计,这就是为什么说Single 有点类似 Observable 的原因。

1
2
ObservableSource -->> Observer
SingleSource -->> SingleObserver

示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Observable.just("Hello", "World"
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
System.out.println("map: " + s);
return s.hashCode();
}
})
.last(1)
.subscribe(new SingleObserver<Integer>() {
@Override
public void onSubscribe(Disposable disposable) {
}
@Override
public void onSuccess(Integer integer) {
System.out.println("onSuccess:" + integer);
}
@Override
public void onError(Throwable throwable) {
}
});

相关资料

Gavin Liu wechat
欢迎您扫一扫上面的二维码,订阅我的微信公众号!