日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

手把手教姐姐寫消息隊(duì)列

 F2967527 2021-04-24

前言

這周姐姐入職了新公司,老板想探探他的底,看了一眼他的簡(jiǎn)歷,呦呵,精通kafka,這小姑娘有兩下子,既然這樣,那你寫一個(gè)消息隊(duì)列吧。因?yàn)橐胓o語言寫,這可給姐姐愁壞了。趕緊來求助我,我這么堅(jiān)貞不屈一人,在姐姐的軟磨硬泡下還是答應(yīng)他了,所以接下來我就手把手教姐姐怎么寫一個(gè)消息隊(duì)列。下面我們就來看一看我是怎么寫的吧~~~。

本代碼已上傳到我的github:

有需要的小伙伴,可自行下載,順便給個(gè)小星星吧~~~

什么是消息隊(duì)列

姐姐真是把我愁壞了,自己寫的精通kafka,竟然不知道什么是消息隊(duì)列,于是,一向好脾氣的我開始給姐姐講一講什么是消息隊(duì)列。

消息隊(duì)列,我們一般稱它為MQ(Message Queue),兩個(gè)單詞的結(jié)合,這兩個(gè)英文單詞想必大家都應(yīng)該知道吧,其實(shí)最熟悉的還是Queue吧,即隊(duì)列。隊(duì)列是一種先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu),隊(duì)列的使用還是比較普遍的,但是已經(jīng)有隊(duì)列了,怎么還需要MQ呢?

我:?jiǎn)柲隳?,姐姐,知道嗎?為什么還需要MQ

姐姐:快點(diǎn)講,想挨打呀?

我:噗。。。算我多嘴,哼~~~

欠欠的我開始了接下來的耐心講解......

舉一個(gè)簡(jiǎn)單的例子,假設(shè)現(xiàn)在我們要做一個(gè)系統(tǒng),該登陸系統(tǒng)需要在用戶登陸成功后,發(fā)送封郵件到用戶郵箱進(jìn)行提醒,需求還是很簡(jiǎn)單的,我們先看一看沒有MQ,我們?cè)撛趺磳?shí)現(xiàn)呢?畫一個(gè)時(shí)序圖來看一看:

圖片

看這個(gè)圖,郵件發(fā)送在請(qǐng)求登陸時(shí)進(jìn)行,當(dāng)密碼驗(yàn)證成功后,就發(fā)送郵件,然后返回登陸成功。這樣是可以的,但是他是有缺陷的。這讓我們的登陸操作變得復(fù)雜了,每次請(qǐng)求登陸都需要進(jìn)行郵件發(fā)送,如果這里出現(xiàn)錯(cuò)誤,整個(gè)登陸請(qǐng)求也出現(xiàn)了錯(cuò)誤,導(dǎo)致登陸不成功;還有一個(gè)問題,本來我們登陸請(qǐng)求調(diào)用接口僅僅需要100ms,因?yàn)橹虚g要做一次發(fā)送郵件的等待,那么調(diào)用一次登陸接口的時(shí)間就要增長(zhǎng),這就是問題所在,一封郵件他的優(yōu)先級(jí) 不是很高的,用戶也不需要實(shí)時(shí)收到這封郵件,所以這時(shí),就體現(xiàn)了消息隊(duì)列的重要性了,我們用消息隊(duì)列進(jìn)行改進(jìn)一下。

圖片

這里我們將發(fā)送郵件請(qǐng)求放到Mq中,這樣我們就能提高用戶體驗(yàn)的吞吐量,這個(gè)很重要,顧客就是上帝嘛,畢竟也沒有人喜歡用一個(gè)很慢很慢的app。

這里只是舉了MQ眾多應(yīng)用中的其中一個(gè),即異步應(yīng)用,MQ還在系統(tǒng)解藕、削峰/限流中有著重要應(yīng)用,這兩個(gè)我就不具體講解了,原理都一樣,好好思考一下,你們都能懂得。

channel

好啦,姐姐終于知道什么是消息隊(duì)列了,但是現(xiàn)在還是沒法進(jìn)行消息隊(duì)列開發(fā)的,因?yàn)檫€差一個(gè)知識(shí)點(diǎn),即go語言中的channel。這個(gè)很重要,我們還需要靠這個(gè)來開發(fā)我們的消息隊(duì)列呢。

因篇幅有限,這里不詳細(xì)介紹channel,只介紹基本使用方法。

什么是channel

