前言
功能实现
模型设计
后台管理
# Register your models here.
class SpiderInfofAdmin(admin.ModelAdmin):
    """Admin configuration for spider definitions (SpiderInfo)."""
    # Fields matched by the admin search box.
    search_fields = ['sname']
    # Columns rendered in the changelist view.
    list_display = ['id', 'sname', 'filepath', 'workpath', 'image', 'addtime', 'snote']
class SpiderTaskAdmin(admin.ModelAdmin):
    """Admin configuration for scheduled spider tasks (SpiderTask)."""
    # Columns rendered in the changelist view (several are custom
    # callables on the admin/model, e.g. status_colored, operate).
    list_display = ['id', 'sname', 'snote', 'addtime', 'runtime_show', 'total', 'repeat', 'valid', 'status_colored', 'operate']
    # Sidebar filter.
    list_filter = ["status"]
    # Fields matched by the admin search box.
    search_fields = ['sname']
    # Run statistics are produced by the worker, not edited by hand.
    readonly_fields = ['id', 'addtime', 'runtime', 'total', 'repeat', 'valid', 'status']
    # Custom bulk action toggling the periodic schedule.
    actions = ['schedule_switch']
异步任务
项目主settings.py添加内容
# Celery configuration for the project settings.py.
# NOTE: the original paste was collapsed onto a single line (everything
# after the first '#' became a comment), and defined CELERYD_CONCURRENCY
# and CELERYD_MAX_TASKS_PER_CHILD twice; only the last (effective)
# values are kept here.
INSTALLED_APPS = [
    # ... other project apps elided ...
    'django_celery_beat',
    # ...
]

# Broker/backend on a local redis: db 1 for the broker, db 2 for results.
CELERY_BROKER_URL = 'redis://172.17.0.10:6379/1'
CELERY_RESULT_BACKEND = 'redis://172.17.0.10:6379/2'

# Use Django's timezone for Celery; beat stores naive local datetimes.
CELERY_TIMEZONE = TIME_ZONE
DJANGO_CELERY_BEAT_TZ_AWARE = False
# Celery works in local time, not UTC.
CELERY_ENABLE_UTC = False

# Accept/serialize tasks and results as JSON only.
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
# Result expiry (seconds).
CELERY_TASK_RESULT_EXPIRES = 12 * 30
# Compress task messages.
CELERY_MESSAGE_COMPRESSION = 'zlib'

# How many messages each worker prefetches from redis per fetch.
CELERYD_PREFETCH_MULTIPLIER = 2
# Helps prevent deadlocks by re-exec'ing worker children.
CELERYD_FORCE_EXECV = True
# Up to 20 tasks may run concurrently.
CELERYD_CONCURRENCY = 20
# Recycle a worker child after 4 tasks to contain memory leaks.
CELERYD_MAX_TASKS_PER_CHILD = 4

# Database-backed scheduler: schedule edits take effect immediately.
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'

# Silence the Django 3.2+ default-primary-key warning.
DEFAULT_AUTO_FIELD = 'django.db.models.AutoField'
2.同目录下新增celery.py
import os
from celery import Celery,platforms
from django.conf import settings
# Point Celery at the project's Django settings module.
os.environ.setdefault('DJANGO_SETTINGS_MODULE','taskmon.settings')
# Create the Celery application instance.
app = Celery('taskmon')
# namespace='CELERY' lets Celery be configured from the Django settings
# file: every Celery option must be prefixed with CELERY to avoid clashes.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Auto-discover tasks.py modules in all registered Django apps.
app.autodiscover_tasks()
# Allow the celery worker to run as root.
platforms.C_FORCE_ROOT = True
# 一个测试任务
# Minimal smoke-test task: prints its own request context so a worker
# round-trip can be verified by eye.
@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))
3.项目目录下tasks.py
#操作docker
from celery import shared_task
from .utils import process_start
@shared_task
def sync_start_process(sname):
    """Run the spider task asynchronously on a Celery worker.

    sname: task name, forwarded verbatim to process_start().
    """
    process_start(sname)
