Skip to content

Commit

Permalink
Merge pull request #180 from opensanctions/duckdb-bulk-enricher
Browse files Browse the repository at this point in the history
Duckdb bulk enricher
  • Loading branch information
jbothma authored Nov 19, 2024
2 parents 0ed8da8 + fe1af3a commit 52fb824
Show file tree
Hide file tree
Showing 11 changed files with 307 additions and 127 deletions.
140 changes: 115 additions & 25 deletions nomenklatura/enrich/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import logging
from importlib import import_module
from typing import Iterable, Generator, Optional, Type, cast
from typing import Dict, Iterable, Generator, Optional, Type, cast

from nomenklatura.entity import CE
from nomenklatura.dataset import DS
from nomenklatura.cache import Cache
from nomenklatura.matching import DefaultAlgorithm
from nomenklatura.enrich.common import Enricher, EnricherConfig
from nomenklatura.enrich.common import (
Enricher,
EnricherConfig,
ItemEnricher,
BulkEnricher,
)
from nomenklatura.enrich.common import EnrichmentAbort, EnrichmentException
from nomenklatura.judgement import Judgement
from nomenklatura.resolver import Resolver
Expand All @@ -16,6 +21,7 @@
"Enricher",
"EnrichmentAbort",
"EnrichmentException",
"BulkEnricher",
"make_enricher",
"enrich",
"match",
Expand All @@ -42,44 +48,128 @@ def make_enricher(
# nk dedupe -i entities-with-matches.json -r resolver.json
def match(
enricher: Enricher[DS], resolver: Resolver[CE], entities: Iterable[CE]
) -> Generator[CE, None, None]:
if isinstance(enricher, BulkEnricher):
yield from get_bulk_matches(enricher, resolver, entities)
elif isinstance(enricher, ItemEnricher):
yield from get_itemwise_matches(enricher, resolver, entities)
else:
raise EnrichmentException("Invalid enricher type: %r" % enricher)


def get_itemwise_matches(
enricher: ItemEnricher[DS], resolver: Resolver[CE], entities: Iterable[CE]
) -> Generator[CE, None, None]:
for entity in entities:
yield entity
try:
for match in enricher.match_wrapped(entity):
if entity.id is None or match.id is None:
continue
if not resolver.check_candidate(entity.id, match.id):
continue
if not entity.schema.can_match(match.schema):
continue
result = DefaultAlgorithm.compare(entity, match)
log.info("Match [%s]: %.2f -> %s", entity, result.score, match)
resolver.suggest(entity.id, match.id, result.score)
match.datasets.add(enricher.dataset.name)
match = resolver.apply(match)
yield match
match_result = match_item(entity, match, resolver, enricher.dataset)
if match_result is not None:
yield match_result
except EnrichmentException:
log.exception("Failed to match: %r" % entity)


def get_bulk_matches(
enricher: BulkEnricher[DS], resolver: Resolver[CE], entities: Iterable[CE]
) -> Generator[CE, None, None]:
entity_lookup: Dict[str, CE] = {}
for entity in entities:
try:
enricher.load_wrapped(entity)
if entity.id is None:
raise EnrichmentException("Entity has no ID: %r" % entity)
if entity.id in entity_lookup:
raise EnrichmentException("Duplicate entity ID: %r" % entity.id)
entity_lookup[entity.id] = entity
except EnrichmentException:
log.exception("Failed to match: %r" % entity)
for entity_id, candidate_set in enricher.candidates():
entity = entity_lookup[entity_id.id]
try:
for match in enricher.match_candidates(entity, candidate_set):
match_result = match_item(entity, match, resolver, enricher.dataset)
if match_result is not None:
yield match_result
except EnrichmentException:
log.exception("Failed to match: %r" % entity)


def match_item(
entity: CE, match: CE, resolver: Resolver[CE], dataset: DS
) -> Optional[CE]:
if entity.id is None or match.id is None:
return None
if not resolver.check_candidate(entity.id, match.id):
return None
if not entity.schema.can_match(match.schema):
return None
result = DefaultAlgorithm.compare(entity, match)
log.info("Match [%s]: %.2f -> %s", entity, result.score, match)
resolver.suggest(entity.id, match.id, result.score)
match.datasets.add(dataset.name)
match = resolver.apply(match)
return match


# nk enrich -i entities.json -r resolver.json -o combined.json
def enrich(
enricher: Enricher[DS], resolver: Resolver[CE], entities: Iterable[CE]
) -> Generator[CE, None, None]:
if isinstance(enricher, BulkEnricher):
yield from get_bulk_enrichments(enricher, resolver, entities)
elif isinstance(enricher, ItemEnricher):
yield from get_itemwise_enrichments(enricher, resolver, entities)
else:
raise EnrichmentException("Invalid enricher type: %r" % enricher)


def get_itemwise_enrichments(
enricher: ItemEnricher[DS], resolver: Resolver[CE], entities: Iterable[CE]
) -> Generator[CE, None, None]:
for entity in entities:
try:
for match in enricher.match_wrapped(entity):
if entity.id is None or match.id is None:
continue
judgement = resolver.get_judgement(match.id, entity.id)
if judgement != Judgement.POSITIVE:
continue

log.info("Enrich [%s]: %r", entity, match)
for adjacent in enricher.expand_wrapped(entity, match):
adjacent.datasets.add(enricher.dataset.name)
adjacent = resolver.apply(adjacent)
yield adjacent
yield from enrich_item(enricher, entity, match, resolver)
except EnrichmentException:
log.exception("Failed to enrich: %r" % entity)


def get_bulk_enrichments(
enricher: BulkEnricher[DS], resolver: Resolver[CE], entities: Iterable[CE]
) -> Generator[CE, None, None]:
entity_lookup: Dict[str, CE] = {}
for entity in entities:
try:
enricher.load_wrapped(entity)
if entity.id is None:
raise EnrichmentException("Entity has no ID: %r" % entity)
if entity.id in entity_lookup:
raise EnrichmentException("Duplicate entity ID: %r" % entity.id)
entity_lookup[entity.id] = entity
except EnrichmentException:
log.exception("Failed to match: %r" % entity)
for entity_id, candidate_set in enricher.candidates():
entity = entity_lookup[entity_id.id]
try:
for match in enricher.match_candidates(entity, candidate_set):
yield from enrich_item(enricher, entity, match, resolver)
except EnrichmentException:
log.exception("Failed to enrich: %r" % entity)


def enrich_item(
enricher: Enricher[DS], entity: CE, match: CE, resolver: Resolver[CE]
) -> Generator[CE, None, None]:
if entity.id is None or match.id is None:
return None
judgement = resolver.get_judgement(match.id, entity.id)
if judgement != Judgement.POSITIVE:
return None

log.info("Enrich [%s]: %r", entity, match)
for adjacent in enricher.expand_wrapped(entity, match):
adjacent.datasets.add(enricher.dataset.name)
adjacent = resolver.apply(adjacent)
yield adjacent
4 changes: 2 additions & 2 deletions nomenklatura/enrich/aleph.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@
from nomenklatura.entity import CE
from nomenklatura.dataset import DS
from nomenklatura.cache import Cache
from nomenklatura.enrich.common import Enricher, EnricherConfig
from nomenklatura.enrich.common import ItemEnricher, EnricherConfig

log = logging.getLogger(__name__)


class AlephEnricher(Enricher[DS]):
class AlephEnricher(ItemEnricher[DS]):
def __init__(self, dataset: DS, cache: Cache, config: EnricherConfig):
super().__init__(dataset, cache, config)
self._host: str = os.environ.get("ALEPH_HOST", "https://aleph.occrp.org/")
Expand Down
60 changes: 51 additions & 9 deletions nomenklatura/enrich/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import time
from banal import as_bool
from typing import Union, Any, Dict, Optional, Generator, Generic
from typing import List, Tuple, Union, Any, Dict, Optional, Generator, Generic
from abc import ABC, abstractmethod
from requests import Session
from requests.exceptions import RequestException
Expand All @@ -18,8 +18,12 @@
from nomenklatura.dataset import DS
from nomenklatura.cache import Cache
from nomenklatura.util import HeadersType
from nomenklatura.resolver import Identifier

EnricherConfig = Dict[str, Any]
MatchCandidates = List[Tuple[Identifier, float]]
"""A list of candidate matches with their scores from a cheaper blocking comparison."""

log = logging.getLogger(__name__)


Expand Down Expand Up @@ -183,25 +187,63 @@ def _filter_entity(self, entity: CompositeEntity) -> bool:
return False
return True

def expand_wrapped(self, entity: CE, match: CE) -> Generator[CE, None, None]:
if not self._filter_entity(entity):
return
yield from self.expand(entity, match)

@abstractmethod
def expand(self, entity: CE, match: CE) -> Generator[CE, None, None]:
raise NotImplementedError()

def close(self) -> None:
self.cache.close()
if self._session is not None:
self._session.close()


class ItemEnricher(Enricher[DS], ABC):
"""
An enricher which performs matching on individual entities, one at a time.
"""

def match_wrapped(self, entity: CE) -> Generator[CE, None, None]:
if not self._filter_entity(entity):
return
yield from self.match(entity)

def expand_wrapped(self, entity: CE, match: CE) -> Generator[CE, None, None]:
@abstractmethod
def match(self, entity: CE) -> Generator[CE, None, None]:
raise NotImplementedError()


class BulkEnricher(Enricher[DS], ABC):
"""
An enricher which performs matching in bulk, requiring all subject entities
to be loaded before matching.
Once loaded, matching can be done by iterating over the `candidates` method
which provides the subject entity ID and a list of IDs of candidate matches.
`match_candidates` is then called for each subject entity and its
`MatchCandidates` yielding matching entities.
"""

def load_wrapped(self, entity: CE) -> None:
if not self._filter_entity(entity):
return
yield from self.expand(entity, match)
self.load(entity)

@abstractmethod
def match(self, entity: CE) -> Generator[CE, None, None]:
def load(self, entity: CE) -> None:
raise NotImplementedError()

@abstractmethod
def expand(self, entity: CE, match: CE) -> Generator[CE, None, None]:
def candidates(self) -> Generator[Tuple[Identifier, MatchCandidates], None, None]:
raise NotImplementedError()

def close(self) -> None:
self.cache.close()
if self._session is not None:
self._session.close()
@abstractmethod
def match_candidates(
self, entity: CE, candidates: MatchCandidates
) -> Generator[CE, None, None]:
raise NotImplementedError()
4 changes: 2 additions & 2 deletions nomenklatura/enrich/nominatim.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,14 @@
from nomenklatura.entity import CE
from nomenklatura.dataset import DS
from nomenklatura.cache import Cache
from nomenklatura.enrich.common import Enricher, EnricherConfig
from nomenklatura.enrich.common import ItemEnricher, EnricherConfig


log = logging.getLogger(__name__)
NOMINATIM = "https://nominatim.openstreetmap.org/search.php"


class NominatimEnricher(Enricher[DS]):
class NominatimEnricher(ItemEnricher[DS]):
def __init__(self, dataset: DS, cache: Cache, config: EnricherConfig):
super().__init__(dataset, cache, config)
self.cache.preload(f"{NOMINATIM}%")
Expand Down
4 changes: 2 additions & 2 deletions nomenklatura/enrich/opencorporates.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from nomenklatura.entity import CE
from nomenklatura.dataset import DS
from nomenklatura.cache import Cache
from nomenklatura.enrich.common import Enricher, EnricherConfig
from nomenklatura.enrich.common import ItemEnricher, EnricherConfig
from nomenklatura.enrich.common import EnrichmentAbort, EnrichmentException


Expand All @@ -22,7 +22,7 @@ def parse_date(raw: Any) -> Optional[str]:
return registry.date.clean(raw)


class OpenCorporatesEnricher(Enricher[DS]):
class OpenCorporatesEnricher(ItemEnricher[DS]):
COMPANY_SEARCH_API = "https://api.opencorporates.com/v0.4/companies/search"
OFFICER_SEARCH_API = "https://api.opencorporates.com/v0.4/officers/search"
UI_PART = "://opencorporates.com/"
Expand Down
4 changes: 2 additions & 2 deletions nomenklatura/enrich/openfigi.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@
from nomenklatura.entity import CE
from nomenklatura.dataset import DS
from nomenklatura.cache import Cache
from nomenklatura.enrich.common import Enricher, EnricherConfig
from nomenklatura.enrich.common import ItemEnricher, EnricherConfig

log = logging.getLogger(__name__)


class OpenFIGIEnricher(Enricher[DS]):
class OpenFIGIEnricher(ItemEnricher[DS]):
"""Uses the `OpenFIGI` search API to look up FIGIs by company name."""

SEARCH_URL = "https://api.openfigi.com/v3/search"
Expand Down
4 changes: 2 additions & 2 deletions nomenklatura/enrich/permid.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from nomenklatura.entity import CE
from nomenklatura.dataset import DS
from nomenklatura.cache import Cache
from nomenklatura.enrich.common import Enricher, EnricherConfig
from nomenklatura.enrich.common import ItemEnricher, EnricherConfig
from nomenklatura.enrich.common import EnrichmentAbort
from nomenklatura.util import fingerprint_name

Expand All @@ -28,7 +28,7 @@
}


