Golang超全面講解并發(fā)
1. goroutine
1.1 定義
func main() {
for i := 0; i < 10; i++ {
//開啟并發(fā)打印
go func(i int) {
fmt.Printf("hello goroutine : %d \n", i)
}(i)
}
time.Sleep(time.Millisecond)
}
go語言是采用一種叫 協(xié)程(Coroutine)
輕量級(jí) “線程”
非搶占式 多任務(wù)處理,由協(xié)程主動(dòng)交出CPU控制權(quán)
- 線程是由CPU來決定是否移交控制權(quán),做到一半可能線程就會(huì)進(jìn)行切換
- 協(xié)程則是由內(nèi)部進(jìn)行決定是否要移交CPU的控制權(quán)
編譯器/解釋器/虛擬機(jī)層面的多任務(wù)
多個(gè)協(xié)程可以在一個(gè)或者多個(gè)線程上運(yùn)行 (由調(diào)度器來決定)
以下例子,通過 runtime.Gosched() 可以手動(dòng)交出控制權(quán),如果不交出控制權(quán);還有如果在 gorotine 里面使用外部函數(shù),如果不傳入的話,就是一個(gè)閉包的變量,會(huì)導(dǎo)致數(shù)據(jù)沖突,就是通過不同的協(xié)程寫入數(shù)據(jù)。檢查數(shù)據(jù)是否有沖突可以通過以下語句進(jìn)行檢測(cè)
go run -race
func main() {
var a [10]int
for i := 0; i < 10; i++ {
//如果這里不將i傳入?yún)?shù)直接引用外部的參數(shù)會(huì)出現(xiàn):數(shù)據(jù)沖突(race condition)
go func(i int) {
// 打印語句會(huì)進(jìn)行協(xié)程的調(diào)度,會(huì)交出控制權(quán):fmt.Printf("hello goroutine : %d \n", i)
a[i]++
//通過 Gosched() 可以手動(dòng)交出協(xié)程的控制權(quán);如果不寫這個(gè)語句協(xié)程就不會(huì)交出控制權(quán)進(jìn)行調(diào)度執(zhí)行,就會(huì)一直卡死在這里
runtime.Gosched()
}(i)
}
time.Sleep(time.Millisecond)
fmt.Println(a)
}
普通函數(shù):在一個(gè)線程里面執(zhí)行,調(diào)用完后釋放資源,單向調(diào)用
協(xié)程:雙向流通

但go的程序啟動(dòng)時(shí),一個(gè)線程里面可能有多個(gè) goroutine 執(zhí)行,具體在哪個(gè)線程執(zhí)行,由調(diào)度器決定;傳統(tǒng)意義上的 routine 需要顯示的寫出釋放控制權(quán),而 goroutine 不需要寫出來,調(diào)度器會(huì)進(jìn)行切換

