RxJava操作符实践:10_连接操作之3_replay

一、描述

保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅。

可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。

如果在将一个Observable转换为可连接的Observable之前对它使用Replay操作符,产生的这个可连接Observable将总是发射完整的数据序列给任何未来的观察者,即使那些观察者在这个Observable开始给其它观察者发射数据之后才订阅。

二、示意图

replay

三、示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
final ConnectableObservable observable = Observable.range(1, 6).replay();

final Subscriber subscriber2 = new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted2.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError2: " + e.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("onNext2: " + integer);
}
};

Subscriber subscriber1 = new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted1.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError1: " + e.getMessage());
}

@Override
public void onNext(Integer integer) {
if (integer == 3) {
observable.subscribe(subscriber2);
}
System.out.println("onNext1: " + integer);
}
};

observable.subscribe(subscriber1);

observable.connect();

四、运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
onNext1: 1
onNext1: 2
onNext2: 1
onNext2: 2
onNext2: 3
onNext1: 3
onNext1: 4
onNext2: 4
onNext1: 5
onNext2: 5
onNext1: 6
onNext2: 6
onCompleted1.
onCompleted2.

Replay操作符返回一个Connectable Observable 对象并且可以缓存其发射过的数据,这样即使有订阅者在其发射数据之后进行订阅也能收到其之前发射过的数据。不过使用Replay操作符我们最好还是限定其缓存的大小,否则缓存的数据太多了可会占用很大的一块内存。对缓存的控制可以从空间和时间两个方面来实现。

五、更多

replay操作符还有以下变体:

  • replay()
  • replay(int)
  • replay(long,TimeUnit)
  • replay(int,long,TimeUnit)
  • replay(Func1)
  • replay(Func1,int)
  • replay(Func1,long,TimeUnit)
  • replay(Func1,int,long,TimeUnit)

详情可查阅下面的参考资料。

六、参考资料

ReactiveX官方文档

ReactiveX文档中文翻译

PS:欢迎关注 SherlockShi 个人博客

感谢你的支持,让我继续努力分享有用的技术和知识点!