From 4c48c1d9810e054ac3746ba1e42aaa417f8dfed5 Mon Sep 17 00:00:00 2001 From: John Whitlock Date: Fri, 7 Jun 2024 14:49:17 -0500 Subject: [PATCH 1/7] Move exceptions to emails/exceptions.py --- emails/exceptions.py | 105 ++++++++++++++++++++++++++++++++ emails/models.py | 112 ++++------------------------------- emails/tests/models_tests.py | 10 ++-- emails/views.py | 2 +- privaterelay/views.py | 8 +-- 5 files changed, 125 insertions(+), 112 deletions(-) create mode 100644 emails/exceptions.py diff --git a/emails/exceptions.py b/emails/exceptions.py new file mode 100644 index 0000000000..36e2480939 --- /dev/null +++ b/emails/exceptions.py @@ -0,0 +1,105 @@ +"""Exceptions raised by emails app""" + +from django.conf import settings +from django.core.exceptions import BadRequest + +from api.exceptions import ErrorContextType, RelayAPIException + + +class CannotMakeSubdomainException(BadRequest): + """Exception raised by Profile due to error on subdomain creation. + + Attributes: + message -- optional explanation of the error + """ + + def __init__(self, message: str | None = None) -> None: + self.message = message + + +class CannotMakeAddressException(RelayAPIException): + """Base exception for RelayAddress or DomainAddress creation failure.""" + + +class AccountIsPausedException(CannotMakeAddressException): + default_code = "account_is_paused" + default_detail = "Your account is on pause." + status_code = 403 + + +class AccountIsInactiveException(CannotMakeAddressException): + default_code = "account_is_inactive" + default_detail = "Your account is not active." + status_code = 403 + + +class RelayAddrFreeTierLimitException(CannotMakeAddressException): + default_code = "free_tier_limit" + default_detail_template = ( + "You’ve used all {free_tier_limit} email masks included with your free account." + " You can reuse an existing mask, but using a unique mask for each account is" + " the most secure option." + ) + status_code = 403 + + def __init__(self, free_tier_limit: int | None = None): + self.free_tier_limit = free_tier_limit or settings.MAX_NUM_FREE_ALIASES + super().__init__() + + def error_context(self) -> ErrorContextType: + return {"free_tier_limit": self.free_tier_limit} + + +class DomainAddrFreeTierException(CannotMakeAddressException): + default_code = "free_tier_no_subdomain_masks" + default_detail = ( + "Your free account does not include custom subdomains for masks." + " To create custom masks, upgrade to Relay Premium." + ) + status_code = 403 + + +class DomainAddrNeedSubdomainException(CannotMakeAddressException): + default_code = "need_subdomain" + default_detail = "Please select a subdomain before creating a custom email address." + status_code = 400 + + +class DomainAddrUpdateException(CannotMakeAddressException): + """Exception raised when attempting to edit an existing domain address field.""" + + default_code = "address_not_editable" + default_detail = "You cannot edit an existing domain address field." + status_code = 400 + + +class DomainAddrUnavailableException(CannotMakeAddressException): + default_code = "address_unavailable" + default_detail_template = ( + "“{unavailable_address}” could not be created." + " Please try again with a different mask name." + ) + status_code = 400 + + def __init__(self, unavailable_address: str): + self.unavailable_address = unavailable_address + super().__init__() + + def error_context(self) -> ErrorContextType: + return {"unavailable_address": self.unavailable_address} + + +class DomainAddrDuplicateException(CannotMakeAddressException): + default_code = "duplicate_address" + default_detail_template = ( + "“{duplicate_address}” already exists." + " Please try again with a different mask name." + ) + status_code = 409 + + def __init__(self, duplicate_address: str): + self.duplicate_address = duplicate_address + super().__init__() + + def error_context(self) -> ErrorContextType: + return {"duplicate_address": self.duplicate_address} diff --git a/emails/models.py b/emails/models.py index e8e26b392d..a409ba01ba 100644 --- a/emails/models.py +++ b/emails/models.py @@ -13,7 +13,6 @@ from django.conf import settings from django.contrib.auth.models import User -from django.core.exceptions import BadRequest from django.core.validators import MinLengthValidator from django.db import models, transaction from django.db.models.base import ModelBase @@ -27,7 +26,6 @@ from allauth.socialaccount.models import SocialAccount from rest_framework.authtoken.models import Token -from api.exceptions import ErrorContextType, RelayAPIException from privaterelay.plans import get_premium_countries from privaterelay.utils import ( AcceptLanguageError, @@ -36,6 +34,17 @@ ) from .apps import emails_config +from .exceptions import ( + AccountIsInactiveException, + AccountIsPausedException, + CannotMakeSubdomainException, + DomainAddrDuplicateException, + DomainAddrFreeTierException, + DomainAddrNeedSubdomainException, + DomainAddrUnavailableException, + DomainAddrUpdateException, + RelayAddrFreeTierLimitException, +) from .utils import get_domains_from_settings, incr_if_enabled if settings.PHONES_ENABLED: @@ -646,105 +655,6 @@ def __str__(self): return self.subdomain_hash -class CannotMakeSubdomainException(BadRequest): - """Exception raised by Profile due to error on subdomain creation. - - Attributes: - message -- optional explanation of the error - """ - - def __init__(self, message=None): - self.message = message - - -class CannotMakeAddressException(RelayAPIException): - """Base exception for RelayAddress or DomainAddress creation failure.""" - - -class AccountIsPausedException(CannotMakeAddressException): - default_code = "account_is_paused" - default_detail = "Your account is on pause." - status_code = 403 - - -class AccountIsInactiveException(CannotMakeAddressException): - default_code = "account_is_inactive" - default_detail = "Your account is not active." - status_code = 403 - - -class RelayAddrFreeTierLimitException(CannotMakeAddressException): - default_code = "free_tier_limit" - default_detail_template = ( - "You’ve used all {free_tier_limit} email masks included with your free account." - " You can reuse an existing mask, but using a unique mask for each account is" - " the most secure option." - ) - status_code = 403 - - def __init__(self, free_tier_limit: int | None = None): - self.free_tier_limit = free_tier_limit or settings.MAX_NUM_FREE_ALIASES - super().__init__() - - def error_context(self) -> ErrorContextType: - return {"free_tier_limit": self.free_tier_limit} - - -class DomainAddrFreeTierException(CannotMakeAddressException): - default_code = "free_tier_no_subdomain_masks" - default_detail = ( - "Your free account does not include custom subdomains for masks." - " To create custom masks, upgrade to Relay Premium." - ) - status_code = 403 - - -class DomainAddrNeedSubdomainException(CannotMakeAddressException): - default_code = "need_subdomain" - default_detail = "Please select a subdomain before creating a custom email address." - status_code = 400 - - -class DomainAddrUpdateException(CannotMakeAddressException): - """Exception raised when attempting to edit an existing domain address field.""" - - default_code = "address_not_editable" - default_detail = "You cannot edit an existing domain address field." - status_code = 400 - - -class DomainAddrUnavailableException(CannotMakeAddressException): - default_code = "address_unavailable" - default_detail_template = ( - "“{unavailable_address}” could not be created." - " Please try again with a different mask name." - ) - status_code = 400 - - def __init__(self, unavailable_address: str): - self.unavailable_address = unavailable_address - super().__init__() - - def error_context(self) -> ErrorContextType: - return {"unavailable_address": self.unavailable_address} - - -class DomainAddrDuplicateException(CannotMakeAddressException): - default_code = "duplicate_address" - default_detail_template = ( - "“{duplicate_address}” already exists." - " Please try again with a different mask name." - ) - status_code = 409 - - def __init__(self, duplicate_address: str): - self.duplicate_address = duplicate_address - super().__init__() - - def error_context(self) -> ErrorContextType: - return {"duplicate_address": self.duplicate_address} - - class RelayAddress(models.Model): user = models.ForeignKey(User, on_delete=models.CASCADE) address = models.CharField(max_length=64, default=address_default, unique=True) diff --git a/emails/tests/models_tests.py b/emails/tests/models_tests.py index adcdab351e..932783c2dc 100644 --- a/emails/tests/models_tests.py +++ b/emails/tests/models_tests.py @@ -14,14 +14,16 @@ from model_bakery import baker from waffle.testutils import override_flag -from ..models import ( - AbuseMetrics, +from ..exceptions import ( CannotMakeAddressException, CannotMakeSubdomainException, - DeletedAddress, DomainAddrDuplicateException, - DomainAddress, DomainAddrUnavailableException, +) +from ..models import ( + AbuseMetrics, + DeletedAddress, + DomainAddress, Profile, RegisteredSubdomain, RelayAddress, diff --git a/emails/views.py b/emails/views.py index e7117f81d7..f84dc8e2b4 100644 --- a/emails/views.py +++ b/emails/views.py @@ -41,8 +41,8 @@ glean_logger, ) +from .exceptions import CannotMakeAddressException from .models import ( - CannotMakeAddressException, DeletedAddress, DomainAddress, Profile, diff --git a/privaterelay/views.py b/privaterelay/views.py index 2c389abba0..b44f1c7285 100644 --- a/privaterelay/views.py +++ b/privaterelay/views.py @@ -26,12 +26,8 @@ from rest_framework.decorators import api_view, schema # from silk.profiling.profiler import silk_profile -from emails.models import ( - CannotMakeSubdomainException, - DomainAddress, - RelayAddress, - valid_available_subdomain, -) +from emails.exceptions import CannotMakeSubdomainException +from emails.models import DomainAddress, RelayAddress, valid_available_subdomain from emails.utils import incr_if_enabled from .apps import PrivateRelayConfig From 7ead8fef4cc226a6f9f61c81c4925391daf99f66 Mon Sep 17 00:00:00 2001 From: John Whitlock Date: Thu, 20 Jun 2024 11:25:32 -0500 Subject: [PATCH 2/7] Move valid_available_subdomain to validators.py Update the signature of valid_available_subdomain to be a field validator, raising an exception on failure and returning None. --- emails/apps.py | 3 +- .../0024_increase_subdomain_length.py | 4 +- emails/models.py | 41 +----- emails/tests/models_tests.py | 134 ++---------------- emails/tests/validator_tests.py | 128 +++++++++++++++++ emails/validators.py | 64 +++++++++ privaterelay/views.py | 7 +- 7 files changed, 211 insertions(+), 170 deletions(-) create mode 100644 emails/tests/validator_tests.py create mode 100644 emails/validators.py diff --git a/emails/apps.py b/emails/apps.py index f8051fe671..d414737733 100644 --- a/emails/apps.py +++ b/emails/apps.py @@ -49,7 +49,8 @@ def __init__(self, app_name, app_module): self.badwords = self._load_terms("badwords.text") self.blocklist = self._load_terms("blocklist.text") - def _load_terms(self, filename): + def _load_terms(self, filename: str) -> list[str]: + """Load a list of terms from a file.""" terms = [] terms_file_path = os.path.join(settings.BASE_DIR, "emails", filename) with open(terms_file_path) as terms_file: diff --git a/emails/migrations/0024_increase_subdomain_length.py b/emails/migrations/0024_increase_subdomain_length.py index 7f900571f1..0cf415d85d 100644 --- a/emails/migrations/0024_increase_subdomain_length.py +++ b/emails/migrations/0024_increase_subdomain_length.py @@ -2,7 +2,7 @@ from django.db import migrations, models -import emails.models +import emails.validators class Migration(migrations.Migration): @@ -20,7 +20,7 @@ class Migration(migrations.Migration): max_length=63, null=True, unique=True, - validators=[emails.models.valid_available_subdomain], + validators=[emails.validators.valid_available_subdomain], ), ), ] diff --git a/emails/models.py b/emails/models.py index a409ba01ba..edc674e1d6 100644 --- a/emails/models.py +++ b/emails/models.py @@ -33,7 +33,6 @@ guess_country_from_accept_lang, ) -from .apps import emails_config from .exceptions import ( AccountIsInactiveException, AccountIsPausedException, @@ -46,6 +45,7 @@ RelayAddrFreeTierLimitException, ) from .utils import get_domains_from_settings, incr_if_enabled +from .validators import has_bad_words, is_blocklisted, valid_available_subdomain if settings.PHONES_ENABLED: from phones.models import RealPhone, RelayNumber @@ -60,31 +60,6 @@ PREMIUM_DOMAINS = ["mozilla.com", "getpocket.com", "mozillafoundation.org"] -def valid_available_subdomain(subdomain, *args, **kwargs): - if not subdomain: - raise CannotMakeSubdomainException("error-subdomain-cannot-be-empty-or-null") - # valid subdomains: - # can't start or end with a hyphen - # must be 1-63 alphanumeric characters and/or hyphens - subdomain = subdomain.lower() - valid_subdomain_pattern = re.compile("^(?!-)[a-z0-9-]{1,63}(? 0 - ) - if not valid or bad_word or blocked_word or taken: - raise CannotMakeSubdomainException("error-subdomain-not-available") - return True - - # This historical function is referenced in migration # 0029_profile_add_deleted_metric_and_changeserver_storage_default def default_server_storage(): @@ -616,20 +591,6 @@ def address_default(): ) -def has_bad_words(value: str) -> bool: - for badword in emails_config().badwords: - badword = badword.strip() - if len(badword) <= 4 and badword == value: - return True - if len(badword) > 4 and badword in value: - return True - return False - - -def is_blocklisted(value: str) -> bool: - return any(blockedword == value for blockedword in emails_config().blocklist) - - def get_domain_numerical(domain_address): # get domain name from the address domains = get_domains_from_settings() diff --git a/emails/tests/models_tests.py b/emails/tests/models_tests.py index 932783c2dc..0064e10984 100644 --- a/emails/tests/models_tests.py +++ b/emails/tests/models_tests.py @@ -25,16 +25,11 @@ DeletedAddress, DomainAddress, Profile, - RegisteredSubdomain, RelayAddress, address_hash, get_domain_numerical, - has_bad_words, - hash_subdomain, - is_blocklisted, valid_address, valid_address_pattern, - valid_available_subdomain, ) from ..utils import get_domains_from_settings @@ -127,44 +122,7 @@ def vpn_subscription() -> str: return random.choice(vpn_only_plans) -class MiscEmailModelsTest(TestCase): - def test_has_bad_words_with_bad_words(self): - assert has_bad_words("angry") - - def test_has_bad_words_without_bad_words(self): - assert not has_bad_words("happy") - - def test_has_bad_words_exact_match_on_small_words(self): - assert has_bad_words("ho") - assert not has_bad_words("horse") - assert has_bad_words("ass") - assert not has_bad_words("cassandra") - assert has_bad_words("hell") - assert not has_bad_words("shell") - assert has_bad_words("bra") - assert not has_bad_words("brain") - assert has_bad_words("fart") - assert not has_bad_words("farther") - assert has_bad_words("fu") - assert not has_bad_words("funny") - assert has_bad_words("poo") - assert not has_bad_words("pools") - - def test_is_blocklisted_with_blocked_word(self): - assert is_blocklisted("mozilla") - - def test_is_blocklisted_with_custom_blocked_word(self): - # custom blocked word - # see MPP-2077 for more details - assert is_blocklisted("customdomain") - - def test_is_blocklisted_without_blocked_words(self): - assert not is_blocklisted("non-blocked-word") - - @patch("emails.models.emails_config", return_value=Mock(blocklist=["blocked-word"])) - def test_is_blocklisted_with_mocked_blocked_words(self, mock_config: Mock) -> None: - assert is_blocklisted("blocked-word") - +class AddressHashTest(TestCase): @override_settings(RELAY_FIREFOX_DOMAIN="firefox.com") def test_address_hash_without_subdomain_domain_firefox(self): address = "aaaaaaaaa" @@ -191,10 +149,14 @@ def test_address_hash_with_additional_domain(self): expected_hash = sha256(f"{address}@{test_domain}".encode()).hexdigest() assert address_hash(address, domain=test_domain) == expected_hash + +class GetDomainNumericalTest(TestCase): def test_get_domain_numerical(self): assert get_domain_numerical("default.com") == 1 assert get_domain_numerical("test.com") == 2 + +class ValidAddressPatternTest(TestCase): def test_valid_address_pattern_is_valid(self): assert valid_address_pattern("foo") assert valid_address_pattern("foo-bar") @@ -377,11 +339,11 @@ def test_valid_address_dupe_of_deleted_invalid(self): relay_address.delete() assert not valid_address(relay_address.address, relay_address.domain_value) - @patch( - "emails.models.emails_config", - return_value=Mock(badwords=[], blocklist=["blocked-word"]), - ) - def test_address_contains_blocklist_invalid(self, mock_config: Mock) -> None: + @patch("emails.validators.badwords", return_value=[]) + @patch("emails.validators.blocklist", return_value=["blocked-word"]) + def test_address_contains_blocklist_invalid( + self, mock_blocklist: Mock, mock_badwords: Mock + ) -> None: blocked_word = "blocked-word" relay_address = RelayAddress.objects.create( user=baker.make(User), address=blocked_word @@ -977,82 +939,6 @@ def test_save_server_storage_false_only_deletes_that_profiles_data(self) -> None assert relay_address.used_on == self.TEST_USED_ON -class ValidAvailableSubdomainTest(TestCase): - """Tests for valid_available_subdomain()""" - - ERR_NOT_AVAIL = "error-subdomain-not-available" - ERR_EMPTY_OR_NULL = "error-subdomain-cannot-be-empty-or-null" - - def reserve_subdomain_for_new_user(self, subdomain: str) -> User: - user = make_premium_test_user() - user.profile.add_subdomain(subdomain) - return user - - def test_bad_word_raises(self) -> None: - with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL): - valid_available_subdomain("angry") - - def test_blocked_word_raises(self) -> None: - with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL): - valid_available_subdomain("mozilla") - - def test_taken_subdomain_raises(self) -> None: - subdomain = "thisisfine" - self.reserve_subdomain_for_new_user(subdomain) - with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL): - valid_available_subdomain(subdomain) - - def test_taken_subdomain_different_case_raises(self) -> None: - self.reserve_subdomain_for_new_user("thIsIsfInE") - with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL): - valid_available_subdomain("THiSiSFiNe") - - def test_inactive_subdomain_raises(self) -> None: - """subdomains registered by now deleted profiles are not available.""" - subdomain = "thisisfine" - user = self.reserve_subdomain_for_new_user(subdomain) - user.delete() - - registered_subdomain_count = RegisteredSubdomain.objects.filter( - subdomain_hash=hash_subdomain(subdomain) - ).count() - assert Profile.objects.filter(subdomain=subdomain).count() == 0 - assert registered_subdomain_count == 1 - with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL): - valid_available_subdomain(subdomain) - - def test_subdomain_with_space_raises(self) -> None: - with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL): - valid_available_subdomain("my domain") - - def test_subdomain_with_special_char_raises(self) -> None: - with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL): - valid_available_subdomain("my@domain") - - def test_subdomain_with_dash_returns_True(self) -> None: - assert valid_available_subdomain("my-domain") is True - - def test_subdomain_with_dash_at_front_raises(self) -> None: - with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL): - valid_available_subdomain("-mydomain") - - def test_empty_subdomain_raises(self) -> None: - with self.assertRaisesMessage( - CannotMakeSubdomainException, self.ERR_EMPTY_OR_NULL - ): - valid_available_subdomain("") - - def test_null_subdomain_raises(self) -> None: - with self.assertRaisesMessage( - CannotMakeSubdomainException, self.ERR_EMPTY_OR_NULL - ): - valid_available_subdomain(None) - - def test_subdomain_with_space_at_end_raises(self) -> None: - with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL): - valid_available_subdomain("mydomain ") - - class ProfileDisplayNameTest(ProfileTestCase): """Tests for Profile.display_name""" diff --git a/emails/tests/validator_tests.py b/emails/tests/validator_tests.py new file mode 100644 index 0000000000..c86818af57 --- /dev/null +++ b/emails/tests/validator_tests.py @@ -0,0 +1,128 @@ +"""Tests for field validators.""" + +from unittest.mock import Mock, patch + +from django.contrib.auth.models import User +from django.test import TestCase + +from ..exceptions import CannotMakeSubdomainException +from ..models import Profile, RegisteredSubdomain, hash_subdomain +from ..validators import has_bad_words, is_blocklisted, valid_available_subdomain +from .models_tests import make_premium_test_user + + +class HasBadWordsTest(TestCase): + def test_has_bad_words_with_bad_words(self) -> None: + assert has_bad_words("angry") + + def test_has_bad_words_without_bad_words(self) -> None: + assert not has_bad_words("happy") + + def test_has_bad_words_exact_match_on_small_words(self) -> None: + assert has_bad_words("ho") + assert not has_bad_words("horse") + assert has_bad_words("ass") + assert not has_bad_words("cassandra") + assert has_bad_words("hell") + assert not has_bad_words("shell") + assert has_bad_words("bra") + assert not has_bad_words("brain") + assert has_bad_words("fart") + assert not has_bad_words("farther") + assert has_bad_words("fu") + assert not has_bad_words("funny") + assert has_bad_words("poo") + assert not has_bad_words("pools") + + +class IsBlocklistedTest(TestCase): + def test_is_blocklisted_with_blocked_word(self) -> None: + assert is_blocklisted("mozilla") + + def test_is_blocklisted_with_custom_blocked_word(self) -> None: + # custom blocked word + # see MPP-2077 for more details + assert is_blocklisted("customdomain") + + def test_is_blocklisted_without_blocked_words(self) -> None: + assert not is_blocklisted("non-blocked-word") + + @patch("emails.validators.blocklist", return_value=["blocked-word"]) + def test_is_blocklisted_with_mocked_blocked_words(self, mock_config: Mock) -> None: + assert is_blocklisted("blocked-word") + + +class ValidAvailableSubdomainTest(TestCase): + """Tests for valid_available_subdomain()""" + + ERR_NOT_AVAIL = "error-subdomain-not-available" + ERR_EMPTY_OR_NULL = "error-subdomain-cannot-be-empty-or-null" + + def reserve_subdomain_for_new_user(self, subdomain: str) -> User: + user = make_premium_test_user() + user.profile.add_subdomain(subdomain) + return user + + def test_bad_word_raises(self) -> None: + with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL): + valid_available_subdomain("angry") + + def test_blocked_word_raises(self) -> None: + with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL): + valid_available_subdomain("mozilla") + + def test_taken_subdomain_raises(self) -> None: + subdomain = "thisisfine" + self.reserve_subdomain_for_new_user(subdomain) + with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL): + valid_available_subdomain(subdomain) + + def test_taken_subdomain_different_case_raises(self) -> None: + self.reserve_subdomain_for_new_user("thIsIsfInE") + with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL): + valid_available_subdomain("THiSiSFiNe") + + def test_inactive_subdomain_raises(self) -> None: + """subdomains registered by now deleted profiles are not available.""" + subdomain = "thisisfine" + user = self.reserve_subdomain_for_new_user(subdomain) + user.delete() + + registered_subdomain_count = RegisteredSubdomain.objects.filter( + subdomain_hash=hash_subdomain(subdomain) + ).count() + assert Profile.objects.filter(subdomain=subdomain).count() == 0 + assert registered_subdomain_count == 1 + with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL): + valid_available_subdomain(subdomain) + + def test_subdomain_with_space_raises(self) -> None: + with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL): + valid_available_subdomain("my domain") + + def test_subdomain_with_special_char_raises(self) -> None: + with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL): + valid_available_subdomain("my@domain") + + def test_subdomain_with_dash_succeeds(self) -> None: + valid_available_subdomain("my-domain") + + def test_subdomain_with_dash_at_front_raises(self) -> None: + with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL): + valid_available_subdomain("-mydomain") + + def test_empty_subdomain_raises(self) -> None: + with self.assertRaisesMessage( + CannotMakeSubdomainException, self.ERR_EMPTY_OR_NULL + ): + valid_available_subdomain("") + + def test_null_subdomain_raises(self) -> None: + with self.assertRaisesMessage( + CannotMakeSubdomainException, self.ERR_EMPTY_OR_NULL + ): + valid_available_subdomain(None) + + def test_subdomain_with_space_at_end_raises(self) -> None: + with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL): + valid_available_subdomain("mydomain ") diff --git a/emails/validators.py b/emails/validators.py new file mode 100644 index 0000000000..6ba5c073ca --- /dev/null +++ b/emails/validators.py @@ -0,0 +1,64 @@ +"""Field validators for emails models.""" + +import re +from typing import Any + +from .apps import emails_config +from .exceptions import CannotMakeSubdomainException + +# A valid subdomain: +# can't start or end with a hyphen +# must be 1-63 alphanumeric characters and/or hyphens +_re_valid_subdomain = re.compile("^(?!-)[a-z0-9-]{1,63}(? list[str]: + """Allow mocking of badwords in tests.""" + return emails_config().badwords + + +def has_bad_words(value: str) -> bool: + """Return True if the value is a short bad word or contains a long bad word.""" + for badword in badwords(): + badword = badword.strip() + if len(badword) <= 4 and badword == value: + return True + if len(badword) > 4 and badword in value: + return True + return False + + +def blocklist() -> list[str]: + """Allow mocking of blocklist in tests.""" + return emails_config().blocklist + + +def is_blocklisted(value: str) -> bool: + """Return True if the value is a blocked word.""" + return any(blockedword == value for blockedword in blocklist()) + + +def valid_available_subdomain(subdomain: Any) -> None: + """Raise CannotMakeSubdomainException if the subdomain fails a validation test.""" + from .models import RegisteredSubdomain, hash_subdomain + + if not subdomain: + raise CannotMakeSubdomainException("error-subdomain-cannot-be-empty-or-null") + subdomain = str(subdomain).lower() + + # valid subdomains: + # have to meet the rules for length and characters + valid = _re_valid_subdomain.match(subdomain) is not None + # can't have "bad" words in them + bad_word = has_bad_words(subdomain) + # can't have "blocked" words in them + blocked_word = is_blocklisted(subdomain) + # can't be taken by someone else + taken = ( + RegisteredSubdomain.objects.filter( + subdomain_hash=hash_subdomain(subdomain) + ).count() + > 0 + ) + if not valid or bad_word or blocked_word or taken: + raise CannotMakeSubdomainException("error-subdomain-not-available") diff --git a/privaterelay/views.py b/privaterelay/views.py index b44f1c7285..c29bb5c55d 100644 --- a/privaterelay/views.py +++ b/privaterelay/views.py @@ -27,8 +27,9 @@ # from silk.profiling.profiler import silk_profile from emails.exceptions import CannotMakeSubdomainException -from emails.models import DomainAddress, RelayAddress, valid_available_subdomain +from emails.models import DomainAddress, RelayAddress from emails.utils import incr_if_enabled +from emails.validators import valid_available_subdomain from .apps import PrivateRelayConfig from .fxa_utils import NoSocialToken, _get_oauth2_session @@ -78,8 +79,8 @@ def profile_subdomain(request): try: if request.method == "GET": subdomain = request.GET.get("subdomain", None) - available = valid_available_subdomain(subdomain) - return JsonResponse({"available": available}) + valid_available_subdomain(subdomain) + return JsonResponse({"available": True}) else: subdomain = request.POST.get("subdomain", None) profile.add_subdomain(subdomain) From 32cba7a709133b99a9215aaa701ccb353f76c43a Mon Sep 17 00:00:00 2001 From: John Whitlock Date: Thu, 20 Jun 2024 11:39:59 -0500 Subject: [PATCH 3/7] Move copy_auth_token to signals, fill out types --- emails/models.py | 14 -------------- emails/signals.py | 36 ++++++++++++++++++++++++++++++++---- 2 files changed, 32 insertions(+), 18 deletions(-) diff --git a/emails/models.py b/emails/models.py index edc674e1d6..67d7ee7ff5 100644 --- a/emails/models.py +++ b/emails/models.py @@ -17,14 +17,12 @@ from django.db import models, transaction from django.db.models.base import ModelBase from django.db.models.query import QuerySet -from django.dispatch import receiver from django.utils.translation.trans_real import ( get_supported_language_variant, parse_accept_lang_header, ) from allauth.socialaccount.models import SocialAccount -from rest_framework.authtoken.models import Token from privaterelay.plans import get_premium_countries from privaterelay.utils import ( @@ -561,18 +559,6 @@ def metrics_premium_status(self) -> str: return f"{plan}_{self.plan_term}" -@receiver(models.signals.post_save, sender=Profile) -def copy_auth_token(sender, instance=None, created=False, **kwargs): - if created: - # baker triggers created during tests - # so first check the user doesn't already have a Token - try: - Token.objects.get(user=instance.user) - return - except Token.DoesNotExist: - Token.objects.create(user=instance.user, key=instance.api_token) - - def address_hash(address, subdomain=None, domain=None): if not domain: domain = get_domains_from_settings()["MOZMAIL_DOMAIN"] diff --git a/emails/signals.py b/emails/signals.py index 03f83fbf17..b76658d6c1 100644 --- a/emails/signals.py +++ b/emails/signals.py @@ -1,10 +1,13 @@ import logging from hashlib import sha256 +from typing import Any from django.contrib.auth.models import User from django.db.models.signals import post_save, pre_save from django.dispatch import receiver +from rest_framework.authtoken.models import Token + from emails.models import Profile from emails.utils import incr_if_enabled, set_user_group @@ -12,14 +15,18 @@ @receiver(post_save, sender=User) -def create_user_profile(sender, instance, created, **kwargs): +def create_user_profile( + sender: type[User], instance: User, created: bool, **kwargs: Any +) -> None: if created: set_user_group(instance) Profile.objects.create(user=instance) @receiver(pre_save, sender=Profile) -def measure_feature_usage(sender, instance, **kwargs): +def measure_feature_usage( + sender: type[Profile], instance: Profile, **kwargs: Any +) -> None: if instance._state.adding: # if newly created Profile ignore the signal return @@ -35,11 +42,32 @@ def measure_feature_usage(sender, instance, **kwargs): incr_if_enabled("tracker_removal_enabled") if not instance.remove_level_one_email_trackers: incr_if_enabled("tracker_removal_disabled") + if instance.fxa: + # TODO create a utility function or property for hashed fxa uid + hashed_uid = sha256(instance.fxa.uid.encode("utf-8")).hexdigest() + else: + hashed_uid = "_no_fxa_" info_logger.info( "tracker_removal_feature", extra={ "enabled": instance.remove_level_one_email_trackers, - # TODO create a utility function or property for hashed fxa uid - "hashed_uid": sha256(instance.fxa.uid.encode("utf-8")).hexdigest(), + "hashed_uid": hashed_uid, }, ) + + +@receiver(post_save, sender=Profile) +def copy_auth_token( + sender: type[Profile], + instance: Profile | None = None, + created: bool = False, + **kwargs: Any, +) -> None: + if created and instance is not None: + # baker triggers created during tests + # so first check the user doesn't already have a Token + try: + Token.objects.get(user=instance.user) + return + except Token.DoesNotExist: + Token.objects.create(user=instance.user, key=instance.api_token) From da0b63e99a0a87914dea9ecd8eca2e0eeed3eaa6 Mon Sep 17 00:00:00 2001 From: John Whitlock Date: Thu, 20 Jun 2024 13:19:12 -0500 Subject: [PATCH 4/7] Move valid_address to validators --- emails/models.py | 33 +++--------------- emails/tests/models_tests.py | 37 -------------------- emails/tests/validator_tests.py | 61 +++++++++++++++++++++++++++++++-- emails/validators.py | 34 ++++++++++++++++++ 4 files changed, 97 insertions(+), 68 deletions(-) diff --git a/emails/models.py b/emails/models.py index 67d7ee7ff5..e0a8c66890 100644 --- a/emails/models.py +++ b/emails/models.py @@ -2,7 +2,6 @@ import logging import random -import re import string import uuid from collections import namedtuple @@ -43,7 +42,11 @@ RelayAddrFreeTierLimitException, ) from .utils import get_domains_from_settings, incr_if_enabled -from .validators import has_bad_words, is_blocklisted, valid_available_subdomain +from .validators import ( + is_blocklisted, + valid_address, + valid_available_subdomain, +) if settings.PHONES_ENABLED: from phones.models import RealPhone, RelayNumber @@ -736,32 +739,6 @@ def check_user_can_make_another_address(profile: Profile) -> None: raise RelayAddrFreeTierLimitException() -def valid_address_pattern(address): - # can't start or end with a hyphen - # must be 1-63 lowercase alphanumeric characters and/or hyphens - valid_address_pattern = re.compile("^(?![-.])[a-z0-9-.]{1,63}(? bool: - address_pattern_valid = valid_address_pattern(address) - address_contains_badword = has_bad_words(address) - address_already_deleted = 0 - if not subdomain or flag_is_active_in_task( - "custom_domain_management_redesign", None - ): - address_already_deleted = DeletedAddress.objects.filter( - address_hash=address_hash(address, domain=domain, subdomain=subdomain) - ).count() - if ( - address_already_deleted > 0 - or address_contains_badword - or not address_pattern_valid - ): - return False - return True - - class DeletedAddress(models.Model): address_hash = models.CharField(max_length=64, db_index=True) num_forwarded = models.PositiveIntegerField(default=0) diff --git a/emails/tests/models_tests.py b/emails/tests/models_tests.py index 0064e10984..25fee3043f 100644 --- a/emails/tests/models_tests.py +++ b/emails/tests/models_tests.py @@ -28,8 +28,6 @@ RelayAddress, address_hash, get_domain_numerical, - valid_address, - valid_address_pattern, ) from ..utils import get_domains_from_settings @@ -156,25 +154,6 @@ def test_get_domain_numerical(self): assert get_domain_numerical("test.com") == 2 -class ValidAddressPatternTest(TestCase): - def test_valid_address_pattern_is_valid(self): - assert valid_address_pattern("foo") - assert valid_address_pattern("foo-bar") - assert valid_address_pattern("foo.bar") - assert valid_address_pattern("f00bar") - assert valid_address_pattern("123foo") - assert valid_address_pattern("123") - - def test_valid_address_pattern_is_not_valid(self): - assert not valid_address_pattern("-") - assert not valid_address_pattern("-foo") - assert not valid_address_pattern("foo-") - assert not valid_address_pattern(".foo") - assert not valid_address_pattern("foo.") - assert not valid_address_pattern("foo bar") - assert not valid_address_pattern("Foo") - - class RelayAddressTest(TestCase): def setUp(self): self.user = make_free_test_user() @@ -334,11 +313,6 @@ def test_relay_address_create_repeats_deleted_address_invalid(self): ) assert not repeat_deleted_relay_address.address == address - def test_valid_address_dupe_of_deleted_invalid(self): - relay_address = RelayAddress.objects.create(user=baker.make(User)) - relay_address.delete() - assert not valid_address(relay_address.address, relay_address.domain_value) - @patch("emails.validators.badwords", return_value=[]) @patch("emails.validators.blocklist", return_value=["blocked-word"]) def test_address_contains_blocklist_invalid( @@ -1345,17 +1319,6 @@ def test_make_domain_address_can_make_dupe_of_deleted(self): ) assert dupe_domain_address.full_address == domain_address.full_address - @override_flag("custom_domain_management_redesign", active=True) - def test_valid_address_dupe_domain_address_of_deleted_is_not_valid(self): - address = "same-address" - domain_address = DomainAddress.make_domain_address( - self.user_profile, address=address - ) - domain_address.delete() - assert not valid_address( - address, domain_address.domain_value, self.user_profile.subdomain - ) - @override_flag("custom_domain_management_redesign", active=True) def test_make_domain_address_cannot_make_dupe_of_deleted(self): address = "same-address" diff --git a/emails/tests/validator_tests.py b/emails/tests/validator_tests.py index c86818af57..d105662a0d 100644 --- a/emails/tests/validator_tests.py +++ b/emails/tests/validator_tests.py @@ -5,10 +5,24 @@ from django.contrib.auth.models import User from django.test import TestCase +from waffle.testutils import override_flag + from ..exceptions import CannotMakeSubdomainException -from ..models import Profile, RegisteredSubdomain, hash_subdomain -from ..validators import has_bad_words, is_blocklisted, valid_available_subdomain -from .models_tests import make_premium_test_user +from ..models import ( + DomainAddress, + Profile, + RegisteredSubdomain, + RelayAddress, + hash_subdomain, +) +from ..validators import ( + has_bad_words, + is_blocklisted, + valid_address, + valid_address_pattern, + valid_available_subdomain, +) +from .models_tests import make_free_test_user, make_premium_test_user class HasBadWordsTest(TestCase): @@ -126,3 +140,44 @@ def test_null_subdomain_raises(self) -> None: def test_subdomain_with_space_at_end_raises(self) -> None: with self.assertRaisesMessage(CannotMakeSubdomainException, self.ERR_NOT_AVAIL): valid_available_subdomain("mydomain ") + + +class ValidAddressPatternTest(TestCase): + def test_valid_address_pattern_is_valid(self) -> None: + assert valid_address_pattern("foo") + assert valid_address_pattern("foo-bar") + assert valid_address_pattern("foo.bar") + assert valid_address_pattern("f00bar") + assert valid_address_pattern("123foo") + assert valid_address_pattern("123") + + def test_valid_address_pattern_is_not_valid(self) -> None: + assert not valid_address_pattern("-") + assert not valid_address_pattern("-foo") + assert not valid_address_pattern("foo-") + assert not valid_address_pattern(".foo") + assert not valid_address_pattern("foo.") + assert not valid_address_pattern("foo bar") + assert not valid_address_pattern("Foo") + + +class ValidAddressTest(TestCase): + def test_valid_address_dupe_of_deleted_invalid(self) -> None: + user = make_free_test_user() + relay_address = RelayAddress.objects.create(user=user) + relay_address.delete() + assert not valid_address(relay_address.address, relay_address.domain_value) + + @override_flag("custom_domain_management_redesign", active=True) + def test_valid_address_dupe_domain_address_of_deleted_is_not_valid(self) -> None: + user = make_premium_test_user() + user.profile.subdomain = "mysubdomain" + user.profile.save() + address = "same-address" + domain_address = DomainAddress.make_domain_address( + user.profile, address=address + ) + domain_address.delete() + assert not valid_address( + address, domain_address.domain_value, user.profile.subdomain + ) diff --git a/emails/validators.py b/emails/validators.py index 6ba5c073ca..443786d27a 100644 --- a/emails/validators.py +++ b/emails/validators.py @@ -3,9 +3,16 @@ import re from typing import Any +from privaterelay.utils import flag_is_active_in_task + from .apps import emails_config from .exceptions import CannotMakeSubdomainException +# A valid local / username part of an email address: +# can't start or end with a hyphen +# must be 1-63 lowercase alphanumeric characters and/or hyphens +_re_valid_address = re.compile("^(?![-.])[a-z0-9-.]{1,63}(? bool: return any(blockedword == value for blockedword in blocklist()) +def valid_address(address: str, domain: str, subdomain: str | None = None) -> bool: + """Return if the given address parts make a valid Relay email.""" + from .models import DeletedAddress, address_hash + + address_pattern_valid = valid_address_pattern(address) + address_contains_badword = has_bad_words(address) + address_already_deleted = 0 + if not subdomain or flag_is_active_in_task( + "custom_domain_management_redesign", None + ): + address_already_deleted = DeletedAddress.objects.filter( + address_hash=address_hash(address, domain=domain, subdomain=subdomain) + ).count() + if ( + address_already_deleted > 0 + or address_contains_badword + or not address_pattern_valid + ): + return False + return True + + +def valid_address_pattern(address: str) -> bool: + """Return if the local/user part of an address is valid.""" + return _re_valid_address.match(address) is not None + + def valid_available_subdomain(subdomain: Any) -> None: """Raise CannotMakeSubdomainException if the subdomain fails a validation test.""" from .models import RegisteredSubdomain, hash_subdomain From 4d549e2f1903867613ed555038bfccc12f1ecbc0 Mon Sep 17 00:00:00 2001 From: John Whitlock Date: Thu, 20 Jun 2024 14:03:08 -0500 Subject: [PATCH 5/7] Change make_domain_address to take a User object DomainAddress.make_domain_address used to take a Profile argument, and now takes a User argument. This will allow other functions to take a User instead of a Profile, and avoid circular imports. --- emails/models.py | 47 ++++++++------- emails/tests/models_tests.py | 101 +++++++++++++------------------- emails/tests/validator_tests.py | 4 +- emails/views.py | 2 +- 4 files changed, 69 insertions(+), 85 deletions(-) diff --git a/emails/models.py b/emails/models.py index e0a8c66890..a27716ddb9 100644 --- a/emails/models.py +++ b/emails/models.py @@ -675,7 +675,7 @@ def save( if self._state.adding: with transaction.atomic(): locked_profile = Profile.objects.select_for_update().get(user=self.user) - check_user_can_make_another_address(locked_profile) + check_user_can_make_another_address(locked_profile.user) while True: address_is_allowed = not is_blocklisted(self.address) address_is_valid = valid_address(self.address, self.domain_value) @@ -726,16 +726,17 @@ def metrics_id(self) -> str: return f"R{self.id}" -def check_user_can_make_another_address(profile: Profile) -> None: - if not profile.user.is_active: +def check_user_can_make_another_address(user: User) -> None: + """Raise an exception if the user can not make a RelayAddress.""" + if not user.is_active: raise AccountIsInactiveException() - if profile.is_flagged: + if user.profile.is_flagged: raise AccountIsPausedException() # MPP-3021: return early for premium users to avoid at_max_free_aliases DB query - if profile.has_premium: + if user.profile.has_premium: return - if profile.at_max_free_aliases: + if user.profile.at_max_free_aliases: raise RelayAddrFreeTierLimitException() @@ -750,17 +751,18 @@ def __str__(self): return self.address_hash -def check_user_can_make_domain_address(user_profile: Profile) -> None: - if not user_profile.has_premium: +def check_user_can_make_domain_address(user: User) -> None: + """Raise an exception if the user can not make a DomainAddress.""" + if not user.profile.has_premium: raise DomainAddrFreeTierException() - if not user_profile.subdomain: + if not user.profile.subdomain: raise DomainAddrNeedSubdomainException() - if not user_profile.user.is_active: + if not user.is_active: raise AccountIsInactiveException() - if user_profile.is_flagged: + if user.profile.is_flagged: raise AccountIsPausedException() @@ -798,11 +800,10 @@ def save( using: str | None = None, update_fields: Iterable[str] | None = None, ) -> None: - user_profile = self.user.profile if self._state.adding: - check_user_can_make_domain_address(user_profile) + check_user_can_make_domain_address(self.user) domain_address_valid = valid_address( - self.address, self.domain_value, user_profile.subdomain + self.address, self.domain_value, self.user.profile.subdomain ) if not domain_address_valid: if self.first_emailed_at: @@ -814,9 +815,9 @@ def save( ).exists(): raise DomainAddrDuplicateException(duplicate_address=self.address) - user_profile.update_abuse_metric(address_created=True) - user_profile.last_engagement = datetime.now(UTC) - user_profile.save(update_fields=["last_engagement"]) + self.user.profile.update_abuse_metric(address_created=True) + self.user.profile.last_engagement = datetime.now(UTC) + self.user.profile.save(update_fields=["last_engagement"]) incr_if_enabled("domainaddress.create") if self.first_emailed_at: incr_if_enabled("domainaddress.create_via_email") @@ -826,11 +827,13 @@ def save( if existing_instance.address != self.address: raise DomainAddrUpdateException() - if not user_profile.has_premium and self.block_list_emails: + if not self.user.profile.has_premium and self.block_list_emails: self.block_list_emails = False if update_fields: update_fields = {"block_list_emails"}.union(update_fields) - if (not user_profile.server_storage) and (self.description or self.used_on): + if (not self.user.profile.server_storage) and ( + self.description or self.used_on + ): self.description = "" self.used_on = "" if update_fields: @@ -848,9 +851,9 @@ def user_profile(self): @staticmethod def make_domain_address( - user_profile: Profile, address: str | None = None, made_via_email: bool = False + user: User, address: str | None = None, made_via_email: bool = False ) -> DomainAddress: - check_user_can_make_domain_address(user_profile) + check_user_can_make_domain_address(user) if not address: # FIXME: if the alias is randomly generated and has bad words @@ -864,7 +867,7 @@ def make_domain_address( first_emailed_at = datetime.now(UTC) if made_via_email else None domain_address = DomainAddress.objects.create( - user=user_profile.user, address=address, first_emailed_at=first_emailed_at + user=user, address=address, first_emailed_at=first_emailed_at ) return domain_address diff --git a/emails/tests/models_tests.py b/emails/tests/models_tests.py index 25fee3043f..9705f91e63 100644 --- a/emails/tests/models_tests.py +++ b/emails/tests/models_tests.py @@ -1217,14 +1217,11 @@ def setUp(self): self.subdomain = "test" self.user = make_premium_test_user() self.storageless_user = make_storageless_test_user() - self.user_profile = self.user.profile - self.user_profile.subdomain = self.subdomain - self.user_profile.save() + self.user.profile.subdomain = self.subdomain + self.user.profile.save() def test_make_domain_address_assigns_to_user(self): - domain_address = DomainAddress.make_domain_address( - self.user_profile, "test-assigns" - ) + domain_address = DomainAddress.make_domain_address(self.user, "test-assigns") assert domain_address.user == self.user @skip(reason="test not reliable, look at FIXME comment") @@ -1234,7 +1231,7 @@ def test_make_domain_address_makes_different_addresses(self): # not been fixed yet for i in range(5): domain_address = DomainAddress.make_domain_address( - self.user_profile, f"test-different-{i}" + self.user, f"test-different-{i}" ) assert domain_address.first_emailed_at is None domain_addresses = DomainAddress.objects.filter(user=self.user).values_list( @@ -1244,39 +1241,33 @@ def test_make_domain_address_makes_different_addresses(self): assert len(set(domain_addresses)) == 5 def test_make_domain_address_makes_requested_address(self): - domain_address = DomainAddress.make_domain_address(self.user_profile, "foobar") + domain_address = DomainAddress.make_domain_address(self.user, "foobar") assert domain_address.address == "foobar" assert domain_address.first_emailed_at is None @override_settings(MAX_ADDRESS_CREATION_PER_DAY=10) def test_make_domain_address_has_limit(self) -> None: for i in range(10): - DomainAddress.make_domain_address(self.user_profile, "foobar" + str(i)) + DomainAddress.make_domain_address(self.user, "foobar" + str(i)) with pytest.raises(CannotMakeAddressException) as exc_info: - DomainAddress.make_domain_address(self.user_profile, "one-too-many") + DomainAddress.make_domain_address(self.user, "one-too-many") assert exc_info.value.get_codes() == "account_is_paused" - domain_address_count = DomainAddress.objects.filter( - user=self.user_profile.user - ).count() + domain_address_count = DomainAddress.objects.filter(user=self.user).count() assert domain_address_count == 10 def test_make_domain_address_makes_requested_address_via_email(self): - domain_address = DomainAddress.make_domain_address( - self.user_profile, "foobar", True - ) + domain_address = DomainAddress.make_domain_address(self.user, "foobar", True) assert domain_address.address == "foobar" assert domain_address.first_emailed_at is not None def test_make_domain_address_non_premium_user(self) -> None: - non_premium_user_profile = baker.make(User).profile + non_premium_user = baker.make(User) with pytest.raises(CannotMakeAddressException) as exc_info: - DomainAddress.make_domain_address( - non_premium_user_profile, "test-non-premium" - ) + DomainAddress.make_domain_address(non_premium_user, "test-non-premium") assert exc_info.value.get_codes() == "free_tier_no_subdomain_masks" def test_make_domain_address_can_make_blocklisted_address(self): - domain_address = DomainAddress.make_domain_address(self.user_profile, "testing") + domain_address = DomainAddress.make_domain_address(self.user, "testing") assert domain_address.address == "testing" def test_make_domain_address_valid_premium_user_with_no_subdomain(self) -> None: @@ -1287,24 +1278,21 @@ def test_make_domain_address_valid_premium_user_with_no_subdomain(self) -> None: provider="fxa", extra_data={"subscriptions": [premium_subscription()]}, ) - user_profile = Profile.objects.get(user=user) with pytest.raises(CannotMakeAddressException) as exc_info: - DomainAddress.make_domain_address(user_profile, "test-nosubdomain") + DomainAddress.make_domain_address(user, "test-nosubdomain") assert exc_info.value.get_codes() == "need_subdomain" def test_make_domain_address_dupe_of_existing_raises(self): address = "same-address" - DomainAddress.make_domain_address(self.user_profile, address=address) + DomainAddress.make_domain_address(self.user, address=address) with pytest.raises(DomainAddrDuplicateException) as exc_info: - DomainAddress.make_domain_address(self.user_profile, address=address) + DomainAddress.make_domain_address(self.user, address=address) assert exc_info.value.get_codes() == "duplicate_address" @override_flag("custom_domain_management_redesign", active=False) def test_make_domain_address_can_make_dupe_of_deleted(self): address = "same-address" - domain_address = DomainAddress.make_domain_address( - self.user_profile, address=address - ) + domain_address = DomainAddress.make_domain_address(self.user, address=address) domain_address_hash = address_hash( domain_address.address, domain_address.user_profile.subdomain, @@ -1312,7 +1300,7 @@ def test_make_domain_address_can_make_dupe_of_deleted(self): ) domain_address.delete() dupe_domain_address = DomainAddress.make_domain_address( - self.user_profile, address=address + self.user, address=address ) assert ( DeletedAddress.objects.filter(address_hash=domain_address_hash).count() == 1 @@ -1322,9 +1310,7 @@ def test_make_domain_address_can_make_dupe_of_deleted(self): @override_flag("custom_domain_management_redesign", active=True) def test_make_domain_address_cannot_make_dupe_of_deleted(self): address = "same-address" - domain_address = DomainAddress.make_domain_address( - self.user_profile, address=address - ) + domain_address = DomainAddress.make_domain_address(self.user, address=address) domain_address_hash = address_hash( domain_address.address, domain_address.user_profile.subdomain, @@ -1332,7 +1318,7 @@ def test_make_domain_address_cannot_make_dupe_of_deleted(self): ) domain_address.delete() with pytest.raises(DomainAddrUnavailableException) as exc_info: - DomainAddress.make_domain_address(self.user_profile, address=address) + DomainAddress.make_domain_address(self.user, address=address) assert exc_info.value.get_codes() == "address_unavailable" assert ( DeletedAddress.objects.filter(address_hash=domain_address_hash).count() == 1 @@ -1344,7 +1330,7 @@ def test_make_domain_address_doesnt_randomly_generate_bad_word( ) -> None: address_default_mocked.return_value = "angry0123" with pytest.raises(CannotMakeAddressException) as exc_info: - DomainAddress.make_domain_address(self.user_profile) + DomainAddress.make_domain_address(self.user) assert exc_info.value.get_codes() == "address_unavailable" def test_delete_adds_deleted_address_object(self): @@ -1370,14 +1356,15 @@ def test_premium_user_can_set_block_list_emails(self): assert domain_address.block_list_emails is True def test_delete_increments_values_on_profile(self): - assert self.user_profile.num_address_deleted == 0 - assert self.user_profile.num_email_forwarded_in_deleted_address == 0 - assert self.user_profile.num_email_blocked_in_deleted_address == 0 - assert self.user_profile.num_level_one_trackers_blocked_in_deleted_address == 0 - assert self.user_profile.num_email_replied_in_deleted_address == 0 - assert self.user_profile.num_email_spam_in_deleted_address == 0 - assert self.user_profile.num_deleted_relay_addresses == 0 - assert self.user_profile.num_deleted_domain_addresses == 0 + profile = self.user.profile + assert profile.num_address_deleted == 0 + assert profile.num_email_forwarded_in_deleted_address == 0 + assert profile.num_email_blocked_in_deleted_address == 0 + assert profile.num_level_one_trackers_blocked_in_deleted_address == 0 + assert profile.num_email_replied_in_deleted_address == 0 + assert profile.num_email_spam_in_deleted_address == 0 + assert profile.num_deleted_relay_addresses == 0 + assert profile.num_deleted_domain_addresses == 0 domain_address = DomainAddress.objects.create( user=self.user, @@ -1390,15 +1377,15 @@ def test_delete_increments_values_on_profile(self): ) domain_address.delete() - self.user_profile.refresh_from_db() - assert self.user_profile.num_address_deleted == 1 - assert self.user_profile.num_email_forwarded_in_deleted_address == 2 - assert self.user_profile.num_email_blocked_in_deleted_address == 3 - assert self.user_profile.num_level_one_trackers_blocked_in_deleted_address == 4 - assert self.user_profile.num_email_replied_in_deleted_address == 5 - assert self.user_profile.num_email_spam_in_deleted_address == 6 - assert self.user_profile.num_deleted_relay_addresses == 0 - assert self.user_profile.num_deleted_domain_addresses == 1 + profile.refresh_from_db() + assert profile.num_address_deleted == 1 + assert profile.num_email_forwarded_in_deleted_address == 2 + assert profile.num_email_blocked_in_deleted_address == 3 + assert profile.num_level_one_trackers_blocked_in_deleted_address == 4 + assert profile.num_email_replied_in_deleted_address == 5 + assert profile.num_email_spam_in_deleted_address == 6 + assert profile.num_deleted_relay_addresses == 0 + assert profile.num_deleted_domain_addresses == 1 def test_formerly_premium_user_clears_block_list_emails(self): domain_address = DomainAddress.objects.create( @@ -1481,19 +1468,17 @@ def test_clear_block_list_emails_with_update_fields(self) -> None: assert not domain_address.block_list_emails def test_create_updates_profile_last_engagement(self) -> None: - DomainAddress.make_domain_address(self.user.profile, address="create") + DomainAddress.make_domain_address(self.user, address="create") assert self.user.profile.last_engagement pre_create_last_engagement = self.user.profile.last_engagement - DomainAddress.make_domain_address(self.user.profile, address="create2") + DomainAddress.make_domain_address(self.user, address="create2") self.user.profile.refresh_from_db() assert self.user.profile.last_engagement > pre_create_last_engagement def test_save_does_not_update_profile_last_engagement(self) -> None: - domain_address = DomainAddress.make_domain_address( - self.user.profile, address="save" - ) + domain_address = DomainAddress.make_domain_address(self.user, address="save") assert self.user.profile.last_engagement pre_save_last_engagement = self.user.profile.last_engagement @@ -1504,9 +1489,7 @@ def test_save_does_not_update_profile_last_engagement(self) -> None: assert self.user.profile.last_engagement == pre_save_last_engagement def test_delete_updates_profile_last_engagement(self) -> None: - domain_address = DomainAddress.make_domain_address( - self.user.profile, address="delete" - ) + domain_address = DomainAddress.make_domain_address(self.user, address="delete") assert self.user.profile.last_engagement pre_delete_last_engagement = self.user.profile.last_engagement diff --git a/emails/tests/validator_tests.py b/emails/tests/validator_tests.py index d105662a0d..ab25f1baff 100644 --- a/emails/tests/validator_tests.py +++ b/emails/tests/validator_tests.py @@ -174,9 +174,7 @@ def test_valid_address_dupe_domain_address_of_deleted_is_not_valid(self) -> None user.profile.subdomain = "mysubdomain" user.profile.save() address = "same-address" - domain_address = DomainAddress.make_domain_address( - user.profile, address=address - ) + domain_address = DomainAddress.make_domain_address(user, address=address) domain_address.delete() assert not valid_address( address, domain_address.domain_value, user.profile.subdomain diff --git a/emails/views.py b/emails/views.py index f84dc8e2b4..b8bbc924c8 100644 --- a/emails/views.py +++ b/emails/views.py @@ -1379,7 +1379,7 @@ def _get_domain_address(local_portion: str, domain_portion: str) -> DomainAddres # was unable to receive an email due to user no longer being a # premium user as seen in exception thrown on make_domain_address domain_address = DomainAddress.make_domain_address( - locked_profile, local_portion, True + locked_profile.user, local_portion, True ) glean_logger().log_email_mask_created( mask=domain_address, From 4708dd45f3a23099009a886f29567d97f95e5b64 Mon Sep 17 00:00:00 2001 From: John Whitlock Date: Thu, 20 Jun 2024 12:31:24 -0500 Subject: [PATCH 6/7] Move check_user_can_make_*_address to validators --- emails/models.py | 36 ++---------------------------------- emails/validators.py | 40 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 41 insertions(+), 35 deletions(-) diff --git a/emails/models.py b/emails/models.py index a27716ddb9..c9ba8f60b3 100644 --- a/emails/models.py +++ b/emails/models.py @@ -31,18 +31,15 @@ ) from .exceptions import ( - AccountIsInactiveException, - AccountIsPausedException, CannotMakeSubdomainException, DomainAddrDuplicateException, - DomainAddrFreeTierException, - DomainAddrNeedSubdomainException, DomainAddrUnavailableException, DomainAddrUpdateException, - RelayAddrFreeTierLimitException, ) from .utils import get_domains_from_settings, incr_if_enabled from .validators import ( + check_user_can_make_another_address, + check_user_can_make_domain_address, is_blocklisted, valid_address, valid_available_subdomain, @@ -726,20 +723,6 @@ def metrics_id(self) -> str: return f"R{self.id}" -def check_user_can_make_another_address(user: User) -> None: - """Raise an exception if the user can not make a RelayAddress.""" - if not user.is_active: - raise AccountIsInactiveException() - - if user.profile.is_flagged: - raise AccountIsPausedException() - # MPP-3021: return early for premium users to avoid at_max_free_aliases DB query - if user.profile.has_premium: - return - if user.profile.at_max_free_aliases: - raise RelayAddrFreeTierLimitException() - - class DeletedAddress(models.Model): address_hash = models.CharField(max_length=64, db_index=True) num_forwarded = models.PositiveIntegerField(default=0) @@ -751,21 +734,6 @@ def __str__(self): return self.address_hash -def check_user_can_make_domain_address(user: User) -> None: - """Raise an exception if the user can not make a DomainAddress.""" - if not user.profile.has_premium: - raise DomainAddrFreeTierException() - - if not user.profile.subdomain: - raise DomainAddrNeedSubdomainException() - - if not user.is_active: - raise AccountIsInactiveException() - - if user.profile.is_flagged: - raise AccountIsPausedException() - - class DomainAddress(models.Model): user = models.ForeignKey(User, on_delete=models.CASCADE) address = models.CharField( diff --git a/emails/validators.py b/emails/validators.py index 443786d27a..906bde1847 100644 --- a/emails/validators.py +++ b/emails/validators.py @@ -3,10 +3,19 @@ import re from typing import Any +from django.contrib.auth.models import User + from privaterelay.utils import flag_is_active_in_task from .apps import emails_config -from .exceptions import CannotMakeSubdomainException +from .exceptions import ( + AccountIsInactiveException, + AccountIsPausedException, + CannotMakeSubdomainException, + DomainAddrFreeTierException, + DomainAddrNeedSubdomainException, + RelayAddrFreeTierLimitException, +) # A valid local / username part of an email address: # can't start or end with a hyphen @@ -45,6 +54,35 @@ def is_blocklisted(value: str) -> bool: return any(blockedword == value for blockedword in blocklist()) +def check_user_can_make_another_address(user: User) -> None: + """Raise an exception if the user can not make a RelayAddress.""" + if not user.is_active: + raise AccountIsInactiveException() + + if user.profile.is_flagged: + raise AccountIsPausedException() + # MPP-3021: return early for premium users to avoid at_max_free_aliases DB query + if user.profile.has_premium: + return + if user.profile.at_max_free_aliases: + raise RelayAddrFreeTierLimitException() + + +def check_user_can_make_domain_address(user: User) -> None: + """Raise an exception if the user can not make a DomainAddress.""" + if not user.profile.has_premium: + raise DomainAddrFreeTierException() + + if not user.profile.subdomain: + raise DomainAddrNeedSubdomainException() + + if not user.is_active: + raise AccountIsInactiveException() + + if user.profile.is_flagged: + raise AccountIsPausedException() + + def valid_address(address: str, domain: str, subdomain: str | None = None) -> bool: """Return if the given address parts make a valid Relay email.""" from .models import DeletedAddress, address_hash From 6a8ceddeffe5e5b3a6fdad2433a76e852495ca5d Mon Sep 17 00:00:00 2001 From: John Whitlock Date: Thu, 20 Jun 2024 12:49:06 -0500 Subject: [PATCH 7/7] Move functions to top of emails/models.py --- emails/models.py | 86 ++++++++++++++++++++++++++---------------------- 1 file changed, 47 insertions(+), 39 deletions(-) diff --git a/emails/models.py b/emails/models.py index c9ba8f60b3..9b82d643cf 100644 --- a/emails/models.py +++ b/emails/models.py @@ -58,18 +58,61 @@ PREMIUM_DOMAINS = ["mozilla.com", "getpocket.com", "mozillafoundation.org"] -# This historical function is referenced in migration -# 0029_profile_add_deleted_metric_and_changeserver_storage_default -def default_server_storage(): +def default_server_storage() -> bool: + """ + This historical function is referenced in migration + 0029_profile_add_deleted_metric_and_changeserver_storage_default + """ return True -def default_domain_numerical(): +def default_domain_numerical() -> int: + """Return the default value for RelayAddress.domain""" domains = get_domains_from_settings() domain = domains["MOZMAIL_DOMAIN"] return get_domain_numerical(domain) +def get_domain_numerical(domain_address: str) -> int: + """Turn a domain name into a numerical domain""" + # get domain name from the address + domains = get_domains_from_settings() + domains_keys = list(domains.keys()) + domains_values = list(domains.values()) + domain_name = domains_keys[domains_values.index(domain_address)] + # get domain numerical value from domain name + choices = dict(DOMAIN_CHOICES) + choices_keys = list(choices.keys()) + choices_values = list(choices.values()) + return choices_keys[choices_values.index(domain_name)] + + +def address_hash( + address: str, subdomain: str | None = None, domain: str | None = None +) -> str: + """Create a hash of a Relay address, to prevent re-use.""" + if not domain: + domain = get_domains_from_settings()["MOZMAIL_DOMAIN"] + if subdomain: + return sha256(f"{address}@{subdomain}.{domain}".encode()).hexdigest() + if domain == settings.RELAY_FIREFOX_DOMAIN: + return sha256(f"{address}".encode()).hexdigest() + return sha256(f"{address}@{domain}".encode()).hexdigest() + + +def address_default() -> str: + """Return a random value for RelayAddress.address""" + return "".join( + random.choices( # noqa: S311 (standard pseudo-random generator used) + string.ascii_lowercase + string.digits, k=9 + ) + ) + + +def hash_subdomain(subdomain: str, domain: str = settings.MOZMAIL_DOMAIN) -> str: + return sha256(f"{subdomain}.{domain}".encode()).hexdigest() + + class Profile(models.Model): user = models.OneToOneField(User, on_delete=models.CASCADE) api_token = models.UUIDField(default=uuid.uuid4) @@ -559,41 +602,6 @@ def metrics_premium_status(self) -> str: return f"{plan}_{self.plan_term}" -def address_hash(address, subdomain=None, domain=None): - if not domain: - domain = get_domains_from_settings()["MOZMAIL_DOMAIN"] - if subdomain: - return sha256(f"{address}@{subdomain}.{domain}".encode()).hexdigest() - if domain == settings.RELAY_FIREFOX_DOMAIN: - return sha256(f"{address}".encode()).hexdigest() - return sha256(f"{address}@{domain}".encode()).hexdigest() - - -def address_default(): - return "".join( - random.choices( # noqa: S311 (standard pseudo-random generator used) - string.ascii_lowercase + string.digits, k=9 - ) - ) - - -def get_domain_numerical(domain_address): - # get domain name from the address - domains = get_domains_from_settings() - domains_keys = list(domains.keys()) - domains_values = list(domains.values()) - domain_name = domains_keys[domains_values.index(domain_address)] - # get domain numerical value from domain name - choices = dict(DOMAIN_CHOICES) - choices_keys = list(choices.keys()) - choices_values = list(choices.values()) - return choices_keys[choices_values.index(domain_name)] - - -def hash_subdomain(subdomain, domain=settings.MOZMAIL_DOMAIN): - return sha256(f"{subdomain}.{domain}".encode()).hexdigest() - - class RegisteredSubdomain(models.Model): subdomain_hash = models.CharField(max_length=64, db_index=True, unique=True) registered_at = models.DateTimeField(auto_now_add=True)