Rust 通過異步實現(xiàn)并發(fā)的方法示例
在本文中,我們將重點討論線程和 future 之間的區(qū)別。
在許多情況下,使用異步處理并發(fā)性的 API 與使用線程的 API 非常相似,但它們通常具有不同的行為,并且它們幾乎總是具有不同的性能特征。
用 spawn_task 創(chuàng)建一個新任務
使用 thread::spawn 可以創(chuàng)建一個新線程,我們編寫的第一個程序是在兩個單獨的線程上進行計數(shù)。
讓我們使用 async 做同樣的事情。trpl crate 提供了一個看起來與 thread::spawn API 非常相似的 spawn_task 函數(shù),以及一個 sleep 函數(shù),它是 thread::sleep API 的異步版本。我們可以一起使用它們來實現(xiàn)計數(shù)示例:
use std::time::Duration;
fn main() {
trpl::run(async {
trpl::spawn_task(async {
for i in 1..10 {
println!("hi number {i} from the first task!");
trpl::sleep(Duration::from_millis(500)).await;
}
});
for i in 1..5 {
println!("hi number {i} from the second task!");
trpl::sleep(Duration::from_millis(500)).await;
}
});
}
我們使用 trpl::run 設置 main 函數(shù),以便我們的頂級函數(shù)可以是異步的。
然后我們在該塊中編寫兩個循環(huán),每個循環(huán)都包含一個 trpl::sleep 調用,該調用在發(fā)送下一條消息之前等待半秒。我們將一個循環(huán)放在 trpl::spawn_task 中,另一個放在頂層的 for 循環(huán)中。我們還在 sleep 調用之后添加了一個 await。
這段代碼的行為類似于基于線程的實現(xiàn)——包括當你運行它時,你可能會看到消息以不同的順序出現(xiàn)在你自己的終端上。

這個版本在主異步塊體中的 for 循環(huán)完成后立即停止,因為在主函數(shù)結束時,由 spawn_task 生成的任務被關閉。如果希望它一直運行到任務完成,則需要使用連接句柄來等待第一個任務完成。對于線程,我們使用 join 方法來“阻塞”,直到線程完成運行。我們可以使用 await 來做同樣的事情,因為任務句柄本身就是一個 future。它的輸出類型是 Result,所以我們也在等待它之后展開它。
use std::time::Duration;
fn main() {
trpl::run(async {
let handle = trpl::spawn_task(async {
for i in 1..10 {
println!("hi number {i} from the first task!");
trpl::sleep(Duration::from_millis(500)).await;
}
});
for i in 1..5 {
println!("hi number {i} from the second task!");
trpl::sleep(Duration::from_millis(500)).await;
}
handle.await.unwrap();
});
}
這個更新的版本運行直到兩個循環(huán)結束。

到目前為止,看起來 async 和線程給出了相同的基本結果,只是語法不同:使用 await 而不是在連接句柄上調用 join,并等待 sleep 調用。
更大的區(qū)別在于,我們不需要生成另一個操作系統(tǒng)線程來執(zhí)行此操作。實際上,我們甚至不需要在這里生成任務。由于 async 塊編譯為匿名的 future,我們可以將每個循環(huán)放在 async 塊中,并讓運行時使用 trpl::join 函數(shù)將它們運行到完成。
我們之前展示了如何在調用 std::thread::spawn 時對返回的 JoinHandle 類型使用 join 方法。trpl::join 函數(shù)與此類似,但用于 future。當你給它兩個 future 時,它會產(chǎn)生一個新的 future,它的輸出是一個元組,其中包含你傳入的每個 future 完成后的輸出。
我們使用 trpl::join 來等待 fut1 和 fut2 完成。我們不等待 fut1 和 fut2,而是等待 trpl::join 生成的新 future。
use std::time::Duration;
fn main() {
trpl::run(async {
let fut1 = async {
for i in 1..10 {
println!("hi number {i} from the first task!");
trpl::sleep(Duration::from_millis(500)).await;
}
};
let fut2 = async {
for i in 1..5 {
println!("hi number {i} from the second task!");
trpl::sleep(Duration::from_millis(500)).await;
}
};
trpl::join(fut1, fut2).await;
});
}
編譯運行,我們看到兩個 future 都運行到完成:

