datatracker/ietf/doc/tasks.py
Jennifer Richards c9f35987bc
refactor: expire last calls via celery (#7417)
* feat: expire_last_calls_task

* feat: create periodic task for last calls

* test: test new task

* chore: remove expire-last-calls script
2024-05-15 12:04:47 -05:00

93 lines
3.1 KiB
Python

# Copyright The IETF Trust 2024, All Rights Reserved
#
# Celery task definitions
#
import datetime
import debug # pyflakes:ignore
from celery import shared_task
from pathlib import Path
from django.conf import settings
from ietf.utils import log
from ietf.utils.timezone import datetime_today
from .expire import (
in_draft_expire_freeze,
get_expired_drafts,
expirable_drafts,
send_expire_notice_for_draft,
expire_draft,
clean_up_draft_files,
get_soon_to_expire_drafts,
send_expire_warning_for_draft,
)
from .lastcall import get_expired_last_calls, expire_last_call
from .models import Document
from .utils import generate_idnits2_rfc_status, generate_idnits2_rfcs_obsoleted
@shared_task
def expire_ids_task():
    """Expire drafts whose expiry date has passed, then clean up draft files.

    Unless in_draft_expire_freeze() reports a freeze, each document returned
    by get_expired_drafts() is re-checked and, if it still qualifies, has its
    expiry notice sent and is marked as expired.

    Raises:
        Exception: anything raised while running is logged and re-raised.
    """
    try:
        if not in_draft_expire_freeze():
            log.log("Expiring drafts ...")
            for doc in get_expired_drafts():
                # verify expirability -- it might have changed after get_expired_drafts() was run
                # (this whole loop took about 2 minutes on 04 Jan 2018)
                # N.B., re-running expirable_drafts() repeatedly is fairly expensive. Where possible,
                # it's much faster to run it once on a superset query of the objects you are going
                # to test and keep its results. That's not desirable here because it would defeat
                # the purpose of double-checking that a document is still expirable when it is actually
                # being marked as expired.
                #
                # Both conditions must hold: the document is still in the expirable set, and its
                # expiry time falls before datetime_today() plus one day.
                if expirable_drafts(
                    Document.objects.filter(pk=doc.pk)
                ).exists() and doc.expires < datetime_today() + datetime.timedelta(1):
                    # Notice goes out before the document is marked expired.
                    send_expire_notice_for_draft(doc)
                    expire_draft(doc)
                    log.log(f" Expired draft {doc.name}-{doc.rev}")
        # NOTE(review): the indentation of this listing was lost; placing the clean-up at this
        # level (so it also runs during an expire freeze) is a reconstruction -- confirm against
        # the repository copy.
        log.log("Cleaning up draft files")
        clean_up_draft_files()
    except Exception as e:
        # Record the failure in the log, then re-raise so the exception still propagates.
        log.log("Exception in expire-ids: %s" % e)
        raise
@shared_task
def notify_expirations_task(notify_days=14):
    """Send an expiry warning for every draft selected as soon-to-expire.

    Args:
        notify_days: passed unchanged to get_soon_to_expire_drafts() to
            select the drafts (default 14).
    """
    expiring_soon = get_soon_to_expire_drafts(notify_days)
    for draft in expiring_soon:
        send_expire_warning_for_draft(draft)
@shared_task
def expire_last_calls_task():
    """Expire every last call returned by get_expired_last_calls().

    Documents are handled one at a time. A failure on one document is logged,
    together with the exception that caused it, and does not prevent the
    remaining documents from being processed. Successes are logged as well.
    """
    for doc in get_expired_last_calls():
        try:
            expire_last_call(doc)
        except Exception as e:
            # Best effort: carry on with the other documents, but record what
            # went wrong so the failure can be diagnosed from the log.
            log.log(
                f"ERROR: Failed to expire last call for {doc.file_tag()} (id={doc.pk}): {e!r}"
            )
        else:
            log.log(f"Expired last call for {doc.file_tag()} (id={doc.pk})")
@shared_task
def generate_idnits2_rfc_status_task():
    """Write the "idnits2-rfc-status" file under settings.DERIVED_DIR.

    The text is produced by generate_idnits2_rfc_status() and written as
    UTF-8. A failure while writing is logged and not re-raised.
    """
    target = Path(settings.DERIVED_DIR).joinpath("idnits2-rfc-status")
    content = generate_idnits2_rfc_status()
    try:
        target.write_text(content, encoding="utf8")
    except Exception as err:
        log.log(f"failed to write idnits2-rfc-status: {err}")
@shared_task
def generate_idnits2_rfcs_obsoleted_task():
    """Write the "idnits2-rfcs-obsoleted" file under settings.DERIVED_DIR.

    The text is produced by generate_idnits2_rfcs_obsoleted() and written as
    UTF-8. A failure while writing is logged and not re-raised.
    """
    target = Path(settings.DERIVED_DIR).joinpath("idnits2-rfcs-obsoleted")
    content = generate_idnits2_rfcs_obsoleted()
    try:
        target.write_text(content, encoding="utf8")
    except Exception as err:
        log.log(f"failed to write idnits2-rfcs-obsoleted: {err}")