Spring in Action Covers Spring 5-1--11 — страница 52 из 63

Существует много общего между Java Streams и Reactive Streams. Для начала, они оба имеют слово потоки в своих именах. Они также предоставляют функциональный API для работы с данными. На самом деле, как вы увидите позже, когда мы посмотрим на Reactor, они даже выполняют одни и те же операции.

Однако Java Streams обычно являются синхронными и работают с конечным набором данных. По сути, они являются средством перебора коллекции с помощью функций.

Reactive Streams поддерживают асинхронную обработку наборов данных любого размера, включая бесконечные наборы данных. Они обрабатывают данные в режиме реального времени, когда они становятся доступными, с противодавлением (backpressure), чтобы не перегружать своих потребителей.

Спецификация реактивных потоков может быть описана четырьмя определениями интерфейсами: Publisher, Subscriber, Subscription, и Processor. Publisher создает данные, которые он отправляет Subscriber на Subscription. Интерфейс Publisher объявляет единый метод subscribe() с помощью которого Subscriber может подписаться на Publisher:

public interface Publisher {

    void subscribe(Subscriber subscriber);

}

После того как Subscriber подписался, он может получать события от Publisher. Эти события отправляются через методы интерфейса Subscriber:

public interface Subscriber {

    void onSubscribe(Subscription sub);

    void onNext(T item);

    void onError(Throwable ex);

    void onComplete();

}

Первое событие, которое получит Subscriber, - это вызов функции onSubscribe(). Когда Publisher вызывает функцию onSubscribe(), он передает Subscription объект Subscriber. Именно через Subscription Subscriber может управлять своей подпиской:

public interface Subscription {

    void request(long n);

    void cancel();

}

Subscriber может вызвать функцию request(), чтобы запросить отправку данных, или функцию cancel(), чтобы указать, что он больше не заинтересован в получении данных и отменяет подписку. При вызове функции request() Subscriber передает long значение, чтобы указать, сколько элементов данных он готов принять. Именно здесь возникает обратное давление (backpressure), препятствующее Publisher отправлять больше данных, чем может обработать Subscriber. После того, как Publisher отправил столько элементов, сколько было запрошено, Subscriber может снова вызвать функцию request(), чтобы запросить больше.

После того, как Subscriber запросил данные, данные начинают поступать через поток. Для каждого элемента, опубликованного Publisher, будет вызван метод onNext() для доставки данных Subscriber. Если есть какие-либо ошибки, вызывается onError(). Если у Publisher нет больше данных для отправки и он не будет генерировать больше данных, он вызовет onComplete(), чтобы сообщить подписчику, что он завершил процесс.

Что касается интерфейса Processor, это комбинация Subscriber и Publisher, как показано здесь:

public interface Processor extends Subscriber, Publisher {}

Как Subscriber, Processor будет получать данные и обрабатывать их каким-либо образом. Затем он будет “переоденется” и выступит в качестве Publisher, чтобы публиковать результаты для своих Subscribers.

Как вы можете видеть, спецификация Reactive Streams довольно проста. Довольно легко понять, как можно построить конвейер обработки данных, который начинается с Publisher, прогоняет данные через ноль или более Processors, а затем передает конечные результаты в Subscriber.

Однако интерфейсы Reactive Streams не могут использоваться для создания такого потока функциональным способом. Project Reactor - это реализация спецификации Reactive Streams, которая предоставляет функциональный API для создания Reactive Streams. Как вы увидите в следующих главах, Reactor является основой модели реактивного программирования в Spring 5. В оставшейся части этой главы мы собираемся исследовать (и, осмелюсь сказать, очень весело провести время) Project Reactor.

10.2 Начало работы с Reactor

Реактивное программирование требует от нас думать совсем иначе, чем императивное программирование. Вместо того, чтобы описывать набор шагов, которые необходимо предпринять, реактивное программирование означает построение конвейера, по которому будут проходить данные. Когда данные проходят через конвейер, они могут быть изменены или использованы каким-либо образом.

Например, предположим, что вы хотите взять имя человека, изменить все его буквы на заглавные, использовать его для создания приветственного сообщения, а затем, наконец, напечатать его. В модели императивного программирования код будет выглядеть примерно так:

String name = "Craig";

String capitalName = name.toUpperCase();

String greeting = "Hello, " + capitalName + "!";

System.out.println(greeting);

В императивной модели каждая строка кода выполняет шаг, один за другим, и определенно в одном и том же потоке. Каждый шаг блокирует выполнение потока до следующего шага до его завершения.

Функциональный, реактивный код может достичь того же этого же:

Mono.just("Craig")

   .map(n -> n.toUpperCase())

   .map(cn -> "Hello, " + cn + "!")

   .subscribe(System.out::println);

Не волнуйтесь про, возможное, недопонимание этого примера; мы скоро поговорим об операциях just(), map() и subscribe(). На данный момент важно понимать, что хотя реактивный пример все еще следует пошаговой модели, на самом деле это конвейер, через который проходят данные. На каждом этапе конвейера данные каким-то образом изменяются, но нельзя точно сказать, какой поток выполняет какие операции. Все операции могут выполняться в одном потоке ... или не в одном.

Mono в этом примере является одним из двух основных типов Reactor. Второй это - Flux. Оба являются реализациями Reactive Streams Publisher. Поток представляет собой конвейер из нуля, одного или многих (потенциально бесконечных) элементов данных. Mono - это специализированный реактивный тип, оптимизированный для случаев, когда известно, что в наборе данных содержится не более одного элемента данных.

Reactor vs. RxJava (ReactiveX)

Если вы уже знакомы с RxJava или ReactiveX, возможно, вы думаете, что Mono и Flux звучат во многом как Observable и Single. На самом деле, они примерно эквивалентны семантически. Они даже предлагают много одинаковых операций.

Хотя в этой книге мы сосредоточимся на Reactor, вы, возможно, будете рады узнать, что можно скрывать типы Reactor и RxJava. Более того, как вы увидите в следующих главах, Spring также может работать с типами RxJava.

На самом деле в предыдущем примере три Mono. Операция just() создает первую. Когда Mono выдает значение, это значение присваивается операции map(), которая описывает что слово должно быть написано заглавными буквами и использовано для создания другого Mono. Когда второй Mono публикует свои данные, он передается второй операции map() для выполнения конкатенации строк, результаты которой используются для создания третьего Mono. Наконец, вызов метода subscribe() подписывается на Mono, получает данные и печатает их.

10.2.1 Схема реактивных потоков

Реактивные потоки часто иллюстрируются мраморными (marble) диаграммами. Мраморные (marble) диаграммы в своей простейшей форме изображают временную шкалу данных, потоков проходящих через Flux или Mono вверху, операцию в середине и временную шкалу результирующего Flux или Mono внизу. На рис. 10.1 показан шаблон диаграммы состояния потока. Как вы можете видеть, когда данные проходят через исходный Flux, он обрабатывается через некоторую операцию, в результате чего возникает новый Flux, через который проходят обработанные данные.

На рисунке 10.2 показана аналогичная мраморная (marble) диаграмма, но для Mono. Как видите, ключевое отличие состоит в том, что у Mono будет либо ноль, либо один элемент данных, либо ошибка.

В разделе 10.3 мы рассмотрим многие операции, поддерживаемые Flux и Mono, и будем использовать мраморные (marble) диаграммы для визуализации их работы.

Рисунок 10.1 Мраморная (marble) диаграмма, иллюстрирующая основной поток Flux

Рисунок 10.2 Мраморная (marble) диаграмма, иллюстрирующая основной поток Mono

10.2.2 Добавление Reactor зависимостей

Чтобы начать работу с Reactor, добавьте следующую зависимость в сборку проекта:

io.projectreactor

reactor-core

Reactor также предоставляет отличную поддержку тестирования. Мы собираемся написать множество тестов для своего кода Reactor, поэтому вам обязательно нужно добавить эту зависимость в вашу сборку:

io.projectreactor

reactor-test

test

Я предполагаю, что вы добавляете эти зависимости в проект Spring Boot, который обрабатывает управление зависимостями (dependency management) для вас, поэтому нет необходимости указывать элемент для зависимостей. Но если вы хотите использовать Reactor в проекте, отличном от Spring Boot, вам потребуется настроить спецификацию Reactor BOM (bill of materials) в сборке. Следующая запись управления зависимостями добавляет Reactor Bismuth в сборку:

io.projectreactor

reactor-bom

Bismuth-RELEASE

pom

import

Теперь, когда Reactor находится в разработке вашего проекта, вы можете начать создавать реактивные конвейеры с Mono и Flux. В оставшейся части этой главы мы рассмотрим несколько операций, предлагаемых Mono и Flux.

10.3 Применение общих реактивных операций

Flux и Mono являются наиболее важными строительными блоками, предоставляемыми Reactor, и операции, предлагаемые этими двумя реактивными типами, представляют собой раствор, который связывает их вместе для создания конвейеров, по которым могут передаваться данные. Между Flux и Mono существует более 500 операций, каждую из которых можно условно классифицировать как