From ba4cfbf204496c23307f9cac14791da5381e5205 Mon Sep 17 00:00:00 2001
From: Olivier Leger
Date: Mon, 6 Feb 2023 16:10:04 -0500
Subject: [PATCH 1/3] Update `clean_duplicated_submissions` management command

---
 .../commands/clean_duplicated_submissions.py | 240 +++++++++++-------
 onadata/apps/logger/xform_instance_parser.py |  31 ++-
 2 files changed, 176 insertions(+), 95 deletions(-)

diff --git a/onadata/apps/logger/management/commands/clean_duplicated_submissions.py b/onadata/apps/logger/management/commands/clean_duplicated_submissions.py
index 6050a51a9..71d664c2b 100644
--- a/onadata/apps/logger/management/commands/clean_duplicated_submissions.py
+++ b/onadata/apps/logger/management/commands/clean_duplicated_submissions.py
@@ -1,29 +1,28 @@
-#!/usr/bin/env python
-# vim: ai ts=4 sts=4 et sw=4 fileencoding=utf-8
 # coding: utf-8
+from collections import defaultdict
+
 from django.conf import settings
-from django.core.management.base import BaseCommand, CommandError
+from django.core.management.base import BaseCommand
 from django.db import transaction
-from django.db.models import Sum
+from django.db.models import F
 from django.db.models.aggregates import Count
-from django.utils import timezone
 
 from onadata.apps.logger.models.attachment import Attachment
 from onadata.apps.logger.models.instance import Instance
 from onadata.apps.viewer.models.parsed_instance import ParsedInstance
-from onadata.apps.logger.models.xform import XForm
-from onadata.libs.utils.common_tags import MONGO_STRFTIME
+from onadata.apps.logger.models.daily_xform_submission_counter import (
+    DailyXFormSubmissionCounter,
+)
+from onadata.apps.logger.models.monthly_xform_submission_counter import (
+    MonthlyXFormSubmissionCounter,
+)
+from onadata.apps.logger.xform_instance_parser import set_meta


 class Command(BaseCommand):

     help = "Deletes duplicated submissions (i.e same `uuid` and same `xml`)"

-    def __init__(self, **kwargs):
-        super().__init__(**kwargs)
-        self.__vaccuum = False
-        self.__users = set([])
-
     def add_arguments(self, parser):
         super().add_arguments(parser)

@@ -39,11 +38,20 @@ def add_arguments(self, parser):
             help="Specify a XForm's `id_string` to clean up only this form",
         )

+        parser.add_argument(
+            "--delete-unique-uuids",
+            action='store_true',
+            default=False,
+            help="Delete duplicates with identical uuid",
+        )
+
     def handle(self, *args, **options):
         username = options['user']
         xform_id_string = options['xform']
+        self._delete_unique_uuids = options['delete_unique_uuids']
+        self._verbosity = options['verbosity']

-        # Retrieve all instances with the same `uuid`.
+        # Retrieve all instances with the same `xml_hash`.
query = Instance.objects if xform_id_string: query = query.filter(xform__id_string=xform_id_string) @@ -51,87 +59,137 @@ def handle(self, *args, **options): if username: query = query.filter(xform__user__username=username) - query = query.values_list('uuid', flat=True)\ - .annotate(count_uuid=Count('uuid'))\ - .filter(count_uuid__gt=1)\ + query = ( + query.values_list('xml_hash', flat=True) + .annotate(count_xml_hash=Count('xml_hash')) + .filter(count_xml_hash__gt=1) .distinct() + ) - for uuid in query.all(): - - duplicated_query = Instance.objects.filter(uuid=uuid) - - instances_with_same_uuid = duplicated_query.values_list('id', - 'xml_hash')\ - .order_by('xml_hash', 'date_created') - xml_hash_ref = None - instance_id_ref = None - - duplicated_instance_ids = [] - for instance_with_same_uuid in instances_with_same_uuid: - instance_id = instance_with_same_uuid[0] - instance_xml_hash = instance_with_same_uuid[1] - - if instance_xml_hash != xml_hash_ref: - self.__clean_up(instance_id_ref, - duplicated_instance_ids) - xml_hash_ref = instance_xml_hash - instance_id_ref = instance_id - duplicated_instance_ids = [] - continue - - duplicated_instance_ids.append(instance_id) - - self.__clean_up(instance_id_ref, - duplicated_instance_ids) - - if not self.__vaccuum: - self.stdout.write('No instances have been purged.') - else: - # Update number of submissions for each user. - for user_ in list(self.__users): - result = XForm.objects.filter(user_id=user_.id)\ - .aggregate(count=Sum('num_of_submissions')) - user_.profile.num_of_submissions = result['count'] - self.stdout.write( - "\tUpdating `{}`'s number of submissions".format( - user_.username)) - user_.profile.save(update_fields=['num_of_submissions']) + for xml_hash in query.iterator(): + + duplicates_queryset = Instance.objects.filter(xml_hash=xml_hash) + + instances_with_same_xml_hash = duplicates_queryset.values( + 'id', 'uuid', 'xform_id' + ).order_by('xform_id', 'uuid', 'date_created') + + duplicates_by_xform = self._get_duplicates_by_xform( + instances_with_same_xml_hash + ) + + for ( + xform_id, + instances_with_same_xml_hash, + ) in duplicates_by_xform.items(): + instance_ref = instances_with_same_xml_hash.pop(0) + self._clean_up(instance_ref, instances_with_same_xml_hash) + + def _clean_up(self, instance_ref, duplicated_instances): + + if duplicated_instances: + + if self._replace_duplicates(duplicated_instances): + return + + self._delete_duplicates(instance_ref, duplicated_instances) + + def _delete_duplicates( + self, instance_ref: dict, duplicated_instances: list[dict] + ): + + duplicated_instance_ids = [i['id'] for i in duplicated_instances] + + if self._verbosity >= 1: + self.stdout.write( + f"Deleting instance #{instance_ref['id']} duplicates…" + ) + + with transaction.atomic(): + # Update attachments + Attachment.objects.select_for_update().filter( + instance_id__in=duplicated_instance_ids + ).update(instance_id=instance_ref['id']) + if self._verbosity >= 2: self.stdout.write( - '\t\tDone! 
New number: {}'.format(result['count'])) - - def __clean_up(self, instance_id_ref, duplicated_instance_ids): - if instance_id_ref is not None and len(duplicated_instance_ids) > 0: - self.__vaccuum = True - with transaction.atomic(): - self.stdout.write('Link attachments to instance #{}'.format( - instance_id_ref)) - # Update attachments - Attachment.objects.select_for_update()\ - .filter(instance_id__in=duplicated_instance_ids)\ - .update(instance_id=instance_id_ref) - - # Update Mongo - main_instance = Instance.objects.select_for_update()\ - .get(id=instance_id_ref) - main_instance.parsed_instance.save() - - self.stdout.write('\tPurging instances: {}'.format( - duplicated_instance_ids)) - Instance.objects.select_for_update()\ - .filter(id__in=duplicated_instance_ids).delete() - ParsedInstance.objects.select_for_update()\ - .filter(instance_id__in=duplicated_instance_ids).delete() - settings.MONGO_DB.instances.remove( - {'_id': {'$in': duplicated_instance_ids}} + f"\tLinked attachments to instance #{instance_ref['id']}" ) - # Update number of submissions - xform = main_instance.xform - self.stdout.write( - '\tUpdating number of submissions of XForm #{} ({})'.format( - xform.id, xform.id_string)) - xform_submission_count = xform.submission_count(force_update=True) + + # Update Mongo + main_instance = Instance.objects.select_for_update().get( + id=instance_ref['id'] + ) + main_instance.parsed_instance.save() + + ParsedInstance.objects.filter( + instance_id__in=duplicated_instance_ids + ).delete() + + instance_queryset = Instance.objects.filter( + id__in=duplicated_instance_ids + ) + # update counters + for instance in instance_queryset.values( + 'xform_id', 'date_created__date', 'xform__user_id' + ): + MonthlyXFormSubmissionCounter.objects.filter( + year=instance['date_created__date'].year, + month=instance['date_created__date'].month, + user_id=instance['xform__user_id'], + xform_id=instance['xform_id'], + ).update(counter=F('counter') - 1) + + DailyXFormSubmissionCounter.objects.filter( + date=instance['date_created__date'], + xform_id=instance['xform_id'], + ).update(counter=F('counter') - 1) + + instance_queryset.delete() + + settings.MONGO_DB.instances.remove( + {'_id': {'$in': duplicated_instance_ids}} + ) + if self._verbosity > 1: self.stdout.write( - '\t\tDone! 
New number: {}'.format(xform_submission_count))
-            self.stdout.write('')
+                f'\tPurged instance IDs: {duplicated_instance_ids}'
+            )
+
+    def _replace_duplicates(self, duplicated_instances: list) -> bool:
+        uniq_uuids = set([i['uuid'] for i in duplicated_instances])
+
+        if len(uniq_uuids) > 1 or self._delete_unique_uuids:
+            return False
+
+        duplicates = []
+
+        for idx, duplicated_instance in enumerate(duplicated_instances):
+            try:
+                instance = Instance.objects.get(pk=duplicated_instance['id'])
+            except Instance.DoesNotExist:
+                pass
+            else:
+                if self._verbosity > 1:
+                    self.stdout.write(
+                        f'\tUpdating instance #{instance.pk} ({instance.uuid})…'
+                    )
+
+                instance.uuid = f'DUPLICATE {idx} {instance.uuid}'
+                instance.xml = set_meta(
+                    instance.xml, 'instanceID', instance.uuid
+                )
+                instance.xml_hash = instance.get_hash(instance.xml)
+                duplicates.append(instance)
+
+        if duplicates:
+            Instance.objects.bulk_update(
+                duplicates, fields=['uuid', 'xml', 'xml_hash']
+            )
+
+        return True
+
+    def _get_duplicates_by_xform(self, queryset):
+        duplicates_by_xform = defaultdict(list)
+        for record in queryset:
+            duplicates_by_xform[record['xform_id']].append(record)

-        self.__users.add(xform.user)
+        return duplicates_by_xform
diff --git a/onadata/apps/logger/xform_instance_parser.py b/onadata/apps/logger/xform_instance_parser.py
index 4251b7734..4f263b6d5 100644
--- a/onadata/apps/logger/xform_instance_parser.py
+++ b/onadata/apps/logger/xform_instance_parser.py
@@ -1,7 +1,10 @@
 # coding: utf-8
+from __future__ import annotations
+
 import logging
 import re
 import sys
+from typing import Union

 import dateutil.parser
 import six
@@ -40,7 +43,9 @@ class InstanceMultipleNodeError(Exception):
     pass


-def get_meta_from_xml(xml_str, meta_name):
+def get_meta_node_from_xml(
+    xml_str: str, meta_name: str
+) -> Union[None, tuple[minidom.Element, minidom.Document]]:
     xml = clean_and_parse_xml(xml_str)
     children = xml.childNodes
     # children ideally contains a single element
@@ -65,8 +70,13 @@ def get_meta_from_xml(xml_str, meta_name):
         return None

     uuid_tag = uuid_tags[0]
-    return uuid_tag.firstChild.nodeValue.strip() if uuid_tag.firstChild\
-        else None
+    return uuid_tag, xml
+
+
+def get_meta_from_xml(xml_str: str, meta_name: str) -> Union[str, None]:
+    if node_and_root := get_meta_node_from_xml(xml_str, meta_name):
+        node, _ = node_and_root
+        return node.firstChild.nodeValue.strip() if node.firstChild else None


 def get_uuid_from_xml(xml):
@@ -118,13 +128,26 @@ def get_deprecated_uuid_from_xml(xml):
     return None


-def clean_and_parse_xml(xml_string):
+def clean_and_parse_xml(xml_string: str) -> minidom.Document:
     clean_xml_str = xml_string.strip()
     clean_xml_str = re.sub(r">\s+<", "><", smart_str(clean_xml_str))
     xml_obj = minidom.parseString(clean_xml_str)
     return xml_obj


+def set_meta(xml_str: str, meta_name: str, new_value: str) -> str:
+
+    if not (node_and_root := get_meta_node_from_xml(xml_str, meta_name)):
+        raise ValueError(f"{meta_name} node not found.")
+
+    node, root = node_and_root
+
+    if node.firstChild:
+        node.firstChild.nodeValue = new_value
+
+    return root.toxml()
+
+
 def _xml_node_to_dict(node, repeats=[]):
     assert isinstance(node, minidom.Node)
     if len(node.childNodes) == 0:

From 32e38a7cbe21591ffaa5a8f9dc38bea7c2dec4a9 Mon Sep 17 00:00:00 2001
From: Olivier Leger
Date: Wed, 3 May 2023 15:44:42 -0400
Subject: [PATCH 2/3] Prevent duplicate submissions from being saved

---
 conftest.py                           | 20 ++-
 dependencies/pip/dev_requirements.in  |  1 +
 dependencies/pip/dev_requirements.txt | 11 +-
 dependencies/pip/requirements.txt     |  4 +-
onadata/apps/django_digest_backends/cache.py | 2 +- onadata/apps/logger/exceptions.py | 35 ++++- onadata/apps/logger/models/__init__.py | 1 - onadata/apps/logger/models/instance.py | 1 - onadata/apps/logger/models/xform.py | 2 +- .../logger/tests/test_simple_submission.py | 4 +- onadata/apps/logger/xform_instance_parser.py | 31 +--- onadata/libs/utils/logger_tools.py | 134 +++++++++++------- onadata/settings/base.py | 4 + onadata/settings/testing.py | 6 + 14 files changed, 166 insertions(+), 90 deletions(-) diff --git a/conftest.py b/conftest.py index e357e0b76..18f3c43d3 100644 --- a/conftest.py +++ b/conftest.py @@ -1,9 +1,11 @@ # coding: utf-8 import os -import pytest import sys +import fakeredis +import pytest from django.conf import settings +from mock import patch from onadata.libs.utils.storage import rmdir, default_storage @@ -78,6 +80,22 @@ def setup(request): request.addfinalizer(_tear_down) +@pytest.fixture(scope='session', autouse=True) +def default_session_fixture(request): + """ + Globally patch redis_client with fake redis + """ + with patch( + 'kobo_service_account.models.ServiceAccountUser.redis_client', + fakeredis.FakeStrictRedis(), + ): + with patch( + 'onadata.apps.django_digest_backends.cache.RedisCacheNonceStorage._get_cache', + fakeredis.FakeStrictRedis, + ): + yield + + def _tear_down(): print("\nCleaning testing environment...") print('Removing MongoDB...') diff --git a/dependencies/pip/dev_requirements.in b/dependencies/pip/dev_requirements.in index 51a2959c8..68f1cfe6f 100644 --- a/dependencies/pip/dev_requirements.in +++ b/dependencies/pip/dev_requirements.in @@ -10,3 +10,4 @@ pytest-env mongomock mock httmock +fakeredis[lua] diff --git a/dependencies/pip/dev_requirements.txt b/dependencies/pip/dev_requirements.txt index 19a7e1e12..d286ab18c 100644 --- a/dependencies/pip/dev_requirements.txt +++ b/dependencies/pip/dev_requirements.txt @@ -1,6 +1,6 @@ # -# This file is autogenerated by pip-compile with python 3.10 -# To update, run: +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: # # pip-compile dependencies/pip/dev_requirements.in # @@ -166,6 +166,8 @@ et-xmlfile==1.1.0 # via openpyxl executing==0.8.3 # via stack-data +fakeredis[lua]==2.11.2 + # via -r dependencies/pip/dev_requirements.in gdata-python3==3.0.1 # via -r dependencies/pip/requirements.in httmock==1.4.0 @@ -194,6 +196,8 @@ jwcrypto==1.0 # via django-oauth-toolkit kombu==5.2.4 # via celery +lupa==1.14.1 + # via fakeredis lxml==4.8.0 # via # -r dependencies/pip/requirements.in @@ -290,6 +294,7 @@ redis==4.2.2 # celery # django-redis # django-redis-sessions + # fakeredis # kobo-service-account requests==2.27.1 # via @@ -320,6 +325,8 @@ six==1.16.0 # isodate # mongomock # python-dateutil +sortedcontainers==2.4.0 + # via fakeredis sqlparse==0.4.2 # via # -r dependencies/pip/dev_requirements.in diff --git a/dependencies/pip/requirements.txt b/dependencies/pip/requirements.txt index e17d5d5c4..cb6850f78 100644 --- a/dependencies/pip/requirements.txt +++ b/dependencies/pip/requirements.txt @@ -1,6 +1,6 @@ # -# This file is autogenerated by pip-compile with python 3.10 -# To update, run: +# This file is autogenerated by pip-compile with Python 3.10 +# by the following command: # # pip-compile dependencies/pip/requirements.in # diff --git a/onadata/apps/django_digest_backends/cache.py b/onadata/apps/django_digest_backends/cache.py index 619b75c35..79b24ec06 100644 --- a/onadata/apps/django_digest_backends/cache.py +++ b/onadata/apps/django_digest_backends/cache.py @@ 
-5,7 +5,7 @@ NONCE_NO_COUNT = '' # Needs to be something other than None to determine not set vs set to null -class RedisCacheNonceStorage(): +class RedisCacheNonceStorage: _blocking_timeout = 30 def _get_cache(self): diff --git a/onadata/apps/logger/exceptions.py b/onadata/apps/logger/exceptions.py index 74b31a19b..f8c2b9e48 100644 --- a/onadata/apps/logger/exceptions.py +++ b/onadata/apps/logger/exceptions.py @@ -1,5 +1,13 @@ # coding: utf-8 -from django.utils.translation import gettext as t + +class ConflictingXMLHashInstanceError(Exception): + pass + + +class DuplicateInstanceError(Exception): + + def __init__(self, message='Duplicate Instance'): + super().__init__(message) class DuplicateUUIDError(Exception): @@ -10,5 +18,30 @@ class FormInactiveError(Exception): pass +class InstanceEmptyError(Exception): + + def __init__(self, message='Empty instance'): + super().__init__(message) + + +class InstanceInvalidUserError(Exception): + def __init__(self, message='Could not determine the user'): + super().__init__(message) + + +class InstanceMultipleNodeError(Exception): + pass + + +class InstanceParseError(Exception): + + def __init__(self, message='The instance could not be parsed'): + super().__init__(message) + + class TemporarilyUnavailableError(Exception): pass + + +class XLSFormError(Exception): + pass diff --git a/onadata/apps/logger/models/__init__.py b/onadata/apps/logger/models/__init__.py index 0e130872d..cfb2cf52a 100644 --- a/onadata/apps/logger/models/__init__.py +++ b/onadata/apps/logger/models/__init__.py @@ -3,7 +3,6 @@ from onadata.apps.logger.models.instance import Instance from onadata.apps.logger.models.survey_type import SurveyType from onadata.apps.logger.models.xform import XForm -from onadata.apps.logger.xform_instance_parser import InstanceParseError from onadata.apps.logger.models.note import Note from onadata.apps.logger.models.daily_xform_submission_counter import ( DailyXFormSubmissionCounter, diff --git a/onadata/apps/logger/models/instance.py b/onadata/apps/logger/models/instance.py index 7b6d40342..47cb9855a 100644 --- a/onadata/apps/logger/models/instance.py +++ b/onadata/apps/logger/models/instance.py @@ -1,6 +1,5 @@ # coding: utf-8 from hashlib import sha256 - try: from zoneinfo import ZoneInfo except ImportError: diff --git a/onadata/apps/logger/models/xform.py b/onadata/apps/logger/models/xform.py index 2217e61e0..e0c9db53b 100644 --- a/onadata/apps/logger/models/xform.py +++ b/onadata/apps/logger/models/xform.py @@ -24,7 +24,7 @@ from onadata.apps.logger.models.monthly_xform_submission_counter import ( MonthlyXFormSubmissionCounter, ) -from onadata.apps.logger.xform_instance_parser import XLSFormError +from onadata.apps.logger.exceptions import XLSFormError from onadata.koboform.pyxform_utils import convert_csv_to_xls from onadata.libs.constants import ( CAN_ADD_SUBMISSIONS, diff --git a/onadata/apps/logger/tests/test_simple_submission.py b/onadata/apps/logger/tests/test_simple_submission.py index ae875f10f..ece72cd51 100644 --- a/onadata/apps/logger/tests/test_simple_submission.py +++ b/onadata/apps/logger/tests/test_simple_submission.py @@ -3,7 +3,7 @@ from django.test import TestCase, RequestFactory from pyxform import SurveyElementBuilder -from onadata.apps.logger.xform_instance_parser import DuplicateInstance +from onadata.apps.logger.exceptions import DuplicateInstanceError from onadata.apps.viewer.models.data_dictionary import DataDictionary from onadata.libs.utils.logger_tools import ( create_instance, safe_create_instance @@ -39,7 +39,7 @@ 
def _submit_at_hour(self, hour): '_time>' % hour try: create_instance(self.user.username, TempFileProxy(st_xml), []) - except DuplicateInstance: + except DuplicateInstanceError: pass def _submit_simple_yes(self): diff --git a/onadata/apps/logger/xform_instance_parser.py b/onadata/apps/logger/xform_instance_parser.py index 4f263b6d5..7e50ba268 100644 --- a/onadata/apps/logger/xform_instance_parser.py +++ b/onadata/apps/logger/xform_instance_parser.py @@ -12,37 +12,10 @@ from django.utils.translation import gettext as t from xml.dom import minidom, Node +from onadata.apps.logger.exceptions import InstanceEmptyError from onadata.libs.utils.common_tags import XFORM_ID_STRING -class XLSFormError(Exception): - pass - - -class DuplicateInstance(Exception): - def __str__(self): - return t("Duplicate Instance") - - -class InstanceInvalidUserError(Exception): - def __str__(self): - return t("Could not determine the user.") - - -class InstanceParseError(Exception): - def __str__(self): - return t("The instance could not be parsed.") - - -class InstanceEmptyError(InstanceParseError): - def __str__(self): - return t("Empty instance") - - -class InstanceMultipleNodeError(Exception): - pass - - def get_meta_node_from_xml( xml_str: str, meta_name: str ) -> Union[None, tuple[str, minidom.Document]]: @@ -138,7 +111,7 @@ def clean_and_parse_xml(xml_string: str) -> minidom.Document: def set_meta(xml_str: str, meta_name: str, new_value: str) -> str: if not (node_and_root := get_meta_node_from_xml(xml_str, meta_name)): - raise ValueError(f"{meta_name} node not found.") + raise ValueError(f'{meta_name} node not found') node, root = node_and_root diff --git a/onadata/libs/utils/logger_tools.py b/onadata/libs/utils/logger_tools.py index 574456199..fd776c89d 100644 --- a/onadata/libs/utils/logger_tools.py +++ b/onadata/libs/utils/logger_tools.py @@ -1,18 +1,13 @@ # coding: utf-8 from __future__ import annotations +import contextlib import os import re import sys import traceback from datetime import date, datetime from xml.parsers.expat import ExpatError - -from onadata.apps.logger.signals import ( - update_user_profile_attachment_storage_bytes, - update_xform_attachment_storage_bytes, -) - try: from zoneinfo import ZoneInfo except ImportError: @@ -23,7 +18,7 @@ from django.core.exceptions import ValidationError, PermissionDenied from django.core.files.storage import get_storage_class from django.core.mail import mail_admins -from django.db import IntegrityError, transaction +from django.db import connection, IntegrityError, transaction from django.db.models import Q from django.http import ( HttpResponse, @@ -43,6 +38,7 @@ from wsgiref.util import FileWrapper from onadata.apps.logger.exceptions import ( + ConflictingXMLHashInstanceError, DuplicateUUIDError, FormInactiveError, TemporarilyUnavailableError, @@ -59,12 +55,18 @@ update_xform_monthly_counter, update_xform_submission_count, ) -from onadata.apps.logger.models.xform import XLSFormError -from onadata.apps.logger.xform_instance_parser import ( +from onadata.apps.logger.exceptions import XLSFormError +from onadata.apps.logger.signals import ( + update_user_profile_attachment_storage_bytes, + update_xform_attachment_storage_bytes, +) +from onadata.apps.logger.exceptions import ( InstanceEmptyError, InstanceInvalidUserError, InstanceMultipleNodeError, - DuplicateInstance, + DuplicateInstanceError, +) +from onadata.apps.logger.xform_instance_parser import ( clean_and_parse_xml, get_uuid_from_xml, get_deprecated_uuid_from_xml, @@ -131,7 +133,6 @@ def 
check_edit_submission_permissions( )) -@transaction.atomic # paranoia; redundant since `ATOMIC_REQUESTS` set to `True` def create_instance( username: str, xml_file: str, @@ -158,43 +159,47 @@ def create_instance( # get new and deprecated uuid's new_uuid = get_uuid_from_xml(xml) - # Dorey's rule from 2012 (commit 890a67aa): - # Ignore submission as a duplicate IFF - # * a submission's XForm collects start time - # * the submitted XML is an exact match with one that - # has already been submitted for that user. - # The start-time requirement protected submissions with identical responses - # from being rejected as duplicates *before* KoBoCAT had the concept of - # submission UUIDs. Nowadays, OpenRosa requires clients to send a UUID (in - # ``) within every submission; if the incoming XML has a UUID - # and still exactly matches an existing submission, it's certainly a - # duplicate (https://docs.opendatakit.org/openrosa-metadata/#fields). - if xform.has_start_time or new_uuid is not None: - # XML matches are identified by identical content hash OR, when a - # content hash is not present, by string comparison of the full - # content, which is slow! Use the management command - # `populate_xml_hashes_for_instances` to hash existing submissions - existing_instance = Instance.objects.filter( - Q(xml_hash=xml_hash) | Q(xml_hash=Instance.DEFAULT_XML_HASH, xml=xml), - xform__user=xform.user, - ).first() - else: - existing_instance = None - - if existing_instance: - existing_instance.check_active(force=False) - # ensure we have saved the extra attachments - new_attachments = save_attachments(existing_instance, media_files) - if not new_attachments: - raise DuplicateInstance() + with get_instance_lock(xml_hash) as lock_acquired: + if not lock_acquired: + raise ConflictingXMLHashInstanceError() + + # Dorey's rule from 2012 (commit 890a67aa): + # Ignore submission as a duplicate IFF + # * a submission's XForm collects start time + # * the submitted XML is an exact match with one that + # has already been submitted for that user. + # The start-time requirement protected submissions with identical responses + # from being rejected as duplicates *before* KoBoCAT had the concept of + # submission UUIDs. Nowadays, OpenRosa requires clients to send a UUID (in + # ``) within every submission; if the incoming XML has a UUID + # and still exactly matches an existing submission, it's certainly a + # duplicate (https://docs.opendatakit.org/openrosa-metadata/#fields). + if xform.has_start_time or new_uuid is not None: + # XML matches are identified by identical content hash OR, when a + # content hash is not present, by string comparison of the full + # content, which is slow! 
Use the management command + # `populate_xml_hashes_for_instances` to hash existing submissions + existing_instance = Instance.objects.filter( + Q(xml_hash=xml_hash) | Q(xml_hash=Instance.DEFAULT_XML_HASH, xml=xml), + xform__user=xform.user, + ).first() else: - # Update Mongo via the related ParsedInstance - existing_instance.parsed_instance.save(asynchronous=False) - return existing_instance - else: - instance = save_submission(request, xform, xml, media_files, new_uuid, - status, date_created_override) - return instance + existing_instance = None + + if existing_instance: + existing_instance.check_active(force=False) + # ensure we have saved the extra attachments + new_attachments = save_attachments(existing_instance, media_files) + if not new_attachments: + raise DuplicateInstanceError() + else: + # Update Mongo via the related ParsedInstance + existing_instance.parsed_instance.save(asynchronous=False) + return existing_instance + else: + instance = save_submission(request, xform, xml, media_files, new_uuid, + status, date_created_override) + return instance def disposition_ext_and_date(name, extension, show_date=True): @@ -213,6 +218,32 @@ def dict2xform(jsform, form_id): return xml_head + dict2xml(jsform) + xml_tail +@contextlib.contextmanager +def get_instance_lock(xml_hash: str) -> bool: + int_lock = int(xml_hash, 16) & 0xfffffffffffffff + is_postgresql = connection.vendor == 'postgresql' + acquired = False + + with transaction.atomic(): + try: + if is_postgresql: + cur = connection.cursor() + cur.execute('SELECT pg_try_advisory_lock(%s::bigint);', (int_lock,)) + acquired = cur.fetchone()[0] + else: + prefix = os.getenv('KOBOCAT_REDIS_LOCK_PREFIX', 'kc-lock') + key_ = f'{prefix}:{int_lock}' + redis_lock = settings.REDIS_LOCK_CLIENT.lock(key_, timeout=60) + acquired = redis_lock.acquire(blocking=False) + yield acquired + finally: + if is_postgresql: + cur.execute('SELECT pg_advisory_unlock(%s::bigint);', (int_lock,)) + cur.close() + elif acquired: + redis_lock.release() + + def get_instance_or_404(**criteria): """ Mimic `get_object_or_404` but handles duplicate records. @@ -541,8 +572,13 @@ def safe_create_instance(username, xml_file, media_files, uuid, request): ) except ExpatError as e: error = OpenRosaResponseBadRequest(t("Improperly formatted XML.")) - except DuplicateInstance: - response = OpenRosaResponse(t("Duplicate submission")) + except ConflictingXMLHashInstanceError: + response = OpenRosaResponse(t('Conflict with already existing instance')) + response.status_code = 409 + response['Location'] = request.build_absolute_uri(request.path) + error = response + except DuplicateInstanceError: + response = OpenRosaResponse(t('Duplicate instance')) response.status_code = 202 response['Location'] = request.build_absolute_uri(request.path) error = response diff --git a/onadata/settings/base.py b/onadata/settings/base.py index 49f6a3e1e..11b42c404 100644 --- a/onadata/settings/base.py +++ b/onadata/settings/base.py @@ -7,6 +7,7 @@ from urllib.parse import quote_plus import environ +import redis from celery.schedules import crontab from django.core.exceptions import SuspiciousOperation from pymongo import MongoClient @@ -680,6 +681,9 @@ def skip_suspicious_operations(record): # NOTE: this should be set to False for major deployments. 
This can take a long time SKIP_HEAVY_MIGRATIONS = env.bool('SKIP_HEAVY_MIGRATIONS', False) +redis_lock_url = env.cache_url('REDIS_LOCK_URL', default=redis_session_url) +REDIS_LOCK_CLIENT = redis.Redis.from_url(redis_lock_url['LOCATION']) + ################################ # Celery settings # ################################ diff --git a/onadata/settings/testing.py b/onadata/settings/testing.py index 45d9d94de..6e2105a1c 100644 --- a/onadata/settings/testing.py +++ b/onadata/settings/testing.py @@ -1,6 +1,7 @@ # coding: utf-8 import os +from fakeredis import FakeConnection, FakeStrictRedis from mongomock import MongoClient as MockMongoClient from .base import * @@ -24,6 +25,9 @@ SECRET_KEY = os.urandom(50).hex() SESSION_ENGINE = 'django.contrib.sessions.backends.db' +CACHES['default']['OPTIONS'] = { + 'connection_class': FakeConnection +} ################################### # Django Rest Framework settings # @@ -53,6 +57,8 @@ SERVICE_ACCOUNT['WHITELISTED_HOSTS'] = ['testserver'] SERVICE_ACCOUNT['NAMESPACE'] = 'kobo-service-account-test' +REDIS_LOCK_CLIENT = FakeStrictRedis() + ################################ # Celery settings # ################################ From 019e7bf018340225423cbb1f86943e8a4df9d8b3 Mon Sep 17 00:00:00 2001 From: Olivier Leger Date: Mon, 8 May 2023 09:53:44 -0400 Subject: [PATCH 3/3] Add unit test for concurrent duplicate data --- .github/workflows/pytest.yml | 2 +- .gitlab-ci.yml | 3 +- conftest.py | 2 +- onadata/apps/api/tests/fixtures/users.json | 19 +++ .../viewsets/test_xform_submission_api.py | 109 ++++++++++++++++++ onadata/settings/base.py | 5 +- onadata/settings/testing.py | 7 +- 7 files changed, 141 insertions(+), 6 deletions(-) create mode 100644 onadata/apps/api/tests/fixtures/users.json diff --git a/.github/workflows/pytest.yml b/.github/workflows/pytest.yml index 878a819ac..35b182420 100644 --- a/.github/workflows/pytest.yml +++ b/.github/workflows/pytest.yml @@ -52,7 +52,7 @@ jobs: - name: Install Python dependencies run: pip-sync dependencies/pip/dev_requirements.txt - name: Run pytest - run: pytest -vv -rf + run: pytest -vv -rf --disable-warnings env: DJANGO_SECRET_KEY: ${{ secrets.DJANGO_SECRET_KEY }} TEST_DATABASE_URL: postgis://kobo:kobo@localhost:5432/kobocat_test diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 102745e94..96f42782d 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -32,6 +32,7 @@ test: POSTGRES_PASSWORD: kobo POSTGRES_DB: kobocat_test SERVICE_ACCOUNT_BACKEND_URL: redis://redis_cache:6379/4 + GIT_LAB: "True" services: - name: postgis/postgis:14-3.2 alias: postgres @@ -40,7 +41,7 @@ test: script: - apt-get update && apt-get install -y ghostscript gdal-bin libproj-dev gettext openjdk-11-jre - pip install -r dependencies/pip/dev_requirements.txt - - pytest -vv -rf + - pytest -vv -rf --disable-warnings deploy-beta: stage: deploy diff --git a/conftest.py b/conftest.py index 18f3c43d3..d370e61dd 100644 --- a/conftest.py +++ b/conftest.py @@ -83,7 +83,7 @@ def setup(request): @pytest.fixture(scope='session', autouse=True) def default_session_fixture(request): """ - Globally patch redis_client with fake redis + Globally patch redis_client with fakeredis """ with patch( 'kobo_service_account.models.ServiceAccountUser.redis_client', diff --git a/onadata/apps/api/tests/fixtures/users.json b/onadata/apps/api/tests/fixtures/users.json new file mode 100644 index 000000000..8c2391fc9 --- /dev/null +++ b/onadata/apps/api/tests/fixtures/users.json @@ -0,0 +1,19 @@ +[ + { + "fields": { + "date_joined": "2015-02-12T19:52:14.406Z", + "email": 
"bob@columbia.edu", + "first_name": "bob", + "groups": [], + "is_active": true, + "is_staff": false, + "is_superuser": false, + "last_login": "2015-02-12T19:52:14.406Z", + "last_name": "bob", + "password": "pbkdf2_sha256$260000$jSfi1lb5FclOUV9ZodfCdP$Up19DmjLFtBh0VREyow/oduVkwEoqQftljfwq6b9vIo=", + "username": "bob" + }, + "model": "auth.user", + "pk": 2 + } +] diff --git a/onadata/apps/api/tests/viewsets/test_xform_submission_api.py b/onadata/apps/api/tests/viewsets/test_xform_submission_api.py index 7317d8d5f..3779a40f2 100644 --- a/onadata/apps/api/tests/viewsets/test_xform_submission_api.py +++ b/onadata/apps/api/tests/viewsets/test_xform_submission_api.py @@ -1,11 +1,18 @@ # coding: utf-8 +import multiprocessing import os import uuid +from collections import defaultdict +from functools import partial +import pytest +import requests import simplejson as json from django.conf import settings from django.contrib.auth.models import AnonymousUser from django.core.files.uploadedfile import InMemoryUploadedFile +from django.test.testcases import LiveServerTestCase +from django.urls import reverse from django_digest.test import DigestAuth from guardian.shortcuts import assign_perm from kobo_service_account.utils import get_request_headers @@ -15,6 +22,7 @@ TestAbstractViewSet from onadata.apps.api.viewsets.xform_submission_api import XFormSubmissionApi from onadata.apps.logger.models import Attachment +from onadata.apps.main import tests as main_tests from onadata.libs.constants import ( CAN_ADD_SUBMISSIONS ) @@ -441,6 +449,7 @@ def test_edit_submission_with_service_account(self): self.assertEqual( response['Location'], 'http://testserver/submission' ) + def test_submission_blocking_flag(self): # Set 'submissions_suspended' True in the profile metadata to test if # submission do fail with the flag set @@ -488,3 +497,103 @@ def test_submission_blocking_flag(self): ) response = self.view(request, username=username) self.assertEqual(response.status_code, status.HTTP_201_CREATED) + + +class ConcurrentSubmissionTestCase(LiveServerTestCase): + """ + Inherit from LiveServerTestCase to be able to test concurrent requests + to submission endpoint in different transactions (and different processes). + Otherwise, DB is populated only on the first request but still empty on + subsequent ones. 
+ """ + + fixtures = ['onadata/apps/api/tests/fixtures/users'] + + def publish_xls_form(self): + + path = os.path.join( + settings.ONADATA_DIR, + 'apps', + 'main', + 'tests', + 'fixtures', + 'transportation', + 'transportation.xls', + ) + + xform_list_url = reverse('xform-list') + self.client.login(username='bob', password='bob') + with open(path, 'rb') as xls_file: + post_data = {'xls_file': xls_file} + response = self.client.post(xform_list_url, data=post_data) + + assert response.status_code == status.HTTP_201_CREATED + + @pytest.mark.skipif( + settings.GIT_LAB, reason='GitLab does not seem to support multi-processes' + ) + def test_post_concurrent_same_submissions(self): + + DUPLICATE_SUBMISSIONS_COUNT = 2 # noqa + + self.publish_xls_form() + username = 'bob' + survey = 'transport_2011-07-25_19-05-49' + results = defaultdict(int) + + with multiprocessing.Pool() as pool: + for result in pool.map( + partial( + submit_data, + live_server_url=self.live_server_url, + survey_=survey, + username_=username, + ), + range(DUPLICATE_SUBMISSIONS_COUNT), + ): + results[result] += 1 + + assert results[status.HTTP_201_CREATED] == 1 + assert results[status.HTTP_409_CONFLICT] == DUPLICATE_SUBMISSIONS_COUNT - 1 + + +def submit_data(identifier, survey_, username_, live_server_url): + """ + Submit data to live server. + + It has to be outside `ConcurrentSubmissionTestCase` class to be pickled by + `multiprocessing.Pool().map()`. + """ + media_file = '1335783522563.jpg' + main_directory = os.path.dirname(main_tests.__file__) + path = os.path.join( + main_directory, + 'fixtures', + 'transportation', + 'instances', + survey_, + media_file, + ) + with open(path, 'rb') as f: + f = InMemoryUploadedFile( + f, + 'media_file', + media_file, + 'image/jpg', + os.path.getsize(path), + None, + ) + submission_path = os.path.join( + main_directory, + 'fixtures', + 'transportation', + 'instances', + survey_, + f'{survey_}.xml', + ) + with open(submission_path) as sf: + files = {'xml_submission_file': sf, 'media_file': f} + response = requests.post( + f'{live_server_url}/{username_}/submission', files=files + ) + return response.status_code diff --git a/onadata/settings/base.py b/onadata/settings/base.py index 11b42c404..73954cacb 100644 --- a/onadata/settings/base.py +++ b/onadata/settings/base.py @@ -681,7 +681,10 @@ def skip_suspicious_operations(record): # NOTE: this should be set to False for major deployments. 
This can take a long time SKIP_HEAVY_MIGRATIONS = env.bool('SKIP_HEAVY_MIGRATIONS', False) -redis_lock_url = env.cache_url('REDIS_LOCK_URL', default=redis_session_url) +redis_lock_url = env.cache_url( + 'REDIS_LOCK_URL', + default=os.getenv('REDIS_SESSION_URL', 'redis://redis_cache:6380/2'), +) REDIS_LOCK_CLIENT = redis.Redis.from_url(redis_lock_url['LOCATION']) ################################ diff --git a/onadata/settings/testing.py b/onadata/settings/testing.py index 6e2105a1c..232af5ac0 100644 --- a/onadata/settings/testing.py +++ b/onadata/settings/testing.py @@ -1,7 +1,7 @@ # coding: utf-8 import os -from fakeredis import FakeConnection, FakeStrictRedis +from fakeredis import FakeConnection, FakeStrictRedis, FakeServer from mongomock import MongoClient as MockMongoClient from .base import * @@ -57,7 +57,10 @@ SERVICE_ACCOUNT['WHITELISTED_HOSTS'] = ['testserver'] SERVICE_ACCOUNT['NAMESPACE'] = 'kobo-service-account-test' -REDIS_LOCK_CLIENT = FakeStrictRedis() +server = FakeServer() +REDIS_LOCK_CLIENT = FakeStrictRedis(server=server) + +GIT_LAB = os.getenv('GIT_LAB', False) ################################ # Celery settings #
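
The `set_meta()` helper added in PATCH 1/3 is what lets `_replace_duplicates()` stamp a new `instanceID` into the duplicates it keeps before recomputing `xml_hash` and bulk-updating. A minimal usage sketch, assuming the patched module is importable; the sample XML and uuid below are made up for illustration:

    from onadata.apps.logger.xform_instance_parser import set_meta

    # Hypothetical submission XML: a single root node holding a <meta>
    # block, the shape get_meta_node_from_xml() walks through
    xml = (
        '<data id="sample_form">'
        '<meta><instanceID>uuid:abc123</instanceID></meta>'
        '</data>'
    )

    # Rewrite the instanceID the same way _replace_duplicates() does
    new_xml = set_meta(xml, 'instanceID', 'DUPLICATE 0 uuid:abc123')
    assert '<instanceID>DUPLICATE 0 uuid:abc123</instanceID>' in new_xml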
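
In PATCH 2/3, `get_instance_lock()` builds its PostgreSQL advisory-lock key with `int(xml_hash, 16) & 0xfffffffffffffff`. The reason for the mask: `xml_hash` is a SHA-256 hex digest (256 bits), while `pg_try_advisory_lock()` takes a signed 64-bit `bigint`, so keeping only the low 60 bits (15 hex `f`s) guarantees the key fits. A standalone sketch of the derivation, with a made-up payload:

    import hashlib

    # Same hashing as Instance.xml_hash: SHA-256 hex digest of the XML
    xml_hash = hashlib.sha256(b'<data id="sample"/>').hexdigest()

    # Keep the low 60 bits so the key fits PostgreSQL's signed bigint
    int_lock = int(xml_hash, 16) & 0xfffffffffffffff
    assert int_lock < 2 ** 63  # always within pg_try_advisory_lock() range

Two concurrent submissions with byte-identical XML therefore map to the same key; the second `pg_try_advisory_lock()` call returns false, `create_instance()` raises `ConflictingXMLHashInstanceError`, and `safe_create_instance()` answers with the 409 response.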
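
On non-PostgreSQL databases the same helper falls back to a Redis lock acquired with `blocking=False`, so a second concurrent caller is refused immediately rather than queued. The semantics can be sanity-checked with `fakeredis`, which this patch series adds to the dev requirements; the key name below follows the helper's `kc-lock:<int_lock>` format, with a made-up lock value:

    import fakeredis

    client = fakeredis.FakeStrictRedis()

    lock_a = client.lock('kc-lock:12345', timeout=60)
    lock_b = client.lock('kc-lock:12345', timeout=60)

    assert lock_a.acquire(blocking=False)      # first submission wins
    assert not lock_b.acquire(blocking=False)  # duplicate in flight is refused

    lock_a.release()
    assert lock_b.acquire(blocking=False)      # free again after release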