Skip to content

Commit

Permalink
feat(instrumentation): implement event-based tracking for AlephAlpha (traceloop#2456)
Browse files Browse the repository at this point in the history

- Add event-based tracking support while maintaining legacy behavior
- Implement use_legacy_attributes config parameter
- Add event logger integration with proper trace context
- Add streaming support with event buffering
- Include comprehensive test coverage
- Add sample app integration

Part of the larger effort to support event-based tracking across all packages.

Signed-off-by: David Anyatonwu <[email protected]>
  • Loading branch information
onyedikachi-david committed Jan 3, 2025
1 parent fc46e7d commit 95228e3
Show file tree
Hide file tree
Showing 12 changed files with 1,194 additions and 134 deletions.
696 changes: 696 additions & 0 deletions IMPLEMENTATION_GUIDE.md

Large diffs are not rendered by default.

26 changes: 26 additions & 0 deletions opentelemetry/instrumentation/ai_providers/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
from opentelemetry._events import Event, EventLogger
from opentelemetry.semconv._incubating.attributes import gen_ai_attributes as GenAIAttributes

def create_prompt_event(prompt: dict, system: str, capture_content: bool = True) -> Event:
    """Build a standardized ``gen_ai.prompt`` event.

    Args:
        prompt: Message dict; ``role`` defaults to ``"user"`` when absent.
        system: Provider name recorded under the GenAI system attribute.
        capture_content: When False, the prompt ``content`` is omitted
            from the event body (privacy opt-out).

    Returns:
        An OpenTelemetry ``Event`` named ``gen_ai.prompt``.
    """
    event_body = {"role": prompt.get("role", "user")}

    # Only attach content when capture is enabled AND the key is present.
    if capture_content and "content" in prompt:
        event_body["content"] = prompt["content"]

    return Event(
        name="gen_ai.prompt",
        attributes={GenAIAttributes.GEN_AI_SYSTEM: system},
        body=event_body,
    )

def create_completion_event(completion: dict, system: str, capture_content: bool = True) -> Event:
    """Build a standardized ``gen_ai.completion`` event.

    Fix: the original body was an unimplemented ``pass`` stub even though the
    docstring promised an implementation; this mirrors ``create_prompt_event``.

    Args:
        completion: Completion dict; ``role`` defaults to ``"assistant"``.
        system: Provider name recorded under the GenAI system attribute.
        capture_content: When False, the completion ``content`` is omitted
            from the event body (privacy opt-out).

    Returns:
        An OpenTelemetry ``Event`` named ``gen_ai.completion``.
    """
    attributes = {
        GenAIAttributes.GEN_AI_SYSTEM: system
    }

    body = {
        "role": completion.get("role", "assistant"),
    }

    if capture_content and "content" in completion:
        body["content"] = completion["content"]

    return Event(
        name="gen_ai.completion",
        attributes=attributes,
        body=body
    )
8 changes: 8 additions & 0 deletions opentelemetry/instrumentation/anthropic/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
class AnthropicInstrumentor(BaseInstrumentor):
    """Instrumentor for the Anthropic client (guide excerpt).

    NOTE(review): snippet from the implementation guide — ``BaseInstrumentor``
    is assumed to be imported by the full module; confirm in the real file.
    """

    def __init__(self):
        super().__init__()
        # Default to the legacy span-attribute behavior until _instrument
        # is called with an explicit choice.
        self.use_legacy_attributes = True

    def _instrument(self, **kwargs):
        # Callers opt into event-based tracking by passing
        # use_legacy_attributes=False; absent the kwarg we stay legacy.
        self.use_legacy_attributes = kwargs.get('use_legacy_attributes', True)
        # ... rest of instrumentation
22 changes: 22 additions & 0 deletions opentelemetry/instrumentation/anthropic/wrapper.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from opentelemetry.instrumentation.ai_providers.utils import create_prompt_event


def wrap_completion(tracer, event_logger, capture_content):
    """Build a wrapt-style wrapper for Anthropic completion calls (guide excerpt).

    Emits a ``gen_ai.prompt`` event when event-based tracking is enabled,
    otherwise falls back to the legacy span attribute. The literal ``...``
    passed to ``start_as_current_span`` is a guide placeholder, not runnable
    code.

    NOTE(review): this reads ``instance.use_legacy_attributes`` — presumably
    the flag lives on the instrumentor, not on the wrapped client instance;
    verify before adopting this pattern.
    """
    def wrapper(wrapped, instance, args, kwargs):
        with tracer.start_as_current_span(...) as span:
            # Emit prompt event if not using legacy
            if not instance.use_legacy_attributes:
                event_logger.emit(
                    create_prompt_event(
                        kwargs.get("prompt"),
                        system="anthropic",
                        capture_content=capture_content
                    )
                )

            # Existing attribute-based logic if using legacy
            if instance.use_legacy_attributes:
                span.set_attribute("ai.prompt", kwargs.get("prompt"))

            # ... rest of the wrapper
    return wrapper
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,20 @@
import os
from typing import Collection
from opentelemetry.instrumentation.alephalpha.config import Config
from opentelemetry.instrumentation.alephalpha.utils import dont_throw
from opentelemetry.instrumentation.alephalpha.utils import (
dont_throw,
get_llm_request_attributes,
message_to_event,
completion_to_event,
set_span_attribute,
handle_span_exception,
CompletionBuffer,
)
from wrapt import wrap_function_wrapper

from opentelemetry import context as context_api
from opentelemetry.trace import get_tracer, SpanKind
from opentelemetry.trace.status import Status, StatusCode
from opentelemetry.trace import get_tracer, SpanKind, Status, StatusCode
from opentelemetry._events import EventLogger

from opentelemetry.instrumentation.instrumentor import BaseInstrumentor
from opentelemetry.instrumentation.utils import (
Expand All @@ -19,7 +27,7 @@

from opentelemetry.semconv_ai import (
SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY,
SpanAttributes,
GenAIAttributes,
LLMRequestTypeValues,
)
from opentelemetry.instrumentation.alephalpha.version import __version__
Expand All @@ -35,138 +43,151 @@
},
]


