RxJava操作符实践:10_连接操作之1_connect

一、描述

让一个可连接的Observable开始发射数据给订阅者。

可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这个方法,你可以等待所有的观察者都订阅了Observable之后再开始发射数据。

RxJava中connect是ConnectableObservable接口的一个方法,使用publish操作符可以将一个普通的Observable转换为一个ConnectableObservable。

调用ConnectableObservable的connect方法会让它后面的Observable开始给发射数据给订阅者。

connect方法返回一个Subscription对象,可以调用它的unsubscribe方法让Observable停止发射数据给观察者。

即使没有任何订阅者订阅它,你也可以使用connect方法让一个Observable开始发射数据(或者开始生成待发射的数据)。这样,你可以将一个”冷”的Observable变为”热”的。

二、示意图

connect

三、示例代码

1. 未使用connect操作符时

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Observable observable = Observable.range(1, 1000000)
.sample(10, TimeUnit.MILLISECONDS);

observable.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted1.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError1: " + e.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("onNext1: " + integer);
}
});

observable.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted2.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError2: " + e.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("onNext2: " + integer);
}
});

2. 运行结果

1
2
3
4
5
6
7
8
9
10
onNext1: 15281
onNext1: 40401
...
onNext1: 983620
onCompleted1.
onNext2: 25895
onNext2: 48456
...
onNext2: 983356
onCompleted2.

可见Observable在订阅的时候就开始发射数据,导致两个观察者收到的数据是不一样的。

3. 使用connect操作符后

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
ConnectableObservable observable = Observable.range(1, 1000000).sample(10, TimeUnit.MILLISECONDS).publish();

observable.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted1.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError1: " + e.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("onNext1: " + integer);
}
});

observable.subscribe(new Subscriber<Integer>() {
@Override
public void onCompleted() {
System.out.println("onCompleted2.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError2: " + e.getMessage());
}

@Override
public void onNext(Integer integer) {
System.out.println("onNext2: " + integer);
}
});

observable.connect();

四、运行结果

1
2
3
4
5
6
7
8
9
onNext1: 20491
onNext2: 20491
onNext1: 39191
onNext2: 39191
...
onNext1: 997372
onNext2: 997372
onCompleted1.
onCompleted2.

可见在订阅的时候,并不会开始发射数据,只有等到connect连接后,才开始发射数据,所以两个观察者接收到的数据是一样的。

五、参考资料

ReactiveX官方文档

ReactiveX文档中文翻译

PS:欢迎关注 SherlockShi 个人博客

感谢你的支持,让我继续努力分享有用的技术和知识点!