Go語言異步API設計的扇入扇出模式詳解
前言
扇出/扇入模式是更高級 API 集成的主要內容。這些應用程序并不總是表現出相同的可用性或性能特征。
扇出是從電子工程中借用的一個術語,它描述了輸入的邏輯門連接到另一個輸出門的數量。輸出需要提供足夠的電流來驅動所有連接的輸入。在事務處理系統(tǒng)中,用來描述為了服務一個輸入請求而需要做的請求總數。
扇入是指為邏輯單元的輸入方程提供輸入信號的最大數量。扇入是定義單個邏輯門可以接受的最大數字輸入數量的術語。大多數晶體管-晶體管邏輯 (TTL) 門有一個或兩個輸入,盡管有些有兩個以上。典型的邏輯門具有 1 或 2 的扇入。

扇入/扇出服務
我們舉一個現實世界的例子,一個電子商務網站將自己與一個第三方支付網關整合在一起。 這里,網站使用支付網關的 API 來彈出支付屏幕并輸入安全證書。同時,網站可能會調用另一個稱為分析的 API 來記錄支付的嘗試。這種將一個請求分叉成多個請求的過程被稱為 fan-out 扇出。在現實世界中,一個客戶請求可能涉及許多扇出服務。
另一個例子是 MapReduce。Map 是一個扇入的操作,而 Reduce 是一個扇出的 操作。一個服務器可以將一個信息扇出到下一組服務(API),并忽略結果。或者可以等到這些服務器的所有響應都返回。如 如下圖所示,一個傳入的請求被服務器復用為轉換成兩個傳出的請求:

扇入 fan-in 是一種操作,即兩個或更多傳入的請求會聚成一個請求。這種情況下,API如何聚合來自多個后端服務的結果,并將結果即時返回給客戶。
例如,想想一個酒店價格聚合器或航班票務聚合器,它從不同的數據提供者那里獲取關于多個酒店或航班的請求信息并顯示出來。
下圖顯示了扇出操作是如何結合多個請求并準備一個最終的響應,由客戶端消費的。

客戶端也可以是一個服務器,為更多的客戶提供服務。如上圖所示,左側的服務器正在收集來自酒店 A、酒店 B 和 航空公司供應商 A,并為不同的客戶準備另一個響應。
因此,扇入和扇出操作并不總是完全相互獨立的。大多數情況下,它將是一個混合場景,扇入和扇出操作都是相互配合的。
請記住,對下一組服務器的扇出操作可以是異步的。也是如此。對于扇入請求來說,這可能不是真的。扇入操作有時被稱為 API 調用。
Go 語言實現扇入/扇出模式
Fan-out:多個 goroutine 從同一個通道讀取數據,直到該通道關閉。OUT 是一種張開的模式,所以又被稱為扇出,可以用來分發(fā)任務。
Fan-in:1 個 goroutine 從多個通道讀取數據,直到這些通道關閉。IN 是一種收斂的模式,所以又被稱為扇入,用來收集處理的結果。

package main
import (
"context"
"log"
"sync"
"time"
)
// Task 包含任務編號及任務所需時長
type Task struct {
Number int
Cost time.Duration
}
// task channel 生成器
func taskChannelGerenator(ctx context.Context, taskList []Task) <-chan Task {
taskCh := make(chan Task)
go func() {
defer close(taskCh)
for _, task := range taskList {
select {
case <-ctx.Done():
return
case taskCh <- task:
}
}
}()
return taskCh
}
// doTask 處理并返回已處理的任務編號作為通道的函數
func doTask(ctx context.Context, taskCh <-chan Task) <-chan int {
doneTaskCh := make(chan int)
go func() {
defer close(doneTaskCh)
for task := range taskCh {
select {
case <-ctx.Done():
return
default:
log.Printf("do task number: %d\n", task.Number)
// task 任務處理
// 根據任務耗時休眠
time.Sleep(task.Cost)
doneTaskCh <- task.Number // 已處理任務的編號放入通道
}
}
}()
return doneTaskCh
}
// `fan-in` 意味著將多個數據流復用或合并成一個流。
// merge 函數接收參數傳遞的多個通道 “taskChs”,并返回單個通道 “<-chan int”
func merge(ctx context.Context, taskChs []<-chan int) <-chan int {
var wg sync.WaitGroup
mergedTaskCh := make(chan int)
mergeTask := func(taskCh <-chan int) {
defer wg.Done()
for t := range taskCh {
select {
case <-ctx.Done():
return
case mergedTaskCh <- t:
}
}
}
wg.Add(len(taskChs))
for _, taskCh := range taskChs {
go mergeTask(taskCh)
}
// 等待所有任務處理完畢
go func() {
wg.Wait()
close(mergedTaskCh)
}()
return mergedTaskCh
}
func main() {
start := time.Now()
// 使用 context 來防止 goroutine 泄漏,即使在處理過程中被中斷
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// taskList 定義每個任務及其成本
taskList := []Task{
Task{1, 1 * time.Second},
Task{2, 7 * time.Second},
Task{3, 2 * time.Second},
Task{4, 3 * time.Second},
Task{5, 5 * time.Second},
Task{6, 3 * time.Second},
}
// taskChannelGerenator 是一個函數,它接收一個 taskList 并將其轉換為 Task 類型的通道
// 執(zhí)行結果(int slice channel)存儲在 worker 中
// 由于 doTask 的結果是一個通道,被分給了多個 worker,這就對應了 fan-out 處理
taskCh := taskChannelGerenator(ctx, taskList)
numWorkers := 4
workers := make([]<-chan int, numWorkers)
for i := 0; i < numWorkers; i++ {
workers[i] = doTask(ctx, taskCh) // doTask 處理并返回已處理的任務編號作為通道的函數
}
count := 0
for d := range merge(ctx, workers) { // merge 從中讀取已處理的任務編號
count++
log.Printf("done task number: %d\n", d)
}
log.Printf("Finished. Done %d tasks. Total time: %fs", count, time.Since(start).Seconds())
}
參考鏈接:
Understanding the Fan-Out/Fan-In API Integration Pattern
以上就是Go語言異步API設計的扇入扇出模式詳解的詳細內容,更多關于Go異步API扇入扇出模式的資料請關注腳本之家其它相關文章!
相關文章
Go語言defer與return執(zhí)行的先后順序詳解
這篇文章主要為大家介紹了Go語言defer與return執(zhí)行的先后順序詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進步,早日升職加薪2022-12-12

