import logging import signal from copy import deepcopy from apscheduler.schedulers.blocking import BlockingScheduler from apscheduler.triggers.cron import CronTrigger from apscheduler.triggers.interval import IntervalTrigger from django.conf import settings from django.core.management.base import BaseCommand from django_apscheduler import util from django_apscheduler.jobstores import DjangoJobStore from django_apscheduler.models import DjangoJobExecution from django_dramatiq.models import Task logger = logging.getLogger(__name__) DEFAULT_JOB_KWARGS = { 'max_instances': 1, 'replace_existing': True, } DAY_SEC = 24 * 60 * 60 PERIODIC_JOBS = [ { 'task': 'utils.tasks:send_queued_mail_task.send', 'trigger': IntervalTrigger(seconds=30), }, ] # The `close_old_connections` decorator ensures that database connections that have become # unusable or are obsolete are closed before and after your job has run. # You should use it to wrap any jobs that you schedule that access the Django database in any way. @util.close_old_connections def delete_old_job_executions(max_age: int = 7 * DAY_SEC) -> None: """ This job deletes APScheduler job execution entries older than `max_age` from the database. It helps to prevent the database from filling up with old historical records that are no longer useful. :param max_age: The maximum length of time to retain historical job execution records. Defaults to 7 days. """ DjangoJobExecution.objects.delete_old_job_executions(max_age) Task.tasks.delete_old_tasks(max_age) class Command(BaseCommand): help = 'Runs APScheduler.' # noqa: A003 scheduler: BlockingScheduler = None def prepare_scheduler(self): self.stdout.write(self.style.NOTICE('Preparing scheduler')) self.scheduler = BlockingScheduler(timezone=settings.TIME_ZONE) self.scheduler.add_jobstore(DjangoJobStore(), 'default') def add_jobs(self): self.scheduler.add_job( delete_old_job_executions, trigger=CronTrigger( day_of_week='mon', hour='00', minute='00', ), # Midnight on Monday, before the start of the next work week. id='delete_old_job_executions', max_instances=1, replace_existing=True, ) self.stdout.write("Added weekly job: 'delete_old_job_executions'.") for job in PERIODIC_JOBS: kwargs = DEFAULT_JOB_KWARGS | deepcopy(job) task = kwargs.pop('task') if 'id' not in kwargs: kwargs['id'] = task self.scheduler.add_job(task, **kwargs) self.stdout.write(f'Added job: {task}') def handle_shutdown(self, *args, **kwargs): # noqa: ARG001 self.stdout.write(self.style.NOTICE('Stopping scheduler...')) self.scheduler.shutdown() self.stdout.write(self.style.NOTICE('Scheduler shut down successfully!')) def handle(self, *args, **options): self.prepare_scheduler() self.add_jobs() signal.signal(signal.SIGTERM, self.handle_shutdown) try: self.stdout.write(self.style.NOTICE('Starting scheduler...')) self.scheduler.start() except KeyboardInterrupt: self.handle_shutdown()