最近写的后端项目需要用到队列,其中有一些东西折腾了好一会儿,在这里记录一下。

Celery 是一个开源的分布式任务队列,可实现异步任务调度、定时任务、任务结果处理等功能。

本文基于 Celery 5.2.7 版本编写。

概览

Celery 其实包含了几个概念:Celery 客户端、消息中间件 Broker、结果存储 Backend 以及 任务执行单元 Worker

大致的流程是:Celer 客户端发起任务 -> 相关信息存储到消息中间件 Broker 中 -> 任务执行单元 WorkerBroker 中读取任务(Task)然后执行 -> 将执行结果存储到 Backend 中 -> Celery 客户端从 Backend 中读取结果。

Celery 支持多种消息中间件和结果存储,本文主要使用 Redis 作为中间件和结果存储。

Celery 对象的初始化 & 配置

Celery 类用 __init__ 方法初始化之后,有两种配置方式:

__init__

可以在初始化 Celery 对象的同时传入对应的配置。有以下几个比较常用的参数:

  • main:主模块的名称
  • broker:消息中间件存储的访问地址
  • backend:结果存储服务的访问地址(可选的)

例如说一个可能的 Celery 对象的初始化代码为:

from celery import Celery
celery = Celery('flaskr', broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')

可以使用同一个 Redis 服务同时作为中间件和结果存储。
flaskr 是包含 Celery 客户端的 Python 程序的名称,如果使用 Flask 框架开发,那么应当是和初始化 Flask 对象时使用的名称一致。

上文有提到 backend 是可选的,如果不传该参数,那么就无法通过 Celery 查询到任务的执行结果。

config_from_object

Celery 类还提供了一个 config_from_object 方法,支持通过 “Python 配置文件” 进行配置。使用上代码类似下面这样:

from celery import Celery

celery = Celery('flaskr')
celery.config_from_object('flaskr.tools.celery_config')

celery_config 是文件名。flaskr.tools.celery_config 是这个文件的 “路径”,或者说当我们想要在代码中 import 该 python 文件时使用的路径。

celery_config.py 文件中的内容为:

from os import environ

CELERY_REDIS_URL = "redis://" + \
                ":" + \
                environ.get("REDIS_PASSWD") + \
                "@" + \
                environ.get("REDIS_HOST") + \
                ":" + \
                environ.get("REDIS_PORT") + \
                "/" + \
                environ.get("REDIS_DATABASE")

BROKER_URL = CELERY_REDIS_URL
CELERY_RESULT_BACKEND = CELERY_REDIS_URL

在该文件中,我们可以从环境变量中读取配置,或者直接写死了我们所需要的配置字段。

注意

Celery 对象的初始化以及配置必须要放在顶层,下面的代码是不行的:

from celery import Celery

celery = Celery('flaskr')

def init_app(app):
    celery.config_from_object('flaskr.tools.celery_config')

因为后续 Worker 将会根据配置的路径,寻找 Celery 对象,从该对象中获取相应的配置。

该过程可以理解为:“不是获取一个已经初始化好了的对象,而是根据配置的路径,找到对应的文件/模块,重新初始化 Celery 对象”。

该过程只会执行顶层属性、方法,也就是说它不会调用你的 init_app 方法,所以这么写之后 Worker 侧的 Celery 对象属于只是被初始化但是没有被配置过的状态,所有的配置项都会是默认值。

Worker

Worker 服务是独立于 Celery 客户端程序的,需要单独另起一个服务。

在终端执行下面的命令启动 Worker 服务:

celery -A flaskr.libs.system worker -l info

其中 -A 参数后面接着的 flaskr.libs.systemCelery 对象所在的模块/文件路径。和 config_from_object 一样,这里也是 “当我们想要在代码中 import 该 python 文件时使用的路径”。要确保该路径下包含着 Celery 对象。

-l 参数是日志等级,示例中指定到 info 级别。

Celery 的使用

下面简单的介绍一下 Celery 框架的使用方法。

上文中有提到,Worker 会在运行时读取 Celery 客户端中的 Task 任务并执行。

定义任务

下面是一个简单的 Task 的定义:

# in tasks.py
from flaskr.libs.system import celery

@celery.task
def add(x, y):
    return x + y

你可以将其定义为顶层/全局方法,也可以将其包裹在类中。

如果包裹在类中,会涉及到 self 参数问题,请参考后面的小节:任务中的 self

上文我们在 flaskr.libs.system 模块下定义了 celery 对象,然后就可以在 tasks.py 文件中使用 @celery.task 装饰器,使一个方法成为 Celery 任务。

注意:如果你的 Celery 对象不叫 celery 而是其他名字,例如 app。那么你的装饰器应该是 @app.task —— taskCelery 对象的方法。

Task 任务内,return 结果代表着 “任务执行成功”,通过 raise 抛出异常意味着 “任务执行失败。”

执行任务

对任务方法调用 delay 方法以传递参数并执行任务:

# 使用参数 4,5 调用任务 add
result = add.delay(4, 5)

# 对执行结果调用 get() 方法以获取任务执行结果
print(result.get())

get() 方法获取的是任务执行成功之后的结果,即 return 的结果。

如果要获取 raise 抛出的异常,我们可以使用 result.result 方法获取异常。

判断状态

在获取任务的执行结果之前,我建议您先判断当前任务的执行状态。

Celery 框架提供了 AsyncResult 类来查询任务执行状态:

def exec_task(self):
    """执行任务,并返回任务 ID"""
    result = add.delay(4, 5)
    task_id = result.task_id
    return task_id

def check_status(self, task_id):
    """根据任务 ID 查询任务状态"""
    from flaskr.libs.system import celery
    result = celery.AsyncResult(task_id)

有一些文章会执行使用 AsyncResult 类,而不是通过 celery 对象调用 AsyncResult
但是在我实际操作过程中发现,不通过 celery 对象调用的话会找不到 Backend,怀疑也是和配置有关。

result 对象有一个 task_id 属性,使用该属性值作为参数初始化 AsyncResult 对象,就可以查询该任务的状态:

  • result.state:获取当前任务的状态(字符串)
  • result.ready():判断任务是否已经结束(成功 or 失败)
  • result.successful():判读任务是否成功
  • result.get():和上文提及的一样,获取执行成功的任务的结果(return
  • result.result:和上文提及的一样,获取执行失败的任务抛出的异常(raise

一种有可能的处理流程如下:

def check_status(self, task_id):
    # 查询任务状态
    from flaskr.libs.system import celery
    result = celery.AsyncResult(task_id)

    logging.info(f"查询任务结果,任务ID({task_id})对应的状态为:{result.state}")

    # 任务仍在执行中
    if not result.ready():
        return { "code": "105", "message": "任务仍在执行中" }
    
    # 任务失败
    if not result.successful():
        error = result.result
        return { "code": "9999", "message": f"{str(error)}" }

    # 获取任务执行结果
    task_result = result.get()
    return { "code": "200", "data": task_result, "message": "任务执行成功" }

任务中的 self

.task() 方法有一个 bind 参数,它的默认值是 False。该参数可以控制是否将任务绑定到实例对象上。这关乎到 self 参数的具体值。

定义在类中的任务

如果你的任务定义是包裹在类中,像是这样:

from flaskr.libs.system import celery

class Tasks(object):
    def __init__(self, value):
        self.value = value

    @celery.task
    def add(self, x):
        return x + self.value

那么此时需要注意,因为 bind 参数的默认值是 False,所以此时 add 任务没有被绑定到 Tasks 类实例上,所以此时 self 参数指的是任务本身,而不是 Tasks 类的实例对象。此时在 add 方法中,你将无法获取到 self.value 的值。

如果你想在 add 方法中使用 self.value,你需要将 bind 参数设置为 True,像是这样:

...

@celery.task(bind=True)
def add(self, x):
    return x + self.value

...

此时你可以顺利的获取到 self.value 的值。

定义在顶层的任务

如果任务直接定义在文件中,类似下面这样:

from flaskr.libs.system import celery

@celery.task
def add(x, y):
    return x + y

那么首先,此时 bind 参数的值是 False。这个时候您无法在 add 方法的参数列表中添加 self,也无法在方法实现内使用 self 参数,这会导致编译失败。

如果你将 bind 设置为 True,那么就可以添加 self 参数了,例如下面这样:

from flaskr.libs.system import celery

@celery.task(bind=True)
def add(self, x, y):
    return x + y

那么此时 self 代表什么呢?

上文有提到,bind 参数用于将任务绑定到类实例。

实际上,此时 Celery 会在后台自动将该任务转换为一个带有实例方法的类,使你可以在任务函数中访问实例属性和方法。在这种情况下,self 参数表示当前任务的实例对象,就像在类中定义任务一样。

重试

Celery 框架本身支持 “重试” 功能,包含自动重试和手动重试两种方法。这里简单介绍一下手动重试的用法。

retry 方法的调用

# in tasks.py
from flaskr.libs.system import celery

@celery.task(bind=True)
def add(self, x, y):
    try:
        # 某些可能会抛出异常的方法
        return __some_method_may_throw_exceptions()

    except Exception as e:
        # 出错每15秒尝试一次,总共尝试3次
        raise self.retry(exc=e, countdown=15, max_retries=3)

    finally:
        pass

该例子中,我们在文件内直接定义了一个任务,而且 bind 参数被设置为了 True。此时我们要通过 self 参数来调用 retry 方法,执行手动重试。

那如果 bind 是 False 的情况呢?此时我们可以使用 add.retry(...) 来触发手动重试。

如果任务定义在类中:

  • 如果 bindFalse,则可以直接使用 self.retry(...) 触发手动重试。
  • 如果 bindTrue,则需要使用 add.retry(...) 触发手动重试。

总结一下不同的场景:

  • 任务定义在类中:
    • 如果 bindFalse,则可以直接使用 self.retry(...) 触发手动重试。
    • 如果 bindTrue,则需要使用 add.retry(...) 触发手动重试。
  • 任务定义在文件中:
    • 如果 bindFalse,则可以直接使用 add.retry(...) 触发手动重试。
    • 如果 bindTrue,则需要使用 self.retry(...) 触发手动重试。

可以说定义在类中和定义在文件中,对于相同的 bind 参数,调用方式是相反的。

retry 方法的参数

说完了调用,我们来说一下参数。

countdown 定义重试的间隔,max_retries 定义最多重试几次。

exc 参数是一个可选参数,它代表着 “当前任务失败时产生的异常”。通过将 e 参数传递到 retry 方法中,可以让重试失败后抛出最开始的异常。您也可以自定义该异常参数。

在使用 retry 方法时,如果没有指定 exc 参数,Celery 框架将默认将任务的异常信息传递给 retry 方法。如果指定了 exc 参数,则会将指定的异常对象传递给 retry 方法。

retry 方法的结果

在上面的例子中,我们使用了 raiseretry 失败后的异常抛出,意味着任务失败。

同时我们还可以不使用该参数,在重试失败之后手动处理异常,例如:

# in tasks.py
from flaskr.libs.system import celery

@celery.task(bind=True)
def add(self, x, y):
    try:
        return __some_method_may_throw_exceptions()

    except Exception as e:
        max_retries = 3

        # 超过最大次数,返回错误信息
        if self.request.retries >= max_retries:
            return __create_error_response()    

        self.retry(exc=e, countdown=15, max_retries=max_retries)

    finally:
        pass

在这个例子中,如果超过了最大重试次数,因为我们没有将重试的异常抛给外界,所以在最后一次重试失败之后会再次触发 except,此时我们可以判断是否超过了最大重试次数,意味着重试失败。

当重试失败后,我们可以根据需求进行失败处理。