Spring Cloud Stream 高級特性使用詳解
重試
Consumer端可以配置重試次數(shù),當消息消費失敗的時候會進行重試。
底層使用Spring Retry去重試,重試次數(shù)可自定義配置。
# 默認重試次數(shù)為3,配置大于1時才會生效 spring.cloud.stream.bindings.<channelName>.consumer.maxAttempte=3
消息發(fā)送失敗的處理
Producer發(fā)送消息出錯的情況下,可以配置錯誤處理,將錯誤信息發(fā)送給對應(yīng)ID的MessageChannel
- 消息發(fā)送失敗的場景下,會將消息發(fā)送到一個
MessageChannel。這個MessageChannel會取ApplicationContext中name為topic.errors(topic就是配置的destination)的Bean。 - 如果找不到就會自動構(gòu)建一個
PublishSubscribeChannel。 - 然后使用
BridgeHandler訂閱這個MessageChannel,同時再設(shè)置ApplicationContext中name為errorChannel的PublishSubscribeChannel消息通道為BridgeHandler的outputChannel。
public static final String ERROR_CHANNEL_BEAN_NAME = "errorChannel"
private SubscribableChannel registerErrorInfrastructure(
ProducerDestination destination) {
// destination.getName() + ".errors"
String errorChannelName = errorsBaseName(destination);
SubscribableChannel errorChannel;
if (getApplicationContext().containsBean(errorChannelName)) {
Object errorChannelObject = getApplicationContext().getBean(errorChannelName);
if (!(errorChannelObject instanceof SubscribableChannel)) {
throw new IllegalStateException("Error channel '" + errorChannelName
+ "' must be a SubscribableChannel");
}
errorChannel = (SubscribableChannel) errorChannelObject;
}
else {
errorChannel = new PublishSubscribeChannel();
((GenericApplicationContext) getApplicationContext()).registerBean(
errorChannelName, SubscribableChannel.class, () -> errorChannel);
}
MessageChannel defaultErrorChannel = null;
if (getApplicationContext()
.containsBean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)) {
defaultErrorChannel = getApplicationContext().getBean(
IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME,
MessageChannel.class);
}
if (defaultErrorChannel != null) {
BridgeHandler errorBridge = new BridgeHandler();
errorBridge.setOutputChannel(defaultErrorChannel);
errorChannel.subscribe(errorBridge);
String errorBridgeHandlerName = getErrorBridgeName(destination);
((GenericApplicationContext) getApplicationContext()).registerBean(
errorBridgeHandlerName, BridgeHandler.class, () -> errorBridge);
}
return errorChannel;
}
- 示例代碼
spring.cloud.stream.bindings.output.destination=test-output # 消息發(fā)送失敗的處理邏輯默認是關(guān)閉的 spring.cloud.stream.bindings.output.producer.errorChannelEnabled=true
@Bean("test-output.errors")
MessageChannel testOutputErrorChannel() {
return new PublishSubscribeChannel();
}
@Service
class ErrorProduceService {
@ServiceActivator(inputChannel = "test-output.errors")
public void receiveProduceError(Message receiveMsg) {
System.out.println("receive error msg: " + receiveMsg);
}
}
消費錯誤處理
Consumer消費消息出錯的情況下,可以配置錯誤處理,將錯誤信息發(fā)給對應(yīng)ID的MessageChannel
消息錯誤處理與生產(chǎn)錯誤處理大致相同。錯誤的MessageChannel對應(yīng)的name為topic.group.errors,還會加上多個MessageHandler訂閱的一些判斷,使用ErrorMessageStrategy創(chuàng)建錯誤消息等內(nèi)容。
- 示例代碼
spring.cloud.stream.bindings.input.destination=test-input spring.cloud.stream.bindings.input.group=test-input-group
@StreamListener(Sink.INPUT)
public void receive(String receiveMsg) {
throw new RuntimeException("Oops");
}
@ServiceActivator(inputChannel = "test-input.test-input-group.errors")
public void receiveConsumeError(Message receiveMsg) {
System.out.println("receive error msg: " + receiveMsg);
}
建議直接使用topic.group.errors這個消息通道,并設(shè)置發(fā)送到單播模式的DirectChannel消息通道中(使用@ServiceActivator注解接收會直接構(gòu)成DirectChannel),這樣會確保只會被唯一的一個訂閱了topic.group.errors的MessageHandler處理,否則可能會被多個MessageHandler處理,導致出現(xiàn)一些意想不到的結(jié)果。
自定義MessageHandler類型
默認情況下,Output Binding對應(yīng)的MessageChannel和Input Binding對應(yīng)的SubscribeChannel會被構(gòu)造成DirectChannel。
SCS提供了BindingTargetFactory接口進行擴展,比如可以擴展構(gòu)造PublishSubscribeChannel這種廣播類型的MessageChannel。
BindingTargetFactory接口只有兩個實現(xiàn)類
SubscribableChannelBindingTargetFactory:針對Input Binding和Output Binding都會構(gòu)造成DirectWithAttributesChannel類型的MessageChannel(一種帶有HashMap屬性的DirectChannel)。MessageSourceBindingTargetFactory:不支持Output Binding,Input Binding會構(gòu)造成DefaultPollableMessageSource。DefaultPollableMessageSource內(nèi)部維護著MessageSource屬性,該屬性用于拉取消息。
Endpoint端點
SCS提供了BindingsEndpoint,可以獲取Binding信息或?qū)?code>Binding生命周期進行修改,比如start、stop、pause或resume。
BindingsEndpoint的ID是bindings,對外暴露了一下3個操作:
- 修改
Binding狀態(tài),可以改成STARTED、STOPPED、PAUSED和RESUMED,對應(yīng)Binding接口的4個操作。 - 查詢單個
Binding的狀態(tài)信息。 - 查詢所有
Binding的狀態(tài)信息。
@Endpoint(id = "bindings")
public class BindingsEndpoint {
...
@WriteOperation
public void changeState(@Selector String name, State state) {
Binding<?> binding = BindingsEndpoint.this.locateBinding(name);
if (binding != null) {
switch (state) {
case STARTED:
binding.start();
break;
case STOPPED:
binding.stop();
break;
case PAUSED:
binding.pause();
break;
case RESUMED:
binding.resume();
break;
default:
break;
}
}
}
@ReadOperation
public List<?> queryStates() {
List<Binding<?>> bindings = new ArrayList<>(gatherInputBindings());
bindings.addAll(gatherOutputBindings());
return this.objectMapper.convertValue(bindings, List.class);
}
@ReadOperation
public Binding<?> queryState(@Selector String name) {
Assert.notNull(name, "'name' must not be null");
return this.locateBinding(name);
}
...
}
Metrics指標
該功能自動與micrometer集成進行Metrics統(tǒng)計,可以通過前綴spring.cloud.stream.metrics進行相關(guān)配置,配置項spring.cloud.stream.bindings.applicationMetrics.destination會構(gòu)造MetersPublisherBinding,將相關(guān)的metrics發(fā)送到MQ中。
Serverless
默認與Spring Cloud Function集成。
可以使用Function處理消息。配置文件需要加上function配置。
spring.cloud.stream.function.definition=uppercase | addprefix
@Bean
public Function<String, String> uppercase() {
return x -> x.toUpperCase();
}
@Bean
public Function<String, String> addprefix() {
return x -> "prefix-" + x;
}
Partition統(tǒng)一
SCS統(tǒng)一Partition相關(guān)的設(shè)置,可以屏蔽不同MQ Partition的設(shè)置。
Producer Binding提供的ProducerProperties提供了一些Partition相關(guān)的配置:
partitionKeyExpression:partition key提取表達式。partitionKeyExtractorName:是一個實現(xiàn)PartitionKeyExtractorStrategy接口的Bean name。PartitionKeyExtractorStrategy是一個根據(jù)Message獲取partition key的接口。如果兩者都配置,優(yōu)先級高于partitionKeyExtractorName。partitionSelectorName:是一個實現(xiàn)PartitionSelectorStrategy接口的Bean name。PartitionSelectorStrategy是一個根據(jù)partition key決定選擇哪個partition 的接口。partitionSelectorExpression:partition 選擇表達式,會根據(jù)表達式和partition key得到最終的partition。如果兩者都配置,優(yōu)先partitionSelectorExpression表達式解析partition。partitionCount:partition 個數(shù)。該屬性不一定會生效,Kafka Binder 和RocketMQ Binder會使用topic上的partition 個數(shù)覆蓋該屬性。
public final class PartitioningInterceptor implements ChannelInterceptor {
...
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
if (!message.getHeaders().containsKey(BinderHeaders.PARTITION_OVERRIDE)) {
int partition = this.partitionHandler.determinePartition(message);
return MessageConverterConfigurer.this.messageBuilderFactory
.fromMessage(message)
.setHeader(BinderHeaders.PARTITION_HEADER, partition).build();
}
else {
return MessageConverterConfigurer.this.messageBuilderFactory
.fromMessage(message)
.setHeader(BinderHeaders.PARTITION_HEADER,
message.getHeaders()
.get(BinderHeaders.PARTITION_OVERRIDE))
.removeHeader(BinderHeaders.PARTITION_OVERRIDE).build();
}
}
}
public class PartitionHandler {
...
public int determinePartition(Message<?> message) {
Object key = extractKey(message);
int partition;
if (this.producerProperties.getPartitionSelectorExpression() != null) {
partition = this.producerProperties.getPartitionSelectorExpression()
.getValue(this.evaluationContext, key, Integer.class);
}
else {
partition = this.partitionSelectorStrategy.selectPartition(key,
this.partitionCount);
}
// protection in case a user selector returns a negative.
return Math.abs(partition % this.partitionCount);
}
private Object extractKey(Message<?> message) {
Object key = invokeKeyExtractor(message);
if (key == null && this.producerProperties.getPartitionKeyExpression() != null) {
key = this.producerProperties.getPartitionKeyExpression()
.getValue(this.evaluationContext, message);
}
Assert.notNull(key, "Partition key cannot be null");
return key;
}
...
}
Polling Consumer
實現(xiàn)MessageSource進行polling操作的Consumer。
普通的Pub/Sub模式需要定義SubscribeableChannel類型的返回值,Polling Consumer需要定義PollableMessageSource類型的返回值。
public interface PollableSink {
/**
* Input channel name.
*/
String INPUT = "input";
/**
* @return input channel.
*/
@Input(Sink.INPUT)
PollableMessageSource input();
}
支持多個Binder同時使用
支持多個Binder同時使用,在配置Binding的時候需要指定對應(yīng)的Binder。
配置全局默認的Binder:spring.cloud.stream.default-binder=rocketmq。
配置各個Binder內(nèi)部的配置信息:
spring.cloud.stream.binders.rocketmq.environment.<xx>=xx
spring.cloud.stream.binders.rocketmq.type=rocketmq
配置Binding對應(yīng)的Binder:
spring.cloud.stream.bindings.<channelName>.binder=kafka
spring.cloud.stream.bindings.<channelName>.binder=rocketmq
spring.cloud.stream.bindings.<channelName>.binder=rabbit
建立事件機制
比如,新建BindingCreateEvent事件,用戶的應(yīng)用就可以監(jiān)聽該事件在創(chuàng)建Input Binding或Output Binding 時做業(yè)務(wù)相關(guān)的處理。
以上就是Spring Cloud Stream 高級特性使用詳解的詳細內(nèi)容,更多關(guān)于Spring Cloud Stream 高級特性的資料請關(guān)注腳本之家其它相關(guān)文章!
- SpringCloudStream原理和深入使用小結(jié)
- SpringCloud中的Stream服務(wù)間消息傳遞詳解
- 使用Spring?Cloud?Stream處理事件的示例詳解
- spring-cloud-stream的手動消息確認問題
- SpringCloudStream中的消息分區(qū)數(shù)詳解
- 關(guān)于SpringCloudStream配置問題
- SpringCloud微服務(wù)開發(fā)基于RocketMQ實現(xiàn)分布式事務(wù)管理詳解
- SpringCloud+RocketMQ實現(xiàn)分布式事務(wù)的實踐
- Spring Cloud Stream整合RocketMQ的搭建方法
相關(guān)文章
java多線程開發(fā)ScheduledExecutorService簡化方式
這篇文章主要為大家介紹了java多線程開發(fā)ScheduledExecutorService的簡化方式,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步2022-03-03
Java 集合實現(xiàn)分頁的方法(業(yè)務(wù)代碼實現(xiàn)分頁)
在Java開發(fā)中,有些場景比較復雜,受限制,不好在sql查詢層面實現(xiàn)分頁,需要在查詢的list結(jié)果后,將list分頁返回,如何實現(xiàn)呢,帶著這個問題一起通過本文學習吧2025-02-02
JDK1.8使用的垃圾回收器和執(zhí)行GC的時長以及GC的頻率方式
這篇文章主要介紹了JDK1.8使用的垃圾回收器和執(zhí)行GC的時長以及GC的頻率方式,具有很好的參考價值,希望對大家有所幫助。如有錯誤或未考慮完全的地方,望不吝賜教2023-05-05
Java concurrency之Condition條件_動力節(jié)點Java學院整理
Condition的作用是對鎖進行更精確的控制。下面通過本文給大家分享Java concurrency之Condition條件的相關(guān)知識,非常不錯,具有參考借鑒價值,需要的朋友參考下吧2017-06-06
Java的四種常見線程池及Scheduled定時線程池實現(xiàn)詳解
這篇文章主要介紹了Java的四種常見線程池及Scheduled定時線程池實現(xiàn)詳解,在Java中,我們可以通過Executors類來創(chuàng)建ScheduledThreadPool,Executors類提供了幾個靜態(tài)方法來創(chuàng)建不同類型的線程池,包括ScheduledThreadPool,需要的朋友可以參考下2023-09-09
關(guān)于ApplicationContext的啟動流程詳解
ApplicationContext是Spring框架中用于管理和配置Bean的核心接口,它的啟動流程包括準備刷新、獲取BeanFactory、準備BeanFactory、后置處理BeanFactory、調(diào)用BeanFactoryPostProcessor、注冊BeanPostProcessor2025-03-03