def should_send_prompts():
    """Return truthy when prompt/completion content may be captured.

    Content capture is on unless TRACELOOP_TRACE_CONTENT is set to something
    other than "true" (case-insensitive); a per-request context override can
    re-enable it.
    """
    env_flag = (os.getenv("TRACELOOP_TRACE_CONTENT") or "true").lower()
    if env_flag == "true":
        return True
    return context_api.get_value("override_enable_content_tracing")


def _set_span_attribute(span, name, value):
if value is not None:
if value != "":
span.set_attribute(name, value)
return


@dont_throw
def _set_input_attributes(span, llm_request_type, args, kwargs):
    """Record the request model and, when content capture is on, the prompt.

    Only COMPLETION requests carry a prompt; the text is read from the first
    positional argument's request object.
    """
    _set_span_attribute(span, SpanAttributes.LLM_REQUEST_MODEL, kwargs.get("model"))

    if not should_send_prompts():
        return
    if llm_request_type != LLMRequestTypeValues.COMPLETION:
        return

    _set_span_attribute(span, f"{SpanAttributes.LLM_PROMPTS}.0.role", "user")
    _set_span_attribute(
        span,
        f"{SpanAttributes.LLM_PROMPTS}.0.content",
        args[0].prompt.items[0].text,
    )


@dont_throw
def _set_response_attributes(span, llm_request_type, response):
    """Record completion content (if capture is on) and token-usage counters.

    Token counts are read defensively with getattr because the client's
    response object may omit them.
    """
    if should_send_prompts() and llm_request_type == LLMRequestTypeValues.COMPLETION:
        _set_span_attribute(
            span,
            f"{SpanAttributes.LLM_COMPLETIONS}.0.content",
            response.completions[0].completion,
        )
        _set_span_attribute(
            span, f"{SpanAttributes.LLM_COMPLETIONS}.0.role", "assistant"
        )

    prompt_tokens = getattr(response, "num_tokens_prompt_total", 0)
    completion_tokens = getattr(response, "num_tokens_generated", 0)

    # Usage is recorded regardless of content-capture settings.
    for attribute, value in (
        (SpanAttributes.LLM_USAGE_TOTAL_TOKENS, prompt_tokens + completion_tokens),
        (SpanAttributes.LLM_USAGE_COMPLETION_TOKENS, completion_tokens),
        (SpanAttributes.LLM_USAGE_PROMPT_TOKENS, prompt_tokens),
    ):
        _set_span_attribute(span, attribute, value)


