Github - agronholm/apscheduler
Python中定时任务的解决方案,大概可以分四种:
[1] - crontab, 不适合多台服务器的配置
[2] - scheduler, 太过于简单
[3] - Celery, 依赖的软件比较多,比较耗资源
[4] - APScheduler
其安装如:
pip install apscheduler
1. APScheduler 基本概念
https://github.com/agronholm/apscheduler/blob/master/docs/userguide.rst
[1] - 触发器(triggers):
触发器包含调度逻辑,描述一个任务何时被触发,按日期或按时间间隔或按 cronjob 表达式三种方式触发。每个作业都有它自己的触发器,除了初始配置之外,触发器是完全无状态的。
[2] - 作业存储器(job stores):
作业存储器指定了作业被存放的位置,默认情况下作业保存在内存,也可将作业保存在各种数据库中,当作业被存放在数据库中时,它会被序列化,当被重新加载时会反序列化。作业存储器充当保存、加载、更新和查找作业的中间商。在调度器之间不能共享作业存储。
[3] - 执行器(executors):
执行器是将指定的作业(调用函数)提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件。
[4] - 调度器(schedulers):
任务调度器,属于控制角色,通过它配置作业存储器、执行器和触发器,添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通常只有一个调度程序运行在应用程序中,开发人员通常不需要直接处理作业存储器、执行器或触发器,配置作业存储器和执行器是通过调度器来完成的。
2. 示例 - 间隔任务
#!--*-- coding: utf-8 --*--
import time
from apscheduler.schedulers.blocking import BlockingScheduler
def time_task():
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
#
sched = BlockingScheduler()
#interval固定间隔模式,每隔3秒只执行一次
sched.add_job(time_task, 'interval', seconds=3)
sched.start()
3. 示例 - 定时任务
import time
from apscheduler.schedulers.blocking import BlockingScheduler
def cron_task():
print(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
#do_sth()
#
sched = BlockingScheduler()
#每天早上八点半和十二点半各执行一次任务
sched.add_job(cron_task, 'cron', hour='8, 12', minute='30')
sched.start()
4. 示例 - 间隔任务进程池
https://github.com/agronholm/apscheduler/blob/master/examples/executors/processpool.py
import os
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
def tick():
print('Tick! The time is: %s' % datetime.now())
if __name__ == '__main__':
scheduler = BlockingScheduler()
scheduler.add_executor('processpool')
scheduler.add_job(tick, 'interval', seconds=3)
print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))
try:
scheduler.initialize()
except (KeyboardInterrupt, SystemExit):
pass