Goroutine 和 Channel 是 Go 語言并發(fā)編程的兩大基石。Goroutine 用于執(zhí)行并發(fā)任務(wù),Channel 用于 goroutine 之間的同步、通信。Go提倡使用通信的方法代替共享內(nèi)存,當(dāng)一個(gè)Goroutine需要和其他Goroutine資源共享時(shí),Channel就會(huì)在他們之間架起一座橋梁,并提供確保安全同步的機(jī)制。channel本質(zhì)上其實(shí)還是一個(gè)隊(duì)列,遵循FIFO原則。具體規(guī)則如下:

  • 先從 Channel 讀取數(shù)據(jù)的 Goroutine 會(huì)先接收到數(shù)據(jù);
  • 先向 Channel 發(fā)送數(shù)據(jù)的 Goroutine 會(huì)得到先發(fā)送數(shù)據(jù)的權(quán)利;

創(chuàng)建通道

創(chuàng)建通道需要用到關(guān)鍵字 make ,格式如下:

通道實(shí)例 := make(chan 數(shù)據(jù)類型)
  • 數(shù)據(jù)類型:通道內(nèi)傳輸?shù)脑仡愋汀?/section>
  • 通道實(shí)例:通過make創(chuàng)建的通道句柄。

無緩沖通道的使用

Go語言中無緩沖的通道(unbuffered channel)是指在接收前沒有能力保存任何值的通道。這種類型的通道要求發(fā)送 goroutine 和接收 goroutine 同時(shí)準(zhǔn)備好,才能完成發(fā)送和接收操作。

無緩沖通道的定義方式如下:

通道實(shí)例 := make(chan 通道類型)
  • 通道類型:和無緩沖通道用法一致,影響通道發(fā)送和接收的數(shù)據(jù)類型。
  • 緩沖大?。?
  • 通道實(shí)例:被創(chuàng)建出的通道實(shí)例。

寫個(gè)例子來幫助大家理解一下吧:

package main

import (
    'sync'
    'time'
)

func main() {
    c := make(chan string)

    var wg sync.WaitGroup
    wg.Add(2)

    go func() {
        defer wg.Done()
        c <- `Golang夢(mèng)工廠`
    }()

    go func() {
        defer wg.Done()

        time.Sleep(time.Second * 1)
        println(`Message: `+ <-c)
    }()

    wg.Wait()
}

帶緩沖的通道的使用

Go語言中有緩沖的通道(buffered channel)是一種在被接收前能存儲(chǔ)一個(gè)或者多個(gè)值的通道。這種類型的通道并不強(qiáng)制要求 goroutine 之間必須同時(shí)完成發(fā)送和接收。通道會(huì)阻塞發(fā)送和接收動(dòng)作的條件也會(huì)不同。只有在通道中沒有要接收的值時(shí),接收動(dòng)作才會(huì)阻塞。只有在通道沒有可用緩沖區(qū)容納被發(fā)送的值時(shí),發(fā)送動(dòng)作才會(huì)阻塞。

有緩沖通道的定義方式如下:

通道實(shí)例 := make(chan 通道類型, 緩沖大小)
  • 通道類型:和無緩沖通道用法一致,影響通道發(fā)送和接收的數(shù)據(jù)類型。
  • 緩沖大小:決定通道最多可以保存的元素?cái)?shù)量。
  • 通道實(shí)例:被創(chuàng)建出的通道實(shí)例。

來寫一個(gè)例子講解一下:

package main

import (
    'sync'
    'time'
)

func main() {
    c := make(chan string2)

    var wg sync.WaitGroup
    wg.Add(2)

    go func() {
        defer wg.Done()

        c <- `Golang夢(mèng)工廠`
        c <- `asong`
    }()

    go func() {
        defer wg.Done()

        time.Sleep(time.Second * 1)
        println(`公眾號(hào): `+ <-c)
        println(`作者: `+ <-c)
    }()

    wg.Wait()
}

好啦,通道的概念就介紹到這里了,如果需要,下一篇我出一個(gè)channel詳細(xì)講解的文章。

消息隊(duì)列編碼實(shí)現(xiàn)

準(zhǔn)備篇

終于開始進(jìn)入主題了,姐姐都聽的快要睡著了,我轟隆一嗓子,立馬精神,但是呢,asong也是挨了一頓小電炮,代價(jià)慘痛呀,嗚嗚嗚............

