spring與disruptor集成的簡單示例
disruptor不過多介紹了,描述下當(dāng)前的業(yè)務(wù)場景,兩個應(yīng)用A,B,應(yīng)用 A 向應(yīng)用 B 傳遞數(shù)據(jù) . 數(shù)據(jù)傳送比較快,如果用http直接push數(shù)據(jù)然后入庫,效率不高.有可能導(dǎo)致A應(yīng)用比較大的壓力. 使用mq 太重量級,所以選擇了disruptor. 也可以使用Reactor
BaseQueueHelper.java
/**
* lmax.disruptor 高效隊列處理模板. 支持初始隊列,即在init()前進(jìn)行發(fā)布。
*
* 調(diào)用init()時才真正啟動線程開始處理 系統(tǒng)退出自動清理資源.
*
* @author xielongwang
* @create 2018-01-18 下午3:49
* @email xielong.wang@nvr-china.com
* @description
*/
public abstract class BaseQueueHelper<D, E extends ValueWrapper<D>, H extends WorkHandler<E>> {
/**
* 記錄所有的隊列,系統(tǒng)退出時統(tǒng)一清理資源
*/
private static List<BaseQueueHelper> queueHelperList = new ArrayList<BaseQueueHelper>();
/**
* Disruptor 對象
*/
private Disruptor<E> disruptor;
/**
* RingBuffer
*/
private RingBuffer<E> ringBuffer;
/**
* initQueue
*/
private List<D> initQueue = new ArrayList<D>();
/**
* 隊列大小
*
* @return 隊列長度,必須是2的冪
*/
protected abstract int getQueueSize();
/**
* 事件工廠
*
* @return EventFactory
*/
protected abstract EventFactory<E> eventFactory();
/**
* 事件消費者
*
* @return WorkHandler[]
*/
protected abstract WorkHandler[] getHandler();
/**
* 初始化
*/
public void init() {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("DisruptorThreadPool").build();
disruptor = new Disruptor<E>(eventFactory(), getQueueSize(), namedThreadFactory, ProducerType.SINGLE, getStrategy());
disruptor.setDefaultExceptionHandler(new MyHandlerException());
disruptor.handleEventsWithWorkerPool(getHandler());
ringBuffer = disruptor.start();
//初始化數(shù)據(jù)發(fā)布
for (D data : initQueue) {
ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() {
@Override
public void translateTo(E event, long sequence, D data) {
event.setValue(data);
}
}, data);
}
//加入資源清理鉤子
synchronized (queueHelperList) {
if (queueHelperList.isEmpty()) {
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
for (BaseQueueHelper baseQueueHelper : queueHelperList) {
baseQueueHelper.shutdown();
}
}
});
}
queueHelperList.add(this);
}
}
/**
* 如果要改變線程執(zhí)行優(yōu)先級,override此策略. YieldingWaitStrategy會提高響應(yīng)并在閑時占用70%以上CPU,
* 慎用SleepingWaitStrategy會降低響應(yīng)更減少CPU占用,用于日志等場景.
*
* @return WaitStrategy
*/
protected abstract WaitStrategy getStrategy();
/**
* 插入隊列消息,支持在對象init前插入隊列,則在隊列建立時立即發(fā)布到隊列處理.
*/
public synchronized void publishEvent(D data) {
if (ringBuffer == null) {
initQueue.add(data);
return;
}
ringBuffer.publishEvent(new EventTranslatorOneArg<E, D>() {
@Override
public void translateTo(E event, long sequence, D data) {
event.setValue(data);
}
}, data);
}
/**
* 關(guān)閉隊列
*/
public void shutdown() {
disruptor.shutdown();
}
}
EventFactory.java
/**
* @author xielongwang
* @create 2018-01-18 下午6:24
* @email xielong.wang@nvr-china.com
* @description
*/
public class EventFactory implements com.lmax.disruptor.EventFactory<SeriesDataEvent> {
@Override
public SeriesDataEvent newInstance() {
return new SeriesDataEvent();
}
}
MyHandlerException.java
public class MyHandlerException implements ExceptionHandler {
private Logger logger = LoggerFactory.getLogger(MyHandlerException.class);
/*
* (non-Javadoc) 運行過程中發(fā)生時的異常
*
* @see
* com.lmax.disruptor.ExceptionHandler#handleEventException(java.lang.Throwable
* , long, java.lang.Object)
*/
@Override
public void handleEventException(Throwable ex, long sequence, Object event) {
ex.printStackTrace();
logger.error("process data error sequence ==[{}] event==[{}] ,ex ==[{}]", sequence, event.toString(), ex.getMessage());
}
/*
* (non-Javadoc) 啟動時的異常
*
* @see
* com.lmax.disruptor.ExceptionHandler#handleOnStartException(java.lang.
* Throwable)
*/
@Override
public void handleOnStartException(Throwable ex) {
logger.error("start disruptor error ==[{}]!", ex.getMessage());
}
/*
* (non-Javadoc) 關(guān)閉時的異常
*
* @see
* com.lmax.disruptor.ExceptionHandler#handleOnShutdownException(java.lang
* .Throwable)
*/
@Override
public void handleOnShutdownException(Throwable ex) {
logger.error("shutdown disruptor error ==[{}]!", ex.getMessage());
}
}
SeriesData.java (代表應(yīng)用A發(fā)送給應(yīng)用B的消息)
public class SeriesData {
private String deviceInfoStr;
public SeriesData() {
}
public SeriesData(String deviceInfoStr) {
this.deviceInfoStr = deviceInfoStr;
}
public String getDeviceInfoStr() {
return deviceInfoStr;
}
public void setDeviceInfoStr(String deviceInfoStr) {
this.deviceInfoStr = deviceInfoStr;
}
@Override
public String toString() {
return "SeriesData{" +
"deviceInfoStr='" + deviceInfoStr + '\'' +
'}';
}
}
SeriesDataEvent.java
public class SeriesDataEvent extends ValueWrapper<SeriesData> {
}
SeriesDataEventHandler.java
public class SeriesDataEventHandler implements WorkHandler<SeriesDataEvent> {
private Logger logger = LoggerFactory.getLogger(SeriesDataEventHandler.class);
@Autowired
private DeviceInfoService deviceInfoService;
@Override
public void onEvent(SeriesDataEvent event) {
if (event.getValue() == null || StringUtils.isEmpty(event.getValue().getDeviceInfoStr())) {
logger.warn("receiver series data is empty!");
}
//業(yè)務(wù)處理
deviceInfoService.processData(event.getValue().getDeviceInfoStr());
}
}
SeriesDataEventQueueHelper.java
@Component
public class SeriesDataEventQueueHelper extends BaseQueueHelper<SeriesData, SeriesDataEvent, SeriesDataEventHandler> implements InitializingBean {
private static final int QUEUE_SIZE = 1024;
@Autowired
private List<SeriesDataEventHandler> seriesDataEventHandler;
@Override
protected int getQueueSize() {
return QUEUE_SIZE;
}
@Override
protected com.lmax.disruptor.EventFactory eventFactory() {
return new EventFactory();
}
@Override
protected WorkHandler[] getHandler() {
int size = seriesDataEventHandler.size();
SeriesDataEventHandler[] paramEventHandlers = (SeriesDataEventHandler[]) seriesDataEventHandler.toArray(new SeriesDataEventHandler[size]);
return paramEventHandlers;
}
@Override
protected WaitStrategy getStrategy() {
return new BlockingWaitStrategy();
//return new YieldingWaitStrategy();
}
@Override
public void afterPropertiesSet() throws Exception {
this.init();
}
}
ValueWrapper.java
public abstract class ValueWrapper<T> {
private T value;
public ValueWrapper() {}
public ValueWrapper(T value) {
this.value = value;
}
public T getValue() {
return value;
}
public void setValue(T value) {
this.value = value;
}
}
DisruptorConfig.java
@Configuration
@ComponentScan(value = {"com.portal.disruptor"})
//多實例幾個消費者
public class DisruptorConfig {
/**
* smsParamEventHandler1
*
* @return SeriesDataEventHandler
*/
@Bean
public SeriesDataEventHandler smsParamEventHandler1() {
return new SeriesDataEventHandler();
}
/**
* smsParamEventHandler2
*
* @return SeriesDataEventHandler
*/
@Bean
public SeriesDataEventHandler smsParamEventHandler2() {
return new SeriesDataEventHandler();
}
/**
* smsParamEventHandler3
*
* @return SeriesDataEventHandler
*/
@Bean
public SeriesDataEventHandler smsParamEventHandler3() {
return new SeriesDataEventHandler();
}
/**
* smsParamEventHandler4
*
* @return SeriesDataEventHandler
*/
@Bean
public SeriesDataEventHandler smsParamEventHandler4() {
return new SeriesDataEventHandler();
}
/**
* smsParamEventHandler5
*
* @return SeriesDataEventHandler
*/
@Bean
public SeriesDataEventHandler smsParamEventHandler5() {
return new SeriesDataEventHandler();
}
}
測試
//注入SeriesDataEventQueueHelper消息生產(chǎn)者
@Autowired
private SeriesDataEventQueueHelper seriesDataEventQueueHelper;
@RequestMapping(value = "/data", method = RequestMethod.POST, produces = MediaType.APPLICATION_JSON_VALUE)
public DataResponseVo<String> receiverDeviceData(@RequestBody String deviceData) {
long startTime1 = System.currentTimeMillis();
if (StringUtils.isEmpty(deviceData)) {
logger.info("receiver data is empty !");
return new DataResponseVo<String>(400, "failed");
}
seriesDataEventQueueHelper.publishEvent(new SeriesData(deviceData));
long startTime2 = System.currentTimeMillis();
logger.info("receiver data ==[{}] millisecond ==[{}]", deviceData, startTime2 - startTime1);
return new DataResponseVo<String>(200, "success");
}
應(yīng)用A通過/data 接口把數(shù)據(jù)發(fā)送到應(yīng)用B ,然后通過seriesDataEventQueueHelper 把消息發(fā)給disruptor隊列,消費者去消費,整個過程對不會堵塞應(yīng)用A. 可接受消息丟失, 可以通過擴展SeriesDataEventQueueHelper來達(dá)到對disruptor隊列的監(jiān)控
以上就是本文的全部內(nèi)容,希望對大家的學(xué)習(xí)有所幫助,也希望大家多多支持腳本之家。
相關(guān)文章
Springboot之@Async不執(zhí)行原因及分析
這篇文章主要介紹了Springboot之@Async不執(zhí)行原因及分析,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-09-09
通過springboot+mybatis+druid配置動態(tài)數(shù)據(jù)源
這篇文章主要介紹了通過springboot+mybatis+druid配置動態(tài)數(shù)據(jù)源,文中通過示例代碼介紹的非常詳細(xì),對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,,需要的朋友可以參考下2019-06-06
Java泛型T,E,K,V,N,?與Object區(qū)別和含義
Java?泛型(generics)是?JDK?5?中引入的一個新特性,?泛型提供了編譯時類型安全檢測機制,該機制允許程序員在編譯時檢測到非法的類型。本文將詳細(xì)講講Java泛型T、E、K、V、N、?和Object區(qū)別和含義,需要發(fā)可以參考一下2022-03-03
Java?SpringBoot整合shiro-spring-boot-starterqi項目報錯解決
這篇文章主要介紹了Java?SpringBoot整合shiro-spring-boot-starterqi項目報錯解決,文章圍繞主題展開詳細(xì)的內(nèi)容介紹,具有一定的參考一下2022-08-08
Java基礎(chǔ)學(xué)習(xí)之關(guān)鍵字和變量數(shù)據(jù)類型的那些事
變量就是系統(tǒng)為程序分配的一塊內(nèi)存單元,用來存儲各種類型的數(shù)據(jù),下面這篇文章主要給大家介紹了關(guān)于Java基礎(chǔ)學(xué)習(xí)之關(guān)鍵字和變量數(shù)據(jù)類型的那些事,文中通過實例代碼介紹的非常詳細(xì),需要的朋友可以參考下2022-07-07

