前言
這周姐姐入職了新公司,老板想探探他的底,看了一眼他的簡(jiǎn)歷,呦呵,精通kafka,這小姑娘有兩下子,既然這樣,那你寫一個(gè)消息隊(duì)列吧。因?yàn)橐胓o語言寫,這可給姐姐愁壞了。趕緊來求助我,我這么堅(jiān)貞不屈一人,在姐姐的軟磨硬泡下還是答應(yīng)他了,所以接下來我就手把手教姐姐怎么寫一個(gè)消息隊(duì)列。下面我們就來看一看我是怎么寫的吧~~~。
本代碼已上傳到我的github:
有需要的小伙伴,可自行下載,順便給個(gè)小星星吧~~~
什么是消息隊(duì)列
姐姐真是把我愁壞了,自己寫的精通kafka
,竟然不知道什么是消息隊(duì)列,于是,一向好脾氣的我開始給姐姐講一講什么是消息隊(duì)列。
消息隊(duì)列,我們一般稱它為MQ(Message Queue)
,兩個(gè)單詞的結(jié)合,這兩個(gè)英文單詞想必大家都應(yīng)該知道吧,其實(shí)最熟悉的還是Queue
吧,即隊(duì)列。隊(duì)列是一種先進(jìn)先出的數(shù)據(jù)結(jié)構(gòu),隊(duì)列的使用還是比較普遍的,但是已經(jīng)有隊(duì)列了,怎么還需要MQ
呢?
我:?jiǎn)柲隳?,姐姐,知道嗎?為什么還需要MQ
?
姐姐:快點(diǎn)講,想挨打呀?
我:噗。。。算我多嘴,哼~~~
欠欠的我開始了接下來的耐心講解......
舉一個(gè)簡(jiǎn)單的例子,假設(shè)現(xiàn)在我們要做一個(gè)系統(tǒng),該登陸系統(tǒng)需要在用戶登陸成功后,發(fā)送封郵件到用戶郵箱進(jìn)行提醒,需求還是很簡(jiǎn)單的,我們先看一看沒有MQ
,我們?cè)撛趺磳?shí)現(xiàn)呢?畫一個(gè)時(shí)序圖來看一看:

看這個(gè)圖,郵件發(fā)送在請(qǐng)求登陸時(shí)進(jìn)行,當(dāng)密碼驗(yàn)證成功后,就發(fā)送郵件,然后返回登陸成功。這樣是可以的,但是他是有缺陷的。這讓我們的登陸操作變得復(fù)雜了,每次請(qǐng)求登陸都需要進(jìn)行郵件發(fā)送,如果這里出現(xiàn)錯(cuò)誤,整個(gè)登陸請(qǐng)求也出現(xiàn)了錯(cuò)誤,導(dǎo)致登陸不成功;還有一個(gè)問題,本來我們登陸請(qǐng)求調(diào)用接口僅僅需要100ms,因?yàn)橹虚g要做一次發(fā)送郵件的等待,那么調(diào)用一次登陸接口的時(shí)間就要增長(zhǎng),這就是問題所在,一封郵件他的優(yōu)先級(jí) 不是很高的,用戶也不需要實(shí)時(shí)收到這封郵件,所以這時(shí),就體現(xiàn)了消息隊(duì)列的重要性了,我們用消息隊(duì)列進(jìn)行改進(jìn)一下。

這里我們將發(fā)送郵件請(qǐng)求放到Mq
中,這樣我們就能提高用戶體驗(yàn)的吞吐量,這個(gè)很重要,顧客就是上帝嘛,畢竟也沒有人喜歡用一個(gè)很慢很慢的app。
這里只是舉了MQ
眾多應(yīng)用中的其中一個(gè),即異步應(yīng)用,MQ
還在系統(tǒng)解藕、削峰/限流中有著重要應(yīng)用,這兩個(gè)我就不具體講解了,原理都一樣,好好思考一下,你們都能懂得。
channel
好啦,姐姐終于知道什么是消息隊(duì)列了,但是現(xiàn)在還是沒法進(jìn)行消息隊(duì)列開發(fā)的,因?yàn)檫€差一個(gè)知識(shí)點(diǎn),即go語言中的channel
。這個(gè)很重要,我們還需要靠這個(gè)來開發(fā)我們的消息隊(duì)列呢。
因篇幅有限,這里不詳細(xì)介紹channel
,只介紹基本使用方法。
什么是channel
Goroutine 和 Channel 是 Go 語言并發(fā)編程的兩大基石。Goroutine 用于執(zhí)行并發(fā)任務(wù),Channel 用于 goroutine 之間的同步、通信。Go提倡使用通信的方法代替共享內(nèi)存,當(dāng)一個(gè)Goroutine需要和其他Goroutine資源共享時(shí),Channel就會(huì)在他們之間架起一座橋梁,并提供確保安全同步的機(jī)制。channel
本質(zhì)上其實(shí)還是一個(gè)隊(duì)列,遵循FIFO原則。具體規(guī)則如下:
- 先從 Channel 讀取數(shù)據(jù)的 Goroutine 會(huì)先接收到數(shù)據(jù);
- 先向 Channel 發(fā)送數(shù)據(jù)的 Goroutine 會(huì)得到先發(fā)送數(shù)據(jù)的權(quán)利;
創(chuàng)建通道
創(chuàng)建通道需要用到關(guān)鍵字 make ,格式如下:
通道實(shí)例 := make(chan 數(shù)據(jù)類型)
- 數(shù)據(jù)類型:通道內(nèi)傳輸?shù)脑仡愋汀?/section>
- 通道實(shí)例:通過make創(chuàng)建的通道句柄。
無緩沖通道的使用
Go語言中無緩沖的通道(unbuffered channel)是指在接收前沒有能力保存任何值的通道。這種類型的通道要求發(fā)送 goroutine 和接收 goroutine 同時(shí)準(zhǔn)備好,才能完成發(fā)送和接收操作。
無緩沖通道的定義方式如下:
通道實(shí)例 := make(chan 通道類型)
- 通道類型:和無緩沖通道用法一致,影響通道發(fā)送和接收的數(shù)據(jù)類型。
- 通道實(shí)例:被創(chuàng)建出的通道實(shí)例。
寫個(gè)例子來幫助大家理解一下吧:
package main
import (
'sync'
'time'
)
func main() {
c := make(chan string)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
c <- `Golang夢(mèng)工廠`
}()
go func() {
defer wg.Done()
time.Sleep(time.Second * 1)
println(`Message: `+ <-c)
}()
wg.Wait()
}
帶緩沖的通道的使用
Go語言中有緩沖的通道(buffered channel)是一種在被接收前能存儲(chǔ)一個(gè)或者多個(gè)值的通道。這種類型的通道并不強(qiáng)制要求 goroutine 之間必須同時(shí)完成發(fā)送和接收。通道會(huì)阻塞發(fā)送和接收動(dòng)作的條件也會(huì)不同。只有在通道中沒有要接收的值時(shí),接收動(dòng)作才會(huì)阻塞。只有在通道沒有可用緩沖區(qū)容納被發(fā)送的值時(shí),發(fā)送動(dòng)作才會(huì)阻塞。
有緩沖通道的定義方式如下:
通道實(shí)例 := make(chan 通道類型, 緩沖大小)
- 通道類型:和無緩沖通道用法一致,影響通道發(fā)送和接收的數(shù)據(jù)類型。
- 緩沖大小:決定通道最多可以保存的元素?cái)?shù)量。
- 通道實(shí)例:被創(chuàng)建出的通道實(shí)例。
來寫一個(gè)例子講解一下:
package main
import (
'sync'
'time'
)
func main() {
c := make(chan string, 2)
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
c <- `Golang夢(mèng)工廠`
c <- `asong`
}()
go func() {
defer wg.Done()
time.Sleep(time.Second * 1)
println(`公眾號(hào): `+ <-c)
println(`作者: `+ <-c)
}()
wg.Wait()
}
好啦,通道的概念就介紹到這里了,如果需要,下一篇我出一個(gè)channel
詳細(xì)講解的文章。
消息隊(duì)列編碼實(shí)現(xiàn)
準(zhǔn)備篇
終于開始進(jìn)入主題了,姐姐都聽的快要睡著了,我轟隆一嗓子,立馬精神,但是呢,asong也是挨了一頓小電炮,代價(jià)慘痛呀,嗚嗚嗚............
在開始編寫代碼編寫直接,我需要構(gòu)思我們的整個(gè)代碼架構(gòu),這才是正確的編碼方式。我們先來定義一個(gè)接口,把我們需要實(shí)現(xiàn)的方法先列出來,后期對(duì)每一個(gè)代碼進(jìn)行實(shí)現(xiàn)就可以了。因此可以列出如下方法:
type Broker interface {
publish(topic string, msg interface{}) error
subscribe(topic string) (<-chan interface{}, error)
unsubscribe(topic string, sub <-chan interface{}) error
close()
broadcast(msg interface{}, subscribers []chan interface{})
setConditions(capacity int)
}
publish
:進(jìn)行消息的推送,有兩個(gè)參數(shù)即topic
、msg
,分別是訂閱的主題、要傳遞的消息subscribe
:消息的訂閱,傳入訂閱的主題,即可完成訂閱,并返回對(duì)應(yīng)的channel
通道用來接收數(shù)據(jù)unsubscribe
:取消訂閱,傳入訂閱的主題和對(duì)應(yīng)的通道- Golang夢(mèng)工廠 發(fā)起了一個(gè)讀者討論 小伙伴們,你們有什么想學(xué)的嘛,可以留言的呦
close
:這個(gè)的作用就是很明顯了,就是用來關(guān)閉消息隊(duì)列的broadCast
:這個(gè)屬于內(nèi)部方法,作用是進(jìn)行廣播,對(duì)推送的消息進(jìn)行廣播,保證每一個(gè)訂閱者都可以收到setConditions
:這里是用來設(shè)置條件,條件就是消息隊(duì)列的容量,這樣我們就可以控制消息隊(duì)列的大小了
細(xì)心的你們有沒有發(fā)現(xiàn)什么問題,這些代碼我都定義的是內(nèi)部方法,也就是包外不可用。為什么這么做呢,因?yàn)檫@里屬于代理要做的事情,我們還需要在封裝一層,也就是客戶端能直接調(diào)用的方法,這樣才符合軟件架構(gòu)。因此可以寫出如下代碼:
package mq
type Client struct {
bro *BrokerImpl
}
func NewClient() *Client {
return &Client{
bro: NewBroker(),
}
}
func (c *Client)SetConditions(capacity int) {
c.bro.setConditions(capacity)
}
func (c *Client)Publish(topic string, msg interface{}) error{
return c.bro.publish(topic,msg)
}
func (c *Client)Subscribe(topic string) (<-chan interface{}, error){
return c.bro.subscribe(topic)
}
func (c *Client)Unsubscribe(topic string, sub <-chan interface{}) error {
return c.bro.unsubscribe(topic,sub)
}
func (c *Client)Close() {
c.bro.close()
}
func (c *Client)GetPayLoad(sub <-chan interface{}) interface{}{
for val:= range sub{
if val != nil{
return val
}
}
return nil
}
上面只是準(zhǔn)好了代碼結(jié)構(gòu),但是消息隊(duì)列實(shí)現(xiàn)的結(jié)構(gòu)我們還沒有設(shè)計(jì),現(xiàn)在我們就來設(shè)計(jì)一下。
type BrokerImpl struct {
exit chan bool
capacity int
topics map[string][]chan interface{} // key:topic value :queue
sync.RWMutex // 同步鎖
}
exit
:也是一個(gè)通道,這個(gè)用來做關(guān)閉消息隊(duì)列用的capacity
:即用來設(shè)置消息隊(duì)列的容量topics
:這里使用一個(gè)map結(jié)構(gòu),key即是topic
,其值則是一個(gè)切片,chan
類型,這里這么做的原因是我們一個(gè)topic可以有多個(gè)訂閱者,所以一個(gè)訂閱者對(duì)應(yīng)著一個(gè)通道sync.RWMutex
:讀寫鎖,這里是為了防止并發(fā)情況下,數(shù)據(jù)的推送出現(xiàn)錯(cuò)誤,所以采用加鎖的方式進(jìn)行保證
好啦,現(xiàn)在我們已經(jīng)準(zhǔn)備的很充分啦,開始接下來方法填充之旅吧~~~
Publish
和broadcast
這里兩個(gè)合在一起講的原因是braodcast
是屬于publish
里的。這里的思路很簡(jiǎn)單,我們只需要把傳入的數(shù)據(jù)進(jìn)行廣播即可了,下面我們來看代碼實(shí)現(xiàn):
func (b *BrokerImpl) publish(topic string, pub interface{}) error {
select {
case <-b.exit:
return errors.New('broker closed')
default:
}
b.RLock()
subscribers, ok := b.topics[topic]
b.RUnlock()
if !ok {
return nil
}
b.broadcast(pub, subscribers)
return nil
}
func (b *BrokerImpl) broadcast(msg interface{}, subscribers []chan interface{}) {
count := len(subscribers)
concurrency := 1
switch {
case count > 1000:
concurrency = 3
case count > 100:
concurrency = 2
default:
concurrency = 1
}
pub := func(start int) {
for j := start; j < count; j += concurrency {
select {
case subscribers[j] <- msg:
case <-time.After(time.Millisecond * 5):
case <-b.exit:
return
}
}
}
for i := 0; i < concurrency; i++ {
go pub(i)
}
}
publish
方法中沒有什么好講的,這里主要說一下broadcast
的實(shí)現(xiàn):
這里主要對(duì)數(shù)據(jù)進(jìn)行廣播,所以數(shù)據(jù)推送出去就可以了,沒必要一直等著他推送成功,所以這里我們我們采用goroutine
。在推送的時(shí)候,當(dāng)推送失敗時(shí),我們也不能一直等待呀,所以這里我們加了一個(gè)超時(shí)機(jī)制,超過5毫秒就停止推送,接著進(jìn)行下面的推送。
可能你們會(huì)有疑惑,上面怎么還有一個(gè)switch
選項(xiàng)呀,干什么用的呢?考慮這樣一個(gè)問題,當(dāng)有大量的訂閱者時(shí),,比如10000個(gè),我們一個(gè)for循環(huán)去做消息的推送,那推送一次就會(huì)耗費(fèi)很多時(shí)間,并且不同的消費(fèi)者之間也會(huì)產(chǎn)生延時(shí),,所以采用這種方法進(jìn)行分解可以降低一定的時(shí)間。
subscribe
和 unsubScribe
我們先來看代碼:
func (b *BrokerImpl) subscribe(topic string) (<-chan interface{}, error) {
select {
case <-b.exit:
return nil, errors.New('broker closed')
default:
}
ch := make(chan interface{}, b.capacity)
b.Lock()
b.topics[topic] = append(b.topics[topic], ch)
b.Unlock()
return ch, nil
}
func (b *BrokerImpl) unsubscribe(topic string, sub <-chan interface{}) error {
select {
case <-b.exit:
return errors.New('broker closed')
default:
}
b.RLock()
subscribers, ok := b.topics[topic]
b.RUnlock()
if !ok {
return nil
}
// delete subscriber
var newSubs []chan interface{}
for _, subscriber := range subscribers {
if subscriber == sub {
continue
}
newSubs = append(newSubs, subscriber)
}
b.Lock()
b.topics[topic] = newSubs
b.Unlock()
return nil
}
這里其實(shí)就很簡(jiǎn)單了:
subscribe
:這里的實(shí)現(xiàn)則是為訂閱的主題創(chuàng)建一個(gè)channel
,然后將訂閱者加入到對(duì)應(yīng)的topic
中就可以了,并且返回一個(gè)接收channel
。unsubScribe
:這里實(shí)現(xiàn)的思路就是將我們剛才添加的channel
刪除就可以了。
close
func (b *BrokerImpl) close() {
select {
case <-b.exit:
return
default:
close(b.exit)
b.Lock()
b.topics = make(map[string][]chan interface{})
b.Unlock()
}
return
}
這里就是為了關(guān)閉整個(gè)消息隊(duì)列,這句代碼b.topics = make(map[string][]chan interface{})
比較重要,這里主要是為了保證下一次使用該消息隊(duì)列不發(fā)生沖突。
setConditions
GetPayLoad
還差最后兩個(gè)方法,一個(gè)是設(shè)置我們的消息隊(duì)列容量,另一個(gè)是封裝一個(gè)方法來獲取我們訂閱的消息:
func (b *BrokerImpl)setConditions(capacity int) {
b.capacity = capacity
}
func (c *Client)GetPayLoad(sub <-chan interface{}) interface{}{
for val:= range sub{
if val != nil{
return val
}
}
return nil
}
測(cè)試
好啦,代碼這么快就被寫完了,接下來我們進(jìn)行測(cè)試一下吧。
單元測(cè)試
正式測(cè)試之前,我們還是需要先進(jìn)行一下單元測(cè)試,養(yǎng)成好的習(xí)慣,只有先自測(cè)了,才能有底氣說我的代碼沒問題,要不直接跑程序,會(huì)出現(xiàn)很多bug
的。
這里我們測(cè)試方法如下:我們向不同的topic
發(fā)送不同的信息,當(dāng)訂閱者收到消息后,就行取消訂閱。
func TestClient(t *testing.T) {
b := NewClient()
b.SetConditions(100)
var wg sync.WaitGroup
for i := 0; i < 100; i++ {
topic := fmt.Sprintf('Golang夢(mèng)工廠%d', i)
payload := fmt.Sprintf('asong%d', i)
ch, err := b.Subscribe(topic)
if err != nil {
t.Fatal(err)
}
wg.Add(1)
go func() {
e := b.GetPayLoad(ch)
if e != payload {
t.Fatalf('%s expected %s but get %s', topic, payload, e)
}
if err := b.Unsubscribe(topic, ch); err != nil {
t.Fatal(err)
}
wg.Done()
}()
if err := b.Publish(topic, payload); err != nil {
t.Fatal(err)
}
}
wg.Wait()
}
測(cè)試通過,沒問題,接下來我們?cè)趯憥讉€(gè)方法測(cè)試一下
測(cè)試
這里分為兩種方式測(cè)試
測(cè)試一:使用一個(gè)定時(shí)器,向一個(gè)主題定時(shí)推送消息.
// 一個(gè)topic 測(cè)試
func OnceTopic() {
m := mq.NewClient()
m.SetConditions(10)
ch,err :=m.Subscribe(topic)
if err != nil{
fmt.Println('subscribe failed')
return
}
go OncePub(m)
OnceSub(ch,m)
defer m.Close()
}
// 定時(shí)推送
func OncePub(c *mq.Client) {
t := time.NewTicker(10 * time.Second)
defer t.Stop()
for {
select {
case <- t.C:
err := c.Publish(topic,'asong真帥')
if err != nil{
fmt.Println('pub message failed')
}
default:
}
}
}
// 接受訂閱消息
func OnceSub(m <-chan interface{},c *mq.Client) {
for {
val := c.GetPayLoad(m)
fmt.Printf('get message is %s\n',val)
}
}
測(cè)試二:使用一個(gè)定時(shí)器,定時(shí)向多個(gè)主題發(fā)送消息:
//多個(gè)topic測(cè)試
func ManyTopic() {
m := mq.NewClient()
defer m.Close()
m.SetConditions(10)
top := ''
for i:=0;i<10;i++{
top = fmt.Sprintf('Golang夢(mèng)工廠_%02d',i)
go Sub(m,top)
}
ManyPub(m)
}
func ManyPub(c *mq.Client) {
t := time.NewTicker(10 * time.Second)
defer t.Stop()
for {
select {
case <- t.C:
for i:= 0;i<10;i++{
//多個(gè)topic 推送不同的消息
top := fmt.Sprintf('Golang夢(mèng)工廠_%02d',i)
payload := fmt.Sprintf('asong真帥_%02d',i)
err := c.Publish(top,payload)
if err != nil{
fmt.Println('pub message failed')
}
}
default:
}
}
}
func Sub(c *mq.Client,top string) {
ch,err := c.Subscribe(top)
if err != nil{
fmt.Printf('sub top:%s failed\n',top)
}
for {
val := c.GetPayLoad(ch)
if val != nil{
fmt.Printf('%s get message is %s\n',top,val)
}
}
}
總結(jié)
終于幫助姐姐解決了這個(gè)問題,姐姐開心死了,給我一頓親,啊不對(duì),是一頓夸,夸的人家都不好意思了。
這一篇你學(xué)會(huì)了嗎?沒學(xué)會(huì)不要緊,趕快去把源代碼下載下來,好好通讀一下,很好理解的~~~。
其實(shí)這一篇是為了接下來的kafka學(xué)習(xí)打基礎(chǔ)的,學(xué)好了這一篇,接下來學(xué)習(xí)的kafka就會(huì)容易很多啦~~~
github地址:https://github.com/asong2020/Golang_Dream/tree/master/code_demo/queue
如果能給一個(gè)小星星就好了~~~