在開始編寫代碼編寫直接,我需要構(gòu)思我們的整個(gè)代碼架構(gòu),這才是正確的編碼方式。我們先來定義一個(gè)接口,把我們需要實(shí)現(xiàn)的方法先列出來,后期對(duì)每一個(gè)代碼進(jìn)行實(shí)現(xiàn)就可以了。因此可以列出如下方法:

type Broker interface {
 publish(topic string, msg interface{}) error
 subscribe(topic string) (<-chan interface{}, error)
 unsubscribe(topic string, sub <-chan interface{}) error
 close()
 broadcast(msg interface{}, subscribers []chan interface{})
 setConditions(capacity int)
}
  • publish:進(jìn)行消息的推送,有兩個(gè)參數(shù)即topic、msg,分別是訂閱的主題、要傳遞的消息
  • subscribe:消息的訂閱,傳入訂閱的主題,即可完成訂閱,并返回對(duì)應(yīng)的channel通道用來接收數(shù)據(jù)
  • unsubscribe:取消訂閱,傳入訂閱的主題和對(duì)應(yīng)的通道
  • Golang夢(mèng)工廠 發(fā)起了一個(gè)讀者討論 小伙伴們,你們有什么想學(xué)的嘛,可以留言的呦
  • close:這個(gè)的作用就是很明顯了,就是用來關(guān)閉消息隊(duì)列的
  • broadCast:這個(gè)屬于內(nèi)部方法,作用是進(jìn)行廣播,對(duì)推送的消息進(jìn)行廣播,保證每一個(gè)訂閱者都可以收到
  • setConditions:這里是用來設(shè)置條件,條件就是消息隊(duì)列的容量,這樣我們就可以控制消息隊(duì)列的大小了

細(xì)心的你們有沒有發(fā)現(xiàn)什么問題,這些代碼我都定義的是內(nèi)部方法,也就是包外不可用。為什么這么做呢,因?yàn)檫@里屬于代理要做的事情,我們還需要在封裝一層,也就是客戶端能直接調(diào)用的方法,這樣才符合軟件架構(gòu)。因此可以寫出如下代碼:

package mq


type Client struct {
 bro *BrokerImpl
}

func NewClient() *Client {
 return &Client{
  bro: NewBroker(),
 }
}

func (c *Client)SetConditions(capacity int)  {
 c.bro.setConditions(capacity)
}

func (c *Client)Publish(topic string, msg interface{}) error{
 return c.bro.publish(topic,msg)
}

func (c *Client)Subscribe(topic string) (<-chan interface{}, error){
 return c.bro.subscribe(topic)
}

func (c *Client)Unsubscribe(topic string, sub <-chan interface{}) error {
 return c.bro.unsubscribe(topic,sub)
}

func (c *Client)Close()  {
  c.bro.close()
}

func (c *Client)GetPayLoad(sub <-chan interface{})  interface{}{
 for val:= range sub{
  if val != nil{
   return val
  }
 }
 return nil
}

上面只是準(zhǔn)好了代碼結(jié)構(gòu),但是消息隊(duì)列實(shí)現(xiàn)的結(jié)構(gòu)我們還沒有設(shè)計(jì),現(xiàn)在我們就來設(shè)計(jì)一下。

type BrokerImpl struct {
 exit chan bool
 capacity int

 topics map[string][]chan interface{} // key:topic value :queue
 sync.RWMutex // 同步鎖
}
  • exit:也是一個(gè)通道,這個(gè)用來做關(guān)閉消息隊(duì)列用的
  • capacity:即用來設(shè)置消息隊(duì)列的容量
  • topics:這里使用一個(gè)map結(jié)構(gòu),key即是topic,其值則是一個(gè)切片,chan類型,這里這么做的原因是我們一個(gè)topic可以有多個(gè)訂閱者,所以一個(gè)訂閱者對(duì)應(yīng)著一個(gè)通道
  • sync.RWMutex:讀寫鎖,這里是為了防止并發(fā)情況下,數(shù)據(jù)的推送出現(xiàn)錯(cuò)誤,所以采用加鎖的方式進(jìn)行保證

好啦,現(xiàn)在我們已經(jīng)準(zhǔn)備的很充分啦,開始接下來方法填充之旅吧~~~

Publishbroadcast

這里兩個(gè)合在一起講的原因是braodcast是屬于publish里的。這里的思路很簡(jiǎn)單,我們只需要把傳入的數(shù)據(jù)進(jìn)行廣播即可了,下面我們來看代碼實(shí)現(xiàn):

