无锁队列以及内存屏障.

由于在工作中需要用到无锁队列,故在此对其做一下总结。

rabbitMQ、activeMQ、zeroMQ、Kafka、Redis 比较 

乐观锁和悲观锁

无锁队列的几种实现及其性能对比

C++性能优化(十三)——无锁队列

迈向多线程——解析无锁队列的原理与实现

一、无锁队列用在什么样的场景? 

当需要处理的数据非常多,比如行情数据,一秒处理非常多的数据的时候,可以考虑用无锁队列。但是如果一秒只需要处理几百或者几千的数据,是没有必要考虑用无锁队列的。用互斥锁就能解决问题,数据量相对少的时候互斥锁与无锁队列之间差别并不是很明显。

二、为什么要用无锁队列?有锁队列会有哪些问题?

  1. Cache的损坏,在线程间频繁切换的时候会导致 Cache 中数据的丢失; CPU 的运行速度比主存快 N 倍,所以大量的处理器时间被浪费在处理器与主存的数据传输上,这就是在处理器与主存之间引入 Cache 的原因。Cache 是一种速度更快但容量更小的内存,当处理器要访问主存中的数据时,这些数据首先要被拷贝到 Cache 中,因为这些数据在不久的将来可能又会被处理器访问。Cache misses 对性能有非常大的影响,因为处理器访问 Cache 中的数据将比直接访问主存快得多。 线程被频繁抢占产生的 Cache 损坏将导致应用程序性能下降。
  2. 在同步机制上争抢队列; CPU 会将大量的时间浪费在保护队列数据的互斥锁,而不是处理队列中的数据。 然后非阻塞的机制使用了 CAS 的特殊操作,使得任务之间可以不争抢任何资源,然后在队列中预定的位置上,插入或者提取数据。
  3. 多线程动态内存分配性能下降; 多线程同时分配内存时,会涉及到线程分配同一块相同地址内存的问题,这个时候会用锁来进行同步。显然频繁分配内存会导致应用程序性能下降。

三、无锁队列的实现

3.1 简单的无锁队列的实现

通过CAS操作,_head.compare_exchange_strong(node->_next, node)比较_head包含的内容与第一个入参node->_next是否相同:如果相同,则将_head包含的内容替换为node;否则,将node->_next替换为_head包含的内容。需要注意的是,比较方式是通过内存进行比较的:The comparison and copying are bitwise (similar to std::memcmp and std::memcpy); no constructor, assignment operator, or comparison operator are used.

template <typename T>
struct LockFreeStackT
{
    struct Node
    {
        T _val;
        Node* _next;
    };
    LockFreeStackT() : _head(nullptr) {}
    void push(const T& val) {
        Node* node = new Node{val, _head.load()};
        while (!_head.compare_exchange_strong(node->_next, node)) {
        }
    }
    void pop() {
        Node* node = _head.load();
        while (node && !_head.compare_exchange_strong(node, node->_next)) {
        }
        if (node) delete node;
    }
   
private:
    std::atomic<Node*> _head;
};

上述代码的逻辑在于

  1. 如果新元素的next和栈顶一样,证明在你之前没人操作它,使用新元素替换栈顶退出即可;
  2. 如果不一样,证明在你之前已经有人操作它,栈顶已发生改变,该函数会自动更新新元素的next值为改变后的栈顶;然后继续循环检测直到状态1成立退出; GCC编译的时候记得加上-latomic这个库,(汗。。。)。

3.2 ABA问题

如下图所示,加入栈上有两个元素A和B,其地址分布为0x114以及0x514。 当线程1执行Pop操作的时候, 其刚获取到node,指向A,node->next指向B。之后Cpu切换到线程2,Pop出A和B,并PushC,因为操作系统的内存分配机制会重复使用之前释放的内存,恰好push进去的内存地址和A一样,因此导致在步骤5的时候,会将栈顶指向已经被释放的B的地址。

3.3 ABA问题的解决

ABA问题有两种思路:

  1. 使用环形缓冲区,其实就是预先分配了内存数组,避免内存重复分配。例如DPDK的rte_ring就是这种实现方式。
  2. 为地址增加引用计数,使用双倍大小的头指针,在原指针后面增加一个计数器,每次操作将计数器加1。这样即使出现ABA问题,由于计数器对不上,CAS也就不会通过了。 下面代码简单的实现了第二种方式。
    template <typename T>
    class LockFreeStackT
    {
        struct Node
        {
            T _val;
            Node* _next;
            Node(const T& val, Node* next)
                : _val(val)
                , _next(next)
            {}
        };
        struct TaggedPointer
        {
            Node* ptr;
            size_t tag;
            //size_t tag1;
            TaggedPointer() {}
            TaggedPointer(Node* _ptr, size_t _tag)
            : ptr(_ptr)
            , tag(_tag)
            {}
        };
    public:
        void push(const T& val) {
            TaggedPointer o = _head.load();
            TaggedPointer n(new Node(val, o.ptr), o.tag + 1);
            while (!_head.compare_exchange_strong(o, n)) {
                n.ptr->_next = o.ptr;
                n.tag = o.tag + 1;
            }
        }
        void pop() {
            TaggedPointer o = _head.load(), n;
           
            do {
                if (!o.ptr) return;
                n.ptr = o.ptr->_next;
                n.tag = o.tag + 1;
            } while(!_head.compare_exchange_strong(o, n));      
        }
    private:
        std::atomic<TaggedPointer> _head = {TaggedPointer(nullptr, 0)};
    };
    

四、内存屏障

彻底搞懂内存屏障(上)

JMM的前世今生

Share: X (Twitter) Facebook LinkedIn