【Java响应式编程】Project Reactor 实战:从 Flux 入门到 Sinks 进阶
在 Java 后端开发中,随着 Spring WebFlux 的普及(尤其是当前越来越多的LLM流失开始用到FLUX),响应式编程(Reactive Programming)变得越来越重要。很多同学可能听说过 Flux 和 Mono,但在实际业务中,如何将传统的“回调接口”或“外部事件”转换为响应式流,往往是新手最头疼的问题。
今天这篇文章,带你从最基础的 Flux 开始,一步步掌握 Reactor 中最强大的工具——Sinks。
一、 什么是 Flux?
在 Project Reactor(Spring WebFlux 的核心库)中,Flux<T> 代表一个包含 0 到 N 个元素的异步序列。
你可以把它想象成一条传送带(或者水管):
- List:是一次性把所有货物打包给你。
- Flux:是货物一件件(或一批批)随着时间推移传送过来。
1.1 最简单的 Flux 示例
1
2
3
4
5
6
|
// 创建一个包含 1, 2, 3 的流
Flux<Integer> flux = Flux.just(1, 2, 3);
// 只有订阅(subscribe)了,流才会开始流动
flux.map(i -> "数字: " + i)
.subscribe(System.out::println);
|
1.2 为什么需要 Flux?
它的强大在于操作符。你可以像处理集合一样处理异步流:过滤(filter)、转换(map)、聚合(reduce),甚至处理时间窗口(window)。
1
2
3
4
5
|
// 模拟:每 500ms 产生一个数据,只取前 5 个,且只要偶数
Flux.interval(Duration.ofMillis(500))
.take(5)
.filter(i -> i % 2 == 0)
.subscribe(i -> System.out.println("收到偶数: " + i));
|
二、 遇到的问题:如何手动触发数据?
上面的例子中,数据源都是静态的(just)或者自动生成的(interval)。
但在实际业务中,我们经常遇到这种场景:
场景1: 我有一个 MQ 的监听器,或者一个 WebSocket 的连接。当外部事件发生时(比如收到一条消息),我希望手动把这个消息“推”到 Flux 里,让下游去处理。
场景2:有一个Flux流,但是突然我需要服务端主动关闭这个流,并且让前端感知这个特殊的error断开。那么我可以创建一个errorSink。需要的时候对这个sink进行主动的推送。
这时候,Flux.just() 就无能为力了。我们需要一个**“水龙头”**,这就是 Sinks。
三、 Sinks:连接传统与响应式的桥梁
Sinks 是 Reactor 3.4.0 之后推出的新一代 API,用于替代旧的 Processor。它既是数据的生产者(我们可以往里塞数据),又是数据的发布者(可以被转成 Flux 供人订阅)。
3.1 Sinks 的三种常见模式
在使用 Sinks.many() 时,我们通常面临三种选择:
unicast() (单播):只能有一个订阅者。适合点对点消息。
multicast() (多播):支持多个订阅者。适合广播消息。
replay() (重放):多播,且新来的订阅者能看到之前的历史数据。
3.2 实战代码:将“回调接口”转换为 Flux
这是 Sinks 最经典的用法。假设我们有一个老旧的订单服务,它通过回调通知新订单:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
// 1. 这是一个传统的监听器接口
interface OrderListener {
void onNewOrder(String orderId);
}
// 2. 这是一个传统的服务,只能注册回调
class LegacyOrderService {
private OrderListener listener;
public void register(OrderListener listener) {
this.listener = listener;
}
// 模拟收到外部订单
public void fireEvent(String id) {
if (listener != null) listener.onNewOrder(id);
}
}
|
我们要把它改造成响应式的 Flux<String>,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
|
import reactor.core.publisher.Flux;
import reactor.core.publisher.Sinks;
public class ReactiveBridge {
// 1. 创建一个 Sink,支持多播,带有背压缓存
private final Sinks.Many<String> sink = Sinks.many().multicast().onBackpressureBuffer();
public ReactiveBridge(LegacyOrderService legacyService) {
// 2. 在传统回调中,把数据“推”进 Sink
legacyService.register(orderId -> {
// tryEmitNext 是线程安全的
// 它会尝试推送,如果失败(比如没有订阅者或流被取消)会返回结果状态
sink.tryEmitNext(orderId);
});
}
// 3. 对外暴露 Flux,隐藏 Sink 的写入能力
public Flux<String> getOrderFlux() {
return sink.asFlux();
}
}
|
3.3 测试效果
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
|
public static void main(String[] args) {
LegacyOrderService oldService = new LegacyOrderService();
ReactiveBridge bridge = new ReactiveBridge(oldService);
// 订阅者 1:处理所有订单
bridge.getOrderFlux()
.subscribe(id -> System.out.println("[订阅者1] 处理订单: " + id));
// 订阅者 2:只关心 VIP 订单
bridge.getOrderFlux()
.filter(id -> id.startsWith("VIP"))
.subscribe(id -> System.out.println("[订阅者2] 发现 VIP: " + id));
// 模拟外部事件触发
System.out.println("--- 外部事件开始 ---");
oldService.fireEvent("ORDER-001");
oldService.fireEvent("VIP-888");
}
|
输出结果:
1
2
3
4
|
--- 外部事件开始 ---
[订阅者1] 处理订单: ORDER-001
[订阅者1] 处理订单: VIP-888
[订阅者2] 发现 VIP: VIP-888
|
四、 避坑指南
在使用 Sinks 时,有两点需要特别注意:
-
线程安全:
- 尽量使用
sink.tryEmitNext(data) 而不是 sink.emitNext(data, handler)。
tryEmitNext 是线程安全的,适合在多线程环境下(如 Web 请求、MQ 线程池)并发调用。
-
背压(Backpressure):
- 在创建 Sink 时,最好指定背压策略,例如
.onBackpressureBuffer()。
- 如果下游消费速度太慢,Sink 默认会缓存数据。如果缓存满了,你需要决定是丢弃新数据还是报错(
FAIL_OVERFLOW)。
五、 总结
- Flux 是水流,负责数据的异步传递和处理。
- Sinks 是水龙头,负责将外部世界的同步调用、回调事件转换成响应式水流。