go語言同步機制有哪些

2022-12-23 14:00:33

go同步機制有:1、channel,著重並行問題中的資料流動,把流動的資料放到channel中,就能使用channel解決這個並行;2、Sync.Mutex,擁有Lock、Unlock兩個方法,主要實現思想體現在Lock函數中;3、Sync.waitGroup;4、Sync.Once;5、Sync.context;6、Sync.pool;7、atomic包,針對變數進行操作。

本教學操作環境:windows7系統、GO 1.18版本、Dell G3電腦。

Golang的提供的同步機制有sync模組下的Mutex、WaitGroup以及語言自身提供的chan等。

1.channel

概述

Golang以如此明顯的方式告訴我們:

優點:channel的核心是資料流動,關注到並行問題中的資料流動,把流動的資料放到channel中,就能使用channel解決這個並行【相關推薦:Go視訊教學、】

問題,而且使用channel是執行緒安全的並且不會有資料衝突,比鎖好用多了

缺點:不太適應同步太複雜的場景,比如多協程的同步等待問題,而且存在死鎖問題 ,channel死鎖問題:

分類

channel型別:無緩衝和緩衝型別
channel有兩種形式的,一種是無緩衝的,一個執行緒向這個channel傳送了訊息後,會阻塞當前的這個執行緒,知道其他執行緒去接收這個channel的訊息。無緩衝的形式如下:

intChan := make(chan int)
 
帶緩衝的channel,是可以指定緩衝的訊息數量,當訊息數量小於指定值時,不會出現阻塞,超過之後才會阻塞,需要等待其他執行緒去接收channel處理,帶緩衝的形式如下:
 
//3為緩衝數量
intChan := make(chan int, 3)
登入後複製

舉例

 type Person struct {
	Name    string
	Age     uint8
	Address Addr
}
 
type Addr struct {
	city     string
	district string
}
 
/*
測試channel傳輸複雜的Struct資料
 */
func testTranslateStruct() {
	personChan := make(chan Person, 1)
 
	person := Person{"xiaoming", 10, Addr{"shenzhen", "longgang"}}
	personChan <- person
 
	person.Address = Addr{"guangzhou", "huadu"}
	fmt.Printf("src person : %+v \n", person)
 
	newPerson := <-personChan
	fmt.Printf("new person : %+v \n", newPerson)
}
登入後複製

在實際應用過程中,等待channel 結束訊號的過程可能不是無期限的,一般會伴隨一個timer,超時時間如下面所示:

/*
檢查channel讀寫超時,並做超時的處理
 */
func testTimeout() {
	g := make(chan int)
	quit := make(chan bool)
 
	go func() {
		for {
			select {
			case v := <-g:
				fmt.Println(v)
			case <-time.After(time.Second * time.Duration(3)):
				quit <- true
				fmt.Println("超時,通知主執行緒退出")
				return
			}
		}
	}()
 
	for i := 0; i < 3; i++ {
		g <- i
	}
 
	<-quit
	fmt.Println("收到退出通知,主執行緒退出")
}
登入後複製

2.Sync.Mutex

Mutex擁有Lock、Unlock兩個方法,主要的實現思想都體現在Lock函數中。

Lock執行時,分三種情況:

  • 無衝突 通過CAS操作把當前狀態設定為加鎖狀態;

  • 有衝突 開始自旋,並等待鎖釋放,如果其他Goroutine在這段時間內釋放了該鎖, 直接獲得該鎖;如果沒有釋放,進入3;

  • 有衝突,且已經過了自旋階段 通過呼叫semacquire函數來讓當前Goroutine進入等待狀態。

無衝突時是最簡單的情況;有衝突時,首先進行自旋,是從效率方面考慮的, 因為大多數的Mutex保護的程式碼段都很短,經過短暫的自旋就可以獲得;如果自旋等待無果,就只好通過號誌來讓當前 Goroutine進入等待了。

