Django的celery通过配置添加周期性任务

行云流水
2023-08-25 / 0 评论 / 136 阅读 / 正在检测是否收录...

前言

以前都是通过函数,动态添加周期性任务。新的项目比较简单。直接在项目启动时加载周期性任务,加载后也不变动。

代码

编辑etcman/celery.py,新增代码
#周期执行
app.conf.beat_schedule = {
    #'project-autodiscover-host-task-every-60-minutes': {
    #    'task': 'project.tasks.project_autodiscover_host_task',
    #    'schedule': timedelta(minutes=60),
    #},
    'project-autodiscover-host-task-every-2-hours': {
        'task': 'project.tasks.project_autodiscover_host_task',
        'schedule': timedelta(hours=2),
    },
    'monitor-autodiscover-open-ports-task-every-6-hours': {
        'task': 'monitor.tasks.monitor_autodiscover_open_ports_task',
        'schedule': timedelta(hours=6),
    },
}

函数

在模块下的tasks.py文件内定义,如
from etcman.celery import app
from .models import *
from .utils import *

from loguru import logger

@app.task
def project_autodiscover_host_task():
    """
    自动发现在线IP
    """
    #记录日志
    logger.info('自动发现在线主机任务开始')

    projects = Project.objects.all()
    for project in projects:
        ipranges = IPRange.objects.filter(project=project)
        for iprange in ipranges:
            if iprange.ip_type == 'ipv4':
                check_ip_alive_ip(iprange.ip_range, project)

启动

celery -A etcman worker -l info -B

评论 (0)

取消
只有登录/注册用户才可评论