refactor: Move cron jobs to celery tasks (#6926)

* refactor: Factor out helper from fetch_meeting_attendance.py

* feat: Define fetch_meeting_attendance_task task

Equivalent to the fetch_meeting_attendance management command

* chore: Disable fetch_meeting_attendance in bin/daily

* feat: Log errors in fetch_meeting_attendance_task

* feat: Enable a result backend for celery

Ignore results by default, but enable the backend so we
can manage tasks

* feat: Define daily task in ietf.utils.tasks

* refactor: Make bin/send-review-reminders into a task

* refactor: Make bin/send-scheduled-mail into a task

* chore: Update copyright years

* refactor: Make bin/rfc-editor-index-updates into a task

* refactor: Accept date type in rfc index update fn

* chore: Update comment

* fix: Annotate param as Optional

* fix: Revert treating skip_older_than_date as str

Misunderstood the comment, "fixed" a non-bug. Oops.

* feat: mgmt command to create periodic tasks

* feat: add summary of tasks to mgmt cmd

* style: black

* fix: Remove debug statements

* feat: Enable/disable tasks

* chore: Disable periodic tasks by default

* chore: Revert changes to daily and every15m

* fix: Call intended function

* chore: Add task descriptions
This commit is contained in:
Jennifer Richards 2024-01-22 14:04:16 -04:00 committed by GitHub
parent 6da9dff354
commit 8d12071bf5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 356 additions and 11 deletions

View file

@ -56,7 +56,7 @@ $DTDIR/ietf/bin/expire-last-calls
# Run an extended version of the rfc editor update, to catch changes
# with backdated timestamps
# Enable when removed from /a/www/ietf-datatracker/scripts/Cron-runner:
$DTDIR/ietf/bin/rfc-editor-index-updates -d 1969-01-01
$DTDIR/ietf/bin/rfc-editor-index-updates -d 1969-01-01
# Fetch meeting attendance data from ietf.org/registration/attendees
$DTDIR/ietf/manage.py fetch_meeting_attendance --latest 2

43
ietf/review/tasks.py Normal file
View file

@ -0,0 +1,43 @@
# Copyright The IETF Trust 2024, All Rights Reserved
#
# Celery task definitions
#
from celery import shared_task
from ietf.review.utils import (
review_assignments_needing_reviewer_reminder, email_reviewer_reminder,
review_assignments_needing_secretary_reminder, email_secretary_reminder,
send_unavailability_period_ending_reminder, send_reminder_all_open_reviews,
send_review_reminder_overdue_assignment, send_reminder_unconfirmed_assignments)
from ietf.utils.log import log
from ietf.utils.timezone import date_today, DEADLINE_TZINFO
@shared_task
def send_review_reminders_task():
today = date_today(DEADLINE_TZINFO)
for assignment in review_assignments_needing_reviewer_reminder(today):
email_reviewer_reminder(assignment)
log("Emailed reminder to {} for review of {} in {} (req. id {})".format(assignment.reviewer.address, assignment.review_request.doc_id, assignment.review_request.team.acronym, assignment.review_request.pk))
for assignment, secretary_role in review_assignments_needing_secretary_reminder(today):
email_secretary_reminder(assignment, secretary_role)
review_req = assignment.review_request
log("Emailed reminder to {} for review of {} in {} (req. id {})".format(secretary_role.email.address, review_req.doc_id, review_req.team.acronym, review_req.pk))
period_end_reminders_sent = send_unavailability_period_ending_reminder(today)
for msg in period_end_reminders_sent:
log(msg)
overdue_reviews_reminders_sent = send_review_reminder_overdue_assignment(today)
for msg in overdue_reviews_reminders_sent:
log(msg)
open_reviews_reminders_sent = send_reminder_all_open_reviews(today)
for msg in open_reviews_reminders_sent:
log(msg)
unconfirmed_assignment_reminders_sent = send_reminder_unconfirmed_assignments(today)
for msg in unconfirmed_assignment_reminders_sent:
log(msg)

View file

@ -1,4 +1,4 @@
# Copyright The IETF Trust 2007-2023, All Rights Reserved
# Copyright The IETF Trust 2007-2024, All Rights Reserved
# -*- coding: utf-8 -*-
@ -1153,6 +1153,12 @@ CELERY_BROKER_URL = 'amqp://mq/'
CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
CELERY_BEAT_SYNC_EVERY = 1 # update DB after every event
CELERY_BROKER_CONNECTION_RETRY_ON_STARTUP = True # the default, but setting it squelches a warning
# Use a result backend so we can chain tasks. This uses the rpc backend, see
# https://docs.celeryq.dev/en/stable/userguide/tasks.html#rpc-result-backend-rabbitmq-qpid
# Results can be retrieved only once and only by the caller of the task. Results will be
# lost if the message broker restarts.
CELERY_RESULT_BACKEND = 'rpc://' # sends a msg via the msg broker
CELERY_TASK_IGNORE_RESULT = True # ignore results unless specifically enabled for a task
# Meetecho API setup: Uncomment this and provide real credentials to enable
# Meetecho conference creation for interim session requests

View file

@ -9,7 +9,7 @@ from django.utils import timezone
import debug # pyflakes:ignore
from ietf.meeting.models import Meeting
from ietf.stats.utils import get_meeting_registration_data
from ietf.stats.utils import fetch_attendance_from_meetings
logtag = __name__.split('.')[-1]
logname = "user.log"
@ -36,11 +36,11 @@ class Command(BaseCommand):
else:
raise CommandError("Please use one of --meeting, --all or --latest")
for meeting in meetings:
added, processed, total = get_meeting_registration_data(meeting)
msg = "Fetched data for meeting %3s: %4d processed, %4d added, %4d in table" % (meeting.number, processed, added, total)
for meeting, stats in zip(meetings, fetch_attendance_from_meetings(meetings)):
msg = "Fetched data for meeting {:>3}: {:4d} processed, {:4d} added, {:4d} in table".format(
meeting.number, stats.processed, stats.added, stats.total
)
if self.stdout.isatty():
self.stdout.write(msg+'\n') # make debugging a bit easier
else:
syslog.syslog(msg)

27
ietf/stats/tasks.py Normal file
View file

@ -0,0 +1,27 @@
# Copyright The IETF Trust 2024, All Rights Reserved
#
# Celery task definitions
#
from celery import shared_task
from django.utils import timezone
from ietf.meeting.models import Meeting
from ietf.stats.utils import fetch_attendance_from_meetings
from ietf.utils import log
@shared_task
def fetch_meeting_attendance_task():
# fetch most recent two meetings
meetings = Meeting.objects.filter(type="ietf", date__lte=timezone.now()).order_by("-date")[:2]
try:
stats = fetch_attendance_from_meetings(meetings)
except RuntimeError as err:
log.log(f"Error in fetch_meeting_attendance_task: {err}")
else:
for meeting, stats in zip(meetings, fetch_attendance_from_meetings(meetings)):
log.log(
"Fetched data for meeting {:>3}: {:4d} processed, {:4d} added, {:4d} in table".format(
meeting.number, stats.processed, stats.added, stats.total
)
)

View file

@ -4,7 +4,7 @@
import re
import requests
from collections import defaultdict
from collections import defaultdict, namedtuple
from django.conf import settings
from django.db.models import Q
@ -382,3 +382,13 @@ def find_meetingregistration_person_issues(meetings=None):
summary.no_email.add(f'{mr} ({mr.pk}) provides no email address')
return summary
FetchStats = namedtuple("FetchStats", "added processed total")
def fetch_attendance_from_meetings(meetings):
stats = [
FetchStats(*get_meeting_registration_data(meeting)) for meeting in meetings
]
return stats

View file

@ -336,12 +336,12 @@ def parse_index(response):
def update_docs_from_rfc_index(
index_data, errata_data, skip_older_than_date=None
index_data, errata_data, skip_older_than_date: Optional[datetime.date] = None
) -> Iterator[tuple[int, list[str], Document, bool]]:
"""Given parsed data from the RFC Editor index, update the documents in the database
Returns an iterator that yields (rfc_number, change_list, doc, rfc_published) for the
RFC document and, if applicable, the I-D that it came from.
RFC document and, if applicable, the I-D that it came from.
The skip_older_than_date is a bare date, not a datetime.
"""
@ -405,7 +405,8 @@ def update_docs_from_rfc_index(
abstract,
) in index_data:
if skip_older_than_date and rfc_published_date < skip_older_than_date:
# speed up the process by skipping old entries
# speed up the process by skipping old entries (n.b., the comparison above is a
# lexical comparison between "YYYY-MM-DD"-formatted dates)
continue
# we assume two things can happen: we get a new RFC, or an

67
ietf/sync/tasks.py Normal file
View file

@ -0,0 +1,67 @@
# Copyright The IETF Trust 2024, All Rights Reserved
#
# Celery task definitions
#
import datetime
import io
import requests
from celery import shared_task
from django.conf import settings
from ietf.sync.rfceditor import MIN_ERRATA_RESULTS, MIN_INDEX_RESULTS, parse_index, update_docs_from_rfc_index
from ietf.utils import log
from ietf.utils.timezone import date_today
@shared_task
def rfc_editor_index_update_task(full_index=False):
"""Update metadata from the RFC index
Default is to examine only changes in the past 365 days. Call with full_index=True to update
the full RFC index.
According to comments on the original script, a year's worth took about 20s on production as of
August 2022
The original rfc-editor-index-update script had a long-disabled provision for running the
rebuild_reference_relations scripts after the update. That has not been brought over
at all because it should be implemented as its own task if it is needed.
"""
skip_date = None if full_index else date_today() - datetime.timedelta(days=365)
log.log(
"Updating document metadata from RFC index going back to {since}, from {url}".format(
since=skip_date if skip_date is not None else "the beginning",
url=settings.RFC_EDITOR_INDEX_URL,
)
)
try:
response = requests.get(
settings.RFC_EDITOR_INDEX_URL,
timeout=30, # seconds
)
except requests.Timeout as exc:
log.log(f'GET request timed out retrieving RFC editor index: {exc}')
return # failed
rfc_index_xml = response.text
index_data = parse_index(io.StringIO(rfc_index_xml))
try:
response = requests.get(
settings.RFC_EDITOR_ERRATA_JSON_URL,
timeout=30, # seconds
)
except requests.Timeout as exc:
log.log(f'GET request timed out retrieving RFC editor errata: {exc}')
return # failed
errata_data = response.json()
if len(index_data) < MIN_INDEX_RESULTS:
log.log("Not enough index entries, only %s" % len(index_data))
return # failed
if len(errata_data) < MIN_ERRATA_RESULTS:
log.log("Not enough errata entries, only %s" % len(errata_data))
return # failed
for rfc_number, changes, doc, rfc_published in update_docs_from_rfc_index(
index_data, errata_data, skip_older_than_date=skip_date
):
for c in changes:
log.log("RFC%s, %s: %s" % (rfc_number, doc.name, c))

View file

@ -0,0 +1,139 @@
# Copyright The IETF Trust 2024, All Rights Reserved
import json
from django_celery_beat.models import CrontabSchedule, PeriodicTask
from django.core.management.base import BaseCommand
CRONTAB_DEFS = {
"daily": {
"minute": "5",
"hour": "0",
"day_of_week": "*",
"day_of_month": "*",
"month_of_year": "*",
},
"hourly": {
"minute": "5",
"hour": "*",
"day_of_week": "*",
"day_of_month": "*",
"month_of_year": "*",
},
"every_15m": {
"minute": "*/15",
"hour": "*",
"day_of_week": "*",
"day_of_month": "*",
"month_of_year": "*",
},
}
class Command(BaseCommand):
"""Manage periodic tasks"""
def add_arguments(self, parser):
parser.add_argument("--create-default", action="store_true")
parser.add_argument("--enable", type=int, action="append")
parser.add_argument("--disable", type=int, action="append")
def handle(self, *args, **options):
self.crontabs = self.get_or_create_crontabs()
if options["create_default"]:
self.create_default_tasks()
if options["enable"]:
self.enable_tasks(options["enable"])
if options["disable"]:
self.disable_tasks(options["disable"])
self.show_tasks()
def get_or_create_crontabs(self):
crontabs = {}
for label, definition in CRONTAB_DEFS.items():
crontabs[label], _ = CrontabSchedule.objects.get_or_create(**definition)
return crontabs
def create_default_tasks(self):
PeriodicTask.objects.get_or_create(
name="Send scheduled mail",
task="ietf.utils.tasks.send_scheduled_mail_task",
defaults=dict(
enabled=False,
crontab=self.crontabs["every_15m"],
description="Send mail scheduled to go out at certain times"
),
)
PeriodicTask.objects.get_or_create(
name="Partial sync with RFC Editor index",
task="ietf.review.tasks.rfc_editor_index_update_task",
kwargs=json.dumps(dict(full_index=False)),
defaults=dict(
enabled=False,
crontab=self.crontabs["every_15m"],
description=(
"Reparse the last _year_ of RFC index entries until "
"https://github.com/ietf-tools/datatracker/issues/3734 is addressed. "
"This takes about 20s on production as of 2022-08-11."
)
),
)
PeriodicTask.objects.get_or_create(
name="Full sync with RFC Editor index",
task="ietf.review.tasks.rfc_editor_index_update_task",
kwargs=json.dumps(dict(full_index=True)),
defaults=dict(
enabled=False,
crontab=self.crontabs["daily"],
description=(
"Run an extended version of the rfc editor update to catch changes with backdated timestamps"
),
),
)
PeriodicTask.objects.get_or_create(
name="Fetch meeting attendance",
task="ietf.stats.tasks.fetch_meeting_attendance_task",
defaults=dict(
enabled=False,
crontab=self.crontabs["daily"],
description="Fetch meeting attendance data from ietf.org/registration/attendees",
),
)
PeriodicTask.objects.get_or_create(
name="Send review reminders",
task="ietf.review.tasks.send_review_reminders_task",
defaults=dict(
enabled=False,
crontab=self.crontabs["daily"],
description="Send reminders originating from the review app",
),
)
def show_tasks(self):
for label, crontab in self.crontabs.items():
tasks = PeriodicTask.objects.filter(crontab=crontab).order_by(
"task", "name"
)
self.stdout.write(f"\n{label} ({crontab.human_readable})\n")
if tasks:
for task in tasks:
desc = f" {task.id:-3d}: {task.task} - {task.name}"
if task.enabled:
self.stdout.write(desc)
else:
self.stdout.write(self.style.NOTICE(f"{desc} - disabled"))
else:
self.stdout.write(" Nothing scheduled")
def enable_tasks(self, pks):
PeriodicTask.objects.filter(
crontab__in=self.crontabs.values(), pk__in=pks
).update(enabled=True)
def disable_tasks(self, pks):
PeriodicTask.objects.filter(
crontab__in=self.crontabs.values(), pk__in=pks
).update(enabled=False)

52
ietf/utils/tasks.py Normal file
View file

@ -0,0 +1,52 @@
# Copyright The IETF Trust 2024 All Rights Reserved
#
# Celery task definitions
#
from django.utils import timezone
from celery import shared_task
from smtplib import SMTPException
from ietf.message.utils import send_scheduled_message_from_send_queue
from ietf.message.models import SendQueue
from ietf.review.tasks import send_review_reminders_task
from ietf.stats.tasks import fetch_meeting_attendance_task
from ietf.sync.tasks import rfc_editor_index_update_task
from ietf.utils import log
from ietf.utils.mail import log_smtp_exception, send_error_email
@shared_task
def every_15m_task():
"""Queue four-times-hourly tasks for execution"""
# todo decide whether we want this to be a meta-task or to individually schedule the tasks
send_scheduled_mail_task.delay()
# Parse the last year of RFC index data to get new RFCs. Needed until
# https://github.com/ietf-tools/datatracker/issues/3734 is addressed.
rfc_editor_index_update_task.delay(full_index=False)
@shared_task
def daily_task():
"""Queue daily tasks for execution"""
fetch_meeting_attendance_task.delay()
send_review_reminders_task.delay()
# Run an extended version of the rfc editor update to catch changes
# with backdated timestamps
rfc_editor_index_update_task.delay(full_index=True)
@shared_task
def send_scheduled_mail_task():
"""Send scheduled email
This is equivalent to `ietf/bin/send-scheduled-mail all`, which was the only form used in the cron job.
"""
needs_sending = SendQueue.objects.filter(sent_at=None).select_related("message")
for s in needs_sending:
try:
send_scheduled_message_from_send_queue(s)
log.log('Sent scheduled message %s "%s"' % (s.id, s.message.subject))
except SMTPException as e:
log_smtp_exception(e)
send_error_email(e)