Spring in Action Covers Spring 5-1--11 — страница 55 из 63

Рис. 10.13. Альтернативная форма операции take передает сообщения в результирующий поток до тех пор, пока не пройдет некоторое время.

В следующем методе тестирования используется альтернативная форма take() для отправки максимально возможного количества элементов в первые 3,5 секунды после подписки:

@Test

public void take() {

   Flux nationalParkFlux = Flux.just(

         "Yellowstone", "Yosemite", "Grand Canyon",

         "Zion", "Grand Teton")

      .delayElements(Duration.ofSeconds(1))

      .take(Duration.ofMillis(3500));

   StepVerifier.create(nationalParkFlux)

      .expectNext("Yellowstone", "Yosemite", "Grand Canyon")

      .verifyComplete();

}

Операции skip() и take() можно рассматривать как операции фильтрации, где критерии фильтра основаны на количестве или длительности. Для более общей фильтрации значений Flux вы найдете операцию filter() весьма полезной.

При наличии предиката, который решает, будет ли элемент проходить через поток или нет, операция filter() позволяет выборочно публиковать на основе любых критериев, которые вы хотите. Мраморная диаграмма на рисунке 10.14 показывает, как работает filter().

Рисунок 10.14. Входящий Flux может быть отфильтрован так, что полученный Flux получает только сообщения, которые соответствуют заданному предикату.

Чтобы увидеть filter() в действии, рассмотрите следующий метод тестирования:

@Test

public void filter() {

   Flux nationalParkFlux = Flux.just(

         "Yellowstone", "Yosemite", "Grand Canyon",

         "Zion", "Grand Teton")

      .filter(np -> !np.contains(" "));

   StepVerifier.create(nationalParkFlux)

      .expectNext("Yellowstone", "Yosemite", "Zion")

      .verifyComplete();

}

Здесь filter() задается предикатом в виде лямбды, которая принимает только String значения без пробелов. Следовательно, «Grand Canyon» и «Grand Teton» отфильтровываются из итогового Flux.

Возможно, вам нужна фильтрация для всех предметов, которые вы уже получили. Операция Different(), как показано на рисунке 10.15, приводит к тому, что Flux публикует только элементы из исходного потока, которые еще не были опубликованы.

Рисунок 10.15. Операция distinct отфильтровывает любые повторяющиеся сообщения.

В следующем тесте только уникальные String значения будут излучаться из Flux:

@Test

public void distinct() {

   Flux animalFlux = Flux.just(

         "dog", "cat", "bird", "dog", "bird", "anteater")

      .distinct();

   StepVerifier.create(animalFlux)

      .expectNext("dog", "cat", "bird", "anteater")

      .verifyComplete();

}

Хотя «dog» и «bird» публикуются дважды из исходного потока, отдельный поток публикует их только один раз.

МАППИНГ РЕАКТИВНЫХ ДАННЫХ

Одной из наиболее распространенных операций, которые вы будете использовать в Flux или Моно, является преобразование опубликованных элементов в какую-либо другую форму или тип. Типы Reactor предлагают операции map() и flatMap() для этой цели.

Операция map() создает Flux, который просто выполняет преобразование, как предписано данной функцией, для каждого объекта, который он получает до его повторной публикации. На рисунке 10.16 показано, как работает операция map().

Рисунок 10.16. Операция map выполняет преобразование входящих сообщений в новые сообщения в результирующем потоке.

В следующем методе тестирования Flux String значения, представляющие баскетболистов, сопоставляются с новы Flux объектами Player:

@Test

public void map() {

   Flux playerFlux = Flux

      .just("Michael Jordan", "Scottie Pippen", "Steve Kerr")

      .map(n -> {

         String[] split = n.split("\\s");

         return new Player(split[0], split[1]);

      });

   StepVerifier.create(playerFlux)

      .expectNext(new Player("Michael", "Jordan"))

      .expectNext(new Player("Scottie", "Pippen"))

      .expectNext(new Player("Steve", "Kerr"))

      .verifyComplete();

}

Функция, заданная для map() (как лямбда), разбивает входящую String по пробелу и использует полученный массив String-ов для создания объекта Player. Хотя поток, созданный с помощью just(), переносил объекты String, Flux, полученный из map(), переносит объекты Player.

Что важно понимать в map(), так это то, что сопоставление выполняется синхронно, так как каждый элемент публикуется исходным Flux. Если вы хотите выполнить сопоставление асинхронно, вы должны рассмотреть операцию flatMap().

Операция flatMap() требует некоторой мысли и практики, чтобы овладеть всеми навыками. Как показано на рисунке 10.17, вместо простого сопоставления одного объекта другому, как в случае map(), flatMap() сопоставляет каждый объект новому Mono или Flux. Результаты Mono или Flux сведены в новый результирующий Flux. Когда используется вместе с subscribeOn(), flatMap() может раскрыть асинхронную мощь типов Reactor.

