This repository has been archived on 2025-03-03. You can view files and clone it, but cannot push or open issues or pull requests.
facturio/utils/management/commands/runscheduler.py

99 lines
3.3 KiB
Python
Raw Permalink Normal View History

2025-03-03 21:26:27 +01:00
import logging
import signal
from copy import deepcopy
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
2025-03-03 21:51:01 +01:00
from apscheduler.triggers.interval import IntervalTrigger
2025-03-03 21:26:27 +01:00
from django.conf import settings
from django.core.management.base import BaseCommand
from django_apscheduler import util
2025-03-03 21:51:01 +01:00
from django_apscheduler.jobstores import DjangoJobStore
2025-03-03 21:26:27 +01:00
from django_apscheduler.models import DjangoJobExecution
from django_dramatiq.models import Task
logger = logging.getLogger(__name__)
DEFAULT_JOB_KWARGS = {
2025-03-03 21:52:20 +01:00
'max_instances': 1,
'replace_existing': True,
2025-03-03 21:26:27 +01:00
}
2025-03-03 21:52:20 +01:00
DAY_SEC = 24 * 60 * 60
2025-03-03 21:26:27 +01:00
PERIODIC_JOBS = [
{
2025-03-03 21:52:20 +01:00
'task': 'utils.tasks:send_queued_mail_task.send',
'trigger': IntervalTrigger(seconds=30),
2025-03-03 21:26:27 +01:00
},
]
# The `close_old_connections` decorator ensures that database connections that have become
# unusable or are obsolete are closed before and after your job has run.
# You should use it to wrap any jobs that you schedule that access the Django database in any way.
@util.close_old_connections
2025-03-03 21:51:01 +01:00
def delete_old_job_executions(max_age: int = 7 * DAY_SEC) -> None:
2025-03-03 21:26:27 +01:00
"""
This job deletes APScheduler job execution entries older than `max_age` from the database.
It helps to prevent the database from filling up with old historical records that are no
longer useful.
:param max_age: The maximum length of time to retain historical job execution records.
Defaults to 7 days.
"""
DjangoJobExecution.objects.delete_old_job_executions(max_age)
Task.tasks.delete_old_tasks(max_age)
class Command(BaseCommand):
2025-03-03 21:52:20 +01:00
help = 'Runs APScheduler.' # noqa: A003
2025-03-03 21:51:01 +01:00
scheduler: BlockingScheduler = None
2025-03-03 21:26:27 +01:00
def prepare_scheduler(self):
2025-03-03 21:52:20 +01:00
self.stdout.write(self.style.NOTICE('Preparing scheduler'))
2025-03-03 21:26:27 +01:00
self.scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
2025-03-03 21:52:20 +01:00
self.scheduler.add_jobstore(DjangoJobStore(), 'default')
2025-03-03 21:26:27 +01:00
def add_jobs(self):
self.scheduler.add_job(
delete_old_job_executions,
trigger=CronTrigger(
2025-03-03 21:52:20 +01:00
day_of_week='mon',
hour='00',
minute='00',
2025-03-03 21:26:27 +01:00
), # Midnight on Monday, before the start of the next work week.
2025-03-03 21:52:20 +01:00
id='delete_old_job_executions',
2025-03-03 21:26:27 +01:00
max_instances=1,
replace_existing=True,
)
self.stdout.write("Added weekly job: 'delete_old_job_executions'.")
for job in PERIODIC_JOBS:
kwargs = DEFAULT_JOB_KWARGS | deepcopy(job)
2025-03-03 21:52:20 +01:00
task = kwargs.pop('task')
2025-03-03 21:26:27 +01:00
2025-03-03 21:52:20 +01:00
if 'id' not in kwargs:
kwargs['id'] = task
2025-03-03 21:26:27 +01:00
self.scheduler.add_job(task, **kwargs)
2025-03-03 21:52:20 +01:00
self.stdout.write(f'Added job: {task}')
2025-03-03 21:26:27 +01:00
def handle_shutdown(self, *args, **kwargs): # noqa: ARG001
2025-03-03 21:52:20 +01:00
self.stdout.write(self.style.NOTICE('Stopping scheduler...'))
2025-03-03 21:26:27 +01:00
self.scheduler.shutdown()
2025-03-03 21:52:20 +01:00
self.stdout.write(self.style.NOTICE('Scheduler shut down successfully!'))
2025-03-03 21:26:27 +01:00
def handle(self, *args, **options):
self.prepare_scheduler()
self.add_jobs()
signal.signal(signal.SIGTERM, self.handle_shutdown)
try:
2025-03-03 21:52:20 +01:00
self.stdout.write(self.style.NOTICE('Starting scheduler...'))
2025-03-03 21:26:27 +01:00
self.scheduler.start()
except KeyboardInterrupt:
self.handle_shutdown()