Mars Remote API轻松分布式执行Python函数
off999 2024-11-15 23:14 22 浏览 0 评论
Mars 是一个并行和分布式 Python 框架,能轻松把单机大家耳熟能详的的 numpy、pandas、scikit-learn 等库,以及 Python 函数利用多核或者多机加速。这其中,并行和分布式 Python 函数主要利用 Mars Remote API。
启动 Mars 分布式环境可以参考:
- 命令行方式在集群中部署。
- Kubernetes 中部署。
- MaxCompute 开箱即用的环境,购买了 MaxCompute 服务的可以直接使用。
如何使用 Mars Remote API
使用 Mars Remote API 非常简单,只需要对原有的代码做少许改动,就可以分布式执行。
采用蒙特卡洛方法计算 π 为例。代码如下,我们编写了两个函数,calc_chunk 用来计算每个分片内落在圆内的点的个数,calc_pi 用来把多个分片 calc_chunk 计算的结果汇总最后得出 π 值。
from typing import List
import numpy as np
def calc_chunk(n: int, i: int):
# 计算n个随机点(x和y轴落在-1到1之间)到原点距离小于1的点的个数
rs = np.random.RandomState(i)
a = rs.uniform(-1, 1, size=(n, 2))
d = np.linalg.norm(a, axis=1)
return (d < 1).sum()
def calc_pi(fs: List[int], N: int):
# 将若干次 calc_chunk 计算的结果汇总,计算 pi 的值
return sum(fs) * 4 / N
N = 200_000_000
n = 10_000_000
fs = [calc_chunk(n, i)
for i in range(N // n)]
pi = calc_pi(fs, N)
print(pi)%%time 下可以看到结果:
3.1416312
CPU times: user 9.47 s, sys: 2.62 s, total: 12.1 s
Wall time: 12.3 s在单机需要 12.3 s。
要让这个计算使用 Mars Remote API 并行起来,我们不需要对函数做任何改动,需要变动的仅仅是最后部分。
import mars.remote as mr
# 函数调用改成 mars.remote.spawn
fs = [mr.spawn(calc_chunk, args=(n, i))
for i in range(N // n)]
# 把 spawn 的列表传入作为参数,再 spawn 新的函数
pi = mr.spawn(calc_pi, args=(fs, N))
# 通过 execute() 触发执行,fetch() 获取结果
print(pi.execute().fetch())%%time 下看到结果:
3.1416312
CPU times: user 29.6 ms, sys: 4.23 ms, total: 33.8 ms
Wall time: 2.85 s结果一模一样,但是却有数倍的性能提升。
可以看到,对已有的 Python 代码,Mars remote API 几乎不需要做多少改动,就能有效并行和分布式来加速执行过程。
一个例子
为了让读者理解 Mars Remote API 的作用,我们从另一个例子开始。现在我们有一个数据集,我们希望对它们做一个分类任务。要做分类,我们有很多算法和库可以选择,这里我们用 RandomForest、LogisticRegression,以及 XGBoost。
困难的地方是,除了有多个模型选择,这些模型也会包含多个超参,那哪个超参效果最好呢?对于调参不那么有经验的同学,跑过了才知道。所以,我们希望能生成一堆可选的超参,然后把他们都跑一遍,看看效果。
准备数据
这个例子里我们使用 otto 数据集。
首先,我们准备数据。读取数据后,我们按 2:1 的比例把数据分成训练集和测试集。
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split
def gen_data():
df = pd.read_csv('otto/train.csv')
X = df.drop(['target', 'id'], axis=1)
y = df['target']
label_encoder = LabelEncoder()
label_encoder.fit(y)
y = label_encoder.transform(y)
return train_test_split(X, y, test_size=0.33, random_state=123)
X_train, X_test, y_train, y_test = gen_data()模型
接着,我们使用 scikit-learn 的 RandomForest 和 LogisticRegression 来处理分类。
RandomForest:
from sklearn.ensemble import RandomForestClassifier
def random_forest(X_train: pd.DataFrame,
y_train: pd.Series,
verbose: bool = False,
**kw):
model = RandomForestClassifier(verbose=verbose, **kw)
model.fit(X_train, y_train)
return model接着,我们生成供 RandomForest 使用的超参,我们用 yield 的方式来迭代返回。
def gen_random_forest_parameters():
for n_estimators in [50, 100, 600]:
for max_depth in [None, 3, 15]:
for criterion in ['gini', 'entropy']:
yield {
'n_estimators': n_estimators,
'max_depth': max_depth,
'criterion': criterion
}LogisticRegression 也是这个过程。我们先定义模型。
from sklearn.linear_model import LogisticRegression
def logistic_regression(X_train: pd.DataFrame,
y_train: pd.Series,
verbose: bool = False,
**kw):
model = LogisticRegression(verbose=verbose, **kw)
model.fit(X_train, y_train)
return model接着生成供 LogisticRegression 使用的超参。
def gen_lr_parameters():
for penalty in ['l2', 'none']:
for tol in [0.1, 0.01, 1e-4]:
yield {
'penalty': penalty,
'tol': tol
}XGBoost 也是一样,我们用 XGBClassifier 来执行分类任务。
from xgboost import XGBClassifier
def xgb(X_train: pd.DataFrame,
y_train: pd.Series,
verbose: bool = False,
**kw):
model = XGBClassifier(verbosity=int(verbose), **kw)
model.fit(X_train, y_train)
return model生成一系列超参。
def gen_xgb_parameters():
for n_estimators in [100, 600]:
for criterion in ['gini', 'entropy']:
for learning_rate in [0.001, 0.1, 0.5]:
yield {
'n_estimators': n_estimators,
'criterion': criterion,
'learning_rate': learning_rate
}验证
接着我们编写验证逻辑,这里我们使用 log_loss 来作为评价函数。
from sklearn.metrics import log_loss
def metric_model(model,
X_test: pd.DataFrame,
y_test: pd.Series) -> float:
if isinstance(model, bytes):
model = pickle.loads(model)
y_pred = model.predict_proba(X_test)
return log_loss(y_test, y_pred)
def train_and_metric(train_func,
train_params: dict,
X_train: pd.DataFrame,
y_train: pd.Series,
X_test: pd.DataFrame,
y_test: pd.Series,
verbose: bool = False
):
# 把训练和验证封装到一起
model = train_func(X_train, y_train, verbose=verbose, **train_params)
metric = metric_model(model, X_test, y_test)
return model, metric找出最好的模型
做好准备工作后,我们就开始来跑模型了。针对每个模型,我们把每次生成的超参们送进去训练,除了这些超参,我们还把 n_jobs 设成 -1,这样能更好利用单机的多核。
results = []
# -------------
# Random Forest
# -------------
for params in gen_random_forest_parameters():
print(f'calculating on {params}')
# fixed random_state
params['random_state'] = 123
# use all CPU cores
params['n_jobs'] = -1
model, metric = train_and_metric(random_forest, params,
X_train, y_train,
X_test, y_test)
print(f'metric: {metric}')
results.append({'model': model,
'metric': metric})
# -------------------
# Logistic Regression
# -------------------
for params in gen_lr_parameters():
print(f'calculating on {params}')
# fixed random_state
params['random_state'] = 123
# use all CPU cores
params['n_jobs'] = -1
model, metric = train_and_metric(logistic_regression, params,
X_train, y_train,
X_test, y_test)
print(f'metric: {metric}')
results.append({'model': model,
'metric': metric})
# -------
# XGBoost
# -------
for params in gen_xgb_parameters():
print(f'calculating on {params}')
# fixed random_state
params['random_state'] = 123
# use all CPU cores
params['n_jobs'] = -1
model, metric = train_and_metric(xgb, params,
X_train, y_train,
X_test, y_test)
print(f'metric: {metric}')
results.append({'model': model,
'metric': metric})运行一下,需要相当长时间,我们省略掉一部分输出内容。
calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'gini'}
metric: 0.6964123781828575
calculating on {'n_estimators': 50, 'max_depth': None, 'criterion': 'entropy'}
metric: 0.6912312790832288
# 省略其他模型的输出结果
CPU times: user 3h 41min 53s, sys: 2min 34s, total: 3h 44min 28s
Wall time: 31min 44s从 CPU 时间和 Wall 时间,能看出来这些训练还是充分利用了多核的性能。但整个过程还是花费了 31 分钟。
使用 Remote API 分布式加速
现在我们尝试使用 Remote API 通过分布式方式加速整个过程。
集群方面,我们使用最开始说的第三种方式,直接在 MaxCompute 上拉起一个集群。大家可以选择其他方式,效果是一样的。
n_cores = 8
mem = 2 * n_cores # 16G
# o 是 MaxCompute 入口,这里创建 10 个 worker 的集群,每个 worker 8核16G
cluster = o.create_mars_cluster(10, n_cores, mem, image='extended')为了方便在分布式读取数据,我们对数据处理稍作改动,把数据上传到 MaxCompute 资源。对于其他环境,用户可以考虑 HDFS、Aliyun OSS 或者 Amazon S3 等存储。
if not o.exist_resource('otto_train.csv'):
with open('otto/train.csv') as f:
# 上传资源
o.create_resource('otto_train.csv', 'file', fileobj=f)
def gen_data():
# 改成从资源读取
df = pd.read_csv(o.open_resource('otto_train.csv'))
X = df.drop(['target', 'id'], axis=1)
y = df['target']
label_encoder = LabelEncoder()
label_encoder.fit(y)
y = label_encoder.transform(y)
return train_test_split(X, y, test_size=0.33, random_state=123)稍作改动之后,我们使用 mars.remote.spawn 方法来让 gen_data 调度到集群上运行。
import mars.remote as mr
# n_output 说明是 4 输出
# execute() 执行后,数据会读取到 Mars 集群内部
data = mr.ExecutableTuple(mr.spawn(gen_data, n_output=4)).execute()
# remote_ 开头的都是 Mars 对象,这时候数据在集群内,这些对象只是引用
remote_X_train, remote_X_test, remote_y_train, remote_y_test = data目前 Mars 能正确序列化 numpy ndarray、pandas DataFrame 等,还不能序列化模型,所以,我们要对 train_and_metric 稍作改动,把模型 pickle 了之后再返回。
def distributed_train_and_metric(train_func,
train_params: dict,
X_train: pd.DataFrame,
y_train: pd.Series,
X_test: pd.DataFrame,
y_test: pd.Series,
verbose: bool = False
):
model, metric = train_and_metric(train_func, train_params,
X_train, y_train,
X_test, y_test, verbose=verbose)
return pickle.dumps(model), metric后续 Mars 支持了序列化模型后可以直接 spawn 原本的函数。
接着我们就对前面的执行过程稍作改动,把函数调用全部都用 mars.remote.spawn 来改写。
import numpy as np
tasks = []
models = []
metrics = []
# -------------
# Random Forest
# -------------
for params in gen_random_forest_parameters():
# fixed random_state
params['random_state'] = 123
task = mr.spawn(distributed_train_and_metric,
args=(random_forest, params,
remote_X_train, remote_y_train,
remote_X_test, remote_y_test),
kwargs={'verbose': 2},
n_output=2
)
tasks.extend(task)
# 把模型和评价分别存储
models.append(task[0])
metrics.append(task[1])
# -------------------
# Logistic Regression
# -------------------
for params in gen_lr_parameters():
# fixed random_state
params['random_state'] = 123
task = mr.spawn(distributed_train_and_metric,
args=(logistic_regression, params,
remote_X_train, remote_y_train,
remote_X_test, remote_y_test),
kwargs={'verbose': 2},
n_output=2
)
tasks.extend(task)
# 把模型和评价分别存储
models.append(task[0])
metrics.append(task[1])
# -------
# XGBoost
# -------
for params in gen_xgb_parameters():
# fixed random_state
params['random_state'] = 123
# 再指定并发为核的个数
params['n_jobs'] = n_cores
task = mr.spawn(distributed_train_and_metric,
args=(xgb, params,
remote_X_train, remote_y_train,
remote_X_test, remote_y_test),
kwargs={'verbose': 2},
n_output=2
)
tasks.extend(task)
# 把模型和评价分别存储
models.append(task[0])
metrics.append(task[1])
# 把顺序打乱,目的是能分散到 worker 上平均一点
shuffled_tasks = np.random.permutation(tasks)
_ = mr.ExecutableTuple(shuffled_tasks).execute()可以看到代码几乎一致。
运行查看结果:
CPU times: user 69.1 ms, sys: 10.9 ms, total: 80 ms
Wall time: 1min 59s时间一下子从 31 分钟多来到了 2 分钟,提升 15x+。但代码修改的代价可以忽略不计。
细心的读者可能注意到了,分布式运行的代码中,我们把模型的 verbose 给打开了,在分布式环境下,因为这些函数远程执行,打印的内容只会输出到 worker 的标准输出流,我们在客户端不会看到打印的结果,但 Mars 提供了一个非常有用的接口来让我们查看每个模型运行时的输出。
以第0个模型为例,我们可以在 Mars 对象上直接调用 fetch_log 方法。
print(models[0].fetch_log())输出我们简略一部分。
building tree 1 of 50
building tree 2 of 50
building tree 3 of 50
building tree 4 of 50
building tree 5 of 50
building tree 6 of 50
# 中间省略
building tree 49 of 50
building tree 50 of 50要看哪个模型都可以通过这种方式。试想下,如果没有 fetch_log API,你确想看中间过程的输出有多麻烦。首先这个函数在哪个 worker 上执行,不得而知;然后,即便知道是哪个 worker,因为每个 worker 上可能有多个函数执行,这些输出就可能混杂在一起,甚至被庞大日志淹没了。fetch_log 接口让用户不需要关心在哪个 worker 上执行,也不用担心日志混合在一起。
本文为阿里云原创内容,未经允许不得转载。
相关推荐
- 迅捷路由器登录(yr1900g路由器登录入口)
-
入口如下:1.打开网页后输入192.168.1.1或tplogin.cn。2.第一次登录路由器或恢复出厂设置后再次设置,按提示设置好管理员密码、上网参数、wifi名称和密码。3.再次进入登录页面中,输...
- pdf格式怎么编辑(怎么创建pdf格式的文件)
-
1、电脑打开PDF文件。2、电脑打开PDF文件后,点击工具栏中的编辑。3、进入编辑页面后,可以点击文字,对pdf文件进行编辑。4、点击裁剪页面选项,就可以对PDF文件中的页面大小进行裁剪。5、PDF文...
- 电脑显示器不亮(电脑显示器不亮了)
-
多种原因:1、检查电脑主机与显示器之间的连接是否松动、损坏,显示器是否正常。2、这是最常见的故障,内存条接触不良导致显示器无信号。解决办法:断电/拔出内存条,用橡皮擦将金手指擦亮再装回去即可。3、显卡...
- 电脑频繁总自动关机(电脑经常性自动关机)
-
电脑总是自动关机原因如下 1、原因一:设置的问题 有的用户会在电脑上安装管家类软件,这些软件里会有一些设置预定时间关机的功能,比如设置为17:00关机,那么到了下午5点后它就会自动关机,一般检查一...
- 邮箱注册百度账号(邮箱注册百度帐号)
-
要使用邮箱注册天翼云盘,首先需要打开天翼云盘的官方网站。在注册页面中,选择使用邮箱注册并输入您的邮箱地址。然后,按照提示填写您的个人信息,包括用户名、密码等等。最后,点击注册按钮,等待验证邮件的发送。...
- 台式电脑截屏键快捷方式(台式电脑的截图快捷键在哪)
-
方法/步骤1第一个办法自然是我们最常见最简单的,使用“PrintScreen”键截图了。点击“PrintScreen”键,我们就可以直接截取全部屏幕,找个对话框或者文字区域粘贴就好了。我截的图是这样的...
- cad2014密钥001f1不对(cad2014密钥001f1无效)
-
Excel中序号要想输成001,我们可以进行如下的操作,我们先将所有输入序号的这一列全部选定,也就是点击英文字母这一列就可以全部选定了,然后我们在这个选定的区域的状拍下去,点击鼠标右键,再点击数值,再...
- xp强行删除管理员开机密码(windowsxp强行删除开机密码)
-
要清除WindowsXP开机密码,首先需要进入安全模式,然后进入控制面板,选择用户账户设置,再选择删除密码或更改密码选项,输入当前密码,然后将密码字段留空即可清除密码。如果忘记了密码,可以使用软件工...
- 台式机u盘装win10系统教程(台式机u盘安装win10)
-
答/具体方法如下一、准备工作1、8G或更大容量空U盘2、制作pe启动盘:微pe工具箱怎么制作u盘启动盘(UEFI&Legacy双模式)3、win10系统下载:ghostwin1064位官方...
-
- 笔记本黑屏了怎么唤醒(笔记本黑屏了怎么唤醒电脑)
-
1.笔记本电脑电池没电,自动关机之后,最好是尽快给电脑充电。2.带电脑冲一会电之后,才开机使用电源键开机。3.有些电脑在电池没电自动关机之后,会进入假关机状态,只要连接电源就会自动重启。4.在电脑使用的时候,最好是保证电源。步骤/方式1任意...
-
2025-11-17 18:51 off999
-
- windows10亮度调节在哪(windows10设置亮度调节)
-
win10屏幕亮度可在系统显示设置中调节。win10调节屏幕亮度的方法步骤如下:1、首先在桌面空白位置点击鼠标右键,选择“显示设置”选项,如图所示。2、在显示页面,在右侧找到“更改亮度”选项,点击滑块左右滑动即可调节屏幕亮度,如图所示。3、...
-
2025-11-17 18:03 off999
- u盘提示写保护 怎么取消啊(u盘写保护取消不了)
-
要解除U盘的写保护,可以使用以下方法:1.检查U盘上的物理写保护开关,将其移到关闭状态。2.通过命令行输入"diskpart"命令打开磁盘工具,选择U盘并输入"attribut...
- 如何重装win7电脑系统(如何重装win7电脑系统)
-
重装win7,就要先下载一个纯净版的win7系统文件,然后下载一个一键ghost重装工具,打开工具选择win7纯净系统文件,开始安装,系统会自动安装直到装完进入系统为止。相对来说还是比较傻瓜式简单便捷...
欢迎 你 发表评论:
- 一周热门
-
-
抖音上好看的小姐姐,Python给你都下载了
-
全网最简单易懂!495页Python漫画教程,高清PDF版免费下载
-
Python 3.14 的 UUIDv6/v7/v8 上新,别再用 uuid4 () 啦!
-
python入门到脱坑 输入与输出—str()函数
-
飞牛NAS部署TVGate Docker项目,实现内网一键转发、代理、jx
-
宝塔面板如何添加免费waf防火墙?(宝塔面板开启https)
-
Python三目运算基础与进阶_python三目运算符判断三个变量
-
(新版)Python 分布式爬虫与 JS 逆向进阶实战吾爱分享
-
慕ke 前端工程师2024「完整」
-
失业程序员复习python笔记——条件与循环
-
- 最近发表
- 标签列表
-
- python计时 (73)
- python安装路径 (56)
- python类型转换 (93)
- python进度条 (67)
- python吧 (67)
- python的for循环 (65)
- python格式化字符串 (61)
- python静态方法 (57)
- python列表切片 (59)
- python面向对象编程 (60)
- python 代码加密 (65)
- python串口编程 (77)
- python封装 (57)
- python写入txt (66)
- python读取文件夹下所有文件 (59)
- python操作mysql数据库 (66)
- python获取列表的长度 (64)
- python接口 (63)
- python调用函数 (57)
- python多态 (60)
- python匿名函数 (59)
- python打印九九乘法表 (65)
- python赋值 (62)
- python异常 (69)
- python元祖 (57)
