Overview
downstream(Subscriber)에서 upstream(Publisher)으로 구독을 하고나면, Subscriber는 Subscription을 통해 Publisher로 데이터를 요청하게 됩니다. 종종 이 요청을 커스터마이징할 필요가 있는데, 가장 쉬운 방법으로는 BaseSubscriber를 이용하는 방법이 있습니다.
Reactor에서 배압(backpressure)처리는 request를 upstream으로 요청하는 것과 같습니다. 배압을 사용하지 않는 방식의 연산자들을 추가적으로 살펴보고 마지막으로 인입된 요청을 변경 시킬 수 있는 연산자들을 살펴보겠습니다.
BaseSubscriber
BaseSubscriber는 추상클래스로 보통 이를 확장한 하위 Subscriber를 구현하여 사용합니다. subscribe 메서드에 입력 파라미터로 익명의 람다코드가 아닌 BaseSubscriber구현체를 사용합니다.
BaseSubscriber 구현체들은 single-use 특성을 가집니다., 이 뜻은 한 BaseSubscriber가 기존에 구독하던 기존 Publisher가 아닌 새로운 Publisher를 구독하게 된다면, 기존 Publisher의 구독을 취소한다는 의미입니다. 왜냐하면 한 Subscriber가 두 Publisher를 사용하게 되면 Reactive Streams의 룰을 위반하게 되기 때문입니다. (Subscriber의 onNext 메서드는 병렬로 호출되면 안된다) 이것이 BaseSubscriber가 single-use 특성을 가지는 이유입니다.
package io.projectreactor.samples;
import org.reactivestreams.Subscription;
import reactor.core.publisher.BaseSubscriber;
public class SampleSubscriber<T> extends BaseSubscriber<T> {
public void hookOnSubscribe(Subscription subscription) {
System.out.println("Subscribed");
request(1);
}
public void hookOnNext(T value) {
System.out.println(value);
request(1);
}
public static void main() {
SampleSubscriber<Integer> ss = new SampleSubscriber<Integer>();
Flux<Integer> ints = Flux.range(1, 4);
ints.subscribe(ss);
}
}
위 코드는 BaseSubscriber를 확장한 SampleSubscriber입니다. 그리고 main()에는 ints Flux에 Subscriber를 부착하고 있습니다. hookOnSubscribe와 hookOnNext를 보자면, hookOnSubscribe는 표준 출력을 한 번하고 첫 번재 요청을 보냅니다. hookOnNext는 받아온 value를 노출하고 추가 요청을 한번에 한 요청씩 보냅니다. 예제에서는 요청을 하나씩 보냈지만 상황에 따라서 커스마이징이 가능합니다.
SampleSubscriber 클래스는 Subscriber의 행동을 재정의할 수 있도록 hooks 기능을 제공하고 있습니다. 기본적인 행동은 subscribe()와 완전히 동일하며, 이렇듯 BaseSubscriber를 확장하면 요청량(Request Amount; request(N))을 커스터마이징하는데 유용합니다.
Flux.range(1, 10)
.doOnRequest(r -> System.out.println("request of " + r))
.subscribe(new BaseSubscriber<Integer>() {
@Override
public void hookOnSubscribe(Subscription subscription) {
request(1);
}
@Override
public void hookOnNext(Integer integer) {
System.out.println("Cancelling after having received " + integer);
cancel();
}
});
위 코드에서 hookOnSubscribe에서 첫 reuqst(1)을 요청하고 hookOnNext에서 값을 바고 출력한 뒤에 취소 시그널을 보냅니다. 한 가지 조심해야 할 것은 hookOnSubscribe를 재정의 할때, 요청을 적어도 반드시 한 번은 해야 합니다. 그렇지 않으면 Flux는 아무 동작없이 멈춰버릴 수 있습니다. 그렇기 때문에 BaseSubscriber의 hookOnSubscribe 기본 설정이 무제한 요청으로 되어 있는 이유이기도 합니다.
추가적으로 보았던 BaseSubscriber는 onNext, onSubscribe 뿐만아니라 다른 상태 또는 상황에서 쓰이는 메서드를 커스터마이징할 수 있습니다. (hookOnComplete, hookOnError, hookOnCancel, 그리고 hookFinally)
On Backpressure and Ways to Reshape Requests
Reactor에서 배압(backpressure) 전파방식은 간단하게 request를 upstream으로 보내는 방식으로 동작합니다. request에 입력하는 수요/요청 값은 최대 Long.MAX_VALUE까지 가능하고 사실상 무제한 요청이면서 배압을 사용하지 않겠다는 의미입니다.
아래는 기본적으로 무제한 요청(Request(Long.MAX_VALUE))을 보내는 연산자입니다.
-
subscribe()와 대부분 람다기반 subscribe() 바리에이션 (Consumer<Subscription>을 제외)
-
block(), blockFirst(), blockLast()
-
toIterable() 또는 toStream()
Operators that Change the Demand from Downstream
downstream에서 upstream에 대한 요청은 연산자 체인을 걸치면서 변경될 수 있습니다.
Flux.buffer(int maxSize) - 생성되는 데이터를 maxSize 크기의 list buffer로 모아 반환합니다. 즉 반환 값인 Flux<List<T>> 의 각 List는 최대 크기는 maxSize입니다. 예를 들어, buffer(N)으로 설정된 Flux에 request(2)를 요청하면, 내부적으로 요청을 request(2 * N)으로 변경합니다. 즉 buffer(N)에게 request(2)는 두 개의 full list buffer를 요청한 것과 같습니다.
flatMap과 같은 일부 연산자는 prefetch라 불리는 int형 input parameter를 받는 경우가 있습니다. 보통 내부 sequence를 처리하고 들어오는 각 요소로 새로운 Publisher를 만드는 연산자들이 해당합니다. 이 연산자들은 downstream에서 최초로 요청을 보낼때 그 요청의 크기를 지정할 수 있도록 합니다. 그리고 이런 prefetch를 지원하는 연산자들은 보통 replenishing optimization도 함께 구현하고 있습니다.
Prefetch - subscribe이후로 최초의 request(Subscription.request(long))를 Publisher에게 보낼때 그 크기를 말합니다. 설정하지 않는다면 대부분 32개의 request를 요청하는게 기본입니다.
Replenshing Optimization(보충 최적화) - downstream에서 prefetch(초기 요청) 크기의 75%만큼 작업이 성공하면, 그 만큼 자동적으로 upstream에 추가 요청을 보냅니다. 즉 작업에 필요한 전체 원소를 한번에 일괄 처리하지 않고, 작은 작업을 처리하고 보충하는 식으로 처리합니다. 이러한 동작방식은 일부라도 빠른 결과가 노출되어야 할때 유용합니다. (예를 들어, Paging시 제일 앞 page가 빠르게 노출되어야 할때)
limitRate, limitRequest
마지막으로, request를 직접 튜닝할 수 있게 해주는 limitRate와 limitRequest 연산자가 있습니다.
Flux.limitRate(int prefetchRate) - subscribers 요청(배압)을 최대 prefetchRate만큼 제한합니다. 이 연산자는 prefetch-and-replenish strategy을 따르기 때문에, 실제로는 최초의 요청을 제외하고는 prefetchRate * 75% 만큼의 보충 요청을 통해 전파가 일어 납니다. 예를 들어, limitRate(10)에 100개의 요청이 들어오는 경우를 가정합시다. 그러면 처음에는 10개의 요청을 처리하고, 나머지는 90개의 요청은 8개씩 다발로 묶여 12번으로 처리됩니다. (10 * 75/100 ≒ 8)
Flux.limitRate(int highTide, int lowTide) - subscribers의 요청(배압)이 초기 요청은 최대 highTide(initial request amount) 만큼 제한하고, 나머지 보충요청은 lowTide만큼 제한합니다. 이 연산자도 prefetch-and-replenish strategy을 따르지만 보충량(lowTide)를 고칠 수 있습니다. 예를 들어, 14개의 요청이 Flux에 처음 인입된 상황이고 highTide 10, lowTide 2인 경우를 가정해봅시다. 14개의 요청 중 초기 요청으로 10개가 처리가 되고 나머지 4개는 lowTide로 만큼 2개씩 두번에 처리가 됩니다. lowTide가 8인 경우라면, (10 * 75/100 ≒ 8) 초기 요청을 제외한 나머지 요청은 lowTide인 8로 묶여 한번에 처리하게 됩니다. lowTide가 0이라면, 항상 highTide으로 일정하게 묶어 요청을 처리하게 됩니다.
limitRequest(long requestCap) - downstream의 요구를 최대 총 요구인 N만큼 제한합니다. 이 연산자는 N개까지 request를 추가합니다. 만약 단일 request가 N개를 넘어서 최대 총 요구를 넘지 않는다면, 이 특정 요청은 upstream으로 완전히 전파됩니다. N개가 방출되고 나면, limitRequest 연산자는 해당 sequence가 완료된 것으로 취급합니다. 그리곤 onComplete 시그널을 downstream에 보내고 sequence를 취소하게 됩니다.
References
'IT > Spring WebFlux' 카테고리의 다른 글
Create, Subscribe, Disposable (0) | 2020.12.21 |
---|---|
Reactor Core - Mono & Flux (0) | 2020.12.14 |
Project Reactor (0) | 2020.12.13 |
Event-Driven & WebFlux (0) | 2020.11.10 |
Back Pressure (0) | 2020.11.07 |