def _with_tracer_wrapper(func):
"""Helper for providing tracer for wrapper functions."""

def _with_tracer(tracer, to_wrap):
def wrapper(wrapped, instance, args, kwargs):
return func(tracer, to_wrap, wrapped, instance, args, kwargs)

return wrapper

return _with_tracer


def _llm_request_type_by_method(method_name):
    """Map a wrapped client method name to its LLM request type."""
    return (
        LLMRequestTypeValues.COMPLETION
        if method_name == "complete"
        else LLMRequestTypeValues.UNKNOWN
    )


@_with_tracer_wrapper
def _wrap(tracer, to_wrap, wrapped, instance, args, kwargs):
class StreamWrapper:
    """Wrapper for streaming responses.

    Iterates the underlying Aleph Alpha stream, buffering completion text so
    a single completion event can be emitted when the stream ends, and owns
    ending the span that the caller deliberately left open.
    """

    def __init__(self, stream, span, event_logger: EventLogger, capture_content: bool):
        # stream: raw iterator returned by the wrapped client call.
        # span: an already-started span; cleanup() is responsible for ending it.
        self.stream = stream
        self.span = span
        self.event_logger = event_logger
        self.capture_content = capture_content
        # NOTE(review): CompletionBuffer is a project helper; the 0 argument
        # presumably selects choice index 0 — confirm against utils.
        self.completion_buffer = CompletionBuffer(0)
        self._span_started = False
        self.setup()

    def setup(self):
        # Mark the span as owned exactly once; guards double cleanup.
        if not self._span_started:
            self._span_started = True

    def cleanup(self):
        # Emit the buffered completion (if any) and end the span; idempotent
        # via the _span_started flag.
        if self._span_started:
            if self.completion_buffer.text_content:
                # Emit completion event with buffered content
                self.event_logger.emit(
                    completion_to_event(
                        self.completion_buffer.get_content(),
                        self.capture_content
                    )
                )
            self.span.end()
            self._span_started = False

    def __iter__(self):
        return self

    def __next__(self):
        try:
            chunk = next(self.stream)
            # Accumulate streamed completion text for the final event.
            if chunk.completions:
                self.completion_buffer.append_content(chunk.completions[0].completion)
            return chunk
        except StopIteration:
            # Normal end of stream: flush the event and close the span.
            self.cleanup()
            raise
        except Exception as error:
            # Record the failure on the span (status + exception), then re-raise.
            handle_span_exception(self.span, error)
            raise

def _wrap(tracer, event_logger: EventLogger, capture_content: bool):
    """Return a wrapt-compatible wrapper for Aleph Alpha client methods.

    Fix: the extracted source interleaved the deleted old implementation
    (which referenced undefined names ``wrapped``/``args``/``kwargs``/
    ``to_wrap`` directly in the factory body) with the new one; only the
    coherent new implementation is kept here.

    Args:
        tracer: Tracer used to create the client span.
        event_logger: Destination for gen_ai prompt/completion events.
        capture_content: Whether event bodies may include message content.
    """

    def wrapper(wrapped, instance, args, kwargs):
        # Respect both the generic and LLM-specific suppression flags.
        if context_api.get_value(_SUPPRESS_INSTRUMENTATION_KEY) or context_api.get_value(
            SUPPRESS_LANGUAGE_MODEL_INSTRUMENTATION_KEY
        ):
            return wrapped(*args, **kwargs)

        span_attributes = get_llm_request_attributes(kwargs)
        # end_on_exit=False: streaming responses outlive this block, so the
        # span is ended explicitly (here, or by StreamWrapper.cleanup()).
        with tracer.start_as_current_span(
            name="alephalpha.completion",
            kind=SpanKind.CLIENT,
            attributes=span_attributes,
            end_on_exit=False,
        ) as span:
            try:
                if span.is_recording() and should_send_prompts():
                    # Prompt text lives on the request object passed positionally.
                    prompt_text = args[0].prompt.items[0].text
                    event_logger.emit(
                        message_to_event(prompt_text, capture_content)
                    )

                result = wrapped(*args, **kwargs)

                # Streaming: hand span ownership to the StreamWrapper.
                if kwargs.get("stream", False):
                    return StreamWrapper(result, span, event_logger, capture_content)

                if span.is_recording() and should_send_prompts():
                    completion_text = result.completions[0].completion
                    event_logger.emit(
                        completion_to_event(completion_text, capture_content)
                    )

                # Usage counters; getattr defaults keep absent fields at 0.
                input_tokens = getattr(result, "num_tokens_prompt_total", 0)
                output_tokens = getattr(result, "num_tokens_generated", 0)
                set_span_attribute(
                    span,
                    GenAIAttributes.GEN_AI_USAGE_TOTAL_TOKENS,
                    input_tokens + output_tokens,
                )
                set_span_attribute(
                    span,
                    GenAIAttributes.GEN_AI_USAGE_COMPLETION_TOKENS,
                    output_tokens,
                )
                set_span_attribute(
                    span,
                    GenAIAttributes.GEN_AI_USAGE_PROMPT_TOKENS,
                    input_tokens,
                )

                span.set_status(Status(StatusCode.OK))
                span.end()
                return result

            except Exception as error:
                handle_span_exception(span, error)
                raise

    return wrapper

class AlephAlphaInstrumentor(BaseInstrumentor):
"""An instrumentor for Aleph Alpha's client library."""

def __init__(self, exception_logger=None):
    """Initialize the instrumentor and seed the shared Config.

    Args:
        exception_logger: Optional callable invoked when a wrapped helper
            swallows an exception (via @dont_throw).
    """
    super().__init__()
    # Defaults; _instrument() may later flip use_legacy_attributes.
    Config.use_legacy_attributes = True
    Config.exception_logger = exception_logger

def instrumentation_dependencies(self) -> Collection[str]:
    """Return the package specs this instrumentor requires (aleph_alpha_client)."""
    return _instruments

def _instrument(self, **kwargs):
    """Wrap each configured Aleph Alpha client method with tracing + events.

    Fix: the extracted source passed TWO wrapper arguments to
    wrap_function_wrapper (a stale ``_wrap(tracer, wrapped_method)`` line left
    over from the diff plus the new call), i.e. four positional arguments —
    a TypeError at instrumentation time. Only the new call is kept.

    Recognized kwargs:
        tracer_provider: TracerProvider to create the tracer from.
        use_legacy_attributes: Keep legacy span-attribute behavior (default True).
        capture_content: Include message content in emitted events (default True).
    """
    tracer_provider = kwargs.get("tracer_provider")
    Config.use_legacy_attributes = kwargs.get("use_legacy_attributes", True)

    tracer = get_tracer(__name__, __version__, tracer_provider)
    # NOTE(review): EventLogger is normally obtained via get_event_logger()
    # from an EventLoggerProvider rather than constructed directly — confirm
    # this works with the opentelemetry._events API in use.
    event_logger = EventLogger(
        __name__,
        __version__,
        tracer_provider=tracer_provider,
    )

    capture_content = kwargs.get("capture_content", True)

    for wrapped_method in WRAPPED_METHODS:
        method_name = wrapped_method.get("method")
        wrap_function_wrapper(
            "aleph_alpha_client",
            f"Client.{method_name}",
            _wrap(tracer, event_logger, capture_content),
        )

def _uninstrument(self, **kwargs):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,5 @@
class Config:
    """Config for AlephAlpha instrumentation"""

    # Optional callable invoked by @dont_throw when a wrapped helper raises.
    exception_logger = None
    # Controls whether to use legacy attribute-based approach or new event-based approach
    use_legacy_attributes = True
Loading

0 comments on commit 95228e3

Please sign in to comment.