闽公网安备 35020302035485号

Flux<T>是一个标准的Publisher<T>,表示一个异步的0到N个发出的项目序列,可选择终止于完成信号或错误信号。根据Reactive Streams规范,这三种类型的信号转换为对下游Subscriber的onNext、onComplete和onError方法的调用。

Mono<T>是一种特殊的Publisher<T>,通过onNext信号发出最多一个项目,然后通过onComplete信号终止(成功的Mono,有或没有值),或者只发出一个onError信号(失败的Mono)。
大多数Mono实现在调用onNext后立即调用其Subscriber的onComplete。Mono.never()是一个例外:它不发出任何信号,在技术上并不禁止,但在测试之外没有太大用处。另一方面,明确禁止使用onNext和onError的组合。
// 堆代码 duidaima.com
package top.emanjusaka;
import reactor.core.publisher.Flux;
public class Main {
public static void main(String[] args) {
Flux<String> flux = Flux.just("Hello", "emanjusaka", "!");
flux.subscribe(System.out::println);
}
}
// 输出
Hello
emanjusaka
!
2.创建一个Mono,发出一个字符串元素并订阅打印出来:package top.emanjusaka;
import reactor.core.publisher.Mono;
public class Main {
public static void main(String[] args) {
Mono<String> mono = Mono.just("Hello");
mono.subscribe(System.out::println);
}
}
// 输出
Hello
3.使用Flux的操作符进行元素转换和过滤:package top.emanjusaka;
import reactor.core.publisher.Flux;
public class Main {
public static void main(String[] args) {
Flux<Integer> numbers = Flux.range(1, 10);
numbers.map(num -> num * 2)
.filter(num -> num % 3 == 0)
.subscribe(System.out::println);
}
}
// 输出
6
12
18
4.使用Mono的操作符进行元素转换和错误处理:package top.emanjusaka;
import reactor.core.publisher.Mono;
public class Main {
public static void main(String[] args) {
Mono<Integer> number = Mono.just(5);
number.map(num -> num * 2)
.doOnError(Throwable::printStackTrace)
.subscribe(System.out::println);
}
}
// 输出
10