一、描述
合并多个Observables的发射物。
使用Merge操作符你可以将多个Observables的输出合并,就好像它们是一个单个的Observable一样。
Merge可能会让合并的Observables发射的数据交错(有一个类似的操作符Concat不会让数据交错,它会按顺序一个接着一个发射多个Observables的发射物)。
正如图例上展示的,任何一个原始Observable的onError通知会被立即传递给观察者,而且会终止合并后的Observable。
在很多ReactiveX实现中还有一个叫MergeDelayError的操作符,它的行为有一点不同,它会保留onError通知直到合并后的Observable所有的数据发射完成,在那时它才会把onError传递给观察者。
RxJava将它实现为merge, mergeWith和mergeDelayError。
除了传递多个Observable给merge,你还可以传递一个Observable列表List,数组,甚至是一个发射Observable序列的Observable,merge将合并它们的输出作为单个Observable的输出。
如果你传递一个发射Observables序列的Observable,你可以指定merge应该同时订阅的Observable’的最大数量。一旦达到订阅数的限制,它将不再订阅原始Observable发射的任何其它Observable,直到某个已经订阅的Observable发射了onCompleted通知。
merge是静态方法,mergeWith是对象方法,举个例子,Observable.merge(odds,evens)等价于odds.mergeWith(evens)。
如果传递给merge的任何一个的Observable发射了onError通知终止了,merge操作符生成的Observable也会立即以onError通知终止。如果你想让它继续发射数据,在最后才报告错误,可以使用mergeDelayError。
二、示意图
三、示例代码
1 | Observable just1 = Observable.just(1, 2); |
四、运行结果
1 | onNext: 1 |
项目代码已上传到Github:https://github.com/SherlockShi/RxJavaBestPractise
五、更多
merge操作符还有以下变体:
- merge(Iterable)
- merge(Iterable,int)
- merge(Observable[])
- merge(Observable,Observable)
- merge(Observable)
- merge(Observable,int)
跟merge相关的操作符还有:
- mergeDelayError(Observable)
- mergeDelayError(Observable,Observable)
详情可查阅下面的参考资料。
六、参考资料
PS:欢迎关注 SherlockShi 个人博客