Skip to content

Commit

Permalink
Hatchet fallback ingestion (#1853)
Browse files Browse the repository at this point in the history
  • Loading branch information
NolanTrem authored Jan 21, 2025
1 parent 197e8b8 commit df93b8c
Show file tree
Hide file tree
Showing 3 changed files with 147 additions and 114 deletions.
45 changes: 26 additions & 19 deletions py/core/main/api/v3/collections_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -1245,22 +1245,29 @@ async def extract(
settings_dict=settings, # type: ignore
)
if run_with_orchestration:
workflow_input = {
"collection_id": str(id),
"graph_creation_settings": server_graph_creation_settings.model_dump_json(),
"user": auth_user.json(),
}

return await self.providers.orchestration.run_workflow( # type: ignore
"extract-triples", {"request": workflow_input}, {}
)
else:
from core.main.orchestration import simple_kg_factory

logger.info("Running extract-triples without orchestration.")
simple_kg = simple_kg_factory(self.services.graph)
await simple_kg["extract-triples"](workflow_input) # type: ignore
return { # type: ignore
"message": "Graph created successfully.",
"task_id": None,
}
try:
workflow_input = {
"collection_id": str(id),
"graph_creation_settings": server_graph_creation_settings.model_dump_json(),
"user": auth_user.json(),
}

return await self.providers.orchestration.run_workflow( # type: ignore
"extract-triples", {"request": workflow_input}, {}
)
except (
Exception
) as e: # TODO: Need to find specific error (gRPC most likely?)
logger.error(
f"Error running orchestrated extraction: {e} \n\nAttempting to run without orchestration."
)

from core.main.orchestration import simple_kg_factory

logger.info("Running extract-triples without orchestration.")
simple_kg = simple_kg_factory(self.services.graph)
await simple_kg["extract-triples"](workflow_input) # type: ignore
return { # type: ignore
"message": "Graph created successfully.",
"task_id": None,
}
184 changes: 102 additions & 82 deletions py/core/main/api/v3/documents_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -432,39 +432,41 @@ async def create_document(
}

if run_with_orchestration:
# Run ingestion with orchestration
raw_message = (
await self.providers.orchestration.run_workflow(
"ingest-chunks",
{"request": workflow_input},
options={
"additional_metadata": {
"document_id": str(document_id),
}
},
try:
# Run ingestion with orchestration
raw_message = (
await self.providers.orchestration.run_workflow(
"ingest-chunks",
{"request": workflow_input},
options={
"additional_metadata": {
"document_id": str(document_id),
}
},
)
)
raw_message["document_id"] = str(document_id)
return raw_message # type: ignore
except (
Exception
) as e: # TODO: Need to find specific errors that we should be excepting (gRPC most likely?)
logger.error(
f"Error running orchestrated ingestion: {e} \n\nAttempting to run without orchestration."
)
)
raw_message["document_id"] = str(document_id)
return raw_message # type: ignore

else:
logger.info(
"Running chunk ingestion without orchestration."
)
from core.main.orchestration import (
simple_ingestion_factory,
)
logger.info("Running chunk ingestion without orchestration.")
from core.main.orchestration import simple_ingestion_factory

simple_ingestor = simple_ingestion_factory(
self.services.ingestion
)
await simple_ingestor["ingest-chunks"](workflow_input)
simple_ingestor = simple_ingestion_factory(
self.services.ingestion
)
await simple_ingestor["ingest-chunks"](workflow_input)

return { # type: ignore
"message": "Document created and ingested successfully.",
"document_id": str(document_id),
"task_id": None,
}
return { # type: ignore
"message": "Document created and ingested successfully.",
"document_id": str(document_id),
"task_id": None,
}

else:
if file:
Expand Down Expand Up @@ -545,35 +547,39 @@ async def create_document(
)

if run_with_orchestration:
# TODO - Modify create_chunks so that we can add chunks to existing document

raw_message: dict[str, str | None] = await self.providers.orchestration.run_workflow( # type: ignore
"ingest-files",
{"request": workflow_input},
options={
"additional_metadata": {
"document_id": str(document_id),
}
},
)
raw_message["document_id"] = str(document_id)
return raw_message # type: ignore
else:
logger.info(
f"Running ingestion without orchestration for file {file_name} and document_id {document_id}."
)
# TODO - Clean up implementation logic here to be more explicitly `synchronous`
from core.main.orchestration import simple_ingestion_factory
try:
# TODO - Modify create_chunks so that we can add chunks to existing document

raw_message: dict[str, str | None] = await self.providers.orchestration.run_workflow( # type: ignore
"ingest-files",
{"request": workflow_input},
options={
"additional_metadata": {
"document_id": str(document_id),
}
},
)
raw_message["document_id"] = str(document_id)
return raw_message # type: ignore
except (
Exception
) as e: # TODO: Need to find specific error (gRPC most likely?)
logger.error(
f"Error running orchestrated ingestion: {e} \n\nAttempting to run without orchestration."
)
logger.info(
f"Running ingestion without orchestration for file {file_name} and document_id {document_id}."
)
# TODO - Clean up implementation logic here to be more explicitly `synchronous`
from core.main.orchestration import simple_ingestion_factory

simple_ingestor = simple_ingestion_factory(
self.services.ingestion
)
await simple_ingestor["ingest-files"](workflow_input)
return { # type: ignore
"message": "Document created and ingested successfully.",
"document_id": str(document_id),
"task_id": None,
}
simple_ingestor = simple_ingestion_factory(self.services.ingestion)
await simple_ingestor["ingest-files"](workflow_input)
return { # type: ignore
"message": "Document created and ingested successfully.",
"document_id": str(document_id),
"task_id": None,
}

@self.router.post(
"/documents/export",
Expand Down Expand Up @@ -1612,25 +1618,32 @@ async def extract(
)

if run_with_orchestration:
workflow_input = {
"document_id": str(id),
"graph_creation_settings": server_graph_creation_settings.model_dump_json(),
"user": auth_user.json(),
}
try:
workflow_input = {
"document_id": str(id),
"graph_creation_settings": server_graph_creation_settings.model_dump_json(),
"user": auth_user.json(),
}

return await self.providers.orchestration.run_workflow( # type: ignore
"extract-triples", {"request": workflow_input}, {}
)
else:
from core.main.orchestration import simple_kg_factory
return await self.providers.orchestration.run_workflow( # type: ignore
"extract-triples", {"request": workflow_input}, {}
)
except (
Exception
) as e: # TODO: Need to find specific errors that we should be excepting (gRPC most likely?)
logger.error(
f"Error running orchestrated extraction: {e} \n\nAttempting to run without orchestration."
)

logger.info("Running extract-triples without orchestration.")
simple_kg = simple_kg_factory(self.services.graph)
await simple_kg["extract-triples"](workflow_input)
return { # type: ignore
"message": "Graph created successfully.",
"task_id": None,
}
from core.main.orchestration import simple_kg_factory

logger.info("Running extract-triples without orchestration.")
simple_kg = simple_kg_factory(self.services.graph)
await simple_kg["extract-triples"](workflow_input)
return { # type: ignore
"message": "Graph created successfully.",
"task_id": None,
}

@self.router.post(
"/documents/{id}/deduplicate",
Expand Down Expand Up @@ -1738,15 +1751,22 @@ async def deduplicate(
)

if run_with_orchestration:
workflow_input = {
"document_id": str(id),
}
try:
workflow_input = {
"document_id": str(id),
}

return await self.providers.orchestration.run_workflow( # type: ignore
"deduplicate-document-entities",
{"request": workflow_input},
{},
)
return await self.providers.orchestration.run_workflow( # type: ignore
"deduplicate-document-entities",
{"request": workflow_input},
{},
)
except (
Exception
) as e: # TODO: Need to find specific errors that we should be excepting (gRPC most likely?)
logger.error(
f"Error running orchestrated deduplication: {e} \n\nAttempting to run without orchestration."
)
else:
from core.main.orchestration import simple_kg_factory

Expand Down
32 changes: 19 additions & 13 deletions py/core/main/api/v3/graph_router.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,19 +312,25 @@ async def build_communities(
}

if run_with_orchestration:
return await self.providers.orchestration.run_workflow( # type: ignore
"build-communities", {"request": workflow_input}, {}
)
else:
from core.main.orchestration import simple_kg_factory

logger.info("Running build-communities without orchestration.")
simple_kg = simple_kg_factory(self.services.graph)
await simple_kg["build-communities"](workflow_input)
return {
"message": "Graph communities created successfully.",
"task_id": None,
}
try:
return await self.providers.orchestration.run_workflow( # type: ignore
"build-communities", {"request": workflow_input}, {}
)
except (
Exception
) as e: # TODO: Need to find specific error (gRPC most likely?)
logger.error(
f"Error running orchestrated community building: {e} \n\nAttempting to run without orchestration."
)
from core.main.orchestration import simple_kg_factory

logger.info("Running build-communities without orchestration.")
simple_kg = simple_kg_factory(self.services.graph)
await simple_kg["build-communities"](workflow_input)
return {
"message": "Graph communities created successfully.",
"task_id": None,
}

@self.router.post(
"/graphs/{collection_id}/reset",
Expand Down

0 comments on commit df93b8c

Please sign in to comment.