class PermIDEnricher(Enricher[DS]):
class PermIDEnricher(ItemEnricher[DS]):
MATCHING_API = "https://api-eit.refinitiv.com/permid/match"

def __init__(self, dataset: DS, cache: Cache, config: EnricherConfig):
Expand Down
4 changes: 2 additions & 2 deletions nomenklatura/enrich/wikidata/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
PROPS_TOPICS,
)
from nomenklatura.enrich.wikidata.model import Claim, Item
from nomenklatura.enrich.common import Enricher, EnricherConfig
from nomenklatura.enrich.common import ItemEnricher, EnricherConfig

WD_API = "https://www.wikidata.org/w/api.php"
LABEL_PREFIX = "wd:lb:"
Expand All @@ -29,7 +29,7 @@ def clean_name(name: str) -> str:
return clean_brackets(name).strip()


class WikidataEnricher(Enricher[DS]):
class WikidataEnricher(ItemEnricher[DS]):
def __init__(self, dataset: DS, cache: Cache, config: EnricherConfig):
super().__init__(dataset, cache, config)
self.depth = self.get_config_int("depth", 1)
Expand Down
4 changes: 2 additions & 2 deletions nomenklatura/enrich/yente.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,13 @@
from nomenklatura.entity import CE, CompositeEntity
from nomenklatura.dataset import DS
from nomenklatura.cache import Cache
from nomenklatura.enrich.common import Enricher, EnricherConfig
from nomenklatura.enrich.common import ItemEnricher, EnricherConfig
from nomenklatura.enrich.common import EnrichmentException

log = logging.getLogger(__name__)


class YenteEnricher(Enricher[DS]):
class YenteEnricher(ItemEnricher[DS]):
"""Uses the `yente` match API to look up entities in a specific dataset."""

def __init__(self, dataset: DS, cache: Cache, config: EnricherConfig):
Expand Down
Loading

0 comments on commit 52fb824

Please sign in to comment.