作者:dcguo,騰訊 CSIG 電子簽開放平臺中心
go 原生/擴展庫提倡的原則 不要通過共享內存進行通信;相反,通過通信來共享內存。 Goroutine goroutine 并發(fā)模型 調度器主要結構 主要調度器結構是 M,P,G
P 的數(shù)量由環(huán)境變量中的 GOMAXPROCS 決定,通常來說和核心數(shù)對應。 映射關系用戶空間線程和內核空間線程映射關系有如下三種:
調度圖關系如圖,灰色的 G 則是暫時還未運行的,處于就緒態(tài),等待被調度,這個隊列被 P 維護 注: 簡單調度圖如上,有關于 P 再多個 M 中切換,公共 goroutine 隊列,M 從線程緩存中創(chuàng)建等步驟沒有體現(xiàn),復雜過程可以參考文章簡單了解 goroutine 如何實現(xiàn)。 goroutine 使用
go list.Sort()
channelchannel 特性 創(chuàng)建 // 創(chuàng)建 channela := make(chan int)b := make(chan int, 10)// 單向 channelc := make(chan<- int)d := make(<-chan int) 存入/讀取/關閉tip:
channel 使用/基礎
ci := make(chan int)cj := make(chan int, 0)cs := make(chan *os.File, 100)
func Server(queue chan *Request) {for req := range queue {sem <- 1go func() {process(req)<- sem}()}}
func Serve(queue chan *Request) {for req := range queue {req := reqsem <- 1go func() {process(req)<-sem}()}} channel 使用/技巧等待一個事件,也可以通過 close 一個 channel 就足夠了。
阻塞程序開源項目【是一個支持集群的 im 及實時推送服務】里面的基準測試的案例 取最快結果func main() { ret := make(chan string, 3) for i := 0; i < cap(ret); i++ { go call(ret) } fmt.Println(<-ret)}func call(ret chan<- string) { // do something // ... ret <- 'result'} 協(xié)同多個 goroutines注: 協(xié)同多個 goroutines 方案很多,這里只展示 channel 的一種。
搭配 select 操作for { select { case a := <- testChanA: // todo a case b, ok := testChanB: // todo b, 通過 ok 判斷 tesChanB 的關閉情況 default: // 默認分支 }} main go routinue 確認 worker goroutinue 真正退出的方式
關閉的 channel 不會被阻塞testChan := make(chan bool)close(testChan)zeroValue := <- testChanfmt.Println(zeroValue) // falsetestChan <- true // panic: send on closed channel 注: 如果是 buffered channel, 即使被 close, 也可以讀到之前存入的值,讀取完畢后開始讀零值,寫入則會觸發(fā) panic nil channel 讀取和存入都不會阻塞,close 會 panic略 range 遍歷 channel
例: 唯一 idfunc newUniqueIdService() <-chan string { id := make(chan string) go func() { var counter int64 = 0 for { id <- fmt.Sprintf('%x', counter) counter += 1 } }() return id}func newUniqueIdServerMain() { id := newUniqueIdService() for i := 0; i < 10; i++ { fmt.Println(<- id) }} 帶緩沖隊列構造略 超時 timeout 和心跳 heart beat
demo 開源 im/goim 項目中的應用 2.心跳 done := make(chan bool)defer func() {close(done)}()ticker := time.NewTicker(10 * time.Second)go func() {for {select {case <-done:ticker.Stop()returncase <-ticker.C:message.Touch()}}}()} 多個 goroutine 同步響應
利用 channel 阻塞的特性和帶緩沖的 channel 來實現(xiàn)控制并發(fā)數(shù)量func channel() { count := 10 // 最大并發(fā) sum := 100 // 總數(shù) c := make(chan struct{}, count) sc := make(chan struct{}, sum) defer close(c) defer close(sc) for i:=0; i<sum; i++ { c <- struct{} go func(j int) { fmt.Println(j) <- c // 執(zhí)行完畢,釋放資源 sc <- struct {}{} // 記錄到執(zhí)行總數(shù) } } for i:=sum; i>0; i++ { <- sc }} go 并發(fā)編程(基礎庫)
Mutex/RWMutex
Mutex demo
結果: 可以看到整個執(zhí)行持續(xù)了 3 s 多,內部多個協(xié)程已經被 “鎖” 住了。 RWMutex demo 注意: 這東西可以并發(fā)讀,不可以并發(fā)讀寫/并發(fā)寫寫,不過現(xiàn)在即便場景是讀多寫少也很少用到這,一般集群環(huán)境都得分布式鎖了。 package mainimport ( 'fmt' 'sync' 'time')var m *sync.RWMutexfunc init() { m = new(sync.RWMutex)}func main() { go read() go read() go write() time.Sleep(time.Second * 3)}func read() { m.RLock() fmt.Println('startR') time.Sleep(time.Second) fmt.Println('endR') m.RUnlock()}func write() { m.Lock() fmt.Println('startW') time.Sleep(time.Second) fmt.Println('endW') m.Unlock()} 輸出: Atomic
demo:增
結果: WaitGroup/ErrGroup
注意
demo: errGroup package mainimport ( 'golang.org/x/sync/errgroup' 'log' 'net/http')func main() { var g errgroup.Group var urls = []string{ 'https://github.com/', 'errUrl', } for _, url := range urls { url := url g.Go(func() error { resp, err := http.Get(url) if err == nil { _ = resp.Body.Close() } return err }) } err := g.Wait() if err != nil { log.Fatal('getErr', err) return }} 結果: once
demo:
結果: Context
對他的說明文章太多了,詳細可以跳轉看這篇 一文理解 golang context 這邊列一個遇到得問題:
并行
我們可以再每個 CPU 上進行循環(huán)無關的迭代計算,我們僅需要創(chuàng)建完所有的 goroutine 后,從 channel 中讀取結束信號進行計數(shù)即可。 并發(fā)編程/工作流方案擴展
singlelFlight(go 官方擴展同步包)
demo package mainimport ( 'golang.org/x/sync/singleflight' 'log' 'math/rand' 'sync' 'time')var ( g singleflight.Group)const ( funcKey = 'key' times = 5 randomNum = 100)func init() { rand.Seed(time.Now().UnixNano())}func main() { var wg sync.WaitGroup wg.Add(times) for i := 0; i < times; i++ { go func() { defer wg.Done() num, err := run(funcKey) if err != nil { log.Fatal(err) return } log.Println(num) }() } wg.Wait()}func run(key string) (num int, err error) { v, err, isShare := g.Do(key, func() (interface{}, error) { time.Sleep(time.Second * 5) num = rand.Intn(randomNum) //[0,100) return num, nil }) if err != nil { log.Fatal(err) return 0, err } data := v.(int) log.Println(isShare) return data, nil} 連續(xù)執(zhí)行 3 次,返回結果如下,全部取了共享得結果: 但是注釋掉 time.Sleep(time.Second * 5) 再嘗試一次看看。 ![]() 這次全部取得真實值 實踐: 伙伴部門高峰期可以減少 20% 的 Redis 調用, 大大減少了 Redis 的負載 實踐開發(fā)案例
批量校驗
批量校驗接口限頻單賬戶最高 100qps/s,整個系統(tǒng)多個校驗場景公用一個賬戶限頻需要限制批量校驗最高為 50~80 qps/s(需要預留令牌供其他場景使用,否則頻繁調用批量接口時候其他場景均會失敗限頻)。
1.使用 go routine 來并發(fā)進行三要素校驗,因為 go routinue,所以每次開啟 50 ~ 80 go routine 同時進行單次三要素校驗; 2.每輪校驗耗時 1s,如果所有 routinue 校驗后與校驗開始時間間隔不滿一秒,則需要主動程序睡眠至 1s,然后開始下輪校驗; 3.因為只是校驗場景,如果某次校驗失敗,最容易的原因其實是校驗方異常,或者被其他校驗場景再當前 1s 內消耗過多令牌;那么整個批量接口返回 err,運營同學重新發(fā)起就好。
代碼需要進行的優(yōu)化點:
1.sleep 1s 這個操作可以從調用前開始計時,調用完成后不滿 1s 補充至 1s,而不是每次最長調用時間 elapsedTime + 1s; 2.通道中獲取的三要素校驗結果順序和入參數(shù)據數(shù)組順序不對應,這里通過兩種方案: 3.分組調用 ![]()
歷史數(shù)據批量標簽
func (s ServiceOnceJob) CompensatingHistoricalLabel(ctx context.Context,request *api.CompensatingHistoricalLabelRequest) (response *api.CompensatingHistoricalLabelResponse, err error) {if request.Key != interfaceKey {return nil, transform.Simple('err')}ctx, cancelFunc := context.WithCancel(ctx)var (wg = new(sync.WaitGroup)userRegisterDb = new(datareportdb.DataReportUserRegisteredRecords)startNum = int64(0))wg.Add(1)countHistory, err := userRegisterDb.GetUserRegisteredCountForHistory(ctx, historyStartTime, historyEndTime)if err != nil {return nil, err}div := decimal.NewFromFloat(float64(countHistory)).Div(decimal.NewFromFloat(float64(theNumberOfConcurrent)))f, _ := div.Float64()num := int64(math.Ceil(f))for i := 0; i < theNumberOfConcurrent; i++ {go func(startNum int64) {defer wg.Done()for {select {case <- ctx.Done():returndefault:userDataArr, err := userRegisterDb.GetUserRegisteredDataHistory(ctx, startNum, num)if err != nil {cancelFunc()}for _, userData := range userDataArr {if err := analyseUserAction(userData); err != nil {cancelFunc()}}}}}(startNum)startNum = startNum + num}wg.Wait()return response, nil} 批量發(fā)起/批量簽署實現(xiàn)思路和上面其實差不多,都是需要支持批量的特性,基本上現(xiàn)在業(yè)務中統(tǒng)一使用多協(xié)程處理。 思考golang 協(xié)程很牛 x,協(xié)程的數(shù)目最大到底多大合適,有什么衡量指標么?
基本上可以這樣理解這件事
使用鎖時候正確釋放鎖的方式
goroutine 泄露預防與排查
goroutine 的退出其實只有以下幾種方式可以做到
大多數(shù)引起 goroutine 泄露的原因基本上都是如下情況
杜絕:
排查:
案例:
輸出: ![]() pprof:
![]() 復雜情況也可以用其他的可視化工具:
![]() 父協(xié)程捕獲子協(xié)程 panic
父協(xié)程捕獲子協(xié)程 panic 有鎖的地方就去用 channel 優(yōu)化
分享一個很不錯的優(yōu)化 demo: 場景:
分析:
問題:
解決
增加鎖機制,解決針對鏈接池的并發(fā)問題發(fā)送消息也需要去加鎖因為要防止出現(xiàn) panic: concurrent write to websocket connection
假設網絡延時,用戶新增時候還有消息再發(fā)送中,新加入的用戶就無法獲得鎖了,后面其他的相關操作都會被阻塞導致問題。 使用 channel 優(yōu)化:
2.使用通道
3.通道消息方法,代碼來自于開源項目 簡單聊天架構演變: // 處理所有管道任務func (room *Room) ProcessTask() {log := zap.S()log.Info('啟動處理任務')for {select {case c := <-room.register:log.Info('當前有客戶端進行注冊')room.clientsPool[c] = truecase c := <-room.unregister:log.Info('當前有客戶端離開')if room.clientsPool[c] {close(c.send)delete(room.clientsPool, c)}case m := <-room.send:for c := range room.clientsPool {select {case c.send <- m:default:break}}}}} 結果: 成功使用 channel 替換了鎖。 參考
|
|
來自: 菌心說 > 《編程+、計算機、信息技術》