3. Sync.waitGroup

Channel在某些同步場景下,使用略顯複雜,不管是使用多個channel還是使用channel陣列,如下:

func coordinateWithChan() {
 sign := make(chan struct{}, 2)
 num := int32(0)
 fmt.Printf("The number: %d [with chan struct{}]\n", num)
 max := int32(10)
 go addNum(&num, 1, max, func() {
  sign <- struct{}{}
 })
 go addNum(&num, 2, max, func() {
  sign <- struct{}{}
 })
 <-sign
 <-sign
}
登入後複製

所以Sync.waitGroup 就顯得更為優雅,Sync.waitGroup 用來等待一組goroutines的結束,在主Goroutine裡宣告,並且設定要等待的goroutine的個數,每個goroutine執行完成之後呼叫 Done,最後在主Goroutines 裡Wait即可。類似於JAVA中的CountDownLatch或者回圈屏障,並且Sync.waitGroup可以被重複使用,提供瞭如下API:

func (wg *WaitGroup) Add(delta int)
func (wg *WaitGroup) Done()
func (wg *WaitGroup) Wait()
登入後複製

但是Sync.waitGroup的使用需要遵循一些規則,避免丟擲Panic:
a. 錯誤呼叫Done方法, 導致waitGroup內部計數值出現負數的情況

b. 錯誤的呼叫Add方法,在waitGroup內部計數值到達0的時候,Add方法被呼叫,導致應該被喚起的goroutine沒有被喚起,就開始了新的一輪計數週期

所以在呼叫的時候,就要遵循一下原則:

先統一Add,再並行Done,最後Wait

4. Sync.Once

Sync.once實現方式是內部包含一個int32位元的標誌,用來判斷方式是否被執行過,標誌值更改的時機為方法執行完之後,當有多個goroutine進行呼叫的時候,使用double-check方式進行驗證,首先在在沒有同步方式的情況下,進行標誌值的判定,為0則競爭獲取mutex鎖,進入臨界區內,此時會在此進行標誌值的判斷,確保方法真的被執行一次。double-check第一次是為了更快的進行判斷,但是存在錯誤的情況,第二次check是為了正確的確定標誌值此時的狀態。

使用:

func main() {
    var once sync.Once
    onceBody := func() {
        time.Sleep(3e9)
        fmt.Println("Only once")
    }
    done := make(chan bool)
    for i := 0; i < 10; i++ {
        j := i
        go func(int) {
            once.Do(onceBody)
            fmt.Println(j)
            done <- true
        }(j)
    }
    //給一部分時間保證能夠輸出完整【方法一】
    //for i := 0; i < 10; i++ {
    //    <-done
    //}

    //給一部分時間保證能夠輸出完整【方法二】
    <-done
    time.Sleep(3e9)
}
登入後複製

5. Sync.context

場景

當需要進行多批次的計算任務同步,或者需要一對多的共同作業流程的時候

使用舉例

func coordinateWithContext() {
 total := 12
 var num int32
 fmt.Printf("The number: %d [with context.Context]\n", num)
 cxt, cancelFunc := context.WithCancel(context.Background())
 for i := 1; i <= total; i++ {
  go addNum(&num, i, func() {
   if atomic.LoadInt32(&num) == int32(total) {
    cancelFunc()
   }
  })
 }
 <-cxt.Done()
 fmt.Println("End.")
}
登入後複製

注意事項

a.如何生成自己的context

通過WithCancel、WithDeadline、WithTimeout和WithValue四個方法從context.Background中派生出自己的子context

注意context.background這個上下文根節點僅僅是一個最基本的支點,它不提供任何額外的功能,也就是說,它既不可以被複原(cancel),也不能攜帶任何資料,在使用是必須通過以上4種方法派生出自己的context

b.子context是會繼承父context的值

c.復原訊息的傳播

復原訊息會按照深度遍歷的方式傳播給子context(注意因為多routine呼叫的原因,最終的復原順序可能不會是深度遍歷的順序)

