RxJava操作符实践:4_结合操作之5_combineLatest

一、描述

当两个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。

CombineLatest操作符行为类似于zip,但是只有当原始的Observable中的每一个都发射了一条数据时zip才发射数据。CombineLatest则在原始的Observable中任意一个发射了数据时发射一条数据。当原始Observables的任何一个发射了一条数据时,CombineLatest使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。

RxJava将这个操作符实现为combineLatest,它接受二到九个Observable作为参数,或者单个Observables列表作为参数。它默认不在任何特定的调度器上执行。

二、示意图

combineLatest

combineLatest2

三、示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
Observable create1 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 3; i++) {
subscriber.onNext(i);

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
}
}).subscribeOn(Schedulers.newThread());

Observable create2 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 3; i++) {
subscriber.onNext(i);

try {
Thread.sleep(600);
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
}
}).subscribeOn(Schedulers.newThread());

subscriber = new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError: " + e.getMessage());
}

@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
};

Observable.combineLatest(create1, create2, new Func2<Integer, Integer, String>() {
@Override
public String call(Integer integer1, Integer integer2) {
return integer1 + "-" + integer2;
}
}).subscribe(subscriber);

四、运行结果

1
2
3
4
5
6
onNext: 0-0
onNext: 0-1
onNext: 1-1
onNext: 1-2
onNext: 2-2
onCompleted.

注意点:
create操作符生成数据,需在新的线程上执行,否则会出现先发射完第一个create的数据,再发射第二个create的数据。

项目代码已上传到Github:https://github.com/SherlockShi/RxJavaBestPractise

五、更多

combineLatest操作符还有以下变体:

  • combineLatest(List,FuncN)
  • combineLatest(Observable,Observable,Func2)

跟combineLatest相关的操作符还有:

  • withLatestFrom

详情可查阅下面的参考资料。

六、参考资料

ReactiveX官方文档

ReactiveX文档中文翻译

PS:欢迎关注 SherlockShi 个人博客

感谢你的支持,让我继续努力分享有用的技术和知识点!