Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(core): add multimodal support #996

Merged
merged 44 commits into from
Nov 16, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
44 commits
Select commit Hold shift + click to select a range
aa59f2c
update fern
hassiebp Nov 7, 2024
e1536b3
add media upload
hassiebp Nov 12, 2024
266d334
add mediawrapper
hassiebp Nov 12, 2024
5b33316
add tests
hassiebp Nov 12, 2024
ea186e4
add decorator test
hassiebp Nov 12, 2024
71c717b
add langchain test
hassiebp Nov 12, 2024
d03e275
rename to LangfuseMedia
hassiebp Nov 12, 2024
5338ebb
fix test client
hassiebp Nov 12, 2024
a7dcb63
add upload time ms
hassiebp Nov 13, 2024
6be8f66
Merge branch 'main' into add-multimodal-support
hassiebp Nov 13, 2024
a810062
update post endpoint
hassiebp Nov 14, 2024
a590e15
keep openai format on audio
hassiebp Nov 14, 2024
5f24726
check for mediacontenttype
hassiebp Nov 14, 2024
634f7e4
add tests
hassiebp Nov 14, 2024
db93586
add retries
hassiebp Nov 14, 2024
f40972e
Merge branch 'main' into add-multimodal-support
hassiebp Nov 14, 2024
61b87c6
fix decorator serialization
hassiebp Nov 14, 2024
ca74a28
fix test
hassiebp Nov 14, 2024
9a817db
fix operand
hassiebp Nov 14, 2024
f05dbcb
pause upload consumer after ingestion
hassiebp Nov 14, 2024
8bf9b0d
move paramspec to typing extensions
hassiebp Nov 14, 2024
023658d
fix type access for python <3.10
hassiebp Nov 15, 2024
d55e56f
search for media 10 levels deep only
hassiebp Nov 15, 2024
4b269a0
handle ingestion consumer errors gracefully
hassiebp Nov 15, 2024
f11776e
revert decorator io media
hassiebp Nov 15, 2024
fcb71fd
make data uri parsing more robust
hassiebp Nov 15, 2024
aaef73c
use threads for media upload threads
hassiebp Nov 15, 2024
ca7bcc1
fix test
hassiebp Nov 15, 2024
e909c6c
fix test
hassiebp Nov 15, 2024
7f966ff
fix logging
hassiebp Nov 15, 2024
1a141e4
fix openai test
hassiebp Nov 15, 2024
b655790
address comments
hassiebp Nov 15, 2024
9706d63
fix ci script
hassiebp Nov 15, 2024
a93d5c8
push
hassiebp Nov 15, 2024
bc37741
remove queue type for 3.8 support
hassiebp Nov 15, 2024
281b50c
fix test
hassiebp Nov 15, 2024
c83e6c7
fix openai test
hassiebp Nov 15, 2024
1b2bbe1
fix langchain test
hassiebp Nov 15, 2024
22dc7a8
fix
hassiebp Nov 15, 2024
765850d
push
hassiebp Nov 15, 2024
359e995
push
hassiebp Nov 15, 2024
1f2df0b
fix
hassiebp Nov 15, 2024
10d8977
fix string stripping
hassiebp Nov 15, 2024
d19f4f1
increase ci timeout
hassiebp Nov 16, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

245 changes: 245 additions & 0 deletions langfuse/_task_manager/media_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,245 @@
import base64
import hashlib
import logging
from queue import Empty
from typing import Literal

import requests

from langfuse.api import GetMediaUploadUrlRequest, PatchMediaBody
from langfuse.api.client import FernLangfuse
from langfuse.utils import _get_timestamp

from .media_upload_queue import MediaUploadQueue, UploadMediaJob


class MediaManager:
_log = logging.getLogger(__name__)

def __init__(
self, *, api_client: FernLangfuse, media_upload_queue: MediaUploadQueue
):
self._api_client = api_client
self._queue = media_upload_queue

def process_next_media_upload(self):
try:
data = self._queue.get(block=True, timeout=1)
self._process_upload_media_job(data=data)

self._queue.task_done()
except Empty:
pass
except Exception as e:
self._log.error(f"Error uploading media: {e}")
self._queue.task_done()

def process_multimodal_event_in_place(self, event: dict):
try:
if "body" not in event:
return

body = event["body"]
multimodal_fields = ["input", "output"]

for field in multimodal_fields:
if field in body:
field_data = body[field]

if field == "output":
self._process_multimodal_message(
event=event, body=body, field=field, message=field_data
)
hassiebp marked this conversation as resolved.
Show resolved Hide resolved

if isinstance(field_data, list):
for message in field_data:
self._process_multimodal_message(
event=event, body=body, field=field, message=message
)

except Exception as e:
self._log.error(f"Error processing multimodal event: {e}")

def _process_multimodal_message(
self, *, event: dict, body: dict, field: str, message: dict
):
if isinstance(message, dict) and message.get("content", None) is not None:
content = message["content"]

for content_part in content:
if isinstance(content_part, dict):
hassiebp marked this conversation as resolved.
Show resolved Hide resolved
if content_part.get("image_url", None) is not None:
base64_data_uri = content_part["image_url"]["url"]
if base64_data_uri.startswith("data:"):
media_reference_string = self._enqueue_media_upload(
event=event,
body=body,
field=field,
base64_data_uri=base64_data_uri,
)

if media_reference_string:
content_part["image_url"]["url"] = (
media_reference_string
)