,在遍歷的過程中,通過WithCancel、WithDeadline、WithTimeout派生的context會被複原,但是通過WithValue方法派生的context不會被複原

6. Sync.pool

7.atomic包,針對變數進行操作

我們呼叫sync/atomic中的幾個函數可以對幾種簡單的型別進行原子操作。這些型別包括int32,int64,uint32,uint64,uintptr,unsafe.Pointer,共6個。這些函數的原子操作共有5種:增或減,比較並交換、載入、儲存和交換它們提供了不同的功能,切使用的場景也有區別。

增或減

   顧名思義,原子增或減即可實現對被操作值的增大或減少。因此該操作只能運算元值型別。

   被用於進行增或減的原子操作都是以「Add」為字首,並後面跟針對具體型別的名稱。

//方法原始碼
func AddUint32(addr *uint32, delta uint32) (new uint32)
登入後複製

栗子:(在原來的基礎上加n)

atomic.AddUint32(&addr,n)
登入後複製

栗子:(在原來的基礎上加n(n為負數))

atomic.AddUint32(*addr,uint32(int32(n)))
//或
atomic.AddUint32(&addr,^uint32(-n-1))
登入後複製

比較並交換

   比較並交換----Compare And Swap 簡稱CAS

   他是假設被操作的值未曾被改變(即與舊值相等),並一旦確定這個假設的真實性就立即進行值替換

   如果想安全的並行一些型別的值,我們總是應該優先使用CAS

//方法原始碼
func CompareAndSwapInt32(addr *int32, old, new int32) (swapped bool)
登入後複製

栗子:(如果addr和old相同,就用new代替addr)

ok:=atomic.CompareAndSwapInt32(&addr,old,new)
登入後複製

載入

   如果一個寫操作未完成,有一個讀操作就已經發生了,這樣讀操作使很糟糕的。

   為了原子的讀取某個值sync/atomic程式碼包同樣為我們提供了一系列的函數。這些函數都以"Load"為字首,意為載入。

//方法原始碼
func LoadInt32(addr *int32) (val int32)
登入後複製

栗子

fun addValue(delta int32){
    for{
        v:=atomic.LoadInt32(&addr)
        if atomic.CompareAndSwapInt32(&v,addr,(delta+v)){
            break;
        }
    }
}
登入後複製

儲存

   與讀操作對應的是寫入操作,sync/atomic也提供了與原子的值載入函數相對應的原子的值儲存函數。這些函數的名稱均以「Store」為字首

   在原子的儲存某個值的過程中,任何cpu都不會進行鍼對進行同一個值的讀或寫操作。如果我們把所有針對此值的寫操作都改為原子操作,那麼就不會出現針對此值的讀操作讀操作因被並行的進行而讀到修改了一半的情況。

   原子操作總會成功,因為他不必關心被操作值的舊值是什麼。

//方法原始碼
func StoreInt32(addr *int32, val int32)
登入後複製

栗子

atomic.StoreInt32(被操作值的指標,新值)
atomic.StoreInt32(&value,newaddr)
登入後複製

交換

   原子交換操作,這類函數的名稱都以「Swap」為字首。

   與CAS不同,交換操作直接賦予新值,不管舊值。

   會返回舊值

//方法原始碼
func SwapInt32(addr *int32, new int32) (old int32)
登入後複製

栗子

atomic.SwapInt32(被操作值的指標,新值)(返回舊值)
oldval:=atomic.StoreInt32(&value,newaddr)
登入後複製

擴充套件知識:Sync包簡述

1. 什麼是Sync包?

Package sync provides basic synchronization primitives such as mutual exclusion locks. Other than the Once and WaitGroup types, most are intended for use by low-level library routines. Higher-level synchronization is better done via channels and communication.

Values containing the types defined in this package should not be copied.