1.2 goroutine切換點(diǎn)
只是參考,不能保證肯定會(huì)切換
- I/O, select:打印數(shù)據(jù)的時(shí)候
- channel
- 等待鎖
- 函數(shù)調(diào)用(有時(shí))
- runtime.Gosched()
2. channel
協(xié)程與協(xié)程之間的雙向通信
2.1 語法
func chanDemo() {
//var c chan int //為 nil 的chan不能使用
c := make(chan int)
go func() {
for {
n := <- c
fmt.Println(n)
}
}()
c <- 1 //向 c 里面發(fā)送數(shù)據(jù)(在發(fā)送時(shí)需要 goroutine 來進(jìn)行接收,否則就會(huì)死鎖)
c <- 2
}
2.2 channel作為參數(shù)
func worker(i int, c chan int) {
for {
fmt.Printf("Worker :%d, accpet: %c \n", i, <-c)
}
}
func chanDemo() {
var channels [10]chan int
for i := 0; i < 10; i++ {
channels[i] = make(chan int)
go worker(i, channels[i])
}
for i := 0; i < 10; i++ {
channels[i] <- 'a' + i
}
for i := 0; i < 10; i++ {
channels[i] <- 'A' + i
}
}2.3 channel作為返回值
返回值定義:chan<- int :代表只能發(fā)數(shù)據(jù); <-chan int:代表只能收數(shù)據(jù)
// chan<- int :代表只能發(fā)數(shù)據(jù); <-chan int:代表只能收數(shù)據(jù)
func worker(i int) chan int {
c := make(chan int)
go func() {
for {
fmt.Printf("Worker :%d, accpet: %c \n", i, <-c)
}
}()
return c
}
func chanDemo() {
var channels [10]chan int
for i := 0; i < 10; i++ {
channels[i] = worker(i)
}
for i := 0; i < 10; i++ {
channels[i] <- 'a' + i
}
for i := 0; i < 10; i++ {
channels[i] <- 'A' + i
}
}創(chuàng)建時(shí)可以指定 channel 的大小,make(chan int, 3) 如果數(shù)據(jù)超過3個(gè)就會(huì)死鎖,對(duì)提升性能有好處。
2.4 chan關(guān)閉
如果不 close 那么會(huì)一直接收下去,可以是否 if、range 進(jìn)行判斷是否 close
不要通過共享內(nèi)存來進(jìn)行通信,要通過通信來共享內(nèi)存
func chanClose() {
//創(chuàng)建一個(gè)緩沖區(qū)
c := make(chan int, 3)
c <- 1
c <- 2
c <- 3
//方法一:
go func() {
for {
//如果沒有接收到數(shù)據(jù)就直接返回
if i, ok := <-c; !ok {
fmt.Println(" close channel.....")
break
} else {
//死循環(huán)讀取,如果外部 chan 已經(jīng)關(guān)閉了,這里會(huì)一直接收具體數(shù)據(jù)的默認(rèn)值
fmt.Printf("%d\n", i)
}
}
}()
//方法二:
go func() {
for n := range c {
//如果沒有接收到數(shù)據(jù)就直接返回
fmt.Printf("%d\n", n)
}
}()
//close關(guān)閉 channel
close(c)
}
2.5 等待goroutine
如何等待所有的 goroutine 執(zhí)行完之后才退出程序?
方式一:
func ChanDemo() {
workers := make([]worker, 10)
for i, _ := range workers {
workers[i] = createWorker(i)
}
for i, worker := range workers{
worker.in <- 'a' + i
}
for i, worker := range workers{
worker.in <- 'A' + i
}
//這種方式可以接收消費(fèi)者傳入的數(shù)據(jù),如果消費(fèi)者傳入的數(shù)據(jù)是同步的話,這里也會(huì)出現(xiàn)死鎖
for _, worker := range workers {
<- worker.done
<- worker.done
}
}
//定義一個(gè)worker的結(jié)構(gòu)用來存放 chan數(shù)據(jù)
type worker struct {
done chan bool
in chan int
}
func createWorker(id int) worker {
w := worker{
done: make(chan bool),
in: make(chan int),
}
go doWorker(id, w.in, w.done)
return w
}
func doWorker(id int, in chan int, done chan bool) {
for i := range in {
fmt.Printf("Worker:%d, accept:%c \n", id, i)
go func() {
//異步發(fā)送,否則會(huì)出現(xiàn)死鎖,因?yàn)榘l(fā)送會(huì)加鎖
done <- true
}()
}
}方式二:
使用 sync.WaitGroup 等待所有的 gorounte 執(zhí)行結(jié)束
//定義一個(gè)worker的結(jié)構(gòu)用來存放 chan數(shù)據(jù)
type done2Worker struct {
in chan int
//使用指針傳遞
wg *sync.WaitGroup
}
func create2Worker(id int, group *sync.WaitGroup) done2Worker {
w := done2Worker{
wg: group,
in: make(chan int),
}
go do2Worker(id, w)
return w
}
func do2Worker(id int, worker2 done2Worker) {
for i := range worker2.in {
fmt.Printf("Worker:%d, accept:%c \n", id, i)
//執(zhí)行完成
worker2.wg.Done()
}
}
func Chan2Demo() {
var wg sync.WaitGroup
workers := make([]done2Worker, 10)
for i, _ := range workers {
workers[i] = create2Worker(i, &wg)
}
wg.Add(20)
for i, worker := range workers {
worker.in <- 'a' + i
}
for i, worker := range workers {
worker.in <- 'A' + i
}
wg.Wait()
}3. select
select 可以對(duì) channel 進(jìn)行非阻塞式調(diào)用,誰先來執(zhí)行誰,在 select 中也可以使用 nil 進(jìn)行調(diào)度
func generator() chan int {
out := make(chan int)
go func() {
i := 0
for {
//休眠隨機(jī)數(shù)
time.Sleep(time.Duration(rand.Intn(1500)) * time.Millisecond)
out <- i
i++
}
}()
return out
}
func main() {
//非阻塞式獲取數(shù)據(jù),誰先出數(shù)據(jù)就執(zhí)行哪一段邏輯
var c1, c2 = generator(), generator()
// 10秒鐘后發(fā)送一次數(shù)據(jù)
after := time.After(time.Second * 10)
//每秒鐘都會(huì)寫一次數(shù)據(jù)
tick := time.Tick(time.Second)
//死循環(huán)獲取channel中的數(shù)據(jù)
for {
select {
//會(huì)通過select關(guān)鍵字進(jìn)行調(diào)用,誰先來數(shù)據(jù),就執(zhí)行誰
case n := <-c1:
fmt.Println("Received from c1:", n)
case n := <-c2:
fmt.Println("Received from c2:", n)
case <-after:
fmt.Println("ten second after......")
case <-tick:
fmt.Println("tick task exec .....")
default:
fmt.Println("No value received")
}
}
}
4. 傳統(tǒng)同步機(jī)制
CSP : 模型下面盡量少用傳統(tǒng)的同步方式,傳統(tǒng)的方式使用共享變量進(jìn)行使用
- WaitGroup
- Mutex
type atomicInt struct {
a int
//定義互斥量進(jìn)行同步
lock sync.Mutex
}
func (a *atomicInt) add() {
a.lock.Lock()
defer a.lock.Unlock()
a.a++
}
func main() {
a := atomicInt{
a: 0,
}
a.add()
go a.add()
time.Sleep(time.Millisecond)
fmt.Println("value : ", a.a)
}
Cond
5. 并發(fā)模式
5.1 生成器
//傳入多個(gè)chan,返回一個(gè)只能輸出的chan
func fanIn(chs...chan string) <-chan string {
//創(chuàng)建管道
c := make(chan string)
//這里循環(huán)讀取 chs 管道傳入的數(shù)據(jù)
for _ , ch := range chs {
go func(in chan string) {
for {
//循環(huán)從chCopy里面讀取數(shù)據(jù)后傳入到返回出去的chan
//這里不能直接使用 ch ,因?yàn)樵撟兞渴且粋€(gè)閉包,后續(xù)遍歷的管道會(huì)將其覆蓋
c <- <-in
}
}(ch)
}
return c
}
//創(chuàng)建一個(gè)channel,循環(huán)的發(fā)送數(shù)據(jù)
func msgGen(serviceName string) chan string {
ch := make(chan string)
go func() {
for {
ch<- fmt.Sprintf("hello:%s", serviceName)
}
}()
return ch
}
func main() {
s1 := msgGen("service1")
s2 := msgGen("service2")
s3 := msgGen("service3")
//可以拿到返回出來的 channel 跟服務(wù)繼續(xù)做交互
m := fanIn(s1, s2, s3)
for {
fmt.Println(<-m)
}
}5.2 定義接口
// ChannelCreateFunc 創(chuàng)建接口,需要傳入管道,以及參數(shù)
type ChannelCreateFunc interface {
// Create 創(chuàng)建函數(shù)傳入一個(gè)任何類型的管道,后面參數(shù)選擇性傳入
Create(ch <-chan any, V...any) (any, bool)
}
func Creator(c ChannelCreateFunc, ch <-chan any) {
if r, ok := c.Create(ch, time.Duration(time.Second)); ok {
fmt.Println(r)
} else {
fmt.Println("未接收到數(shù)據(jù)")
}
}
func main() {
s1 := msgGen("service1")
m := fanIn(s1)
Creator(timeout.TimeoutCreator{}, m)
Creator(noblock.NotBlockCreator{}, m)
}
5.3 非阻塞管道
新建 noblock.go,定義下面這樣的格式
type NotBlockCreator struct {
}
func (n NotBlockCreator) Create(ch <-chan any, V...any) (any, bool) {
select {
case m := <-ch:
return m, true
default:
return "", false
}
}
5.4 超時(shí)管道
新建 timeout.go
type TimeoutCreator struct {
}
func (t TimeoutCreator) Create(ch <-chan any, V...any) (any, bool) {
size := len(V)
if size == 1 {
var timeoutValue = V[0]
switch v := timeoutValue.(type) {
case time.Duration:
for {
select {
case m := <-ch:
return m, true
case <-time.After(v):
fmt.Println("數(shù)據(jù)超時(shí)接收,直接返回false")
return "", false
}
}
}
}
return "", false
}
6. 廣度優(yōu)先算法(迷宮)

