Spring Boot集成Java DSL的實(shí)現(xiàn)代碼
Spring Integration Java DSL已經(jīng)融合到Spring Integration Core 5.0,這是一個(gè)聰明而明顯的舉動(dòng),因?yàn)椋?/p>
- 基于Java Config啟動(dòng)新Spring項(xiàng)目的每個(gè)人都使用它
- SI Java DSL使您可以使用Lambdas等新的強(qiáng)大Java 8功能
- 您可以使用 基于IntegrationFlowBuilder的Builder模式構(gòu)建流
讓我們看看基于ActiveMQ JMS的示例如何使用它。
Maven依賴:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-jms</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-kahadb-store</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.integration/spring-integration-java-dsl -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-java-dsl</artifactId>
<version>1.2.3.RELEASE</version>
</dependency>
</dependencies>
示例1:Jms入站網(wǎng)關(guān)
我們有以下ServiceActivator:
@Service
public class ActiveMQEndpoint {
@ServiceActivator(inputChannel = "inboundChannel")
public void processMessage(final String inboundPayload) {
System.out.println("Inbound message: "+inboundPayload);
}
}
如果您想使用SI Java DSL 將inboundPayload從Jms隊(duì)列發(fā)送到Gateway風(fēng)格的激活器,那么請(qǐng)使用DSLJms工廠:
@Bean
public DynamicDestinationResolver dynamicDestinationResolver() {
return new DynamicDestinationResolver();
}
@Bean
public ActiveMQConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory();
}
@Bean
public DefaultMessageListenerContainer listenerContainer() {
final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
defaultMessageListenerContainer.setDestinationName("jms.activeMQ.Test");
return defaultMessageListenerContainer;
}
@Bean
public MessageChannel inboundChannel() {
return MessageChannels.direct("inboundChannel").get();
}
@Bean
public JmsInboundGateway dataEndpoint() {
return Jms.inboundGateway(listenerContainer())
.requestChannel(inboundChannel()).get();
}
通過(guò)dataEndpoint bean 返回JmsInboundGatewaySpec,您還可以向SI通道或Jms目標(biāo)發(fā)送回復(fù)。查看文檔。
示例2:Jms消息驅(qū)動(dòng)的通道適配器
如果您正在尋找替換消息驅(qū)動(dòng)通道適配器的XML JMS配置,那么JmsMessageDrivenChannelAdapter是一種適合您的方式:
@Bean
public DynamicDestinationResolver dynamicDestinationResolver() {
return new DynamicDestinationResolver();
}
@Bean
public ActiveMQConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory();
}
@Bean
public DefaultMessageListenerContainer listenerContainer() {
final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
defaultMessageListenerContainer.setDestinationName("jms.activeMQ.Test");
return defaultMessageListenerContainer;
}
@Bean
public MessageChannel inboundChannel() {
return MessageChannels.direct("inboundChannel").get();
}
@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
new ChannelPublishingJmsMessageListener();
channelPublishingJmsMessageListener.setExpectReply(false);
final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter = new
JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener
);
messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
return messageDrivenChannelAdapter;
}
與前面的示例一樣,入站有效負(fù)載如樣本1中一樣發(fā)送給激活器。
示例3:使用JAXB的Jms消息驅(qū)動(dòng)的通道適配器
在典型的場(chǎng)景中,您希望通過(guò)Jms接受XML作為文本消息,將其轉(zhuǎn)換為JAXB存根并在服務(wù)激活器中處理它。我將向您展示如何使用SI Java DSL執(zhí)行此操作,但首先讓我們?yōu)閤ml處理添加兩個(gè)依賴項(xiàng):
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-xml</artifactId>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
</dependency>
我們將通過(guò)JMS接受shiporders ,所以首先XSD命名為shiporder.xsd:
<?xml version="1.0" encoding="UTF-8" ?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="shiporder">
<xs:complexType>
<xs:sequence>
<xs:element name="orderperson" type="xs:string"/>
<xs:element name="shipto">
<xs:complexType>
<xs:sequence>
<xs:element name="name" type="xs:string"/>
<xs:element name="address" type="xs:string"/>
<xs:element name="city" type="xs:string"/>
<xs:element name="country" type="xs:string"/>
</xs:sequence>
</xs:complexType>
</xs:element>
<xs:element name="item" maxOccurs="unbounded">
<xs:complexType>
<xs:sequence>
<xs:element name="title" type="xs:string"/>
<xs:element name="note" type="xs:string" minOccurs="0"/>
<xs:element name="quantity" type="xs:positiveInteger"/>
<xs:element name="price" type="xs:decimal"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:sequence>
<xs:attribute name="orderid" type="xs:string" use="required"/>
</xs:complexType>
</xs:element>
</xs:schema>
新增JAXB maven plugin 生成JAXB存根:
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>jaxb2-maven-plugin</artifactId>
<version>2.3.1</version>
<executions>
<execution>
<id>xjc-schema1</id>
<goals>
<goal>xjc</goal>
</goals>
<configuration>
<!-- Use all XSDs under the west directory for sources here. -->
<sources>
<source>src/main/resources/xsds/shiporder.xsd</source>
</sources>
<!-- Package name of the generated sources. -->
<packageName>com.example.stubs</packageName>
<outputDirectory>src/main/java</outputDirectory>
<clearOutputDir>false</clearOutputDir>
</configuration>
</execution>
</executions>
</plugin>
我們已經(jīng)準(zhǔn)備好了存根類和一切,現(xiàn)在使用Jaxb magic的Java DSL JMS消息驅(qū)動(dòng)適配器:
/**
* Sample 3: Jms message driven adapter with JAXB
*/
@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
new ChannelPublishingJmsMessageListener();
channelPublishingJmsMessageListener.setExpectReply(false);
channelPublishingJmsMessageListener.setMessageConverter(new MarshallingMessageConverter(shipOrdersMarshaller()));
final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter = new
JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener
);
messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
return messageDrivenChannelAdapter;
}
@Bean
public Jaxb2Marshaller shipOrdersMarshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setContextPath("com.example.stubs");
return marshaller;
}
XML配置在Java中使用它可以為您提供如此強(qiáng)大的功能和靈活性。要完成此示例,inboundChannel的服務(wù)激活器將如下所示:
/**
* Sample 3
* @param shiporder
*/
@ServiceActivator(inputChannel = "inboundChannel")
public void processMessage(final Shiporder shiporder) {
System.out.println(shiporder.getOrderid());
System.out.println(shiporder.getOrderperson());
}
要測(cè)試流,您可以使用以下XML通過(guò)JConsole發(fā)送到JMS隊(duì)列:
<?xml version="1.0" encoding="UTF-8"?>
<shiporder orderid="889923"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="shiporder.xsd">
<orderperson>John Smith</orderperson>
<shipto>
<name>Ola Nordmann</name>
<address>Langgt 23</address>
<city>4000 Stavanger</city>
<country>Norway</country>
</shipto>
<item>
<title>Empire Burlesque</title>
<note>Special Edition</note>
<quantity>1</quantity>
<price>10.90</price>
</item>
<item>
<title>Hide your heart</title>
<quantity>1</quantity>
<price>9.90</price>
</item>
</shiporder>
示例4:具有JAXB和有效負(fù)載根路由的Jms消息驅(qū)動(dòng)的通道適配器
另一種典型情況是接受XML作為JMS文本消息,將其轉(zhuǎn)換為JAXB存根并根據(jù)有效負(fù)載根類型將有效負(fù)載路由到某個(gè)服務(wù)激活器。當(dāng)然SI Java DSL支持所有類型的路由,我將向您展示如何根據(jù)有效載荷類型進(jìn)行路由。
首先,將以下XSD添加到shiporder.xsd所在的文件夾中,并將其命名為purchaseorder.xsd:
<xsd:schema xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:tns="http://tempuri.org/PurchaseOrderSchema.xsd"
targetNamespace="http://tempuri.org/PurchaseOrderSchema.xsd"
elementFormDefault="qualified">
<xsd:element name="PurchaseOrder">
<xsd:complexType>
<xsd:sequence>
<xsd:element name="ShipTo" type="tns:USAddress" maxOccurs="2"/>
<xsd:element name="BillTo" type="tns:USAddress"/>
</xsd:sequence>
<xsd:attribute name="OrderDate" type="xsd:date"/>
</xsd:complexType>
</xsd:element>
<xsd:complexType name="USAddress">
<xsd:sequence>
<xsd:element name="name" type="xsd:string"/>
<xsd:element name="street" type="xsd:string"/>
<xsd:element name="city" type="xsd:string"/>
<xsd:element name="state" type="xsd:string"/>
<xsd:element name="zip" type="xsd:integer"/>
</xsd:sequence>
<xsd:attribute name="country" type="xsd:NMTOKEN" fixed="US"/>
</xsd:complexType>
</xsd:schema>
然后添加到j(luò)axb maven插件配置:
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>jaxb2-maven-plugin</artifactId>
<version>2.3.1</version>
<executions>
<execution>
<id>xjc-schema1</id>
<goals>
<goal>xjc</goal>
</goals>
<configuration>
<!-- Use all XSDs under the west directory for sources here. -->
<sources>
<source>src/main/resources/xsds/shiporder.xsd</source>
<source>src/main/resources/xsds/purchaseorder.xsd</source>
</sources>
<!-- Package name of the generated sources. -->
<packageName>com.example.stubs</packageName>
<outputDirectory>src/main/java</outputDirectory>
<clearOutputDir>false</clearOutputDir>
</configuration>
</execution>
</executions>
</plugin>
運(yùn)行mvn clean install以生成新XSD的JAXB存根?,F(xiàn)在承諾有效負(fù)載根映射:
@Bean
public Jaxb2Marshaller ordersMarshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setContextPath("com.example.stubs");
return marshaller;
}
/**
* Sample 4: Jms message driven adapter with Jaxb and Payload routing.
* @return
*/
@Bean
public JmsMessageDrivenChannelAdapter dataEndpoint() {
final ChannelPublishingJmsMessageListener channelPublishingJmsMessageListener =
new ChannelPublishingJmsMessageListener();
channelPublishingJmsMessageListener.setMessageConverter(new MarshallingMessageConverter(ordersMarshaller()));
final JmsMessageDrivenChannelAdapter messageDrivenChannelAdapter = new
JmsMessageDrivenChannelAdapter(listenerContainer(), channelPublishingJmsMessageListener
);
messageDrivenChannelAdapter.setOutputChannel(inboundChannel());
return messageDrivenChannelAdapter;
}
@Bean
public IntegrationFlow payloadRootMapping() {
return IntegrationFlows.from(inboundChannel()).<Object, Class<?>>route(Object::getClass, m->m
.subFlowMapping(Shiporder.class, sf->sf.handle((MessageHandler) message -> {
final Shiporder shiporder = (Shiporder) message.getPayload();
System.out.println(shiporder.getOrderperson());
System.out.println(shiporder.getOrderid());
}))
.subFlowMapping(PurchaseOrder.class, sf->sf.handle((MessageHandler) message -> {
final PurchaseOrder purchaseOrderType = (PurchaseOrder) message.getPayload();
System.out.println(purchaseOrderType.getBillTo().getName());
}))
).get();
}
注意payloadRootMapping bean,讓我們解釋一下重要的部分:
- <Object, Class<?>> route - 表示來(lái)自inboundChannel的輸入將是Object,并且將根據(jù)Class <?>執(zhí)行路由
- subFlowMapping(Shiporder.class.. - ShipOders的處理。
- subFlowMapping(PurchaseOrder.class ... - 處理PurchaseOrders。
要測(cè)試ShipOrder有效負(fù)載,請(qǐng)使用示例3中的XML,以測(cè)試PurchaseOrder有效負(fù)載,使用以下XML:
<?xml version="1.0" encoding="utf-8"?> <PurchaseOrder OrderDate="1900-01-01" xmlns="http://tempuri.org/PurchaseOrderSchema.xsd"> <ShipTo country="US"> <name>name1</name> <street>street1</street> <city>city1</city> <state>state1</state> <zip>1</zip> </ShipTo> <ShipTo country="US"> <name>name2</name> <street>street2</street> <city>city2</city> <state>state2</state> <zip>-79228162514264337593543950335</zip> </ShipTo> <BillTo country="US"> <name>name1</name> <street>street1</street> <city>city1</city> <state>state1</state> <zip>1</zip> </BillTo> </PurchaseOrder>
應(yīng)根據(jù)subflow 子流Map路由兩個(gè)有效載荷。
示例5:IntegrationFlowAdapter
除了企業(yè)集成模式的其他實(shí)現(xiàn)(check them out)),我需要提到IntegrationFlowAdapter。通過(guò)擴(kuò)展此類并實(shí)現(xiàn)buildFlow方法,如:
[url=https://bitbucket.org/Component/]@Component[/url]
public class MyFlowAdapter extends IntegrationFlowAdapter {
@Autowired
private ConnectionFactory rabbitConnectionFactory;
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(Amqp.inboundAdapter(this.rabbitConnectionFactory, "myQueue"))
.<String, String>transform(String::toLowerCase)
.channel(c -> c.queue("myFlowAdapterOutput"));
}
你可以將bean的重復(fù)聲明包裝成一個(gè)組件并給它們所需的流量。然后可以配置這樣的組件并將其作為一個(gè)類實(shí)例提供給調(diào)用代碼!
因此,讓我們舉例說(shuō)明這個(gè)repo中的示例3更短一些,并為所有JmsEndpoints定義基類,并在其中定義重復(fù)bean:
public class JmsEndpoint extends IntegrationFlowAdapter {
private String queueName;
private String channelName;
private String contextPath;
/**
* @param queueName
* @param channelName
* @param contextPath
*/
public JmsEndpoint(String queueName, String channelName, String contextPath) {
this.queueName = queueName;
this.channelName = channelName;
this.contextPath = contextPath;
}
@Override
protected IntegrationFlowDefinition<?> buildFlow() {
return from(Jms.messageDrivenChannelAdapter(listenerContainer())
.jmsMessageConverter(new MarshallingMessageConverter(shipOrdersMarshaller()))
).channel(channelName);
}
@Bean
public Jaxb2Marshaller shipOrdersMarshaller() {
Jaxb2Marshaller marshaller = new Jaxb2Marshaller();
marshaller.setContextPath(contextPath);
return marshaller;
}
@Bean
public DynamicDestinationResolver dynamicDestinationResolver() {
return new DynamicDestinationResolver();
}
@Bean
public ActiveMQConnectionFactory connectionFactory() {
return new ActiveMQConnectionFactory();
}
@Bean
public DefaultMessageListenerContainer listenerContainer() {
final DefaultMessageListenerContainer defaultMessageListenerContainer = new DefaultMessageListenerContainer();
defaultMessageListenerContainer.setDestinationResolver(dynamicDestinationResolver());
defaultMessageListenerContainer.setConnectionFactory(connectionFactory());
defaultMessageListenerContainer.setDestinationName(queueName);
return defaultMessageListenerContainer;
}
@Bean
public MessageChannel inboundChannel() {
return MessageChannels.direct(channelName).get();
}
}
現(xiàn)在聲明特定隊(duì)列的Jms端點(diǎn)很容易:
@Bean
public JmsEndpoint jmsEndpoint() {
return new JmsEndpoint("jms.activeMQ.Test", "inboundChannel", "com.example.stubs");
}
inboundChannel的服務(wù)激活器:
/**
* Sample 3, 5
* @param shiporder
*/
@ServiceActivator(inputChannel = "inboundChannel")
public void processMessage(final Shiporder shiporder) {
System.out.println(shiporder.getOrderid());
System.out.println(shiporder.getOrderperson());
}
您不應(yīng)該錯(cuò)過(guò)在項(xiàng)目中使用IntegrationFlowAdapter。我喜歡它的概念。
我最近在Embedit的新的基于Spring Boot的項(xiàng)目中開始使用Spring Integration Java DSL 。即使有一些配置,我發(fā)現(xiàn)它非常有用。
- 它很容易調(diào)試。不添加像wiretap這樣的配置。
- 閱讀起來(lái)要容易得多。是的,即使是lambdas!
- 它很強(qiáng)大。在Java配置中,您現(xiàn)在有很多選擇。
源碼地址:https://bitbucket.org/tomask79/spring-integration-java-dsl
以上就是本文的全部?jī)?nèi)容,希望對(duì)大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
HashMap和List遍歷方法及如何遍歷刪除元素總結(jié)
在本篇文章中小編給大家分享了關(guān)于HashMap和List遍歷方法及如何遍歷刪除元素知識(shí)點(diǎn)總結(jié),需要的朋友們參考下。2019-05-05
解決MyBatis中Enum字段參數(shù)解析問(wèn)題
本文主要介紹了解決MyBatis中Enum字段參數(shù)解析問(wèn)題,文中通過(guò)示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2021-08-08
Java日期格式化的實(shí)現(xiàn)(@JsonFormat和@JSONField)
本文主要介紹了Java日期格式化的實(shí)現(xiàn),主要介紹了@JsonFormat和@JSONField兩種方式,具有一定的參考價(jià)值,感興趣的可以了解一下2024-05-05
基于SpringBoot實(shí)現(xiàn)郵箱找回密碼的代碼示例
本文主要介紹了如何基于SpringBoot實(shí)現(xiàn)郵箱找回密碼,文中通過(guò)代碼示例給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作有一定的幫助,需要的朋友可以參考下2024-02-02
Java?SSM實(shí)現(xiàn)前后端協(xié)議聯(lián)調(diào)詳解下篇
首先我們已經(jīng)知道,在現(xiàn)在流行的“前后端完全分離”架構(gòu)中,前后端聯(lián)調(diào)是一個(gè)不可能避免的問(wèn)題,這篇文章主要介紹了Java?SSM實(shí)現(xiàn)前后端協(xié)議聯(lián)調(diào)過(guò)程2022-08-08
Springbootadmin與security沖突問(wèn)題及解決
這篇文章主要介紹了Springbootadmin與security沖突問(wèn)題及解決方案,具有很好的參考價(jià)值,希望對(duì)大家有所幫助,如有錯(cuò)誤或未考慮完全的地方,望不吝賜教2024-08-08
Java解析http協(xié)議字符串的方法實(shí)現(xiàn)
本文主要介紹了Java解析http協(xié)議字符串的方法實(shí)現(xiàn),我們探討了如何使用Java解析HTTP協(xié)議字符串,并將其封裝成了一個(gè)HttpRequest類,具有一定的參考價(jià)值,感興趣的可以了解一下2023-09-09

