import logging
import signal
from copy import deepcopy

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
from django.conf import settings
from django.core.management.base import BaseCommand
from django_apscheduler import util
from django_apscheduler.jobstores import DjangoJobStore
from django_apscheduler.models import DjangoJobExecution
from django_dramatiq.models import Task

logger = logging.getLogger(__name__)

# Keyword arguments passed to `scheduler.add_job` for every job; an entry in
# PERIODIC_JOBS may override any of them.
DEFAULT_JOB_KWARGS = {
    "max_instances": 1,
    "replace_existing": True,
}

DAY_SEC = 24 * 60 * 60

# Jobs registered by `Command.add_jobs`. "task" is handed to `add_job` as the
# callable reference; every other key is forwarded as a keyword argument.
# When "id" is omitted, the task reference is used as the job id.
PERIODIC_JOBS = [
    {
        "task": "utils.tasks:send_queued_mail_task.send",
        "trigger": IntervalTrigger(seconds=30),
    },
]


# The `close_old_connections` decorator ensures that database connections that have become
# unusable or are obsolete are closed before and after your job has run.
# You should use it to wrap any jobs that you schedule that access the Django database in any way.
@util.close_old_connections
def delete_old_job_executions(max_age: int = 7 * DAY_SEC) -> None:
    """
    Delete historical records older than `max_age` from the database.

    Removes APScheduler job execution entries via
    `DjangoJobExecution.objects.delete_old_job_executions` and dramatiq task
    records via `Task.tasks.delete_old_tasks`. It helps to prevent the database
    from filling up with old historical records that are no longer useful.

    :param max_age: The maximum length of time to retain historical records,
        expressed in the same unit as `DAY_SEC`. Defaults to 7 days.
    """
    DjangoJobExecution.objects.delete_old_job_executions(max_age)
    Task.tasks.delete_old_tasks(max_age)


class Command(BaseCommand):
    """Management command that runs a blocking APScheduler instance."""

    help = "Runs APScheduler."  # noqa: A003

    # Created by `prepare_scheduler`; stays None until `handle` has run it.
    scheduler: BlockingScheduler = None

    def prepare_scheduler(self):
        """Create the scheduler in the project time zone, backed by the Django job store."""
        self.stdout.write(self.style.NOTICE("Preparing scheduler"))
        self.scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
        self.scheduler.add_jobstore(DjangoJobStore(), "default")

    def add_jobs(self):
        """Register the weekly cleanup job and every entry of `PERIODIC_JOBS`."""
        self.scheduler.add_job(
            delete_old_job_executions,
            # Midnight on Monday, before the start of the next work week.
            # The time zone is given explicitly: a trigger *instance* does not
            # inherit the scheduler's time zone and would otherwise fall back
            # to the host-local zone.
            trigger=CronTrigger(
                day_of_week="mon",
                hour="00",
                minute="00",
                timezone=settings.TIME_ZONE,
            ),
            id="delete_old_job_executions",
            **DEFAULT_JOB_KWARGS,
        )
        self.stdout.write("Added weekly job: 'delete_old_job_executions'.")

        for job in PERIODIC_JOBS:
            # Copy so that popping "task" never mutates the module-level config;
            # job-specific keys take precedence over the defaults.
            kwargs = DEFAULT_JOB_KWARGS | deepcopy(job)
            task = kwargs.pop("task")
            kwargs.setdefault("id", task)
            self.scheduler.add_job(task, **kwargs)
            self.stdout.write(f"Added job: {task}")

    def handle_shutdown(self, *args, **kwargs):  # noqa: ARG001
        """
        Shut the scheduler down.

        Accepts and ignores arbitrary arguments so it can be installed
        directly as a signal handler as well as called without arguments.
        """
        self.stdout.write(self.style.NOTICE("Stopping scheduler..."))
        self.scheduler.shutdown()
        self.stdout.write(self.style.NOTICE("Scheduler shut down successfully!"))

    def handle(self, *args, **options):
        """Build the scheduler, register jobs, and block until SIGTERM or Ctrl-C."""
        self.prepare_scheduler()
        self.add_jobs()

        # SIGTERM is routed to the shutdown handler; SIGINT arrives as
        # KeyboardInterrupt and is handled below.
        signal.signal(signal.SIGTERM, self.handle_shutdown)

        try:
            self.stdout.write(self.style.NOTICE("Starting scheduler..."))
            self.scheduler.start()
        except KeyboardInterrupt:
            self.handle_shutdown()