百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

如何利用CAS技术实现无锁队列

off999 2025-03-11 19:46 24 浏览 0 评论

linux服务器开发相关视频解析:

linux后台开发面试必备技能——锁,原子操作,CAS

linux多线程之epoll原理剖析与reactor原理及应用

关于CAS等原子操作

在开始说无锁队列之前,我们需要知道一个很重要的技术就是CAS操作——Compare & Set,或是 Compare & Swap,现在几乎所有的CPU指令都支持CAS的原子操作,X86下对应的是 CMPXCHG 汇编指令。有了这个原子操作,我们就可以用其来实现各种无锁(lock free)的数据结构。

这个操作用C语言来描述就是下面这个样子:意思就是说,看一看内存*reg里的值是不是oldval,如果是的话,则对其赋值newval。

int compare_and_swap (int* reg, int oldval, int newval)
{
  int old_reg_val = *reg;
  if (old_reg_val == oldval) {
     *reg = newval;
  }
  return old_reg_val;
}

我们可以看到,old_reg_val 总是返回,于是,我们可以在 compare_and_swap 操作之后对其进行测试,以查看它是否与 oldval相匹配,因为它可能有所不同,这意味着另一个并发线程已成功地竞争到 compare_and_swap 并成功将 reg 值从 oldval 更改为别的值了。

这个操作可以变种为返回bool值的形式(返回 bool值的好处在于,可以调用者知道有没有更新成功):

bool compare_and_swap (int *addr, int oldval, int newval)
{
  if ( *addr != oldval ) {
      return false;
  }
  *addr = newval;
  return true;
}

与CAS相似的还有下面的原子操作:

  • Fetch And Add,一般用来对变量做 +1 的原子操作
  • Test-and-set,写值到某个内存位置并传回其旧值。汇编指令BST
  • Test and Test-and-set,用来低低Test-and-Set的资源争夺情况

:在实际的C/C++程序中,CAS的各种实现版本如下:

1)GCC的CAS

GCC4.1+版本中支持CAS的原子操作

bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)

2)Windows的CAS

在Windows下,你可以使用下面的Windows API来完成CAS:

 InterlockedCompareExchange ( __inout LONG volatile *Target,
                                 __in LONG Exchange,
                                 __in LONG Comperand);

3) C++11中的CAS

C++11中的STL中的atomic类的函数可以让你跨平台。

template< class t>
bool atomic_compare_exchange_weak( std::atomic* obj,
                                   T* expected, T desired );
template< class t>
bool atomic_compare_exchange_weak( volatile std::atomic* obj,
                                   T* expected, T desired );

无锁队列的链表实现

初始化一个队列的代码很简,初始化一个dummy结点(注:在链表操作中,使用一个dummy结点,可以少掉很多边界条件的判断),如下所示:

InitQueue(Q)
{
    node = new node()
    node->next = NULL;
    Q->head = Q->tail = node;
}

我们先来看一下进队列用CAS实现的方式,基本上来说就是链表的两步操作:

第一步,把tail指针的next指向要加入的结点。tail->next = p;

第二步,把tail指针移到队尾。tail = p;

EnQueue(Q, data) //进队列
{
    //准备新加入的结点数据
    n = new node();
    n->value = data;
    n->next = NULL;
    do {
        p = Q->tail; //取链表尾指针的快照
    } while( CAS(p->next, NULL, n) != TRUE); 
    //while条件注释:如果没有把结点链在尾指针上,再试
    CAS(Q->tail, p, n); //置尾结点 tail = n;
}

我们可以看到,程序中的那个 do-while 的 Retry-Loop 中的 CAS 操作:如果 p->next 是 NULL,那么,把新结点 n 加到队尾。如果不成功,则重新再来一次!

就是说,很有可能我在准备在队列尾加入结点时,别的线程已经加成功了,于是tail指针就变了,于是我的CAS返回了false,于是程序再试,直到试成功为止。这个很像我们的抢电话热线的不停重播的情况。

【文章福利】需要C/C++ Linux服务器架构师学习资料加群812855908(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等)

但是你会看到,为什么我们的“置尾结点”的操作不判断是否成功,因为:

  • 如果有一个线程T1,它的while中的CAS如果成功的话,那么其它所有的 随后线程的CAS都会失败,然后就会再循环,
  • 此时,如果T1 线程还没有更新tail指针,其它的线程继续失败,因为tail->next不是NULL了。
  • 直到T1线程更新完 tail 指针,于是其它的线程中的某个线程就可以得到新的 tail 指针,继续往下走了。
  • 所以,只要线程能从 while 循环中退出来,意味着,它已经“独占”了,tail 指针必然可以被更新。
  • 这里有一个潜在的问题——如果T1线程在用CAS更新tail指针的之前,线程停掉或是挂掉了,那么其它线程就进入死循环了。下面是改良版的EnQueue()
