Django利用celery执行周期任务添加互斥锁

行云流水
2023-07-14 / 0 评论 / 350 阅读 / 正在检测是否收录...

前言

写了一个每三秒执行一次,有需要处理的数据则处理,没有则退出的函数。用celery周期性任务执行。遇到了一个问题就是,有数据处理时。3秒后,下一个进程会关掉此进程,加了一个锁。保证同一时间只有一个进程执行,其他进程直接退出。

实现代码

from celery import shared_task, current_task
@shared_task
def sync_start_process(sname):
    """
    异步执行任务
    """
    lock_name = f"lock_sync_start_process_{sname}"

    # 尝试获取锁定
    with current_task.app.Lock(lock_name, timeout=0):
        # 如果成功获取到锁定,则说明之前没有相同任务在执行,可以继续执行任务逻辑
        # 执行任务逻辑...
        process_start(sname)

    # 如果无法获取到锁定(即有相同任务正在执行),则会抛出 Reject 异常,任务会被自动丢弃
    raise Reject("Task is already in progress")

遇到问题

一开始调试成功了,后来遇到启动报错。死活找不到原因。

#操作docker
import redis
from celery import shared_task, current_task
from celery.exceptions import Reject

from .utils import process_start

#日志
from loguru import logger

@shared_task
def sync_start_process(sname):
    """
    异步执行任务
    """
    lock_name = f"lock_sync_start_process_{sname}"

    #创建redis链接
    r = redis.Redis(host='redis', port=6379, db=3, password='redis@2022')

    # 尝试获取锁定
    lock_acquired = r.set(lock_name, 'locked', nx=True, ex=1)

    if lock_acquired:
        try:
            # 如果成功获取到锁定,则说明之前没有相同任务在执行,可以继续执行任务逻辑
            # 执行任务逻辑...
            process_start(sname)
        finally:
            #释放锁
            r.delete(lock_name)
    else:
        # 如果无法获取到锁定(即有相同任务正在执行),则会抛出 Reject 异常,任务会被自动丢弃
        raise Reject("Task is already in progress")

评论 (0)

取消
只有登录/注册用户才可评论