Java多線程之Disruptor入門(mén)
一、Disruptor簡(jiǎn)介
Disruptor目前是世界上最快的單機(jī)消息隊(duì)列,由英國(guó)外匯交易公司LMAX開(kāi)發(fā),研發(fā)的初衷是解決內(nèi)存隊(duì)列的延遲問(wèn)題(在性能測(cè)試中發(fā)現(xiàn)竟然與I/O操作處于同樣的數(shù)量級(jí))。基于Disruptor開(kāi)發(fā)的系統(tǒng)單線程能支撐每秒600萬(wàn)訂單,2010年在QCon演講后,獲得了業(yè)界關(guān)注。2011年,企業(yè)應(yīng)用軟件專(zhuān)家Martin Fowler專(zhuān)門(mén)撰寫(xiě)長(zhǎng)文介紹。同年它還獲得了Oracle官方的Duke大獎(jiǎng)。目前,包括Apache Storm、Camel、Log4j 2在內(nèi)的很多知名項(xiàng)目都應(yīng)用了Disruptor以獲取高性能。
二、淺聊Disruptor的核心

Disruptor維護(hù)了一個(gè)環(huán)形隊(duì)列RingBuffer,這個(gè)隊(duì)列本質(zhì)上是一個(gè)首位相連的數(shù)組。相比于LinkedBlockdingQueue,RingBuffer的數(shù)組結(jié)構(gòu)在查找方面效率更高。此外,LinkedBlockingQueue需要維護(hù)一個(gè)頭節(jié)點(diǎn)指針head和一個(gè)尾節(jié)點(diǎn)指針tail,而RingBuffer只需要維護(hù)一個(gè)sequence指向下一個(gè)可用的位置即可。所以從這兩點(diǎn)來(lái)說(shuō),RingBuffer比LinkedBlockingQueue要快。
三、Disruptor使用
3.1 pom.xml
<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.3</version>
</dependency>
3.2 事件Event
Disruptor是基于事件的生產(chǎn)者消費(fèi)者模型。其RingBuffer中存放的其實(shí)是將消息封裝成的事件。這里定義了一個(gè)LongEvent,表示消息隊(duì)列中存放的是long類(lèi)型的數(shù)據(jù)。
public class LongEvent {
private long value;
public void set(long value) {
this.value = value;
}
@Override
public String toString() {
return "LongEvent{" +
"value=" + value +
'}';
}
}
3.3 EventFactory
實(shí)現(xiàn)EventFactory接口,定義Event工廠,用于填充隊(duì)列。Event工廠其實(shí)是為了提高Disruptor的效率,初始化的時(shí)候,會(huì)調(diào)用Event工廠,對(duì)RingBuffer進(jìn)行內(nèi)存的提前分配,GC的頻率會(huì)降低。
import com.lmax.disruptor.EventFactory;
public class LongEventFactory implements EventFactory<LongEvent> {
public LongEvent newInstance() {
return new LongEvent();
}
}
3.4 EventHandler
實(shí)現(xiàn)EventHandler接口,定義EventHandler(消費(fèi)者),處理容器中的元素。
import com.lmax.disruptor.EventHandler;
public class LongEventHandler implements EventHandler<LongEvent> {
public void onEvent(LongEvent event, long sequence, boolean endOfBatch) {
System.out.println("Event: " + event + ", sequence: " + sequence);
}
}
3.5 使用Disruptor原始API發(fā)布消息
import cn.flying.space.disruptor.demo.LongEvent;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
/**
* 定義一個(gè)生產(chǎn)者,往Disruptor中投遞消息
*/
public class LongEventProducer {
private RingBuffer<LongEvent> ringBuffer;
public LongEventProducer(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
public void onData(ByteBuffer byteBuffer) {
// 定位到下一個(gè)可存放的位置
long sequence = ringBuffer.next();
try {
// 拿到該位置的event
LongEvent event = ringBuffer.get(sequence);
// 設(shè)置event的值
event.set(byteBuffer.getLong(0));
} finally {
// 發(fā)布
ringBuffer.publish(sequence);
}
}
}
import cn.flying.space.disruptor.demo.LongEvent;
import cn.flying.space.disruptor.demo.LongEventFactory;
import cn.flying.space.disruptor.demo.LongEventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
public class TestMain {
public static void main(String[] args) throws InterruptedException {
// 定義event工廠
LongEventFactory factory = new LongEventFactory();
// ringBuffer長(zhǎng)度
int bufferSize = 1024;
// 構(gòu)造一個(gè)Disruptor
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, Executors.defaultThreadFactory());
// 綁定handler
disruptor.handleEventsWith(new LongEventHandler());
// 啟動(dòng)Disruptor
disruptor.start();
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducer producer = new LongEventProducer(ringBuffer);
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for (long i = 0; true; i++) {
byteBuffer.clear();
byteBuffer.putLong(i);
// 投遞消息
producer.onData(byteBuffer);
Thread.sleep(1000);
}
}
}
3.6 使用Translators發(fā)布消息
import cn.flying.space.disruptor.demo.LongEvent;
import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import java.nio.ByteBuffer;
public class LongEventProducerUsingTranslator {
private RingBuffer<LongEvent> ringBuffer;
public LongEventProducerUsingTranslator(RingBuffer<LongEvent> ringBuffer) {
this.ringBuffer = ringBuffer;
}
private static final EventTranslatorOneArg<LongEvent, ByteBuffer> TRANSLATOR = new EventTranslatorOneArg<LongEvent, ByteBuffer>() {
@Override
public void translateTo(LongEvent longEvent, long l, ByteBuffer byteBuffer) {
longEvent.set(byteBuffer.getLong(0));
}
};
public void onData(ByteBuffer byteBuffer) {
ringBuffer.publishEvent(TRANSLATOR, byteBuffer);
}
}
import cn.flying.space.disruptor.demo.LongEvent;
import cn.flying.space.disruptor.demo.LongEventFactory;
import cn.flying.space.disruptor.demo.LongEventHandler;
import com.lmax.disruptor.RingBuffer;
import com.lmax.disruptor.dsl.Disruptor;
import com.lmax.disruptor.util.DaemonThreadFactory;
import java.nio.ByteBuffer;
/**
* @author ZhangSheng
* @date 2021-4-26 14:23
*/
public class TestMain {
public static void main(String[] args) throws InterruptedException {
LongEventFactory factory = new LongEventFactory();
int bufferSize = 1024;
Disruptor<LongEvent> disruptor = new Disruptor<>(factory, bufferSize, DaemonThreadFactory.INSTANCE);
disruptor.handleEventsWith(new LongEventHandler());
disruptor.start();
RingBuffer<LongEvent> ringBuffer = disruptor.getRingBuffer();
LongEventProducerUsingTranslator producer = new LongEventProducerUsingTranslator(ringBuffer);
ByteBuffer byteBuffer = ByteBuffer.allocate(8);
for (long i = 0L; true; i++) {
byteBuffer.putLong(0, i);
// 發(fā)布
producer.onData(byteBuffer);
Thread.sleep(1000);
}
}
}
到此這篇關(guān)于Java多線程之Disruptor入門(mén)的文章就介紹到這了,更多相關(guān)Java Disruptor入門(mén)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Java多線程(單例模式,阻塞隊(duì)列,定時(shí)器,線程池)詳解
本文是多線程初級(jí)入門(mén),主要介紹了多線程單例模式、阻塞隊(duì)列、定時(shí)器、線程池、多線程面試考點(diǎn),感興趣的小伙伴可以跟隨小編一起了解一下2022-09-09
Caused by: java.lang.ClassNotFoundException: org.objectweb.a
這篇文章主要介紹了Caused by: java.lang.ClassNotFoundException: org.objectweb.asm.Type異常,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-07-07
Java虛擬機(jī)JVM性能優(yōu)化(三):垃圾收集詳解
這篇文章主要介紹了Java虛擬機(jī)JVM性能優(yōu)化(三):垃圾收集詳解,本文講解了眾多的JVM垃圾收集器知識(shí)點(diǎn),需要的朋友可以參考下2014-09-09
Intellij IDEA導(dǎo)入JAVA項(xiàng)目并啟動(dòng)(圖文教程)
這篇文章主要介紹了Intellij IDEA導(dǎo)入JAVA項(xiàng)目并啟動(dòng),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2020-08-08
Java 8實(shí)現(xiàn)任意參數(shù)的單鏈表
這篇文章主要為大家詳細(xì)介紹了Java 8實(shí)現(xiàn)任意參數(shù)的單鏈表,文中示例代碼介紹的非常詳細(xì),具有一定的參考價(jià)值,感興趣的小伙伴們可以參考一下2020-10-10
SpringBoot+Logback實(shí)現(xiàn)一個(gè)簡(jiǎn)單的鏈路追蹤功能
Spring Boot默認(rèn)使用LogBack日志系統(tǒng),并且已經(jīng)引入了相關(guān)的jar包,所以我們無(wú)需任何配置便可以使用LogBack打印日志。這篇文章主要介紹了SpringBoot+Logback實(shí)現(xiàn)一個(gè)簡(jiǎn)單的鏈路追蹤功能,需要的朋友可以參考下2019-10-10
MyBatis攔截器如何自動(dòng)設(shè)置創(chuàng)建時(shí)間和修改時(shí)間
文章介紹了如何通過(guò)實(shí)現(xiàn)MyBatis的Interceptor接口,在實(shí)體類(lèi)中自動(dòng)設(shè)置創(chuàng)建時(shí)間和修改時(shí)間,從而提高開(kāi)發(fā)效率2025-02-02
Mybatis plus中使用in查詢(xún)出錯(cuò)如何解決
這篇文章主要介紹了Mybatis plus中使用in查詢(xún)出錯(cuò)的問(wèn)題及解決方法,本文給大家介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或工作具有一定的參考借鑒價(jià)值,需要的朋友可以參考下2020-08-08

