阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列。這兩個(gè)附加的操作是:在隊(duì)列為空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强?。?dāng)隊(duì)列滿時(shí),存儲(chǔ)元素的線程會(huì)等待隊(duì)列可用。阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場景,生產(chǎn)者是往隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里拿元素的線程。阻塞隊(duì)列就是生產(chǎn)者存放元素的容器,而消費(fèi)者也只從容器里拿元素。 先放張圖:
也就是說,實(shí)現(xiàn)阻塞隊(duì)列需要考慮這三個(gè)點(diǎn)。 查了下資料,大多都是java的封裝好的類庫,不過沒事,反正思想,理論都是一樣的,不同的就是實(shí)現(xiàn)不同。但還是有個(gè)不錯(cuò)的C#實(shí)現(xiàn)----<< http://www.cnblogs.com/samgk/p/4772806.html C# 實(shí)現(xiàn)生產(chǎn)者消費(fèi)者隊(duì)列 >>。該文其實(shí)也道出了阻塞隊(duì)列在除去生產(chǎn)者-消費(fèi)者模型外的應(yīng)用,昨天查資料的時(shí)候,阿里程序員寫了篇文章關(guān)于郵件接收下載的,就是使用阻塞隊(duì)列,但是我忘了原文在哪了。當(dāng)時(shí)看的時(shí)候,想起來當(dāng)初看<<C#高級(jí)編程>>第十章的管道。書上介紹的是:開一個(gè)task去讀取文件名,放到阻塞隊(duì)列中,然后開一個(gè)隊(duì)列根據(jù)文件名讀取內(nèi)容,這個(gè)應(yīng)用于郵件接收下載是一樣的。暫時(shí)先不說這個(gè)了,有興趣的可以自己去看看那本書。 using System;using System.Collections.Generic;using System.Linq;using System.Text;using System.Threading;using System.Threading.Tasks;namespace SuiBao.Utility { ///阻塞隊(duì)列(BlockingQueue)是一個(gè)支持兩個(gè)附加操作的隊(duì)列。這兩個(gè)附加的操作是:在隊(duì)列為空時(shí),獲取元素的線程會(huì)等待隊(duì)列變?yōu)榉强铡.?dāng)隊(duì)列滿時(shí),存儲(chǔ)元素的線程會(huì)等待隊(duì)列可用。阻塞隊(duì)列常用于生產(chǎn)者和消費(fèi)者的場景,生產(chǎn)者是往隊(duì)列里添加元素的線程,消費(fèi)者是從隊(duì)列里拿元素的線程。阻塞隊(duì)列就是生產(chǎn)者存放元素的容器,而消費(fèi)者也只從容器里拿元素。 //阻塞隊(duì)列 需要實(shí)現(xiàn)兩個(gè)功能: 使線程等待與喚醒線程. 具體介紹如下: // 在極端條件下, 需要掛起線程, 等待隊(duì)列滿足條件后,再去執(zhí)行添加或提取 操作 // 待隊(duì)列滿足了條件之后, 通知線程去繼續(xù)其掛起之前的操作.... //涉及到的技術(shù): //線程同步(此實(shí)例用到了lock) 與 線程間通信(此示例用到了event) // // 可能產(chǎn)生死鎖的分析: // 在某個(gè)時(shí)刻,隊(duì)列為空或者是已滿, 此時(shí)生產(chǎn)者未能存入數(shù)據(jù)或者還在存入數(shù)據(jù)到隊(duì)列中, 這就會(huì)產(chǎn)生使得隊(duì)列出錯(cuò) // 如果此時(shí), 消費(fèi)者對(duì)隊(duì)列在進(jìn)行操作就會(huì)產(chǎn)生死鎖...由于之前的生產(chǎn)者的操作使得隊(duì)列出了問題并沒有釋放鎖, 此時(shí)就會(huì)造成死鎖 // 這是從預(yù)防死鎖的角度來解決死鎖問題 public class BlockQueue<T> { private Queue<T> _inner_queue = null; private ManualResetEvent _dequeue_wait = null; public int Count { get { return _inner_queue.Count; } } public BlockQueue(int capacity = -1) { this._inner_queue = capacity == -1 ? new Queue<T>() : new Queue<T>(capacity); this._dequeue_wait = new ManualResetEvent(false); } // 入隊(duì)加鎖 public void EnQueue(T item) { if (this._IsShutdown == true) throw new InvalidOperationException("服務(wù)未開啟.[EnQueue]"); lock (this._inner_queue) { this._inner_queue.Enqueue(item); this._dequeue_wait.Set(); } } // 出隊(duì)加鎖 public T DeQueue(int waitTime) { bool _queueEmpty = false; T item = default(T); while (true) { lock (this._inner_queue) { // 判斷隊(duì)列中是否有元素.... if (this._inner_queue.Count > 0) { item = this._inner_queue.Dequeue(); this._dequeue_wait.Reset(); //break; } else { if (this._IsShutdown == true) { throw new InvalidOperationException("服務(wù)未開啟[DeQueue]."); } else { _queueEmpty = true; } } } if (item != null) { return item; } if (_queueEmpty) { this._dequeue_wait.WaitOne(waitTime); } } } private bool _IsShutdown = false; public void Shutdown() { this._IsShutdown = true; this._dequeue_wait.Set(); } public void Clear() { this._inner_queue.Clear(); } } } 那么.net中有沒有封裝好的阻塞隊(duì)列?有??!BlockingCollection<>類,其實(shí)我之前寫的好些關(guān)于線程的文章都說到了這個(gè)類庫,用到的地方也多。該類默認(rèn)的容器是ConcurrentQueue,因此,同步就做好了,而且該類還實(shí)現(xiàn)了阻塞的功能: |
|