Reactive Stream响应式流API
概念
Reactive Stream (响应式流/反应流) 是JDK9引入的一套标准,是一套基于发布/订阅模式的数据处理规范。 响应式流从2013年开始,作为提供非阻塞背压的异步流处理标准的倡议。它旨在解决处理元素流的问题——如何将元素流从发布者传递到订阅者, 而不需要发布者阻塞,或订阅者有无限制的缓冲区或丢弃。 更确切地说,Reactive流目的是“找到最小的一组接口,方法和协议,用来描述必要的操作和实体以实现这样的目标:以非阻塞背压方式实现数据的异步流”。
响应式流规范定义了如下四个接口
接口 | 作用 |
---|---|
Publisher | 消息发布者 |
Subscriber | 消息订阅者 |
Subscription | 发布者和订阅者连接方法 |
Processor | 处理器 |
null
综合示例
java
public class ReactiveStreamSpec {
public static void main(String[] args) throws InterruptedException {
// 生产50个数字
SubmissionPublisher<Integer> producer = new SubmissionPublisher<>();
producer.subscribe(new BinaryProcessor());
IntStream.range(0, 50).forEach(producer::submit);
// 生产完成
producer.close();
TimeUnit.SECONDS.sleep(3);
}
// 数字转二进制字符串处理器
static class BinaryProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer, String> {
Flow.Subscription subscription;
public BinaryProcessor() {
super();
super.subscribe(new BinarySubscriber());
super.subscribe(new LimitedBinarySubscriber());
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1L);
}
@Override
public void onNext(Integer item) {
System.out.printf("on next in binary processor %d \n", item);
this.subscription.request(1L);
super.submit(Integer.toBinaryString(item));
}
@Override
public void onError(Throwable throwable) {
System.out.println(throwable.getMessage());
throwable.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("binary processor complete");
// 触发消费完成
this.close();
}
}
// 数量限制消费者
static class LimitedBinarySubscriber extends BinarySubscriber {
AtomicInteger counter = new AtomicInteger();
@Override
public void onNext(String item) {
counter.getAndAdd(1);
System.out.printf("on next in limited binary subscriber %s\n", item);
if (counter.get() >= 5) {
super.subscription.cancel();
}
this.subscription.request(1L);
}
@Override
public void onComplete() {
System.out.println("odd binary subscriber complete");
}
}
// 二进制数字字符串消费者
static class BinarySubscriber implements Flow.Subscriber<String> {
Flow.Subscription subscription;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
this.subscription.request(1L);
}
@Override
public void onNext(String item) {
System.out.printf("on next in binary subscriber %s\n", item);
this.subscription.request(1L);
}
@Override
public void onError(Throwable throwable) {
System.out.println(throwable.getMessage());
throwable.printStackTrace();
this.subscription.cancel();
}
@Override
public void onComplete() {
System.out.println("binary subscriber complete");
}
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98