由于在工作中需要用到无锁队列,故在此对其做一下总结。
rabbitMQ、activeMQ、zeroMQ、Kafka、Redis 比较
一、无锁队列用在什么样的场景?
当需要处理的数据非常多,比如行情数据,一秒处理非常多的数据的时候,可以考虑用无锁队列。但是如果一秒只需要处理几百或者几千的数据,是没有必要考虑用无锁队列的。用互斥锁就能解决问题,数据量相对少的时候互斥锁与无锁队列之间差别并不是很明显。
二、为什么要用无锁队列?有锁队列会有哪些问题?
Cache
的损坏,在线程间频繁切换的时候会导致Cache
中数据的丢失;CPU
的运行速度比主存快N
倍,所以大量的处理器时间被浪费在处理器与主存的数据传输上,这就是在处理器与主存之间引入Cache
的原因。Cache
是一种速度更快但容量更小的内存,当处理器要访问主存中的数据时,这些数据首先要被拷贝到Cache
中,因为这些数据在不久的将来可能又会被处理器访问。Cache misses
对性能有非常大的影响,因为处理器访问Cache
中的数据将比直接访问主存快得多。 线程被频繁抢占产生的 Cache 损坏将导致应用程序性能下降。- 在同步机制上争抢队列;
CPU
会将大量的时间浪费在保护队列数据的互斥锁,而不是处理队列中的数据。 然后非阻塞的机制使用了CAS
的特殊操作,使得任务之间可以不争抢任何资源,然后在队列中预定的位置上,插入或者提取数据。 - 多线程动态内存分配性能下降; 多线程同时分配内存时,会涉及到线程分配同一块相同地址内存的问题,这个时候会用锁来进行同步。显然频繁分配内存会导致应用程序性能下降。
三、无锁队列的实现
3.1 简单的无锁队列的实现
通过CAS操作,_head.compare_exchange_strong(node->_next, node)
比较_head
包含的内容与第一个入参node->_next
是否相同:如果相同,则将_head
包含的内容替换为node
;否则,将node->_next
替换为_head
包含的内容。需要注意的是,比较方式是通过内存进行比较
的:The comparison and copying are bitwise (similar to std::memcmp and std::memcpy); no constructor, assignment operator, or comparison operator are used.
template <typename T>
struct LockFreeStackT
{
struct Node
{
T _val;
Node* _next;
};
LockFreeStackT() : _head(nullptr) {}
void push(const T& val) {
Node* node = new Node{val, _head.load()};
while (!_head.compare_exchange_strong(node->_next, node)) {
}
}
void pop() {
Node* node = _head.load();
while (node && !_head.compare_exchange_strong(node, node->_next)) {
}
if (node) delete node;
}
private:
std::atomic<Node*> _head;
};
上述代码的逻辑在于
- 如果新元素的next和栈顶一样,证明在你之前没人操作它,使用新元素替换栈顶退出即可;
- 如果不一样,证明在你之前已经有人操作它,栈顶已发生改变,该函数会自动更新新元素的next值为改变后的栈顶;然后继续循环检测直到状态1成立退出;
GCC编译的时候记得加上
-latomic
这个库,(汗。。。)。
3.2 ABA问题
如下图所示,加入栈上有两个元素A和B,其地址分布为0x114以及0x514。
当线程1执行Pop
操作的时候, 其刚获取到node,指向A,node->next指向B。之后Cpu切换到线程2,Pop
出A和B,并Push
入C
,因为操作系统的内存分配机制会重复使用之前释放的内存,恰好push进去的内存地址和A一样,因此导致在步骤5的时候,会将栈顶指向已经被释放的B的地址。
3.3 ABA问题的解决
ABA问题有两种思路:
- 使用环形缓冲区,其实就是预先分配了内存数组,避免内存重复分配。例如DPDK的rte_ring就是这种实现方式。
- 为地址增加引用计数,使用双倍大小的头指针,在原指针后面增加一个计数器,每次操作将计数器加1。这样即使出现ABA问题,由于计数器对不上,CAS也就不会通过了。
下面代码简单的实现了第二种方式。
template <typename T> class LockFreeStackT { struct Node { T _val; Node* _next; Node(const T& val, Node* next) : _val(val) , _next(next) {} }; struct TaggedPointer { Node* ptr; size_t tag; //size_t tag1; TaggedPointer() {} TaggedPointer(Node* _ptr, size_t _tag) : ptr(_ptr) , tag(_tag) {} }; public: void push(const T& val) { TaggedPointer o = _head.load(); TaggedPointer n(new Node(val, o.ptr), o.tag + 1); while (!_head.compare_exchange_strong(o, n)) { n.ptr->_next = o.ptr; n.tag = o.tag + 1; } } void pop() { TaggedPointer o = _head.load(), n; do { if (!o.ptr) return; n.ptr = o.ptr->_next; n.tag = o.tag + 1; } while(!_head.compare_exchange_strong(o, n)); } private: std::atomic<TaggedPointer> _head = {TaggedPointer(nullptr, 0)}; };