func (b *BrokerImpl) publish(topic string, pub interface{}) error {
 select {
 case <-b.exit:
  return errors.New('broker closed')
 default:
 }

 b.RLock()
 subscribers, ok := b.topics[topic]
 b.RUnlock()
 if !ok {
  return nil
 }

 b.broadcast(pub, subscribers)
 return nil
}


func (b *BrokerImpl) broadcast(msg interface{}, subscribers []chan interface{}) {
 count := len(subscribers)
 concurrency := 1

 switch {
 case count > 1000:
  concurrency = 3
 case count > 100:
  concurrency = 2
 default:
  concurrency = 1
 }
 pub := func(start int) {
  for j := start; j < count; j += concurrency {
   select {
   case subscribers[j] <- msg:
   case <-time.After(time.Millisecond * 5):
   case <-b.exit:
    return
   }
  }
 }
 for i := 0; i < concurrency; i++ {
  go pub(i)
 }
}

publish方法中沒有什么好講的,這里主要說一下broadcast的實(shí)現(xiàn):

這里主要對(duì)數(shù)據(jù)進(jìn)行廣播,所以數(shù)據(jù)推送出去就可以了,沒必要一直等著他推送成功,所以這里我們我們采用goroutine。在推送的時(shí)候,當(dāng)推送失敗時(shí),我們也不能一直等待呀,所以這里我們加了一個(gè)超時(shí)機(jī)制,超過5毫秒就停止推送,接著進(jìn)行下面的推送。

可能你們會(huì)有疑惑,上面怎么還有一個(gè)switch選項(xiàng)呀,干什么用的呢?考慮這樣一個(gè)問題,當(dāng)有大量的訂閱者時(shí),,比如10000個(gè),我們一個(gè)for循環(huán)去做消息的推送,那推送一次就會(huì)耗費(fèi)很多時(shí)間,并且不同的消費(fèi)者之間也會(huì)產(chǎn)生延時(shí),,所以采用這種方法進(jìn)行分解可以降低一定的時(shí)間。

subscribeunsubScribe

我們先來看代碼:

func (b *BrokerImpl) subscribe(topic string) (<-chan interface{}, error) {
 select {
 case <-b.exit:
  return nil, errors.New('broker closed')
 default:
 }

 ch := make(chan interface{}, b.capacity)
 b.Lock()
 b.topics[topic] = append(b.topics[topic], ch)
 b.Unlock()
 return ch, nil
}
func (b *BrokerImpl) unsubscribe(topic string, sub <-chan interface{}) error {
 select {
 case <-b.exit:
  return errors.New('broker closed')
 default:
 }

 b.RLock()
 subscribers, ok := b.topics[topic]
 b.RUnlock()

 if !ok {
  return nil
 }
 // delete subscriber
 var newSubs []chan interface{}
 for _, subscriber := range subscribers {
  if subscriber == sub {
   continue
  }
  newSubs = append(newSubs, subscriber)
 }

 b.Lock()
 b.topics[topic] = newSubs
 b.Unlock()
 return nil
}

這里其實(shí)就很簡(jiǎn)單了:

  • subscribe:這里的實(shí)現(xiàn)則是為訂閱的主題創(chuàng)建一個(gè)channel,然后將訂閱者加入到對(duì)應(yīng)的topic中就可以了,并且返回一個(gè)接收channel。
  • unsubScribe:這里實(shí)現(xiàn)的思路就是將我們剛才添加的channel刪除就可以了。

close

func (b *BrokerImpl) close()  {
 select {
 case <-b.exit:
  return
 default:
  close(b.exit)
  b.Lock()
  b.topics = make(map[string][]chan interface{})
  b.Unlock()
 }
 return
}

這里就是為了關(guān)閉整個(gè)消息隊(duì)列,這句代碼b.topics = make(map[string][]chan interface{})比較重要,這里主要是為了保證下一次使用該消息隊(duì)列不發(fā)生沖突。

setConditions GetPayLoad

還差最后兩個(gè)方法,一個(gè)是設(shè)置我們的消息隊(duì)列容量,另一個(gè)是封裝一個(gè)方法來獲取我們訂閱的消息:

func (b *BrokerImpl)setConditions(capacity int)  {
 b.capacity = capacity
}
func (c *Client)GetPayLoad(sub <-chan interface{})  interface{}{
 for val:= range sub{
  if val != nil{
   return val
  }
 }
 return nil
}

測(cè)試

好啦,代碼這么快就被寫完了,接下來我們進(jìn)行測(cè)試一下吧。

單元測(cè)試

