RxJava操作符实践:10_连接操作之4_refCount

一、描述

让一个可连接的Observable行为像普通的Observable。

可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这种方法,你可以在任何时候让一个Observable开始发射数据。

RefCount操作符把从一个可连接的Observable连接和断开的过程自动化了。它操作一个可连接的Observable,返回一个普通的Observable。当第一个订阅者订阅这个Observable时,RefCount连接到下层的可连接Observable。RefCount跟踪有多少个观察者订阅它,直到最后一个观察者完成才断开与下层可连接Observable的连接。

二、示意图

refCount

三、示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
System.out.println("<========before refCount()=======>");

ConnectableObservable connectableObservable = Observable.range(1, 50000).sample(10, TimeUnit.MILLISECONDS).publish();

connectableObservable.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted1.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError1: " + e.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("onNext1: " + integer);
}
});

connectableObservable.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted2.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError2: " + e.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("onNext2: " + integer);
}
});

connectableObservable.connect();

System.out.println("<========after refCount()=======>");

Observable observable = connectableObservable.refCount();

observable.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted3.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError3: " + e.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("onNext3: " + integer);
}
});

observable.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted4.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError4: " + e.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("onNext4: " + integer);
}
});

四、运行结果

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
<========before refCount()=======>
onNext1: 11653
onNext2: 11653
onNext1: 21317
onNext2: 21317
onNext1: 32731
onNext2: 32731
onNext1: 40808
onNext2: 40808
onCompleted1.
onCompleted2.
<========after refCount()=======>
onNext3: 15517
onNext3: 30592
onNext3: 45208
onCompleted3.
onNext4: 16052
onNext4: 16090
onNext4: 20936
onNext4: 31274
onNext4: 47841
onCompleted4.

由运行结果可以看出,RefCount操作符将一个Connectable Observable 对象重新转化为一个普通的Observable对象,这时候订阅者进行订阅将会触发数据的发射。

五、更多

refCount操作符还有以下变体:

  • share()

详情可查阅下面的参考资料。

六、参考资料

ReactiveX官方文档

ReactiveX文档中文翻译

PS:欢迎关注 SherlockShi 个人博客

感谢你的支持,让我继续努力分享有用的技术和知识点!