Рис. 10.17. Операция плоской карты (flat map) использует промежуточный Flux для выполнения преобразования, следовательно, допускает асинхронные преобразования.

Следующий метод тестирования демонстрирует использование flatMap() и subscribeOn():

@Test

public void flatMap() {

   Flux playerFlux = Flux

      .just("Michael Jordan", "Scottie Pippen", "Steve Kerr")

      .flatMap(n -> Mono.just(n)

      .map(p -> {

         String[] split = p.split("\\s");

         return new Player(split[0], split[1]);

      })

      .subscribeOn(Schedulers.parallel())

   );

   List playerList = Arrays.asList(

      new Player("Michael", "Jordan"),

      new Player("Scottie", "Pippen"),

      new Player("Steve", "Kerr"));

   StepVerifier.create(playerFlux)

      .expectNextMatches(p -> playerList.contains(p))

      .expectNextMatches(p -> playerList.contains(p))

      .expectNextMatches(p -> playerList.contains(p))

      .verifyComplete();

}

Обратите внимание, что flatMap() получает лямбда-функцию, которая преобразует входящую String в Mono типа String. Затем к Mono применяется операция map() для преобразования String в Player.

Если вы остановитесь прямо здесь, результирующий поток будет передавать объекты Player, созданные синхронно в том же порядке, что и в примере с map(). Но операции с Mono завершаются вызовом subscribeOn(), чтобы указать, что каждая подписка должна проходить в параллельном потоке. Следовательно, операции сопоставления для нескольких входящих объектов String могут выполняться асинхронно и параллельно.

Хотя subscribeOn() называется очень похоже на subscribe(), но они совершенно разные. В то время как subscribe() - это глагол, подписывающийся на реактивный поток и эффективно запускающий его, subscribeOn() - является более описательной, определяя, как подписка должна обрабатываться параллельно. Reactor не навязывает какую-либо конкретную модель параллелизма; с помощью subscribeOn() вы можете указать модель параллелизма, используя один из статических методов из планировщиков, который вы хотите использовать. В этом примере вы использовали parallel(), которая использует рабочие потоки из фиксированного пула (размер которого соответствует числу ядер ЦП). Но планировщики поддерживают несколько моделей параллелизма, например, описанных в таблице 10.1.

Таблица 10.1 Модели параллелизма для планировщиков (Schedulers)

Метод планировщика: Описание

.immediate() - Выполняет подписку в текущем потоке.

.single() - Выполняет подписку в одном многоразовом потоке. Повторно использует один и тот же поток для всех абонентов.

.newSingle() - Выполняет подписку в выделенном потоке для каждого вызова.

.elastic() - Выполняет подписку в в рабочем пуле из неограниченного эластичного пула. Новые рабочие потоки создаются по мере необходимости, а простаивающие рабочие потоки удаляются (по умолчанию через 60 секунд).

.parallel() - Выполняет подписку в рабочем пуле из из пула фиксированного размера, размер которого соответствует числу ядер ЦП.

Преимуществом использования flatMap() и subscribeOn() является то, что вы можете увеличить пропускную способность потока, разделив работу между несколькими параллельными потоками. Но поскольку работа выполняется параллельно, без гарантий того, что будет постоянный порядок выполнения потоков, невозможно определить порядок элементов, передаваемых в результирующий поток. Таким образом, StepVerifier может только проверить, что каждый исходящий элемент существует в ожидаемом списке объектов Player и что до завершения потока будет три таких элемента.

БУФЕРИЗАЦИЯ ДАННЫХ В РЕАКТИВНОМ ПОТОКЕ

В процессе обработки данных, проходящих через Flux, может оказаться полезным разбить поток данных на небольшие части. Операция buffer(), показанная на рисунке 10.18, может помочь в этом.

Рисунок 10.18 Операция buffer приводит к листу Flux заданного максимального размера, которые формируется на основе входящего Flux.

Учитывая, что Flux у нас String значений, каждое из которых содержит имя фрукта, вы можете создать новый Flux коллекции List, в которой каждый List содержит не более указанного числа элементов:

@Test

public void buffer() {

   Flux fruitFlux = Flux.just(

      "apple", "orange", "banana", "kiwi", "strawberry");

   Flux> bufferedFlux = fruitFlux.buffer(3);

   StepVerifier

      .create(bufferedFlux)

      .expectNext(Arrays.asList("apple", "orange", "banana"))

      .expectNext(Arrays.asList("kiwi", "strawberry"))

      .verifyComplete();

}