diff --git a/judge/admin/contest.py b/judge/admin/contest.py index 67ffd3bbdb..84642ded93 100644 --- a/judge/admin/contest.py +++ b/judge/admin/contest.py @@ -14,7 +14,7 @@ from reversion.admin import VersionAdmin from django_ace import AceWidget -from judge.models import Class, Contest, ContestProblem, ContestSubmission, Profile, Rating, Submission +from judge.models import Class, Contest, ContestProblem, Profile, Rating, Submission from judge.ratings import rate_contest from judge.utils.views import NoBatchDeleteMixin from judge.widgets import AdminHeavySelect2MultipleWidget, AdminHeavySelect2Widget, AdminMartorWidget, \ @@ -274,13 +274,13 @@ def get_urls(self): ] + super(ContestAdmin, self).get_urls() def rejudge_view(self, request, contest_id, problem_id): - queryset = ContestSubmission.objects.filter(problem_id=problem_id).select_related('submission') - for model in queryset: - model.submission.judge(rejudge=True, rejudge_user=request.user) + judged = sum(Submission.batch_judge( + Submission.objects.filter(contest__problem_id=problem_id), rejudge=True, rejudge_user=request.user, + )) self.message_user(request, ngettext('%d submission was successfully scheduled for rejudging.', '%d submissions were successfully scheduled for rejudging.', - len(queryset)) % len(queryset)) + judged) % judged) return HttpResponseRedirect(reverse('admin:judge_contest_change', args=(contest_id,))) def rate_all_view(self, request): diff --git a/judge/admin/submission.py b/judge/admin/submission.py index 59c74e1302..2f491034df 100644 --- a/judge/admin/submission.py +++ b/judge/admin/submission.py @@ -160,7 +160,6 @@ def judge(self, request, queryset): self.message_user(request, gettext('You do not have the permission to rejudge submissions.'), level=messages.ERROR) return - queryset = queryset.order_by('id') if not request.user.has_perm('judge.rejudge_submission_lot') and \ queryset.count() > settings.DMOJ_SUBMISSIONS_REJUDGE_LIMIT: self.message_user(request, gettext('You do not have the permission to rejudge THAT many submissions.'), @@ -169,9 +168,8 @@ def judge(self, request, queryset): if not request.user.has_perm('judge.edit_all_problem'): id = request.profile.id queryset = queryset.filter(Q(problem__authors__id=id) | Q(problem__curators__id=id)) - judged = len(queryset) - for model in queryset: - model.judge(rejudge=True, batch_rejudge=True, rejudge_user=request.user) + + judged = sum(Submission.batch_judge(queryset, rejudge=True, rejudge_user=request.user)) self.message_user(request, ngettext('%d submission was successfully scheduled for rejudging.', '%d submissions were successfully scheduled for rejudging.', judged) % judged) diff --git a/judge/bridge/django_handler.py b/judge/bridge/django_handler.py index b284b68f78..9ff81c6e58 100644 --- a/judge/bridge/django_handler.py +++ b/judge/bridge/django_handler.py @@ -14,6 +14,7 @@ def __init__(self, request, client_address, server, judges): self.handlers = { 'submission-request': self.on_submission, + 'batch-submission-request': self.on_batch_submission, 'terminate-submission': self.on_termination, 'disconnect-judge': self.on_disconnect_request, } @@ -44,6 +45,23 @@ def on_submission(self, data): self.judges.judge(id, problem, language, source, judge_id, priority) return {'name': 'submission-received', 'submission-id': id} + def on_batch_submission(self, data): + ids = [] + judge_id = data['judge-id'] + priority = data['priority'] + if not self.judges.check_priority(priority): + return {'name': 'bad-request'} + + for submission in data['submissions']: + id = submission['submission-id'] + problem = submission['problem-id'] + language = submission['language'] + source = submission['source'] + self.judges.judge(id, problem, language, source, judge_id, priority) + ids.append(id) + + return {'name': 'submissions-received', 'submission-ids': ids} + def on_termination(self, data): return {'name': 'submission-received', 'judge-aborted': self.judges.abort(data['submission-id'])} diff --git a/judge/judgeapi.py b/judge/judgeapi.py index 42bc000f5c..9ff7ccd38b 100644 --- a/judge/judgeapi.py +++ b/judge/judgeapi.py @@ -3,8 +3,12 @@ import socket import struct import zlib +from operator import attrgetter from django.conf import settings +from django.db import transaction +from django.db.models import BooleanField, F, OuterRef, Subquery +from django.db.models.functions import Coalesce from django.utils import timezone from judge import event_poster as event @@ -50,11 +54,11 @@ def judge_request(packet, reply=True): return result -def judge_submission(submission, rejudge=False, batch_rejudge=False, judge_id=None): +def judge_submission(submission, rejudge=False, judge_id=None): from .models import ContestSubmission, Submission, SubmissionTestCase updates = {'time': None, 'memory': None, 'points': None, 'result': None, 'case_points': 0, 'case_total': 0, - 'error': None, 'rejudged_date': timezone.now() if rejudge or batch_rejudge else None, 'status': 'QU'} + 'error': None, 'rejudged_date': timezone.now() if rejudge else None, 'status': 'QU'} try: # This is set proactively; it might get unset in judgecallback's on_grading_begin if the problem doesn't # actually have pretests stored on the judge. @@ -86,7 +90,7 @@ def judge_submission(submission, rejudge=False, batch_rejudge=False, judge_id=No 'language': submission.language.key, 'source': submission.source.source, 'judge-id': judge_id, - 'priority': BATCH_REJUDGE_PRIORITY if batch_rejudge else (REJUDGE_PRIORITY if rejudge else priority), + 'priority': REJUDGE_PRIORITY if rejudge else priority, }) except BaseException: logger.exception('Failed to send request to judge') @@ -100,6 +104,65 @@ def judge_submission(submission, rejudge=False, batch_rejudge=False, judge_id=No return success +def batch_judge_submission(submissions, rejudge=False, judge_id=None): + from .models import Submission, SubmissionTestCase + updates = { + 'time': None, 'memory': None, 'points': None, 'result': None, 'case_points': 0, 'case_total': 0, + 'error': None, 'rejudged_date': timezone.now() if rejudge else None, 'status': 'QU', + 'is_pretested': Coalesce( + Subquery( + Submission.objects.filter(pk=OuterRef('pk')) + .annotate(pretested=F('contest_object__run_pretests_only').bitand(F('contest__problem__is_pretested'))) + .values_list('pretested')[:1], + output_field=BooleanField(), + ), + False, + ), + } + + with transaction.atomic(): + submission_queryset = Submission.objects.filter(id__in=map(attrgetter('id'), submissions)) \ + .exclude(status__in=('P', 'G')) + submission_queryset.update(**updates) + + ids = set(submission_queryset.values_list('id', flat=True)) + # Do the filtering using the new set of IDs rather than submission_queryset + # because submission_queryset itself takes a list of IDs anyways. + SubmissionTestCase.objects.filter(submission_id__in=ids).delete() + + submissions = [submission for submission in submissions if submission.id in ids] + + try: + response = judge_request({ + 'name': 'batch-submission-request', + 'submissions': [{ + 'submission-id': submission.id, + 'problem-id': submission.problem.code, + 'language': submission.language.key, + 'source': submission.source.source, + } for submission in submissions], + 'judge-id': judge_id, + # This is technically incorrect when rejudge is False. + # Some submissions might be contest submissions and should have priority "CONTEST_SUBMISSION_PRIORITY". + # It's not worth further complicating this code by separating contest submissions and + # normal submissions as that case is not even being used. + # For the moment, let's just set all submissions to have "DEFAULT_PRIORITY" if we're not rejudging. + 'priority': BATCH_REJUDGE_PRIORITY if rejudge else DEFAULT_PRIORITY, + }) + except BaseException: + logger.exception('Failed to send request to judge') + processed_ids = set() + else: + processed_ids = set(response['submission-ids']) if 'submission-ids' in response else set() + + if processed_ids != ids: + Submission.objects.filter(id__in=ids - processed_ids).update(status='IE', result='IE', error='') + + for submission in submissions: + # If the submission is not in processed_ids, that means the submission IE'd and thus is "done" judging. + _post_update_submission(submission, done=submission.id not in processed_ids) + + def disconnect_judge(judge, force=False): judge_request({'name': 'disconnect-judge', 'judge-id': judge.name, 'force': force}, reply=False) diff --git a/judge/models/submission.py b/judge/models/submission.py index 0fd29a1af6..e867155f52 100644 --- a/judge/models/submission.py +++ b/judge/models/submission.py @@ -3,14 +3,14 @@ from django.conf import settings from django.core.exceptions import ObjectDoesNotExist -from django.db import models +from django.db import models, transaction from django.urls import reverse from django.utils import timezone from django.utils.functional import cached_property from django.utils.translation import gettext_lazy as _ from reversion import revisions -from judge.judgeapi import abort_submission, judge_submission +from judge.judgeapi import abort_submission, batch_judge_submission, judge_submission from judge.models.problem import Problem, SubmissionSourceAccess, TranslatedProblemForeignKeyQuerySet from judge.models.profile import Profile from judge.models.runtime import Language @@ -121,7 +121,7 @@ def long_status(self): def is_locked(self): return self.locked_after is not None and self.locked_after < timezone.now() - def judge(self, *args, rejudge=False, force_judge=False, rejudge_user=None, **kwargs): + def judge(self, *, rejudge=False, force_judge=False, rejudge_user=None, **kwargs): if force_judge or not self.is_locked: if rejudge: with revisions.create_revision(manage_manually=True): @@ -129,10 +129,40 @@ def judge(self, *args, rejudge=False, force_judge=False, rejudge_user=None, **kw revisions.set_user(rejudge_user) revisions.set_comment('Rejudged') revisions.add_to_revision(self) - judge_submission(self, *args, rejudge=rejudge, **kwargs) + judge_submission(self, rejudge=rejudge, **kwargs) judge.alters_data = True + @classmethod + def batch_judge(cls, submissions, *, rejudge=False, force_judge=False, rejudge_user=None, batch_size=100, **kwargs): + # Don't trust the caller to follow related objects that we need. + # Also explicitly order by id so we don't get any nasty surprises when splicing later on. + submissions = submissions.select_related('problem', 'language', 'source') + + if not force_judge: + submissions = submissions.exclude(locked_after__lt=timezone.now()) + + # Force evaluate the queryset here so we have a fixed set of submissions that we are working with + # rather than a lazy queryset that could change. + submissions = list(submissions) + + for i in range(0, len(submissions), batch_size): + current_batch = submissions[i:i + batch_size] + + if rejudge: + with transaction.atomic(): + for submission in current_batch: + with revisions.create_revision(manage_manually=True): + if rejudge_user: + revisions.set_user(rejudge_user) + revisions.set_comment('Rejudged') + revisions.add_to_revision(submission) + + batch_judge_submission(current_batch, rejudge=rejudge, **kwargs) + yield len(current_batch) + + batch_judge.alters_data = True + def abort(self): abort_submission(self) diff --git a/judge/tasks/submission.py b/judge/tasks/submission.py index a190ba71ee..f28d81bcb2 100644 --- a/judge/tasks/submission.py +++ b/judge/tasks/submission.py @@ -31,11 +31,9 @@ def rejudge_problem_filter(self, problem_id, id_range=None, languages=None, resu rejudged = 0 with Progress(self, queryset.count()) as p: - for submission in queryset.iterator(): - submission.judge(rejudge=True, batch_rejudge=True, rejudge_user=user) - rejudged += 1 - if rejudged % 10 == 0: - p.done = rejudged + for completed in Submission.batch_judge(queryset, rejudge=True, rejudge_user=user): + p.did(completed) + rejudged += completed return rejudged