百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术资源 > 正文

Redis 超越缓存,使用 Python 配合

off999 2024-11-04 13:16 24 浏览 0 评论


作为一名Python 开发者, 肯定使用过 Redis , 并且认为它是一个很棒的缓存。 虽然你的印象没有错, Redis 的确是一个很棒的缓存, 但使用 Redis 能够解决的问题并不仅限于缓存。

我们将探索 Redis 和 Redis Enterprise 的一些其他用途。 为了找点乐子, 我将使用之前《 使用 Redis 储存地理位置数据 》一文中的大脚怪(Bigfoot)数据。 此外, 由于这篇文章的读者都是 Python 开发者, 所以我将使用 Python 来编写本文的所有代码!

我在接下来展示的代码中使用了 aioredis 客户端库, 因为它对 async/await 提供了非常棒的支持。 如果你对 async/await 不熟悉的话, 那么可以去看看 这篇文章 , 里面提到了 async/await 对提升性能的帮助。

使用 Redis 构建队列

Redis 提供了字符串、哈希、集合和列表等多种数据结构可供使用。 这些数据结构都是储存数据的好帮手, 其中列表就可以用作一个非常棒的队列(queue)。

为了将列表用作队列, 我们需要使用 RPUSH 将新项目推送至列表末尾, 然后使用 LPOP 或者 BLPOP 将它们从列表的前面弹出。 由于 Redis 对数据库的所有修改都是在单个线程里面完成的, 所以这些操作都是原子的。

作为例子, 下面这段在队列里面添加了一些大脚怪的踪迹。

 import asyncio
 import aioredis

 async def main():

   redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')

   await asyncio.gather(
     add_to_queue(redis, 'Possible vocalizations east of Makanda'),
     add_to_queue(redis, 'Sighting near the Columbia River'),
     add_to_queue(redis, 'Chased by a tall hairy creature')
   )

   redis.close()
   await redis.wait_closed()

 def add_to_queue(redis, message):
   return redis.rpush('bigfoot:sightings:received', message)

 asyncio.run(main())

这个程序非常直接。 我们只需要在第 18 行调用 redis.rpush , 就能够将指定的元素推入到队列。 接下来是从队列另一端读取元素的代码, 同样非常简单。

 import asyncio
 import aioredis

 from pprint import pp

 async def main():

   redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')

   while True:
     sighting = await redis.blpop('bigfoot:sightings:received')
     pp(sighting)

 asyncio.run(main())

第 11 行和第 12 行的无限循环将等待并且打印被推入至队列中的大脚怪踪迹。 这里使用了 redis.blpop 而不是 redis.lpop , 因为前者可以阻塞客户端并等待列表中的元素返回。 比起让 Redis 和 Python 代码之间的网络无休止地轮询并做无用功, 让客户端阻塞并等待元素出现的做法会高效得多。

Redis 还有 一些同样很酷的命令 , 它们不仅可以将列表用作队列甚至堆栈。 我最喜欢的是 BRPOPLPUSH , 它可以从列表的右侧阻塞并弹出一些元素, 然后将被弹出的元素推入到另一个列表。 你可以使用这个命令来将一个队列中的元素传递至另一个队列, 这是非常棒的一个命令。

使用 Redis 订阅和发送事件

Redis 提供的东西中有些并不是数据结构, 比如订阅与发布(Pub/Sub)特性就是其中之一。 这个特性就像它的名字一样, 是一个内置于 Redis 中的发布与订阅机制。 得益于这个特性, 我们只需要 使用一些命令 就可以在自己的 Python 应用里面添加强大的订阅与发布机制。

通过执行订阅操作可以让我们发现事件, 以下是代码:

 import asyncio
 import aioredis

 from pprint import pp

 async def main():

   redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')

   [channel] = await redis.psubscribe('bigfoot:broadcast:channel:*')

   while True:
     message = await channel.get()
     pp(message)

 asyncio.run(main())

因为我想要接收所有跟大脚兽有关的消息, 所以我在这段代码的第 10 行使用 redis.psubscribe 订阅了一个 Glob 风格的模式, 通过使用 bigfoot:broadcast:channel:* 作为模式, 客户端将接收到所有以 bigfoot:broadcast:channel: 开头的事件。

用于匹配模式的 redis.psubscribe 函数和非模式匹配的 redis.subscribe 函数都返回 Python 列表, 以便包含不定数量的元素。 程序将解构这个列表(Python 的术语是解包)以获得我想要的通道, 并在之后使用 .get 进行阻塞调用以等待下一条消息。