現(xiàn)在,每次運行的結果的順序都完全相同,這與我們在線程中看到的非常不同。
這是因為 trpl::join 函數(shù)是公平的,這意味著它同樣頻繁地檢查每個 future,在它們之間交替,如果另一個準備好了,它永遠不會讓一個搶先。對于線程,操作系統(tǒng)決定檢查哪個線程以及讓它運行多長時間。對于異步 Rust,運行時決定檢查哪個任務。
在實踐中,細節(jié)變得復雜,因為異步運行時可能會在后臺使用操作系統(tǒng)線程作為管理并發(fā)性的一部分,因此保證公平性對運行時來說可能需要更多的工作。
運行時不必保證任何給定操作的公平性,它們通常提供不同的 API,讓你選擇是否需要公平性。
使用消息傳遞計算兩個任務
我們使用消息傳遞的異步版本在 future 之間共享數(shù)據(jù)。
我們將采用與使用消息傳遞在線程之間傳輸數(shù)據(jù)略有不同的方法來說明基于線程的并發(fā)和基于 future 的并發(fā)之間的一些關鍵區(qū)別。
在 trpl::run 的 async 塊中創(chuàng)建通道:
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let val = String::from("hi");
tx.send(val).unwrap();
let received = rx.recv().await.unwrap();
println!("Got: {received}");
});
}
這里,我們使用 trpl::channel,這是 std::mpsc::channel(多生產(chǎn)者、單消費者通道)的異步版本。異步版本的 API 與基于線程的版本只有一點不同:它使用一個可變的接收端 rx,它的 recv 方法產(chǎn)生一個我們需要等待的 future,而不是直接產(chǎn)生值?,F(xiàn)在我們可以將消息從發(fā)送者發(fā)送到接收者。注意,我們不需要生成一個單獨的線程或任務,我們只需要等待 rx.recv 調用。
在 std::mpsc::channel 中的 Receiver::recv 方法阻塞線程,直到它接收到消息。trpl::Receiver::recv 方法是異步的,它不阻塞,而是將控制權交還給運行時,直到接收到消息或通道的發(fā)送端關閉為止。相比之下,我們不等待 send 調用,因為它不會阻塞。
注意:由于所有這些異步代碼都在 trpl::run 調用中的異步塊中運行,因此其中的所有代碼都可以避免阻塞。但是,它外面的代碼將在運行函數(shù)返回時阻塞。這就是 trpl::run 函數(shù)的全部意義:它允許你選擇在哪里阻塞某些異步代碼集,以及在哪里在同步代碼和異步代碼之間轉換。在大多數(shù)異步運行時,run 實際上被命名為 block_on 正是出于這個原因。
關于這個例子,請注意兩點。首先,消息會馬上到達。第二,雖然我們在這里使用了 future,但是還沒有并發(fā)。程序的一切都是按順序進行的,就像不涉及 future 一樣。
讓我們通過發(fā)送一系列消息并在它們之間休眠來解決第一部分:
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
});
}
Rust 還沒有一種方法可以在一系列異步項上編寫 for 循環(huán),因此我們需要使用 while let 條件循環(huán),只要循環(huán)指定的模式繼續(xù)匹配該值,循環(huán)就會繼續(xù)執(zhí)行。
rx.recv() 產(chǎn)生一個我們等待的 future。運行時將暫停 future,直到它準備好。一旦消息到達,future 將解析為 Some(message)。當通道關閉時,無論是否有消息到達,future 都將解析為 None,表示沒有更多的值,因此我們應該停止輪詢——也就是說,停止 await。
while let 循環(huán)將所有這些組合在一起。如果調用 rx.recv().await 的結果是Some(message),則可以訪問該消息,并可以在循環(huán)體中使用它。如果結果為 None,則循環(huán)結束。每次循環(huán)完成時,它都會再次到達等待點,因此運行時將再次暫停它,直到另一條消息到達。
代碼現(xiàn)在成功地發(fā)送和接收了所有消息:

