-Операции создания
-Комбинированные операции
-Операции трансформации
-Логические операции
Как бы ни было интересно смотреть на каждую из 500 операций, чтобы увидеть, как они работают, в этой главе просто не хватит места. В этом разделе я выбрал несколько наиболее полезных операций для экспериментов. Начнем с операций создания.
ПРИМЕЧАНИЕ
Где примеры Mono? Mono и Flux используют много одинаковых операций, поэтому в большинстве случаев нет необходимости показывать одну и ту же операцию дважды, один раз для Mono и еще раз для Flux. Более того, хотя операции Mono полезны, на них немного менее интересно смотреть, чем на те же самые операции, когда предоставляется Flux. Большинство примеров, с которыми мы будем работать, будут связаны с Flux. Просто знайте, что Mono обычно имеет эквивалентные операции.
10.3.1 Реактивные типы создания
Часто при работе с реактивными типами в Spring вы получаете Flux или Mono из репозитория или службы, поэтому вам не нужно создавать их самостоятельно. Но иногда вам нужно будет создать нового реактивного издателя.
Reactor предоставляет несколько операций для создания Flux и Mono. В этом разделе мы рассмотрим некоторые из наиболее полезных операций создания.
СОЗДАНИЕ ИЗ ОБЪЕКТОВ
Если у вас есть один или несколько объектов, из которых вы хотите создать Flux или Mono, вы можете использовать статический метод just() в Flux или Mono для создания реактивного типа, данные которого управляются этими объектами. Например, следующий метод тестирования создает Flux из пяти объектов String:
@Test
public void createAFlux_just() {
Flux
.just("Apple", "Orange", "Grape", "Banana", "Strawberry");
}
На данный момент Flux создан, но у него нет подписчиков. Без подписчиков данные не будут передаваться. Думая об аналогии с садовым шлангом, вы прикрепили садовый шланг к патрубку, и с другой стороны есть вода из коммунальной компании - но пока вы не включите патрубок, вода не будет течь. Подписка на реактивный тип - это то, как вы включаете поток данных.
Чтобы добавить подписчика, вы можете вызвать метод subscribe() в Flux:
fruitFlux.subscribe(
f -> System.out.println("Here's some fruit: " + f)
);
Здесь лямбда, заданная для subscribe(), на самом деле является java.util.Consumer, который используется для создания Reactive Streams Subscriber. При вызове subscribe() данные начинают передаваться. В этом примере промежуточных операций нет, поэтому данные передаются напрямую от Flux к Subscriber.
Печать записей из Flux или Mono на консоль - это хороший способ увидеть реактивный тип в действии. Но лучший способ на самом деле протестировать Flux или Mono - использовать StepVerifier от Reactor. С учетом Flux или Mono, StepVerifier подписывается на реактивный тип и затем применяет утверждения к данным, когда они проходят через поток, в конце концов проверяя, что поток завершается должным образом.
Например, чтобы проверить, что предписанные данные проходят через fruitFlux, вы можете написать тест, который выглядит следующим образом:
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
В этом случае StepVerifier подписывается на Flux и затем утверждает, что каждый элемент соответствует ожидаемому названию плода. Наконец, он проверяет, что после того, как клубника произведена Flux, Flux завершается.
В оставшейся части примеров в этой главе вы будете использовать StepVerifier для написания обучающих тестов - тестов, которые проверяют поведение и помогают понять, как что-то работает, - чтобы узнать некоторые из наиболее полезных операций Reactor.
СОЗДАНИЕ ИЗ КОЛЛЕКЦИЙ
Flux также может быть создан из массива, Iterable или Java Stream. Рисунок 10.3 иллюстрирует, как это работает с мраморной диаграммой.
Рисунок 10.3. Поток может быть создан из массива, Iterable или Stream.
Чтобы создать Flux из массива, вызовите статический метод fromArray(), передав в исходный массив:
@Test
public void createAFlux_fromArray() {
String[] fruits = new String[] {
"Apple", "Orange", "Grape", "Banana", "Strawberry" };
Flux
StepVerifier.create(fruitFlux)
.expectNext("Apple")
.expectNext("Orange")
.expectNext("Grape")
.expectNext("Banana")
.expectNext("Strawberry")
.verifyComplete();
}
Поскольку исходный массив содержит те же имена фруктов, которые вы использовали при создании Flux из списка объектов, данные, передаваемые Flux, будут иметь те же значения. Таким образом, вы можете использовать тот же StepVerifier, что и раньше, чтобы проверить этот Flux.
Если вам нужно создать Flux из java.util.List, java.util.Set или любой другой реализации java.lang.Iterable, вы можете передать его в статический метод fromIterable():
@Test
public void createAFlux_fromIterable() {
List
fruitList.add("Apple");
fruitList.add("Orange");
fruitList.add("Grape");
fruitList.add("Banana");
fruitList.add("Strawberry");
Flux
// ... проверить шаги
}
Или, если у вас есть Java Stream, который вы хотели бы использовать в качестве источника для Flux, fromStream() - это метод, который вы будете использовать:
@Test
public void createAFlux_fromStream() {
Stream
Stream.of("Apple", "Orange", "Grape", "Banana", "Strawberry");
Flux
// ... проверить шаги
}
Опять же, тот же StepVerifier, что и раньше, можно использовать для проверки данных, опубликованных Flux.
ГЕНЕРИРОВАНИЕ FLUX ДАННЫХ
Иногда у вас нет данных для работы, и вам просто нужно, чтобы Flux действовал как счетчик, отдавая число, которое увеличивается с каждым новым значением. Для создания счетчика Flux вы можете использовать статический метод range(). Диаграмма на рисунке 10.4 иллюстрирует, как работает range().
Рисунок 10.4 Создание Flux из диапазона приводит к публикации сообщений в противоположном стиле.
Следующий метод тестирования демонстрирует, как создать Flux диапазон:
@Test
public void createAFlux_range() {
Flux
Flux.range(1, 5);
StepVerifier.create(intervalFlux)
.expectNext(1)
.expectNext(2)
.expectNext(3)
.expectNext(4)
.expectNext(5)
.verifyComplete();
}
В этом примере диапазон Flux создается с начальным значением 1 и конечным значением 5. StepVerifier доказывает, что он опубликует пять элементов, которые являются целыми числами от 1 до 5.
Еще один метод создания Flux, похожий на range(), представляет собой interval(). Как и метод range(), interval() создает поток, который возвращает увеличивающееся значение. Но то, что делает interval() особенным, заключается в том, что вместо того, чтобы указывать начальное и конечное значение, вы указываете длительность или как часто значение должно возвращаться. На рисунке 10.5 показана мраморная диаграмма для метода создания interval().
Рисунок 10.5 Flux, созданный из интервала, имеет периодическую запись, опубликованную в нем. (A Flux created from an interval has a periodic entry published to it.)
Например, чтобы создать поток интервалов, который выдает значение каждую секунду, можно использовать статический метод interval() следующим образом:
@Test
public void createAFlux_interval() {
Flux
Flux.interval(Duration.ofSeconds(1))
.take(5);
StepVerifier.create(intervalFlux)
.expectNext(0L)
.expectNext(1L)
.expectNext(2L)
.expectNext(3L)
.expectNext(4L)
.verifyComplete();
}
Обратите внимание,что значение, возвращаемое потоком интервалов, начинается с 0 и увеличивается на каждом последующем элементе. Кроме того, поскольку interval() не имеет максимального значения, он потенциально будет работать вечно. Поэтому вы также используете операцию take(), чтобы ограничить результаты первыми пятью записями. Подробнее об операции take() мы поговорим в следующем разделе.
10.3.2 Комбинирование реактивных типов
Однажды может возникнуть задача когда Вам придется работать с двумя реактивными типами, которые вам нужно как-то объединить. Или, в других случаях, вам может потребоваться разделить Flux на несколько реактивных типов. В этом разделе мы рассмотрим операции, которые объединяют и разделяют Flux и Mono в Reactor.
СЛИЯНИЕ РЕАКТИВНЫХ ТИПОВ
Предположим, у вас есть два потока потока и нужно создать один результирующий поток, который будет производить данные, как только он станет доступен из любого из вышерасположенных Flux streams. Чтобы объединить один поток с другим, можно использовать операцию mergeWith(), как показано на marble диаграмме на рис.10.6.
Рисунок 10.6 слияние двух Flux потоков чередя их сообщения в новом Flux.
Например, предположим, что у вас есть Flux, значения которого являются именами телевизионных и киношных персонажей, и у вас есть второй Flux, значениями которого являются названия продуктов, которые эти персонажи любят есть. Следующий тестовый метод показывает, как можно объединить два объекта Flux с помощью метода mergeWith():
@Test
public void mergeFluxes() {
Flux