Add a Redis queue to sasquatch-backpack's usgs module #52

Open

wants to merge 13 commits into main
4 changes: 2 additions & 2 deletions .pre-commit-config.yaml
@@ -1,13 +1,13 @@
repos:
- repo: https://github.com/pre-commit/pre-commit-hooks
rev: v4.6.0
rev: v5.0.0
hooks:
- id: check-toml
- id: check-yaml
- id: trailing-whitespace

- repo: https://github.com/astral-sh/ruff-pre-commit
rev: v0.6.5
rev: v0.7.3
hooks:
- id: ruff
args: [--fix, --exit-non-zero-on-fix]
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -19,7 +19,7 @@ classifiers = [
]
requires-python = ">=3.12"
# Use requirements/main.in for runtime dependencies instead.
dependencies = []
dependencies = ["redis"]
dynamic = ["version"]

[project.scripts]
1,088 changes: 552 additions & 536 deletions requirements/dev.txt

Large diffs are not rendered by default.

1,227 changes: 717 additions & 510 deletions requirements/main.txt

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions requirements/tox.in
@@ -11,4 +11,5 @@
-c dev.txt

tox
tox-docker
tox-uv
216 changes: 182 additions & 34 deletions requirements/tox.txt

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions src/sasquatchbackpack/cli.py
@@ -12,3 +12,4 @@ def main() -> None:


main.add_command(usgs.usgs_earthquake_data)
main.add_command(usgs.test_usgs_redis)
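For reference, the newly wired-up commands can be exercised in-process with click's testing utilities. A minimal sketch, assuming the command's options all fall back to usable defaults when omitted:

```python
from click.testing import CliRunner

from sasquatchbackpack.commands import usgs

runner = CliRunner()

# Without --post this only queries USGS and prints results; nothing is
# sent to Kafka.
result = runner.invoke(usgs.usgs_earthquake_data, [])
print(result.output)
```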
88 changes: 73 additions & 15 deletions src/sasquatchbackpack/commands/usgs.py
@@ -5,14 +5,17 @@
import click

from sasquatchbackpack import sasquatch
from sasquatchbackpack.scripts import usgs
from sasquatchbackpack.scripts import usgs as scripts

DEFAULT_RADIUS = 400

DEFAULT_COORDS = (-30.22573200864174, -70.73932987127506)

DEFAULT_MAGNITUDE_BOUNDS = (2, 10)

# ruff: noqa:TD002
# ruff: noqa:TD003


def check_duration(
ctx: click.Context, param: dict, value: tuple[int, int]
@@ -135,32 +138,49 @@ def check_magnitude_bounds(
callback=check_magnitude_bounds,
)
@click.option(
"--dry-run",
"--post",
is_flag=True,
default=False,
help="Perform a trial run with no data being sent to Kafka.",
help="Post the queried API output to Kafka.",
)
def usgs_earthquake_data(
duration: tuple[int, int],
radius: int,
coords: tuple[float, float],
magnitude_bounds: tuple[int, int],
dry_run: bool, # noqa: FBT001
post: bool, # noqa: FBT001
) -> None:
"""Seaches USGS databases for relevant earthquake data and prints it
to console. Optionally, also allows the user to post the
queried data to kafka.
"""
click.echo(
f"Querying USGS with post mode {"enabled" if post else "disabled"}..."
)

days, hours = duration
total_duration = timedelta(days=days, hours=hours)

results = usgs.search_api(
config = scripts.USGSConfig(
total_duration, radius, coords, magnitude_bounds
)
source = scripts.USGSSource(config)
backpack_dispatcher = sasquatch.BackpackDispatcher(
source, sasquatch.DispatcherConfig()
)

results = scripts.search_api(
total_duration,
radius,
coords,
magnitude_bounds,
)

# TODO: Add CLI feedback on what items are cached to redis
# and which are new
if len(results) > 0:
click.secho("SUCCESS!", fg="green")
click.echo("------")
@@ -174,21 +194,59 @@ def usgs_earthquake_data(
click.echo("------")
return

if dry_run:
click.echo("Dry run mode: No data will be sent to Kafka.")
if not post:
click.echo("Post mode is disabled: No data will be sent to Kafka.")
return

click.echo("Sending data...")

config = usgs.USGSConfig(total_duration, radius, coords, magnitude_bounds)
source = usgs.USGSSource(config)
click.echo("Post mode enabled: Sending data...")

backpack_dispatcher = sasquatch.BackpackDispatcher(
source, sasquatch.DispatcherConfig()
)
result = backpack_dispatcher.post()
result, records = backpack_dispatcher.post()

if "Error" in result:
click.secho(result, fg="red")
elif "Warning" in result:
click.secho(result, fg="yellow")
else:
click.secho("Data successfully sent!", fg="green")
click.echo("The following items were added to Kafka:")

click.echo("------")
for record in records:
value = record["value"]
click.echo(
f"{value['id']} "
f"({value['latitude']}, {value['longitude']}) "
f"{value['depth']} km "
f"M{value['magnitude']}"
)
click.echo("------")

click.echo(
"All entries missing from this list "
"have been identified as already present in Kafka."
)


# TODO: Convert this throwaway command into a proper unit test
@click.command()
def test_usgs_redis() -> None:
"""Test redis implementation."""
manager = sasquatch.RedisManager(address="redis://localhost:6379/0")
config = scripts.USGSConfig(
timedelta(days=10),
DEFAULT_RADIUS,
DEFAULT_COORDS,
DEFAULT_MAGNITUDE_BOUNDS,
)
source = scripts.USGSSource(config)

records = source.get_records()

# Use the first record; its earthquake id serves as the redis key
record = records[0]
key = source.get_redis_key(record)
manager.store(key)
click.echo(f"Key '{key}' returns: {manager.get(key)}")
77 changes: 72 additions & 5 deletions src/sasquatchbackpack/sasquatch.py
@@ -2,16 +2,21 @@

__all__ = ["BackpackDispatcher", "DispatcherConfig", "DataSource"]

import asyncio
import os
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from string import Template

import redis.asyncio as redis
import requests

# Code adapted from https://github.com/lsst-sqre/
# sasquatch/blob/main/examples/RestProxyAPIExample.ipynb

# ruff: noqa:TD002
# ruff: noqa:TD003


class DataSource(ABC):
"""Base class for all relevant backpack data sources.
@@ -33,6 +38,30 @@ def load_schema(self) -> str:
def get_records(self) -> list[dict]:
pass

@abstractmethod
def get_redis_key(self, datapoint: dict) -> str:
pass


class RedisManager:
"""Manage redis for USGS."""

def __init__(self, address: str) -> None:
self.address = address
self.model = redis.from_url(self.address)

self.loop = asyncio.new_event_loop()

def store(self, key: str, item: str = "value") -> None:
if self.model is None:
raise RuntimeError("Model is undefined.")
self.loop.run_until_complete(self.model.set(key, item))

def get(self, key: str) -> bytes | None:
if self.model is None:
raise RuntimeError("Model is undefined.")
# redis returns raw bytes (or None on a miss) unless the client
# is created with decode_responses=True
return self.loop.run_until_complete(self.model.get(key))


@dataclass
class DispatcherConfig:
@@ -55,6 +84,8 @@ class DispatcherConfig:
default=os.getenv("BACKPACK_NAMESPACE", "lsst.backpack")
)
"""Sasquatch namespace for the topic"""
redis_address: str = "redis://localhost:6379/0"
"""Address of Redis server"""


class BackpackDispatcher:
@@ -79,6 +110,7 @@ def __init__(self, source: DataSource, config: DispatcherConfig) -> None:
"topic_name": self.source.topic_name,
}
)
self.redis = RedisManager(config.redis_address)

def create_topic(self) -> str:
"""Create kafka topic based off data from provided source.
@@ -124,16 +156,47 @@ def create_topic(self) -> str:

return response.text

def post(self) -> str:
def _remove_redis_duplicates(self, records: list[dict]) -> list[dict]:
"""Check the redis server for any duplicate data points
present in the provided records, and return a list with them removed.

Parameters
----------
records : list[dict]
Output of a source.get_records() call.

Returns
-------
final : list[dict]
The provided records, with entries already present
on the redis server removed.
"""
return [
record
for record in records
if self.redis.get(self.source.get_redis_key(record)) is None
]

def post(self) -> tuple[str, list]:
"""Assemble schema and payload from the given source, then
makes a POST request to kafka.

Returns
-------
response text : str
response_text : str
The result of the POST request as a string
records : list
Earthquake records POSTed, excluding those already stored
on the redis server
"""
records = self.source.get_records()
records = self._remove_redis_duplicates(self.source.get_records())

if len(records) == 0:
return (
"Warning: All entries already present, aborting POST request",
records,
)

payload = {"value_schema": self.schema, "records": records}

@@ -156,7 +219,11 @@ def post(self) -> str:
timeout=10,
)
response.raise_for_status() # Raises HTTPError for bad responses

except requests.RequestException as e:
return f"Error POSTing data: {e}"
return f"Error POSTing data: {e}", records

return response.text
for record in records:
self.redis.store(self.source.get_redis_key(record))

return response.text, records
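Putting the pieces together, the new post flow can be driven end to end roughly as below. This is a sketch, not part of the PR: it assumes a reachable Redis server at the default address and the Kafka REST proxy settings that DispatcherConfig reads from the environment.

```python
from datetime import timedelta

from sasquatchbackpack import sasquatch
from sasquatchbackpack.scripts import usgs as scripts

config = scripts.USGSConfig(
    timedelta(days=7), 400, (-30.23, -70.74), (2, 10)
)
source = scripts.USGSSource(config)
dispatcher = sasquatch.BackpackDispatcher(
    source, sasquatch.DispatcherConfig()
)

# post() now returns both the REST proxy response text and the records
# that survived Redis deduplication; an empty list means every queried
# event was already cached.
result, records = dispatcher.post()
print(result, len(records))
```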
20 changes: 10 additions & 10 deletions src/sasquatchbackpack/schemas/usgs.py
@@ -1,20 +1,20 @@
"""USGS Schemas."""

from dataclasses import dataclass, field
from dataclasses_avroschema.pydantic import AvroBaseModel
from pydantic import Field

from dataclasses_avroschema.schema_generator import AvroModel


@dataclass
class EarthquakeSchema(AvroModel):
class EarthquakeSchema(AvroBaseModel):
"""Collection of earthquakes near the summit."""

timestamp: int
id: str = field(metadata={"description": "unique earthquake id"})
latitude: float = field(metadata={"units": "degree"})
longitude: float = field(metadata={"units": "degree"})
depth: float = field(metadata={"units": "km"})
magnitude: float = field(metadata={"units": "u.richter_magnitudes"})
id: str = Field(description="unique earthquake id")
latitude: float = Field(json_schema_extra={"units": "degree"})
longitude: float = Field(json_schema_extra={"units": "degree"})
depth: float = Field(json_schema_extra={"units": "km"})
magnitude: float = Field(
json_schema_extra={"units": "u.richter_magnitudes"}
)

class Meta:
"""Schema metadata."""
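With the switch from a plain dataclass to the pydantic-backed AvroBaseModel, instances are validated on construction while Avro schema generation still works. A small sketch; the field values are made up:

```python
from sasquatchbackpack.schemas.usgs import EarthquakeSchema

quake = EarthquakeSchema(
    timestamp=1700000000,
    id="us7000abcd",
    latitude=-30.23,
    longitude=-70.74,
    depth=10.0,
    magnitude=4.5,
)

print(EarthquakeSchema.avro_schema())  # Avro schema as a JSON string
print(quake.model_dump())              # validated plain dict
```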
26 changes: 18 additions & 8 deletions src/sasquatchbackpack/scripts/usgs.py
@@ -65,12 +65,6 @@ class USGSConfig:
(latitude, longitude)
magnitude_bounds : tuple[int, int]
Upper and lower bounds for magnitude search (lower, upper)
schema_file : `str`, optional
Directory path to the relevant source schema
(src/sasquatchbackpack/schemas/schema_name_here.avsc), optional,
defaults to src/sasquatchbackpack/schemas/usgs.avsc
cron_schema : `str`, optional
Directory path to the relevant source schema from a cronjob.
topic_name : `str`, optional
Name of the sasquatch topic
"""
@@ -90,8 +84,6 @@ class USGSSource(DataSource):
config : USGSConfig
USGSConfig to transmit relevant information to
the Source
topic_name : str
Specific source name, used as an identifier
"""

def __init__(
@@ -145,3 +137,21 @@ def get_records(self) -> list[dict]:
raise ConnectionError(
f"A connection error occurred while fetching records: {ce}"
) from ce

def get_redis_key(self, datapoint: dict) -> str:
"""Allow USGS API to format its own redis keys.
For usage in the BackpackDispatcher.

Parameters
----------
datapoint : dict
An individual result from the list output of get_records().

Returns
-------
key : str
A deterministic redis key for this specific data point.
"""
# Redis keys are formatted "topic_name:key_value"
# to keep data from different APIs distinct
return f"{self.topic_name}:{datapoint['value']['id']}"