這句話大意是說:
Sync包同步提供基本的同步原語,如互斥鎖。 除了Once和WaitGroup型別之外,大多數型別都是供低階庫例程使用的。 通過Channel和溝通可以更好地完成更高階別的同步。並且此包中的值在使用過後不要拷貝。

從描述中可以看到的是,golang 並不推薦這個包中的大多數並行控制方法,但還是提供了相關方法,主要原因是golang中提倡以共用記憶體的方式來通訊:

不要以共用記憶體的方式來通訊,作為替代,我們應該以通訊的手段來共用記憶體

共用記憶體的方式使得多執行緒中的通訊變得簡單,但是在並行的安全性控制上將變得異常繁瑣。
正確性不是我們唯一想要的,我們想要的還有系統的可伸縮性,以及可理解性,我覺得這點非常重要,比如現在廣泛使用的Raft演演算法。

2. 包中的Type

包中主要有: Locker, Cond, Map, Mutex, Once, Pool,
RWMutex, WaitGroup

type Locker interface {
        Lock()
        Unlock()
}
type Cond struct {
        // L is held while observing or changing the condition
        L Locker
}
登入後複製

3. 什麼是鎖,為什麼需要鎖?

鎖是sync包中的核心,他主要有兩個方法,加鎖和解鎖。
在單執行緒執行的時候程式是順序執行的,程式對資料的存取也是:
讀取 => 一頓操作(加減乘除之類的) => 寫回原地址
但是一旦程式中進行了並行程式設計,也就是說,某一個函數可能同時被不同的執行緒執行的時候,以時間為維度會發生以下情況:

2.png

可以看到的是,A地址的數位被執行了兩次自增,若A=5,我們在執行完成後預期的A值是7,但是在這種情況下我們得到的A卻是6,bug了~
還有很多類似的並行錯誤,所以才有鎖的引入。若是我們線上程2讀取A的值的時候對A進行加鎖,讓執行緒2等待,執行緒1執行完成之後在執行執行緒2,這樣就能夠保證資料的正確性。但是正確性不是我們唯一想要的。

4 寫更優雅的程式碼

在很多語言中我們經常為了保證資料安全正確,會在並行的時候對資料加鎖

Lock()
doSomething()
Unlock()
登入後複製

Golang在此包中也提供了相關的鎖,但是標明瞭"most are intended for use by low-level library routines" 所以我這裡只對 Once and WaitGroup types做簡述。

5.Once 物件

Once 是一個可以被多次呼叫但是隻執行一次,若每次呼叫Do時傳入引數f不同,但是隻有第一個才會被執行。

func (o *Once) Do(f func())
登入後複製
    var once sync.Once
    onceBody := func() {
        fmt.Println("Only once")
    }
    done := make(chan bool)
    for i := 0; i < 10; i++ {
        go func() {
            once.Do(onceBody)
            done <- true
        }()
    }
    for i := 0; i < 10; i++ {
        <-done
    }
登入後複製

如果你執行這段程式碼會發現,雖然呼叫了10次,但是隻執行了1次。BTW:這個東西可以用來寫單例。

6. WaitGroup

下面是個官方的例子:

var wg sync.WaitGroup
var urls = []string{
        "http://www.golang.org/",
        "http://www.google.com/",
        "http://www.somestupidname.com/",
}
for _, url := range urls {
        // Increment the WaitGroup counter.
        wg.Add(1)
        // Launch a goroutine to fetch the URL.
        go func(url string) {
                // Decrement the counter when the goroutine completes.
                defer wg.Done()
                // Fetch the URL.
                http.Get(url)
        }(url)
}
// Wait for all HTTP fetches to complete.
wg.Wait()
登入後複製

7. 簡述

Golang中高階的並行可以通過channel來實現,這是golang所倡導的,但是go也提供了鎖等先關操作。

更多程式設計相關知識,請存取:!!

以上就是go語言同步機制有哪些的詳細內容,更多請關注TW511.COM其它相關文章!