Python:Celery 的简单使用
最近写的后端项目需要用到队列,其中有一些东西折腾了好一会儿,在这里记录一下。
Celery 是一个开源的分布式任务队列,可实现异步任务调度、定时任务、任务结果处理等功能。
本文基于 Celery 5.2.7 版本编写。
概览
Celery 其实包含了几个概念:Celery
客户端、消息中间件 Broker
、结果存储 Backend
以及 任务执行单元 Worker
。
大致的流程是:Celer
客户端发起任务 -> 相关信息存储到消息中间件 Broker
中 -> 任务执行单元 Worker
从 Broker
中读取任务(Task
)然后执行 -> 将执行结果存储到 Backend
中 -> Celery
客户端从 Backend
中读取结果。
Celery 支持多种消息中间件和结果存储,本文主要使用 Redis 作为中间件和结果存储。
Celery
对象的初始化 & 配置
在 Celery
类用 __init__
方法初始化之后,有两种配置方式:
__init__
可以在初始化 Celery
对象的同时传入对应的配置。有以下几个比较常用的参数:
main
:主模块的名称broker
:消息中间件存储的访问地址backend
:结果存储服务的访问地址(可选的)
例如说一个可能的 Celery
对象的初始化代码为:
from celery import Celery
celery = Celery('flaskr', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
可以使用同一个 Redis 服务同时作为中间件和结果存储。
flaskr
是包含 Celery
客户端的 Python 程序的名称,如果使用 Flask 框架开发,那么应当是和初始化 Flask
对象时使用的名称一致。
上文有提到 backend
是可选的,如果不传该参数,那么就无法通过 Celery 查询到任务的执行结果。
config_from_object
Celery
类还提供了一个 config_from_object
方法,支持通过 “Python 配置文件” 进行配置。使用上代码类似下面这样:
from celery import Celery
celery = Celery('flaskr')
celery.config_from_object('flaskr.tools.celery_config')
celery_config
是文件名。flaskr.tools.celery_config
是这个文件的 “路径”,或者说当我们想要在代码中 import
该 python 文件时使用的路径。
celery_config.py
文件中的内容为:
from os import environ
CELERY_REDIS_URL = "redis://" + \
":" + \
environ.get("REDIS_PASSWD") + \
"@" + \
environ.get("REDIS_HOST") + \
":" + \
environ.get("REDIS_PORT") + \
"/" + \
environ.get("REDIS_DATABASE")
BROKER_URL = CELERY_REDIS_URL
CELERY_RESULT_BACKEND = CELERY_REDIS_URL
在该文件中,我们可以从环境变量中读取配置,或者直接写死了我们所需要的配置字段。
注意
Celery
对象的初始化以及配置必须要放在顶层,下面的代码是不行的:
from celery import Celery
celery = Celery('flaskr')
def init_app(app):
celery.config_from_object('flaskr.tools.celery_config')
因为后续 Worker
将会根据配置的路径,寻找 Celery
对象,从该对象中获取相应的配置。
该过程可以理解为:“不是获取一个已经初始化好了的对象,而是根据配置的路径,找到对应的文件/模块,重新初始化 Celery
对象”。
该过程只会执行顶层属性、方法,也就是说它不会调用你的 init_app
方法,所以这么写之后 Worker
侧的 Celery
对象属于只是被初始化但是没有被配置过的状态,所有的配置项都会是默认值。
Worker
Worker
服务是独立于 Celery
客户端程序的,需要单独另起一个服务。
在终端执行下面的命令启动 Worker
服务:
celery -A flaskr.libs.system worker -l info
其中 -A
参数后面接着的 flaskr.libs.system
是 Celery
对象所在的模块/文件路径。和 config_from_object
一样,这里也是 “当我们想要在代码中 import
该 python 文件时使用的路径”。要确保该路径下包含着 Celery
对象。
-l
参数是日志等级,示例中指定到 info
级别。
Celery 的使用
下面简单的介绍一下 Celery 框架的使用方法。
上文中有提到,Worker
会在运行时读取 Celery
客户端中的 Task
任务并执行。
定义任务
下面是一个简单的 Task
的定义:
# in tasks.py
from flaskr.libs.system import celery
@celery.task
def add(x, y):
return x + y
你可以将其定义为顶层/全局方法,也可以将其包裹在类中。
如果包裹在类中,会涉及到
self
参数问题,请参考后面的小节:任务中的 self
上文我们在 flaskr.libs.system
模块下定义了 celery
对象,然后就可以在 tasks.py
文件中使用 @celery.task
装饰器,使一个方法成为 Celery 任务。
注意:如果你的
Celery
对象不叫celery
而是其他名字,例如app
。那么你的装饰器应该是@app.task
——task
是Celery
对象的方法。
在 Task
任务内,return
结果代表着 “任务执行成功”,通过 raise
抛出异常意味着 “任务执行失败。”
执行任务
对任务方法调用 delay
方法以传递参数并执行任务:
# 使用参数 4,5 调用任务 add
result = add.delay(4, 5)
# 对执行结果调用 get() 方法以获取任务执行结果
print(result.get())
get()
方法获取的是任务执行成功之后的结果,即 return
的结果。
如果要获取 raise
抛出的异常,我们可以使用 result.result
方法获取异常。
判断状态
在获取任务的执行结果之前,我建议您先判断当前任务的执行状态。
Celery 框架提供了 AsyncResult
类来查询任务执行状态:
def exec_task(self):
"""执行任务,并返回任务 ID"""
result = add.delay(4, 5)
task_id = result.task_id
return task_id
def check_status(self, task_id):
"""根据任务 ID 查询任务状态"""
from flaskr.libs.system import celery
result = celery.AsyncResult(task_id)
有一些文章会执行使用
AsyncResult
类,而不是通过celery
对象调用AsyncResult
。
但是在我实际操作过程中发现,不通过celery
对象调用的话会找不到Backend
,怀疑也是和配置有关。
result
对象有一个 task_id
属性,使用该属性值作为参数初始化 AsyncResult
对象,就可以查询该任务的状态:
result.state
:获取当前任务的状态(字符串)result.ready()
:判断任务是否已经结束(成功 or 失败)result.successful()
:判读任务是否成功result.get()
:和上文提及的一样,获取执行成功的任务的结果(return
)result.result
:和上文提及的一样,获取执行失败的任务抛出的异常(raise
)
一种有可能的处理流程如下:
def check_status(self, task_id):
# 查询任务状态
from flaskr.libs.system import celery
result = celery.AsyncResult(task_id)
logging.info(f"查询任务结果,任务ID({task_id})对应的状态为:{result.state}")
# 任务仍在执行中
if not result.ready():
return { "code": "105", "message": "任务仍在执行中" }
# 任务失败
if not result.successful():
error = result.result
return { "code": "9999", "message": f"{str(error)}" }
# 获取任务执行结果
task_result = result.get()
return { "code": "200", "data": task_result, "message": "任务执行成功" }
任务中的 self
.task()
方法有一个 bind
参数,它的默认值是 False
。该参数可以控制是否将任务绑定到实例对象上。这关乎到 self
参数的具体值。
定义在类中的任务
如果你的任务定义是包裹在类中,像是这样:
from flaskr.libs.system import celery
class Tasks(object):
def __init__(self, value):
self.value = value
@celery.task
def add(self, x):
return x + self.value
那么此时需要注意,因为 bind
参数的默认值是 False
,所以此时 add
任务没有被绑定到 Tasks
类实例上,所以此时 self
参数指的是任务本身,而不是 Tasks
类的实例对象。此时在 add
方法中,你将无法获取到 self.value
的值。
如果你想在 add
方法中使用 self.value
,你需要将 bind
参数设置为 True
,像是这样:
...
@celery.task(bind=True)
def add(self, x):
return x + self.value
...
此时你可以顺利的获取到 self.value
的值。
定义在顶层的任务
如果任务直接定义在文件中,类似下面这样:
from flaskr.libs.system import celery
@celery.task
def add(x, y):
return x + y
那么首先,此时 bind
参数的值是 False
。这个时候您无法在 add
方法的参数列表中添加 self
,也无法在方法实现内使用 self
参数,这会导致编译失败。
如果你将 bind
设置为 True
,那么就可以添加 self
参数了,例如下面这样:
from flaskr.libs.system import celery
@celery.task(bind=True)
def add(self, x, y):
return x + y
那么此时 self
代表什么呢?
上文有提到,bind
参数用于将任务绑定到类实例。
实际上,此时 Celery 会在后台自动将该任务转换为一个带有实例方法的类,使你可以在任务函数中访问实例属性和方法。在这种情况下,self
参数表示当前任务的实例对象,就像在类中定义任务一样。
重试
Celery 框架本身支持 “重试” 功能,包含自动重试和手动重试两种方法。这里简单介绍一下手动重试的用法。
retry
方法的调用
# in tasks.py
from flaskr.libs.system import celery
@celery.task(bind=True)
def add(self, x, y):
try:
# 某些可能会抛出异常的方法
return __some_method_may_throw_exceptions()
except Exception as e:
# 出错每15秒尝试一次,总共尝试3次
raise self.retry(exc=e, countdown=15, max_retries=3)
finally:
pass
该例子中,我们在文件内直接定义了一个任务,而且 bind
参数被设置为了 True
。此时我们要通过 self
参数来调用 retry
方法,执行手动重试。
那如果 bind
是 False 的情况呢?此时我们可以使用 add.retry(...)
来触发手动重试。
如果任务定义在类中:
- 如果
bind
是False
,则可以直接使用self.retry(...)
触发手动重试。 - 如果
bind
是True
,则需要使用add.retry(...)
触发手动重试。
总结一下不同的场景:
- 任务定义在类中:
- 如果
bind
是False
,则可以直接使用self.retry(...)
触发手动重试。 - 如果
bind
是True
,则需要使用add.retry(...)
触发手动重试。
- 如果
- 任务定义在文件中:
- 如果
bind
是False
,则可以直接使用add.retry(...)
触发手动重试。 - 如果
bind
是True
,则需要使用self.retry(...)
触发手动重试。
- 如果
可以说定义在类中和定义在文件中,对于相同的 bind
参数,调用方式是相反的。
retry
方法的参数
说完了调用,我们来说一下参数。
countdown
定义重试的间隔,max_retries
定义最多重试几次。
exc
参数是一个可选参数,它代表着 “当前任务失败时产生的异常”。通过将 e
参数传递到 retry
方法中,可以让重试失败后抛出最开始的异常。您也可以自定义该异常参数。
在使用 retry
方法时,如果没有指定 exc
参数,Celery 框架将默认将任务的异常信息传递给 retry
方法。如果指定了 exc
参数,则会将指定的异常对象传递给 retry
方法。
retry
方法的结果
在上面的例子中,我们使用了 raise
将 retry
失败后的异常抛出,意味着任务失败。
同时我们还可以不使用该参数,在重试失败之后手动处理异常,例如:
# in tasks.py
from flaskr.libs.system import celery
@celery.task(bind=True)
def add(self, x, y):
try:
return __some_method_may_throw_exceptions()
except Exception as e:
max_retries = 3
# 超过最大次数,返回错误信息
if self.request.retries >= max_retries:
return __create_error_response()
self.retry(exc=e, countdown=15, max_retries=max_retries)
finally:
pass
在这个例子中,如果超过了最大重试次数,因为我们没有将重试的异常抛给外界,所以在最后一次重试失败之后会再次触发 except
,此时我们可以判断是否超过了最大重试次数,意味着重试失败。
当重试失败后,我们可以根据需求进行失败处理。