if content_part.get("input_audio", None) is not None:
base64_data_uri = (
f"data:audio/{content_part['input_audio']['format']};base64,"
+ content_part["input_audio"]["data"]
)

media_reference_string = self._enqueue_media_upload(
event=event,
body=body,
field=field,
base64_data_uri=base64_data_uri,
)

if media_reference_string:
content_part["input_audio"]["data"] = media_reference_string

if content_part.get("output_audio", None) is not None:
base64_data_uri = (
f"data:audio/{content_part['output_audio']['format']};base64,"
+ content_part["output_audio"]["data"]
)

media_reference_string = self._enqueue_media_upload(
event=event,
body=body,
field=field,
base64_data_uri=base64_data_uri,
)

if media_reference_string:
content_part["output_audio"]["data"] = (
media_reference_string
)

def _enqueue_media_upload(
self, *, event: dict, body: dict, field: str, base64_data_uri: str
):
parsed_content = self._parse_base64_data_uri(base64_data_uri)
trace_id = body.get("traceId", None) or (
body.get("id", None)
if "type" in event and "trace" in event["type"]
else None
)

if trace_id is None:
raise ValueError("trace_id is required for media upload")

observation_id = (
body.get("id", None)
if "type" in event
and ("generation" in event["type"] or "span" in event["type"])
else None
)

if parsed_content:
content_length = parsed_content["content_length"]
content_type = parsed_content["content_type"]
content_sha256_hash = parsed_content["content_sha256_hash"]
content_bytes = parsed_content["content_bytes"]

upload_url_response = self._api_client.media.get_upload_url(
request=GetMediaUploadUrlRequest(
field=field,
contentLength=content_length,
contentType=content_type,
sha256Hash=content_sha256_hash,
traceId=trace_id,
observationId=observation_id,
)
)

upload_url = upload_url_response.upload_url
media_id = upload_url_response.media_id

if upload_url is not None:
self._queue.put(
item={
"content_bytes": content_bytes,
"content_type": content_type,
"content_sha256_hash": content_sha256_hash,
"upload_url": upload_url,
"media_id": media_id,
},
block=True,
)
hassiebp marked this conversation as resolved.
Show resolved Hide resolved

return self._format_media_reference_string(
content_type=content_type,
media_id=media_id,
source="base64",
)

def _process_upload_media_job(
self,
*,
data: UploadMediaJob,
):
upload_response = requests.put(
data["upload_url"],
headers={
"Content-Type": data["content_type"],
"x-amz-checksum-sha256": data["content_sha256_hash"],
},
data=data["content_bytes"],
)
hassiebp marked this conversation as resolved.
Show resolved Hide resolved

self._api_client.media.patch(
media_id=data["media_id"],
request=PatchMediaBody(
uploadedAt=_get_timestamp(),
uploadHttpStatus=upload_response.status_code,
uploadHttpError=upload_response.text,
),
)

def _format_media_reference_string(
self, *, content_type: str, media_id: str, source: Literal["base64"]
) -> str:
return f"@@@langfuseMedia:type={content_type}|id={media_id}|source={source}@@@"

def _parse_base64_data_uri(self, data: str):
if not data or not isinstance(data, str):
return None

if not data.startswith("data:"):
return None

try:
# Split the data into metadata and actual data
header, _, actual_data = data[5:].partition(",")
if not header or not actual_data:
return None

# Determine if the data is base64 encoded
is_base64 = header.endswith(";base64")
if not is_base64:
return None

content_type = header[:-7]
if not content_type:
return None

try:
content_bytes = base64.b64decode(actual_data)
except Exception:
return None

content_length = len(content_bytes)

sha256_hash_bytes = hashlib.sha256(content_bytes).digest()
sha256_hash_base64 = base64.b64encode(sha256_hash_bytes).decode("utf-8")

return {
"content_type": content_type,
"content_bytes": content_bytes,
"content_length": content_length,
"content_sha256_hash": sha256_hash_base64,
}
except Exception:
return None
41 changes: 41 additions & 0 deletions langfuse/_task_manager/media_upload_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import logging
import threading

from .media_manager import MediaManager


class MediaUploadConsumer(threading.Thread):
_log = logging.getLogger(__name__)
_identifier: int
_max_retries: int
_media_manager: MediaManager
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: max_retries is defined as a class attribute but never used in the implementation


def __init__(
self,
*,
identifier: int,
max_retries: int,
media_manager: MediaManager,
):
"""Create a consumer thread."""
super().__init__()
# Make consumer a daemon thread so that it doesn't block program exit
self.daemon = True
# It's important to set running in the constructor: if we are asked to
# pause immediately after construction, we might set running to True in
# run() *after* we set it to False in pause... and keep running
# forever.
self.running = True
self._identifier = identifier
self._max_retries = max_retries
self._media_manager = media_manager

def run(self):
"""Run the media upload consumer."""
self._log.debug("consumer is running...")
while self.running:
self._media_manager.process_next_media_upload()
Comment on lines +34 to +35
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

logic: no error handling around process_next_media_upload() - unhandled exceptions will crash the thread


def pause(self):
"""Pause the media upload consumer."""
self.running = False
14 changes: 14 additions & 0 deletions langfuse/_task_manager/media_upload_queue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from queue import Queue
from typing import TypedDict


class UploadMediaJob(TypedDict):
upload_url: str
media_id: str
content_type: str
content_bytes: bytes
content_sha256_hash: str


class MediaUploadQueue(Queue[UploadMediaJob]):
pass
Loading
Loading