每次探索都是一層一層的向外進(jìn)行探索,如果起始為0,那么先將周邊的 1 進(jìn)行探索完畢,探索1時(shí)會(huì)將1的點(diǎn)位先存入到隊(duì)列中,等后續(xù)所有的1都探索完成之后,再取出1的點(diǎn)位進(jìn)行1周邊的探索

通過上面的這種點(diǎn)位算法,就可以將迷宮的路畫出來

6.1 代碼實(shí)現(xiàn)
創(chuàng)建文本,這里需要注意,idea創(chuàng)建文件分隔符編碼需要設(shè)置以下,否則后續(xù)讀取文件時(shí)會(huì)有問題
6 5
0 1 0 0 0
0 0 0 1 0
0 1 0 1 0
1 1 1 0 0
0 1 0 0 1
0 1 0 0 0

讀取文件成二維數(shù)組
// Fscanf函數(shù)在讀取文件時(shí),遇到\r為默認(rèn)替換為0,讀取\n結(jié)束,如果編碼不對(duì),這里讀取就會(huì)出問題
func readMaze(path string) [][]int {
file, err := os.Open(path)
if err != nil {
panic(err)
}
var row, col int
//這里需要取地址,函數(shù)里面會(huì)更改row和col的值
fmt.Fscanf(file, "%d %d", &row, &col)
fmt.Printf("%d\t%d\n", row, col)
//創(chuàng)建一個(gè)二位數(shù)組,一共有多少行
maze := make([][]int, row)
for i := range maze {
//創(chuàng)建列
maze[i] = make([]int, col)
for j := range maze[i] {
fmt.Fscanf(file, "%d", &maze[i][j])
}
}
return maze
}
func main() {
//讀取迷宮文件
maze := readMaze("maze/maze.in")
for _, row := range maze {
for _, col := range row {
fmt.Printf("%d\t", col)
}
fmt.Println()
}
}
//點(diǎn)位的結(jié)構(gòu)體
type point struct {
i, j int
}
//定義需要探索的方向
var dirs = [4]point {
//當(dāng)前位置-1,就是向上
{-1, 0},
//左邊的點(diǎn)位
{0, -1},
//向下的點(diǎn)位
{1, 0},
//向右的點(diǎn)位
{0, 1},
}
//將兩個(gè)點(diǎn)位相加,就可以獲取到下一個(gè)點(diǎn)位
func (p point) add(r point) point {
return point{p.i + r.i, p.j + r.j}
}
func (p point) at(grid [][]int) (int, bool) {
//首先判斷點(diǎn)位是否越界了,例如傳入的點(diǎn)位 (-1,0)或者(1, -1)
if p.i < 0 || p.i >= len(grid) {
return 0, false
}
//判斷j列是否越界了
if p.j < 0 || p.j >= len(grid[p.i]) {
return 0, false
}
//返回?cái)?shù)據(jù)
return grid[p.i][p.j], true
}
// walk 傳入迷宮,指定迷宮開始的點(diǎn)位,以及出口的點(diǎn)位
func walk(maze [][]int, start, end point) [][]int {
//創(chuàng)建走過的步
steps := make([][]int, len(maze))
for i := range steps {
steps[i] = make([]int, len(maze[i]))
}
//創(chuàng)建需要探索的隊(duì)列,初始的點(diǎn)位(0,0)
Q := []point{start}
for len(Q) > 0 {
cur := Q[0]
//截取出隊(duì)列中的頭部
Q = Q[1:]
//判斷如果點(diǎn)位等于出口的點(diǎn)位,那么直接退出
if cur == end {
break
}
//dirs為點(diǎn)位周邊的四個(gè)方向,我這里采用的是 上、左、下、右 的方向進(jìn)行探索
for _, dir := range dirs {
//將當(dāng)前點(diǎn)位跟四個(gè)方向相加,例如 (0,0) 向上的方向就是(-1,0),將i的值進(jìn)行減1
next := cur.add(dir)
//判斷向上的點(diǎn)位不能超出迷宮的界限,并且返回在迷宮中的值,因?yàn)槿绻祷氐闹禐?,就證明是墻
val, ok := next.at(maze)
if !ok || val == 1 {
continue
}
//不等0就證明是墻
val, ok = next.at(steps)
if !ok || val != 0 {
continue
}
//如果是起點(diǎn),就跳過
if next == start {
continue
}
//獲取到當(dāng)前步數(shù)的值
curSteps, _ := cur.at(steps)
//將走過的點(diǎn)位追加到切片中
steps[next.i][next.j] = curSteps + 1
//繼續(xù)將下一個(gè)點(diǎn)位添加到需要探索的隊(duì)列當(dāng)中
Q = append(Q, next)
}
}
return steps
}到此這篇關(guān)于Golang超全面講解并發(fā)的文章就介紹到這了,更多相關(guān)Golang并發(fā)內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
GoLang bytes.Buffer基礎(chǔ)使用方法詳解
Go標(biāo)準(zhǔn)庫(kù)中的bytes.Buffer(下文用Buffer表示)類似于一個(gè)FIFO的隊(duì)列,它是一個(gè)流式字節(jié)緩沖區(qū),我們可以持續(xù)向Buffer尾部寫入數(shù)據(jù),從Buffer頭部讀取數(shù)據(jù)。當(dāng)Buffer內(nèi)部空間不足以滿足寫入數(shù)據(jù)的大小時(shí),會(huì)自動(dòng)擴(kuò)容2023-03-03
Golang Model 字段自動(dòng)化校驗(yàn)設(shè)計(jì)方案
在我們?nèi)粘i_發(fā)中,不可避免的總要去進(jìn)行各種參數(shù)校驗(yàn),但是如果在某個(gè)場(chǎng)景中,要校驗(yàn)的字段非常多,并且在其中還有耦合關(guān)系,那么我們手寫校驗(yàn)邏輯就變得非常的低效且難以維護(hù),本篇文檔就基于 DDD 領(lǐng)域模型設(shè)計(jì)的思想下,提供自動(dòng)化的校驗(yàn)?zāi)P妥侄?感興趣的朋友一起看看吧2025-02-02
Go?json自定義Unmarshal避免判斷nil示例詳解
這篇文章主要為大家介紹了Go?json自定義Unmarshal避免判斷nil示例詳解,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-06-06
go高并發(fā)時(shí)append方法偶現(xiàn)錯(cuò)誤解決分析
這篇文章主要為大家介紹了go高并發(fā)時(shí)append方法偶現(xiàn)錯(cuò)誤解決分析,有需要的朋友可以借鑒參考下,希望能夠有所幫助,祝大家多多進(jìn)步,早日升職加薪2022-10-10

