From 6cf248dedbb5e3e23a790b08c25a5183aaab2c87 Mon Sep 17 00:00:00 2001 From: Aroooba Date: Tue, 11 Jul 2023 17:00:38 +0900 Subject: [PATCH] Move the operations on cache list to boundedElastic thread The will prevent the operations from blocking the reactive pipeline --- .../java/io/github/pellse/cohereflux/caching/CacheFactory.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cohereflux-core/src/main/java/io/github/pellse/cohereflux/caching/CacheFactory.java b/cohereflux-core/src/main/java/io/github/pellse/cohereflux/caching/CacheFactory.java index 759658bd..a66582eb 100644 --- a/cohereflux-core/src/main/java/io/github/pellse/cohereflux/caching/CacheFactory.java +++ b/cohereflux-core/src/main/java/io/github/pellse/cohereflux/caching/CacheFactory.java @@ -22,6 +22,7 @@ import io.github.pellse.util.collection.CollectionUtils; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; +import reactor.core.scheduler.Schedulers; import java.util.*; import java.util.function.BiFunction; @@ -130,6 +131,7 @@ static , ID, EID, R, RRC> RuleMapperSource(isEmptySource, ruleContext)); return entities -> cache.getAll(ids(entities, ruleContext), isEmptySource ? ids -> just(of()) : buildFetchFunction(entities, ruleContext, queryFunction)) + .subscribeOn(Schedulers.boundedElastic()) .filter(CollectionUtils::isNotEmpty) .flatMapMany(map -> fromStream(map.values().stream().flatMap(Collection::stream))) .onErrorResume(not(QueryFunctionException.class::isInstance), __ -> queryFunction.apply(entities))