不幸的是,仍然存在一些問題。首先,消息不會以半秒的間隔到達,它們在我們啟動程序后 2 秒同時到達。其次,這個程序永遠不會退出!相反,它會永遠等待新的消息。
因為程序中只有一個異步塊,因此其中的所有內容都是線性運行的,仍然沒有并發(fā)性。所有的 tx.send 調用都會發(fā)生,并與所有的 trpl::sleep 調用及其相關的等待點穿插在一起。只有這樣,while let 循環(huán)才能通過 recv 調用上的任何等待點。
為了獲得我們想要的行為,即在每個消息之間發(fā)生睡眠延遲,我們需要將 tx 和 rx 操作放在各自的異步塊中,然后運行時可以使用 trpl::join 分別執(zhí)行它們中的每一個。同樣,我們等待調用 trpl::join 的結果,而不是單個的 future。
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx_fut = async {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
trpl::join(tx_fut, rx_fut).await;
});
}
消息以 500 ms 的間隔打印,而不是在 2 s 后匆忙打印。
然而,由于 while let 循環(huán)與 trpl::join 的交互方式,程序仍然不會退出:
- 只有當傳遞給它的兩個 future 都完成后,從 trpl::join 返回的 future 才會完成。
- 在發(fā)送 vals 中的最后一條消息后,一旦結束 sleep,tx future 就完成了。
- 直到 while let 循環(huán)結束,rx future 才會完成。
- while let 循環(huán)直到等待 rx.recv 產(chǎn)生 None 才會結束。
- 等待 rx.recv 只會在通道的另一端關閉時返回 None。
- 只有當我們調用 rx.close 或當發(fā)送端 tx 被丟棄時,通道才會關閉。
- 我們不會在任何地方調用 rx.close,并且在傳遞給 trpl::run 的最外層異步塊結束之前,tx 不會被丟棄。
- 這個塊不能結束,因為它在 trpl::join 完成時被阻塞了,這將我們帶回到列表的頂部。
我們可以通過在某處調用 rx.close 來手動關閉 rx,但這沒有多大意義。在處理任意數(shù)量的消息后停止將使程序關閉,但我們可能會錯過消息。我們需要一些其他的方法來確保 tx 在函數(shù)結束前被刪除。
現(xiàn)在,我們發(fā)送消息的異步塊只借用 tx,因為發(fā)送消息不需要所有權,但是如果我們可以將 tx 移動到異步塊中,那么一旦該塊結束,它就會被丟棄。move 關鍵字對異步塊的作用就像對閉包的作用一樣,將數(shù)據(jù)轉移到異步塊中。
我們將用于發(fā)送消息的塊從 async 更改為 async move。當我們運行這個版本的代碼時,它會在發(fā)送和接收最后一條消息后優(yōu)雅地關閉。
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
trpl::join(tx_fut, rx_fut).await;
});
}
因為 tx 所有權被轉移到 async 塊內,在該塊執(zhí)行完也就是發(fā)送作業(yè)結束之后,tx 隨之被銷毀,觸發(fā)通道關閉,接收端返回 None。
這個異步通道也是一個多生產(chǎn)者通道,所以如果我們想從多個 future 發(fā)送消息,我們可以在 tx 上調用 clone。
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_millis(500)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
let tx_fut = async move {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_millis(1500)).await;
}
};
trpl::join3(tx1_fut, tx_fut, rx_fut).await;
});
}
克隆 tx,在第一個異步塊之外創(chuàng)建 tx1,我們將 tx1 移動到該塊中。然后將原始 tx 移動到一個新的異步塊中,在那里我們以稍慢的延遲發(fā)送更多消息。
用于發(fā)送消息的兩個異步塊都需要是 async move 塊,以便在這些塊完成時丟棄 tx 和 tx1。最后,我們從 trpl::join 切換到 trpl::join3 來處理額外的 future。
現(xiàn)在我們看到了來自兩個發(fā)送 future 的所有消息,由于發(fā)送 future 在發(fā)送后使用的延遲略有不同,因此接收消息的間隔也不同。

這是一個良好的開端,但它限制了我們的 future 數(shù)量:兩個對應 join,或三個對應 join3。
到此這篇關于Rust 通過異步實現(xiàn)并發(fā)的方法示例的文章就介紹到這了,更多相關Rust 異步并發(fā)內容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關文章希望大家以后多多支持腳本之家!
相關文章
Windows系統(tǒng)下安裝Rust環(huán)境超詳細教程
這篇文章主要介紹了如何在Windows系統(tǒng)上安裝mingw64和Rust,mingw64是一個輕便的C語言編譯環(huán)境,可以替代Rust默認使用的Visual?Studio,文中通過圖文介紹的非常詳細,需要的朋友可以參考下2025-02-02
libbpf和Rust開發(fā)ebpf程序實戰(zhàn)示例
這篇文章主要為大家介紹了libbpf和Rust開發(fā)ebpf程序實戰(zhàn)示例,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2023-12-12

