主页 > 其他  > 

响应式编程ReactorAPI大全(中)


Reactor 是一个基于响应式编程的库,主要用于构建异步和事件驱动的应用程序。Reactor 提供了丰富的 API,包括创建、转换、过滤、组合等操作符,用于处理异步数据流。以下是一些 Reactor 的主要 API 示例:

pom依赖

<dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-bom</artifactId> <version>2023.0.0</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <dependencies> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-core</artifactId> </dependency> <dependency> <groupId>io.projectreactor</groupId> <artifactId>reactor-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.junit.jupiter</groupId> <artifactId>junit-jupiter</artifactId> <version>5.7.2</version> <scope>test</scope> </dependency> </dependencies> 22. 使用 Reactor 的 elapsed 方法进行时间测量

elapsed 方法可以用于测量元素发射之间的时间间隔,返回包含时间间隔和元素的元组。

import reactor.core.publisher.Flux; import java.time.Duration; public class ReactorElapsedExample { public static void main(String[] args) throws InterruptedException { Flux<Integer> source = Flux.just(1, 2, 3, 4, 5) .delayElements(Duration.ofSeconds(1)); source.elapsed() .subscribe(tuple -> { long elapsedTime = tuple.getT1(); int value = tuple.getT2(); System.out.println("Elapsed Time: " + elapsedTime + "ms, Value: " + value); }); Thread.sleep(23333); } } 23. 使用 Reactor 的 cache 方法进行结果缓存

cache 方法可以用于缓存结果,避免多次计算相同的数据流。

