RxJava操作符实践:4_结合操作之2_merge

一、描述

合并多个Observables的发射物。

使用Merge操作符你可以将多个Observables的输出合并,就好像它们是一个单个的Observable一样。

Merge可能会让合并的Observables发射的数据交错(有一个类似的操作符Concat不会让数据交错,它会按顺序一个接着一个发射多个Observables的发射物)。

正如图例上展示的,任何一个原始Observable的onError通知会被立即传递给观察者,而且会终止合并后的Observable。

在很多ReactiveX实现中还有一个叫MergeDelayError的操作符,它的行为有一点不同,它会保留onError通知直到合并后的Observable所有的数据发射完成,在那时它才会把onError传递给观察者。

RxJava将它实现为merge, mergeWith和mergeDelayError。

除了传递多个Observable给merge,你还可以传递一个Observable列表List,数组,甚至是一个发射Observable序列的Observable,merge将合并它们的输出作为单个Observable的输出。

如果你传递一个发射Observables序列的Observable,你可以指定merge应该同时订阅的Observable’的最大数量。一旦达到订阅数的限制,它将不再订阅原始Observable发射的任何其它Observable,直到某个已经订阅的Observable发射了onCompleted通知。

merge是静态方法,mergeWith是对象方法,举个例子,Observable.merge(odds,evens)等价于odds.mergeWith(evens)。

如果传递给merge的任何一个的Observable发射了onError通知终止了,merge操作符生成的Observable也会立即以onError通知终止。如果你想让它继续发射数据,在最后才报告错误,可以使用mergeDelayError。

二、示意图

merge

三、示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Observable just1 = Observable.just(1, 2);
Observable just2 = Observable.just(6, 7, 8);

Observable.merge(just1, just2)
.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError: " + e.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("onNext: " + integer);
}
});

四、运行结果

1
2
3
4
5
6
onNext: 1
onNext: 2
onNext: 6
onNext: 7
onNext: 8
onCompleted.

项目代码已上传到Github:https://github.com/SherlockShi/RxJavaBestPractise

五、更多

merge操作符还有以下变体:

  • merge(Iterable)
  • merge(Iterable,int)
  • merge(Observable[])
  • merge(Observable,Observable)
  • merge(Observable)
  • merge(Observable,int)

跟merge相关的操作符还有:

  • mergeDelayError(Observable)
  • mergeDelayError(Observable,Observable)

详情可查阅下面的参考资料。

六、参考资料

ReactiveX官方文档

ReactiveX文档中文翻译

PS:欢迎关注 SherlockShi 个人博客

感谢你的支持,让我继续努力分享有用的技术和知识点!