This repository has been archived on 2025-03-03. You can view files and clone it, but cannot push or open issues or pull requests.
facturio/utils/management/commands/runscheduler.py

98 lines
3.3 KiB
Python

import logging
import signal
from copy import deepcopy
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from django.conf import settings
from django.core.management.base import BaseCommand
from django_apscheduler import util
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJobExecution
from django_dramatiq.models import Task
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Number of seconds in one day; used to express retention periods.
DAY_SEC = 60 * 60 * 24

# Keyword arguments merged into every PERIODIC_JOBS entry before it is handed
# to ``scheduler.add_job``; keys set in an entry override these defaults.
DEFAULT_JOB_KWARGS = {'max_instances': 1, 'replace_existing': True}

# Jobs registered by ``Command.add_jobs``. Each entry is a dict of
# ``add_job`` keyword arguments plus:
#   * 'task' - textual reference to the callable, passed as the first
#     positional argument of ``add_job``;
#   * 'id'   - optional; falls back to the 'task' string when absent.
PERIODIC_JOBS = [
    {
        'task': 'utils.tasks:send_queued_mail_task.send',
        'trigger': IntervalTrigger(seconds=30),
    },
]
# The `close_old_connections` decorator ensures that database connections that have become
# unusable or are obsolete are closed before and after your job has run.
# You should use it to wrap any jobs that you schedule that access the Django database in any way.
@util.close_old_connections
def delete_old_job_executions(max_age: int = 7 * DAY_SEC) -> None:
    """
    Purge historical bookkeeping records so they do not pile up in the database.

    Two clean-ups run one after the other with the same ``max_age``:
    APScheduler job execution entries (``DjangoJobExecution``) and
    django_dramatiq ``Task`` entries.

    :param max_age: The maximum length of time to retain historical records.
                    The default is expressed as ``7 * DAY_SEC``, i.e. 7 days
                    in seconds.
    """
    execution_manager = DjangoJobExecution.objects
    execution_manager.delete_old_job_executions(max_age)

    task_manager = Task.tasks
    task_manager.delete_old_tasks(max_age)
class Command(BaseCommand):
    """Management command that runs a blocking APScheduler instance.

    The command creates a ``BlockingScheduler`` backed by ``DjangoJobStore``,
    registers a weekly clean-up job plus every entry of ``PERIODIC_JOBS``,
    and then starts the scheduler. SIGTERM and ``KeyboardInterrupt`` both
    lead to ``handle_shutdown``.
    """

    help = 'Runs APScheduler.'  # noqa: A003
    # Remains ``None`` until ``prepare_scheduler()`` has been called.
    scheduler: 'BlockingScheduler | None' = None

    def prepare_scheduler(self):
        """Create the scheduler in ``settings.TIME_ZONE`` and attach the Django job store."""
        self.stdout.write(self.style.NOTICE('Preparing scheduler'))
        self.scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
        self.scheduler.add_jobstore(DjangoJobStore(), 'default')

    def add_jobs(self):
        """Register the weekly clean-up job and every job listed in ``PERIODIC_JOBS``."""
        self.scheduler.add_job(
            delete_old_job_executions,
            trigger=CronTrigger(
                day_of_week='mon',
                hour='00',
                minute='00',
            ),  # Midnight on Monday, before the start of the next work week.
            id='delete_old_job_executions',
            # Same defaults as the periodic jobs below, kept in one place.
            **DEFAULT_JOB_KWARGS,
        )
        self.stdout.write("Added weekly job: 'delete_old_job_executions'.")

        for job in PERIODIC_JOBS:
            # Defaults first, then a deep copy of the entry: keys present in
            # the entry win, and the module-level definition is not shared.
            kwargs = DEFAULT_JOB_KWARGS | deepcopy(job)
            task = kwargs.pop('task')
            # The textual task reference doubles as the job id unless the
            # entry provides its own.
            kwargs.setdefault('id', task)
            self.scheduler.add_job(task, **kwargs)
            self.stdout.write(f'Added job: {task}')

    def handle_shutdown(self, *args, **kwargs):  # noqa: ARG001
        """Shut the scheduler down and report progress on stdout.

        Registered as the SIGTERM handler and also called directly on
        ``KeyboardInterrupt``; ``*args``/``**kwargs`` accept whatever the
        caller passes (for example the signal handler arguments) and are
        ignored.
        """
        self.stdout.write(self.style.NOTICE('Stopping scheduler...'))
        # NOTE(review): per the APScheduler docs, shutdown() raises
        # SchedulerNotRunningError when the scheduler is not running (e.g. a
        # second SIGTERM) - confirm whether that case should be tolerated.
        self.scheduler.shutdown()
        self.stdout.write(self.style.NOTICE('Scheduler shut down successfully!'))

    def handle(self, *args, **options):
        """Entry point: build the scheduler, register the jobs and start it."""
        self.prepare_scheduler()
        self.add_jobs()
        # SIGTERM goes through the same shutdown routine as Ctrl+C below.
        signal.signal(signal.SIGTERM, self.handle_shutdown)
        try:
            self.stdout.write(self.style.NOTICE('Starting scheduler...'))
            self.scheduler.start()
        except KeyboardInterrupt:
            self.handle_shutdown()