import reactor.core.publisher.Flux; public class ReactorCacheExample { public static void main(String[] args) { Flux<Integer> source = Flux.range(1, 3) .log() //日志 .cache(); source.subscribe(System.out::println); // 输出: 1, 2, 3 source.subscribe(System.out::println); // 输出: 1, 2, 3 直接从缓存中取,日志中显示,未调用request、onNext等方法 } } 24. 使用 Reactor 的 reduce 方法进行聚合操作

reduce 方法用于对数据流中的元素进行聚合操作,返回一个包含最终结果的 Mono。

import reactor.core.publisher.Flux; public class ReactorReduceExample { public static void main(String[] args) { Flux<Integer> source = Flux.range(1, 5); source.reduce(Integer::sum) .subscribe(result -> System.out.println("Sum: " + result)); // 输出: Sum: 15 } } 25. 使用 Reactor 的 interval 方法进行周期性操作

interval 方法可以用于创建一个周期性的数据流,用于执行定时任务。

import reactor.core.publisher.Flux; import java.time.Duration; public class ReactorIntervalExample { public static void main(String[] args) throws InterruptedException { Flux.interval(Duration.ofSeconds(1)) .take(5) // 限制产生的元素数量 .subscribe(System.out::println); Thread.sleep(233333); } } 26. 使用 Reactor 的 onErrorContinue 方法进行错误处理

onErrorContinue 方法允许在发生错误时继续处理数据流,并提供一个处理函数,用于处理错误。

import reactor.core.publisher.Flux; public class ReactorOnErrorContinueExample { public static void main(String[] args) { Flux<Integer> source = Flux.just(1, 2, 0, 4, 5); // 在发生除零错误时继续处理数据流 source.map(x -> 10 / x) .onErrorContinue((error, value) -> { //10/0触发的异常会在最后打印 System.err.println("Error: " + error.getMessage() + ", Value: " + value); }) .subscribe(System.out::println); } } 28. 使用 Reactor 的 materialize 方法进行错误通知

materialize 方法用于将正常元素和错误信息封装为通知对象,使得错误信息也成为数据流的一部分。

import reactor.core.publisher.Flux; public class ReactorMaterializeExample { public static void main(String[] args) { Flux<Integer> source = Flux.just(1, 2, 0, 4, 5); // 将正常元素和错误信息封装为通知对象 source.map(x -> 10 / x) .materialize() .subscribe(System.out::println); } } 29. 使用 Reactor 的 expand 方法进行递归操作

expand 方法用于对数据流进行递归操作,产生新的元素并加入数据流。

import reactor.core.publisher.Flux; public class ReactorExpandExample { public static void main(String[] args) { Flux<Integer> source = Flux.just(1, 2, 3); // 对数据流进行递归操作,每个元素产生两个新元素 source.expand(value -> Flux.just(value * 2, value * 3)) .take(22) // 限制产生的元素数量 .subscribe(System.out::println); //原始 新元素 ->新元素 ->新元素... //1 2 3 -> 2 3 4 6 6 9 ->4 6 6 9 8 12 12 18 12 18 18 27 -> 8 ... } } 30. 使用 Reactor 的 checkpoint 方法进行调试

checkpoint 方法用于在操作链中设置断点,以便在调试时更容易定位问题。

import reactor.core.publisher.Flux; public class ReactorCheckpointExample { public static void main(String[] args) { Flux<Integer> source = Flux.range(1, 5); // 在操作链中设置断点 source.checkpoint("Initial Source") .map(x -> x * 2) .checkpoint("Mapped Source") .subscribe(System.out::println); } } 好像没啥用 31. 使用 Reactor 的 groupBy 方法进行分组操作

groupBy 方法用于将数据流中的元素进行分组,返回一个 GroupedFlux。

import reactor.core.publisher.Flux; import reactor.core.publisher.GroupedFlux; public class ReactorGroupByExample { public static void main(String[] args) { Flux<Integer> source = Flux.range(1, 10); // 将数据流中的元素按奇偶分组 Flux<GroupedFlux<String, Integer>> groupedFlux = source.groupBy(value -> value % 2 == 0 ? "Even" : "Odd"); groupedFlux.subscribe(group -> { String key = group.key(); group.subscribe(value -> System.out.println(key + ": " + value)); }); } } 32. 使用 Reactor 的 concatMap 方法进行顺序操作

concatMap 方法用于对数据流中的元素进行顺序操作,并保持元素的相对顺序。

import reactor.core.publisher.Flux; public class ReactorConcatMapExample { public static void main(String[] args) { Flux<Integer> source = Flux.range(1, 3); // 对每个元素进行异步操作,保持相对顺序 source.concatMap(value -> Flux.just(value * 2).log()) .subscribe(System.out::println); } } 33. 使用 Reactor 的 block 方法获取结果

在某些情况下,可以使用 block 方法来阻塞等待数据流的完成,并获取最终结果。

import reactor.core.publisher.Flux; public class ReactorBlockExample { public static void main(String[] args) { Flux<Integer> source = Flux.range(1, 3); // 阻塞等待数据流的完成,并获取最终结果 Integer result = source.reduce((x, y) -> x + y).block(); System.out.println("Sum: " + result); // 输出: Sum: 6 } } 35. 使用 Reactor 的 doFinally 方法进行清理操作

doFinally 方法用于在数据流完成时执行清理操作,无论是正常完成还是发生错误。

import reactor.core.publisher.Flux; public class ReactorDoFinallyExample { public static void main(String[] args) { Flux<Integer> source = Flux.range(1, 3); source .doFinally(signalType -> System.out.println("Finally: " + signalType)) .subscribe(System.out::println); } } 36. 使用 Reactor 的 log 方法进行日志记录

log 方法用于在操作链中添加日志记录,以便更好地了解数据流的处理过程。

import reactor.core.publisher.Flux; public class ReactorLogExample { public static void main(String[] args) { Flux<Integer> source = Flux.range(1, 3); source.log() .subscribe(System.out::println); } } 37. 使用 Reactor 的 create 方法创建自定义 Publisher

create 方法用于创建自定义的 Flux 或 Mono,通过编程方式发射元素和控制订阅。

import reactor.core.publisher.Flux; public class ReactorCreate2Example { public static void main(String[] args) { Flux<Integer> customFlux = Flux.create(emitter -> { for (int i = 1; i <= 5; i++) { emitter.next(i); } emitter.complete(); }); customFlux.subscribe(System.out::println); } } 38. 使用 Reactor 的 sample 方法进行采样操作

sample 方法用于在固定的时间间隔内从数据流中采样元素。

import reactor.core.publisher.Flux; import java.time.Duration; public class ReactorSampleExample { public static void main(String[] args) throws InterruptedException { Flux<Integer> source = Flux.range(1, 10).delayElements(Duration.ofSeconds(1)); // 模拟延迟; // 在2秒钟采样一个元素 source.sample(Duration.ofSeconds(2)) //数据源1秒一个,采用2秒一次。会漏掉部分数据 .subscribe(System.out::println); // 阻塞主线程,让采样执行完 Thread.sleep(233333); } } 41. 使用 Reactor 的 limitRate 方法进行限流

limitRate 方法用于限制数据流的速率,防止快速生产者导致的资源耗尽。

import reactor.core.publisher.Flux; public class ReactorLimitRateExample { public static void main(String[] args) { Flux<Integer> source = Flux.range(1, 1000).log(); // 限制数据流的速率为每秒产生100个元素 source.limitRate(100) //一次预取100个元素; 第一次 request(100),以后request(75) (100*75=75) .subscribe( data -> { // 模拟慢速消费者 try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(data); }, error -> System.err.println("Error: " + error), () -> System.out.println("Done") ); } }

学习打卡day08:响应式编程Reactor API大全(中)

标签:

响应式编程ReactorAPI大全(中)由讯客互联其他栏目发布,感谢您对讯客互联的认可,以及对我们原创作品以及文章的青睐,非常欢迎各位朋友分享到个人网站或者朋友圈,但转载请说明文章出处“响应式编程ReactorAPI大全(中)

上一篇
MongoDB索引详解

下一篇
googledriveapi