发布事件非常简单, 下面是代码:

 import asyncio
 import aioredis

 async def main():

   redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')

   await asyncio.gather(
     publish(redis, 1, 'Possible vocalizations east of Makanda'),
     publish(redis, 2, 'Sighting near the Columbia River'),
     publish(redis, 2, 'Chased by a tall hairy creature')
   )

   redis.close()
   await redis.wait_closed()

 def publish(redis, channel, message):
   return redis.publish(f'bigfoot:broadcast:channel:{channel}', message)

 asyncio.run(main())

这段代码的重点是第 18 行, 它使用了名字非常直接的 redis.publish 来将消息发布至所需的通道。

值得注意的是, 发布与订阅是一个发送即遗忘机制(fire-and-forget)。 如果代码发布了一个事件但是却没有人监听, 那么该事件就会消失。 如果你想让自己的事件持续存在, 那么可以考虑使用前面提到的队列, 又或者接下来将要介绍的 Redis 流。

使用 Redis 储存数据流

除了发布与订阅之外, Redis 还可以使用流来发布和订阅事件。 Redis 流 是一个非常大的话题, 但使用它只需要 掌握少量命令 。 从 Python 来看, 这些命令的用法都是非常简单的, 我将一一向你说明。

下面的代码将把三次大脚兽的目击事件添加到流里面。

 import asyncio
 import aioredis

 async def main():

   redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')

   await asyncio.gather(
     add_to_stream(redis, 1, 'Possible vocalizations east of Makanda', 'Class B'),
     add_to_stream(redis, 2, 'Sighting near the Columbia River', 'Class A'),
     add_to_stream(redis, 3, 'Chased by a tall hairy creature', 'Class A'))

   redis.close()
   await redis.wait_closed()

 def add_to_stream(redis, id, title, classification):
   return redis.xadd('bigfoot:sightings:stream', {
     'id': id, 'title': title, 'classification': classification })

 asyncio.run(main())

这段代码中最重要的就是第 17 行和第 18 行, 它使用了 redis.xadd 函数将一次目击事件的字段添加到流里面。

每个新添加的流事件都有一个唯一标识符, 其中包含自 1970 年开始的时间戳(毫秒)和一个用破折号连接的序列号。 例如, 当我写这篇文章的时候, 1970 年 1 月 1 日(Unix纪元)午夜已经过去了 1,593,120,357,193 毫秒(1.59千兆秒)。 因此当我运行上面这段代码的时候, 命令将创建出 ID 为 1593120357193-0 的事件。

我们在添加事件的时候可以使用 * 来代替具体的 ID , 这样 Redis 就会根据当前时间来自动生成事件的 ID , 这也是 redis.xadd 函数的默认行为。

正如接下来的代码所示, 在读取流元素的时候, 我们需要设置一个起始 ID 。 你可以看到, 在第 10 行, 程序将变量 last_id 设置成了 0-0 , 这个 ID 代表流的起始位置。

 import asyncio
 import aioredis

 from pprint import pp

 async def main():

   redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf8')

   last_id = '0-0'
   while True:
     events = await redis.xread(['bigfoot:sightings:stream'], timeout=0, count=5, latest_ids=[last_id])
     for key, id, fields in events:
       pp(fields)
       last_id = id

 asyncio.run(main())

程序的第 12 行使用 redis.xread 函数从流中请求最多 5 个 0-0 之后的事件。 该调用将返回一个列表, 然后程序将对其进行循环和解构, 以获得事件的字段和标识符。 事件的标识符会被储存起来, 以便将来调用 redis.xread 时可以获得新的事件并在有需要时重新读取之前读取过的旧事件。

将 Redis 用作搜索引擎

Redis 可以通过模块(Module)扩展来增加新的命令和功能。 有 大量的模块 可以用于 AI 模型服务、图形数据库、时间序列数据库以及本例中的搜索引擎。

RedisSearch 是一个强大的搜索引擎, 它摄取数据的速度快得惊人。 有些人喜欢用它来进行 瞬时搜索 , 但除此之外它也可以用来进行其他搜索。 下面是使用该模块的一个例子:

 import asyncio
 import aioredis

 from pprint import pp

 async def main():

   redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')

   await redis.execute('FT.DROP', 'bigfoot:sightings:search')

   await redis.execute('FT.CREATE', 'bigfoot:sightings:search',
     'SCHEMA', 'title', 'TEXT', 'classification', 'TEXT')

   await asyncio.gather(
     add_document(redis, 1, 'Possible vocalizations east of Makanda', 'Class B'),
     add_document(redis, 2, 'Sighting near the Columbia River', 'Class A'),
     add_document(redis, 3, 'Chased by a tall hairy creature', 'Class A'))

   results = await search(redis, 'chase|east')
   pp(results)

   redis.close()
   await redis.wait_closed()

 def add_document(redis, id, title, classification):
   return redis.execute('FT.ADD', 'bigfoot:sightings:search', id, '1.0',
     'FIELDS', 'title', title, 'classification', classification)

 def search(redis, query):
   return redis.execute('FT.SEARCH', 'bigfoot:sightings:search', query)

 asyncio.run(main())

在第 12 和第 13 行, 程序使用 FT.CREATE 创建了一个索引。 索引需要描述程序将要添加的每个文档中的字段的模式。 在这个例子中, 程序需要添加大脚兽的目击事件, 该文档包含一个标题和一个分类, 并且它们都是文本字段。

在拥有了索引之后, 程序就可以向里面添加文档了, 这一操作发生在程序的第 27 行和第 28 行, 通过 FT.ADD 命令来完成。 每个文档偶读需要一个唯一 ID 、一个介于 0.0 和 1.0 之间的权重(rank)以及相应的字段。

正如程序的第 31 行所示, 在索引加载文档之后, 程序就可以使用 FT.SEARCH 命令和具体的查询语句来执行查询操作。 第 20 行的特定查询指示 RedisSearch 在索引中查找包含这些术语之一的文档。 在这个例子中, 该查询将返回两个文档。

使用 Redis 作为主数据库

Redis 可以作为一个速度奇快的内存存储数据库来使用。 下面的代码使用了哈希来演示这种用法。 哈希是一种非常棒的数据结构, 它可以建模你想要储存的记录类型, 并且能够将数据的主键用作键名的其中一部分。

 import asyncio
 import aioredis

 from pprint import pp

 async def main():

   redis = await aioredis.create_redis('redis://:foobared@localhost:6379/0', encoding='utf-8')

   await asyncio.gather(
     add_sighting(redis, 1, 'Possible vocalizations east of Makanda', 'Class B'),
     add_sighting(redis, 2, 'Sighting near the Columbia River', 'Class A'),
     add_sighting(redis, 3, 'Chased by a tall hairy creature', 'Class A'))

   sightings = await asyncio.gather(
     read_sighting(redis, 1),
     read_sighting(redis, 2),
     read_sighting(redis, 3))

   pp(sightings)

   redis.close()
   await redis.wait_closed()

 def add_sighting(redis, id, title, classification):
   return redis.hmset(f'bigfoot:sighting:{id}',
     'id', id, 'title', title, 'classification', classification)

 def read_sighting(redis, id):
   return redis.hgetall(f'bigfoot:sighting:{id}')

 asyncio.run(main())

你可能会这样想”如果我把服务器关掉了怎么办?如果它崩溃了怎么办?那我就什么数据都没有了!“ No,不会的! 你可以修改你的 redis.conf 文件, 用几种不同的方式来持久化内存中的数据 。 此外, 如果你使用的是 Redis Enterprise , 我们也有为你提供 相应的解决方案 , 使得你可以直接使用 Redis 而不必担心持久化的问题。

为了方便你亲手尝试这些例子, 我把文中涉及的 所有代码都放到了 GitHub 上面 , 你可以克隆并开始使用它们。 如果你是 Docker 用户, 项目里面也有一个名为 start-redis.sh 的 shell 脚本, 它可以拉取一个镜像, 然后启动一个能够运行这些例子的 Redis 版本。

如果你在玩耍完毕之后想要认真地构建一些软件, 那么可以注册并尝试 Redis Cloud Essentials 。 它和你所熟悉和喜欢的 Redis 一样, 唯一的区别就是这种 Redis 由云端进行管理, 所以你只需要专注于构建你的软件即可。

大家平时学习Python的时候肯定会遇到很多问题,小编我为大家准备了Python学习资料,将这些免费分享给大家!如果想要的可以找我领取

领取方式:

如果想获取这些学习资料,先关注我然后私信小编“01”即可免费领取!(私信方法:点击我头像进我主页右上面有个私信按钮)

