python實(shí)現(xiàn)定時(shí)器的5種方法
1. 使用time,threading 模塊
第一種:
代碼實(shí)現(xiàn):
# -*- coding:utf-8 -*-
import threading
import time
cancel_tmr = False
def start():
#具體任務(wù)執(zhí)行內(nèi)容
print("hello world")
def heart_beat():
# 打印當(dāng)前時(shí)間
print(time.strftime('%Y-%m-%d %H:%M:%S'))
if not cancel_tmr:
start()
# 每隔3秒執(zhí)行一次
threading.Timer(3, heart_beat).start()
if __name__ == '__main__':
heart_beat()
# 15秒后停止定時(shí)器
time.sleep(15)
cancel_tmr = True
第二種:
# -*- coding:utf-8 -*-
import threading
import time
exec_count = 0
def start():
print("hello world", exec_count)
def heart_beat():
print(time.strftime('%Y-%m-%d %H:%M:%S'))
global exec_count
exec_count += 1
# 執(zhí)行15次后停止定時(shí)器
if exec_count < 15:
start()
threading.Timer(5, heart_beat).start()
if __name__ == '__main__':
heart_beat()
2. 使用datetime,threading 模塊
要求:每天凌晨3點(diǎn)執(zhí)行func方法。
代碼實(shí)現(xiàn):
# -*- coding:utf-8 -*-
import datetime
import threading
def func():
print("haha")
# 如果需要循環(huán)調(diào)用,就要添加以下方法
timer = threading.Timer(86400, func)
timer.start()
# 獲取現(xiàn)在時(shí)間
now_time = datetime.datetime.now()
# 獲取明天時(shí)間
next_time = now_time + datetime.timedelta(days=+1)
next_year = next_time.date().year
next_month = next_time.date().month
next_day = next_time.date().day
# print(next_time, next_year, next_month, next_day)
# 獲取明天3點(diǎn)時(shí)間
next_time = datetime.datetime.strptime(str(next_year) + "-" + str(next_month) + "-" + str(next_day) + " 03:00:00", "%Y-%m-%d %H:%M:%S")
# print(next_time)
# # 獲取昨天時(shí)間
# last_time = now_time + datetime.timedelta(days=-1)
# 獲取距離明天3點(diǎn)時(shí)間,單位為秒
timer_start_time = (next_time - now_time).total_seconds()
print(timer_start_time)
# 54186.75975
# 定時(shí)器,參數(shù)為(多少時(shí)間后執(zhí)行,單位為秒,執(zhí)行的方法)
timer = threading.Timer(timer_start_time, func)
timer.start()
3. 使用time,schedule 模塊 (執(zhí)行單個(gè)任務(wù))
schedule:一個(gè)輕量級(jí)的定時(shí)任務(wù)調(diào)度的庫(kù)。
代碼實(shí)現(xiàn):
# !/usr/bin/env python
# -*- coding:utf-8 -*-
import schedule
import time
def job(text=""):
print(text, "I'm working...")
schedule.every().seconds.do(job, "每秒一次")
schedule.every(5).seconds.do(job, "五秒一次")
schedule.every(10).minutes.do(job, "10分鐘一次")
schedule.every().hour.do(job, "1小時(shí)一次")
# 每天10:30執(zhí)行
schedule.every().day.at("10:30").do(job)
# 每隔5到10天執(zhí)行一次任務(wù)
schedule.every(5).to(10).days.do(job)
# 每周一的這個(gè)時(shí)候執(zhí)行一次任務(wù)
schedule.every().monday.do(job)
# 每周三13:15執(zhí)行一次任務(wù)
schedule.every().wednesday.at("13:15").do(job)
while True:
# run_pending:運(yùn)行所有可以運(yùn)行的任務(wù)
schedule.run_pending()
time.sleep(1)
4. 使用time,schedule 模塊 (執(zhí)行多個(gè)任務(wù))
時(shí)間有沖突的執(zhí)行多個(gè)任務(wù)
代碼實(shí)現(xiàn):
# -*- coding:utf-8 -*-
import schedule
import time
def job():
print("I'm working... in job1 start")
time.sleep(5)
print("I'm working... in job1 end")
def job2():
print("I'm working... in job2")
schedule.every(3).seconds.do(job)
schedule.every(3).seconds.do(job2)
while True:
schedule.run_pending()
time.sleep(1)
運(yùn)行結(jié)果:

根據(jù)代碼運(yùn)行結(jié)果可以看出,schedule方法是串行的,代碼中有時(shí)間沖突,所以執(zhí)行完第一個(gè)任務(wù),才會(huì)執(zhí)行第二個(gè)任務(wù),即任務(wù)二每隔3秒執(zhí)行一次,而任務(wù)一執(zhí)行時(shí)間是5秒。
如果不用多線程,則定時(shí)任務(wù)會(huì)不準(zhǔn)確,因?yàn)槿蝿?wù)會(huì)按照順序執(zhí)行,如果上一個(gè)任務(wù)比較耗時(shí),則下一個(gè)任務(wù)就會(huì)"延誤"
多線程并發(fā)運(yùn)行多個(gè)任務(wù)
代碼實(shí)現(xiàn):
# -*- coding:utf-8 -*-
import datetime
import schedule
import threading
import time
def job1():
print("I'm working for job1 start", datetime.datetime.now())
time.sleep(5)
print("job1: end", datetime.datetime.now())
def job2():
print("I'm working for job2 start", datetime.datetime.now())
time.sleep(3)
print("job2: end", datetime.datetime.now())
def job1_task():
threading.Thread(target=job1).start()
def job2_task():
threading.Thread(target=job2).start()
def run():
schedule.every(3).seconds.do(job1_task)
schedule.every(3).seconds.do(job2_task)
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == '__main__':
run()
代碼運(yùn)行結(jié)果:

5. 使用apscheduler 模塊
apscheduler 模塊詳情介紹
代碼實(shí)現(xiàn)
# -*- coding:utf-8 -*-
import time
import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
def my_job(text="默認(rèn)值"):
print(text, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time())))
sched = BlockingScheduler()
sched.add_job(my_job, 'interval', seconds=3, args=['3秒定時(shí)'])
# 2018-3-17 00:00:00 執(zhí)行一次,args傳遞一個(gè)text參數(shù)
sched.add_job(my_job, 'date', run_date=datetime.date(2019, 10, 17), args=['根據(jù)年月日定時(shí)執(zhí)行'])
# 2018-3-17 13:46:00 執(zhí)行一次,args傳遞一個(gè)text參數(shù)
sched.add_job(my_job, 'date', run_date=datetime.datetime(2019, 10, 17, 14, 10, 0), args=['根據(jù)年月日時(shí)分秒定時(shí)執(zhí)行'])
# sched.start()
"""
interval 間隔調(diào)度,參數(shù)如下:
weeks (int) – 間隔幾周
days (int) – 間隔幾天
hours (int) – 間隔幾小時(shí)
minutes (int) – 間隔幾分鐘
seconds (int) – 間隔多少秒
start_date (datetime|str) – 開(kāi)始日期
end_date (datetime|str) – 結(jié)束日期
timezone (datetime.tzinfo|str) – 時(shí)區(qū)
"""
"""
cron參數(shù)如下:
year (int|str) – 年,4位數(shù)字
month (int|str) – 月 (范圍1-12)
day (int|str) – 日 (范圍1-31)
week (int|str) – 周 (范圍1-53)
day_of_week (int|str) – 周內(nèi)第幾天或者星期幾 (范圍0-6 或者 mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – 時(shí) (范圍0-23)
minute (int|str) – 分 (范圍0-59)
second (int|str) – 秒 (范圍0-59)
start_date (datetime|str) – 最早開(kāi)始日期(包含)
end_date (datetime|str) – 最晚結(jié)束時(shí)間(包含)
timezone (datetime.tzinfo|str) – 指定時(shí)區(qū)
"""
# my_job將會(huì)在6,7,8,11,12月的第3個(gè)周五的1,2,3點(diǎn)運(yùn)行
sched.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 截止到2018-12-30 00:00:00,每周一到周五早上五點(diǎn)半運(yùn)行job_function
sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2018-12-31')
# 表示2017年3月22日17時(shí)19分07秒執(zhí)行該程序
sched.add_job(my_job, 'cron', year=2017, month=3, day=22, hour=17, minute=19, second=7)
# 表示任務(wù)在6,7,8,11,12月份的第三個(gè)星期五的00:00,01:00,02:00,03:00 執(zhí)行該程序
sched.add_job(my_job, 'cron', month='6-8,11-12', day='3rd fri', hour='0-3')
# 表示從星期一到星期五5:30(AM)直到2014-05-30 00:00:00
sched.add_job(my_job, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
# 表示每5秒執(zhí)行該程序一次,相當(dāng)于interval 間隔調(diào)度中seconds = 5
sched.add_job(my_job, 'cron', second='*/5', args=['5秒定時(shí)'])
sched.start()
到此這篇關(guān)于python實(shí)現(xiàn)定時(shí)器的5種方法的文章就介紹到這了,更多相關(guān)python 定時(shí)器內(nèi)容請(qǐng)搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
相關(guān)文章
Python Numpy中ndarray的常見(jiàn)操作
這篇文章主要介紹了Python Numpy中ndarray的常見(jiàn)操作,NumPy是Python的一種開(kāi)源的數(shù)值計(jì)算擴(kuò)展,更多詳細(xì)內(nèi)容需要的朋友可以參考一下2022-07-07
Python 拷貝對(duì)象(深拷貝deepcopy與淺拷貝copy)
Python中的對(duì)象之間賦值時(shí)是按引用傳遞的,如果需要拷貝對(duì)象,需要使用標(biāo)準(zhǔn)庫(kù)中的copy模塊。2008-09-09
python redis 批量設(shè)置過(guò)期key過(guò)程解析
這篇文章主要介紹了python redis 批量設(shè)置過(guò)期key過(guò)程解析,文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友可以參考下2019-11-11
Python簡(jiǎn)單實(shí)現(xiàn)安全開(kāi)關(guān)文件的兩種方式
這篇文章主要介紹了Python簡(jiǎn)單實(shí)現(xiàn)安全開(kāi)關(guān)文件的兩種方式,涉及Python的try語(yǔ)句針對(duì)錯(cuò)誤的判定與捕捉相關(guān)技巧,需要的朋友可以參考下2016-09-09
Python小白必備的8個(gè)最常用的內(nèi)置函數(shù)(推薦)
這篇文章主要介紹了Python常用的內(nèi)置函數(shù),文中通過(guò)示例代碼介紹的非常詳細(xì),對(duì)大家的學(xué)習(xí)或者工作具有一定的參考學(xué)習(xí)價(jià)值,需要的朋友們下面隨著小編來(lái)一起學(xué)習(xí)學(xué)習(xí)吧2019-04-04
Python實(shí)現(xiàn)解壓當(dāng)天創(chuàng)建的ZIP文件到指定文件夾中
這篇文章主要為大家詳細(xì)介紹了Python如何實(shí)現(xiàn)解壓當(dāng)天創(chuàng)建的ZIP文件到指定文件夾中,文中的示例代碼講解詳細(xì),需要的小伙伴可以參考下2024-03-03

