百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

共享内存无锁队列的实现(共享内存 实现)

off999 2025-01-24 13:26 34 浏览 0 评论

导语: 共享内存无锁队列是老调重弹了,相关的实现网上都能找到很多。但看了公司内外的很多实现,都有不少的问题,于是自己做了重新实现。主要是考虑了一些异常情况加强健壮性,并且考虑了C++11的内存模型。

为什么需要共享内存无锁队列?

为了便于查找定位问题,需要做一个日志收集跟踪系统,每个业务模块都需要调用SDK输出格式化的本地日志并将日志发送到远端。

为了避免发送日志阻塞业务,典型的做法是业务线程将日志写入队列,另一个线程异步地从队列中读取数据并发送。考虑到IO性能,且日志数据能容忍小概率的丢失,所以队列不应该是在磁盘上。又因为业务模块可能是多线程模式也可能是多进程模式,所以队列应该是在共享内存中。

简单的做法是,对队列的读写都加锁,但这样无疑会导致高并发下性能瓶颈就在这把锁上。所以我们需要无锁队列。看了公司内外很多版本的无锁队列实现,多多少少都有些问题,所以自己重新实现了一个版本。

环形数组

大部分无锁队列都是用环形数组实现的,简单高效,这里也不例外。假设队列长度为queue_len,用read_index表示可读的位置,用write_index表示可写的位置。

每次修改read_index或write_index的时候都需要将其归一化:

read_index %= queue_len

队列已使用空间used_len的计算为:

write_index >= read_index ?
  write_index - read_index : queue_len - read_index + write_index

判断队列IsEmpty的条件为:

read_index == write_index

如果不做特殊处理,判断队列IsFull的条件和IsEmpty的条件一样,从而难以区分。所以我们将队列可写入长度设为queue_len-1。这样判断长度为write_len的数据是否可以写入的条件为:

// 注意是 < 而不是 <= 
used_len + write_len < queue_len

一写一读

先来考虑一写一读的场景,实现起来最简单。

写操作:先判断是否可以写入,如果可以,则先写数据,写完数据后再修改write_index。

读操作:先判断是否可以读取used_len > 0,如果可以,则先读数据,读完再修改read_index。

因为read_index和write_index都只会有一个地方去写,所以其实不需要加锁也不需要原子操作,直接修改即可。需要注意读写数据的时候都需要考虑遇到数组尾部的情况。

多写一读

再来考虑复杂些的多写一读的场景。因为多个生产者都会修改write_index,所以在不加锁的情况下必须使用原子操作,笔者使用的是GCC内置原子操作函数:

/ __sync系列的内置函数在C++11之后已经过时,不建议使用
// C++11的std::atomic函数就是用__atomic系列内置函数实现的,所以也考虑了C++11提出的内存模型
// 该函数在*ptr == *expected的时候,将*ptr = desired,并返回true,否则返回false,并将*expected = *ptr
// 最后两个参数分别表示修改成功和失败时使用的内存模型,后面会讲
bool __atomic_compare_exchange_n (type *ptr, type *expected, type desired, bool weak, int success_memorder, int failure_memorder);

一种错误实现:

有的实现在写入过程中对write_index使用了多次原子操作,比如先原子增加write_index,再写入数据,如果写入失败,再原子减小write_index,看起来每次操作都是原子的,但多个原子操作连在一起就不是原子操作了,整个写入过程中对write_index应当只有一次原子操作。

相关视频推荐

高并发场景3种锁方案:自旋锁、互斥锁、原子操作的优缺点

高性能服务器开发必备组件-无锁队列的设计与实现

初识linux内核,进程通信还能这么玩

需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享

常见的错误实现:

1 .先读取write_index,判断新的数据是否有足够的空间可以写入。

1.1 如果没有足够空间则返回队列满。

2 .如果有足够的空间,则准备写入。

2.1 一写的时候,是先写数据再改write_index。多写的时候为了避免同时写到同一片内存,需要先申请空间再写入数据。即先原子增加write_index,如果成功,再写入数据。

2.2 为了避免在生产者还未写完数据的时候,消费者就尝试读取,所以需要个同步机制告诉消费者数据正在写入中。比如头部预留一个字节,初始为0表示正在写入,写完数据后再改为1表示写入完成。头部中一般还有2字节表示数据长度。

3 .消费者发现used_len > 0即可尝试读取。

3.1 如果首字节为0,表示数据正在写入,等待。

3.2 如果首字节不为0,表示数据已写完,可以读取。

4 .消费者读取数据后,需要将read_index前移到合适的位置,且因为只有一个消费者,这里无需使用原子操作。

这种实现看似OK,其实也有问题。如果生产者在修改write_index之后,在修改头部首字节为1之前,这段时间内crash的话,就会导致消费者永远停留在等待生产者写完的状态上,且这个状态无法自动恢复。

我的优化一:

  1. 消费者发现头部首字节为0,则等待,但最多等待一段时间比如5ms。
  2. 在写入数据限制了最大长度的前提下,以现代计算机的速度,从修改write_index然后copy数据最后修改头部首字节为1,这段时间是非常快的,远小于5ms。
  3. 如果等待5ms后,发现首字节还是0,则认为该生产者crash了,根据头部中的长度信息,向前跳过这个非法数据块。

但如果生产者还没来得及写入数据长度就crash了呢?就想跳过非法数据块也不知道该跳多少了。

我的优化二:

1 .将队列分成N个定长block,定义如下:

struct Block {
 union {
     struct {
         bool m_used;
         uint8_t m_blk_cnt;
         uint8_t m_blk_idx;
         uint16_t m_blk_len;
     };
     char m_head_reserved[8];
 };
 char m_data[kBlockDataSize];

 bool CanUsed(uint8_t expected_blk_idx) const {
     return m_used && expected_blk_idx == m_blk_idx
         && m_blk_cnt <= kMaxBlockCount
         && m_blk_idx < m_blk_cnt
         && m_blk_len <= kBlockDataSize;
 }
};

2 .生产者写数据时先计算需要的blk_cnt,再原子地将write_index前移blk_cnt。写数据的时候第一个block最后写,每个block内部依然是最后写头部首字节m_used = true。

3 .当等待5ms后发现m_used还是false,认为写入者crash之后,就可以以block为单位向前跳跃,直到跳到一个合法block或者没有可以读取的数据为止。合法block判断条件为blk.CanUsed(0)。

这样就算生产者在任意时刻crash,消费者都有能力自动恢复,找到下一个合法block。但如果消费者并没有真正crash只是因为某种神秘的原因写入太慢超过了5ms,怎么办?

  1. 首先,因为消费者已经跳过,所以它这次写入的数据肯定是不会被消费了,即极小概率会遗漏数据。
  2. 其次,我们考虑更极小概率的情况,只有当生产者慢到队列循环了完整一轮,其它生产者重新申请到这片block准备写入,才会产生数据脏写。
  3. 再次,就算真的出现数据脏写,一般头部的blk_cnt和blk_idx等信息不会对不上,消费者每次消费数据都会通过CanUsed函数检测,检测不通过的都会跳过。
  4. 最后,如果说非要考虑极端情况,可以通过在头部中再加入block_crc和total_crc来校验数据。笔者考虑到日志数据容忍这种极小概率的错乱,所以省略了。

内存模型

看似完美了,真的吗?其实不然。以上还没有考虑内存模型。因为编译器的优化,实际代码执行顺序不一定是你写的顺序。也就是说虽然我们是先写数据最后设置m_used = true,但实际执行顺序并不一定真的如此,有可能先执行了m_used = true,再执行数据copy,这就乱套了。因此我们需要指定内存模型。

1.生产者对于m_used的修改,内存模型应该使用release。保证在这个操作之前的memory accesses不会重排到这个操作之后去,这样就不会向消费者提前释放可用信号。

__atomic_store_n(&blk.m_used, true, __ATOMIC_RELEASE);

2 .消费者对于m_used的读取,内存模型应该使用acquire。保证在这个操作之后的memory accesses不会重排到这个操作之前去,这样就不会提前读到生产者还未写完的数据。

__atomic_load_n(&m_used, __ATOMIC_ACQUIRE);

3 .对write_index的修改,即调用atomic_compare_exchange_n函数,最后两个参数应该都是ATOMIC_RELAXED,即内存模式使用relaxed,即没有约束。因为write_index只是多生产者之间用来做类似互斥的竞争,本来就是靠m_used真正约束生产者和消费者之间的行为顺序。

共享内存

另外一个值得一提的点是,共享内存我使用mmap,而非shmget。因为担心一台机器上部署的程序太多,可能出现共享内存key冲突的情况。万一出现共享内存冲突,被别的程序写坏了,就会出现莫名其妙的情况。所以使用mmap指定模块相关的文件路径,就不用太担心了。

需要多读吗?

如果再进一步实现多写多读,需要对read_index也考虑原子操作,加上稍显复杂的block检查跳跃逻辑,实现难度较高。但我们首先该问一个问题,真的需要多读吗?

我认为是不需要的:

  1. 首先,消费者可以批量读取,一次读取足够或者全部的可读数据。通过对后续业务逻辑的优化,一般单读都能满足性能要求。
  2. 其次,可以一读批量读取后再做进一步进程内多线程分发,会更加简单。
  3. 再次,如果单读真的不能满足性能要求,说明读后的业务逻辑非常重,那么这个时候,性能瓶颈就肯定不会是队列读取这里了,那么给读加锁无疑是更合适的选择。

有感而发

  1. 要写出高健壮性的代码,一定要时刻记得,程序可能会在你的任何一行代码处因为bug或者意外crash,不要想当然以为执行了上一行代码就一定会执行下一行代码。crash后重启是否能正常恢复?
  2. 写多线程多进程相关的逻辑,涉及到并发操作的时候,要考虑仔细,需不需要加锁?不加锁会有什么问题?
  3. 使用共享内存等共享资源时,更要想到,这资源不是我独占的,万一被有意或无意的篡改了数据该怎么办?能否尽量避免被别人篡改?如果被篡改,是否有发现和恢复机制?
  4. 不要以为你写的代码顺序就是真正的执行顺序,需要考虑内存模型。

相关推荐

photoshop6序列号(photoshop8.01序列号)
  • photoshop6序列号(photoshop8.01序列号)
  • photoshop6序列号(photoshop8.01序列号)
  • photoshop6序列号(photoshop8.01序列号)
  • photoshop6序列号(photoshop8.01序列号)
win10下载应用商店(win10应用商店打不开)

1、点击Win10系统的开始菜单,然后在点击应用商店;2、打开Win10应用商店后,在搜索框里输入想要搜索的应用软件,然后点击检索;3、点击搜索到的应用,点击安装;4、点击安装后,系统会提示要切换到这...

dell电脑重装系统win10(dell 重装win10系统)

戴尔笔记本重装系统win10的步骤如下:制作好wepe启动盘之后,将win10系统iso镜像直接复制到U盘。在需要重装系统的戴尔电脑上插入pe启动盘,重启后不停按F12启动快捷键,调出启动菜单对话框,...

android升级包下载安装(android 升级包)

打开手机系统更新升级,前提是官方有新系统推送才能更新  哪个大不一定,但一般规律如下:  1、小版本的更新,通常越更新越大。比如3.1更新到3.2,通常是修复bug,代码量通常会增大,体积就会增大。 ...

hdd硬盘和ssd(ssd硬盘和hdd硬盘是什么意思)

HDD硬盘和SSD硬盘是两种不同类型的电脑存储设备,它们有着以下区别:1.工作原理:HDD硬盘使用机械旋转的磁盘和读写磁头来存储和读取数据,而SSD硬盘则使用闪存存储数据,类似于USB闪存盘。2....

电脑免费软件下载大全(电脑上免费的下载软件)

正常情况下,如果我们想要在自己的电脑上面下载一个不要钱的单机游戏,那么我们是可以直接在我们的软件管理中心进行一个下载的,这个时候我们只需要通过一个权限就能够正常的下载,当然我们也是可以在一些小游戏的软...

mpp文件转换excel(mpp转换成pdf)

要将Excel表格转换为MPP格式,您可以按照以下步骤操作:1.打开Excel表格并确保数据按照项目的不同阶段或任务进行组织。2.将Excel表格中的数据复制到一个新的MicrosoftProj...

win7旗舰版开机密码忘记按f2

方法如下:开始-控制面板-用户帐户;在打开的更改用户帐户界面点击要更改的帐户;然后点击帐户左面的更改密码按钮;在打开的页面上,输入一次当前使用的密码,输入2次要更改的新密码然后保存退出就可以了...

笔记本无音频输出设备(笔记本无音频输出设备)

1、没有声卡驱动,解决方法就是找到笔记本的官网,下载电脑声卡的驱动安装即可。2、没有外界的音频播放设备,解决方法就是买一个外界的音频播放设备插到电脑主机的音频接口上即可。笔记本电脑显示未安装任何音频输...

iso文件能用手机打开吗(iso文件能用手机打开吗安全吗)

一般的压缩软件就可以打开的,比如,好压软件,这个打开只是解压形式的,如果你说的是运行iso文件,这个没有,况且安卓系统也不支持iso运行ISO文件一般用于光盘镜像文件的存储,如果想要在手机上运行ISO...

win7系统卡顿怎么优化(win7很慢很卡怎么优化)

1、首先打开安全卫士,进入安全卫士首页,单击软件窗口右下角的“更多”图标,打开扩展应用程序。2、单击选择“我的工具”。3、在我的工具菜单里面找到“人工服务”单击打开人工服务。4、在人工服务对话框有很多...

如何查看c盘微信聊天记录(如何查看c盘微信聊天记录内存大小)

微信群中的消息只要没删除基本都能保存,想要找微信群中几个多月前的消息可以直接根据日期来查找聊天记录。操作如下:1、打开想要查找记录的微信群,点击右上角人形图标;2、点击查找聊天内容;3、选择按日...

office2016家庭版激活密钥(office家庭版激活码2019)

走淘宝吧,因为零售版的密钥只能用一次。大概几块钱就能激活2016。如果你不在乎钱的话可以向我一样,订阅一个office365.实在不行可以和几个人一起买一个家庭版的365.出现这个情况,找微软申诉是没...

移动硬盘驱动器下载安装(移动硬盘驱动器下载安装教程)

1、右键单击您的桌面,选择“新建文件夹”,并命名该文件夹(例如“usb驱动程序”);2、然后到本站下载驱动程序;3、将其解压缩至在您的桌面上刚刚创建的usb驱动程序文件夹;4、单击开始菜单,然后选择设...

电脑硬盘格式化工具(电脑 格式化硬盘)

硬盘格式化工具很多,PQMACGIG8.0(中文就叫硬盘分区魔法师)是比较好的一个,这个是在WINDOWS下比叫好用,(个人感觉)FDISK也是比较好的一个,这个一般用在DOS下分区格式化WIN...

取消回复欢迎 发表评论: