不败君

前端萌新&初级后端攻城狮

实现 Windows 系统下使用 Crontab 定时任务

实现 Windows 系统下使用 Crontab 定时任务

2025-01-01 19:34:00

围观(557)

众所周知,在 Linux 系统部署自己编写的代码,无论使用的是什么语言,都可以轻松调用 Crontab 定时任务管理。

使用 Windows 在开发或者部署程序时没法使用 Crontab ,利用元旦假期的半天时间,实现了这个小玩意,直接上代码:

import json
from time import sleep
import time
import redis
from croniter import croniter, CroniterBadCronError
from datetime import datetime
import concurrent.futures
import subprocess

# 读取配置文件
with open('config.json', 'r', encoding='utf-8') as file:
    config = json.load(file)


def main():
    with concurrent.futures.ThreadPoolExecutor(max_workers=config['max_workers']) as executor:
        while 1:
            cur_time = int(time.time())
            crontabs = get_crontabs()
            for crontab in crontabs:
                if config['mode'] == 'redis':
                    crontab = crontab.decode('utf-8')
                    crontab = json.loads(crontab)
                if crontab is None:
                    continue
                if crontab['path'] is None:
                    # print(f'未设置执行路径:{crontab}')
                    continue
                try:
                    ci = croniter(crontab['crontab'], cur_time)
                    pre_time = int(ci.get_prev(datetime).timestamp() + (8 * 3600))
                    next_time = int(ci.get_next(datetime).timestamp() + (8 * 3600))
                    if cur_time == pre_time or cur_time == next_time:
                        command = crontab['command']
                        path = crontab['path']
                        executor.submit(execute_command, command, path)
                except CroniterBadCronError as e:
                    print(f'解析 crontab 失败:{crontab['crontab']}, 错误原因:{e}')
                    continue
            sleep(1)


def get_crontabs():
    if config['mode'] == 'file':
        # 文件驱动
        with open('crontab.json', 'r', encoding='utf-8') as crontabFile:
            return json.load(crontabFile)
    # redis 驱动
    r = redis.Redis(host=config['redis_host'], port=config['redis_port'], password=config['redis_password'],
                    db=config['redis_index'])
    return r.hgetall('CRONTABS').values()


def execute_command(command, path):
    print(f"进入目录:{path},执行命令:{command}")
    res = subprocess.run(command, cwd=path, capture_output=True, text=True)
    print(f"命令[{command}]的响应结果:{res.stdout}")


if __name__ == '__main__':
    main()

复制以上代码保存到 py 文件,装好所需的依赖包,设置配置 (程序根目录的 config.json):

{
    "mode":"file",
    "redis_host": "127.0.0.1",
    "redis_password": null,
    "redis_port": 6379,
    "redis_index": 0,
    "max_workers": 100
}

配置文件中的 mode 意味着使用的定时任务管理方式,可选值是 “redis” 和 “file”。设置为 “file” 后,需要在程序根目录下设置 crontab.json 文件:

[
    {
        "crontab": "* * * * *",
        "command": "php 2.php",
        "path": "D:\\project\\crontab"
    },
    {
        "crontab": "* * * * *",
        "command": "php 1.php",
        "path": "D:\\project\\crontab\\t"
    },
    {
        "crontab": "* * * * *",
        "command": "php 3.php",
        "path": "D:\\project\\crontab"
    },
    {
        "crontab": "3/5 * * * *",
        "command": "php 1.php",
        "path": "D:\\project\\crontab\\t"
    }
]

以上内容为实例,需要根据实际需求自行设置。


设置 mode 为 redis 时,需注意填写 redis 相关配置,例如地址、密码、端口等,然后定时任务内容设置在哈希表,表名是 "CRONTABS"

每行数据的键自定义,值内容和上面一致,如:

{"crontab":"* * * * *","command":"php 3.php","path":"D:\\project\\crontab"}


运行效果:


运行模式推荐使用 redis,程序上可随时管理定时任务。

放上打包好的程序及代码: https://pan.baidu.com/s/1Oy-6NrZ5ePPU2R0LTcSx8w?pwd=3h2m


注:配置文件的 max_workers 是设置最大的线程数,需要根据自己电脑性能配置及定时任务数量调整。

本文地址 : bubaijun.com/page.php?id=240

版权声明 : 未经允许禁止转载!

评论:我要评论
发布评论:
Copyright © 不败君 粤ICP备18102917号-1

不败君

首 页 作 品 微 语