В этом случае String элементы Flux помещаются в новые Flux коллекции List, содержащие не более трех элементов в каждой. Следовательно, исходный Flux, который передает пять значений String, будет преобразован в Flux, который передает две коллекции List, одна из которых содержит три фрукта, а другая - два фрукта.
И что? Буферизация значений из реактивного потока в нереактивные коллекции List представляется контрпродуктивной. Но когда вы комбинируете buffer() с flatMap(), это позволяет параллельно обрабатывать каждую из коллекций List:
Flux.just(
"apple", "orange", "banana", "kiwi", "strawberry")
.buffer(3)
.flatMap(x ->
Flux.fromIterable(x)
.map(y -> y.toUpperCase())
.subscribeOn(Schedulers.parallel())
.log()
).subscribe();
В этом новом примере вы по-прежнему буферизуете Flux из пяти String значений в Flux коллекций List. Но затем вы применяете flatMap() к этому Flux коллекций List. Это берет каждый List буфер и создает новый Flux из его элементов, а затем применяет к нему операцию map(). Следовательно, каждый буферный List дополнительно обрабатывается параллельно в отдельных потоках.
Чтобы доказать, что это работает, я также включил операцию log() для применения к каждому под-Flux-у. Операция log() просто регистрирует все события Reactive Streams, чтобы вы могли видеть, что на самом деле происходит. В результате в log записываются следующие записи (для краткости удален компонент времени):
[main] INFO reactor.Flux.SubscribeOn.1 -
onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[main] INFO reactor.Flux.SubscribeOn.1 - request(32)
[main] INFO reactor.Flux.SubscribeOn.2 -
onSubscribe(FluxSubscribeOn.SubscribeOnSubscriber)
[main] INFO reactor.Flux.SubscribeOn.2 - request(32)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(APPLE)
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onNext(KIWI)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(ORANGE)
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onNext(STRAWBERRY)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onNext(BANANA)
[parallel-1] INFO reactor.Flux.SubscribeOn.1 - onComplete()
[parallel-2] INFO reactor.Flux.SubscribeOn.2 - onComplete()
Как ясно видно из записей журнала, фрукты в первом буфере (apple, orange и banana) обрабатываются в потоке parallel-1. Между тем, фрукты во втором буфере (kiwi и strawberry) обрабатываются в parallel-2. Как видно из того факта, что записи журнала из каждого буфера сплетены вместе, два буфера обрабатываются параллельно.
Если по какой-то причине вам нужно собрать все, что Flux генерирует в List, вы можете вызвать buffer() без аргументов:
Flux> bufferedFlux = fruitFlux.buffer();
В результате создается новый Flux, который генерирует List, содержащий все элементы, опубликованные исходным Flux. Вы можете добиться того же самого с помощью операции collectList(), показанной на диаграмме на рисунке 10.19.
Рисунок 10.19 В результате операции сбора списка получается Mono, содержащий список всех сообщений, отправляемых входящим Flux.
Вместо того чтобы создавать Flux, который публикует List, collectList() создает Mono, который публикует List. Следующий метод тестирования показывает, как это можно использовать:
@Test
public void collectList() {
Flux
"apple", "orange", "banana", "kiwi", "strawberry");
Mono> fruitListMono = fruitFlux.collectList();
StepVerifier
.create(fruitListMono)
.expectNext(Arrays.asList(
"apple", "orange", "banana", "kiwi", "strawberry"))
.verifyComplete();
}
Еще более интересный способ сбора элементов, возвращаемых Flux, - это собирать их в Map. Как показано на рисунке 10.20, операция collectMap() приводит к Mono, который публикует Map, заполненную записями, ключ которых рассчитывается данной функцией.
Рис. 10.20. Операция collectMap приводит к получению Mono, содержащему Map сообщений, передаваемым входящим Flux, где ключ выводится из некоторой характеристики входящих сообщений.
Чтобы увидеть collectMap() в действии, взгляните на следующий метод тестирования:
@Test
public void collectMap() {
Flux
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono
animalFlux.collectMap(a -> a.charAt(0));
StepVerifier
.create(animalMapMono)
.expectNextMatches(map -> {
return
map.size() == 3 &&
map.get('a').equals("aardvark") &&
map.get('e').equals("eagle") &&
map.get('k').equals("kangaroo");
})
.verifyComplete();
}
Источник Flux испускает имена нескольких животных. К этому Flux вы применяете collectMap() для создания нового Mono, который создает Map, где значение ключа определяется первой буквой имени животного, а значение-само имя животного. В случае, если два названия животных начинаются с одной и той же буквы (например, elephant и eagle или koala и kangaroo), последняя запись, проходящая через поток, переопределяет все предыдущие записи.
10.3.4 Выполнение логических операций над реактивными типами
Иногда вам просто нужно знать, соответствуют ли записи, опубликованные Mono или Flux, некоторым критериям. Операции all() и any() выполняют такую логику. Рисунки 10.21 и 10.22 иллюстрируют, как работают all() и any().
Рисунок 10.21 Поток может быть проверен, чтобы убедиться, что все сообщения удовлетворяют некоторому условию в операции all.
Рисунок 10.22 поток может быть проверен, что по крайней мере одно сообщение удовлетворяет некоторому условию any операции.
Предположим, вы хотите знать, что каждая строка, публикуемая Flux, содержит букву a или букву k. Следующий тест показывает, как использовать all() для проверки этого условия:
@Test
public void all() {
Flux
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono
StepVerifier.create(hasAMono)
.expectNext(true)
.verifyComplete();
Mono
StepVerifier.create(hasKMono)
.expectNext(false)
.verifyComplete();
}
В первом StepVerifier, проверяется наличие буквы a. Операция all применяется к исходному Flux, в результате чего получается Mono типа Boolean. В этом случае все названия животных содержат букву а, поэтому Mono будет содержать true. Но на втором этапе проверки результирующий Mono будет выдавать false, потому что не все имена животных содержат k.
Вместо того, чтобы выполнять проверку "все или ничего", Возможно, будет достаточно, если хотя бы одна запись соответствует условиям. В этом случае операция any() - это то, что вы хотите. Этот новый тестовый случай использует any() для проверки букв t и z:
@Test
public void any() {
Flux
"aardvark", "elephant", "koala", "eagle", "kangaroo");
Mono
StepVerifier.create(hasAMono)
.expectNext(true)
.verifyComplete();
Mono
StepVerifier.create(hasZMono)
.expectNext(false)
.verifyComplete();
}
В первом StepVerifier вы видите, что полученный Mono возвращает true, потому что по крайней мере одно имя животного имеет букву t (в частности, elephant). Во втором случае полученное Mono возвращает false, потому что ни одно из имен животных не содержит z.
ИТОГО:
-Реактивное программирование включает в себя создание конвейеров, по которым передаются данные.
-Спецификация Reactive Streams определяет четыре типа: «Издатель» (Publisher), «Подписчик» (Subscriber), «Подписка» (Subscription) и «Трансформер» (Transformer) (который является комбинацией Publisher и Subscriber).
-Проект Reactor реализует Reactive Streams и абстрагирует определения потоков в два основных типа, Flux и Mono, каждый из которых предлагает несколько сотен операций.
-Spring 5 использует Reactor для создания реактивных контроллеров, репозиториев, REST клиентов и другой поддержки реактивной платформы.
Spring in Action Covers Spring 5.0 перевод на русский. Глава 11
Глава 11. Разработка реактивных API
Эта глава охватывает:
-Использование Spring WebFlux
-Написание и тестирование реактивных контроллеров и клиентов
-Использование REST API
-Защита реактивных веб-приложений
Теперь, когда вы хорошо познакомились с реактивным программированием и Project Reactor, вы готовы начать применять эти методы в своих Spring приложениях. В этой главе мы собираемся вернуться к некоторым контроллерам, которые вы написали в главе 6, чтобы воспользоваться преимуществами модели реактивного программирования Spring 5.
Более конкретно, мы собираемся взглянуть на новую реактивную веб-инфраструктуру Spring 5 - Spring WebFlux. Как вы вскоре поймете, Spring WebFlux удивительно похож на Spring MVC, что делает его простым в применении, наряду с тем, что вы уже знаете о создании REST API в Spring.
11.1 Работа с Spring WebFlux
Типичные веб-фреймворки на основе сервлетов, такие как Spring MVC, являются блокирующими и многопоточными по своей природе, используя один поток на соединение. Когда запросы обрабатываются, рабочий поток извлекается из пула потоков для обработки запроса. Тем временем поток запросов блокируется, пока рабочий поток не уведомит его о завершении.