基于Java實現(xiàn)Actor模型
Actor模型是一種常見的并發(fā)模型,與最常見的并發(fā)模型——共享內(nèi)存(同步鎖)不同,它將程序分為許多獨立的計算單元——Actor,每個Actor獨立管理自己的資源,不同Actor之間通過消息傳遞來交互。它的好處是全異步執(zhí)行,不會造成線程阻塞,從而提升CPU使用率,另外由于線程之間是異步交互,所以也不用考慮加鎖和線程同步的問題。
Actor模型在業(yè)界有許多應(yīng)用,例如游戲服務(wù)器框架Skynet、編程語言Erlang。
因為歷史原因,Java下的Actor模型應(yīng)用較少,知名的只有基于Scala的Akka。而且Actor模型也不是萬能的,異步編程會需要編寫更多的回調(diào)代碼,原本的一步需要拆分成若干步來處理,無疑增加了代碼編寫復(fù)雜度(callback hell)。
本文以學(xué)習(xí)和研究為目的,使用Java實現(xiàn)一個簡單Actor模型,功能上模仿Skynet,支持的功能包括:
- Actor基礎(chǔ)功能:消息發(fā)送接收、異步處理等。
- 集群功能:支持多節(jié)點之間通信。
- 非阻塞的sleep和網(wǎng)絡(luò)通信。
完整的源代碼在可以在Github獲取。以下是部分關(guān)鍵代碼以及設(shè)計思路講解。
Actor
Actor是Actor模型中的核心概念,每個Actor獨立管理自己的資源,與其他Actor之間通信通過Message。
這里的每個Actor由單線程驅(qū)動,相當于Skynet中的服務(wù)。Actor不斷從mailbox中獲取尚未處理的Message,mailbox使用的結(jié)構(gòu)是無界阻塞的LinkedBlockingQueue。
Actor類是抽象類,其中處理消息的handleMessage方法為抽象方法,需要每個具體類來重載實現(xiàn)。
public abstract class Actor {
private Node node;
private String name;
private final BlockingQueue<Message> mailbox = new LinkedBlockingQueue<>();
private Thread actorThread;
public Node getNode() {
return node;
}
public void setNode(Node node) {
this.node = node;
}
public void setName(String name) {
this.name = name;
}
public String getName() {
return name;
}
public void start() {
actorThread = new Thread(() -> {
ActorSystem.setThreadLocalActor(this);
for(;;) {
try {
Message message = mailbox.take();
try {
handleMessage(message);
} catch (Exception e) {
e.printStackTrace();
}
} catch (InterruptedException ignore) {
// ignore
}
}
});
actorThread.start();
}
public void act(Message msg) {
mailbox.offer(msg);
}
protected abstract void handleMessage(Message message);
}
Node
Node代表節(jié)點,與Skynet中節(jié)點意義相同。它是一個獨立的Java進程,有自己的IP和端口,Node之間通過異步的網(wǎng)絡(luò)通信發(fā)送和接收消息。一個Node中可以運行多個Actor,一個Actor僅可與一個Node綁定。
Node的唯一標識也是它的name,與Actor的name稍有不同,Node的name是全局唯一,而Actor的name是Node內(nèi)唯一。
public abstract class Node {
/**
* 名字
* 需要是唯一的,按名字查找
*/
private String name;
private InetSocketAddress address;
public String getName() {
return name;
}
public void setName(String nodeName) {
name = nodeName;
}
public void setAddress(InetSocketAddress address) {
this.address = address;
}
}
ActorSystem
ActorSystem是Actor的管理系統(tǒng),也是外部調(diào)用API的主要入口,提供本框架中的主要功能:創(chuàng)建Actor、發(fā)送消息、休眠Actor、網(wǎng)絡(luò)通信等。下面分別詳細說明。
ActorSystem初始化
分為以下三步:
首先是調(diào)用conf方法讀取集群配置,包括每個Node的name和address。
其次是調(diào)用bindNode方法綁定當前Node。
最后是調(diào)用start方法初始化自身,包括對定時器的初始化和Netty服務(wù)端的初始化。之所以引入定時器,是因為無阻塞sleep需要用到,這個具體后面再說,另外也可以用于擴展實現(xiàn)通用的定時任務(wù)功能。Node之間發(fā)送消息都是異步的,客戶端和服務(wù)端都使用了Netty做異步網(wǎng)絡(luò)通信。
public class ActorSystem {
private static Map<String, InetSocketAddress> clusterConfig;
/**
* 當前綁定到的節(jié)點
*/
private static Node currNode;
private final static Map<String, Actor> actors = new HashMap<>();
/**
* 維護線程與Actor的對應(yīng)關(guān)系
*/
private final static ThreadLocal<Actor> currThreadActor = new ThreadLocal<>();
/**
* 客戶端Netty bootstrap
*/
private static Bootstrap clientBootstrap;
/**
* 維護節(jié)點與通道的對應(yīng)關(guān)系
*/
private final static Map<String, Channel> channels = new ConcurrentHashMap<>();
private static void startNettyBootstrap() {
try {
// 先啟動服務(wù)端bootstrap
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 100)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)))
.addLast(new ObjectEncoder())
.addLast(new ServerHandler());
}
});
InetSocketAddress address = clusterConfig.get(currNode.getName());
b.bind(address).sync();
// 再啟動客戶端bootstrap
EventLoopGroup group = new NioEventLoopGroup();
clientBootstrap = new Bootstrap();
clientBootstrap.group(group)
.channel(NioSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new ObjectDecoder(ClassResolvers.cacheDisabled(null)))
.addLast(new ObjectEncoder())
.addLast(new ClientHandler());
}
});
} catch (Exception e) {
throw new RuntimeException("actor system start fail", e);
}
}
public static void start() {
// 啟動定時器
Timer.start();
// 啟動Netty bootstrap
startNettyBootstrap();
}
public static void conf(Map<String, InetSocketAddress> config) {
clusterConfig = config;
}
/**
* 將當前系統(tǒng)綁定到某個節(jié)點
*/
public static void bindNode(Class<? extends Node> nodeClass, String nodeName) {
InetSocketAddress address = clusterConfig.get(nodeName);
try {
Constructor<? extends Node> constructor = nodeClass.getDeclaredConstructor();
Node node = constructor.newInstance();
node.setName(nodeName);
currNode = node;
} catch (Exception e) {
throw new RuntimeException("create node fail", e);
}
}
創(chuàng)建Actor
創(chuàng)建Actor調(diào)用newActor方法,指定要創(chuàng)建的Actor具體類和Actor name,Actor name需要Node內(nèi)部唯一。
創(chuàng)建Actor時,先綁定當前Node,再調(diào)用Actor的start方法初始化,然后將name與Actor的映射關(guān)系加入到actors中。
/**
* 啟動新的Actor
*/
public static void newActor(Class<? extends Actor> actorClass, String name) {
try {
Constructor<? extends Actor> constructor = actorClass.getDeclaredConstructor();
Actor actor = constructor.newInstance();
actor.setName(name);
actor.setNode(currNode);
actor.start();
actors.put(name, actor);
} catch (Exception e) {
throw new RuntimeException("create actor fail", e);
}
}
}
發(fā)送消息
核心是send方法,指定目標Node name、目標Actor name、命令名和參數(shù)后發(fā)送消息,也可以把這些信息包裝在Message中發(fā)出。
消息的來源Node和來源Actor保存在一個ThreadLocal變量currThreadActor中。它的作用是在Actor創(chuàng)建時,將Actor線程與Actor綁定在一起,這樣當調(diào)用send方法發(fā)送消息時,無需再顯式指定來源Node和來源Actor,因為如果是Actor線程本身調(diào)用的send方法,那么直接從currThreadActor中取值即可;否則取不到值,那么來源Node和來源Actor都是null。
如果消息的目標Node與來源Node相同,那么直接找到對應(yīng)的Actor添加消息即可;否則,需要走網(wǎng)絡(luò)通信。這里的網(wǎng)絡(luò)通信實際上就是一個簡單的RPC通信,此處使用了Netty的ObjectEncoder和ObjectDecoder做消息的序列化和反序列化(注意:ObjectEncoder和ObjectDecoder在Netty的最新版本中已被廢棄,因為Java序列化具有很大的安全隱患,這里仍然使用它們僅是為了演示方便)。
當走網(wǎng)絡(luò)通信發(fā)送消息時,先判斷到目標Node的Channel是否有效,若是,則直接發(fā)送消息;否則,先重新創(chuàng)建好Channel,再異步發(fā)送。這里實際上會有一個多線程同步的問題,就是多個線程同時嘗試創(chuàng)建Channel,那么后面創(chuàng)建的Channel會把前面的覆蓋掉,最后只會保留最后創(chuàng)建的一個。優(yōu)化方法有兩種:一是允許多個線程同時嘗試創(chuàng)建Channel,但是當創(chuàng)建Channel成功時,如果發(fā)現(xiàn)已經(jīng)有創(chuàng)建好的Channel引用了(來自別的線程創(chuàng)建),那么不保留這次創(chuàng)建的Channel,發(fā)送也通過已有的Channel引用;二是每次嘗試創(chuàng)建Channel時都禁止別的線程做同樣的操作。兩種優(yōu)化方法各有優(yōu)劣,限于時間,這里沒有用優(yōu)化方法做具體實現(xiàn)。
public static void send(Message msg) {
String destNodeName = msg.getDestNode();
String destActorName = msg.getDestActor();
if (destNodeName.equals(currNode.getName())) {
Actor destActor = actors.get(destActorName);
destActor.act(msg);
} else {
sendToAnotherNode(msg);
}
}
private static void sendToAnotherNode(Message msg) {
try {
String destNodeName = msg.getDestNode();
// 如果沒有連接,那么先建立連接
Channel channel = getChannel(destNodeName);
if (!isChannelValid(channel)) {
InetSocketAddress address = clusterConfig.get(destNodeName);
// TODO 有可能出現(xiàn)多線程同時嘗試建立連接的情況,這里會保留最后一個
// 優(yōu)化方法有兩種:
// 1. 允許多次嘗試,當建立連接成功后,如果已有成功連接的引用,那么不保留這次創(chuàng)建的連接
// 2. 嘗試時阻塞其他嘗試
clientBootstrap.connect(address).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
setChannel(destNodeName, future.channel());
future.channel().writeAndFlush(msg);
}
});
} else {
// 否則直接發(fā)送消息
channel.writeAndFlush(msg);
}
} catch (Exception e) {
throw new RuntimeException("send to another node fail");
}
}
public static void send(String destNodeName, String destActorName, String command, Object... params) {
Actor srcActor = currThreadActor.get();
String srcActorName = srcActor == null ? null : srcActor.getName();
String srcNodeName = srcActor == null ? null : srcActor.getNode().getName();
Message msg = new Message(command, srcNodeName, srcActorName, destNodeName, destActorName, params);
send(msg);
}
public static boolean isChannelValid(Channel channel) {
return channel != null && channel.isActive() && channel.isWritable();
}
public static Channel getChannel(String destNodeName) {
return channels.get(destNodeName);
}
public static void setChannel(String destNodeName, Channel channel) {
channels.put(destNodeName, channel);
}
/**
* Actor發(fā)送給自己
*/
public static void sendSelf(String command, Object... params) {
Actor selfActor = currThreadActor.get();
if (selfActor == null) {
throw new RuntimeException("not in an actor, send fail");
}
send(selfActor.getNode().getName(), selfActor.getName(), command, params);
}
public static void setThreadLocalActor(Actor actor) {
currThreadActor.set(actor);
}
休眠Actor
休眠Actor調(diào)用sleep方法實現(xiàn),它制定了需要休眠的毫秒數(shù),休眠完后回調(diào)的命令及參數(shù)。
sleep方法對應(yīng)于Skynet中的skynet.sleep,它們都是阻塞任務(wù)但是不阻塞線程。不同的是,skynet.sleep使用了Lua的協(xié)程yield/resume,在實現(xiàn)上更加優(yōu)雅,對用戶是透明的,用戶無需指定回調(diào)函數(shù),就能在sleep到期時自動切換回當前任務(wù)繼續(xù)執(zhí)行。而Java沒有這種特性,所以此處乞丐版的實現(xiàn)需要指定回調(diào)方法。
這里的sleep方法和skynet.sleep一樣,底層都是通過定時任務(wù)來實現(xiàn)。具體來說,sleep調(diào)用后會添加一個TimerTask,封裝了過期時間和回調(diào)命令及參數(shù),待任務(wù)到期后將命令封裝成Message發(fā)送給當前Actor自身。
public static void sleep(long millis, String command, Object... params) {
String destActorName = currThreadActor.get().getName();
Timer.addTimeTask(new TimerTask(System.currentTimeMillis() + millis, () -> {
ActorSystem.send(currNode.getName(), destActorName, command, params);
}));
}
定時器
上面說到sleep方法依賴定時器的實現(xiàn)。定時器在Timer類中實現(xiàn),它在start方法中啟動一個線程不斷輪詢處理定時任務(wù),并提供了addTimeTask方法添加新的定時任務(wù)。
Timer使用優(yōu)先級隊列作為存儲定時任務(wù)的數(shù)據(jù)結(jié)構(gòu),這樣在插入任務(wù)時可以達到O(logN)的時間復(fù)雜度。
為性能考慮,Timer主線程非采用每隔一小段時間不斷輪詢的方式,而是在當前沒有任務(wù)需要執(zhí)行時保持阻塞。為此需要考慮兩個喚醒阻塞條件,一是任務(wù)隊列由空到非空時喚醒,二是當下個定時任務(wù)還沒到期而阻塞時,插入一個到期時間更早的定時任務(wù),需要重新設(shè)定阻塞時間,因此先喚醒主線程。
public class Timer {
/**
* 基于優(yōu)先級隊列實現(xiàn)的定時任務(wù)隊列
*/
private static final PriorityQueue<TimerTask> timerTasks = new PriorityQueue<>();
private static final ReentrantLock lock = new ReentrantLock();
/**
* 喚醒阻塞條件一:隊列非空
*/
private static final Condition notEmpty = lock.newCondition();
/**
* 喚醒阻塞條件二:當前時刻有任務(wù)需要執(zhí)行
*/
private static final Condition hasCurrTask = lock.newCondition();
/**
* 添加新的定時任務(wù)
*/
public static void addTimeTask(TimerTask task) {
lock.lock();
if (timerTasks.isEmpty()) {
notEmpty.signal();
}
TimerTask firstTask = timerTasks.peek();
timerTasks.offer(task);
if (firstTask != null && task.getExecTime() < firstTask.getExecTime()) {
hasCurrTask.signal();
}
lock.unlock();
}
/**
* 啟動定時器
*/
public static void start() {
Executor executor = Executors.newSingleThreadExecutor();
executor.execute(() -> {
while (true) {
TimerTask firstTask;
lock.lock();
if (timerTasks.isEmpty()) {
try {
notEmpty.await();
} catch (InterruptedException ignore) {
// ignore
}
}
firstTask = timerTasks.peek();
long currDeadlineMillis = firstTask.getExecTime();
long currTime = System.currentTimeMillis();
long delay = currDeadlineMillis - currTime;
if (delay > 0) {
try {
hasCurrTask.await(delay, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignore) {
// ignore
}
} else {
firstTask = timerTasks.poll();
}
lock.unlock();
if (firstTask != null) {
firstTask.run();
}
}
});
}
}
程序運行
示例程序放在test包下面,涉及到的類說明:
ActorPing:每隔固定間隔向ActorPong發(fā)送消息,并接收回包。
ActorPong:接收ActorPing發(fā)送的消息并原樣返回。
Cluster:包含NodeA和NodeB兩個節(jié)點的配置。
NodeA:啟動時創(chuàng)建兩個ActorPing,分別命名為ping1和ping2,分別以1s和5s的間隔向NodeB上的pong發(fā)送消息。
NodeB:啟動時創(chuàng)建一個ActorPong,命名為pong。
運行時,先啟動NodeB,再啟動NodeA,NodeA下面會打印帶時間戳的如下信息:
[time:8, srcActor:null, destActor:ping1]command:start,params:[1000]
[time:8, srcActor:null, destActor:ping2]command:start,params:[5000]
[time:9, srcActor:ping1, destActor:ping1]command:ping,params:[1000]
[time:9, srcActor:ping2, destActor:ping2]command:ping,params:[5000]
[time:22, taskId:2]addTask
[time:22, taskId:1]addTask
[time:143, srcActor:pong, destActor:ping1]command:receivePong,params:[msg]
[time:143, srcActor:pong, destActor:ping2]command:receivePong,params:[msg]
[time:1026, taskId:2]execTask
[time:1026, srcActor:null, destActor:ping1]command:ping,params:[1000]
[time:1029, taskId:3]addTask
[time:1035, srcActor:pong, destActor:ping1]command:receivePong,params:[msg]
[time:2033, taskId:3]execTask
[time:2034, srcActor:null, destActor:ping1]command:ping,params:[1000]
[time:2034, taskId:4]addTask
[time:2037, srcActor:pong, destActor:ping1]command:receivePong,params:[msg]
[time:3036, taskId:4]execTask
[time:3036, srcActor:null, destActor:ping1]command:ping,params:[1000]
[time:3036, taskId:5]addTask
[time:3039, srcActor:pong, destActor:ping1]command:receivePong,params:[msg]
[time:4041, taskId:5]execTask
[time:4042, srcActor:null, destActor:ping1]command:ping,params:[1000]
[time:4042, taskId:6]addTask
[time:4044, srcActor:pong, destActor:ping1]command:receivePong,params:[msg]
[time:5022, taskId:1]execTask
[time:5022, srcActor:null, destActor:ping2]command:ping,params:[5000]
[time:5022, taskId:7]addTask
NodeB下面會打印如下信息:
[time:1938, srcActor:ping2, destActor:pong]command:pong,params:[msg]
[time:1940, srcActor:ping1, destActor:pong]command:pong,params:[msg]
[time:2855, srcActor:ping1, destActor:pong]command:pong,params:[msg]
[time:3856, srcActor:ping1, destActor:pong]command:pong,params:[msg]
[time:4856, srcActor:ping1, destActor:pong]command:pong,params:[msg]
[time:5860, srcActor:ping1, destActor:pong]command:pong,params:[msg]
[time:6850, srcActor:ping2, destActor:pong]command:pong,params:[msg]
小結(jié)
本文總結(jié)了使用Java實現(xiàn)一個簡單Actor模型的完整流程。由于時間所限,本文只實現(xiàn)了Actor模型的基礎(chǔ)功能。不過造輪子的目的主要是為了深入掌握Actor模型的核心概念,作為演示和研究的用途。對于并發(fā)模型來說,不管用哪種語言來實現(xiàn),原理才是主要的、相通的,語言只不過是實現(xiàn)的工具。相信筆者的這篇文章也會幫助讀者對Actor模型有更為深入的了解。
以上就是基于Java實現(xiàn)Actor模型的詳細內(nèi)容,更多關(guān)于Java Actor模型的資料請關(guān)注腳本之家其它相關(guān)文章!
相關(guān)文章
Java日期時間及日期相互轉(zhuǎn)換實現(xiàn)代碼
這篇文章主要介紹了Java日期時間及日期相互轉(zhuǎn)換實現(xiàn)代碼,文中通過示例代碼介紹的非常詳細,對大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價值,需要的朋友可以參考下2019-12-12
解讀SpringBoot中addCorsMappings配置跨域與攔截器互斥問題的原因
這篇文章主要介紹了解讀SpringBoot中addCorsMappings配置跨域與攔截器互斥問題的原因,具有很好的參考價值,希望對大家有所幫助,如有錯誤或未考慮完全的地方,望不吝賜教2023-12-12