EnQueue(Q, data) //进队列改良版 v1
{
    n = new node();
    n->value = data;
    n->next = NULL;
    p = Q->tail;
    oldp = p
    do {
        while (p->next != NULL)
            p = p->next;
    } while( CAS(p.next, NULL, n) != TRUE); //如果没有把结点链在尾上,再试
    CAS(Q->tail, oldp, n); //置尾结点
}

我们让每个线程,自己fetch 指针 p 到链表尾。但是这样的fetch会很影响性能。而且,如果一个线程不断的EnQueue,会导致所有的其它线程都去 fetch 他们的 p 指针到队尾,能不能不要所有的线程都干同一个事?这样可以节省整体的时间?

比如:直接 fetch Q->tail 到队尾?因为,所有的线程都共享着 Q->tail,所以,一旦有人动了它后,相当于其它的线程也跟着动了,于是,我们的代码可以改进成如下的实现:

EnQueue(Q, data) //进队列改良版 v2 
{
    n = new node();
    n->value = data;
    n->next = NULL;
    while(TRUE) {
        //先取一下尾指针和尾指针的next
        tail = Q->tail;
        next = tail->next;
        //如果尾指针已经被移动了,则重新开始
        if ( tail != Q->tail ) continue;
        //如果尾指针的 next 不为NULL,则 fetch 全局尾指针到next
        if ( next != NULL ) {
            CAS(Q->tail, tail, next);
            continue;
        }
        //如果加入结点成功,则退出
        if ( CAS(tail->next, next, n) == TRUE ) break;
    }
    CAS(Q->tail, tail, n); //置尾结点
}

上述的代码还是很清楚的,相信你一定能看懂,而且,这也是 Java 中的 ConcurrentLinkedQueue 的实现逻辑,当然,我上面的这个版本比 Java 的好一点,因为没有 if 嵌套,嘿嘿。

好了,我们解决了EnQueue,我们再来看看DeQueue的代码:(很简单,我就不解释了)

