-DirectChannel - Подобно PublishSubscribeChannel, но отправляет сообщение одному потребителю, вызывая потребителя в том же потоке, что и отправитель. Это позволяет транзакциям распространяться по каналу.
-ExecutorChannel - аналогичен DirectChannel, но отправка сообщения происходит через TaskExecutor, происходящий в отдельном потоке от отправителя. Этот тип канала не поддерживает транзакции, которые охватывают канал.
-FluxMessageChannel - канал сообщений Reactive Streams Publisher, основанный на Flux в Project Reactor. (Мы поговорим подробнее о Reactive Streams, Reactor и Flux в главе 10.)
Как в конфигурации Java, так и в стилях Java DSL входные каналы создаются автоматически с DirectChannel по умолчанию. Но если вы хотите использовать другую реализацию канала, вам необходимо явно объявить канал как bean-компонент и сослаться на него в потоке интеграции. Например, чтобы объявить PublishSubscribeChannel, вы должны объявить следующий метод @Bean:
@Bean
public MessageChannel orderChannel() {
return new PublishSubscribeChannel();
}
Тогда вы будете ссылаться на этот канал по имени в определении потока интеграции. Например, если канал использовался bean-компонентом-активатором службы, вы бы ссылались на него в атрибуте inputChannel @ServiceActivator:
@ServiceActivator(inputChannel="orderChannel")
Или, если вы используете стиль конфигурации JAVA DSL, вы должны ссылаться на него с помощью вызова channel():
@Beanpublic
IntegrationFlow orderFlow() {
return IntegrationFlows
…
.channel("orderChannel")
…
.get();
}
Важно отметить, что если вы используете QueueChannel, потребители должны быть настроены с помощью средства опроса. Например, предположим, что вы объявили QueueChannel bean следующим образом:
@Bean
public MessageChannel orderChannel() {
return new QueueChannel();
}
Вам нужно будет убедиться, что потребитель настроен на опрос канала для сообщений. В случае службы активатора аннотация @ServiceActivator может выглядеть следующим образом:
@ServiceActivator(inputChannel="orderChannel", poller=@Poller(fixedRate="1000"))
В этом примере служба активатора опрашивает канал с именем orderChannelevery каждую 1 секунду (или 1000 мс).
9.2.2 Фильтры
Фильтры могут быть размещены в середине конвейера интеграции, чтобы разрешить или запретить переход сообщений к следующему шагу в потоке (рис.9.3).
Рисунок 9.3 Фильтры, основанные на некоторых критериях, разрешают или запрещают передачу сообщений в конвейере.
Например, предположим, что сообщения, содержащие целочисленные значения, публикуются через канал с именем numberChannel, но вы хотите, чтобы на канал с именем evenNumberChannel передавались только четные числа. В этом случае вы можете объявить фильтр с аннотацией @Filter следующим образом:
@Filter(inputChannel="numberChannel", outputChannel="evenNumberChannel")
public boolean evenNumberFilter(Integer number) {
return number % 2 == 0;
}
Кроме того, если вы используете стиль конфигурации JAVA DSL для определения потока интеграции, вы можете сделать вызов filter() следующим образом:
@Bean
public IntegrationFlow evenNumberFlow(AtomicInteger integerSource) {
return IntegrationFlows
…
.
…
.get();
}
В этом случае для реализации фильтра используется лямбда. Но, по правде говоря, метод filter() принимает GenericSelector в качестве аргумента. Это означает, что вы можете реализовать интерфейс GenericSelector вместо этого, если ваша фильтрация слишком сложна для простой лямбды.
9.2.3 Трансформаторы
Трансформаторы выполняют некоторые операции над сообщениями, обычно приводя к другому сообщению и, возможно, с другим типом полезной нагрузки (см. рис.9.4). Преобразование может быть чем-то простым, например, выполнение математических операций над числом или манипулирование строковым значением. Или преобразование может быть более сложным,например, использование строкового значения, представляющего ISBN, для поиска и возврата сведений о соответствующей книге.
Рисунок 9.4 Трансформаторы преобразуют сообщения по мере их прохождения через поток интеграции.
Например, предположим, что целочисленные значения публикуются на канале с именем numberChannel, и вы хотите преобразовать эти числа в String, содержащую Римский числовой эквивалент. В этом случае вы можете объявить bean GenericTransformer и аннотировать его с помощью @Transformer следующим образом:
@Bean
@Transformer(inputChannel="numberChannel", outputChannel="romanNumberChannel")
public GenericTransformer
return RomanNumbers::toRoman;
}
Аннотация @Transformer определяет этот компонент как компонент transformer, который получает целочисленные значения из канала с именем numberChannel и использует статический метод с именем toRoman() для выполнения преобразования. (Метод toRoman () статически определен в классе с именем RomanNumbers и ссылается здесь со ссылкой на метод) Результат будет опубликован на канале с именем romanNumberChannel.
В стиле конфигурации Java DSL еще проще с вызовом transform(), передавая ссылку на метод toRoman():
@Bean
public IntegrationFlow transformerFlow() {
return IntegrationFlows
…
.transform(RomanNumbers::toRoman)
…
.get();
}
Хотя вы использовали ссылку на метод в обоих примерах кода преобразователя, знайте, что преобразователь также можно указать как лямбду. Или, если преобразователь достаточно сложен, чтобы потребовался отдельный класс Java, вы можете внедрить его как bean-компонент в конфигурацию потока и передать ссылку на метод transform():
@Bean
public RomanNumberTransformer romanNumberTransformer() {
return new RomanNumberTransformer();
}
@Bean
public IntegrationFlow transformerFlow( RomanNumberTransformer romanNumberTransformer) {
return IntegrationFlows
…
.transform(romanNumberTransformer)
…
.get();
}
Здесь вы объявляете bean-компонент типа RomanNumberTransformer, который сам является реализацией Spring Integration Transformer или интерфейса GenericTransformer. Bean внедряется в метод transformerFlow() и передается в метод transform() при определении потока интеграции.
9.2.4 Маршрутизаторы
Маршрутизаторы, основанные на некоторых критериях маршрутизации, позволяют разветвляться в потоке интеграции,направляя сообщения в разные каналы (см. рисунок 9.5).
Рис. 9.5 Маршрутизаторы направляют сообщения в различные каналы на основе некоторых критериев, применяемых к сообщениям.
Например, предположим, что у вас есть канал с именем numberChannel, через который проходят целочисленные значения. Допустим, вы хотите направить все сообщения с четными номерами в канал с именем evenChannel, а сообщения с нечетными номерами направляются в канал с именем oddChannel. Чтобы создать такую маршрутизацию в потоке интеграции, вы можете объявить bean-компонент типа AbstractMessageRouter и аннотировать bean-компонент с помощью @Router:
@Bean
@Router(inputChannel="numberChannel")
public AbstractMessageRouter evenOddRouter() {
return new AbstractMessageRouter() {
@Override
protected Collection
determineTargetChannels(Message> message) {
Integer number = (Integer) message.getPayload();
if (number % 2 == 0) {
return Collections.singleton(evenChannel());
}
return Collections.singleton(oddChannel());
}
};
}
@Bean
public MessageChannel evenChannel() {
return new DirectChannel();
}
@Bean
public MessageChannel oddChannel() {
return new DirectChannel();
}
Объявленный здесь bean AbstractMessageRouter принимает сообщения от входного канала с именем numberChannel. Реализация, определенная как анонимный внутренний класс, проверяет полезную нагрузку сообщения и, если это четное число, возвращает канал с именем evenChannel (объявленный как bean). В противном случае число в полезной нагрузке канала должно быть нечетным; в этом случае возвращается канал с именем oddChannel (также созданный как bean).
В Java DSL маршрутизаторы объявляются путем вызова route() в ходе определения потока, как показано ниже:
@Bean
public IntegrationFlow numberRoutingFlow(AtomicInteger source) {
return IntegrationFlows
…
.
.subFlowMapping("EVEN", sf -> sf .
.handle((i,h) -> { ... })
)
.subFlowMapping("ODD", sf -> sf .transform(RomanNumbers::toRoman) .handle((i,h) -> { ... })
)
)
.get();
}
Хотя по-прежнему можно объявить AbstractMessageRouter и передать его в router(), в этом примере используется лямбда для определения того, является ли полезная нагрузка сообщения четной или нечетной. Если она четная, то возвращается строковое значение EVEN. Если нечетная, то возвращается ODD.Эти значения затем используются для определения того, какое под-сопоставление будет обрабатывать сообщение.
9.2.5 Разделители
Иногда в потоке интеграции может быть полезно разделить сообщение на несколько сообщений, которые будут обрабатываться независимо. Разделители, как показано на рис.9.6, будут разделять и обрабатывать эти сообщения для вас.
Рис. 9.6 Hазделители разбивают сообщения на два или более отдельных сообщения, которые могут обрабатываться отдельными подпотоками.