RxJava操作符实践:4_结合操作之6_join

一、描述

任何时候,只要在另一个Observable发射的数据定义的时间窗口内,这个Observable发射了一条数据,就结合两个Observable发射的数据。

Join操作符结合两个Observable发射的数据,基于时间窗口(你定义的针对每条数据特定的原则)选择待集合的数据项。你将这些时间窗口实现为一些Observables,它们的生命周期从任何一条Observable发射的每一条数据开始。当这个定义时间窗口的Observable发射了一条数据或者完成时,与这条数据关联的窗口也会关闭。只要这条数据的窗口是打开的,它将继续结合其它Observable发射的任何数据项。你定义一个用于结合数据的函数。

join默认不在任何特定的调度器上执行。

二、示意图

join

三、示例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
Observable create1 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 6; i++) {
subscriber.onNext(i);

try {
Thread.sleep(600);
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
}
}).subscribeOn(Schedulers.newThread());

Observable create2 = Observable.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
for (int i = 0; i < 4; i++) {
subscriber.onNext(i);

try {
Thread.sleep(1000);
} catch (InterruptedException e) {
subscriber.onError(e);
}
}
}
}).subscribeOn(Schedulers.newThread());

create1.join(create2,
new Func1<Integer, Observable<Long>>() {
@Override
public Observable<Long> call(Integer integer) {
return Observable.timer(1000, TimeUnit.MILLISECONDS);
}
},
new Func1<Integer, Observable<Long>>() {
@Override
public Observable<Long> call(Integer integer) {
return Observable.timer(1000, TimeUnit.MILLISECONDS);
}
},
new Func2<Integer, Integer, String>() {
@Override
public String call(Integer integer1, Integer integer2) {
return integer1 + "-" + integer2;
}
})
.subscribe(new Subscriber<String>() {
@Override
public void onCompleted() {
System.out.println("onCompleted.");
}

@Override
public void onError(Throwable e) {
System.out.println("onError: " + e.getMessage());
}

@Override
public void onNext(String s) {
System.out.println("onNext: " + s);
}
});

四、运行结果

1
2
3
4
5
6
7
8
9
10
onNext: 0-0
onNext: 1-0
onNext: 1-1
onNext: 2-1
onNext: 3-1
onNext: 3-2
onNext: 2-2
onNext: 4-2
onNext: 4-3
onNext: 5-3

注意点:
join()方法有4个参数:

  1. Observable right:目标Observable,也可以称为右Observable;与之相对应的是源Observable,也可以称为左Observable。
  2. rx.functions.Func1> leftDurationSelector:左Observable发射的数据的有效期。
  3. rx.functions.Func1> rightDurationSelector:右Observable发射的数据的有效期。
  4. rx.functions.Func2 resultSelector):左Observable和右Observable发射的数据的结合方法。

join2

这个方法比较难理解的地方在于中间两个有效期参数,比如上面的例子,左Observable每隔0.6秒发射一个数字,从0-5;右Observable每隔1秒发射一个数字,从0-3;结合上面的例子和示意图,在每一个Observable发射数据的时候:

  1. 0.0s时:源Observable目标Observable都发射0,这时结合后的数据为:0-0;
  2. 0.6s时:源Observable发射1,这时会去寻找目标Observable有没有在有效期内的数据,发现目标Observable的0在有效期内,所以结合后的数据为:1-0;
  3. 1.0s时:目标Observable发射1,这时会去寻找源Observable有没有在有效期内的数据,发现源Observable的1在有效期内,所以结合后的数据为:1-1;

项目代码已上传到Github:https://github.com/SherlockShi/RxJavaBestPractise

五、更多

跟join相关的操作符还有:

  • groupJoin

详情可查阅下面的参考资料。

六、参考资料

ReactiveX官方文档

ReactiveX文档中文翻译

PS:欢迎关注 SherlockShi 个人博客

感谢你的支持,让我继续努力分享有用的技术和知识点!