日韩黑丝制服一区视频播放|日韩欧美人妻丝袜视频在线观看|九九影院一级蜜桃|亚洲中文在线导航|青草草视频在线观看|婷婷五月色伊人网站|日本一区二区在线|国产AV一二三四区毛片|正在播放久草视频|亚洲色图精品一区

分享

Python學(xué)習(xí)教程:定時(shí)庫(kù)APScheduler的原理及用法

 千鋒Python學(xué)堂 2019-12-12

Python學(xué)習(xí)教程之定時(shí)庫(kù)APScheduler的原理及用法:

APScheduler簡(jiǎn)介

APscheduler全稱Advanced Python Scheduler

作用為在指定的時(shí)間規(guī)則執(zhí)行指定的作業(yè)。

  • 指定時(shí)間規(guī)則的方式可以是間隔多久執(zhí)行,可以是指定日期時(shí)間的執(zhí)行,也可以類似Linux系統(tǒng)中Crontab中的方式執(zhí)行任務(wù)。

  • 指定的任務(wù)就是一個(gè)Python函數(shù)。

APScheduler組件

APScheduler版本 3.6.3

APScheduler中幾個(gè)重要的概念

Job 作業(yè)

作用

Job作為APScheduler最小執(zhí)行單位。
創(chuàng)建Job時(shí)指定執(zhí)行的函數(shù),函數(shù)中所需參數(shù),Job執(zhí)行時(shí)的一些設(shè)置信息。

構(gòu)建說(shuō)明

id:指定作業(yè)的唯一ID

name:指定作業(yè)的名字

trigger:apscheduler定義的觸發(fā)器,用于確定Job的執(zhí)行時(shí)間,根據(jù)設(shè)置的trigger規(guī)則,計(jì)算得到下次執(zhí)行此job的
時(shí)間, 滿足時(shí)將會(huì)執(zhí)行

executor:apscheduler定義的執(zhí)行器,job創(chuàng)建時(shí)設(shè)置執(zhí)行器的名字,根據(jù)字符串你名字到scheduler獲取到執(zhí)行此
job的 執(zhí)行器,執(zhí)行job指定的函數(shù)

max_instances:執(zhí)行此job的最大實(shí)例數(shù),executor執(zhí)行job時(shí),根據(jù)job的id來(lái)計(jì)算執(zhí)行次數(shù),根據(jù)設(shè)置的最大實(shí)例數(shù)
來(lái)確定是否可執(zhí)行

next_run_time:Job下次的執(zhí)行時(shí)間,創(chuàng)建Job時(shí)可以指定一個(gè)時(shí)間[datetime],不指定的話則默認(rèn)根據(jù)trigger獲取觸
發(fā)時(shí)間

misfire_grace_time:Job的延遲執(zhí)行時(shí)間,例如Job的計(jì)劃執(zhí)行時(shí)間是21:00:00,但因服務(wù)重啟或其他原因?qū)е?br>21:00:31才執(zhí)行,如果設(shè)置此key為40,則該job會(huì)繼續(xù)執(zhí)行,否則將會(huì)丟棄此job

coalesce:Job是否合并執(zhí)行,是一個(gè)bool值。例如scheduler停止20s后重啟啟動(dòng),而job的觸發(fā)器設(shè)置為5s執(zhí)行
一次,因此此job錯(cuò)過(guò)了4個(gè)執(zhí)行時(shí)間,如果設(shè)置為是,則會(huì)合并到一次執(zhí)行,否則會(huì)逐個(gè)執(zhí)行

func:Job執(zhí)行的函數(shù)

args:Job執(zhí)行函數(shù)需要的位置參數(shù)

kwargs:Job執(zhí)行函數(shù)需要的關(guān)鍵字參數(shù)

Trigger 觸發(fā)器

Trigger綁定到Job,在scheduler調(diào)度篩選Job時(shí),根據(jù)觸發(fā)器的規(guī)則計(jì)算出Job的觸發(fā)時(shí)間,然后與當(dāng)前時(shí)間比較
確定此Job是否會(huì)被執(zhí)行,總之就是根據(jù)trigger規(guī)則計(jì)算出下一個(gè)執(zhí)行時(shí)間。

Trigger有多種種類,指定時(shí)間的DateTrigger,指定間隔時(shí)間的IntervalTrigger,像Linux的crontab
一樣的CronTrigger

目前APScheduler支持觸發(fā)器:

DateTrigger
IntervalTrigger
CronTrigger

Executor 執(zhí)行器

Executor在scheduler中初始化,另外也可通過(guò)scheduler的add_executor動(dòng)態(tài)添加Executor。 
每個(gè)executor都會(huì)綁定一個(gè)alias,這個(gè)作為唯一標(biāo)識(shí)綁定到Job,在實(shí)際執(zhí)行時(shí)會(huì)根據(jù)Job綁定的executor
找到實(shí)際的執(zhí)行器對(duì)象,然后根據(jù)執(zhí)行器對(duì)象執(zhí)行Job

Executor的種類會(huì)根據(jù)不同的調(diào)度來(lái)選擇,如果選擇AsyncIO作為調(diào)度的庫(kù),那么選擇AsyncIOExecutor,如果
選擇tornado作為調(diào)度的庫(kù),選擇TornadoExecutor,如果選擇啟動(dòng)進(jìn)程作為調(diào)度,
選擇ThreadPoolExecutor或者ProcessPoolExecutor都可以

Executor的選擇需要根據(jù)實(shí)際的scheduler來(lái)選擇不同的執(zhí)行器

目前APScheduler支持的Executor:

AsyncIOExecutor
GeventExecutor
ThreadPoolExecutor
ProcessPoolExecutor
TornadoExecutor
TwistedExecutor

Jobstore 作業(yè)存儲(chǔ)

Jobstore在scheduler中初始化,另外也可通過(guò)scheduler的add_jobstore動(dòng)態(tài)添加Jobstore。每個(gè)jobstore都會(huì)
綁定一個(gè)alias,scheduler在Add Job時(shí),根據(jù)指定的jobstore在scheduler中找到相應(yīng)的jobstore,
并將job添加到j(luò)obstore中。

Jobstore主要是通過(guò)pickle庫(kù)的loads和dumps【實(shí)現(xiàn)核心是通過(guò)python的__getstate__和__setstate__重寫(xiě)實(shí)現(xiàn)】,
每次變更時(shí)將Job動(dòng)態(tài)保存到存儲(chǔ)中,使用時(shí)再動(dòng)態(tài)的加載出來(lái),作為存儲(chǔ)的可以是redis,也可以是數(shù)據(jù)庫(kù)【通過(guò)
sqlarchemy這個(gè)庫(kù)集成多種數(shù)據(jù)庫(kù)】,也可以是mongodb等

目前APScheduler支持的Jobstore:

MemoryJobStore
MongoDBJobStore
RedisJobStore
RethinkDBJobStore
SQLAlchemyJobStore
ZooKeeperJobStore

Event 事件

Event是APScheduler在進(jìn)行某些操作時(shí)觸發(fā)相應(yīng)的事件,用戶可以自定義一些函數(shù)來(lái)監(jiān)聽(tīng)這些事件,
當(dāng)觸發(fā)某些Event時(shí),做一些具體的操作

常見(jiàn)的比如。Job執(zhí)行異常事件 EVENT_JOB_ERROR。Job執(zhí)行時(shí)間錯(cuò)過(guò)事件 EVENT_JOB_MISSED。

目前APScheduler定義的Event

EVENT_SCHEDULER_STARTED
EVENT_SCHEDULER_START
EVENT_SCHEDULER_SHUTDOWN
EVENT_SCHEDULER_PAUSED
EVENT_SCHEDULER_RESUMED
EVENT_EXECUTOR_ADDED
EVENT_EXECUTOR_REMOVED
EVENT_JOBSTORE_ADDED
EVENT_JOBSTORE_REMOVED
EVENT_ALL_JOBS_REMOVED
EVENT_JOB_ADDED
EVENT_JOB_REMOVED
EVENT_JOB_MODIFIED
EVENT_JOB_EXECUTED
EVENT_JOB_ERROR
EVENT_JOB_MISSED
EVENT_JOB_SUBMITTED
EVENT_JOB_MAX_INSTANCES

Listener 監(jiān)聽(tīng)事件

Listener表示用戶自定義監(jiān)聽(tīng)的一些Event,當(dāng)Job觸發(fā)了EVENT_JOB_MISSED事件時(shí)可以根據(jù)需求做一些其他處理。

Scheduler 調(diào)度器

Scheduler是APScheduler的核心,所有相關(guān)組件通過(guò)其定義。scheduler啟動(dòng)之后,將開(kāi)始按照配置的任務(wù)進(jìn)行調(diào)度。
除了依據(jù)所有定義Job的trigger生成的將要調(diào)度時(shí)間喚醒調(diào)度之外。當(dāng)發(fā)生Job信息變更時(shí)也會(huì)觸發(fā)調(diào)度。

scheduler可根據(jù)自身的需求選擇不同的組件,如果是使用AsyncIO則選擇AsyncIOScheduler,使用tornado則選擇
TornadoScheduler。

目前APScheduler支持的Scheduler:

AsyncIOScheduler
BackgroundScheduler
BlockingScheduler
GeventScheduler
QtScheduler
TornadoScheduler
TwistedScheduler

這里前面一期的Python學(xué)習(xí)教程里有提到過(guò)!

Scheduler工作流程圖

這里重點(diǎn)挑選兩個(gè)重要的流程畫(huà)一個(gè)簡(jiǎn)陋的流程圖,來(lái)看一下scheduler的工作原理。其一個(gè)是添加add job,另一是scheduler每次喚醒調(diào)度時(shí)的執(zhí)行過(guò)程

Scheduler添加job流程

Python學(xué)習(xí)教程:定時(shí)庫(kù)APScheduler的原理及用法

Scheduler調(diào)度流程

Python學(xué)習(xí)教程:定時(shí)庫(kù)APScheduler的原理及用法

APScheduler使用示例

AsyncIO調(diào)度示例

import asyncio
import datetime

from apscheduler.events import EVENT_JOB_EXECUTED
from apscheduler.executors.asyncio import AsyncIOExecutor
from apscheduler.jobstores.redis import RedisJobStore # 需要安裝redis
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger

# 定義jobstore 使用redis 存儲(chǔ)job信息
default_redis_jobstore = RedisJobStore(
db=2,
jobs_key="apschedulers.default_jobs",
run_times_key="apschedulers.default_run_times",
host="127.0.0.1",
port=6379,
password="test"
)

# 定義executor 使用asyncio是的調(diào)度執(zhí)行規(guī)則
first_executor = AsyncIOExecutor()

# 初始化scheduler時(shí),可以直接指定jobstore和executor
init_scheduler_options = {
"jobstores": {
# first 為 jobstore的名字,在創(chuàng)建Job時(shí)直接直接此名字即可
"default": default_redis_jobstore
},
"executors": {
# first 為 executor 的名字,在創(chuàng)建Job時(shí)直接直接此名字,執(zhí)行時(shí)則會(huì)使用此executor執(zhí)行
"first": first_executor
},
# 創(chuàng)建job時(shí)的默認(rèn)參數(shù)
"job_defaults": {
'coalesce': False, # 是否合并執(zhí)行
'max_instances': 1 # 最大實(shí)例數(shù)
}
}
# 創(chuàng)建scheduler
scheduler = AsyncIOScheduler(**init_scheduler_options)

# 啟動(dòng)調(diào)度
scheduler.start()

second_redis_jobstore = RedisJobStore(
db=2,
jobs_key="apschedulers.second_jobs",
run_times_key="apschedulers.second_run_times",
host="127.0.0.1",
port=6379,
password="test"
)

scheduler.add_jobstore(second_redis_jobstore, 'second')
# 定義executor 使用asyncio是的調(diào)度執(zhí)行規(guī)則
second_executor = AsyncIOExecutor()
scheduler.add_executor(second_executor, "second")


# *********** 關(guān)于 APScheduler中有關(guān)Event相關(guān)使用示例 *************
# 定義函數(shù)監(jiān)聽(tīng)事件
def job_execute(event):
"""
監(jiān)聽(tīng)事件處理
:param event:
:return:
"""
print(
"job執(zhí)行job:\ncode => {}\njob.id => {}\njobstore=>{}".format(
event.code,
event.job_id,
event.jobstore
))


# 給EVENT_JOB_EXECUTED[執(zhí)行完成job事件]添加回調(diào),這里就是每次Job執(zhí)行完成了我們就輸出一些信息
scheduler.add_listener(job_execute, EVENT_JOB_EXECUTED)


# *********** 關(guān)于 APScheduler中有關(guān)Job使用示例 *************
# 使用的是asyncio,所以job執(zhí)行的函數(shù)可以是一個(gè)協(xié)程,也可以是一個(gè)普通函數(shù),AsyncIOExecutor會(huì)根據(jù)配置的函數(shù)來(lái)進(jìn)行調(diào)度,
# 如果是協(xié)程則會(huì)直接丟入到loop中,如果是普通函數(shù)則會(huì)啟用線程處理
# 我們定義兩個(gè)函數(shù)來(lái)看看執(zhí)行的結(jié)果

def interval_func(message):
print("現(xiàn)在時(shí)間: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
print("我是普通函數(shù)")
print(message)


async def async_func(message):
print("現(xiàn)在時(shí)間: {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
print("我是協(xié)程")
print(message)


# 將上述的兩個(gè)函數(shù)按照不同的方式創(chuàng)造觸發(fā)器來(lái)執(zhí)行
# *********** 關(guān)于 APScheduler中有關(guān)Trigger使用示例 *************
# 使用Trigger有兩種方式,一種是用類創(chuàng)建使用,另一個(gè)是使用字符串的方式
# 使用字符串指定別名, scheduler初始化時(shí)已將其定義的trigger加載,所以指定字符串可以直接使用


if scheduler.get_job("interval_func_test", "default"):
# 存在的話,先刪除
scheduler.remove_job("interval_func_test", "default")

# 立馬開(kāi)始 2分鐘后結(jié)束, 每10s執(zhí)行一次 存儲(chǔ)到first jobstore second執(zhí)行
scheduler.add_job(interval_func, "interval",
args=["我是10s執(zhí)行一次,存放在jobstore default, executor default"],
seconds=10,
id="interval_func_test",
jobstore="default",
executor="default",
start_date=datetime.datetime.now(),
end_date=datetime.datetime.now() + datetime.timedelta(seconds=240))

# 先創(chuàng)建tigger
trigger = IntervalTrigger(seconds=5)

if scheduler.get_job("interval_func_test_2", "second"):
# 存在的話,先刪除
scheduler.remove_job("interval_func_test_2", "second")
# 每隔5s執(zhí)行一次
scheduler.add_job(async_func, trigger, args=["我是每隔5s執(zhí)行一次,存放在jobstore second, executor = second"],
id="interval_func_test_2",
jobstore="second",
executor="second")

# 使用協(xié)程的函數(shù)執(zhí)行,且使用cron的方式配置觸發(fā)器

if scheduler.get_job("cron_func_test", "default"):
# 存在的話,先刪除
scheduler.remove_job("cron_func_test", "default")

# 立馬開(kāi)始 每10s執(zhí)行一次
scheduler.add_job(async_func, "cron",
args=["我是 每分鐘 30s 時(shí)執(zhí)行一次,存放在jobstore default, executor default"],
second='30',
id="cron_func_test",
jobstore="default",
executor="default")

# 先創(chuàng)建tigger
trigger = CronTrigger(second='20,40')

if scheduler.get_job("cron_func_test_2", "second"):
# 存在的話,先刪除
scheduler.remove_job("cron_func_test_2", "second")
# 每隔5s執(zhí)行一次
scheduler.add_job(async_func, trigger, args=["我是每分鐘 20s 40s時(shí)各執(zhí)行一次,存放在jobstore second, executor = second"],
id="cron_func_test_2",
jobstore="second",
executor="second")

# 使用創(chuàng)建trigger對(duì)象直接創(chuàng)建
print("啟動(dòng): {}".format(datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")))
asyncio.get_event_loop().run_forever()

輸出結(jié)果部分截取

啟動(dòng)之后,每隔5s運(yùn)行一次的JOB

啟動(dòng): 2019-12-05 14:13:11
【這部分是定義的協(xié)程函數(shù)輸出的內(nèi)容】
現(xiàn)在時(shí)間: 2019-12-05 14:13:16
我是協(xié)程
我是每隔5s執(zhí)行一次,存放在jobstore second, executor = second
【這部分是監(jiān)聽(tīng)job執(zhí)行完成之后的回調(diào)輸出】
job執(zhí)行job:
code => 4096
job.id => interval_func_test_2
jobstore=>second

在20s和40s時(shí)各執(zhí)行一次的Job

現(xiàn)在時(shí)間: 2019-12-05 14:13:20
我是協(xié)程
我是每分鐘 20s 40s時(shí)各執(zhí)行一次,存放在jobstore second, executor = second
job執(zhí)行job:
code => 4096
job.id => cron_func_test_2
jobstore=>second

每隔10s執(zhí)行一次的job

現(xiàn)在時(shí)間: 2019-12-05 14:13:21
我是普通函數(shù)
我是10s執(zhí)行一次,存放在jobstore default, executor default
現(xiàn)在時(shí)間: 2019-12-05 14:13:21
我是協(xié)程
我是每隔5s執(zhí)行一次,存放在jobstore second, executor = second
job執(zhí)行job:
code => 4096
job.id => interval_func_test
jobstore=>default
job執(zhí)行job:
code => 4096
job.id => interval_func_test_2
jobstore=>second

每隔5s執(zhí)行一次的Job

現(xiàn)在時(shí)間: 2019-12-05 14:13:26
我是協(xié)程
我是每隔5s執(zhí)行一次,存放在jobstore second, executor = second
job執(zhí)行job:
code => 4096
job.id => interval_func_test_2
jobstore=>second

每分鐘30s時(shí)執(zhí)行一次

現(xiàn)在時(shí)間: 2019-12-05 14:13:30
我是協(xié)程
我是 每分鐘 30s 時(shí)執(zhí)行一次,存放在jobstore default, executor default
job執(zhí)行job:
code => 4096
job.id => cron_func_test
jobstore=>default

總結(jié)

apscheduler的工作原理及用法基本這樣。

apscheduler強(qiáng)大的地方是可以集成到tornado,django,flask等框架,也可以單獨(dú)運(yùn)行。比如CronTrigger還有更強(qiáng)大的用法,可以參照官網(wǎng)的cron用法

上面例子只列舉了一些常規(guī)用法,其實(shí)還有一些更切合實(shí)際的用法,利用APSchedulder的特性,動(dòng)態(tài)的添加Job,暫停Job,刪除Job,重啟Job等。先按照功能性質(zhì)定義好不同的函數(shù),然后開(kāi)發(fā)一個(gè)web服務(wù)。在web服務(wù)中動(dòng)態(tài)操作各種Job,可以想象在監(jiān)控系統(tǒng)中根據(jù)需求添加一些任務(wù),豈不美哉。

有時(shí)間將這部分做一個(gè)例子再來(lái)分享,大家可以期待一下下次的Python學(xué)習(xí)教程,本期Python教程有不清楚的地方可以留言哈!

    本站是提供個(gè)人知識(shí)管理的網(wǎng)絡(luò)存儲(chǔ)空間,所有內(nèi)容均由用戶發(fā)布,不代表本站觀點(diǎn)。請(qǐng)注意甄別內(nèi)容中的聯(lián)系方式、誘導(dǎo)購(gòu)買等信息,謹(jǐn)防詐騙。如發(fā)現(xiàn)有害或侵權(quán)內(nèi)容,請(qǐng)點(diǎn)擊一鍵舉報(bào)。
    轉(zhuǎn)藏 分享 獻(xiàn)花(0

    0條評(píng)論

    發(fā)表

    請(qǐng)遵守用戶 評(píng)論公約

    類似文章 更多