DeQueue(Q) //出队列
{
    do{
        p = Q->head;
        if (p->next == NULL){
            return ERR_EMPTY_QUEUE;
        }
    while( CAS(Q->head, p, p->next) != TRUE );
    return p->next->value;
}

我们可以看到,DeQueue的代码操作的是 head->next,而不是 head 本身。这样考虑是因为一个边界条件,我们需要一个dummy的头指针来解决链表中如果只有一个元素,head 和 tail 都指向同一个结点的问题,这样 EnQueue 和 DeQueue 要互相排斥了。

但是,如果 head 和 tail 都指向同一个结点,这意味着队列为空,应该返回 ERR_EMPTY_QUEUE,但是,在判断 p->next == NULL 时,另外一个EnQueue操作做了一半,此时的 p->next 不为 NULL了,但是 tail 指针还差最后一步,没有更新到新加的结点,这个时候就会出现,在 EnQueue 并没有完成的时候, DeQueue 已经把新增加的结点给取走了,此时,队列为空,但是,head 与 tail 并没有指向同一个结点。如下所示:

虽然,EnQueue的函数会把 tail 指针置对,但是,这种情况可能还是会导致一些并发问题,所以,严谨来说,我们需要避免这种情况。于是,我们需要加入更多的判断条件,还确保这个问题。下面是相关的改进代码:

DeQueue(Q) //出队列,改进版
{
    while(TRUE) {
        //取出头指针,尾指针,和第一个元素的指针
        head = Q->head;
        tail = Q->tail;
        next = head->next;
        // Q->head 指针已移动,重新取 head指针
        if ( head != Q->head ) continue;
        
        // 如果是空队列
        if ( head == tail && next == NULL ) {
            return ERR_EMPTY_QUEUE;
        }
        
        //如果 tail 指针落后了
        if ( head == tail && next == NULL ) {
            CAS(Q->tail, tail, next);
            continue;
        }
        //移动 head 指针成功后,取出数据
        if ( CAS( Q->head, head, next) == TRUE){
            value = next->value;
            break;
        }
    }
    free(head); //释放老的dummy结点
    return value;
}

CAS的ABA问题

所谓ABA,问题基本是这个样子:

  • 进程P1在共享变量中读到值为A
  • P1被抢占了,进程P2执行
  • P2把共享变量里的值从A改成了B,再改回到A,此时被P1抢占。
  • P1回来看到共享变量里的值没有被改变,于是继续执行。

虽然P1以为变量值没有改变,继续执行了,但是这个会引发一些潜在的问题。ABA问题最容易发生在lock free 的算法中的,CAS首当其冲,因为CAS判断的是指针的值。很明显,值是很容易又变成原样的。

比如上述的DeQueue()函数,因为我们要让head和tail分开,所以我们引入了一个dummy指针给head,当我们做CAS的之前,如果head的那块内存被回收并被重用了,而重用的内存又被EnQueue()进来了,这会有很大的问题。(内存管理中重用内存基本上是一种很常见的行为)

这个例子你可能没有看懂,一个活生生的例子——

你拿着一个装满钱的手提箱在飞机场,此时过来了一个火辣性感的美女,然后她很暖昧地挑逗着你,并趁你不注意的时候,把用一个一模一样的手提箱和你那装满钱的箱子调了个包,然后就离开了,你看到你的手提箱还在那,于是就提着手提箱去赶飞机去了。

这就是ABA的问题。

解决ABA的问题

维基百科上给了一个解——使用double-CAS(双保险的CAS),例如,在32位系统上,我们要检查64位的内容

  • 一次用CAS检查双倍长度的值,前半部是值,后半部分是一个计数器。
  • 只有这两个都一样,才算通过检查,要吧赋新的值。并把计数器累加1。

这样一来,ABA发生时,虽然值一样,但是计数器就不一样(但是在32位的系统上,这个计数器会溢出回来又从1开始的,这还是会有ABA的问题)

当然,我们这个队列的问题就是不想让那个内存重用,这样明确的业务问题比较好解决。

SafeRead(q)
{
    loop:
        p = q->next;
        if (p == NULL){
            return p;
        }
        Fetch&Add(p->refcnt, 1);
        if (p == q->next){
            return p;
        }else{
            Release(p);
        }
    goto loop;
}

其中的 Fetch&Add和Release分是是加引用计数和减引用计数,都是原子操作,这样就可以阻止内存被回收了。

用数组实现无锁队列

使用数组来实现队列是很常见的方法,因为没有内存的分部和释放,一切都会变得简单,实现的思路如下:

  • 数组队列应该是一个ring buffer形式的数组(环形数组)
  • 数组的元素应该有三个可能的值:HEAD,TAIL,EMPTY(当然,还有实际的数据)
  • 数组一开始全部初始化成EMPTY,有两个相邻的元素要初始化成HEAD和TAIL,这代表空队列。
  • EnQueue操作。假设数据x要入队列,定位TAIL的位置,使用double-CAS方法把(TAIL, EMPTY) 更新成 (x, TAIL)。需要注意,如果找不到(TAIL, EMPTY),则说明队列满了。
  • DeQueue操作。定位HEAD的位置,把(HEAD, x)更新成(EMPTY, HEAD),并把x返回。同样需要注意,如果x是TAIL,则说明队列为空。

算法的一个关键是——如何定位HEAD或TAIL?

  • 我们可以声明两个计数器,一个用来计数EnQueue的次数,一个用来计数DeQueue的次数。
  • 这两个计算器使用使用Fetch&ADD来进行原子累加,在EnQueue或DeQueue完成的时候累加就好了。
  • 累加后求个模什么的就可以知道TAIL和HEAD的位置了。

如下图所示:

小结

以上基本上就是所有的无锁队列的技术细节,这些技术都可以用在其它的无锁数据结构上。

  • 无锁队列主要是通过CAS、FAA这些原子操作,和Retry-Loop实现。
  • 对于Retry-Loop,我个人感觉其实和锁什么什么两样。只是这种“锁”的粒度变小了,主要是“锁”HEAD和TAIL这两个关键资源。而不是整个数据结构。

相关推荐

电脑c盘满了应该怎么办(如果电脑c盘满了怎么办啊)
电脑c盘满了应该怎么办(如果电脑c盘满了怎么办啊)

1、电脑桌面双击此电脑2、进入后找到Windows(C)盘,然后鼠标右击选择属性3、点击磁盘清理4、勾选需要清理的文件,最后点击确定即可1、运用磁盘清理软件清理C盘,大约可为C盘释放50M-800M空间。2、关闭休眠功能,在开始菜单的运行里...

2025-12-18 11:03 off999

win10桌面突然清空了(电脑桌面全部被隐藏了怎么恢复)

1、右键点击任务栏,然后选择任务管理器或按快捷键Ctrl+Shift+Esc;  2、打开任务管理器后,切换到详细信息模式。在进程中找到“桌面窗口管理器”(英文版系统找DesktopWindowM...

华硕笔记本全系列介绍(华硕笔记本全系列介绍视频)

关于这个问题,华硕笔记本一共有多个系列,每个系列定位不同。以下是华硕笔记本的主要系列及其定位:1.ASUSVivoBook(维沃系列):面向普通用户和学生,注重轻薄、时尚设计和价格实惠。2.AS...

华为笔记本电脑i5和i7区别(华为笔记本电脑i5和i7区别是什么)

主要是性能上的区别。如果将CPU比作火车运输,那么i5等于4条高铁,i7可以是6条或者8条高铁,运输量倍数增加。i7可以看作是i5的高配版。功能不同。i5和i7两个版本,其主要区别是在处理器的频率不...

如何下载office2007办公软件
  • 如何下载office2007办公软件
  • 如何下载office2007办公软件
  • 如何下载office2007办公软件
  • 如何下载office2007办公软件
u盘启动蓝屏(联想电脑进入u盘启动蓝屏)
u盘启动蓝屏(联想电脑进入u盘启动蓝屏)

电脑插入U盘后蓝屏的原因如下:1、Windows的系统分区存在磁盘错误或文件错误2、主板的SATA或IDE控制器驱动程序受到了损坏或安装不正确3、计算机遭到了病毒木马、流氓软件等恶意程序的攻击解决办法如下:1、执行磁盘扫描程序对所有的磁盘驱...

2025-12-18 08:51 off999

下载新版微信并安装(下载新版微信并安装到手机)

1.首先打开手机的浏览器,在搜索栏中输入微信官网,并点击搜索。2.出现微信后点击下载,下载完成后,点击安装。   3.安装完成后,再回到桌面,点击“微信”4.输入账号密...

测速在线测试(测速在线测试高铁)

回答:不靠谱。例如:SPEEDTEST是一家叫Ookla的公司开发的测速工具,稍有经验的朋友想必对它都不会陌生。Ookla在全世界各地维护了大量测速节点,SPEEDTEST测量的就是与这些测速节点间的...

格式工厂免费版(格式工厂免费版破解版)

不收费用格式工厂是由上海格式工厂网络有限公司创立于2008年2月,是面向全球用户的互联网软件。格式工厂发展至今,已经成为全球领先的视频图片等格式转换客户端。格式工厂致力于帮用户更好的解决文件使用问题,...

路由器连接电脑插哪个端口(路由器跟电脑的连接线怎么插)

电脑连接路由器插入路由器LAN口。具体方法如下1、光纤或网线插到路由器的WAN口上,(或网线连接modem的line口,modem的lan口连接了无线路由的wan口);2、电脑网线从路由器的LAN口上...

华硕官方客服在线解答(华硕客服售后在线咨询)

如果您需要寻找ROG的售后服务,可以通过以下几种方式进行联系:1.官方网站:您可以在ROG的官方网站上找到售后服务的联系方式,如客服电话、在线客服、邮件等。2.客服热线:您可以通过ROG的客服热线...

召唤系统之最强帝国(召唤系统之最强人皇)

名字叫做《绝世皇帝》。讲述林谦,地球的国战游戏达人,发生意外后,重生到异界。然而,在这异界之中,除了修炼天赋好点,却没有特殊的一技之长。炼丹他炸炉,炼器成废铁,更别提阵法这些其他生活技能,一窍不通。不...

如何将网址粘贴为链接(网址复制到word怎么变成链接)

可以复制网址链接的方法有多种,最常用的方法是在浏览器中打开需要复制的网页,在地址栏中选中网址,然后右键选择“复制”,或者按下“Ctrl+C”快捷键,即可将网址复制到剪贴板中。另外,某些应用程序也提供了...

i主题app下载(i主题官方版)

OPPO手机使用i主题的操作方法如下首先我们打开OPPO手机,在手机桌面找到i主题的图标,点击图标进入到主题页面,在主题页面,我们选择主题专区,然后再推荐主题专区里面找到你喜欢的主题之后,点击...

win10官方下载工具打不开(win10下载软件打不开怎么办)
win10官方下载工具打不开(win10下载软件打不开怎么办)

步骤/方式1首先右击win10桌面的此电脑图标,选择管理进入。步骤/方式2展开本地用户和组,点击用户,右击右侧的Administrator账户,选择属性打开,然后取消账户已禁用的勾选点击确定。步骤/方式3然后再次登录Administrato...

2025-12-18 04:03 off999

取消回复欢迎 发表评论: