定時任務(wù)的典型落地場景在各行業(yè)中都很普遍,比如支付系統(tǒng)中,支付過程中因為網(wǎng)絡(luò)或者其他因素導致出現(xiàn)掉單、卡單的情況,賬單變成了“單邊賬”,這種情況對于支付用戶來說,毫無疑問是災(zāi)難級別的體驗,明明自己付了錢,扣了款,但是訂單狀態(tài)卻未發(fā)生變化。所以,每一筆訂單的支付任務(wù)流程中都需要一個定時輪詢的備選方案,一旦支付中發(fā)生問題,定時輪詢服務(wù)就可以及時發(fā)現(xiàn)和更正訂單狀態(tài)。
又比如,之前的一篇以寡治眾各個擊破,超大文件分片上傳之構(gòu)建基于Vue.js3.0+Ant-desgin+Tornado6純異步IO高效寫入服務(wù),在超大型文件分片傳輸任務(wù)過程中,一旦分片上傳或者分片合并環(huán)節(jié)出了問題,就有可能導致超大型文件無法完整的傳輸?shù)椒?wù)器中,從而浪費大量的系統(tǒng)帶寬資源,所以每一個分片傳輸任務(wù)執(zhí)行過程中也需要一個對應(yīng)的定時輪詢來“盯”著,防止過程中出現(xiàn)問題。
在實際業(yè)務(wù)場景中,定時服務(wù)基本都作為主應(yīng)用的附屬服務(wù)而存在,不同定時任務(wù)的調(diào)度時間可能不一樣,所以如果能夠配合主服務(wù)并發(fā)異步調(diào)用定時任務(wù),則可以單應(yīng)用能夠支持上萬,甚至十萬以上的定時任務(wù),并且不同任務(wù)能夠有獨立的調(diào)度時間,這里通過Tornado配合APScheduler和Celery,分別展示不同的異步定時任務(wù)調(diào)用邏輯。
APScheduler(advanceded python scheduler)是一款及其優(yōu)秀的Python3定時任務(wù)框架,它不僅支持并發(fā)異步調(diào)用定時任務(wù),還可以動態(tài)地對定時任務(wù)進行管理,同時也支持定時任務(wù)的持久化。
首先安裝APScheduler以及Tornado6:
pip3 install apscheduler
pip3 install tornado==6.1
隨后導入基于Tornado的異步APScheduler:
from datetime import datetime
from tornado.ioloop import IOLoop, PeriodicCallback
from tornado.web import RequestHandler, Application
from apscheduler.schedulers.tornado import TornadoScheduler
這里TornadoScheduler實例就具備了Tornado的事件循環(huán)特性,隨后聲明異步定時任務(wù):
async def task():
print('[APScheduler][Task]-{}'.format(datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')))
隨后初始化定時任務(wù)對象:
scheduler = None
# 初始化
def init_scheduler():
global scheduler
scheduler = TornadoScheduler()
scheduler.start()
scheduler.add_job(task,"interval",seconds=3,id="job1",args=())
print("定時任務(wù)啟動")
這里啟動后就添加一個定時任務(wù),每隔三秒執(zhí)行一次。
接著main入口啟動服務(wù):
if __name__ == '__main__':
init_scheduler()
系統(tǒng)返回:
C:\Users\liuyue\www\tornado6>python test_scheduler.py
定時任務(wù)啟動
[APScheduler][Task]-2022-07-28 22:13:47.792582
[APScheduler][Task]-2022-07-28 22:13:50.783016
[APScheduler][Task]-2022-07-28 22:13:53.783362
[APScheduler][Task]-2022-07-28 22:13:56.775059
[APScheduler][Task]-2022-07-28 22:13:59.779563
隨后創(chuàng)建Tornado控制器視圖:
class SchedulerHandler(RequestHandler):
def get(self):
job_id = self.get_query_argument('job_id', None)
action = self.get_query_argument('action', None)
if job_id:
# 添加任務(wù)
if 'add' == action:
if job_id not in job_ids:
job_ids.append(job_id)
scheduler.add_job(task, 'interval', seconds=3, id=job_id, args=(job_id,))
self.write('[TASK ADDED] - {}'.format(job_id))
else:
self.write('[TASK EXISTS] - {}'.format(job_id))
# 刪除任務(wù)
elif 'remove' == action:
if job_id in job_ids:
scheduler.remove_job(job_id)
self.write('[TASK REMOVED] - {}'.format(job_id))
else:
self.write('[TASK NOT FOUND] - {}'.format(job_id))
else:
self.write('[INVALID PARAMS] INVALID job_id or action')
這里通過傳參來動態(tài)的刪減異步定時任務(wù),對于完成輪詢?nèi)蝿?wù)的定時任務(wù),完全可以物理刪除,從而節(jié)約系統(tǒng)資源,隨后添加路由并且啟動Tornado服務(wù):
if __name__ == '__main__':
routes = [url(r"/scheduler/",SchedulerHandler)]
init_scheduler()
# 聲明tornado對象
application = Application(routes,debug=True)
application.listen(8888)
IOLoop.current().start()
所謂任務(wù)持久化,即任務(wù)存儲在諸如文件或者數(shù)據(jù)庫這樣的持久化容器中,如果APScheduler定時任務(wù)服務(wù)進程中斷,未執(zhí)行的任務(wù)還會保留,當服務(wù)再次啟動時,定時任務(wù)可以從數(shù)據(jù)庫中讀取出來再次被裝載調(diào)用,這里以redis數(shù)據(jù)庫為例子:
from apscheduler.jobstores.redis import RedisJobStore
# 初始化
def init_scheduler():
global scheduler
jobstores = {
'default': RedisJobStore(jobs_key='cron.jobs',run_times_key='cron.run_times',
host='localhost', port=6379,)
}
scheduler = TornadoScheduler(jobstores=jobstores)
scheduler.start()
scheduler.add_job(task,"interval",seconds=3,id="job1",args=())
print("定時任務(wù)啟動")
這里通過jobstores參數(shù)將redis裝載到定時任務(wù)服務(wù)中,當創(chuàng)建任務(wù)時,數(shù)據(jù)庫中會以hash的形式來存儲任務(wù)明細:
127.0.0.1:6379> keys *
1) "cron.run_times"
2) "cron.jobs"
127.0.0.1:6379> type cron.jobs
hash
127.0.0.1:6379> hgetall cron.jobs
1) "job1"
2) "\x80\x05\x95\x14\x02\x00\x00\x00\x00\x00\x00}\x94(\x8c\aversion\x94K\x01\x8c\x02id\x94\x8c\x04job1\x94\x8c\x04func\x94\x8c\x0e__main__:task1\x94\x8c\atrigger\x94\x8c\x1dapscheduler.triggers.interval\x94\x8c\x0fIntervalTrigger\x94\x93\x94)\x81\x94}\x94(h\x01K\x02\x8c\btimezone\x94\x8c\x1bpytz_deprecation_shim._impl\x94\x8c\twrap_zone\x94\x93\x94\x8c\bbuiltins\x94\x8c\agetattr\x94\x93\x94\x8c\bzoneinfo\x94\x8c\bZoneInfo\x94\x93\x94\x8c\t_unpickle\x94\x86\x94R\x94\x8c\x0cAsia/Irkutsk\x94K\x01\x86\x94R\x94h\x19\x86\x94R\x94\x8c\nstart_date\x94\x8c\bdatetime\x94\x8c\bdatetime\x94\x93\x94C\n\a\xe6\a\x1c\x16\x1e&\x0b\xc7\x8b\x94h\x1d\x86\x94R\x94\x8c\bend_date\x94N\x8c\binterval\x94h\x1f\x8c\ttimedelta\x94\x93\x94K\x00K\x03K\x00\x87\x94R\x94\x8c\x06jitter\x94Nub\x8c\bexecutor\x94\x8c\adefault\x94\x8c\x04args\x94)\x8c\x06kwargs\x94}\x94\x8c\x04name\x94\x8c\x05task1\x94\x8c\x12misfire_grace_time\x94K\x01\x8c\bcoalesce\x94\x88\x8c\rmax_instances\x94K\x01\x8c\rnext_run_time\x94h!C\n\a\xe6\a\x1c\x16\x1e,\x0b\xc7\x8b\x94h\x1d\x86\x94R\x94u."
而如果刪除任務(wù),redis數(shù)據(jù)庫中的任務(wù)也會同步刪除。
至此,APScheduler配合Tornado就完成了一個簡單的并發(fā)異步定時任務(wù)服務(wù)。
celery是一款在Python定時任務(wù)領(lǐng)域“開風氣之先”的框架,和APScheduler相比,celery略顯臃腫了一點,同時,celery并不具備任何任務(wù)持久化的功能,也需要三方的容器進行支持。
首先安裝5.0以上版本:
pip3 install celery==5.2.7
隨后,初始化任務(wù)對象:
from celery import Celery
from datetime import timedelta
from redisbeat.scheduler import RedisScheduler
app = Celery("tornado")
app.conf["imports"] = ["celery_task"]
# 定義broker
app.conf.broker_url = "redis://localhost:6379"
# 任務(wù)結(jié)果
app.conf.result_backend = "redis://localhost:6379"
# 時區(qū)
app.conf.timezone = "Asia/Shanghai"
這里任務(wù)代理(broker)和任務(wù)結(jié)果(result_backend)也都存儲在redis中。
緊接著聲明異步任務(wù)方法:
from celery import shared_task
import asyncio
async def consume():
return 'test'
@shared_task
def async_job():
return asyncio.run(consume())
這里通過asyncio庫間接調(diào)用異步方法。
然后添加定時任務(wù)的配置:
from datetime import timedelta
# 需要執(zhí)行任務(wù)的配置
app.conf.beat_schedule = {
"task1": {
"task": "celery_task.async_consume", #執(zhí)行的方法
"schedule": timedelta(seconds=3),
"args":()
},
}
隨后啟動worker服務(wù):
celery -A module_name worker --pool=solo -l info
接著啟動beat服務(wù):
celery -A module_name beat -l info
異步定時任務(wù)會被裝載執(zhí)行,系統(tǒng)返回:
C:\Users\liuyue\www\tornado6>celery -A test_celery worker --pool=solo -l info
-------------- celery@LIUYUE354D v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-10-10.0.22000-SP0 2022-07-28 22:55:00
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tornado:0x23769b40430
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: redis://localhost:6379/
- *** --- * --- .> concurrency: 4 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. celery_task.async_job
. celery_task.job
. test_celery.sub
[2022-07-28 22:55:02,234: INFO/MainProcess] Connected to redis://localhost:6379//
[2022-07-28 22:55:04,267: INFO/MainProcess] mingle: searching for neighbors
[2022-07-28 22:55:11,552: INFO/MainProcess] mingle: all alone
[2022-07-28 22:55:21,837: INFO/MainProcess] celery@LIUYUE354D ready.
[2022-07-28 22:58:26,032: INFO/MainProcess] Task celery_task.job[b0337808-c90b-450b-98bc-fd577f7039d0] received
[2022-07-28 22:58:28,086: INFO/MainProcess] Task celery_task.job[b0337808-c90b-450b-98bc-fd577f7039d0] succeeded in 2.062999999994645s: 'test'
[2022-07-28 22:58:28,099: INFO/MainProcess] Task celery_task.job[f4aa4304-02c3-48ee-8625-fa1fe27b8e98] received
[2022-07-28 22:58:28,099: INFO/MainProcess] Task celery_task.job[f4aa4304-02c3-48ee-8625-fa1fe27b8e98] succeeded in 0.0s: 'test'
[2022-07-28 22:58:28,975: INFO/MainProcess] Task celery_task.job[bb33981d-0629-4173-8375-128ba84d1f0f] received
[2022-07-28 22:58:28,975: INFO/MainProcess] Task celery_task.job[bb33981d-0629-4173-8375-128ba84d1f0f] succeeded in 0.0s: 'test'
同時,在redis數(shù)據(jù)庫中會以列表和字符串的形式存儲任務(wù)明細和結(jié)果:
127.0.0.1:6379> keys *
1) "celery-task-meta-f4aa4304-02c3-48ee-8625-fa1fe27b8e98"
2) "celery-task-meta-bb33981d-0629-4173-8375-128ba84d1f0f"
3) "_kombu.binding.celery"
4) "celery-task-meta-b0337808-c90b-450b-98bc-fd577f7039d0"
5) "cron.run_times"
6) "cron.jobs"
7) "celery"
從調(diào)度層面上講,celery和APScheduler并無太大的不同,但從使用成本上看,celery比APScheduler多維護一個服務(wù),worker和beat雙服務(wù)的形式無形中也增加了系統(tǒng)監(jiān)控資源的開銷。
從任務(wù)管理層面上看,celery毫無疑問輸?shù)暮軓氐祝驗樵鷆elery壓根就不支持動態(tài)地修改定時任務(wù)。但我們可以通過三方庫的形式來曲線救國:
pip3 install redisbeat
這里通過redis的定時任務(wù)服務(wù)來取代celery原生的beat服務(wù)。
建立redisbeat實例:
from celery import Celery
from datetime import timedelta
from redisbeat.scheduler import RedisScheduler
app = Celery("tornado")
app.conf["imports"] = ["celery_task"]
# 定義broker
app.conf.broker_url = "redis://localhost:6379"
# 任務(wù)結(jié)果
app.conf.result_backend = "redis://localhost:6379"
# 時區(qū)
app.conf.timezone = "Asia/Shanghai"
@app.task
def sub():
return "test"
schduler = RedisScheduler(app=app)
schduler.add(**{
'name': 'job1',
'task': 'test_celery.sub',
'schedule': timedelta(seconds=3),
'args': ()
})
通過schduler.add方法就可以動態(tài)地添加定時任務(wù),隨后以redisbeat的形式啟動celery服務(wù):
celery -A test_celery beat -S redisbeat.RedisScheduler -l INFO
此時經(jīng)過改造的系統(tǒng)接受動態(tài)任務(wù)調(diào)用而執(zhí)行:
C:\Users\liuyue\www\tornado6>celery -A test_celery worker --pool=solo -l info
-------------- celery@LIUYUE354D v5.2.7 (dawn-chorus)
--- ***** -----
-- ******* ---- Windows-10-10.0.22000-SP0 2022-07-28 23:09:50
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app: tornado:0x19c1a1f0040
- ** ---------- .> transport: redis://localhost:6379//
- ** ---------- .> results: redis://localhost:6379/
- *** --- * --- .> concurrency: 4 (solo)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. celery_task.async_job
. celery_task.job
. test_celery.sub
[2022-07-28 23:09:52,916: INFO/MainProcess] Connected to redis://localhost:6379//
[2022-07-28 23:09:54,971: INFO/MainProcess] mingle: searching for neighbors
[2022-07-28 23:10:02,140: INFO/MainProcess] mingle: all alone
[2022-07-28 23:10:12,427: INFO/MainProcess] celery@LIUYUE354D ready.
[2022-07-28 23:10:12,440: INFO/MainProcess] Task test_celery.sub[ade9c5ad-d551-44f2-84e7-a2824b2d022d] received
[2022-07-28 23:10:14,518: INFO/MainProcess] Task test_celery.sub[ade9c5ad-d551-44f2-84e7-a2824b2d022d] succeeded in 2.0780000000013388s: 'test'
[2022-07-28 23:10:14,518: INFO/MainProcess] Task test_celery.sub[11927889-8385-4c88-aff1-42179b559db0] received
[2022-07-28 23:10:14,518: INFO/MainProcess] Task test_celery.sub[11927889-8385-4c88-aff1-42179b559db0] succeeded in 0.0s: 'test'
[2022-07-28 23:10:14,533: INFO/MainProcess] Task test_celery.sub[442cd168-5a68-4ade-b4e7-6ae4a92a53ae] received
[2022-07-28 23:10:14,533: INFO/MainProcess] Task test_celery.sub[442cd168-5a68-4ade-b4e7-6ae4a92a53ae] succeeded in 0.0s: 'test'
[2022-07-28 23:10:17,087: INFO/MainProcess] Task test_celery.sub[e4850b5d-28e9-47c8-88e6-d9086e93db88] received
[2022-07-28 23:10:17,087: INFO/MainProcess] Task test_celery.sub[e4850b5d-28e9-47c8-88e6-d9086e93db88] succeeded in 0.0s: 'test'
相應(yīng)地,也可以通過remove方法和任務(wù)id進行刪除操作:
schduler.remove('job1')
任務(wù)明細的存儲形式上,也由列表升級成為了有序集合,提高了效率:
127.0.0.1:6379> type celery:beat:order_tasks
zset
127.0.0.1:6379> zrange celery:beat:order_tasks 0 -1
1) "{\"py/reduce\": [{\"py/type\": \"celery.beat.ScheduleEntry\"}, {\"py/tuple\": [\"job1\", \"test_celery.sub\", {\"__reduce__\": [{\"py/type\": \"datetime.datetime\"}, [\"B+YHHBcMDgfyGg==\", {\"py/reduce\": [{\"py/function\": \"pytz._p\"}, {\"py/tuple\": [\"Asia/Shanghai\", 28800, 0, \"CST\"]}]}]], \"py/object\": \"datetime.datetime\"}, 43, {\"py/reduce\": [{\"py/type\": \"celery.schedules.schedule\"}, {\"py/tuple\": [{\"py/reduce\": [{\"py/type\": \"datetime.timedelta\"}, {\"py/tuple\": [0, 3, 0]}]}, false, null]}]}, {\"py/tuple\": []}, {}, {}]}]}"
至此,celery配合tornado打造異步定時任務(wù)就完成了。
APScheduler長于靈活機動并可以依附于Tornado事件循環(huán)體系中,Celery則嫻于調(diào)度和分布式的支持并相對獨立,二者不分軒輊,各擅勝場,適合不同的業(yè)務(wù)應(yīng)用場景,當然,在異步定時任務(wù)執(zhí)行異常時的處理策略也有很多方面需要完善,比如由于實例夯死導致的過時觸發(fā)問題、任務(wù)追趕和任務(wù)堆積問題、工作流場景下任務(wù)異常后是整體重試還是斷點續(xù)傳重試等,都需要具體問題具體分析。
出品:科普中國
作者:蘭順正
策劃:宋雅娟
監(jiān)制:光明網(wǎng)科普事業(yè)部
近日有報道稱,德國萊茵金屬公司研發(fā)的“山貓”KF41步兵戰(zhàn)車已經(jīng)找到了第一個買家,匈牙利方面以超過20億美元的價格購買了218輛。
“又帥又能打”的山貓
“山貓”系列步兵戰(zhàn)車(Infrantry Fighting Vehicle,IFV)是由德國萊茵金屬公司開發(fā)的先進高防護履帶式裝甲車。該步兵戰(zhàn)車在2016年6月舉行的Eurosatory 2016展會上首次亮相。“山貓”步兵戰(zhàn)車有兩種基本配置,KF31和KF41。KF31重38t,可容納3名車組人員和6名搭車作戰(zhàn)的士兵,而此次匈牙利購買的KF41體型更大,重44噸,可容納3名車組人員和八名搭車作戰(zhàn)的步兵。
KF41全車長7.73米,寬3.6米,高3.3米(至炮塔頂),采用傳統(tǒng)布局,動力艙前置,中部為戰(zhàn)斗艙,后部為載員艙。
KF41的戰(zhàn)斗全重超過了很多主戰(zhàn)坦克,防護性能出色
良好的防護能力是KF41的一大賣點,其車體和炮塔都采用裝甲鋼焊接而成,車首上裝甲和下裝甲均為大傾斜平面,有良好的避彈外形,內(nèi)部有防剝落襯層,外面有模塊化附加裝甲,高達44噸的戰(zhàn)斗全重,比俄羅斯的T-72主戰(zhàn)坦克還重2噸,所以裝甲防護性能相當出色。而據(jù)萊茵金屬公司介紹,KF41的戰(zhàn)斗全重可以進一步增至50噸,這也就意味著其可以繼續(xù)加厚裝甲,達到近似主戰(zhàn)坦克的裝甲防護能力。同時,KF41還注重隱身性能,在外形上很有“科幻感”,車體簡潔,基本沒有什么突出物,機關(guān)炮的炮管外面裝有用于隔熱和冷卻的金屬護套。另外KF41的車體表面還噴涂有防紅外涂料,能夠顯著降低紅外探測裝置的發(fā)現(xiàn)概率。
在武器方面,“山貓”步兵戰(zhàn)車安裝有萊茵公司的“長矛(LANCE)”炮塔,配備穩(wěn)定的外部動力“沃坦30(30毫米x173)”或“沃坦35(35毫米x228)”機關(guān)炮(此次匈牙利引進了30毫米版)。“沃坦30” 射速為每分鐘200發(fā),易于操作且高度可靠,可以有效打擊各種目標,使其成為現(xiàn)代步兵戰(zhàn)車和其他戰(zhàn)車完美的武器選擇。炮塔左右兩側(cè)可以各加掛兩枚“長釘”LR反坦克導彈,有效射程4千米,能夠擊穿800毫米的勻質(zhì)裝甲,輔助武器為一挺7.62毫米同軸機槍。
動力方面,KF41采用了利勃海爾D9612柴油發(fā)動機與全自動變速箱,發(fā)動機的最大功率800千瓦(1050馬力),最大時速70千米/小時,可以跨越最寬2.5米的壕溝,1.5米的深水與1米高的垂直墻。
“叫好不叫座”的山貓
盡管“山貓”的性能不俗,可是自誕生以來其推廣之路卻一直不順。該車先后參加了捷克210輛新型履帶式步兵戰(zhàn)車競標、澳大利亞陸軍“地面400”第3階段競標、以及美國陸軍下一代戰(zhàn)斗車輛NGCV的競標,但是都統(tǒng)統(tǒng)落選。而德國陸軍也表示將繼續(xù)換裝“美洲獅”步兵戰(zhàn)車,不會考慮采購KF41。有分析認為,“山貓”之所以難以打開局面,主要原因是過于超前,太多的先進技術(shù)運用導致了價格過高(如從此次德國與匈牙利的合同金額來看,這批“山貓”裝甲車單價已經(jīng)超過900萬美元,和美國M1艾布拉姆斯主戰(zhàn)坦克售價相當)。再加上目前各方的步兵戰(zhàn)車也都能適應(yīng)很長一段時間的戰(zhàn)場需求,所以“山貓”會遭遇曲高和寡。
“山貓”在國際競標重多次落馬
此次與匈牙利成功簽署合約可謂是“山貓”正式跨入市場的第一步。根據(jù)合約規(guī)定,這批裝甲車將分兩階段完成交付,第一階段,首批46輛“山貓”裝甲車將在德國制造,并連同9輛布法羅裝甲搶救車以及備件和模擬器在2023年交付;而剩下的“山貓”裝甲車將移師匈牙利制造,據(jù)稱為了協(xié)助匈牙利完成裝甲車制造,萊茵金屬將在匈牙利開辦工廠,成立合資企業(yè)。
軍事小詞典
“山貓”步戰(zhàn)車:“山貓”系列步兵戰(zhàn)車是由德國萊茵金屬公司開發(fā)的先進高防護履帶式裝甲車。該車注重隱身和防護,“科幻感”十足,綜合性能很強,但是由于設(shè)計過于超前,這些年在國際市場一直難以推廣。
來源: 科普中國-軍事科技前沿