百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

共享内存无锁队列的实现(共享内存 实现)

off999 2025-01-24 13:26 23 浏览 0 评论

导语: 共享内存无锁队列是老调重弹了,相关的实现网上都能找到很多。但看了公司内外的很多实现,都有不少的问题,于是自己做了重新实现。主要是考虑了一些异常情况加强健壮性,并且考虑了C++11的内存模型。

为什么需要共享内存无锁队列?

为了便于查找定位问题,需要做一个日志收集跟踪系统,每个业务模块都需要调用SDK输出格式化的本地日志并将日志发送到远端。

为了避免发送日志阻塞业务,典型的做法是业务线程将日志写入队列,另一个线程异步地从队列中读取数据并发送。考虑到IO性能,且日志数据能容忍小概率的丢失,所以队列不应该是在磁盘上。又因为业务模块可能是多线程模式也可能是多进程模式,所以队列应该是在共享内存中。

简单的做法是,对队列的读写都加锁,但这样无疑会导致高并发下性能瓶颈就在这把锁上。所以我们需要无锁队列。看了公司内外很多版本的无锁队列实现,多多少少都有些问题,所以自己重新实现了一个版本。

环形数组

大部分无锁队列都是用环形数组实现的,简单高效,这里也不例外。假设队列长度为queue_len,用read_index表示可读的位置,用write_index表示可写的位置。

每次修改read_index或write_index的时候都需要将其归一化:

read_index %= queue_len

队列已使用空间used_len的计算为:

write_index >= read_index ?
  write_index - read_index : queue_len - read_index + write_index

判断队列IsEmpty的条件为:

read_index == write_index

如果不做特殊处理,判断队列IsFull的条件和IsEmpty的条件一样,从而难以区分。所以我们将队列可写入长度设为queue_len-1。这样判断长度为write_len的数据是否可以写入的条件为:

// 注意是 < 而不是 <= 
used_len + write_len < queue_len

一写一读

先来考虑一写一读的场景,实现起来最简单。

写操作:先判断是否可以写入,如果可以,则先写数据,写完数据后再修改write_index。

读操作:先判断是否可以读取used_len > 0,如果可以,则先读数据,读完再修改read_index。

因为read_index和write_index都只会有一个地方去写,所以其实不需要加锁也不需要原子操作,直接修改即可。需要注意读写数据的时候都需要考虑遇到数组尾部的情况。

多写一读

再来考虑复杂些的多写一读的场景。因为多个生产者都会修改write_index,所以在不加锁的情况下必须使用原子操作,笔者使用的是GCC内置原子操作函数:

/ __sync系列的内置函数在C++11之后已经过时,不建议使用
// C++11的std::atomic函数就是用__atomic系列内置函数实现的,所以也考虑了C++11提出的内存模型
// 该函数在*ptr == *expected的时候,将*ptr = desired,并返回true,否则返回false,并将*expected = *ptr
// 最后两个参数分别表示修改成功和失败时使用的内存模型,后面会讲
bool __atomic_compare_exchange_n (type *ptr, type *expected, type desired, bool weak, int success_memorder, int failure_memorder);

一种错误实现:

有的实现在写入过程中对write_index使用了多次原子操作,比如先原子增加write_index,再写入数据,如果写入失败,再原子减小write_index,看起来每次操作都是原子的,但多个原子操作连在一起就不是原子操作了,整个写入过程中对write_index应当只有一次原子操作。

相关视频推荐

高并发场景3种锁方案:自旋锁、互斥锁、原子操作的优缺点

高性能服务器开发必备组件-无锁队列的设计与实现

初识linux内核,进程通信还能这么玩

需要C/C++ Linux服务器架构师学习资料加qun812855908获取(资料包括C/C++,Linux,golang技术,Nginx,ZeroMQ,MySQL,Redis,fastdfs,MongoDB,ZK,流媒体,CDN,P2P,K8S,Docker,TCP/IP,协程,DPDK,ffmpeg等),免费分享

常见的错误实现:

1 .先读取write_index,判断新的数据是否有足够的空间可以写入。

1.1 如果没有足够空间则返回队列满。

2 .如果有足够的空间,则准备写入。

2.1 一写的时候,是先写数据再改write_index。多写的时候为了避免同时写到同一片内存,需要先申请空间再写入数据。即先原子增加write_index,如果成功,再写入数据。

2.2 为了避免在生产者还未写完数据的时候,消费者就尝试读取,所以需要个同步机制告诉消费者数据正在写入中。比如头部预留一个字节,初始为0表示正在写入,写完数据后再改为1表示写入完成。头部中一般还有2字节表示数据长度。

3 .消费者发现used_len > 0即可尝试读取。

3.1 如果首字节为0,表示数据正在写入,等待。

3.2 如果首字节不为0,表示数据已写完,可以读取。

4 .消费者读取数据后,需要将read_index前移到合适的位置,且因为只有一个消费者,这里无需使用原子操作。

这种实现看似OK,其实也有问题。如果生产者在修改write_index之后,在修改头部首字节为1之前,这段时间内crash的话,就会导致消费者永远停留在等待生产者写完的状态上,且这个状态无法自动恢复。

我的优化一:

  1. 消费者发现头部首字节为0,则等待,但最多等待一段时间比如5ms。
  2. 在写入数据限制了最大长度的前提下,以现代计算机的速度,从修改write_index然后copy数据最后修改头部首字节为1,这段时间是非常快的,远小于5ms。
  3. 如果等待5ms后,发现首字节还是0,则认为该生产者crash了,根据头部中的长度信息,向前跳过这个非法数据块。

但如果生产者还没来得及写入数据长度就crash了呢?就想跳过非法数据块也不知道该跳多少了。

我的优化二:

1 .将队列分成N个定长block,定义如下:

struct Block {
 union {
     struct {
         bool m_used;
         uint8_t m_blk_cnt;
         uint8_t m_blk_idx;
         uint16_t m_blk_len;
     };
     char m_head_reserved[8];
 };
 char m_data[kBlockDataSize];

 bool CanUsed(uint8_t expected_blk_idx) const {
     return m_used && expected_blk_idx == m_blk_idx
         && m_blk_cnt <= kMaxBlockCount
         && m_blk_idx < m_blk_cnt
         && m_blk_len <= kBlockDataSize;
 }
};

2 .生产者写数据时先计算需要的blk_cnt,再原子地将write_index前移blk_cnt。写数据的时候第一个block最后写,每个block内部依然是最后写头部首字节m_used = true。

3 .当等待5ms后发现m_used还是false,认为写入者crash之后,就可以以block为单位向前跳跃,直到跳到一个合法block或者没有可以读取的数据为止。合法block判断条件为blk.CanUsed(0)。

这样就算生产者在任意时刻crash,消费者都有能力自动恢复,找到下一个合法block。但如果消费者并没有真正crash只是因为某种神秘的原因写入太慢超过了5ms,怎么办?

  1. 首先,因为消费者已经跳过,所以它这次写入的数据肯定是不会被消费了,即极小概率会遗漏数据。
  2. 其次,我们考虑更极小概率的情况,只有当生产者慢到队列循环了完整一轮,其它生产者重新申请到这片block准备写入,才会产生数据脏写。
  3. 再次,就算真的出现数据脏写,一般头部的blk_cnt和blk_idx等信息不会对不上,消费者每次消费数据都会通过CanUsed函数检测,检测不通过的都会跳过。
  4. 最后,如果说非要考虑极端情况,可以通过在头部中再加入block_crc和total_crc来校验数据。笔者考虑到日志数据容忍这种极小概率的错乱,所以省略了。

内存模型

看似完美了,真的吗?其实不然。以上还没有考虑内存模型。因为编译器的优化,实际代码执行顺序不一定是你写的顺序。也就是说虽然我们是先写数据最后设置m_used = true,但实际执行顺序并不一定真的如此,有可能先执行了m_used = true,再执行数据copy,这就乱套了。因此我们需要指定内存模型。

1.生产者对于m_used的修改,内存模型应该使用release。保证在这个操作之前的memory accesses不会重排到这个操作之后去,这样就不会向消费者提前释放可用信号。

__atomic_store_n(&blk.m_used, true, __ATOMIC_RELEASE);

2 .消费者对于m_used的读取,内存模型应该使用acquire。保证在这个操作之后的memory accesses不会重排到这个操作之前去,这样就不会提前读到生产者还未写完的数据。

__atomic_load_n(&m_used, __ATOMIC_ACQUIRE);

3 .对write_index的修改,即调用atomic_compare_exchange_n函数,最后两个参数应该都是ATOMIC_RELAXED,即内存模式使用relaxed,即没有约束。因为write_index只是多生产者之间用来做类似互斥的竞争,本来就是靠m_used真正约束生产者和消费者之间的行为顺序。

共享内存

另外一个值得一提的点是,共享内存我使用mmap,而非shmget。因为担心一台机器上部署的程序太多,可能出现共享内存key冲突的情况。万一出现共享内存冲突,被别的程序写坏了,就会出现莫名其妙的情况。所以使用mmap指定模块相关的文件路径,就不用太担心了。

需要多读吗?

如果再进一步实现多写多读,需要对read_index也考虑原子操作,加上稍显复杂的block检查跳跃逻辑,实现难度较高。但我们首先该问一个问题,真的需要多读吗?

我认为是不需要的:

  1. 首先,消费者可以批量读取,一次读取足够或者全部的可读数据。通过对后续业务逻辑的优化,一般单读都能满足性能要求。
  2. 其次,可以一读批量读取后再做进一步进程内多线程分发,会更加简单。
  3. 再次,如果单读真的不能满足性能要求,说明读后的业务逻辑非常重,那么这个时候,性能瓶颈就肯定不会是队列读取这里了,那么给读加锁无疑是更合适的选择。

有感而发

  1. 要写出高健壮性的代码,一定要时刻记得,程序可能会在你的任何一行代码处因为bug或者意外crash,不要想当然以为执行了上一行代码就一定会执行下一行代码。crash后重启是否能正常恢复?
  2. 写多线程多进程相关的逻辑,涉及到并发操作的时候,要考虑仔细,需不需要加锁?不加锁会有什么问题?
  3. 使用共享内存等共享资源时,更要想到,这资源不是我独占的,万一被有意或无意的篡改了数据该怎么办?能否尽量避免被别人篡改?如果被篡改,是否有发现和恢复机制?
  4. 不要以为你写的代码顺序就是真正的执行顺序,需要考虑内存模型。

相关推荐

python pip 命令 参数(python pip命令用不了)

usage:python[option]...[-ccmd|-mmod|file|-][arg]...Options(andcorrespondingenvironm...

Python 包管理:uv 来了!比 pip 快 100 倍的神器,开发者的终极选择?

为什么Python开发者需要uv?Python生态虽繁荣,但包管理一直是痛点:pip安装慢如蜗牛、依赖冲突让人头秃、虚拟环境配置繁琐……直到uv横空出世!这个用Rust语言打造的...

UV:Python包管理的未来已来!比pip快100倍的新选择

引言Python开发者们,是否厌倦了pip的缓慢安装速度?是否希望有一个更快、更现代、更高效的包管理工具?今天,我要向大家介绍一个革命性的Python包管理工具——UV!UV由Rust编写,是pip和...

「Python」 常用的pip命令和Django命令

pip命令如何根据关键词找到PyPI(Python包仓库)上的可用包#方法1:直接访问PyPI官网,输入关键词搜索#方法2#为何不用pipsearchdjango?因为这个命令已不可...

python包管理工具pip freeze详解(python工具包怎么用)

freeze就像其名字表示的意思一样,主要用来以requirement的格式输出已安装的包,这里我们主要讨论以下3个选项:--local、--user、--pathlocal--local选项一般用在...

python包管理工具pip config详解(python的pulp包)

pipconfig主要包含以下子命令:set、get、edit、list、debug、unset。下面我们逐一介绍下它们。pipconfigset这个命令允许我们以name=value的形式配...

pip常用命令,学Python不会这个寸步难行哦(26)

小朋友们好,大朋友们好!我是猫妹,一名爱上Python编程的小学生。欢迎和猫妹一起,趣味学Python。今日主题学习下pip的使用。pip什么是pippip全称PythonPackageIndex...

Python pip 包管理需知(python的包管理)

简介在Python编程中,pip是一个强大且广泛使用的包管理工具。它使我们能够方便地安装、升级和管理Python包。无论是使用第三方库还是分享自己的代码,pip都是我们的得力助手。本文将深入解析pip...

比pip快100倍的Python包安装工具(python如何用pip安装包)

简介uv是一款开源的Python包安装工具,GitHubstar高达56k,以性能极快著称,具有以下特性(官方英文原文):Asingletooltoreplacepip,pip-tool...

Python安装包总报错?这篇解决指南让你告别pip烦恼!

在Python开发中,pip是安装和管理第三方包的必备工具,但你是否经常遇到各种报错,比如无法创建进程、权限不足、版本冲突,甚至SSL证书错误?这些问题不仅浪费时间,还让人抓狂!别担心!本文整理了...

pip vs pipx: Python 包管理器,你选择哪个?

高效的包管理对于Python开发至关重要。pip和pipx是两个最常用的工具。虽然两者都支持安装Python包,但它们的设计和用例却大相径庭。本文将探讨这些差异,解释何时使用每种工具,并...

【python】5分钟掌握pip(包管理)操作

安装一个软件包从庞大的仓库中找到一个库,将其导入您的环境:pipinstallnumpy2.已安装软件包列表调查您领域内存在的库的概要,注意它们的版本:piplist3.升级软件包赋予已安装...

Python pip安装与使用步骤(python的pip安装方法)

安装和使用Python的包管理工具pip是管理Python包和依赖项的基础技能。以下是详细的步骤:安装pip使用系统包管理器安装Windows:通常,安装Python时会自动安装p...

Python自动化办公应用学习笔记3—— pip工具安装

3.1pip工具安装最常用且最高效的Python第三方库安装方式是采用pip工具安装。pip是Python包管理工具,提供了对Python包的查找、下载、安装、卸载的功能。pip是Python官方提...

Python文件压缩神器:ZipFile功能全解析,支持一键压缩和解压

在Python中处理ZIP文件时,zipfile模块是最常用的工具。它提供了创建、读取、修改ZIP文件的完整功能,无需依赖外部命令。本文将通过核心函数、实战案例和避坑指南,带你掌握这个高效的文件处理模...

取消回复欢迎 发表评论: