日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

探索C++無鎖隊列:多線程編程的高效利器

 深度Linux 2024-11-06 發(fā)布于湖南

在 C++ 的多線程編程世界中,有一個神奇的存在 —— 無鎖隊列。它宛如一座堅固的橋梁,橫跨在多線程協(xié)作的鴻溝之上,成為提升程序性能和穩(wěn)定性的關鍵角色。

隨著計算機硬件的不斷發(fā)展,多核處理器已經(jīng)成為主流。多線程編程由此變得愈發(fā)重要,然而,傳統(tǒng)基于鎖的同步機制卻逐漸暴露出諸多問題。在這一背景下,無鎖隊列應運而生,它宛如黑暗中的燈塔,為多線程程序的高效運行照亮了前行的道路。

想象一下,在一個繁忙的交通樞紐,傳統(tǒng)的鎖機制就像是交通管制中的紅綠燈,雖然能維持秩序,但頻繁的等待和切換也會造成擁堵。而無鎖隊列則更像是智能交通系統(tǒng),讓數(shù)據(jù)在多線程之間流暢地穿梭,無需不必要的停頓。它為我們打開了一扇通往更高效多線程編程的大門,讓我們一同走進 C++ 無鎖隊列的奇妙世界吧。

一、無鎖隊列簡介

無鎖隊列(Lock-Free Queue)是一種并發(fā)數(shù)據(jù)結(jié)構(gòu),用于在多線程環(huán)境下實現(xiàn)高效的數(shù)據(jù)交換。與傳統(tǒng)的基于鎖的隊列相比,無鎖隊列使用了一些特殊的算法和技術(shù),避免了線程之間的互斥操作,從而提高了并發(fā)性能和響應性。

無鎖隊列通?;谠硬僮鳎╝tomic operations)或其他底層同步原語來實現(xiàn),并且它們采用一些巧妙的方法來確保操作的正確性。主要思想是通過使用原子讀寫操作或類似的機制,在沒有顯式鎖定整個隊列的情況下實現(xiàn)線程安全。

典型的無鎖隊列算法有循環(huán)緩沖區(qū)(Circular Buffer)和鏈表(Linked List)等。循環(huán)緩沖區(qū)通常使用兩個指針(head 和 tail)來表示隊列的開始和結(jié)束位置,利用自旋、CAS (Compare-and-Swap) 等原子操作來進行入隊和出隊操作。鏈表則通過利用 CAS 操作插入或刪除節(jié)點來實現(xiàn)并發(fā)訪問。

在當今多核心優(yōu)化的大背景下,無鎖隊列在多線程編程中起著至關重要的作用。多核心優(yōu)化是當前游戲開發(fā)以及許多領域的重點課題,無論是工程實踐還是算法研究,將工作并行化交由多線程去處理是極為普遍的場景。

在這種情況下,線程池與命令隊列的組合常常被采用,而其中的命令隊列就可以選擇使用互斥鎖或者無鎖隊列。由于命令隊列的讀寫通常是較輕量級的操作,采用無鎖隊列能夠獲得比有鎖操作更高的性能。無鎖隊列通過使用原子操作來確保線程安全,避免了鎖的開銷,包括上下文切換、線程調(diào)度延遲以及潛在的死鎖問題。

在多處理器系統(tǒng)中,無鎖隊列可以更好地擴展。隨著處理器數(shù)量的增加,使用鎖的隊列可能會遇到瓶頸,因為多個線程競爭同一個鎖。而無鎖隊列通過減少這種競爭,可以提供更好的并行性。在實時系統(tǒng)中,無鎖隊列可以提供更高效的響應時間,確保系統(tǒng)能夠及時處理各種任務。

優(yōu)點:

  • 提供了更好的并發(fā)性能,避免了互斥操作帶來的性能瓶頸。

  • 對于高度競爭情況下可以提供更好的可伸縮性。

缺點:

  • 實現(xiàn)相對復雜,需要考慮并發(fā)安全和正確性問題。

  • 在高度競爭的情況下可能出現(xiàn)自旋等待導致的性能損失。

二、無鎖隊列工作原理

無鎖隊列的原理是通過使用原子操作(atomic operations)或其他底層同步原語來實現(xiàn)并發(fā)安全。它們避免了傳統(tǒng)鎖機制中的互斥操作,以提高并發(fā)性能和響應性。典型的無鎖隊列算法有循環(huán)緩沖區(qū)(Circular Buffer)和鏈表(Linked List)等。

在循環(huán)緩沖區(qū)的實現(xiàn)中,通常使用兩個指針來表示隊列的開始位置和結(jié)束位置,即頭指針(head)和尾指針(tail)。入隊時,通過自旋、CAS (Compare-and-Swap) 等原子操作更新尾指針,并將元素放入相應位置。出隊時,同樣利用原子操作更新頭指針,并返回對應位置上的元素。

鏈表實現(xiàn)無鎖隊列時,在插入或刪除節(jié)點時使用 CAS 操作來確保只有一個線程成功修改節(jié)點的指針值。這樣可以避免對整個鏈表進行加鎖操作。

無論是循環(huán)緩沖區(qū)還是鏈表實現(xiàn),關鍵點在于如何利用原子操作確保不同線程之間的協(xié)調(diào)與一致性。需要仔細處理并發(fā)情況下可能出現(xiàn)的競爭條件,并設計合適的算法來保證正確性和性能。

2.1隊列操作模型

隊列是一種非常重要的數(shù)據(jù)結(jié)構(gòu),其特性是先進先出(FIFO),符合流水線業(yè)務流程。在進程間通信、網(wǎng)絡通信間經(jīng)常采用隊列做緩存,緩解數(shù)據(jù)處理壓力。根據(jù)操作隊列的場景分為:單生產(chǎn)者——單消費者、多生產(chǎn)者——單消費者、單生產(chǎn)者——多消費者、多生產(chǎn)者——多消費者四大模型。根據(jù)隊列中數(shù)據(jù)分為:隊列中的數(shù)據(jù)是定長的、隊列中的數(shù)據(jù)是變長的。

(1)單生產(chǎn)者——單消費者

(2)多生產(chǎn)者——單消費者

(3)單生產(chǎn)者——多消費者

(4)多生產(chǎn)者——多消費者

2.2CAS操作

CAS即Compare and Swap,是所有CPU指令都支持CAS的原子操作(X86中CMPXCHG匯編指令),用于實現(xiàn)實現(xiàn)各種無鎖(lock free)數(shù)據(jù)結(jié)構(gòu)。

CAS操作的C語言實現(xiàn)如下:

bool compare_and_swap ( int *memory_location, int expected_value, int new_value)
{
if (*memory_location == expected_value)
{
*memory_location = new_value;
return true;
}
return false;
}

CAS用于檢查一個內(nèi)存位置是否包含預期值,如果包含,則把新值復賦值到內(nèi)存位置。成功返回true,失敗返回false。

(1)GGC對CAS支持,GCC4.1+版本中支持CAS原子操作。

bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...);
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...);

(2)Windows對CAS支持,Windows中使用Windows API支持CAS。

LONG InterlockedCompareExchange(
LONG volatile *Destination,
LONG ExChange,
LONG Comperand
);

(3)C11對CAS支持,C11 STL中atomic函數(shù)支持CAS并可以跨平臺。

template< class T >
bool atomic_compare_exchange_weak( std::atomic* obj,T* expected, T desired );
template< class T >
bool atomic_compare_exchange_weak( volatile std::atomic* obj,T* expected, T desired );

其它原子操作如下:

  • Fetch-And-Add:一般用來對變量做+1的原子操作

  • Test-and-set:寫值到某個內(nèi)存位置并傳回其舊值

2.3隊列數(shù)據(jù)定長與變長

(1)隊列數(shù)據(jù)定長

(2)隊列數(shù)據(jù)變長

⑴問題描述

在多線程環(huán)境下,原始隊列會出現(xiàn)各種不可預料的問題。以兩個線程同時寫入為例,假設線程 A 和線程 B 同時對原始隊列進行寫入操作。首先看原始隊列的入隊偽代碼:void Enqueue(Node *node){m_Tail->next = node;m_Tail = node;},這個操作分為兩步。當兩個線程同時執(zhí)行時,可能出現(xiàn)這樣的情況:線程 A 執(zhí)行完第一步m_Tail->next = nodeC后,線程 B 開始執(zhí)行并完成了整個入隊操作,接著線程 A 繼續(xù)執(zhí)行第二步m_Tail = nodeB,這就導致了 Tail 指針失去與隊列的鏈接,后加的節(jié)點從 Head 開始就訪問不到了。這種情況會使得隊列的狀態(tài)變得混亂,無法保證數(shù)據(jù)的正確存儲和讀取。

⑵解決方法

為了解決上述問題,可以使用原子操作實現(xiàn)無鎖同步。原子操作是不可分割的操作,CPU 的一個線程在執(zhí)行原子操作時,不會被其他線程中斷或搶占。其中,典型的原子操作有 Load / Store(讀取與保存)、Test and Set(針對 bool 變量,如果為 true 則返回 true,如果為 false,則將變量置為 true 并返回 false)、Clear(將 bool 變量設為 false)、Exchange(將指定位置的值設置為傳入值,并返回其舊值)等。

而 CAS(Compare And Swap)在實現(xiàn)無鎖同步中起著關鍵作用。CAS 操作包含三個參數(shù):一個內(nèi)存地址 V、一個期望值 A 和一個新值 B。當執(zhí)行 CAS 操作時,如果當前內(nèi)存地址 V 中存儲的值等于期望值 A,則將新值 B 寫入該內(nèi)存地址,并返回 true;否則,不做任何修改,并返回 false。在無鎖隊列中,可以利用 CAS 操作來確保對 Head 或 Tail 指針的讀寫操作是原子性的,從而避免多線程同時寫入或讀取時出現(xiàn)的指針混亂問題。例如,在入隊操作中,可以使用 CAS 來確保在更新 Tail 指針時,不會被其他線程干擾。如果當前 Tail 指針指向的節(jié)點的_next指針與期望值不一致,說明有其他線程進行了寫入操作,此時可以重新嘗試 CAS 操作,直到成功為止。這樣就可以實現(xiàn)無鎖隊列的安全寫入和讀取操作。

三、無鎖隊列方案

3.1boost方案

boost提供了三種無鎖方案,分別適用不同使用場景。

  • boost::lockfree::queue是支持多個生產(chǎn)者和多個消費者線程的無鎖隊列。

  • boost::lockfree::stack是支持多個生產(chǎn)者和多個消費者線程的無鎖棧。

  • boost::lockfree::spsc_queue是僅支持單個生產(chǎn)者和單個消費者線程的無鎖隊列,比boost::lockfree::queue性能更好。

Boost無鎖數(shù)據(jù)結(jié)構(gòu)的API通過輕量級原子鎖實現(xiàn)lock-free,不是真正意義的無鎖。

Boost提供的queue可以設置初始容量,添加新元素時如果容量不夠,則總?cè)萘孔詣釉鲩L;但對于無鎖數(shù)據(jù)結(jié)構(gòu),添加新元素時如果容量不夠,總?cè)萘坎粫詣釉鲩L。

3.2并發(fā)隊列

ConcurrentQueue采用了無鎖算法來實現(xiàn)并發(fā)操作。它基于CAS(Compare-and-Swap)原子操作和其他底層同步原語來保證線程安全性。具體來說,它使用自旋鎖和原子指令來確保對隊列的修改是原子的,并且在多個線程之間共享數(shù)據(jù)時提供正確性保證。

ConcurrentQueue是基于C++實現(xiàn)的工業(yè)級無鎖隊列方案。
GitHub:https://github.com/cameron314/concurrentqueue
ReaderWriterQueue是基于C++實現(xiàn)的單生產(chǎn)者單消費者場景的無鎖隊列方案。
GitHub:https://github.com/cameron314/readerwriterqueue

ConcurrentQueue具有以下特點:

  • 線程安全:多個線程可以同時對隊列進行操作而無需額外加鎖。

  • 無阻塞:入隊和出隊操作通常是非阻塞的,并且具有較低的開銷。

  • 先進先出(FIFO)順序:元素按照插入順序排列,在出隊時會返回最早入隊的元素。

使用ConcurrentQueue可以方便地處理多個線程之間共享數(shù)據(jù),并減少由于加鎖引起的性能開銷。但需要注意,雖然ConcurrentQueue提供了高效、線程安全的并發(fā)操作,但在某些特定情況下可能不適合所有應用場景,因此在選擇數(shù)據(jù)結(jié)構(gòu)時需要根據(jù)具體需求進行評估。

3.3Disruptor

Disruptor是一種高性能的并發(fā)編程框架,用于實現(xiàn)無鎖(lock-free)的并發(fā)數(shù)據(jù)結(jié)構(gòu)。它最初由LMAX Exchange開發(fā),并成為了其核心交易引擎的關鍵組件。

Disruptor旨在解決在高度多線程環(huán)境下的數(shù)據(jù)共享和通信問題。它基于環(huán)形緩沖區(qū)(Ring Buffer)和事件驅(qū)動模型,通過優(yōu)化內(nèi)存訪問和線程調(diào)度,提供了非常高效的消息傳遞機制。

Disruptor是英國外匯交易公司LMAX基于JAVA開發(fā)的一個高性能隊列。

GitHub:https://github.com/LMAX-Exchange/disruptor

主要特點如下:

  • 無鎖設計:Disruptor使用CAS(Compare-and-Swap)等無鎖算法來避免使用傳統(tǒng)鎖帶來的競爭和阻塞。

  • 高吞吐量:Disruptor利用環(huán)形緩沖區(qū)和預分配內(nèi)存等技術(shù),在保證正確性前提下追求盡可能高的處理速度。

  • 低延遲:由于無鎖設計和緊湊的內(nèi)存布局,Disruptor能夠?qū)崿F(xiàn)非常低的消息處理延遲。

  • 線程間協(xié)調(diào):Disruptor提供了靈活而強大的事件發(fā)布、消費者等待及觸發(fā)機制,可用于實現(xiàn)復雜的線程間通信模式。

使用Disruptor可以有效地解決生產(chǎn)者-消費者模型中數(shù)據(jù)傳遞過程中的性能瓶頸,特別適用于高并發(fā)、低延遲的應用場景,例如金融交易系統(tǒng)、消息隊列等。然而,由于Disruptor對編程模型和理解要求較高,使用時需要仔細考慮,并根據(jù)具體需求評估是否適合。

四、無鎖隊列的實現(xiàn)方式

4.1環(huán)形緩沖區(qū)

RingBuffer是生產(chǎn)者和消費者模型中常用的數(shù)據(jù)結(jié)構(gòu),生產(chǎn)者將數(shù)據(jù)追加到數(shù)組尾端,當達到數(shù)組的尾部時,生產(chǎn)者繞回到數(shù)組的頭部;消費者從數(shù)組頭端取走數(shù)據(jù),當?shù)竭_數(shù)組的尾部時,消費者繞回到數(shù)組頭部。

如果只有一個生產(chǎn)者和一個消費者,環(huán)形緩沖區(qū)可以無鎖訪問,環(huán)形緩沖區(qū)的寫入index只允許生產(chǎn)者訪問并修改,只要生產(chǎn)者在更新index前將新的值保存到緩沖區(qū)中,則消費者將始終看到一致的數(shù)據(jù)結(jié)構(gòu);讀取index也只允許消費者訪問并修改,消費者只要在取走數(shù)據(jù)后更新讀index,則生產(chǎn)者將始終看到一致的數(shù)據(jù)結(jié)構(gòu)。

  • 空隊列時,front與rear相等;當有元素進隊,則rear后移;有元素出隊,則front后移。

  • 空隊列時,rear等于front;滿隊列時,隊列尾部空一個位置,因此判斷循環(huán)隊列滿時使用(rear-front+maxn)%maxn。

入隊操作:

data[rear] = x;
rear = (rear+1)%maxn;

出隊操作:

x = data[front];
front = (front+1)%maxn;

單生產(chǎn)者單消費者

對于單生產(chǎn)者和單消費者場景,由于read_index和write_index都只會有一個線程寫,因此不需要加鎖也不需要原子操作,直接修改即可,但讀寫數(shù)據(jù)時需要考慮遇到數(shù)組尾部的情況。

線程對write_index和read_index的讀寫操作如下:

  • (1)寫操作。先判斷隊列時否為滿,如果隊列未滿,則先寫數(shù)據(jù),寫完數(shù)據(jù)后再修改write_index。

  • (2)讀操作。先判斷隊列是否為空,如果隊列不為空,則先讀數(shù)據(jù),讀完再修改read_index。

多生產(chǎn)者單消費者:多生產(chǎn)者和單消費者場景中,由于多個生產(chǎn)者都會修改write_index,所以在不加鎖的情況下必須使用原子操作。

4.2RingBuffer實現(xiàn)

RingBuffer.hpp文件:

#pragma once

template <class T>
class RingBuffer
{
public:
RingBuffer(unsigned size): m_size(size), m_front(0), m_rear(0)
{
m_data = new T[size];
}

~RingBuffer()
{
delete [] m_data;
m_data = NULL;
}

inline bool isEmpty() const
{
return m_front == m_rear;
}

inline bool isFull() const
{
return m_front == (m_rear + 1) % m_size;
}

bool push(const T& value)
{
if(isFull())
{
return false;
}
m_data[m_rear] = value;
m_rear = (m_rear + 1) % m_size;
return true;
}

bool push(const T* value)
{
if(isFull())
{
return false;
}
m_data[m_rear] = *value;
m_rear = (m_rear + 1) % m_size;
return true;
}

inline bool pop(T& value)
{
if(isEmpty())
{
return false;
}
value = m_data[m_front];
m_front = (m_front + 1) % m_size;
return true;
}

inline unsigned int front()const
{
return m_front;
}

inline unsigned int rear()const
{
return m_rear;
}

inline unsigned int size()const
{
return m_size;
}
private:
unsigned int m_size;// 隊列長度
int m_front;// 隊列頭部索引
int m_rear;// 隊列尾部索引
T* m_data;// 數(shù)據(jù)緩沖區(qū)
};

RingBufferTest.cpp測試代碼:

#include <stdio.h>
#include <thread>
#include <unistd.h>
#include <sys/time.h>
#include "RingBuffer.hpp"


class Test
{
public:
Test(int id = 0, int value = 0)
{
this->id = id;
this->value = value;
sprintf(data, "id = %d, value = %d\n", this->id, this->value);
}

void display()
{
printf("%s", data);
}
private:
int id;
int value;
char data[128];
};

double getdetlatimeofday(struct timeval *begin, struct timeval *end)
{
return (end->tv_sec + end->tv_usec * 1.0 / 1000000) -
(begin->tv_sec + begin->tv_usec * 1.0 / 1000000);
}

RingBuffer<Test> queue(1 << 12);2u000

#define N (10 * (1 << 20))

void produce()
{
struct timeval begin, end;
gettimeofday(&begin, NULL);
unsigned int i = 0;
while(i < N)
{
if(queue.push(Test(i % 1024, i)))
{
i++;
}
}

gettimeofday(&end, NULL);
double tm = getdetlatimeofday(&begin, &end);
printf("producer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);
}

void consume()
{
sleep(1);
Test test;
struct timeval begin, end;
gettimeofday(&begin, NULL);
unsigned int i = 0;
while(i < N)
{
if(queue.pop(test))
{
// test.display();
i++;
}
}
gettimeofday(&end, NULL);
double tm = getdetlatimeofday(&begin, &end);
printf("consumer tid=%lu %f MB/s %f msg/s elapsed= %f, size=%u \n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);
}

int main(int argc, char const *argv[])
{
std::thread producer1(produce);
std::thread consumer(consume);
producer1.join();
consumer.join();
return 0;
}

編譯:

g++ --std=c++11  RingBufferTest.cpp -o test -pthread

單生產(chǎn)者單消費者場景下,消息吞吐量為350萬條/秒左右。

4.3LockFreeQueue實現(xiàn)

LockFreeQueue.hpp:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <fcntl.h>
#include <stdbool.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/time.h>
#include <sys/mman.h>

#define SHM_NAME_LEN 128
#define MIN(a, b) ((a) > (b) ? (b) : (a))
#define IS_POT(x) ((x) && !((x) & ((x)-1)))
#define MEMORY_BARRIER __sync_synchronize()

template <class T>
class LockFreeQueue
{
protected:
typedef struct
{
int m_lock;
inline void spinlock_init()
{
m_lock = 0;
}

inline void spinlock_lock()
{
while(!__sync_bool_compare_and_swap(&m_lock, 0, 1)) {}
}

inline void spinlock_unlock()
{
__sync_lock_release(&m_lock);
}
} spinlock_t;

public:
// size:隊列大小
// name:共享內(nèi)存key的路徑名稱,默認為NULL,使用數(shù)組作為底層緩沖區(qū)。
LockFreeQueue(unsigned int size, const char* name = NULL)
{
memset(shm_name, 0, sizeof(shm_name));
createQueue(name, size);
}

~LockFreeQueue()
{
if(shm_name[0] == 0)
{
delete [] m_buffer;
m_buffer = NULL;
}
else
{
if (munmap(m_buffer, m_size * sizeof(T)) == -1) {
perror("munmap");
}
if (shm_unlink(shm_name) == -1) {
perror("shm_unlink");
}
}
}

bool isFull()const
{
#ifdef USE_POT
return m_head == (m_tail + 1) & (m_size - 1);
#else
return m_head == (m_tail + 1) % m_size;
#endif
}

bool isEmpty()const
{
return m_head == m_tail;
}

unsigned int front()const
{
return m_head;
}

unsigned int tail()const
{
return m_tail;
}

bool push(const T& value)
{
#ifdef USE_LOCK
m_spinLock.spinlock_lock();
#endif
if(isFull())
{
#ifdef USE_LOCK
m_spinLock.spinlock_unlock();
#endif
return false;
}
memcpy(m_buffer + m_tail, &value, sizeof(T));
#ifdef USE_MB
MEMORY_BARRIER;
#endif

#ifdef USE_POT
m_tail = (m_tail + 1) & (m_size - 1);
#else
m_tail = (m_tail + 1) % m_size;
#endif

#ifdef USE_LOCK
m_spinLock.spinlock_unlock();
#endif
return true;
}

bool pop(T& value)
{
#ifdef USE_LOCK
m_spinLock.spinlock_lock();
#endif
if (isEmpty())
{
#ifdef USE_LOCK
m_spinLock.spinlock_unlock();
#endif
return false;
}
memcpy(&value, m_buffer + m_head, sizeof(T));
#ifdef USE_MB
MEMORY_BARRIER;
#endif

#ifdef USE_POT
m_head = (m_head + 1) & (m_size - 1);
#else
m_head = (m_head + 1) % m_size;
#endif

#ifdef USE_LOCK
m_spinLock.spinlock_unlock();
#endif
return true;
}

protected:
virtual void createQueue(const char* name, unsigned int size)
{
#ifdef USE_POT
if (!IS_POT(size))
{
size = roundup_pow_of_two(size);
}
#endif
m_size = size;
m_head = m_tail = 0;
if(name == NULL)
{
m_buffer = new T[m_size];
}
else
{
int shm_fd = shm_open(name, O_CREAT | O_RDWR, 0666);
if (shm_fd < 0)
{
perror("shm_open");
}

if (ftruncate(shm_fd, m_size * sizeof(T)) < 0)
{
perror("ftruncate");
close(shm_fd);
}

void *addr = mmap(0, m_size * sizeof(T), PROT_READ | PROT_WRITE, MAP_SHARED, shm_fd, 0);
if (addr == MAP_FAILED)
{
perror("mmap");
close(shm_fd);
}
if (close(shm_fd) == -1)
{
perror("close");
exit(1);
}

m_buffer = static_cast<T*>(addr);
memcpy(shm_name, name, SHM_NAME_LEN - 1);
}
#ifdef USE_LOCK
spinlock_init(m_lock);
#endif
}
inline unsigned int roundup_pow_of_two(size_t size)
{
size |= size >> 1;
size |= size >> 2;
size |= size >> 4;
size |= size >> 8;
size |= size >> 16;
size |= size >> 32;
return size + 1;
}
protected:
char shm_name[SHM_NAME_LEN];
volatile unsigned int m_head;
volatile unsigned int m_tail;
unsigned int m_size;
#ifdef USE_LOCK
spinlock_t m_spinLock;
#endif
T* m_buffer;
};
#define USE_LOCK

開啟spinlock鎖,多生產(chǎn)者多消費者場景

#define USE_MB

開啟Memory Barrier

#define USE_POT

開啟隊列大小的2的冪對齊

LockFreeQueueTest.cpp測試文件:

#include "LockFreeQueue.hpp"
#include <thread>

//#define USE_LOCK

class Test
{
public:
Test(int id = 0, int value = 0)
{
this->id = id;
this->value = value;
sprintf(data, "id = %d, value = %d\n", this->id, this->value);
}

void display()
{
printf("%s", data);
}
private:
int id;
int value;
char data[128];
};

double getdetlatimeofday(struct timeval *begin, struct timeval *end)
{
return (end->tv_sec + end->tv_usec * 1.0 / 1000000) -
(begin->tv_sec + begin->tv_usec * 1.0 / 1000000);
}

LockFreeQueue<Test> queue(1 << 10, "/shm");

#define N ((1 << 20))

void produce()
{
struct timeval begin, end;
gettimeofday(&begin, NULL);
unsigned int i = 0;
while(i < N)
{
if(queue.push(Test(i >> 10, i)))
i++;
}
gettimeofday(&end, NULL);
double tm = getdetlatimeofday(&begin, &end);
printf("producer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);
}

void consume()
{
Test test;
struct timeval begin, end;
gettimeofday(&begin, NULL);
unsigned int i = 0;
while(i < N)
{
if(queue.pop(test))
{
//test.display();
i++;
}
}
gettimeofday(&end, NULL);
double tm = getdetlatimeofday(&begin, &end);
printf("consumer tid=%lu %f MB/s %f msg/s elapsed= %f size= %u\n", pthread_self(), N * sizeof(Test) * 1.0 / (tm * 1024 * 1024), N * 1.0 / tm, tm, i);
}

int main(int argc, char const *argv[])
{
std::thread producer1(produce);
//std::thread producer2(produce);
std::thread consumer(consume);
producer1.join();
//producer2.join();
consumer.join();

return 0;
}

多線程場景下,需要定義USE_LOCK宏,開啟鎖保護。

編譯:

g++ --std=c++11 -O3 LockFreeQueueTest.cpp -o test -lrt -pthread

4.4場景分析(不同場景的實現(xiàn))

單生產(chǎn)者單消費者隊列(SPSC 隊列):在這種隊列中,只有一個生產(chǎn)者線程和一個消費者線程操作該隊列。由于只有一個線程操作隊列,因此不需要考慮線程同步和數(shù)據(jù)競爭的問題,可以實現(xiàn)非常高效的數(shù)據(jù)訪問。

例如,在某些特定的任務處理場景中,一個線程負責生成任務,另一個線程負責處理任務,此時使用 SPSC 隊列可以避免復雜的同步機制,提高處理效率。

多生產(chǎn)者多消費者隊列(MPMC 隊列):在這種隊列中,有多個生產(chǎn)者線程和多個消費者線程操作該隊列。由于存在多個線程同時操作隊列,因此必須考慮線程同步和數(shù)據(jù)競爭的問題,需要使用一些同步機制來保證數(shù)據(jù)的正確性。

常見的實現(xiàn)方式是使用原子操作和 CAS 等技術(shù)。例如,在一個高并發(fā)的服務器程序中,多個客戶端請求可以看作是生產(chǎn)者,服務器的多個處理線程可以看作是消費者,使用 MPMC 隊列可以有效地管理這些請求和處理任務。

單生產(chǎn)者多消費者隊列(SPMC 隊列):在這種隊列中,生產(chǎn)者線程向隊列中寫入數(shù)據(jù),多個消費者線程從隊列中讀取數(shù)據(jù)。這種隊列的實現(xiàn)可以使用原子操作或者互斥鎖來實現(xiàn)線程同步。

比如在一個視頻處理系統(tǒng)中,一個視頻采集線程作為生產(chǎn)者,多個視頻編碼線程作為消費者,使用 SPMC 隊列可以實現(xiàn)高效的數(shù)據(jù)傳輸和處理。

多生產(chǎn)者單消費者隊列(MPSC 隊列):在這種隊列中,多個生產(chǎn)者線程向隊列中寫入數(shù)據(jù),一個消費者線程從隊列中讀取數(shù)據(jù)。這種隊列的實現(xiàn)也可以使用原子操作或者互斥鎖來實現(xiàn)線程同步。

例如在一個日志收集系統(tǒng)中,多個日志生成線程作為生產(chǎn)者,一個日志分析線程作為消費者,使用 MPSC 隊列可以方便地管理日志數(shù)據(jù)。

4.5常見隊列形式

⑴鏈式隊列(Lock-free Linked Queue)

  • 鏈式隊列是一種基于鏈表實現(xiàn)的隊列,每個節(jié)點包含一個數(shù)據(jù)元素和一個指向下一個節(jié)點的指針。

  • 特點:可以動態(tài)地分配和釋放內(nèi)存,適用于數(shù)據(jù)量不確定或者數(shù)據(jù)大小不固定的情況。在多線程環(huán)境下,需要使用無鎖算法來避免鎖的性能損失。

  • 例如,在一個實時數(shù)據(jù)采集系統(tǒng)中,數(shù)據(jù)量可能隨時變化,使用鏈式隊列可以靈活地適應這種變化。

⑵數(shù)組隊列(Lock-free Array Queue)

  • 數(shù)組隊列是一種基于數(shù)組實現(xiàn)的隊列,它可以提高數(shù)據(jù)的讀寫效率,適用于數(shù)據(jù)量比較大且大小固定的情況。

  • 實現(xiàn)比較簡單,可以使用一個指針來記錄隊尾位置,一個指針來記錄隊頭位置。在多線程環(huán)境下,需要使用無鎖算法來避免鎖的性能損失。

  • 比如在一個圖像處理系統(tǒng)中,圖像數(shù)據(jù)的大小相對固定,使用數(shù)組隊列可以提高數(shù)據(jù)處理效率。

⑶環(huán)形隊列

  • 實現(xiàn)環(huán)形隊列的方式可以基于數(shù)組或者基于鏈表。

  • 優(yōu)點:可以有效地利用內(nèi)存空間,避免了數(shù)據(jù)的移動和浪費。在多線程環(huán)境下,也可以使用無鎖算法來實現(xiàn)高效的數(shù)據(jù)訪問。

  • 例如,在一個網(wǎng)絡數(shù)據(jù)包處理系統(tǒng)中,環(huán)形隊列可以快速地存儲和讀取數(shù)據(jù)包,提高系統(tǒng)的性能。

五、無鎖隊列的性能優(yōu)勢

5.1高效性

無鎖隊列之所以能夠避免鎖競爭和開銷,從而提高性能,主要有以下幾個原因。首先,鎖的使用會帶來上下文切換和線程調(diào)度延遲。當一個線程獲取鎖時,如果其他線程也在競爭這個鎖,那么這些線程可能會被阻塞,等待鎖的釋放。而上下文切換和線程調(diào)度需要消耗一定的時間和系統(tǒng)資源,這會降低程序的執(zhí)行效率。無鎖隊列通過使用原子操作和 CAS 等技術(shù),避免了鎖的使用,從而減少了上下文切換和線程調(diào)度的次數(shù),提高了程序的性能。

其次,無鎖隊列可以更好地利用處理器的緩存。在多線程環(huán)境下,鎖的使用可能會導致緩存一致性問題,因為多個線程可能會同時訪問和修改共享數(shù)據(jù)。為了保證數(shù)據(jù)的一致性,處理器需要進行緩存同步操作,這會降低緩存的命中率,增加內(nèi)存訪問的延遲。無鎖隊列通過使用原子操作和 CAS 等技術(shù),可以避免緩存一致性問題,提高緩存的命中率,從而減少內(nèi)存訪問的延遲,提高程序的性能。

據(jù)統(tǒng)計,在某些高并發(fā)的場景下,無鎖隊列的性能可以比有鎖隊列提高數(shù)倍甚至數(shù)十倍。

5.2線程安全

無鎖隊列在多線程環(huán)境下具有很高的安全性。這是因為無鎖隊列通過使用原子操作和 CAS 等技術(shù),確保了對隊列的操作是原子性的。原子操作是不可分割的操作,它要么全部執(zhí)行成功,要么全部執(zhí)行失敗,不會出現(xiàn)部分執(zhí)行成功的情況。CAS 操作可以確保在對隊列進行操作時,不會被其他線程干擾。如果當前內(nèi)存地址中存儲的值與期望值不一致,說明有其他線程進行了寫入操作,此時可以重新嘗試 CAS 操作,直到成功為止。

此外,無鎖隊列還可以避免死鎖問題。在有鎖隊列中,如果多個線程同時獲取鎖,并且在獲取鎖的順序上出現(xiàn)問題,就可能會導致死鎖。而無鎖隊列不需要使用鎖,自然也就避免了死鎖問題。

5.3可擴展性和低延遲

無鎖隊列具有很好的可擴展性,可以在多個處理器上并行運行。在多處理器系統(tǒng)中,無鎖隊列可以更好地利用多個處理器的資源,提高隊列的吞吐量。這是因為無鎖隊列通過使用原子操作和 CAS 等技術(shù),避免了鎖的競爭,從而可以讓多個線程在不同的處理器上同時對隊列進行操作。

無鎖隊列還具有低延遲的特點。在實時系統(tǒng)中,低延遲是非常重要的。無鎖隊列可以實現(xiàn)非阻塞式的數(shù)據(jù)訪問,從而降低隊列的延遲。這是因為無鎖隊列通過使用原子操作和 CAS 等技術(shù),可以在不阻塞其他線程的情況下,對隊列進行操作。如果當前操作無法成功,可以立即返回,而不會等待其他線程釋放鎖。這樣可以減少線程的等待時間,提高系統(tǒng)的響應速度。

例如,在一個高并發(fā)的網(wǎng)絡服務器中,無鎖隊列可以快速地處理大量的網(wǎng)絡請求,提高服務器的響應速度和吞吐量。

六、Kfifo內(nèi)核隊列

計算機科學家已經(jīng)證明,當只有一個讀線程和一個寫線程并發(fā)操作時,不需要任何額外的鎖,就可以確保是線程安全的,也即kfifo使用了無鎖編程技術(shù),以提高kernel的并發(fā)。

Linux kernel里面從來就不缺少簡潔,優(yōu)雅和高效的代碼,只是我們?nèi)鄙侔l(fā)現(xiàn)和品味的眼光。在Linux kernel里面,簡潔并不表示代碼使用神出鬼沒的超然技巧,相反,它使用的不過是大家非常熟悉的基礎數(shù)據(jù)結(jié)構(gòu),但是kernel開發(fā)者能從基礎的數(shù)據(jù)結(jié)構(gòu)中,提煉出優(yōu)美的特性。

kfifo就是這樣的一類優(yōu)美代碼,它十分簡潔,絕無多余的一行代碼,卻非常高效。

關于kfifo信息如下:

本文分析的原代碼版本: 2.6.24.4
kfifo的定義文件: kernel/kfifo.c
kfifo的頭文件: include/linux/kfifo.h

kfifo是Linux內(nèi)核的一個FIFO數(shù)據(jù)結(jié)構(gòu),采用環(huán)形循環(huán)隊列的數(shù)據(jù)結(jié)構(gòu)來實現(xiàn),提供一個無邊界的字節(jié)流服務,并且使用并行無鎖編程技術(shù),即單生產(chǎn)者單消費者場景下兩個線程可以并發(fā)操作,不需要任何加鎖行為就可以保證kfifo線程安全。

kfifo代碼既然肩負著這么多特性,那我們先一敝它的代碼:

struct kfifo {
unsigned char *buffer; /* the buffer holding the data */
unsigned int size; /* the size of the allocated buffer */
unsigned int in; /* data is added at offset (in % size) */
unsigned int out; /* data is extracted from off. (out % size) */
spinlock_t *lock; /* protects concurrent modifications */
};

這是kfifo的數(shù)據(jù)結(jié)構(gòu),kfifo主要提供了兩個操作,__kfifo_put(入隊操作)和__kfifo_get(出隊操作)。 它的各個數(shù)據(jù)成員如下:

  • buffer: 用于存放數(shù)據(jù)的緩存

  • size: buffer空間的大小,在初化時,將它向上擴展成2的冪(如5,向上擴展 與它最接近的值且是2的n次方的值是2^3,即8)

  • lock: 如果使用不能保證任何時間最多只有一個讀線程和寫線程,需要使用該lock實施同步。

  • in, out: 和buffer一起構(gòu)成一個循環(huán)隊列。 in指向buffer中隊頭,而且out指向buffer中的隊尾

它的結(jié)構(gòu)如示圖如下:

+--------------------------------------------------------------+
| |<----------data---------->| |
+--------------------------------------------------------------+
^ ^ ^
| | |
out in size

當然,內(nèi)核開發(fā)者使用了一種更好的技術(shù)處理了in, out和buffer的關系,我們將在下面進行詳細分析。

6.1kfifo功能描述

kfifo提供如下對外功能規(guī)格

  • 只支持一個讀者和一個讀者并發(fā)操作

  • 無阻塞的讀寫操作,如果空間不夠,則返回實際訪問空間

(1)kfifo_alloc 分配kfifo內(nèi)存和初始化工作

struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
unsigned char *buffer;
struct kfifo *ret;

/*
* round up to the next power of 2, since our 'let the indices
* wrap' tachnique works only in this case.
*/
if (size & (size - 1)) {
BUG_ON(size > 0x80000000);
size = roundup_pow_of_two(size);
}

buffer = kmalloc(size, gfp_mask);
if (!buffer)
return ERR_PTR(-ENOMEM);

ret = kfifo_init(buffer, size, gfp_mask, lock);

if (IS_ERR(ret))
kfree(buffer);

return ret;
}

這里值得一提的是,kfifo->size的值總是在調(diào)用者傳進來的size參數(shù)的基礎上向2的冪擴展(roundup_pow_of_two,我自己的實現(xiàn)在文章末尾),這是內(nèi)核一貫的做法。這樣的好處不言而喻——對kfifo->size取模運算可以轉(zhuǎn)化為與運算,如下:

kfifo->in % kfifo->size 可以轉(zhuǎn)化為 kfifo->in & (kfifo->size – 1)

在kfifo_alloc函數(shù)中,使用size & (size – 1)來判斷size 是否為2冪,如果條件為真,則表示size不是2的冪,然后調(diào)用roundup_pow_of_two將之向上擴展為2的冪。

這都是常用的技巧,只不過大家沒有將它們結(jié)合起來使用而已,下面要分析的__kfifo_put和__kfifo_get則是將kfifo->size的特點發(fā)揮到了極致。

(2)__kfifo_put和__kfifo_get巧妙的入隊和出隊

__kfifo_put是入隊操作,它先將數(shù)據(jù)放入buffer里面,最后才修改in參數(shù);__kfifo_get是出隊操作,它先將數(shù)據(jù)從buffer中移走,最后才修改out。(確保即使in和out修改失敗,也可以再來一遍)

你會發(fā)現(xiàn)in和out兩者各司其職。下面是__kfifo_put和__kfifo_get的代碼

unsigned int __kfifo_put(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;

len = min(len, fifo->size - fifo->in + fifo->out);
/*
* Ensure that we sample the fifo->out index -before- we
* start putting bytes into the kfifo.
*/
smp_mb();

/* first put the data starting from fifo->in to buffer end */
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);

/* then put the rest (if any) at the beginning of the buffer */
memcpy(fifo->buffer, buffer + l, len - l);

/*
* Ensure that we add the bytes to the kfifo -before-
* we update the fifo->in index.
*/

smp_wmb();

fifo->in += len;

return len;
}

奇怪嗎?代碼完全是線性結(jié)構(gòu),沒有任何if-else分支來判斷是否有足夠的空間存放數(shù)據(jù)。內(nèi)核在這里的代碼非常簡潔,沒有一行多余的代碼。

l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));

這個表達式計算當前寫入的空間,換成人可理解的語言就是:

l = kfifo可寫空間和預期寫入空間的最小值

(3)使用min宏來代if-else分支

__kfifo_get也應用了同樣技巧,代碼如下:

unsigned int __kfifo_get(struct kfifo *fifo,
unsigned char *buffer, unsigned int len)
{
unsigned int l;

len = min(len, fifo->in - fifo->out);
/*
* Ensure that we sample the fifo->in index -before- we
* start removing bytes from the kfifo.
*/
smp_rmb();

/* first get the data from fifo->out until the end of the buffer */
l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);

/* then get the rest (if any) from the beginning of the buffer */
memcpy(buffer + l, fifo->buffer, len - l);

/*
* Ensure that we remove the bytes from the kfifo -before-
* we update the fifo->out index.
*/

smp_mb();

fifo->out += len;

return len;
}

認真讀兩遍吧,我也讀了多次,每次總是有新發(fā)現(xiàn),因為in, out和size的關系太巧妙了,竟然能利用上unsigned int回繞的特性。

原來,kfifo每次入隊或出隊,kfifo->in或kfifo->out只是簡單地kfifo->in/kfifo->out += len,并沒有對kfifo->size 進行取模運算。因此kfifo->in和kfifo->out總是一直增大,直到unsigned in最大值時,又會繞回到0這一起始端。但始終滿足:

kfifo->in - kfifo->out <= kfifo->size
即使kfifo->in回繞到了0的那一端,這個性質(zhì)仍然是保持的。

對于給定的kfifo:

數(shù)據(jù)空間長度為:kfifo->in - kfifo->out
而剩余空間(可寫入空間)長度為:kfifo->size - (kfifo->in - kfifo->out)

盡管kfifo->in和kfofo->out一直超過kfifo->size進行增長,但它對應在kfifo->buffer空間的下標卻是如下:

kfifo->in % kfifo->size (i.e. kfifo->in & (kfifo->size - 1))
kfifo->out % kfifo->size (i.e. kfifo->out & (kfifo->size - 1))

往kfifo里面寫一塊數(shù)據(jù)時,數(shù)據(jù)空間、寫入空間和kfifo->size的關系如果滿足:

kfifo->in % size + len > size

那就要做寫拆分了,見下圖:

kfifo_put(寫)空間開始地址
|
\_/
|XXXXXXXXXX
XXXXXXXX|
+--------------------------------------------------------------+
| |<----------data---------->| |
+--------------------------------------------------------------+
^ ^ ^
| | |
out%size in%size size
^
|
寫空間結(jié)束地址

第一塊當然是: [kfifo->in % kfifo->size, kfifo->size]
第二塊當然是:[0, len - (kfifo->size - kfifo->in % kfifo->size)]

下面是代碼,細細體味吧:

/* first put the data starting from fifo->in to buffer end */   
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);

/* then put the rest (if any) at the beginning of the buffer */
memcpy(fifo->buffer, buffer + l, len - l);

對于kfifo_get過程,也是類似的,請各位自行分析。

(4)kfifo_get和kfifo_put無鎖并發(fā)操作

計算機科學家已經(jīng)證明,當只有一個讀經(jīng)程和一個寫線程并發(fā)操作時,不需要任何額外的鎖,就可以確保是線程安全的,也即kfifo使用了無鎖編程技術(shù),以提高kernel的并發(fā)。

kfifo使用in和out兩個指針來描述寫入和讀取游標,對于寫入操作,只更新in指針,而讀取操作,只更新out指針,可謂井水不犯河水,示意圖如下:

|<--寫入-->|
+--------------------------------------------------------------+
| |<----------data----->| |
+--------------------------------------------------------------+
|<--讀取-->|
^ ^ ^
| | |
out in size

為了避免讀者看到寫者預計寫入,但實際沒有寫入數(shù)據(jù)的空間,寫者必須保證以下的寫入順序:

往[kfifo->in, kfifo->in + len]空間寫入數(shù)據(jù)
更新kfifo->in指針為 kfifo->in + len

在操作1完成時,讀者是還沒有看到寫入的信息的,因為kfifo->in沒有變化,認為讀者還沒有開始寫操作,只有更新kfifo->in之后,讀者才能看到。

那么如何保證1必須在2之前完成,秘密就是使用內(nèi)存屏障:smp_mb(),smp_rmb(), smp_wmb(),來保證對方觀察到的內(nèi)存操作順序。

6.2kfifo內(nèi)核隊列實現(xiàn)

kfifo數(shù)據(jù)結(jié)構(gòu)定義如下:

struct kfifo
{
unsigned char *buffer;
unsigned int size;
unsigned int in;
unsigned int out;
spinlock_t *lock;
};

// 創(chuàng)建隊列
struct kfifo *kfifo_init(unsigned char *buffer, unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
struct kfifo *fifo;
// 判斷是否為2的冪
BUG_ON(!is_power_of_2(size));
fifo = kmalloc(sizeof(struct kfifo), gfp_mask);
if (!fifo)
return ERR_PTR(-ENOMEM);
fifo->buffer = buffer;
fifo->size = size;
fifo->in = fifo->out = 0;
fifo->lock = lock;

return fifo;
}

// 分配空間
struct kfifo *kfifo_alloc(unsigned int size, gfp_t gfp_mask, spinlock_t *lock)
{
unsigned char *buffer;
struct kfifo *ret;
// 判斷是否為2的冪
if (!is_power_of_2(size))
{
BUG_ON(size > 0x80000000);
// 向上擴展成2的冪
size = roundup_pow_of_two(size);
}

buffer = kmalloc(size, gfp_mask);
if (!buffer)
return ERR_PTR(-ENOMEM);
ret = kfifo_init(buffer, size, gfp_mask, lock);

if (IS_ERR(ret))
kfree(buffer);
return ret;
}

void kfifo_free(struct kfifo *fifo)
{
kfree(fifo->buffer);
kfree(fifo);
}

// 入隊操作
static inline unsigned int kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len)
{
unsigned long flags;
unsigned int ret;
spin_lock_irqsave(fifo->lock, flags);
ret = __kfifo_put(fifo, buffer, len);
spin_unlock_irqrestore(fifo->lock, flags);
return ret;
}

// 出隊操作
static inline unsigned int kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len)
{
unsigned long flags;
unsigned int ret;
spin_lock_irqsave(fifo->lock, flags);
ret = __kfifo_get(fifo, buffer, len);
//當fifo->in == fifo->out時,buufer為空
if (fifo->in == fifo->out)
fifo->in = fifo->out = 0;

spin_unlock_irqrestore(fifo->lock, flags);
return ret;
}

// 入隊操作
unsigned int __kfifo_put(struct kfifo *fifo, const unsigned char *buffer, unsigned int len)
{
unsigned int l;
//buffer中空的長度
len = min(len, fifo->size - fifo->in + fifo->out);
// 內(nèi)存屏障:smp_mb(),smp_rmb(), smp_wmb()來保證對方觀察到的內(nèi)存操作順序
smp_mb();
// 將數(shù)據(jù)追加到隊列尾部
l = min(len, fifo->size - (fifo->in & (fifo->size - 1)));
memcpy(fifo->buffer + (fifo->in & (fifo->size - 1)), buffer, l);
memcpy(fifo->buffer, buffer + l, len - l);

smp_wmb();
//每次累加,到達最大值后溢出,自動轉(zhuǎn)為0
fifo->in += len;
return len;
}

// 出隊操作
unsigned int __kfifo_get(struct kfifo *fifo, unsigned char *buffer, unsigned int len)
{
unsigned int l;
//有數(shù)據(jù)的緩沖區(qū)的長度
len = min(len, fifo->in - fifo->out);
smp_rmb();
l = min(len, fifo->size - (fifo->out & (fifo->size - 1)));
memcpy(buffer, fifo->buffer + (fifo->out & (fifo->size - 1)), l);
memcpy(buffer + l, fifo->buffer, len - l);
smp_mb();
fifo->out += len; //每次累加,到達最大值后溢出,自動轉(zhuǎn)為0

return len;
}

static inline void __kfifo_reset(struct kfifo *fifo)
{
fifo->in = fifo->out = 0;
}

static inline void kfifo_reset(struct kfifo *fifo)
{
unsigned long flags;
spin_lock_irqsave(fifo->lock, flags);
__kfifo_reset(fifo);
spin_unlock_irqrestore(fifo->lock, flags);
}

static inline unsigned int __kfifo_len(struct kfifo *fifo)
{
return fifo->in - fifo->out;
}

static inline unsigned int kfifo_len(struct kfifo *fifo)
{
unsigned long flags;
unsigned int ret;
spin_lock_irqsave(fifo->lock, flags);
ret = __kfifo_len(fifo);
spin_unlock_irqrestore(fifo->lock, flags);
return ret;
}

6.3kfifo設計要點

(1)保證buffer size為2的冪

kfifo->size值在調(diào)用者傳遞參數(shù)size的基礎上向2的冪擴展,目的是使kfifo->size取模運算可以轉(zhuǎn)化為位與運算(提高運行效率)。kfifo->in % kfifo->size轉(zhuǎn)化為 kfifo->in & (kfifo->size – 1)

保證size是2的冪可以通過位運算的方式求余,在頻繁操作隊列的情況下可以大大提高效率。

(2)使用spin_lock_irqsave與spin_unlock_irqrestore 實現(xiàn)同步。

Linux內(nèi)核中有spin_lock、spin_lock_irq和spin_lock_irqsave保證同步。

static inline void __raw_spin_lock(raw_spinlock_t *lock)
{
preempt_disable();
spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}

static inline void __raw_spin_lock_irq(raw_spinlock_t *lock)
{
local_irq_disable();
preempt_disable();
spin_acquire(&lock->dep_map, 0, 0, _RET_IP_);
LOCK_CONTENDED(lock, do_raw_spin_trylock, do_raw_spin_lock);
}

spin_lock比spin_lock_irq速度快,但并不是線程安全的。spin_lock_irq增加調(diào)用local_irq_disable函數(shù),即禁止本地中斷,是線程安全的,既禁止本地中斷,又禁止內(nèi)核搶占。

spin_lock_irqsave是基于spin_lock_irq實現(xiàn)的一個輔助接口,在進入和離開臨界區(qū)后,不會改變中斷的開啟、關閉狀態(tài)。

如果自旋鎖在中斷處理函數(shù)中被用到,在獲取自旋鎖前需要關閉本地中斷,spin_lock_irqsave實現(xiàn)如下:

  • A、保存本地中斷狀態(tài);

  • B、關閉本地中斷;

  • C、獲取自旋鎖。

解鎖時通過 spin_unlock_irqrestore完成釋放鎖、恢復本地中斷到原來狀態(tài)等工作。

(3)線性代碼結(jié)構(gòu)

代碼中沒有任何if-else分支來判斷是否有足夠的空間存放數(shù)據(jù),kfifo每次入隊或出隊只是簡單的 +len 判斷剩余空間,并沒有對kfifo->size 進行取模運算,所以kfifo->in和kfifo->out總是一直增大,直到unsigned in超過最大值時繞回到0這一起始端,但始終滿足:kfifo->in - kfifo->out <= kfifo->size。

(4)使用Memory Barrier

  • mb():適用于多處理器和單處理器的內(nèi)存屏障。

  • rmb():適用于多處理器和單處理器的讀內(nèi)存屏障。

  • wmb():適用于多處理器和單處理器的寫內(nèi)存屏障。

  • smp_mb():適用于多處理器的內(nèi)存屏障。

  • smp_rmb():適用于多處理器的讀內(nèi)存屏障。

  • smp_wmb():適用于多處理器的寫內(nèi)存屏障。

Memory Barrier使用場景如下:

  • A、實現(xiàn)同步原語(synchronization primitives)

  • B、實現(xiàn)無鎖數(shù)據(jù)結(jié)構(gòu)(lock-free data structures)

  • C、驅(qū)動程序

程序在運行時內(nèi)存實際訪問順序和程序代碼編寫的訪問順序不一定一致,即內(nèi)存亂序訪問。內(nèi)存亂序訪問行為出現(xiàn)是為了提升程序運行時的性能。內(nèi)存亂序訪問主要發(fā)生在兩個階段:

  • A、編譯時,編譯器優(yōu)化導致內(nèi)存亂序訪問(指令重排)。

  • B、運行時,多CPU間交互引起內(nèi)存亂序訪問。

Memory Barrier能夠讓CPU或編譯器在內(nèi)存訪問上有序。Memory barrier前的內(nèi)存訪問操作必定先于其后的完成。Memory Barrier包括兩類:

  • A、編譯器Memory Barrier。

  • B、CPU Memory Barrier。

通常,編譯器和CPU引起內(nèi)存亂序訪問不會帶來問題,但如果程序邏輯的正確性依賴于內(nèi)存訪問順序,內(nèi)存亂序訪問會帶來邏輯上的錯誤。

在編譯時,編譯器對代碼做出優(yōu)化時可能改變實際執(zhí)行指令的順序(如GCC的O2或O3都會改變實際執(zhí)行指令的順序)。

在運行時,CPU雖然會亂序執(zhí)行指令,但在單個CPU上,硬件能夠保證程序執(zhí)行時所有的內(nèi)存訪問操作都是按程序代碼編寫的順序執(zhí)行的,Memory Barrier沒有必要使用(不考慮編譯器優(yōu)化)。為了更快執(zhí)行指令,CPU采取流水線的執(zhí)行方式,編譯器在編譯代碼時為了使指令更適合CPU的流水線執(zhí)行方式以及多CPU執(zhí)行,原本指令就會出現(xiàn)亂序的情況。在亂序執(zhí)行時,CPU真正執(zhí)行指令的順序由可用的輸入數(shù)據(jù)決定,而非程序員編寫的順序。

七、總結(jié)

C++ 無鎖隊列在多線程編程中占據(jù)著舉足輕重的地位。它不僅是解決多核心優(yōu)化問題的關鍵技術(shù)之一,也是邁向高效多線程編程的重要基石。

在多線程編程的實際應用中,無鎖隊列的性能優(yōu)勢使其在各種場景下都能發(fā)揮出色的作用。無論是游戲開發(fā)、高性能計算、并發(fā)編程還是高并發(fā)網(wǎng)絡編程,無鎖隊列都能為程序提供高效的數(shù)據(jù)處理和同步機制。

從單生產(chǎn)者單消費者隊列到多生產(chǎn)者多消費者隊列,不同的場景需求都能找到合適的無鎖隊列實現(xiàn)方式。鏈式隊列、數(shù)組隊列和環(huán)形隊列等多種形式的無鎖隊列,為開發(fā)者提供了豐富的選擇,以適應不同的數(shù)據(jù)量和應用場景。

無鎖隊列的高效性、線程安全性、可擴展性和低延遲等特點,使其成為處理高并發(fā)任務的理想選擇。它避免了鎖競爭和死鎖問題,減少了上下文切換和線程調(diào)度的開銷,提高了緩存的命中率,從而極大地提升了程序的性能。

隨著多核心處理器的普及和高并發(fā)應用的不斷增加,C++ 無鎖隊列的應用前景將更加廣闊。開發(fā)者可以繼續(xù)深入研究和優(yōu)化無鎖隊列的實現(xiàn),以滿足不斷增長的性能需求和復雜的應用場景。相信在未來的多線程編程領域,C++ 無鎖隊列將繼續(xù)發(fā)揮重要作用,為構(gòu)建高效、可靠的多線程應用程序提供有力支持。

    轉(zhuǎn)藏 分享 獻花(0

    0條評論

    發(fā)表

    請遵守用戶 評論公約

    類似文章 更多