diff --git a/asab/storage/mongodb.py b/asab/storage/mongodb.py index e5c37eae0..bd1086b32 100644 --- a/asab/storage/mongodb.py +++ b/asab/storage/mongodb.py @@ -181,7 +181,7 @@ async def execute(self, custom_data: typing.Optional[dict] = None): # pass # obj[k] = o - if self.Storage.WebhookURI is not None: + if self.Storage.WebhookURIs is not None: webhook_data = { "collection": self.Collection, } @@ -205,6 +205,6 @@ async def execute(self, custom_data: typing.Optional[dict] = None): upsertor_data["unset"] = {k: v for k, v in self.ModUnset.items() if not k.startswith("__")} webhook_data["upsertor"] = upsertor_data - await self._webhook(webhook_data) + await self.webhook(webhook_data) return self.ObjId diff --git a/asab/storage/service.py b/asab/storage/service.py index 017da459d..b21b0e9bc 100644 --- a/asab/storage/service.py +++ b/asab/storage/service.py @@ -1,8 +1,9 @@ import abc import secrets import hashlib - +import logging import asab +import re try: import cryptography.hazmat.primitives.ciphers @@ -11,6 +12,12 @@ except ModuleNotFoundError: cryptography = None +# + +L = logging.getLogger(__name__) + +# + ENCRYPTED_PREFIX = b"$aes-cbc$" @@ -19,7 +26,13 @@ class StorageServiceABC(asab.Service): def __init__(self, app, service_name): super().__init__(app, service_name) - self.WebhookURI = asab.Config.get("asab:storage:changestream", "webhook_uri", fallback="") or None + self.WebhookURIs = asab.Config.get("asab:storage:changestream", "webhook_uri", fallback="") or None + if self.WebhookURIs is not None: + self.WebhookURIs = [uri for uri in re.split(r"\s+", self.WebhookURIs) if len(uri) > 0] + try: + self.ProactorService = app.get_service("asab.ProactorService") + except KeyError as e: + raise Exception("Storage webhooks require ProactorService") from e self.WebhookAuth = asab.Config.get("asab:storage:changestream", "webhook_auth", fallback="") or None # Specify a non-empty AES key to enable AES encryption of selected fields diff --git a/asab/storage/upsertor.py b/asab/storage/upsertor.py index 40a45aedf..25044fe8b 100644 --- a/asab/storage/upsertor.py +++ b/asab/storage/upsertor.py @@ -4,8 +4,8 @@ import hashlib import datetime import logging -import aiohttp import asab.web.rest.json +import requests import typing # @@ -43,7 +43,7 @@ def __init__(self, storage, collection, obj_id, version=None): self.ModPush = {} self.ModPull = {} - self.WebhookResponseData = None + self.WebhookResponseData = {} def get_id_name(self): @@ -117,14 +117,14 @@ async def execute(self, custom_data: typing.Optional[dict] = None): ```python upsertor = storage_service.upsertor("users") upsertor.set("name", "Raccoon") - await upsertor.execute(custom_data={"action": "user_creation"}) + await upsertor.execute(custom_data={"event_type": "create_user"}) ``` will trigger a webhook whose payload may look like this: ```json { "collection": "users", - "custom": {"action": "user_creation"}, + "custom": {"event_type": "create_user"}, "upsertor": { "id": "2O-h3ulpO-ZwDrkSbQlYB3pYS0JJxCJj3nr6uQAu8aU", "id_field_name": "_id", @@ -143,21 +143,31 @@ async def execute(self, custom_data: typing.Optional[dict] = None): pass - async def _webhook(self, data: dict): - assert self.Storage.WebhookURI is not None + async def webhook(self, data: dict): + assert self.Storage.WebhookURIs is not None json_dump = asab.web.rest.json.JSONDumper(pretty=False)(data) + for uri in self.Storage.WebhookURIs: + self.WebhookResponseData[uri] = await self.Storage.ProactorService.execute( + self._webhook, json_dump, uri, self.Storage.WebhookAuth) + + + + def _webhook(self, data, uri, auth=None): try: - async with aiohttp.ClientSession(auth=self.Storage.WebhookAuth) as session: - async with session.put( - self.Storage.WebhookURI, - data=json_dump, + with requests.Session() as session: + if self.Storage.WebhookAuth: + session.headers["Authorization"] = self.Storage.WebhookAuth + with session.put( + uri, + data=data, headers={"Content-Type": "application/json"} ) as response: - if response.status // 100 != 2: - text = await response.text() - L.error("Webhook endpoint responded with {}:\n{}".format(response.status, text)) - return - self.WebhookResponseData = await response.json() + if response.status_code // 100 != 2: + text = response.text + L.error( + "Webhook endpoint responded with {}:\n{}".format(response.status_code, text), + struct_data={"uri": uri}) + return response.json() except json.decoder.JSONDecodeError as e: L.error("Failed to decode JSON response from webhook: {}".format(str(e))) except Exception as e: