2021.03.26 - [Java/Reactive] - 간단한 Flux method 테스트 (Reactive WebFlux)
위 글이 길어져서 새롭게 포스팅을 더 작성한다.
usingTest
일회성 리소스에 의존하는 스트림을 만들 때 using 메소드를 사용한다. 일반적인 try-with-resources 와 비슷한 방식이다.
package com.example.demo.flux;
import lombok.extern.slf4j.Slf4j;
import java.util.Arrays;
import java.util.Random;
@Slf4j
public class UsingConnection implements AutoCloseable {
private final Random rnd = new Random();
public Iterable<String> getData() {
int nextInt = rnd.nextInt(10);
log.debug("nextInt : {}", nextInt);
if (nextInt < 3) {
throw new RuntimeException("Communication error");
}
return Arrays.asList("Some", "data");
}
@Override
public void close() {
log.debug("IO Connection closed");
}
public static UsingConnection newConnection() {
log.debug("IO Connection created");
return new UsingConnection();
}
}
일반적으로 자원에 대한 리소스를 가져오는 코드들이 있는데 위의 소스는 그러한 것을 흉내만 낸 것이며, 리소스에 접속하지 못할 경우를 시뮬레이션 하기 위해 random 값이 3보다 작으면 일부러 에러를 내뱉고 있다.
이를 일반적인 프로그램, 그리고 리액티브 코드로 작성하면 아래와 같다.
package com.example.demo.reactive;
import com.example.demo.flux.UsingConnection;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import java.time.Duration;
import java.util.LinkedList;
import java.util.Random;
import java.util.stream.IntStream;
public class FluxTest {
Logger log = (Logger) LoggerFactory.getLogger(FluxTest.class);
@Test
public void usingTest() {
try(UsingConnection conn = UsingConnection.newConnection()) {
conn.getData().forEach(
data -> log.debug("Recived data : {}", data)
);
} catch (Exception e) {
log.debug("Error : {}", e.getMessage());
}
Flux<String> ioRequestResults = Flux.using(
UsingConnection::newConnection,
usingConnection -> Flux.fromIterable(usingConnection.getData()),
UsingConnection::close
);
ioRequestResults.subscribe(
data -> log.debug("Recived data : {}", data),
e -> log.debug("Error : {}", e.getMessage()),
() -> log.debug("Stream finished")
);
}
}
커넥션에서 가져오는 값 Some과 data가 출력되며 커넥션 오류를 재현한 경우 (random값이 3보자 작은 경우)에는 Communication error 를 보여주게 된다.
위를 실행 하게 되면 아래와 같이 출력된다.
17:08:25.852 [main] DEBUG com.example.demo.flux.UsingConnection - IO Connection created
17:08:25.857 [main] DEBUG com.example.demo.flux.UsingConnection - nextInt : 2
17:08:25.859 [main] DEBUG com.example.demo.flux.UsingConnection - IO Connection closed
17:08:25.859 [main] DEBUG com.example.demo.reactive.FluxTest - Error : Communication error
17:08:25.992 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
17:08:25.998 [main] DEBUG com.example.demo.flux.UsingConnection - IO Connection created
17:08:25.998 [main] DEBUG com.example.demo.flux.UsingConnection - nextInt : 2
17:08:26.001 [main] DEBUG com.example.demo.flux.UsingConnection - IO Connection closed
17:08:26.002 [main] DEBUG com.example.demo.reactive.FluxTest - Error : Communication error
random 값이 3 이상일 경우에는 Connection 오류가 안난 것을 시뮬레이션 한 것이므로 다음과 같이 나오게 된다.
17:20:41.833 [main] DEBUG com.example.demo.flux.UsingConnection - IO Connection created
17:20:41.839 [main] DEBUG com.example.demo.flux.UsingConnection - nextInt : 9
17:20:41.891 [main] DEBUG com.example.demo.reactive.FluxTest - Recived data : Some
17:20:41.891 [main] DEBUG com.example.demo.reactive.FluxTest - Recived data : data
17:20:41.891 [main] DEBUG com.example.demo.flux.UsingConnection - IO Connection closed
17:20:42.020 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
17:20:42.026 [main] DEBUG com.example.demo.flux.UsingConnection - IO Connection created
17:20:42.026 [main] DEBUG com.example.demo.flux.UsingConnection - nextInt : 7
17:20:42.029 [main] DEBUG com.example.demo.reactive.FluxTest - Recived data : Some
17:20:42.029 [main] DEBUG com.example.demo.reactive.FluxTest - Recived data : data
17:20:42.029 [main] DEBUG com.example.demo.flux.UsingConnection - IO Connection closed
17:20:42.030 [main] DEBUG com.example.demo.reactive.FluxTest - Stream finished
onErrorTest
외부 서비스와 연동할 경우, 문제 발생 시 예외를 처리할 수 있도록 핸들러를 지정할 수 있다.
클라이언트 입장에서 onError 이벤트를 받게 될 경우 시퀀스를 중지시킬 수 있으므로, onErrorReturn, onErrorResume, onErrorMap 등을 사용할 수 있다.
error가 날 경우 특정 값을 리턴할 수도 있고, 다른 워크플로우를 실행할 수도 있고, 다른 예외로 변환할 수도 있다.
이는 마치 일반적인 MVC 에서 Exception을 잘 처리하여 Clean Code를 만들려고 한 아래의 포스트와 비슷해 보인다.
2021.03.11 - [Java/Spring] - Exception을 활용하여 클린코드 작성하기
어째든 예제 소스를 보자. github.com/wikibook/spring5-reactive/blob/master/chapter-04/src/test/java/org/rpis5/chapters/chapter_04/ReactorEssentialsTest.java 에서 작성한 예제 코드에는 retryBackoff 메소드를 사용 하였는데, 현재 내가 사용하는 reactor-core 3.4.1 에서는 deprecated 되어서 그냥 retry를 사용하였다.
projectreactor.io/docs/core/3.3.11.RELEASE/api/deprecated-list.html 의 deprecated method 부분 참조
우선 소스를 보자.
@Test
public void onErrorTest() {
Flux.just("user-1")
.flatMap(user ->
recommendedBooks(user)
.retry(5)
.timeout(Duration.ofSeconds(3))
.onErrorResume(e -> Flux.just("The Martian"))
)
.subscribe(
b -> log.info("onNext: {}", b),
e -> log.warn("onError: {}", e.getMessage()),
() -> log.info("onComplete")
);
}
private Flux<String> recommendedBooks(String userId) {
return Flux.defer(() -> {
int rnd = random.nextInt();
log.debug("rnd : {}", rnd);
if (rnd < 7) {
return Flux.<String>error(new RuntimeException("Err"));
} else {
return Flux.just("Blue Mars", "The Expanse");
}
}).doOnSubscribe(s -> log.debug("Request for {}", userId));
}
random 값이 7보다 작을 경우 에러를 내 뱉도록 시뮬레이션 코드를 recommendedBooks에 구현 하였다.
에러가 날 경우 retry를 5번 하도록 하였다. 성공할 경우 Blue Mars와 The Expanse를 onNext에서 출력하게 되며, 5번 모두 실패하게 될 경우 onErrorResume을 통해 대체할 수 있는 스트림인 The Martian을 출력하게 된다.
출력 내용의 예는 아래와 같다.
성공하는 경우
08:50:49.739 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
08:50:49.759 [main] DEBUG com.example.demo.reactive.FluxTest - rnd : 10228240
08:50:49.762 [main] DEBUG com.example.demo.reactive.FluxTest - Request for user-1
08:50:49.762 [main] INFO com.example.demo.reactive.FluxTest - onNext: Blue Mars
08:50:49.762 [main] INFO com.example.demo.reactive.FluxTest - onNext: The Expanse
08:50:49.762 [main] INFO com.example.demo.reactive.FluxTest - onComplete
실패하는 경우
08:49:15.637 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
08:49:15.655 [main] DEBUG com.example.demo.reactive.FluxTest - rnd : -2138980707
08:49:15.657 [main] DEBUG com.example.demo.reactive.FluxTest - Request for user-1
08:49:15.657 [main] DEBUG com.example.demo.reactive.FluxTest - rnd : -647306902
08:49:15.657 [main] DEBUG com.example.demo.reactive.FluxTest - Request for user-1
08:49:15.657 [main] DEBUG com.example.demo.reactive.FluxTest - rnd : -627097766
08:49:15.657 [main] DEBUG com.example.demo.reactive.FluxTest - Request for user-1
08:49:15.657 [main] DEBUG com.example.demo.reactive.FluxTest - rnd : -70873276
08:49:15.657 [main] DEBUG com.example.demo.reactive.FluxTest - Request for user-1
08:49:15.657 [main] DEBUG com.example.demo.reactive.FluxTest - rnd : -612847718
08:49:15.657 [main] DEBUG com.example.demo.reactive.FluxTest - Request for user-1
08:49:15.657 [main] DEBUG com.example.demo.reactive.FluxTest - rnd : -1843908755
08:49:15.657 [main] DEBUG com.example.demo.reactive.FluxTest - Request for user-1
08:49:46.481 [main] INFO com.example.demo.reactive.FluxTest - onNext: The Martian
08:49:49.437 [main] INFO com.example.demo.reactive.FluxTest - onComplete
배압(BackPressure)
배압이란 Consumer가 들어오는 Stream을 모두 다 처리할 수 없어서 자신이 처리할 수 있는 개수를 reqeust하고 그에 맞는 숫자만큼 Publisher가 보내주는 것이다.
이를 위해 onBackPressureBuffer, onBackPressureDrop, onBackPressureLatest, onBackPressureError 가있다.
onBackPressureBuffer는 중간에 큐를 버퍼처럼 사용하여 컨슈머가 처리하지 못할 경우 대기(Buffer)할 수있도록 만드는 것이다.
onBackPressureDrop은 컨슈머의 처리 속도가 충분하지 않을 경우 삭제(Drop) 시키는 것이다.
onBackPressureLatest는 가장 최근의 것만 내려주는 것이다. 책에서는 onBackPressureLast 라고 되어 있는데 intellij에서 자동완성으로 해 본 결과 onBackPressureLatest 로 정의 되어 있다.
onBackPressureError의 경우 컨슈머의 속도가 충분하지 않다면 에러를 내 뱉게 만드는 것이다.
coldStreamTest
publisher의 경우 Hot과 Cold 타입이 있다. Cold의 경우에는 subscriber가 나타나기 전에는 sequence를 생성하지 않는다. 마치 http의 client가 요청을 하기 전에 server가 무엇인가를 미리 준비하지 않는 것과 같다.
다음은 cold publisher와 관련된 소스코드이다.
@Test
public void coldStreamTest() {
Flux<String> coldPublisher = Flux.defer(() -> {
log.debug("Generating new items");
return Flux.just(UUID.randomUUID().toString());
});
log.debug("No data was generated so far");
coldPublisher.subscribe(e -> log.debug("onNext1 : {}", e));
coldPublisher.subscribe(e -> log.debug("onNext2 : {}", e));
log.debug("Data was generated twice for two subscribers");
}
이를 실행하게 되면 아래와 같은 값이 나오게 된다.
09:29:46.743 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
09:29:46.745 [main] DEBUG com.example.demo.reactive.FluxTest - No data was generated so far
09:29:46.750 [main] DEBUG com.example.demo.reactive.FluxTest - Generating new items
09:29:46.919 [main] DEBUG com.example.demo.reactive.FluxTest - onNext1 : 7bfd98e0-303a-4efa-aaa0-da6eb11e917c
09:29:46.920 [main] DEBUG com.example.demo.reactive.FluxTest - Generating new items
09:29:46.921 [main] DEBUG com.example.demo.reactive.FluxTest - onNext2 : 97da4dd0-44d7-4ce7-a371-003d23501c1e
09:29:46.921 [main] DEBUG com.example.demo.reactive.FluxTest - Data was generated twice for two subscribers
coldPublisher는 데이터를 생성하고 있지 않다가 subscriber가 구독을 시작하는 시점에 값을 만들어서 리턴하게 된다.
just의 경우는 hot, defer의 경우는 cold로 동작하는데 아래의 그림에서 그 동작 원리를 간단하게 알 수 있다.
Javadoc에서도 다음과 같이 설명하고 있다.
defer
Lazily supply a Publisher every time a Subscription is made on the resulting Flux, so the actual source instantiation is deferred until each subscribe and the Supplier can create a subscriber-specific instance. If the supplier doesn't generate a new instance however, this operator will effectively behave like from(Publisher).
just
Create a Flux that emits the provided elements and then completes.
connectableFluxTest
publisher는 데이터를 1번만 생성하고, subscriber 들에게 각각 stream을 줄 수 있도록 캐슁할 수 있다. connectableFlux가 바로 그러한 역할을 한다.
@Test
public void connectableFluxTest() {
Flux<Integer> source = Flux.range(0, 3)
.doOnSubscribe(s -> log.debug("new subscription for the cold publisher"));
ConnectableFlux<Integer> conn = source.publish();
conn.subscribe(e -> log.debug("[Subscriber 1] onNext : {}", e));
conn.subscribe(e -> log.debug("[Subscriber 2] onNext : {}", e));
log.debug("all subscribers are ready, connecting");
conn.connect();
}
위의 코드를 실행해 보면 아래와 같이 나오게 된다.
12:10:47.306 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
12:10:47.321 [main] DEBUG com.example.demo.reactive.FluxTest - all subscribers are ready, connecting
12:10:47.324 [main] DEBUG com.example.demo.reactive.FluxTest - new subscription for the cold publisher
12:10:47.327 [main] DEBUG com.example.demo.reactive.FluxTest - [Subscriber 1] onNext : 0
12:10:47.328 [main] DEBUG com.example.demo.reactive.FluxTest - [Subscriber 2] onNext : 0
12:10:47.328 [main] DEBUG com.example.demo.reactive.FluxTest - [Subscriber 1] onNext : 1
12:10:47.328 [main] DEBUG com.example.demo.reactive.FluxTest - [Subscriber 2] onNext : 1
12:10:47.328 [main] DEBUG com.example.demo.reactive.FluxTest - [Subscriber 1] onNext : 2
12:10:47.328 [main] DEBUG com.example.demo.reactive.FluxTest - [Subscriber 2] onNext : 2
"all subscribers are ready, connecting" 문구가 먼저 나오고 나서 "new subscription for..."가 나중에 나오는 것을 보면 con.subscribe 시점이 아닌 conn.connect 시점에 구독 및 발행이 시작되는 것을 알 수 있다.
또한 하나의 stream 이지만 subscriber1과 2에게 모두 flux stream이 전달 되는 것을 볼 수 있다.
cacheTest
connectableFlux가 캐슁을 한 다는 것을 알게 되었다. 하지만 실제 캐쉬를 위해서 cache라는 메소드가 존재한다. cache 메소드를 따라 들어가면 connectableFlux를 사용하고 있다는 것을 알 수 있다.
cache 메소드는 데이터의 양과 expire time을 줄 수 있기 때문에 좀 더 자유롭게 사용 가능하다.
cache의 동작 상태를 확인해 보기 위해 아래와 같이 테스트를 해 볼 수있다.
@Test
public void cacheTest() throws InterruptedException {
Flux<Integer> source = Flux.range(0, 2)
.doOnSubscribe(s -> log.debug("new subscriptioin for the cold publisher"));
Flux<Integer> cachedSource = source.cache(Duration.ofSeconds(1));
cachedSource.subscribe(e -> log.debug("[S 1] onNext: {}", e));
cachedSource.subscribe(e -> log.debug("[S 2] onNext: {}", e));
Thread.sleep(1200);
cachedSource.subscribe(e -> log.debug("[S 3] onNext: {}", e));
}
cache에 1초의 expire time을 주고 subscribe를 2번 하고 1.2초 대기 후 3번째 subscribe를 하게 되면 아래와 같이 출력되게 된다.
13:54:09.452 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
13:54:09.480 [main] DEBUG com.example.demo.reactive.FluxTest - new subscriptioin for the cold publisher
13:54:09.480 [main] DEBUG com.example.demo.reactive.FluxTest - [S 1] onNext: 0
13:54:09.481 [main] DEBUG com.example.demo.reactive.FluxTest - [S 1] onNext: 1
13:54:09.482 [main] DEBUG com.example.demo.reactive.FluxTest - [S 2] onNext: 0
13:54:09.482 [main] DEBUG com.example.demo.reactive.FluxTest - [S 2] onNext: 1
13:54:10.682 [main] DEBUG com.example.demo.reactive.FluxTest - new subscriptioin for the cold publisher
13:54:10.682 [main] DEBUG com.example.demo.reactive.FluxTest - [S 3] onNext: 0
13:54:10.682 [main] DEBUG com.example.demo.reactive.FluxTest - [S 3] onNext: 1
하지만 sleep time을 900으로 변경 후 실행하게 되면 아래처럼 나타나게 된다.
14:14:15.081 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
14:14:15.115 [main] DEBUG com.example.demo.reactive.FluxTest - new subscriptioin for the cold publisher
14:14:15.116 [main] DEBUG com.example.demo.reactive.FluxTest - [S 1] onNext: 0
14:14:15.117 [main] DEBUG com.example.demo.reactive.FluxTest - [S 1] onNext: 1
14:14:15.117 [main] DEBUG com.example.demo.reactive.FluxTest - [S 2] onNext: 0
14:14:15.117 [main] DEBUG com.example.demo.reactive.FluxTest - [S 2] onNext: 1
14:14:16.018 [main] DEBUG com.example.demo.reactive.FluxTest - [S 3] onNext: 0
14:14:16.018 [main] DEBUG com.example.demo.reactive.FluxTest - [S 3] onNext: 1
S2 이후 0.8초 정도 이후에 실행이 되었지만 S3역시 caching 되어 있다는 것을 볼 수 있다.
transformTest
Reactive Stream을 중간에 변환하기 위해 사용할 수 있다. 관련된 소스코드를 일단 보고 알아보자.
@Test
public void transformTest() {
Function<Flux<String>, Flux<String>> logUserInfo = stream -> stream.index()
.doOnNext(tp -> log.debug("[{}] User: {}", tp.getT1(), tp.getT2()))
.map(Tuple2::getT2);
Flux.range(1000, 3)
.map(i -> "user-" + i)
.transform(logUserInfo)
.subscribe(e -> log.debug("onNext: {}", e));
}
logUserInfo 라는 메소드를 정의 한다. 들어온 stream을 index 하게 되면 각 원소에 인덱스 번호가 붙게 된다.
doOnNext 시점에 추가 된 내용을 찍어 보게 되면 index 값과 실제 넘어온 값을 볼 수 있게 된다.
그리고 나서 다시 원래 값을 돌려 주게 된다 (map(Tuple2::getT2)
실제 logUserInfo 라는 메소드가 잘 동작하는지 확인하기 위해 1000 ~ 1003 이라는 stream을 만들고 "user-" + i 형태로 변환 한 다음 transform을 이용하여 logUserInfo를 호출한다.
이렇게 하게 되면 "1000 > user-1000"으로 변환된 상태에서 logUserInfo에 들어가게 되며 안에서 index를 만들어 로깅한 후 다시 원래의 값만 돌려 주게 된다.
subscriber 입장에서는 원래의 값인 user-1000 값만 받기 때문에 로그에서는 인덱스 값 없이 출력하게 된다.
위의 소스를 실행 해 보면 아래와 같이 나오게 된다.
14:16:31.204 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
14:17:43.118 [main] DEBUG com.example.demo.reactive.FluxTest - [0] User: user-1000
14:17:43.120 [main] DEBUG com.example.demo.reactive.FluxTest - onNext: user-1000
14:17:43.120 [main] DEBUG com.example.demo.reactive.FluxTest - [1] User: user-1001
14:17:43.120 [main] DEBUG com.example.demo.reactive.FluxTest - onNext: user-1001
14:17:43.120 [main] DEBUG com.example.demo.reactive.FluxTest - [2] User: user-1002
14:17:43.120 [main] DEBUG com.example.demo.reactive.FluxTest - onNext: user-1002
이와 비슷한 역할을 하는 compose 라는 메소드도 있는데 찾아 보니 이 역시 3.3.11 에서 Deprecated 되었다.
projectreactor.io/docs/core/3.3.11.RELEASE/api/deprecated-list.html
publishOnTest
publishOn 메소드의 경우 별도의 쓰레드에서 작업을 하게 만든다. 이로 인해 1개의 싸이클이 짧아져서 빠르게 처리하는데 좋다. 다만 쓰레드가 달라지기 때문에 다음과 같은 경우 조심해서 사용해야 한다.
@Test
public void threadLocalTest() {
ThreadLocal<Map<Object, Object>> threadLocal = new ThreadLocal<>();
threadLocal.set(new HashMap<>());
Flux.range(0, 10)
.doOnNext(k ->threadLocal
.get()
.put(k, new Random(k).nextGaussian())
)
.publishOn(Schedulers.parallel())
.map(k -> threadLocal.get().get(k))
.blockLast();
}
@Test
public void subscriptionContextTest() {
Flux.range(0, 10)
.flatMap(k -> {
return Mono.subscriberContext().doOnNext(context -> {
log.debug("before k : {}", k);
Map<Object, Object> map = context.get("randoms");
map.put(k, new Random(k).nextGaussian());
}).thenReturn(k);
}
)
.publishOn(Schedulers.parallel())
.flatMap(k -> {
return Mono.subscriberContext()
.map(context -> {
log.debug("after k : {}", k);
Map<Object, Object> map = context.get("randoms");
return map.get(k);
});
}
)
.subscriberContext(context ->
context.put("randoms", new HashMap<>())
)
.blockLast();
}
threadLocalTest 메소드를 보면 ThreadLocal을 사용하여 값을 저장 했다가 publishOn 이후 threadLocal 값을 가져와 사용하는 것이다. (ThreadLocal 관련해서는 javacan.tistory.com/entry/ThreadLocalUsage 을 참고할 수 있다.)
이렇게 될 경우 publishOn에서 새로우 쓰레드를 생성해서 작업 하기 때문에 같은 쓰레드에서만 사용할 수 있는 ThreadLocal에서는 값을 가져올 수 없고 이로 인해 NPE가 생기게 된다.
java.lang.NullPointerException
at com.example.demo.reactive.FluxTest.lambda$threadLocalTest$56(FluxTest.java:336)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:440)
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:527)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Suppressed: java.lang.Exception: #block terminated with an error
at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
at reactor.core.publisher.Flux.blockLast(Flux.java:2510)
at com.example.demo.reactive.FluxTest.threadLocalTest(FluxTest.java:337)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:69)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:220)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:53)
이를 위해서 쓰레드에 상관없는 Mono.subscriptionContext를 사용할 수 있다.
다만 확인해 보니 테스트 중인 reactor-core 3.4.1 기준 deprecated 되어 있는 상태이다.
subscriptionContextTest 메소드를 천천히 분석해 보면 다음과 같다.
context에 randoms라는 키에 empty HashMap을 하나 만들어서 생성한다. (subscriberContext 부분)
이렇게 생성 된 context에 random gaussian 값을 해쉬에 담아 밀어 넣고, publishOn으로 새로운 쓰레드를 만든 다음, context에서 꺼내와서 처리하게 된다.
subscriptionContextTest 를 실행하게 되면 아래와 같이 출력되게 된다.
08:12:12.503 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
08:12:12.577 [main] DEBUG com.example.demo.reactive.FluxTest - before k : 0
08:12:12.579 [main] DEBUG com.example.demo.reactive.FluxTest - before k : 1
08:12:12.579 [main] DEBUG com.example.demo.reactive.FluxTest - before k : 2
08:12:12.579 [main] DEBUG com.example.demo.reactive.FluxTest - before k : 3
08:12:12.579 [main] DEBUG com.example.demo.reactive.FluxTest - before k : 4
08:12:12.579 [main] DEBUG com.example.demo.reactive.FluxTest - before k : 5
08:12:12.580 [main] DEBUG com.example.demo.reactive.FluxTest - before k : 6
08:12:12.580 [main] DEBUG com.example.demo.reactive.FluxTest - before k : 7
08:12:12.580 [main] DEBUG com.example.demo.reactive.FluxTest - before k : 8
08:12:12.580 [main] DEBUG com.example.demo.reactive.FluxTest - before k : 9
08:12:12.581 [parallel-1] DEBUG com.example.demo.reactive.FluxTest - after k : 0
08:12:12.581 [parallel-1] DEBUG com.example.demo.reactive.FluxTest - after k : 1
08:12:12.581 [parallel-1] DEBUG com.example.demo.reactive.FluxTest - after k : 2
08:12:12.581 [parallel-1] DEBUG com.example.demo.reactive.FluxTest - after k : 3
08:12:12.581 [parallel-1] DEBUG com.example.demo.reactive.FluxTest - after k : 4
08:12:12.581 [parallel-1] DEBUG com.example.demo.reactive.FluxTest - after k : 5
08:12:12.581 [parallel-1] DEBUG com.example.demo.reactive.FluxTest - after k : 6
08:12:12.582 [parallel-1] DEBUG com.example.demo.reactive.FluxTest - after k : 7
08:12:12.582 [parallel-1] DEBUG com.example.demo.reactive.FluxTest - after k : 8
08:12:12.582 [parallel-1] DEBUG com.example.demo.reactive.FluxTest - after k : 9
after log가 찍히는 시점부터 main이 아닌 parallel-1 이라는 이름의 쓰레드에서 실행 됨을 알 수 있다. 이 처럼 쓰레드가 달라짐으로 인해 ThreadLocal은 사용할 수 없다는 것을 알게 되었다.
다만 사용하게 된 subscriberContext가 deprecated 되었기에 다음과 같이 바꿔 사용할 수 있다.
@Test
public void deferContextualTest() {
String key = "message";
Mono<String> r = Mono.just("Hello")
.flatMap(s -> Mono.deferContextual(contextView -> {
log.debug("s : {}", s);
return Mono.just(s + " " + contextView.get(key));
}))
.contextWrite(context -> {
return context.put(key, "World");
});
StepVerifier.create(r)
.expectNext("Hello World")
.verifyComplete();
}
@Test
public void deferContextualTest2() {
Flux.range(0, 10)
.flatMap(i -> {
return Mono.deferContextual(contextView -> {
Map<Object, Object> map = contextView.get("randoms");
map.put(i, new Random(i).nextGaussian());
log.debug("before i : {}", i);
return Mono.just(i);
});
})
.publishOn(Schedulers.parallel())
.flatMap(i -> {
log.debug("after i : {}", i);
return Mono.just(i);
})
.contextWrite(context -> {
return context.put("randoms", new HashMap<>());
})
// .subscribe(s -> log.debug("s : {}", s))
.blockLast();
}
deferContextual를 사용하면 된다. 간단한 동작 방법은 deferContextualTest를 참고해 보면 된다.
contextWrite 에서 기본값을 만들어 둔 후 deferContextual에서 context를 가져와 사용하게 된다.
deferContextualTest2는 subscriptionContextTest를 deprecated 되지 않은 메소드를 사용하여 재구성한 것인데 contextWrite 및 Mono.deferContextual를 사용하면 된다. deferContextualTest2를실행하게되면아래와같이출력되게된다.
08:18:20.631 [main] DEBUG reactor.util.Loggers - Using Slf4j logging framework
08:18:20.692 [main] DEBUG com.example.demo.reactive.FluxTest - before i : 0
08:18:20.694 [main] DEBUG com.example.demo.reactive.FluxTest - before i : 1
08:18:20.694 [parallel-1] DEBUG com.example.demo.reactive.FluxTest - after i : 0
08:18:20.694 [main] DEBUG com.example.demo.reactive.FluxTest - before i : 2
08:18:20.694 [parallel-1] DEBUG com.example.demo.reactive.FluxTest - after i : 1
08:18:20.694 [main] DEBUG com.example.demo.reactive.FluxTest - before i : 3
08:18:20.694 [parallel-1] DEBUG com.example.demo.reactive.FluxTest - after i : 2
08:18:20.694 [parallel-1] DEBUG com.example.demo.reactive.FluxTest - after i : 3
08:18:20.694 [main] DEBUG com.example.demo.reactive.FluxTest - before i : 4
08:18:20.694 [main] DEBUG com.example.demo.reactive.FluxTest - before i : 5
08:18:20.694 [parallel-1] DEBUG com.example.demo.reactive.FluxTest - after i : 4
08:18:20.694 [parallel-1] DEBUG com.example.demo.reactive.FluxTest - after i : 5
08:18:20.694 [main] DEBUG com.example.demo.reactive.FluxTest - before i : 6
08:18:20.694 [parallel-1] DEBUG com.example.demo.reactive.FluxTest - after i : 6
08:18:20.694 [main] DEBUG com.example.demo.reactive.FluxTest - before i : 7
08:18:20.695 [main] DEBUG com.example.demo.reactive.FluxTest - before i : 8
08:18:20.695 [parallel-1] DEBUG com.example.demo.reactive.FluxTest - after i : 7
08:18:20.695 [parallel-1] DEBUG com.example.demo.reactive.FluxTest - after i : 8
08:18:20.695 [main] DEBUG com.example.demo.reactive.FluxTest - before i : 9
08:18:20.695 [parallel-1] DEBUG com.example.demo.reactive.FluxTest - after i : 9
subscriptionContextTest 와 달리 before가 끝난 후 after가 실행 되는 것이 아니라 말 그대로 stream 처럼 동작함을 볼 수 있다.
그리고 여기서도 publishOn으로 인해 before는 main에서 시작하지만 after는 parallel-1 쓰레드에서 동작함을 볼 수 있다.
참고로 context에 key를 부여하고 그 안에 Map으로 값을 담은 것은 context 자체가 불변객체이기 때문이다. 따라서 context.put('a', value), context.put('b', value) 식으로 하게 될 경우 서로 다른 객체가 되게 된다.
아래는 put을 구현한 소스이다.
final class Context0 implements CoreContext {
static final Context0 INSTANCE = new Context0();
@Override
public Context put(Object key, Object value) {
Objects.requireNonNull(key, "key");
Objects.requireNonNull(value, "value");
return new Context1(key, value);
}
...
}
final class Context1 implements CoreContext {
final Object key;
final Object value;
Context1(Object key, Object value) {
this.key = Objects.requireNonNull(key, "key");
this.value = Objects.requireNonNull(value, "value");
}
@Override
public Context put(Object key, Object value) {
Objects.requireNonNull(key, "key");
Objects.requireNonNull(value, "value");
if(this.key.equals(key)){
return new Context1(key, value);
}
return new Context2(this.key, this.value, key, value);
}
...
}
put을 구현한 소스에 보면 new를 해서 반환을 하기 때문에 기존 객체를 재활용하지 않게 된다.
context 클래스를 0 ~ 5까지 구현하고 ContextN 클래스까지 만든 이유는 명확히 알 수 없다. 관련 내용은 확인해 봐야 할 것으로 보인다.
'Java > Reactive' 카테고리의 다른 글
간단한 Flux method 테스트 (Reactive WebFlux) 3 (0) | 2021.04.12 |
---|---|
Flux Test 중 희한한 현상 (0) | 2021.04.05 |
간단한 Flux method 테스트 (Reactive WebFlux) (0) | 2021.03.26 |
Reative 관련 북마크 (0) | 2021.01.27 |