如果这篇文章对你有帮助,请记得给我来个评论+转发

相关推荐

python import 出现 ModuleNotFoundError 解决方法

错误的原因是你的Python环境没有正确安装库文件。本文以Scapy为例,给出详细方案:1.确认是否成功安装Scapy运行以下命令检查Scapy是否已安装:pip3list|gre...

Github 7.4k star,一个强大的 Python 库-sh!

大家好,今天为大家分享一个强大的Python库-sh。Github地址:https://github.com/amoffat/shsh库是Python生态系统中一个专门用于执行系统命令的第三方...

学习编程第148天 python编程循环的嵌套使用

今天学习的是刘金玉老师零基础Python教程第32期,主要内容是python编程循环的嵌套使用。(一)一维数组及输出#一维数组list1=["110001","四川二流子...

2025-07-09:使数组元素互不相同所需的最少操作次数。用go语言,

2025-07-09:使数组元素互不相同所需的最少操作次数。用go语言,给定一个整数数组nums和一个整数k,对于数组中的每个元素,你最多可以对其进行一次操作:将一个在区间[-k,k]内的...

python数据分析numpy基础之max求数组最大值

1python数据分析numpy基础之max求数组最大值python的numpy库的max()函数,用于计算沿指定轴(一个轴或多个轴)的最大值。用法numpy.max(a,axis=None,...

加快Python算法的四个方法(四)Dask

CDA数据分析师出品相信大家在做一些算法经常会被庞大的数据量所造成的超多计算量需要的时间而折磨的痛苦不已,接下来我们围绕四个方法来帮助大家加快一下Python的计算时间,减少大家在算法上的等待时间。...

六十六、Leetcode数组系列(中篇)(leetcode679)

@Author:Runsen@Date:2020/6/8人生最重要的不是所站的位置,而是内心所朝的方向。只要我在每篇博文中写得自己体会,修炼身心;在每天的不断重复学习中,耐住寂寞,练就真功,不畏艰难...

Numpy中的ndarray是什么?('numpy.ndarray' object has no attribute 'append')

1.创建ndarray创建数组最简单的办法就是使用array函数。它接受一切序列型的对象(包括其他数组),然后产生一个新的含有传入数据的Numpy数组。np.array会尝试为新建的这个数组推断出一个...

Python中的数据导入与查询(python怎样导入数据库)

适用场景:快速导入文本/Excel数据→Pandas读取大型数值数据→Numpy处理复杂二进制文件→h5py/scipy.io数据库交互→SQLAlchemy+Pandas一、数据...

2025-07-02:统计数组中的美丽分割。用go语言,给定一个整数数组

2025-07-02:统计数组中的美丽分割。用go语言,给定一个整数数组nums,我们要把它划分成三个连续且非空的子数组nums1、nums2、nums3,且这三个子数组按顺序拼接后还原为原数组...

2025-07-10:字符相同的最短子字符串Ⅰ。用go语言,给定一个长度

2025-07-10:字符相同的最短子字符串Ⅰ。用go语言,给定一个长度为n的二进制字符串s和一个允许执行的最大操作次数numOps。每次操作可以选择字符串中的任意一个位置i(0≤i...

2025-06-19:识别数组中的最大异常值。用go语言,你有一个长度为

2025-06-19:识别数组中的最大异常值。用go语言,你有一个长度为n的整数数组nums,其中恰好有n-2个元素属于“特殊数字”类别。剩下的两个元素中,一个等于所有这些特殊数字的总和,另...

2025-06-28:长度可被 K 整除的子数组的最大元素和。用go语言,给

2025-06-28:长度可被K整除的子数组的最大元素和。用go语言,给定一个整数数组nums和一个整数k,求nums中长度为k的倍数的非空子数组中,子数组和的最大值。返回该最大和...

在 Python 中如何向一个已排序的数组(列表) 中插入一个数呢

在Python中如何向一个已排序的数组(列表)中插入一个数呢?方法有很多种,关键在于原来数组是什么样的排序,用到啥排序方法效率高,就用哪种。我们来练习其中的几种插入方法,另外也掌握下遍历数组的...

2025-07-04:统计符合条件长度为 3 的子数组数目。用go语言,给定

2025-07-04:统计符合条件长度为3的子数组数目。用go语言,给定一个整数数组nums,请你计算有多少个长度恰好为3的连续子数组满足这样的条件:子数组的第一个元素与第三个元素的和,正好...

取消回复欢迎 发表评论: