在 C++ 的多線程編程世界中,有一個神奇的存在 —— 無鎖隊列。它宛如一座堅固的橋梁,橫跨在多線程協(xié)作的鴻溝之上,成為提升程序性能和穩(wěn)定性的關鍵角色。
隨著計算機硬件的不斷發(fā)展,多核處理器已經(jīng)成為主流。多線程編程由此變得愈發(fā)重要,然而,傳統(tǒng)基于鎖的同步機制卻逐漸暴露出諸多問題。在這一背景下,無鎖隊列應運而生,它宛如黑暗中的燈塔,為多線程程序的高效運行照亮了前行的道路。
想象一下,在一個繁忙的交通樞紐,傳統(tǒng)的鎖機制就像是交通管制中的紅綠燈,雖然能維持秩序,但頻繁的等待和切換也會造成擁堵。而無鎖隊列則更像是智能交通系統(tǒng),讓數(shù)據(jù)在多線程之間流暢地穿梭,無需不必要的停頓。它為我們打開了一扇通往更高效多線程編程的大門,讓我們一同走進 C++ 無鎖隊列的奇妙世界吧。
一、無鎖隊列簡介 無鎖隊列(Lock-Free Queue)是一種并發(fā)數(shù)據(jù)結(jié)構(gòu),用于在多線程環(huán)境下實現(xiàn)高效的數(shù)據(jù)交換。與傳統(tǒng)的基于鎖的隊列相比,無鎖隊列使用了一些特殊的算法和技術(shù),避免了線程之間的互斥操作,從而提高了并發(fā)性能和響應性。
無鎖隊列通?;谠硬僮鳎╝tomic operations)或其他底層同步原語來實現(xiàn),并且它們采用一些巧妙的方法來確保操作的正確性。主要思想是通過使用原子讀寫操作或類似的機制,在沒有顯式鎖定整個隊列的情況下實現(xiàn)線程安全。
典型的無鎖隊列算法有循環(huán)緩沖區(qū)(Circular Buffer)和鏈表(Linked List)等。循環(huán)緩沖區(qū)通常使用兩個指針(head 和 tail)來表示隊列的開始和結(jié)束位置,利用自旋、CAS (Compare-and-Swap) 等原子操作來進行入隊和出隊操作。鏈表則通過利用 CAS 操作插入或刪除節(jié)點來實現(xiàn)并發(fā)訪問。
在當今多核心優(yōu)化的大背景下,無鎖隊列在多線程編程中起著至關重要的作用。多核心優(yōu)化是當前游戲開發(fā)以及許多領域的重點課題,無論是工程實踐還是算法研究,將工作并行化交由多線程去處理是極為普遍的場景。
在這種情況下,線程池與命令隊列的組合常常被采用,而其中的命令隊列就可以選擇使用互斥鎖或者無鎖隊列。由于命令隊列的讀寫通常是較輕量級的操作,采用無鎖隊列能夠獲得比有鎖操作更高的性能。無鎖隊列通過使用原子操作來確保線程安全,避免了鎖的開銷,包括上下文切換、線程調(diào)度延遲以及潛在的死鎖問題。
在多處理器系統(tǒng)中,無鎖隊列可以更好地擴展。隨著處理器數(shù)量的增加,使用鎖的隊列可能會遇到瓶頸,因為多個線程競爭同一個鎖。而無鎖隊列通過減少這種競爭,可以提供更好的并行性。在實時系統(tǒng)中,無鎖隊列可以提供更高效的響應時間,確保系統(tǒng)能夠及時處理各種任務。
優(yōu)點:
缺點:
二、無鎖隊列工作原理 無鎖隊列的原理是通過使用原子操作(atomic operations)或其他底層同步原語來實現(xiàn)并發(fā)安全。它們避免了傳統(tǒng)鎖機制中的互斥操作,以提高并發(fā)性能和響應性。典型的無鎖隊列算法有循環(huán)緩沖區(qū)(Circular Buffer)和鏈表(Linked List)等。
在循環(huán)緩沖區(qū)的實現(xiàn)中,通常使用兩個指針來表示隊列的開始位置和結(jié)束位置,即頭指針(head)和尾指針(tail)。入隊時,通過自旋、CAS (Compare-and-Swap) 等原子操作更新尾指針,并將元素放入相應位置。出隊時,同樣利用原子操作更新頭指針,并返回對應位置上的元素。
鏈表實現(xiàn)無鎖隊列時,在插入或刪除節(jié)點時使用 CAS 操作來確保只有一個線程成功修改節(jié)點的指針值。這樣可以避免對整個鏈表進行加鎖操作。
無論是循環(huán)緩沖區(qū)還是鏈表實現(xiàn),關鍵點在于如何利用原子操作確保不同線程之間的協(xié)調(diào)與一致性。需要仔細處理并發(fā)情況下可能出現(xiàn)的競爭條件,并設計合適的算法來保證正確性和性能。
2.1隊列操作模型 隊列是一種非常重要的數(shù)據(jù)結(jié)構(gòu),其特性是先進先出(FIFO),符合流水線業(yè)務流程。在進程間通信、網(wǎng)絡通信間經(jīng)常采用隊列做緩存,緩解數(shù)據(jù)處理壓力。根據(jù)操作隊列的場景分為:單生產(chǎn)者——單消費者、多生產(chǎn)者——單消費者、單生產(chǎn)者——多消費者、多生產(chǎn)者——多消費者四大模型。根據(jù)隊列中數(shù)據(jù)分為:隊列中的數(shù)據(jù)是定長的、隊列中的數(shù)據(jù)是變長的。
(1)單生產(chǎn)者——單消費者
(2)多生產(chǎn)者——單消費者
(3)單生產(chǎn)者——多消費者
(4)多生產(chǎn)者——多消費者
2.2CAS操作 CAS即Compare and Swap,是所有CPU指令都支持CAS的原子操作(X86中CMPXCHG匯編指令),用于實現(xiàn)實現(xiàn)各種無鎖(lock free)數(shù)據(jù)結(jié)構(gòu)。
CAS操作的C語言實現(xiàn)如下:
bool compare_and_swap ( int *memory_location, int expected_value, int new_value) { if (*memory_location == expected_value) { *memory_location = new_value; return true; } return false; }
CAS用于檢查一個內(nèi)存位置是否包含預期值,如果包含,則把新值復賦值到內(nèi)存位置。成功返回true,失敗返回false。
(1)GGC對CAS支持,GCC4.1+版本中支持CAS原子操作。
bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...); type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...);
(2)Windows對CAS支持,Windows中使用Windows API支持CAS。
LONG InterlockedCompareExchange( LONG volatile *Destination, LONG ExChange, LONG Comperand );
(3)C11對CAS支持,C11 STL中atomic函數(shù)支持CAS并可以跨平臺。
template< class T > bool atomic_compare_exchange_weak( std::atomic* obj,T* expected, T desired ); template< class T > bool atomic_compare_exchange_weak( volatile std::atomic* obj,T* expected, T desired );
其它原子操作如下:
2.3隊列數(shù)據(jù)定長與變長 (1)隊列數(shù)據(jù)定長
(2)隊列數(shù)據(jù)變長
⑴問題描述
在多線程環(huán)境下,原始隊列會出現(xiàn)各種不可預料的問題。以兩個線程同時寫入為例,假設線程 A 和線程 B 同時對原始隊列進行寫入操作。首先看原始隊列的入隊偽代碼:void Enqueue(Node *node){m_Tail->next = node;m_Tail = node;},這個操作分為兩步。當兩個線程同時執(zhí)行時,可能出現(xiàn)這樣的情況:線程 A 執(zhí)行完第一步m_Tail->next = nodeC后,線程 B 開始執(zhí)行并完成了整個入隊操作,接著線程 A 繼續(xù)執(zhí)行第二步m_Tail = nodeB,這就導致了 Tail 指針失去與隊列的鏈接,后加的節(jié)點從 Head 開始就訪問不到了。這種情況會使得隊列的狀態(tài)變得混亂,無法保證數(shù)據(jù)的正確存儲和讀取。
⑵解決方法
為了解決上述問題,可以使用原子操作實現(xiàn)無鎖同步。原子操作是不可分割的操作,CPU 的一個線程在執(zhí)行原子操作時,不會被其他線程中斷或搶占。其中,典型的原子操作有 Load / Store(讀取與保存)、Test and Set(針對 bool 變量,如果為 true 則返回 true,如果為 false,則將變量置為 true 并返回 false)、Clear(將 bool 變量設為 false)、Exchange(將指定位置的值設置為傳入值,并返回其舊值)等。
而 CAS(Compare And Swap)在實現(xiàn)無鎖同步中起著關鍵作用。CAS 操作包含三個參數(shù):一個內(nèi)存地址 V、一個期望值 A 和一個新值 B。當執(zhí)行 CAS 操作時,如果當前內(nèi)存地址 V 中存儲的值等于期望值 A,則將新值 B 寫入該內(nèi)存地址,并返回 true;否則,不做任何修改,并返回 false。在無鎖隊列中,可以利用 CAS 操作來確保對 Head 或 Tail 指針的讀寫操作是原子性的,從而避免多線程同時寫入或讀取時出現(xiàn)的指針混亂問題。例如,在入隊操作中,可以使用 CAS 來確保在更新 Tail 指針時,不會被其他線程干擾。如果當前 Tail 指針指向的節(jié)點的_next指針與期望值不一致,說明有其他線程進行了寫入操作,此時可以重新嘗試 CAS 操作,直到成功為止。這樣就可以實現(xiàn)無鎖隊列的安全寫入和讀取操作。
三、無鎖隊列方案 3.1boost方案 boost提供了三種無鎖方案,分別適用不同使用場景。
boost::lockfree::queue是支持多個生產(chǎn)者和多個消費者線程的無鎖隊列。
boost::lockfree::stack是支持多個生產(chǎn)者和多個消費者線程的無鎖棧。
boost::lockfree::spsc_queue是僅支持單個生產(chǎn)者和單個消費者線程的無鎖隊列,比boost::lockfree::queue性能更好。
Boost無鎖數(shù)據(jù)結(jié)構(gòu)的API通過輕量級原子鎖實現(xiàn)lock-free,不是真正意義的無鎖。
Boost提供的queue可以設置初始容量,添加新元素時如果容量不夠,則總?cè)萘孔詣釉鲩L;但對于無鎖數(shù)據(jù)結(jié)構(gòu),添加新元素時如果容量不夠,總?cè)萘坎粫詣釉鲩L。
3.2并發(fā)隊列 ConcurrentQueue采用了無鎖算法來實現(xiàn)并發(fā)操作。它基于CAS(Compare-and-Swap)原子操作和其他底層同步原語來保證線程安全性。具體來說,它使用自旋鎖和原子指令來確保對隊列的修改是原子的,并且在多個線程之間共享數(shù)據(jù)時提供正確性保證。
ConcurrentQueue是基于C++實現(xiàn)的工業(yè)級無鎖隊列方案。 GitHub:https://github.com/cameron314/concurrentqueue ReaderWriterQueue是基于C++實現(xiàn)的單生產(chǎn)者單消費者場景的無鎖隊列方案。 GitHub:https://github.com/cameron314/readerwriterqueue
ConcurrentQueue具有以下特點:
線程安全:多個線程可以同時對隊列進行操作而無需額外加鎖。
無阻塞:入隊和出隊操作通常是非阻塞的,并且具有較低的開銷。
先進先出(FIFO)順序:元素按照插入順序排列,在出隊時會返回最早入隊的元素。
使用ConcurrentQueue可以方便地處理多個線程之間共享數(shù)據(jù),并減少由于加鎖引起的性能開銷。但需要注意,雖然ConcurrentQueue提供了高效、線程安全的并發(fā)操作,但在某些特定情況下可能不適合所有應用場景,因此在選擇數(shù)據(jù)結(jié)構(gòu)時需要根據(jù)具體需求進行評估。
3.3Disruptor Disruptor是一種高性能的并發(fā)編程框架,用于實現(xiàn)無鎖(lock-free)的并發(fā)數(shù)據(jù)結(jié)構(gòu)。它最初由LMAX Exchange開發(fā),并成為了其核心交易引擎的關鍵組件。
Disruptor旨在解決在高度多線程環(huán)境下的數(shù)據(jù)共享和通信問題。它基于環(huán)形緩沖區(qū)(Ring Buffer)和事件驅(qū)動模型,通過優(yōu)化內(nèi)存訪問和線程調(diào)度,提供了非常高效的消息傳遞機制。
Disruptor是英國外匯交易公司LMAX基于JAVA開發(fā)的一個高性能隊列。 GitHub:https://github.com/LMAX-Exchange/disruptor
主要特點如下:
無鎖設計:Disruptor使用CAS(Compare-and-Swap)等無鎖算法來避免使用傳統(tǒng)鎖帶來的競爭和阻塞。
高吞吐量:Disruptor利用環(huán)形緩沖區(qū)和預分配內(nèi)存等技術(shù),在保證正確性前提下追求盡可能高的處理速度。
低延遲:由于無鎖設計和緊湊的內(nèi)存布局,Disruptor能夠?qū)崿F(xiàn)非常低的消息處理延遲。
線程間協(xié)調(diào):Disruptor提供了靈活而強大的事件發(fā)布、消費者等待及觸發(fā)機制,可用于實現(xiàn)復雜的線程間通信模式。
使用Disruptor可以有效地解決生產(chǎn)者-消費者模型中數(shù)據(jù)傳遞過程中的性能瓶頸,特別適用于高并發(fā)、低延遲的應用場景,例如金融交易系統(tǒng)、消息隊列等。然而,由于Disruptor對編程模型和理解要求較高,使用時需要仔細考慮,并根據(jù)具體需求評估是否適合。
四、無鎖隊列的實現(xiàn)方式 4.1環(huán)形緩沖區(qū) RingBuffer是生產(chǎn)者和消費者模型中常用的數(shù)據(jù)結(jié)構(gòu),生產(chǎn)者將數(shù)據(jù)追加到數(shù)組尾端,當達到數(shù)組的尾部時,生產(chǎn)者繞回到數(shù)組的頭部;消費者從數(shù)組頭端取走數(shù)據(jù),當?shù)竭_數(shù)組的尾部時,消費者繞回到數(shù)組頭部。
如果只有一個生產(chǎn)者和一個消費者,環(huán)形緩沖區(qū)可以無鎖訪問,環(huán)形緩沖區(qū)的寫入index只允許生產(chǎn)者訪問并修改,只要生產(chǎn)者在更新index前將新的值保存到緩沖區(qū)中,則消費者將始終看到一致的數(shù)據(jù)結(jié)構(gòu);讀取index也只允許消費者訪問并修改,消費者只要在取走數(shù)據(jù)后更新讀index,則生產(chǎn)者將始終看到一致的數(shù)據(jù)結(jié)構(gòu)。
入隊操作:
data[rear] = x; rear = (rear+1)%maxn;
出隊操作:
x = data[front]; front = (front+1)%maxn;
單生產(chǎn)者單消費者
對于單生產(chǎn)者和單消費者場景,由于read_index和write_index都只會有一個線程寫,因此不需要加鎖也不需要原子操作,直接修改即可,但讀寫數(shù)據(jù)時需要考慮遇到數(shù)組尾部的情況。
線程對write_index和read_index的讀寫操作如下:
多生產(chǎn)者單消費者:多生產(chǎn)者和單消費者場景中,由于多個生產(chǎn)者都會修改write_index,所以在不加鎖的情況下必須使用原子操作。
4.2RingBuffer實現(xiàn) RingBuffer.hpp文件:
#pragma once template <class T> class RingBuffer { public: RingBuffer(unsigned size): m_size(size), m_front(0), m_rear(0) { m_data = new T[size]; } ~RingBuffer() { delete [] m_data; m_data = NULL; } inline bool isEmpty() const { return m_front == m_rear; } inline bool isFull() const { return m_front == (m_rear + 1) % m_size; } bool push(const T& value) { if(isFull()) { return false; } m_data[m_rear] = value; m_rear = (m_rear + 1) % m_size; return true; } bool push(const T* value) { if(isFull()) { return false; } m_data[m_rear] = *value; m_rear = (m_rear + 1) % m_size; return true; } inline bool pop(T& value) { if(isEmpty()) { return false; } value = m_data[m_front]; m_front = (m_front + 1) % m_size; return true; } inline unsigned int front()const { return m_front; } inline unsigned int rear()const { return m_rear; } inline unsigned int size()const { return m_size; } private: unsigned int m_size;// 隊列長度 int m_front;// 隊列頭部索引 int m_rear;// 隊列尾部索引 T* m_data;// 數(shù)據(jù)緩沖區(qū) };
RingBufferTest.cpp測試代碼:
#include <stdio.h> #include <thread> #include <unistd.h> #include <sys/time.h> #include "RingBuffer.hpp" class Test { public: Test(int id = 0, int value = 0) { this->id = id; this->value = value; sprintf(data, "id = %d, value = %d\n", this->id, this->value); } void display() { printf("%s", data); } private: int id; int value; char data[128]; }; double getdetlatimeofday(struct timeval *begin, struct timeval *end) { return (end->tv_sec + end->tv_usec * 1.0 / 1000000) - (begin->tv_sec + begin->tv_usec * 1.0 / 1000000); } RingBuffer<Test> queue(1 << 12);2u000 #define N (10 * (1 << 20)) void produce() { struct timeval begin, end; gettimeofday(&begin, NULL); unsigned int i = 0; while(i < N) { if(queue.push(Test(i % 1024, i))) { i++; } } gettimeofday(&end, NULL); double tm = getdetlatimeofday(&begin, &end); printf("producer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i); } void consume() { sleep(1); Test test; struct timeval begin, end; gettimeofday(&begin, NULL); unsigned int i = 0; while(i < N) { if(queue.pop(test)) { // test.display(); i++; } } gettimeofday(&end, NULL); double tm = getdetlatimeofday(&begin, &end); printf("consumer tid=%lu %f MB/s %f msg/s elapsed= %f, size=%u \n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i); } int main(int argc, char const *argv[]) { std::thread producer1(produce); std::thread consumer(consume); producer1.join(); consumer.join(); return 0; }
編譯:
g++ --std=c++11 RingBufferTest.cpp -o test -pthread
單生產(chǎn)者單消費者場景下,消息吞吐量為350萬條/秒左右。
4.3LockFreeQueue實現(xiàn) LockFreeQueue.hpp:
#include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <fcntl.h> #include <stdbool.h> #include <sys/stat.h> #include <sys/types.h> #include <sys/time.h> #include <sys/mman.h> #define SHM_NAME_LEN 128 #define MIN(a, b) ((a) > (b) ? (b) : (a)) #define IS_POT(x) ((x) && !((x) & ((x)-1))) #define MEMORY_BARRIER __sync_synchronize() template <class T> class LockFreeQueue { protected: typedef struct { int m_lock; inline void spinlock_init() { m_lock = 0; } inline void spinlock_lock() { while(!__sync_bool_compare_and_swap(&m_lock, 0, 1)) {} } inline void spinlock_unlock() { __sync_lock_release(&m_lock); } } spinlock_t; public: // size:隊列大小 // name:共享內(nèi)存key的路徑名稱,默認為NULL,使用數(shù)組作為底層緩沖區(qū)。 LockFreeQueue(unsigned int size, const char* name = NULL) { memset(shm_name, 0, sizeof(shm_name)); createQueue(name, size); } ~LockFreeQueue() { if(shm_name[0] == 0) { delete [] m_buffer; m_buffer = NULL; } else { if (munmap(m_buffer, m_size * sizeof(T)) == -1) { perror("munmap"); } if (shm_unlink(shm_name) == -1) { perror("shm_unlink"); } } } bool isFull()const { #ifdef USE_POT return m_head == (m_tail + 1) & (m_size - 1); #else return m_head == (m_tail + 1) % m_size; #endif } bool isEmpty()const { return m_head == m_tail; } unsigned int front()const { return m_head; } unsigned int tail()const { return m_tail; } bool push(const T& value) { #ifdef USE_LOCK m_spinLock.spinlock_lock(); #endif if(isFull()) { #ifdef USE_LOCK m_spinLock.spinlock_unlock(); #endif return false; } memcpy(m_buffer + m_tail, &value, sizeof(T)); #ifdef USE_MB MEMORY_BARRIER; #endif #ifdef USE_POT m_tail = (m_tail + 1) & (m_size - 1); #else m_tail = (m_tail + 1) % m_size; #endif #ifdef USE_LOCK m_spinLock.spinlock_unlock(); #endif return true; } bool pop(T& value) { #ifdef USE_LOCK m_spinLock.spinlock_lock(); #endif if (isEmpty()) { #ifdef USE_LOCK m_spinLock.spinlock_unlock(); #endif return false; } memcpy(&value, m_buffer + m_head, sizeof(T)); #ifdef USE_MB MEMORY_BARRIER; #endif #ifdef USE_POT m_head = (m_head + 1) & (m_size - 1); #else m_head = (m_head + 1) % m_size; #endif #ifdef USE_LOCK m_spinLock.spinlock_unlock(); #endif return true; } protected: virtual void createQueue(const char* name, unsigned int size) { #ifdef USE_POT if (!IS_POT(size)) { size = roundup_pow_of_two(size); } #endif m_size = size; m_head = m_tail = 0; if(name == NULL) { m_buffer = new T[m_size]; } else { int shm_fd = shm_open(name, O_CREAT | O_RDWR, 0666); if (shm_fd < 0) { perror("shm_open"); } if (ftruncate(shm_fd, m_size * sizeof(T)) < 0) { perror("ftruncate"); close(shm_fd); } void *addr = mmap(0, m_size * sizeof(T), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0); if (addr == MAP_FAILED) { perror("mmap"); close(shm_fd); } if (close(shm_fd) == -1) { perror("close"); exit(1); } m_buffer = static_cast<T*>(addr); memcpy(shm_name, name, SHM_NAME_LEN - 1); } #ifdef USE_LOCK spinlock_init(m_lock); #endif } inline unsigned int roundup_pow_of_two(size_t size) { size |= size >> 1; size |= size >> 2; size |= size >> 4; size |= size >> 8; size |= size >> 16; size |= size >> 32; return size + 1; } protected: char shm_name[SHM_NAME_LEN]; volatile unsigned int m_head; volatile unsigned int m_tail; unsigned int m_size; #ifdef USE_LOCK spinlock_t m_spinLock; #endif T* m_buffer; };
#define USE_LOCK 開啟spinlock鎖,多生產(chǎn)者多消費者場景 #define USE_MB 開啟Memory Barrier #define USE_POT 開啟隊列大小的2的冪對齊
LockFreeQueueTest.cpp測試文件:
#include "LockFreeQueue.hpp" #include <thread> //#define USE_LOCK class Test { public: Test(int id = 0, int value = 0) { this->id = id; this->value = value; sprintf(data, "id = %d, value = %d\n", this->id, this->value); } void display() { printf("%s", data); } private: int id; int value; char data[128]; }; double getdetlatimeofday(struct timeval *begin, struct timeval *end) { return (end->tv_sec + end->tv_usec * 1.0 / 1000000) - (begin->tv_sec + begin->tv_usec * 1.0 / 1000000); } LockFreeQueue<Test> queue(1 << 10, "/shm"); #define N ((1 << 20)) void produce() { struct timeval begin, end; gettimeofday(&begin, NULL); unsigned int i = 0; while(i < N) { if(queue.push(Test(i >> 10, i))) i++; } gettimeofday(&end, NULL); double tm = getdetlatimeofday(&begin, &end); printf("producer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i); } void consume() { Test test; struct timeval begin, end; gettimeofday(&begin, NULL); unsigned int i = 0; while(i < N) { if(queue.pop(test)) { //test.display(); i++; } } gettimeofday(&end, NULL); double tm = getdetlatimeofday(&begin, &end); printf("consumer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i); } int main(int argc, char const *argv[]) { std::thread producer1(produce); //std::thread producer2(produce); std::thread consumer(consume); producer1.join(); //producer2.join(); consumer.join(); return 0; }
多線程場景下,需要定義USE_LOCK宏,開啟鎖保護。
編譯:
g++ --std=c++11 -O3 LockFreeQueueTest.cpp -o test -lrt -pthread
4.4場景分析(不同場景的實現(xiàn)) 單生產(chǎn)者單消費者隊列(SPSC 隊列):在這種隊列中,只有一個生產(chǎn)者線程和一個消費者線程操作該隊列。由于只有一個線程操作隊列,因此不需要考慮線程同步和數(shù)據(jù)競爭的問題,可以實現(xiàn)非常高效的數(shù)據(jù)訪問。
例如,在某些特定的任務處理場景中,一個線程負責生成任務,另一個線程負責處理任務,此時使用 SPSC 隊列可以避免復雜的同步機制,提高處理效率。
多生產(chǎn)者多消費者隊列(MPMC 隊列):在這種隊列中,有多個生產(chǎn)者線程和多個消費者線程操作該隊列。由于存在多個線程同時操作隊列,因此必須考慮線程同步和數(shù)據(jù)競爭的問題,需要使用一些同步機制來保證數(shù)據(jù)的正確性。
常見的實現(xiàn)方式是使用原子操作和 CAS 等技術(shù)。例如,在一個高并發(fā)的服務器程序中,多個客戶端請求可以看作是生產(chǎn)者,服務器的多個處理線程可以看作是消費者,使用 MPMC 隊列可以有效地管理這些請求和處理任務。
單生產(chǎn)者多消費者隊列(SPMC 隊列):在這種隊列中,生產(chǎn)者線程向隊列中寫入數(shù)據(jù),多個消費者線程從隊列中讀取數(shù)據(jù)。這種隊列的實現(xiàn)可以使用原子操作或者互斥鎖來實現(xiàn)線程同步。
比如在一個視頻處理系統(tǒng)中,一個視頻采集線程作為生產(chǎn)者,多個視頻編碼線程作為消費者,使用 SPMC 隊列可以實現(xiàn)高效的數(shù)據(jù)傳輸和處理。
多生產(chǎn)者單消費者隊列(MPSC 隊列):在這種隊列中,多個生產(chǎn)者線程向隊列中寫入數(shù)據(jù),一個消費者線程從隊列中讀取數(shù)據(jù)。這種隊列的實現(xiàn)也可以使用原子操作或者互斥鎖來實現(xiàn)線程同步。
例如在一個日志收集系統(tǒng)中,多個日志生成線程作為生產(chǎn)者,一個日志分析線程作為消費者,使用 MPSC 隊列可以方便地管理日志數(shù)據(jù)。
4.5常見隊列形式 ⑴鏈式隊列(Lock-free Linked Queue)
鏈式隊列是一種基于鏈表實現(xiàn)的隊列,每個節(jié)點包含一個數(shù)據(jù)元素和一個指向下一個節(jié)點的指針。
特點:可以動態(tài)地分配和釋放內(nèi)存,適用于數(shù)據(jù)量不確定或者數(shù)據(jù)大小不固定的情況。在多線程環(huán)境下,需要使用無鎖算法來避免鎖的性能損失。
例如,在一個實時數(shù)據(jù)采集系統(tǒng)中,數(shù)據(jù)量可能隨時變化,使用鏈式隊列可以靈活地適應這種變化。
⑵數(shù)組隊列(Lock-free Array Queue)
數(shù)組隊列是一種基于數(shù)組實現(xiàn)的隊列,它可以提高數(shù)據(jù)的讀寫效率,適用于數(shù)據(jù)量比較大且大小固定的情況。
實現(xiàn)比較簡單,可以使用一個指針來記錄隊尾位置,一個指針來記錄隊頭位置。在多線程環(huán)境下,需要使用無鎖算法來避免鎖的性能損失。
比如在一個圖像處理系統(tǒng)中,圖像數(shù)據(jù)的大小相對固定,使用數(shù)組隊列可以提高數(shù)據(jù)處理效率。
⑶環(huán)形隊列
實現(xiàn)環(huán)形隊列的方式可以基于數(shù)組或者基于鏈表。
優(yōu)點:可以有效地利用內(nèi)存空間,避免了數(shù)據(jù)的移動和浪費。在多線程環(huán)境下,也可以使用無鎖算法來實現(xiàn)高效的數(shù)據(jù)訪問。
例如,在一個網(wǎng)絡數(shù)據(jù)包處理系統(tǒng)中,環(huán)形隊列可以快速地存儲和讀取數(shù)據(jù)包,提高系統(tǒng)的性能。
五、無鎖隊列的性能優(yōu)勢 5.1高效性 無鎖隊列之所以能夠避免鎖競爭和開銷,從而提高性能,主要有以下幾個原因。首先,鎖的使用會帶來上下文切換和線程調(diào)度延遲。當一個線程獲取鎖時,如果其他線程也在競爭這個鎖,那么這些線程可能會被阻塞,等待鎖的釋放。而上下文切換和線程調(diào)度需要消耗一定的時間和系統(tǒng)資源,這會降低程序的執(zhí)行效率。無鎖隊列通過使用原子操作和 CAS 等技術(shù),避免了鎖的使用,從而減少了上下文切換和線程調(diào)度的次數(shù),提高了程序的性能。
其次,無鎖隊列可以更好地利用處理器的緩存。在多線程環(huán)境下,鎖的使用可能會導致緩存一致性問題,因為多個線程可能會同時訪問和修改共享數(shù)據(jù)。為了保證數(shù)據(jù)的一致性,處理器需要進行緩存同步操作,這會降低緩存的命中率,增加內(nèi)存訪問的延遲。無鎖隊列通過使用原子操作和 CAS 等技術(shù),可以避免緩存一致性問題,提高緩存的命中率,從而減少內(nèi)存訪問的延遲,提高程序的性能。
據(jù)統(tǒng)計,在某些高并發(fā)的場景下,無鎖隊列的性能可以比有鎖隊列提高數(shù)倍甚至數(shù)十倍。
5.2線程安全 無鎖隊列在多線程環(huán)境下具有很高的安全性。這是因為無鎖隊列通過使用原子操作和 CAS 等技術(shù),確保了對隊列的操作是原子性的。原子操作是不可分割的操作,它要么全部執(zhí)行成功,要么全部執(zhí)行失敗,不會出現(xiàn)部分執(zhí)行成功的情況。CAS 操作可以確保在對隊列進行操作時,不會被其他線程干擾。如果當前內(nèi)存地址中存儲的值與期望值不一致,說明有其他線程進行了寫入操作,此時可以重新嘗試 CAS 操作,直到成功為止。
此外,無鎖隊列還可以避免死鎖問題。在有鎖隊列中,如果多個線程同時獲取鎖,并且在獲取鎖的順序上出現(xiàn)問題,就可能會導致死鎖。而無鎖隊列不需要使用鎖,自然也就避免了死鎖問題。
5.3可擴展性和低延遲 無鎖隊列具有很好的可擴展性,可以在多個處理器上并行運行。在多處理器系統(tǒng)中,無鎖隊列可以更好地利用多個處理器的資源,提高隊列的吞吐量。這是因為無鎖隊列通過使用原子操作和 CAS 等技術(shù),避免了鎖的競爭,從而可以讓多個線程在不同的處理器上同時對隊列進行操作。
無鎖隊列還具有低延遲的特點。在實時系統(tǒng)中,低延遲是非常重要的。無鎖隊列可以實現(xiàn)非阻塞式的數(shù)據(jù)訪問,從而降低隊列的延遲。這是因為無鎖隊列通過使用原子操作和 CAS 等技術(shù),可以在不阻塞其他線程的情況下,對隊列進行操作。如果當前操作無法成功,可以立即返回,而不會等待其他線程釋放鎖。這樣可以減少線程的等待時間,提高系統(tǒng)的響應速度。
例如,在一個高并發(fā)的網(wǎng)絡服務器中,無鎖隊列可以快速地處理大量的網(wǎng)絡請求,提高服務器的響應速度和吞吐量。
六、Kfifo內(nèi)核隊列 計算機科學家已經(jīng)證明,當只有一個讀線程和一個寫線程并發(fā)操作時,不需要任何額外的鎖,就可以確保是線程安全的,也即kfifo使用了無鎖編程技術(shù),以提高kernel的并發(fā)。
Linux kernel里面從來就不缺少簡潔,優(yōu)雅和高效的代碼,只是我們?nèi)鄙侔l(fā)現(xiàn)和品味的眼光。在Linux kernel里面,簡潔并不表示代碼使用神出鬼沒的超然技巧,相反,它使用的不過是大家非常熟悉的基礎數(shù)據(jù)結(jié)構(gòu),但是kernel開發(fā)者能從基礎的數(shù)據(jù)結(jié)構(gòu)中,提煉出優(yōu)美的特性。
kfifo就是這樣的一類優(yōu)美代碼,它十分簡潔,絕無多余的一行代碼,卻非常高效。
關于kfifo信息如下:
本文分析的原代碼版本: 2.6.24.4 kfifo的定義文件: kernel/kfifo.c kfifo的頭文件: include/linux/kfifo.h
kfifo是Linux內(nèi)核的一個FIFO數(shù)據(jù)結(jié)構(gòu),采用環(huán)形循環(huán)隊列的數(shù)據(jù)結(jié)構(gòu)來實現(xiàn),提供一個無邊界的字節(jié)流服務,并且使用并行無鎖編程技術(shù),即單生產(chǎn)者單消費者場景下兩個線程可以并發(fā)操作,不需要任何加鎖行為就可以保證kfifo線程安全。
kfifo代碼既然肩負著這么多特性,那我們先一敝它的代碼:
struct kfifo { unsigned char *buffer; /* the buffer holding the data */ unsigned int size; /* the size of the allocated buffer */ unsigned int in; /* data is added at offset (in % size) */ unsigned int out; /* data is extracted from off. (out % size) */ spinlock_t *lock; /* protects concurrent modifications */ };
這是kfifo的數(shù)據(jù)結(jié)構(gòu),kfifo主要提供了兩個操作,__kfifo_put(入隊操作)和__kfifo_get(出隊操作)。 它的各個數(shù)據(jù)成員如下:
buffer: 用于存放數(shù)據(jù)的緩存
size: buffer空間的大小,在初化時,將它向上擴展成2的冪(如5,向上擴展 與它最接近的值且是2的n次方的值是2^3,即8)
lock: 如果使用不能保證任何時間最多只有一個讀線程和寫線程,需要使用該lock實施同步。
in, out: 和buffer一起構(gòu)成一個循環(huán)隊列。 in指向buffer中隊頭,而且out指向buffer中的隊尾
它的結(jié)構(gòu)如示圖如下:
+--------------------------------------------------------------+ | |<----------data---------->| | +--------------------------------------------------------------+ ^ ^ ^ | | | out in size
當然,內(nèi)核開發(fā)者使用了一種更好的技術(shù)處理了in, out和buffer的關系,我們將在下面進行詳細分析。
6.1kfifo功能描述 kfifo提供如下對外功能規(guī)格
(1)kfifo_alloc 分配kfifo內(nèi)存和初始化工作
struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock) { unsigned char *buffer; struct kfifo *ret; /* * round up to the next power of 2, since our 'let the indices * wrap' tachnique works only in this case. */ if (size & (size - 1)) { BUG_ON(size > 0x80000000); size = roundup_pow_of_two(size); } buffer = kmalloc(size, gfp_mask); if (!buffer) return ERR_PTR(-ENOMEM); ret = kfifo_init(buffer, size, gfp_mask, lock); if (IS_ERR(ret)) kfree(buffer); return ret; }
這里值得一提的是,kfifo->size的值總是在調(diào)用者傳進來的size參數(shù)的基礎上向2的冪擴展(roundup_pow_of_two,我自己的實現(xiàn)在文章末尾),這是內(nèi)核一貫的做法。這樣的好處不言而喻——對kfifo->size取模運算可以轉(zhuǎn)化為與運算,如下:
kfifo->in % kfifo->size 可以轉(zhuǎn)化為 kfifo->in & (kfifo->size – 1)
在kfifo_alloc函數(shù)中,使用size & (size – 1)來判斷size 是否為2冪,如果條件為真,則表示size不是2的冪,然后調(diào)用roundup_pow_of_two將之向上擴展為2的冪。
這都是常用的技巧,只不過大家沒有將它們結(jié)合起來使用而已,下面要分析的__kfifo_put和__kfifo_get則是將kfifo->size的特點發(fā)揮到了極致。
(2)__kfifo_put和__kfifo_get巧妙的入隊和出隊
__kfifo_put是入隊操作,它先將數(shù)據(jù)放入buffer里面,最后才修改in參數(shù);__kfifo_get是出隊操作,它先將數(shù)據(jù)從buffer中移走,最后才修改out。(確保即使in和out修改失敗,也可以再來一遍)
你會發(fā)現(xiàn)in和out兩者各司其職。下面是__kfifo_put和__kfifo_get的代碼
unsigned int __kfifo_put(struct kfifo *fifo, unsigned char *buffer, unsigned int len) { unsigned int l; len = min(len, fifo->size - fifo->in + fifo->out); /* * Ensure that we sample the fifo->out index -before- we * start putting bytes into the kfifo. */ smp_mb(); /* first put the data starting from fifo->in to buffer end */ l = min(len, fifo->size - (fifo->in & (fifo->size - 1))); memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l); /* then put the rest (if any) at the beginning of the buffer */ memcpy(fifo->buffer, buffer + l, len - l); /* * Ensure that we add the bytes to the kfifo -before- * we update the fifo->in index. */ smp_wmb(); fifo->in += len; return len; }
奇怪嗎?代碼完全是線性結(jié)構(gòu),沒有任何if-else分支來判斷是否有足夠的空間存放數(shù)據(jù)。內(nèi)核在這里的代碼非常簡潔,沒有一行多余的代碼。
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
這個表達式計算當前寫入的空間,換成人可理解的語言就是:
l = kfifo可寫空間和預期寫入空間的最小值
(3)使用min宏來代if-else分支
__kfifo_get也應用了同樣技巧,代碼如下:
unsigned int __kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len) { unsigned int l; len = min(len, fifo->in - fifo->out); /* * Ensure that we sample the fifo->in index -before- we * start removing bytes from the kfifo. */ smp_rmb(); /* first get the data from fifo->out until the end of the buffer */ l = min(len, fifo->size - (fifo->out & (fifo->size - 1))); memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l); /* then get the rest (if any) from the beginning of the buffer */ memcpy(buffer + l, fifo->buffer, len - l); /* * Ensure that we remove the bytes from the kfifo -before- * we update the fifo->out index. */ smp_mb(); fifo->out += len; return len; }
認真讀兩遍吧,我也讀了多次,每次總是有新發(fā)現(xiàn),因為in, out和size的關系太巧妙了,竟然能利用上unsigned int回繞的特性。
原來,kfifo每次入隊或出隊,kfifo->in或kfifo->out只是簡單地kfifo->in/kfifo->out += len,并沒有對kfifo->size 進行取模運算。因此kfifo->in和kfifo->out總是一直增大,直到unsigned in最大值時,又會繞回到0這一起始端。但始終滿足:
kfifo->in - kfifo->out <= kfifo->size 即使kfifo->in回繞到了0的那一端,這個性質(zhì)仍然是保持的。
對于給定的kfifo:
數(shù)據(jù)空間長度為:kfifo->in - kfifo->out 而剩余空間(可寫入空間)長度為:kfifo->size - (kfifo->in - kfifo->out)
盡管kfifo->in和kfofo->out一直超過kfifo->size進行增長,但它對應在kfifo->buffer空間的下標卻是如下:
kfifo->in % kfifo->size (i.e. kfifo->in & (kfifo->size - 1)) kfifo->out % kfifo->size (i.e. kfifo->out & (kfifo->size - 1))
往kfifo里面寫一塊數(shù)據(jù)時,數(shù)據(jù)空間、寫入空間和kfifo->size的關系如果滿足:
kfifo->in % size + len > size
那就要做寫拆分了,見下圖:
kfifo_put(寫)空間開始地址 | \_/ |XXXXXXXXXX XXXXXXXX| +--------------------------------------------------------------+ | |<----------data---------->| | +--------------------------------------------------------------+ ^ ^ ^ | | | out%size in%size size ^ | 寫空間結(jié)束地址
第一塊當然是: [kfifo->in % kfifo->size, kfifo->size] 第二塊當然是:[0, len - (kfifo->size - kfifo->in % kfifo->size)]
下面是代碼,細細體味吧:
/* first put the data starting from fifo->in to buffer end */ l = min(len, fifo->size - (fifo->in & (fifo->size - 1))); memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l); /* then put the rest (if any) at the beginning of the buffer */ memcpy(fifo->buffer, buffer + l, len - l);
對于kfifo_get過程,也是類似的,請各位自行分析。
(4)kfifo_get和kfifo_put無鎖并發(fā)操作
計算機科學家已經(jīng)證明,當只有一個讀經(jīng)程和一個寫線程并發(fā)操作時,不需要任何額外的鎖,就可以確保是線程安全的,也即kfifo使用了無鎖編程技術(shù),以提高kernel的并發(fā)。
kfifo使用in和out兩個指針來描述寫入和讀取游標,對于寫入操作,只更新in指針,而讀取操作,只更新out指針,可謂井水不犯河水,示意圖如下:
|<--寫入-->| +--------------------------------------------------------------+ | |<----------data----->| | +--------------------------------------------------------------+ |<--讀取-->| ^ ^ ^ | | | out in size
為了避免讀者看到寫者預計寫入,但實際沒有寫入數(shù)據(jù)的空間,寫者必須保證以下的寫入順序:
往[kfifo->in, kfifo->in + len]空間寫入數(shù)據(jù) 更新kfifo->in指針為 kfifo->in + len
在操作1完成時,讀者是還沒有看到寫入的信息的,因為kfifo->in沒有變化,認為讀者還沒有開始寫操作,只有更新kfifo->in之后,讀者才能看到。
那么如何保證1必須在2之前完成,秘密就是使用內(nèi)存屏障:smp_mb(),smp_rmb(), smp_wmb(),來保證對方觀察到的內(nèi)存操作順序。
6.2kfifo內(nèi)核隊列實現(xiàn) kfifo數(shù)據(jù)結(jié)構(gòu)定義如下:
struct kfifo { unsigned char *buffer; unsigned int size; unsigned int in; unsigned int out; spinlock_t *lock; }; // 創(chuàng)建隊列 struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size, gfp_t gfp_mask, spinlock_t *lock) { struct kfifo *fifo; // 判斷是否為2的冪 BUG_ON(!is_power_of_2(size)); fifo = kmalloc(sizeof(struct kfifo), gfp_mask); if (!fifo) return ERR_PTR(-ENOMEM); fifo->buffer = buffer; fifo->size = size; fifo->in = fifo->out = 0; fifo->lock = lock; return fifo; } // 分配空間 struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock) { unsigned char *buffer; struct kfifo *ret; // 判斷是否為2的冪 if (!is_power_of_2(size)) { BUG_ON(size > 0x80000000); // 向上擴展成2的冪 size = roundup_pow_of_two(size); } buffer = kmalloc(size, gfp_mask); if (!buffer) return ERR_PTR(-ENOMEM); ret = kfifo_init(buffer, size, gfp_mask, lock); if (IS_ERR(ret)) kfree(buffer); return ret; } void kfifo_free(struct kfifo *fifo) { kfree(fifo->buffer); kfree(fifo); } // 入隊操作 static inline unsigned int kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len) { unsigned long flags; unsigned int ret; spin_lock_irqsave(fifo->lock, flags); ret = __kfifo_put(fifo, buffer, len); spin_unlock_irqrestore(fifo->lock, flags); return ret; } // 出隊操作 static inline unsigned int kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len) { unsigned long flags; unsigned int ret; spin_lock_irqsave(fifo->lock, flags); ret = __kfifo_get(fifo, buffer, len); //當fifo->in == fifo->out時,buufer為空 if (fifo->in == fifo->out) fifo->in = fifo->out = 0; spin_unlock_irqrestore(fifo->lock, flags); return ret; } // 入隊操作 unsigned int __kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len) { unsigned int l; //buffer中空的長度 len = min(len, fifo->size - fifo->in + fifo->out); // 內(nèi)存屏障:smp_mb(),smp_rmb(), smp_wmb()來保證對方觀察到的內(nèi)存操作順序 smp_mb(); // 將數(shù)據(jù)追加到隊列尾部 l = min(len, fifo->size - (fifo->in & (fifo->size - 1))); memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l); memcpy(fifo->buffer, buffer + l, len - l); smp_wmb(); //每次累加,到達最大值后溢出,自動轉(zhuǎn)為0 fifo->in += len; return len; } // 出隊操作 unsigned int __kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len) { unsigned int l; //有數(shù)據(jù)的緩沖區(qū)的長度 len = min(len, fifo->in - fifo->out); smp_rmb(); l = min(len, fifo->size - (fifo->out & (fifo->size - 1))); memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l); memcpy(buffer + l, fifo->buffer, len - l); smp_mb(); fifo->out += len; //每次累加,到達最大值后溢出,自動轉(zhuǎn)為0 return len; } static inline void __kfifo_reset(struct kfifo *fifo) { fifo->in = fifo->out = 0; } static inline void kfifo_reset(struct kfifo *fifo) { unsigned long flags; spin_lock_irqsave(fifo->lock, flags); __kfifo_reset(fifo); spin_unlock_irqrestore(fifo->lock, flags); } static inline unsigned int __kfifo_len(struct kfifo *fifo) { return fifo->in - fifo->out; } static inline unsigned int kfifo_len(struct kfifo *fifo) { unsigned long flags; unsigned int ret; spin_lock_irqsave(fifo->lock, flags); ret = __kfifo_len(fifo); spin_unlock_irqrestore(fifo->lock, flags); return ret; }
6.3kfifo設計要點 (1)保證buffer size為2的冪
kfifo->size值在調(diào)用者傳遞參數(shù)size的基礎上向2的冪擴展,目的是使kfifo->size取模運算可以轉(zhuǎn)化為位與運算(提高運行效率)。kfifo->in % kfifo->size轉(zhuǎn)化為 kfifo->in & (kfifo->size – 1)
保證size是2的冪可以通過位運算的方式求余,在頻繁操作隊列的情況下可以大大提高效率。
(2)使用spin_lock_irqsave與spin_unlock_irqrestore 實現(xiàn)同步。
Linux內(nèi)核中有spin_lock、spin_lock_irq和spin_lock_irqsave保證同步。
static inline void __raw_spin_lock(raw_spinlock_t *lock) { preempt_disable(); spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock); } static inline void __raw_spin_lock_irq(raw_spinlock_t *lock) { local_irq_disable(); preempt_disable(); spin_acquire(&lock->dep_map, 0, 0, _RET_IP_); LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock); }
spin_lock比spin_lock_irq速度快,但并不是線程安全的。spin_lock_irq增加調(diào)用local_irq_disable函數(shù),即禁止本地中斷,是線程安全的,既禁止本地中斷,又禁止內(nèi)核搶占。
spin_lock_irqsave是基于spin_lock_irq實現(xiàn)的一個輔助接口,在進入和離開臨界區(qū)后,不會改變中斷的開啟、關閉狀態(tài)。
如果自旋鎖在中斷處理函數(shù)中被用到,在獲取自旋鎖前需要關閉本地中斷,spin_lock_irqsave實現(xiàn)如下:
A、保存本地中斷狀態(tài);
B、關閉本地中斷;
C、獲取自旋鎖。
解鎖時通過 spin_unlock_irqrestore完成釋放鎖、恢復本地中斷到原來狀態(tài)等工作。
(3)線性代碼結(jié)構(gòu)
代碼中沒有任何if-else分支來判斷是否有足夠的空間存放數(shù)據(jù),kfifo每次入隊或出隊只是簡單的 +len 判斷剩余空間,并沒有對kfifo->size 進行取模運算,所以kfifo->in和kfifo->out總是一直增大,直到unsigned in超過最大值時繞回到0這一起始端,但始終滿足:kfifo->in - kfifo->out <= kfifo->size。
(4)使用Memory Barrier
mb():適用于多處理器和單處理器的內(nèi)存屏障。
rmb():適用于多處理器和單處理器的讀內(nèi)存屏障。
wmb():適用于多處理器和單處理器的寫內(nèi)存屏障。
smp_mb():適用于多處理器的內(nèi)存屏障。
smp_rmb():適用于多處理器的讀內(nèi)存屏障。
smp_wmb():適用于多處理器的寫內(nèi)存屏障。
Memory Barrier使用場景如下:
程序在運行時內(nèi)存實際訪問順序和程序代碼編寫的訪問順序不一定一致,即內(nèi)存亂序訪問。內(nèi)存亂序訪問行為出現(xiàn)是為了提升程序運行時的性能。內(nèi)存亂序訪問主要發(fā)生在兩個階段:
Memory Barrier能夠讓CPU或編譯器在內(nèi)存訪問上有序。Memory barrier前的內(nèi)存訪問操作必定先于其后的完成。Memory Barrier包括兩類:
A、編譯器Memory Barrier。
B、CPU Memory Barrier。
通常,編譯器和CPU引起內(nèi)存亂序訪問不會帶來問題,但如果程序邏輯的正確性依賴于內(nèi)存訪問順序,內(nèi)存亂序訪問會帶來邏輯上的錯誤。
在編譯時,編譯器對代碼做出優(yōu)化時可能改變實際執(zhí)行指令的順序(如GCC的O2或O3都會改變實際執(zhí)行指令的順序)。
在運行時,CPU雖然會亂序執(zhí)行指令,但在單個CPU上,硬件能夠保證程序執(zhí)行時所有的內(nèi)存訪問操作都是按程序代碼編寫的順序執(zhí)行的,Memory Barrier沒有必要使用(不考慮編譯器優(yōu)化)。為了更快執(zhí)行指令,CPU采取流水線的執(zhí)行方式,編譯器在編譯代碼時為了使指令更適合CPU的流水線執(zhí)行方式以及多CPU執(zhí)行,原本指令就會出現(xiàn)亂序的情況。在亂序執(zhí)行時,CPU真正執(zhí)行指令的順序由可用的輸入數(shù)據(jù)決定,而非程序員編寫的順序。
七、總結(jié) C++ 無鎖隊列在多線程編程中占據(jù)著舉足輕重的地位。它不僅是解決多核心優(yōu)化問題的關鍵技術(shù)之一,也是邁向高效多線程編程的重要基石。
在多線程編程的實際應用中,無鎖隊列的性能優(yōu)勢使其在各種場景下都能發(fā)揮出色的作用。無論是游戲開發(fā)、高性能計算、并發(fā)編程還是高并發(fā)網(wǎng)絡編程,無鎖隊列都能為程序提供高效的數(shù)據(jù)處理和同步機制。
從單生產(chǎn)者單消費者隊列到多生產(chǎn)者多消費者隊列,不同的場景需求都能找到合適的無鎖隊列實現(xiàn)方式。鏈式隊列、數(shù)組隊列和環(huán)形隊列等多種形式的無鎖隊列,為開發(fā)者提供了豐富的選擇,以適應不同的數(shù)據(jù)量和應用場景。
無鎖隊列的高效性、線程安全性、可擴展性和低延遲等特點,使其成為處理高并發(fā)任務的理想選擇。它避免了鎖競爭和死鎖問題,減少了上下文切換和線程調(diào)度的開銷,提高了緩存的命中率,從而極大地提升了程序的性能。
隨著多核心處理器的普及和高并發(fā)應用的不斷增加,C++ 無鎖隊列的應用前景將更加廣闊。開發(fā)者可以繼續(xù)深入研究和優(yōu)化無鎖隊列的實現(xiàn),以滿足不斷增長的性能需求和復雜的應用場景。相信在未來的多線程編程領域,C++ 無鎖隊列將繼續(xù)發(fā)揮重要作用,為構(gòu)建高效、可靠的多線程應用程序提供有力支持。