98 lines
3.3 KiB
Python
98 lines
3.3 KiB
Python
import logging
|
|
import signal
|
|
from copy import deepcopy
|
|
|
|
from apscheduler.schedulers.blocking import BlockingScheduler
|
|
from apscheduler.triggers.cron import CronTrigger
|
|
from apscheduler.triggers.interval import IntervalTrigger
|
|
from django.conf import settings
|
|
from django.core.management.base import BaseCommand
|
|
from django_apscheduler import util
|
|
from django_apscheduler.jobstores import DjangoJobStore
|
|
from django_apscheduler.models import DjangoJobExecution
|
|
from django_dramatiq.models import Task
|
|
|
|
# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Baseline options merged into every entry of PERIODIC_JOBS; a job spec may
# override any of them by providing the same key.
DEFAULT_JOB_KWARGS = dict(max_instances=1, replace_existing=True)

# Number of seconds in one day.
DAY_SEC = 60 * 60 * 24
|
|
|
|
# Job specs registered by the scheduler command. The 'task' entry is the job
# reference handed to `add_job`; every other key is forwarded to `add_job`
# as a keyword argument.
PERIODIC_JOBS = [
    dict(
        task='utils.tasks:send_queued_mail_task.send',
        trigger=IntervalTrigger(seconds=30),
    ),
]
|
|
|
|
|
|
# The `close_old_connections` decorator ensures that database connections that have become
# unusable or obsolete are closed before and after the job has run.
# Use it to wrap every scheduled job that accesses the Django database in any way.
|
|
@util.close_old_connections
def delete_old_job_executions(max_age: int = 7 * DAY_SEC) -> None:
    """
    Purge historical scheduler and task records that are older than `max_age`.

    Two clean-ups are run in sequence with the same `max_age`: first the APScheduler
    job execution entries, then the `Task` records. This keeps the database from
    filling up with historical rows that are no longer useful.

    :param max_age: The maximum length of time to retain historical records,
                    presumably in seconds (the default is ``7 * DAY_SEC``, i.e. 7 days).
    """
    execution_manager = DjangoJobExecution.objects
    execution_manager.delete_old_job_executions(max_age)

    task_manager = Task.tasks
    task_manager.delete_old_tasks(max_age)
|
|
|
|
|
|
class Command(BaseCommand):
    """
    Management command that runs a blocking APScheduler instance.

    It registers a weekly clean-up job plus every entry of ``PERIODIC_JOBS``,
    installs a SIGTERM handler and then blocks inside the scheduler until it is
    shut down.
    """

    help = 'Runs APScheduler.'  # noqa: A003
    # Assigned by `prepare_scheduler`; stays None until that method has run.
    scheduler: BlockingScheduler = None

    def prepare_scheduler(self):
        """Create the scheduler and attach the Django job store as 'default'."""
        self.stdout.write(self.style.NOTICE('Preparing scheduler'))
        self.scheduler = scheduler = BlockingScheduler(timezone=settings.TIME_ZONE)
        scheduler.add_jobstore(DjangoJobStore(), 'default')

    def add_jobs(self):
        """Register the weekly clean-up job and all ``PERIODIC_JOBS`` entries."""
        add_job = self.scheduler.add_job

        # Midnight on Monday, before the start of the next work week.
        weekly_trigger = CronTrigger(day_of_week='mon', hour='00', minute='00')
        add_job(
            delete_old_job_executions,
            trigger=weekly_trigger,
            id='delete_old_job_executions',
            max_instances=1,
            replace_existing=True,
        )
        self.stdout.write("Added weekly job: 'delete_old_job_executions'.")

        for spec in PERIODIC_JOBS:
            # Defaults first, then a deep copy of the spec so that the spec's
            # own values take precedence and the module-level entry is untouched.
            options = {**DEFAULT_JOB_KWARGS, **deepcopy(spec)}
            task = options.pop('task')

            # Without an explicit id, the task reference doubles as the job id.
            options.setdefault('id', task)

            add_job(task, **options)

            self.stdout.write(f'Added job: {task}')

    def handle_shutdown(self, *args, **kwargs):  # noqa: ARG001
        """
        Shut the scheduler down, reporting progress on stdout.

        Any arguments are accepted and ignored, so the method can be used both
        as a signal handler and as a plain call without arguments.
        """
        notice = self.style.NOTICE
        self.stdout.write(notice('Stopping scheduler...'))
        self.scheduler.shutdown()
        self.stdout.write(notice('Scheduler shut down successfully!'))

    def handle(self, *args, **options):
        """Command entry point: set up the scheduler, add jobs and start it."""
        self.prepare_scheduler()
        self.add_jobs()

        # Route SIGTERM through the same shutdown path used for Ctrl+C below.
        signal.signal(signal.SIGTERM, self.handle_shutdown)

        try:
            self.stdout.write(self.style.NOTICE('Starting scheduler...'))
            self.scheduler.start()
        except KeyboardInterrupt:
            self.handle_shutdown()
|