在 FastAPI 中,你可以使用后台任务(Background Tasks)来异步执行一些任务,而不会阻塞主请求的处理。这对于需要执行一些耗时的操作,但不需要等待其完成的情况非常有用。

以下是一个使用后台任务的 FastAPI 示例:
from fastapi import FastAPI, BackgroundTasks

app = FastAPI()

def write_log(message: str):
    with open("logs.txt", mode="a") as log_file:
        log_file.write(message)

@app.post("/send-notification/{email}")
async def send_notification(
    email: str, background_tasks: BackgroundTasks
):
    message = f"message to {email}\n"
    background_tasks.add_task(write_log, message)
    return {"message": "Message sent in the background"}

在这个例子中:

1. 定义了一个 write_log 函数,它将消息写入一个日志文件(logs.txt)。

2. 创建一个 /send-notification/{email} 路由,当该路由被请求时,会异步执行后台任务。

3. 在路径操作函数中,使用 background_tasks.add_task 来添加后台任务。在这个例子中,添加了一个写入日志文件的后台任务。

当客户端访问 /send-notification/{email} 路由时,路径操作函数会立即返回响应,而后台任务会在后台异步执行,不会阻塞主请求的处理。

请注意,后台任务是在应用程序的进程内执行的,因此如果主应用程序关闭,后台任务可能会被终止。如果需要更高级的异步任务处理,可以考虑使用 Celery 等异步任务队列。


转载请注明出处:http://www.pingtaimeng.com/article/detail/7398/FastAPI