关于无锁队列的原理,请参考陈皓在其博客(coolshell)上对无锁队列实现所作的理论说明。
本文分享基于环形数组实现的无锁队列的 C++ 代码。
单生产者-单消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 |
#ifndef WATER_BASE_LOCK_FREE_CIRCULAR_QUEUE_SS_HPP #define WATER_BASE_LOCK_FREE_CIRCULAR_QUEUE_SS_HPP #include "class_helper.h" #include <vector> #include <atomic> namespace water{ namespace componet{ template <typename T> class LockFreeCircularQueueSPSC final //不可作为基类 { struct Cell { enum class Status : uint_fast32_t { empty, full, }; Cell() : status(Status::empty), t() { } std::atomic<Status> status; T t; }; public: TYPEDEF_PTR(LockFreeCircularQueueSPSC); explicit LockFreeCircularQueueSPSC(uint32_t powArg = 10) //队列长度为 2^powArg : m_begin(0), m_end(0), m_maxSize(1u << (powArg < 24 ? powArg : 24)), m_data(m_maxSize) { } ~LockFreeCircularQueueSPSC() = default; //noncopyable LockFreeCircularQueueSPSC(const LockFreeCircularQueueSPSC&) = delete; LockFreeCircularQueueSPSC& operator=(const LockFreeCircularQueueSPSC&) = delete; bool isLockFree() const { return m_data[0].status.is_lock_free(); } bool push(const T& item) { const uint_fast32_t pos = realPos(m_end); int i = 0; while(true) { if(++i == 3) //最多尝试3次数依然队列满,返回失败,无法放入 return false; //尾部为空,即队列非满,可以push if(m_data[pos].status.load(std::memory_order_acquire) == Cell::Status::empty) break; }; //队尾后移 m_end++; //向节点填入数据 m_data[pos].t = item; //标记为已放入并将提交内存修改,令所有线程可见 m_data[pos].status.store(Cell::Status::full, std::memory_order_release); return true; } bool pop(T* t) { const uint_fast32_t pos = realPos(m_begin); int i = 0; while(true) { if(i++ == 3) //最多尝试3次数依然队列空,返回失败,无法取出 return false; //pos(头部)位置非空,即队列非空,可以pop if(m_data[pos].status.load(std::memory_order_acquire) == Cell::Status::full) break; } //队首后移 m_begin++; //取出节点内的数据 *t = m_data[pos].t; m_data[pos].t = T(); //标记为已取出并将提交内存修改,令所有线程可见 m_data[pos].status.store(Cell::Status::empty, std::memory_order_release); return true; } bool empty() const { return m_data[realPos(m_begin)].status.load(std::memory_order_relaxed) == Cell::Status::empty; } bool full() const { return m_data[realPos(m_end)].status.load(std::memory_order_relaxed) == Cell::Status::full; } uint64_t 
maxSize() const { return static_cast<uint64_t>(m_data.size()); } private: uint64_t realPos(uint64_t pos) const { return pos & (maxSize() - 1); } private: uint_fast32_t m_begin; uint_fast32_t m_end; const uint_fast32_t m_maxSize; std::vector<Cell> m_data; }; }} #endif //#ifndef WATER_BASE_CIRCULAR_QUEUE_HPP |
多生产者多消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
#ifndef WATER_BASE_M_LOCK_FREE_CIRCULAR_QUEUE_MM_HPP
#define WATER_BASE_M_LOCK_FREE_CIRCULAR_QUEUE_MM_HPP

#include <atomic>
#include <cstdint>
#include <iostream>
#include <vector>

// NOTE(review): these file-scope using-declarations leak into every includer.
// They are kept only so that code which already relies on them keeps compiling.
using std::cerr;
using std::endl;

namespace water{
namespace componet{

/**
 * Bounded FIFO queue stored in a ring of cells, usable from several producer
 * and several consumer threads.
 *
 * Every cell has an atomic status. A producer claims the tail cell by
 * switching it empty -> writing, then advances m_end with a CAS, copies the
 * element in and publishes it by storing `full`. A consumer mirrors this
 * with full -> reading, a CAS on m_begin, and a final store of `empty`.
 * If the CAS on the position counter fails, the claimed cell is rolled back
 * to its previous status and the operation is retried.
 *
 * Both operations are bounded: after kMaxAttempts unsuccessful attempts they
 * return false instead of blocking.
 *
 * The capacity is always a power of two, so positions are mapped to cell
 * indices with a bit mask and unsigned wrap-around of the 32-bit counters is
 * harmless.
 *
 * NOTE(review): the element is copied after the position counter has already
 * been advanced, so a throwing copy assignment of T would leave the cell in
 * the writing/reading state; T's copy assignment is assumed not to throw.
 */
template <typename T>
class LockFreeCircularQueueMPMC final // must not be used as a base class
{
    // One slot of the ring: a status flag plus the stored element.
    struct Cell
    {
        enum class Status : uint8_t
        {
            writing,
            reading,
            empty,
            full,
        };

        Cell()
        : status(Status::empty), t()
        {
        }

        std::atomic<Status> status;
        T t;
    };

    // Maximum number of attempts made by one push()/pop() call.
    static constexpr int kMaxAttempts = 5;

public:
    /**
     * @param powArg  The queue holds 2^powArg cells; values above 24 are
     *                clamped to 24.
     */
    explicit LockFreeCircularQueueMPMC(uint64_t powArg = 16)
    : m_begin(0), m_end(0), m_maxSize(1u << (powArg < 24 ? powArg : 24)), m_data(m_maxSize)
    {
    }

    ~LockFreeCircularQueueMPMC() = default;

    // noncopyable
    LockFreeCircularQueueMPMC(const LockFreeCircularQueueMPMC&) = delete;
    LockFreeCircularQueueMPMC& operator=(const LockFreeCircularQueueMPMC&) = delete;

    /// @return whether the per-cell atomic status flag is lock-free.
    bool isLockFree() const
    {
        return m_data[0].status.is_lock_free();
    }

    /**
     * Appends a copy of @p item at the tail.
     *
     * @return true on success; false if no tail cell could be claimed within
     *         kMaxAttempts attempts (queue full or heavy contention).
     */
    bool push(const T& item)
    {
        for(int attempt = 0; attempt < kMaxAttempts; ++attempt)
        {
            uint32_t oldEnd = m_end.load(std::memory_order_acquire); // tail position
            const uint32_t index = realIndex(oldEnd);
            Cell& cell = m_data[index];

            // Cheap read first: skip the read-modify-write when the cell is
            // visibly not empty.
            typename Cell::Status oldStatus = cell.status.load(std::memory_order_acquire);
            if(oldStatus != Cell::Status::empty)
                continue;

            // Claim the cell for writing. The strong form is used so that a
            // spurious failure cannot consume one of the few attempts.
            if(!cell.status.compare_exchange_strong(oldStatus, Cell::Status::writing))
                continue;

            // Advance the tail. Failure means oldEnd was stale.
            if(!m_end.compare_exchange_strong(oldEnd, oldEnd + 1))
            {
                // Roll back with release ordering: a later thread that
                // acquires this `empty` must also see the previous owner's
                // writes to `t`, which a relaxed store would not guarantee.
                cell.status.store(Cell::Status::empty, std::memory_order_release);
                continue;
            }

            // Store the element and publish it to consumers.
            cell.t = item;
            cell.status.store(Cell::Status::full, std::memory_order_release);
            return true;
        }
        return false;
    }

    /**
     * Removes the head element and copies it into @p t.
     *
     * @param t  Destination for the element; a null pointer is rejected.
     * @return true on success; false if @p t is null or no head cell could
     *         be claimed within kMaxAttempts attempts (queue empty or heavy
     *         contention).
     */
    bool pop(T* t)
    {
        if(t == nullptr)
            return false;

        for(int attempt = 0; attempt < kMaxAttempts; ++attempt)
        {
            uint32_t oldBegin = m_begin.load(std::memory_order_acquire); // head position
            const uint32_t index = realIndex(oldBegin);
            Cell& cell = m_data[index];

            // Cheap read first: skip the read-modify-write when the cell is
            // visibly not full.
            typename Cell::Status oldStatus = cell.status.load(std::memory_order_acquire);
            if(oldStatus != Cell::Status::full)
                continue;

            // Claim the cell for reading.
            if(!cell.status.compare_exchange_strong(oldStatus, Cell::Status::reading))
                continue;

            // Advance the head. Failure means oldBegin was stale.
            if(!m_begin.compare_exchange_strong(oldBegin, oldBegin + 1))
            {
                // Roll back with release ordering so that the consumer which
                // later acquires this `full` also sees the producer's write
                // to `t`.
                cell.status.store(Cell::Status::full, std::memory_order_release);
                continue;
            }

            // Copy the element out, reset the slot and hand the cell back
            // to producers.
            *t = cell.t;
            cell.t = T();
            cell.status.store(Cell::Status::empty, std::memory_order_release);
            return true;
        }
        return false;
    }

    /// @return true if the cell at the head position is currently empty.
    bool empty() const
    {
        return m_data[realIndex(m_begin.load(std::memory_order_relaxed))].status.load(std::memory_order_relaxed) == Cell::Status::empty;
    }

    /// @return true if the cell at the tail position is currently full.
    bool full() const
    {
        return m_data[realIndex(m_end.load(std::memory_order_relaxed))].status.load(std::memory_order_relaxed) == Cell::Status::full;
    }

    /// @return the number of cells in the ring.
    inline uint32_t maxSize() const
    {
        return m_maxSize;
    }

private:
    // Maps a monotonically increasing position to a cell index; valid
    // because the ring size is a power of two.
    inline uint32_t realIndex(uint64_t index) const
    {
        return index & (maxSize() - 1);
    }

private:
    std::atomic<uint32_t> m_begin;  // head position, advanced by consumers
    std::atomic<uint32_t> m_end;    // tail position, advanced by producers
    const uint_fast32_t m_maxSize;  // number of cells in the ring
    std::vector<Cell> m_data;
};

}}

#endif //#ifndef WATER_BASE_M_LOCK_FREE_CIRCULAR_QUEUE_MM_HPP
嗨,这是一条评论。
要开始审核、编辑及删除评论,请访问仪表盘的“评论”页面。
评论者头像来自Gravatar。
[…] 原文链接: http://chenshungen.cn/lock_free_queue/ […]
m_begin和m_end一直加会发生溢出吧,应该处理下
你这个不是无锁队列,是用原子操作实现的互斥锁
看错了,是无锁