正式測(cè)試之前,我們還是需要先進(jìn)行一下單元測(cè)試,養(yǎng)成好的習(xí)慣,只有先自測(cè)了,才能有底氣說我的代碼沒問題,要不直接跑程序,會(huì)出現(xiàn)很多bug的。

這里我們測(cè)試方法如下:我們向不同的topic發(fā)送不同的信息,當(dāng)訂閱者收到消息后,就行取消訂閱。

func TestClient(t *testing.T) {
 b := NewClient()
 b.SetConditions(100)
 var wg sync.WaitGroup

 for i := 0; i < 100; i++ {
  topic := fmt.Sprintf('Golang夢(mèng)工廠%d', i)
  payload := fmt.Sprintf('asong%d', i)

  ch, err := b.Subscribe(topic)
  if err != nil {
   t.Fatal(err)
  }

  wg.Add(1)
  go func() {
   e := b.GetPayLoad(ch)
   if e != payload {
    t.Fatalf('%s expected %s but get %s', topic, payload, e)
   }
   if err := b.Unsubscribe(topic, ch); err != nil {
    t.Fatal(err)
   }
   wg.Done()
  }()

  if err := b.Publish(topic, payload); err != nil {
   t.Fatal(err)
  }
 }

 wg.Wait()
}

測(cè)試通過,沒問題,接下來我們?cè)趯憥讉€(gè)方法測(cè)試一下

測(cè)試

這里分為兩種方式測(cè)試

測(cè)試一:使用一個(gè)定時(shí)器,向一個(gè)主題定時(shí)推送消息.

// 一個(gè)topic 測(cè)試
func OnceTopic()  {
 m := mq.NewClient()
 m.SetConditions(10)
 ch,err :=m.Subscribe(topic)
 if err != nil{
  fmt.Println('subscribe failed')
  return
 }
 go OncePub(m)
 OnceSub(ch,m)
 defer m.Close()
}

// 定時(shí)推送
func OncePub(c *mq.Client)  {
 t := time.NewTicker(10 * time.Second)
 defer t.Stop()
 for  {
  select {
  case <- t.C:
   err := c.Publish(topic,'asong真帥')
   if err != nil{
    fmt.Println('pub message failed')
   }
  default:

  }
 }
}

// 接受訂閱消息
func OnceSub(m <-chan interface{},c *mq.Client)  {
 for  {
  val := c.GetPayLoad(m)
  fmt.Printf('get message is %s\n',val)
 }
}

測(cè)試二:使用一個(gè)定時(shí)器,定時(shí)向多個(gè)主題發(fā)送消息:

//多個(gè)topic測(cè)試
func ManyTopic()  {
 m := mq.NewClient()
 defer m.Close()
 m.SetConditions(10)
 top := ''
 for i:=0;i<10;i++{
  top = fmt.Sprintf('Golang夢(mèng)工廠_%02d',i)
  go Sub(m,top)
 }
 ManyPub(m)
}

func ManyPub(c *mq.Client)  {
 t := time.NewTicker(10 * time.Second)
 defer t.Stop()
 for  {
  select {
  case <- t.C:
   for i:= 0;i<10;i++{
    //多個(gè)topic 推送不同的消息
    top := fmt.Sprintf('Golang夢(mèng)工廠_%02d',i)
    payload := fmt.Sprintf('asong真帥_%02d',i)
    err := c.Publish(top,payload)
    if err != nil{
     fmt.Println('pub message failed')
    }
   }
  default:

  }
 }
}

func Sub(c *mq.Client,top string)  {
 ch,err := c.Subscribe(top)
 if err != nil{
  fmt.Printf('sub top:%s failed\n',top)
 }
 for  {
  val := c.GetPayLoad(ch)
  if val != nil{
   fmt.Printf('%s get message is %s\n',top,val)
  }
 }
}

總結(jié)

終于幫助姐姐解決了這個(gè)問題,姐姐開心死了,給我一頓親,啊不對(duì),是一頓夸,夸的人家都不好意思了。

這一篇你學(xué)會(huì)了嗎?沒學(xué)會(huì)不要緊,趕快去把源代碼下載下來,好好通讀一下,很好理解的~~~。

其實(shí)這一篇是為了接下來的kafka學(xué)習(xí)打基礎(chǔ)的,學(xué)好了這一篇,接下來學(xué)習(xí)的kafka就會(huì)容易很多啦~~~

github地址:https://github.com/asong2020/Golang_Dream/tree/master/code_demo/queue

如果能給一個(gè)小星星就好了~~~

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多