4.celery启动
# Task scheduling: start the worker pool (detached) and the beat scheduler.
celery multi start worker -A taskmon -l info --logfile=/logs/celery_worker.log
celery -A taskmon beat -l info --logfile=/logs/celery_beat.log
添加周期任务
删除周期任务
def remove_celery_task(sid):
    """
    Delete the periodic (beat) task scheduled for a spider task.

    sid: spider task ID; the PeriodicTask name is derived from it.
    Returns True when the schedule entry was deleted, False on any
    failure (including the entry not existing).
    """
    cname = str(sid) + '-' + '周期任务'
    # Delete the scheduled entry inside a savepoint so a failure cannot
    # leak a half-applied change out of the surrounding transaction.
    # (The original comment said "add scheduled task" — it deletes.)
    with transaction.atomic():
        save_id = transaction.savepoint()
        try:
            # get() raises DoesNotExist when nothing is scheduled under
            # this name, which falls through to the failure branch.
            _p = PeriodicTask.objects.get(name=cname)
            _p.delete()
            print('{}删除计划任务成功'.format(cname))
            return True
        except Exception as e:
            transaction.savepoint_rollback(save_id)
            print('{}删除计划任务失败,错误原因:'.format(cname) + str(e))
            return False
任务启动函数
def process_start(sname):
    """
    Launch a spider task in a docker container and record its results.

    sname: task name (SpiderTask.sname).
    Returns False when a container for this task is already running;
    otherwise returns None after the container has finished and the
    task row has been updated.
    """
    con_name = 'spider_{}_1'.format(sname)
    # Refuse to start while a container with the same name exists.
    containers = get_containers({"name": con_name})
    if containers:
        print('有相同任务运行中...|{}|{}'.format(
            con_name, datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")))
        return False
    # Load the task row and its spider definition.
    spider_task = SpiderTask.objects.get(sname=sname)
    # Arguments for docker containers.run().
    cinfo = {
        "name": con_name,
        "command": spider_task.command,
        # Mount the spider's project directory from the host into the
        # container's working path.
        "volumes": ['/opt/project/taskmon/myapp/spider/{}:{}'.format(
            spider_task.sinfo.filepath, spider_task.sinfo.workpath), ],
        "shm_size": spider_task.sinfo.shm,
        "image": spider_task.sinfo.image,
        "working_dir": spider_task.sinfo.workpath,
        # Keep the container after exit so run_container() can read logs.
        "remove": False,
    }
    # Run the container to completion; result is its raw log bytes.
    result = run_container(cinfo)
    if result:
        # Timestamp format unified with the rest of the module (the
        # original used %H-%M-%S-%Y-%m-%d here, inconsistently).
        log_path = '/logs/{}_{}.log'.format(
            con_name, datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S"))
        # Persist the raw logs for later inspection.
        with open(log_path, 'wb') as fw:
            fw.write(result)
        # Tally collected/duplicate/valid counts from the logs.
        d_nums = process_result(result)
        spider_task.total = d_nums[0]
        spider_task.repeat = d_nums[1]
        spider_task.valid = d_nums[2]
        spider_task.logpath = log_path
        spider_task.save()
    print('任务执行...|{}|{}'.format(con_name, datetime.datetime.now().strftime("%Y-%m-%d-%H-%M-%S")))
容器启动函数
def run_container(ddict):
    """
    Run a detached container to completion and return its logs.

    ddict: values for docker-py containers.run(): image, name, shm_size,
    volumes, working_dir, remove, command.
    Returns the container's combined log output (bytes).
    """
    container = client.containers.run(
        image=ddict['image'],
        name=ddict['name'],
        shm_size=ddict['shm_size'],
        volumes=ddict['volumes'],
        working_dir=ddict['working_dir'],
        remove=ddict['remove'],
        detach=True,
        command=ddict['command']
    )
    try:
        # Block until the container exits, then collect its full output.
        container.wait()
        result = container.logs()
    finally:
        # Always remove the container: run() is called with remove=False
        # so logs can be read, and the original leaked the container
        # whenever wait()/logs() raised.
        container.remove()
    return result
结果处理函数
def process_result(result):
    """
    Parse container log output and tally the record counters.

    result: raw log bytes (UTF-8) from the container.
    Returns a (total, repeat, valid) tuple summed over every log line
    that carries the pending-record marker.
    """
    total = repeat = valid = 0
    text = str(result, encoding="utf-8")
    for line in text.split('\n'):
        if '待入库数据' not in line:
            continue
        # The counters sit in the 4th '|'-separated field, as the 3rd,
        # 6th and 8th space-separated tokens of that field.
        tokens = line.split('|')[3].split(' ')
        total += int(tokens[2])
        repeat += int(tokens[5])
        valid += int(tokens[7])
    return (total, repeat, valid)
评论 (0)