Spring in Action Covers Spring 5-1--11 — страница 54 из 63

      .just("Garfield", "Kojak", "Barbossa")

      .delayElements(Duration.ofMillis(500));

   Flux foodFlux = Flux

      .just("Lasagna", "Lollipops", "Apples")

      .delaySubscription(Duration.ofMillis(250))

      .delayElements(Duration.ofMillis(500));

   Flux mergedFlux = characterFlux.mergeWith(foodFlux);

   StepVerifier.create(mergedFlux)

      .expectNext("Garfield")

      .expectNext("Lasagna")

      .expectNext("Kojak")

      .expectNext("Lollipops")

      .expectNext("Barbossa")

      .expectNext("Apples")

      .verifyComplete();

}

Обычно Flux публикует данные настолько быстро, насколько это возможно. Таким образом, вы используете операцию delayElements() в обоих созданных потоках Flux, чтобы немного их замедлить - отправляя запись каждые 500 мс. Кроме того, чтобы поток продуктов начинал передаваться, после Flux имен, вы применяете операцию delaySubscription() к потоку продуктов, чтобы он не отправлял никаких данных, пока не пройдет 250 мс после подписки.

После объединения двух объектов Flux создается новый объединенный Flux. Когда StepVerifier подписывается на объединенный поток, он, в свою очередь, подписывается на два исходных Flux потока, начиная поток данных.

Порядок предметов, отдаваемый из объединенного потока, совпадает со временем их передачи из источников. Поскольку оба объекта Flux настроены на отдачу с регулярной скоростью, значения будут чередоваться через объединенный поток, в результате чего будет получено имя, затем пища, затем имя и т. Д. Если время либо Flux должно быть изменены, возможно, вы увидите два персонажа или два продукта, опубликованные один за другим.

Поскольку mergeWith() не может гарантировать идеальное взаимодействие между его источниками, вы можете рассмотреть операцию zip() вместо этого. Когда два объекта Flux сжимаются вместе, это приводит к новому Flux, который создает кортеж элементов, где кортеж содержит один элемент из каждого исходного потока. На рис. 10.7 показано, как два объекта Flux можно сжать вместе.

Рис. 10.7. Сжатие двух потоков Flux приводит к созданию Flux, содержащего наборы по одному элементу от каждого потока.

Чтобы увидеть действие zip() в действии, рассмотрите следующий метод тестирования, который объединяет Flux персонажей и Flux продукты вместе:

@Test

public void zipFluxes() {

   Flux characterFlux = Flux

      .just("Garfield", "Kojak", "Barbossa");

   Flux foodFlux = Flux

      .just("Lasagna", "Lollipops", "Apples");

   Flux> zippedFlux =

   Flux.zip(characterFlux, foodFlux);

   StepVerifier.create(zippedFlux)

      .expectNextMatches(p ->

         p.getT1().equals("Garfield") &&

         p.getT2().equals("Lasagna"))

      .expectNextMatches(p ->

         p.getT1().equals("Kojak") &&

         p.getT2().equals("Lollipops"))

      .expectNextMatches(p ->

         p.getT1().equals("Barbossa") &&

         p.getT2().equals("Apples"))

      .verifyComplete();

}

Обратите внимание, что в отличие от mergeWith(), операция zip() является статической операцией создания. Созданный Flux имеет идеальное выравнивание между персонажами и их любимыми блюдами. Каждый элемент, испускаемый из сжатого потока, представляет собой Tuple2 (контейнерный объект, который содержит два других объекта), содержащий элементы из каждого исходного потока в порядке их публикации.

Если вы предпочитаете не работать с Tuple2, а работать с каким-то другим типом, вы можете предоставить функцию zip(), которая создает любой объект, который вы хотите, учитывая два элемента (как показано на диаграмме marble на рисунке 10.8).

Рисунок 10.8 альтернативная форма операции zip приводит к Flux сообщений, созданных из одного элемента каждого входящего Flux.

Например, в следующем методе тестирования показано, как связать Flux имен с Flux продуктов питания, чтобы получить в результате Flux из String объектов:

@Test

public void zipFluxesToObject() {

   Flux characterFlux = Flux

      .just("Garfield", "Kojak", "Barbossa");

   Flux foodFlux = Flux

      .just("Lasagna", "Lollipops", "Apples");

   Flux zippedFlux =

      Flux.zip(characterFlux, foodFlux, (c, f) -> c + " eats " + f);

   StepVerifier.create(zippedFlux)

      .expectNext("Garfield eats Lasagna")

      .expectNext("Kojak eats Lollipops")

      .expectNext("Barbossa eats Apples")

      .verifyComplete();

}

Функция, заданная для zip() (заданная здесь как лямбда), просто объединяет два элемента в предложение, которое отдается зипованным Flux.

ВЫБОР ПЕРВОГО РЕАКТИВНОГО ТИПА ДЛЯ ПУБЛИКАЦИИ

Предположим, у вас есть два Flux объекта, и вместо того, чтобы объединить их, вы просто хотите создать новый Flux, который будет генерировать значения из первого Flux, который создает значение. Как показано на рисунке 10.9, операция first() выбирает первый из двух объектов Flux и отображает значения, которые она публикует.

Рис. 10.9. first операция выбирает первый Flux, который отправляет сообщение, и после этого создает только сообщения из этого потока.

Следующий метод тестирования создает быстрый Flux и медленный Flux (где “медленный " означает, что он не будет публиковать элемент до 100 мс после подписки). Используя first(), он создает новый Flux, который будет публиковать значения только из первого исходного Flux для публикации значения:

@Test

public void firstFlux() {

   Flux slowFlux = Flux.just("tortoise", "snail", "sloth")

      .delaySubscription(Duration.ofMillis(100));

   Flux fastFlux = Flux.just("hare", "cheetah", "squirrel");

   Flux firstFlux = Flux.first(slowFlux, fastFlux);

   StepVerifier.create(firstFlux)

      .expectNext("hare")

      .expectNext("cheetah")

      .expectNext("squirrel")

      .verifyComplete();

}

В этом случае, поскольку медленный Flux не будет публиковать никаких значений до 100 мс после начала публикации быстрого Flux, вновь созданный Flux будет просто игнорировать медленный Flux и публиковать только значения из быстрого Flux.

10.3.3 Преобразование и фильтрация реактивных потоков

Когда данные проходят через поток, вам, вероятно, потребуется отфильтровать некоторые значения и изменить другие значения. В этом разделе мы рассмотрим операции, которые преобразуют и фильтруют данные, проходящие через реактивный поток.

ФИЛЬТРАЦИЯ ДАННЫХ ИЗ РЕАКТИВНЫХ ТИПОВ

Один из самых основных способов фильтрации данных при их поступлении из Flux - просто игнорировать первые записи. Операция skip(), показанная на рисунке 10.10, делает именно это.

Рисунок 10.10. Операция skip пропускает указанное количество сообщений перед передачей оставшихся сообщений в результирующий Flux.

Учитывая Flux с несколькими записями, операция skip() создаст новый Flux, который пропускает заданное количество элементов, прежде чем отдавать остальные элементы из исходного Flux. Следующий метод тестирования показывает, как использовать skip():

@Test

public void skipAFew() {

   Flux skipFlux = Flux.just(

      "one", "two", "skip a few", "ninety nine", "one hundred")

   .skip(3);

   StepVerifier.create(skipFlux)

      .expectNext("ninety nine", "one hundred")

      .verifyComplete();

}

В этом случае у вас есть Flux из пяти String элементов. Вызов метода skip(3) для этого потока создает новый Flux, который пропускает первые три элемента и публикует только последние два элемента.

Но, возможно, вы не хотите пропускать определенное количество элементов, а вместо этого нужно пропустить некоторое количество элементов, определяемое не количеством, а временем. Альтернативная форма операции skip(), показанная на рисунке 10.11, создает Flux, который ожидает, пока не пройдет некоторое заданное время, прежде чем отдавать элементы из исходного Flux.

Рисунок 10.11. Альтернативная форма операции skip ждет, пока не пройдет некоторое время, прежде чем передавать сообщения в результирующий поток.

Следующий метод тестирования использует функцию skip() для создания Flux, который ожидает четыре секунды, прежде чем начинает отдавать какие-либо значения. Поскольку этот Flux был создан из Flux, который имеет односекундную задержку между элементами (используя delayElements()), будут переданы только последние два элемента:

@Test

public void skipAFewSeconds() {

   Flux skipFlux = Flux.just(

         "one", "two", "skip a few", "ninety nine", "one hundred")

      .delayElements(Duration.ofSeconds(1))

      .skip(Duration.ofSeconds(4));

   StepVerifier.create(skipFlux)

      .expectNext("ninety nine", "one hundred")

      .verifyComplete();

}

Вы уже видели пример метода take(), но в познакомившись с методом skip(), take() можно рассматривать как противоположность skip (). В то время как функция skip() пропускает первые несколько элементов, функция take() выдает только первые несколько элементов (как показано на диаграмме marble на рис. 10.12):

@Test

public void take() {

   Flux nationalParkFlux = Flux.just(

         "Yellowstone", "Yosemite", "Grand Canyon",

         "Zion", "Grand Teton")

      .take(3);

   StepVerifier.create(nationalParkFlux)

      .expectNext("Yellowstone", "Yosemite", "Grand Canyon")

      .verifyComplete();

}

Рис. 10.12 операция take передает только первые несколько сообщений из входящего Flux, а затем отменяет подписку.

Как и skip(), take() также имеет альтернативную форму, основанную на длительности, а не на количестве элементов. Он будет принимать и отдавать столько элементов, сколько проходит через исходный поток, пока не пройдет некоторый период времени, после чего поток завершится. Это показано на рисунке 10.13.