From 22f6fd3afdc89eb8511ab7ec6cc8672b7e0148c1 Mon Sep 17 00:00:00 2001 From: Andrei Bratu Date: Mon, 6 Jan 2025 16:24:15 +0000 Subject: [PATCH 1/3] Fix flow complete bug --- package.json | 60 ++++++++++++++++++++--------------------- src/otel/constants.ts | 1 + src/otel/exporter.ts | 31 +++++++++++++++++++++ src/otel/helpers.ts | 13 ++------- src/otel/processor.ts | 42 ++++++++++++++++++++++++++--- src/utilities/flow.ts | 53 +++++++----------------------------- src/utilities/prompt.ts | 60 +++++++++-------------------------------- src/utilities/tool.ts | 60 +++++++++-------------------------------- 8 files changed, 137 insertions(+), 183 deletions(-) diff --git a/package.json b/package.json index 4212705..a149b57 100644 --- a/package.json +++ b/package.json @@ -12,56 +12,56 @@ "test": "jest" }, "dependencies": { - "url-join": "4.0.1", - "form-data": "^4.0.0", - "formdata-node": "^6.0.3", - "node-fetch": "2.7.0", - "qs": "6.11.2", - "readable-stream": "^4.5.2", - "form-data-encoder": "^4.0.2", "@opentelemetry/api": "1.9.0", "@opentelemetry/auto-instrumentations-node": "0.53.0", "@opentelemetry/sdk-metrics": "1.28.0", "@opentelemetry/sdk-node": "0.55.0", "@opentelemetry/sdk-trace-node": "1.28.0", + "@traceloop/ai-semantic-conventions": "0.11.6", "@traceloop/instrumentation-anthropic": "0.11.1", "@traceloop/instrumentation-cohere": "0.11.1", "@traceloop/instrumentation-openai": "0.11.3", - "@traceloop/ai-semantic-conventions": "0.11.6", - "uuid": "11.0.3", - "nanoid": "5.0.9", "cli-progress": "^3.12.0", + "form-data": "^4.0.0", + "form-data-encoder": "^4.0.2", + "formdata-node": "^6.0.3", "lodash": "4.17.21", + "nanoid": "5.0.9", + "node-fetch": "2.7.0", "p-map": "7.0.3", - "stable-hash": "0.0.4" + "qs": "6.11.2", + "readable-stream": "^4.5.2", + "stable-hash": "0.0.4", + "url-join": "4.0.1", + "uuid": "11.0.3" }, "devDependencies": { - "@types/url-join": "4.0.1", - "@types/qs": "6.9.8", + "@anthropic-ai/sdk": "^0.32.1", + "@babel/core": "^7.26.0", + "@babel/plugin-transform-modules-commonjs": "^7.26.3", + "@babel/preset-env": "^7.26.0", + "@trivago/prettier-plugin-sort-imports": "4.3.0", + "@types/cli-progress": "^3.11.6", + "@types/jest": "29.5.5", + "@types/lodash": "4.14.74", + "@types/node": "17.0.33", "@types/node-fetch": "2.6.9", + "@types/qs": "6.9.8", "@types/readable-stream": "^4.0.15", + "@types/url-join": "4.0.1", + "babel-jest": "^29.7.0", + "cohere-ai": "^7.15.0", + "dotenv": "^16.4.6", "fetch-mock-jest": "^1.5.1", - "webpack": "^5.94.0", - "ts-loader": "^9.3.1", "jest": "29.7.0", - "@types/jest": "29.5.5", - "ts-jest": "29.1.1", "jest-environment-jsdom": "29.7.0", - "@types/node": "17.0.33", + "jsonschema": "^1.4.1", + "openai": "^4.74.0", "prettier": "^3.4.2", + "ts-jest": "29.1.1", + "ts-loader": "^9.3.1", "typescript": "4.6.4", - "openai": "^4.74.0", - "@anthropic-ai/sdk": "^0.32.1", - "cohere-ai": "^7.15.0", - "dotenv": "^16.4.6", - "jsonschema": "^1.4.1", - "@types/cli-progress": "^3.11.6", - "babel-jest": "^29.7.0", - "@babel/core": "^7.26.0", - "@babel/plugin-transform-modules-commonjs": "^7.26.3", - "@babel/preset-env": "^7.26.0", - "@types/lodash": "4.14.74", - "@trivago/prettier-plugin-sort-imports": "4.3.0" + "webpack": "^5.94.0" }, "browser": { "fs": false, diff --git a/src/otel/constants.ts b/src/otel/constants.ts index 519cbf4..d7f0935 100644 --- a/src/otel/constants.ts +++ b/src/otel/constants.ts @@ -7,3 +7,4 @@ export const HUMANLOOP_PATH_KEY = "humanloop.file.path"; export const HUMANLOOP_META_FUNCTION_NAME = "humanloop.meta.function_name"; export const HUMANLOOP_PARENT_SPAN_CTX_KEY = "humanloop.context.parentSpanId"; export const HUMANLOOP_TRACE_FLOW_CTX_KEY = "humanloop.context.traceFlow"; +export const HUMANLOOP_FLOW_PREREQUISITES_KEY = "humanloop.flow.prerequisites"; diff --git a/src/otel/exporter.ts b/src/otel/exporter.ts index 9a22759..906ef19 100644 --- a/src/otel/exporter.ts +++ b/src/otel/exporter.ts @@ -6,6 +6,7 @@ import { HumanloopClient } from "../humanloop.client"; import { HUMANLOOP_FILE_KEY, HUMANLOOP_FILE_TYPE_KEY, + HUMANLOOP_FLOW_PREREQUISITES_KEY, HUMANLOOP_LOG_KEY, HUMANLOOP_PATH_KEY, } from "./constants"; @@ -30,6 +31,7 @@ export class HumanloopSpanExporter implements SpanExporter { private shutdownFlag: boolean; private readonly uploadPromises: Promise[]; private readonly exportedSpans: ReadableSpan[]; + private readonly prerequisites: Map>; constructor(client: HumanloopClient) { this.client = client; @@ -37,6 +39,7 @@ export class HumanloopSpanExporter implements SpanExporter { this.shutdownFlag = false; this.uploadPromises = []; this.exportedSpans = []; + this.prerequisites = new Map(); } export(spans: ReadableSpan[]): ExportResult { @@ -69,6 +72,19 @@ export class HumanloopSpanExporter implements SpanExporter { await this.shutdown(); } + private completeFlowLog(spanId: string) { + for (const [flowLogSpanId, flowChildrenSpanIds] of this.prerequisites) { + if (flowChildrenSpanIds.has(spanId)) { + flowChildrenSpanIds.delete(spanId); + if (flowChildrenSpanIds.size === 0) { + const flowLogId = this.spanIdToUploadedLogId.get(flowLogSpanId)!; + this.client.flows.updateLog(flowLogId, { traceStatus: "complete" }); + } + break; + } + } + } + private async exportSpanDispatch(span: ReadableSpan): Promise { const fileType = span.attributes[HUMANLOOP_FILE_TYPE_KEY]; const parentSpanId = span.parentSpanId; @@ -130,6 +146,7 @@ export class HumanloopSpanExporter implements SpanExporter { } catch (error) { console.error(`Error exporting prompt: ${error}`); } + this.completeFlowLog(span.spanContext().spanId); } private async exportTool(span: ReadableSpan): Promise { @@ -158,6 +175,7 @@ export class HumanloopSpanExporter implements SpanExporter { } catch (error) { console.error(`Error exporting tool: ${error}`); } + this.completeFlowLog(span.spanContext().spanId); } private async exportFlow(span: ReadableSpan): Promise { @@ -168,6 +186,18 @@ export class HumanloopSpanExporter implements SpanExporter { logObject.startTime = hrTimeToDate(span.startTime); logObject.endTime = hrTimeToDate(span.endTime); logObject.createdAt = hrTimeToDate(span.endTime); + // Spans that must be uploaded before the Flow Span is completed + let prerequisites: string[] | undefined = undefined; + try { + prerequisites = readFromOpenTelemetrySpan( + span, + HUMANLOOP_FLOW_PREREQUISITES_KEY, + ) as unknown as string[]; + } catch (error) { + prerequisites = []; + } + + this.prerequisites.set(span.spanContext().spanId, new Set(prerequisites)); const spanParentId = span.parentSpanId; const traceParentId = spanParentId @@ -188,5 +218,6 @@ export class HumanloopSpanExporter implements SpanExporter { } catch (error) { console.error("Error exporting flow: ", error, span.spanContext().spanId); } + this.completeFlowLog(span.spanContext().spanId); } } diff --git a/src/otel/helpers.ts b/src/otel/helpers.ts index 278aad2..14cd752 100644 --- a/src/otel/helpers.ts +++ b/src/otel/helpers.ts @@ -34,7 +34,7 @@ function _listToOtelFormat(lst: NestedList): NestedDict { */ export function writeToOpenTelemetrySpan( span: ReadableSpan, - value: NestedDict | NestedList | AttributeValue, + value: NestedDict | NestedList | AttributeValue | any[], key: string, ): void { let toWriteCopy: NestedDict; @@ -191,16 +191,7 @@ export function isLLMProviderCall(span: ReadableSpan): boolean { * @returns True if the span was created by the Humanloop SDK, false otherwise */ export function isHumanloopSpan(span: ReadableSpan): boolean { - return span.attributes[HUMANLOOP_FILE_TYPE_KEY] !== undefined; -} - -/** - * Generates a unique span ID. - * - * @returns A UUID string - */ -export function generateSpanId(): string { - return uuidv4(); + return span.name.startsWith("humanloop."); } /** diff --git a/src/otel/processor.ts b/src/otel/processor.ts index 978e873..1a2e9d4 100644 --- a/src/otel/processor.ts +++ b/src/otel/processor.ts @@ -36,15 +36,37 @@ interface CompletableSpan { export class HumanloopSpanProcessor implements SpanProcessor { private spanExporter: SpanExporter; private children: Map; + // List of all span IDs that are contained in a Flow trace + // They are passed to the Exporter as a span attribute + // so the Exporter knows when to complete a trace + private prerequisites: Map; constructor(exporter: SpanExporter) { this.spanExporter = exporter; this.children = new Map(); + this.prerequisites = new Map(); } async forceFlush(): Promise {} onStart(span: Span, _: Context): void { + const spanId = span.spanContext().spanId; + const parentSpanId = span.parentSpanId; + if (span.name === "humanloop.flow") { + this.prerequisites.set(spanId, []); + } + if (parentSpanId !== undefined && isHumanloopSpan(span)) { + for (const [traceHead, allTraceNodes] of this.prerequisites) { + if ( + parentSpanId === traceHead || + allTraceNodes.includes(parentSpanId) + ) { + allTraceNodes.push(spanId); + this.prerequisites.set(traceHead, allTraceNodes); + break; + } + } + } // Handle stream case: when Prompt instrumented function calls a provider with streaming: true // The instrumentor span will end only when the ChunksResponse is consumed, which can happen // after the span created by the Prompt utility finishes. To handle this, we register all instrumentor @@ -66,6 +88,7 @@ export class HumanloopSpanProcessor implements SpanProcessor { */ onEnd(span: ReadableSpan): void { if (isHumanloopSpan(span)) { + // Wait for children to complete asynchronously new Promise((resolve) => { const checkChildrenSpans = () => { const childrenSpans = this.children.get(span.spanContext().spanId); @@ -79,15 +102,28 @@ export class HumanloopSpanProcessor implements SpanProcessor { }; checkChildrenSpans(); }).then((_) => { - // All children/ instrumentor spans have arrived, we can process the + // All instrumentor spans have arrived, we can process the // Humanloop parent span owning them + if (span.name === "humanloop.flow") { + // If the span if a Flow Log, add attribute with all span IDs it + // needs to wait before completion + writeToOpenTelemetrySpan( + span, + this.prerequisites.get(span.spanContext().spanId) || [], + HUMANLOOP_LOG_KEY, + ); + this.prerequisites.delete(span.spanContext().spanId); + } + this.processSpanDispatch( span, this.children.get(span.spanContext().spanId) || [], ); + // Release references this.children.delete(span.spanContext().spanId); - // Export the Humanloop span + + // Pass Humanloop span to Exporter this.spanExporter.export([span], (result: ExportResult) => { if (result.code !== ExportResultCode.SUCCESS) { console.error("Failed to export span:", result.error); @@ -182,7 +218,7 @@ export class HumanloopSpanProcessor implements SpanProcessor { // Placeholder for processing other file types break; default: - console.error("Unknown Humanloop File Span", span); + console.error("Unknown Humanloop File span", span); } } diff --git a/src/utilities/flow.ts b/src/utilities/flow.ts index 408b3ef..d083907 100644 --- a/src/utilities/flow.ts +++ b/src/utilities/flow.ts @@ -10,7 +10,6 @@ import { HUMANLOOP_PATH_KEY, HUMANLOOP_TRACE_FLOW_CTX_KEY, NestedDict, - generateSpanId, jsonifyIfNotString, writeToOpenTelemetrySpan, } from "../otel"; @@ -39,30 +38,9 @@ export function flowUtilityFactory( const parentSpanContextKey = createContextKey(HUMANLOOP_PARENT_SPAN_CTX_KEY); const flowMetadataKey = createContextKey(HUMANLOOP_TRACE_FLOW_CTX_KEY); // @ts-ignore - return opentelemetryTracer.startActiveSpan(generateSpanId(), async (span) => { + return opentelemetryTracer.startActiveSpan("humanloop.flow", async (span) => { const ctx = context.active(); const spanId = span.spanContext().spanId; - const parentSpanId = ctx.getValue(parentSpanContextKey) as - | string - | undefined; - const parentFlowMetadata = ctx.getValue(flowMetadataKey) as { - traceId: string; - isFlowLog: boolean; - traceParentId: string; - } | null; - // Handle trace flow context - const flowMetadata = - parentSpanId && parentFlowMetadata - ? { - traceId: spanId, - isFlowLog: true, - traceParentId: parentSpanId, - } - : { - traceId: spanId, - traceParentId: null, - isFlowLog: true, - }; // Add span attributes span = span.setAttribute(HUMANLOOP_PATH_KEY, path || func.name); @@ -77,26 +55,15 @@ export function flowUtilityFactory( ); } - const { output, error } = await context.with( - ctx - .setValue(parentSpanContextKey, spanId) - .setValue(flowMetadataKey, flowMetadata), - async () => { - let output: O | null; - let error: string | null = null; - try { - output = await func(inputs, messages); - } catch (err: any) { - console.error(`Error calling ${func.name}:`, err); - output = null; - error = err.message || String(err); - } - return { - output, - error, - }; - }, - ); + let output: O | null; + let error: string | null = null; + try { + output = await func(inputs, messages); + } catch (err: any) { + console.error(`Error calling ${func.name}:`, err); + output = null; + error = err.message || String(err); + } const outputStringified = jsonifyIfNotString(func, output); diff --git a/src/utilities/prompt.ts b/src/utilities/prompt.ts index ff3ef2a..a0aee37 100644 --- a/src/utilities/prompt.ts +++ b/src/utilities/prompt.ts @@ -11,7 +11,6 @@ import { HUMANLOOP_PATH_KEY, HUMANLOOP_TRACE_FLOW_CTX_KEY, NestedDict, - generateSpanId, jsonifyIfNotString, writeToOpenTelemetrySpan, } from "../otel"; @@ -61,28 +60,9 @@ export function promptUtilityFactory( const flowMetadataKey = createContextKey(HUMANLOOP_TRACE_FLOW_CTX_KEY); // @ts-ignore - return opentelemetryTracer.startActiveSpan(generateSpanId(), async (span) => { + return opentelemetryTracer.startActiveSpan("humanloop.prompt", async (span) => { const ctx = context.active(); const spanId = span.spanContext().spanId; - const parentSpanId = ctx.getValue(parentSpanContextKey) as - | string - | undefined; - const parentFlowMetadata = ctx.getValue(flowMetadataKey) as { - traceId: string; - isFlowLog: boolean; - traceParentId: string; - } | null; - - // Handle trace flow context - - const flowMetadata = - parentSpanId && parentFlowMetadata - ? { - traceId: parentFlowMetadata.traceId, - isFlowLog: false, - traceParentId: parentSpanId, - } - : null; // Add span attributes span = span.setAttribute(HUMANLOOP_PATH_KEY, path || func.name); @@ -100,34 +80,19 @@ export function promptUtilityFactory( } // Execute the wrapped function in a child context - const { output, error } = await context.with( - ctx - .setValue(parentSpanContextKey, spanId) - .setValue(flowMetadataKey, flowMetadata), - async () => { - let output: O | null; - let error: string | null = null; - - try { - output = await func(inputs, messages); - } catch (err: any) { - console.error(`Error calling ${func.name}:`, err); - // @ts-ignore - output = null; - error = err.message || String(err); - } - - return { - output, - error, - }; - }, - ); - - const outputStringified = jsonifyIfNotString(func, output); + let output: O | null; + let error: string | null = null; + try { + output = await func(inputs, messages); + } catch (err: any) { + console.error(`Error calling ${func.name}:`, err); + // @ts-ignore + output = null; + error = err.message || String(err); + } const promptLog = { - output: outputStringified, + output: jsonifyIfNotString(func, output), error, inputs: inputs, messages: messages, @@ -141,7 +106,6 @@ export function promptUtilityFactory( ); span.end(); - return output; }); }; diff --git a/src/utilities/tool.ts b/src/utilities/tool.ts index c71ca7a..accf314 100644 --- a/src/utilities/tool.ts +++ b/src/utilities/tool.ts @@ -2,12 +2,7 @@ import { context, createContextKey } from "@opentelemetry/api"; import { ReadableSpan, Tracer } from "@opentelemetry/sdk-trace-node"; import { ToolKernelRequest } from "../api/types/ToolKernelRequest"; -import { - NestedDict, - generateSpanId, - jsonifyIfNotString, - writeToOpenTelemetrySpan, -} from "../otel"; +import { NestedDict, jsonifyIfNotString, writeToOpenTelemetrySpan } from "../otel"; import { HUMANLOOP_FILE_KEY, HUMANLOOP_FILE_TYPE_KEY, @@ -53,60 +48,29 @@ export function toolUtilityFactory( const flowMetadataKey = createContextKey(HUMANLOOP_TRACE_FLOW_CTX_KEY); // @ts-ignore - return opentelemetryTracer.startActiveSpan(generateSpanId(), async (span) => { + return opentelemetryTracer.startActiveSpan("humanloop.tool", async (span) => { const ctx = context.active(); const spanId = span.spanContext().spanId; - const parentSpanId = ctx.getValue(parentSpanContextKey) as - | string - | undefined; - const parentFlowMetadata = ctx.getValue(flowMetadataKey) as { - traceId: string; - isFlowLog: boolean; - traceParentId: string; - } | null; - - // Handle trace flow context - const flowMetadata = - parentSpanId && parentFlowMetadata - ? { - traceId: parentFlowMetadata.traceId, - isFlowLog: false, - traceParentId: parentSpanId, - } - : null; // Add span attributes span.setAttribute(HUMANLOOP_PATH_KEY, path || func.name); span.setAttribute(HUMANLOOP_FILE_TYPE_KEY, "tool"); span = span.setAttribute(HUMANLOOP_META_FUNCTION_NAME, func.name); - // @ts-ignore // Execute the wrapped function in the appropriate context - const { output, error } = await context.with( - ctx - .setValue(parentSpanContextKey, spanId) - .setValue(flowMetadataKey, flowMetadata), - async () => { - let output: O | null; - let error: string | null = null; - - try { - output = await func(inputs); - } catch (err: any) { - console.error(`Error calling ${func.name}:`, err); - output = null; - error = err.message || String(err); - } - - return { output, error }; - }, - ); - - const outputStringified = jsonifyIfNotString(func, output); + let output: O | null; + let error: string | null = null; + try { + output = await func(inputs); + } catch (err: any) { + console.error(`Error calling ${func.name}:`, err); + output = null; + error = err.message || String(err); + } const toolLog = { inputs: inputs, - output: outputStringified, + output: jsonifyIfNotString(func, output), error, }; From 4658307146be1d565e61fa166a8de1606f7f3884 Mon Sep 17 00:00:00 2001 From: Andrei Bratu Date: Tue, 7 Jan 2025 11:49:20 +0000 Subject: [PATCH 2/3] PR feedback --- src/otel/constants.ts | 7 +++- src/otel/exporter.ts | 23 ++++++++-- src/otel/helpers.ts | 5 +-- src/otel/processor.ts | 5 ++- src/utilities/flow.ts | 86 ++++++++++++++++++------------------- src/utilities/prompt.ts | 90 +++++++++++++++++++-------------------- src/utilities/tool.ts | 93 +++++++++++++++++++---------------------- 7 files changed, 156 insertions(+), 153 deletions(-) diff --git a/src/otel/constants.ts b/src/otel/constants.ts index d7f0935..baa463c 100644 --- a/src/otel/constants.ts +++ b/src/otel/constants.ts @@ -5,6 +5,9 @@ export const HUMANLOOP_LOG_KEY = "humanloop.log"; export const HUMANLOOP_FILE_TYPE_KEY = "humanloop.file_type"; export const HUMANLOOP_PATH_KEY = "humanloop.file.path"; export const HUMANLOOP_META_FUNCTION_NAME = "humanloop.meta.function_name"; -export const HUMANLOOP_PARENT_SPAN_CTX_KEY = "humanloop.context.parentSpanId"; -export const HUMANLOOP_TRACE_FLOW_CTX_KEY = "humanloop.context.traceFlow"; export const HUMANLOOP_FLOW_PREREQUISITES_KEY = "humanloop.flow.prerequisites"; + +export const HUMANLOOP_SPAN_PREFIX = "humanloop."; +export const HUMANLOOP_FLOW_SPAN_NAME = `${HUMANLOOP_SPAN_PREFIX}flow`; +export const HUMANLOOP_PROMPT_SPAN_NAME = `${HUMANLOOP_SPAN_PREFIX}prompt`; +export const HUMANLOOP_TOOL_SPAN_NAME = `${HUMANLOOP_SPAN_PREFIX}tool`; diff --git a/src/otel/exporter.ts b/src/otel/exporter.ts index 906ef19..ea14578 100644 --- a/src/otel/exporter.ts +++ b/src/otel/exporter.ts @@ -31,6 +31,8 @@ export class HumanloopSpanExporter implements SpanExporter { private shutdownFlag: boolean; private readonly uploadPromises: Promise[]; private readonly exportedSpans: ReadableSpan[]; + // List of spans that must be uploaded before completing the Flow log + // This maps [flow log span ID] -> [set of child span IDs] private readonly prerequisites: Map>; constructor(client: HumanloopClient) { @@ -72,7 +74,20 @@ export class HumanloopSpanExporter implements SpanExporter { await this.shutdown(); } - private completeFlowLog(spanId: string) { + /** + * Mark a span as uploaded to the Humanloop. + * + * A Log might be contained inside a Flow trace, which must be marked as complete + * when all its children are uploaded. Each Flow Log span contains a + * 'humanloop.flow.prerequisites' attribute, which is a list of all spans that must + * be uploaded before the Flow Log is marked as complete. + * + * This method finds the trace the Span belongs to and removes the Span from the list. + * Once all prerequisites are uploaded, the method marks the Flow Log as complete. + * + * @param spanId - The ID of the span that has been uploaded. + */ + private notifySpanUploaded(spanId: string) { for (const [flowLogSpanId, flowChildrenSpanIds] of this.prerequisites) { if (flowChildrenSpanIds.has(spanId)) { flowChildrenSpanIds.delete(spanId); @@ -146,7 +161,7 @@ export class HumanloopSpanExporter implements SpanExporter { } catch (error) { console.error(`Error exporting prompt: ${error}`); } - this.completeFlowLog(span.spanContext().spanId); + this.notifySpanUploaded(span.spanContext().spanId); } private async exportTool(span: ReadableSpan): Promise { @@ -175,7 +190,7 @@ export class HumanloopSpanExporter implements SpanExporter { } catch (error) { console.error(`Error exporting tool: ${error}`); } - this.completeFlowLog(span.spanContext().spanId); + this.notifySpanUploaded(span.spanContext().spanId); } private async exportFlow(span: ReadableSpan): Promise { @@ -218,6 +233,6 @@ export class HumanloopSpanExporter implements SpanExporter { } catch (error) { console.error("Error exporting flow: ", error, span.spanContext().spanId); } - this.completeFlowLog(span.spanContext().spanId); + this.notifySpanUploaded(span.spanContext().spanId); } } diff --git a/src/otel/helpers.ts b/src/otel/helpers.ts index 14cd752..7555b7b 100644 --- a/src/otel/helpers.ts +++ b/src/otel/helpers.ts @@ -1,9 +1,8 @@ import { AttributeValue, SpanKind } from "@opentelemetry/api"; import { ReadableSpan } from "@opentelemetry/sdk-trace-base"; -import { v4 as uuidv4 } from "uuid"; // Constants for Humanloop attributes -import { HUMANLOOP_FILE_TYPE_KEY } from "./constants"; +import { HUMANLOOP_SPAN_PREFIX } from "./constants"; export type NestedDict = { [key: string]: NestedDict | AttributeValue }; export type NestedList = Array; @@ -191,7 +190,7 @@ export function isLLMProviderCall(span: ReadableSpan): boolean { * @returns True if the span was created by the Humanloop SDK, false otherwise */ export function isHumanloopSpan(span: ReadableSpan): boolean { - return span.name.startsWith("humanloop."); + return span.name.startsWith(HUMANLOOP_SPAN_PREFIX); } /** diff --git a/src/otel/processor.ts b/src/otel/processor.ts index 1a2e9d4..4729ecf 100644 --- a/src/otel/processor.ts +++ b/src/otel/processor.ts @@ -13,6 +13,7 @@ import { PromptKernelRequest } from "../api/types/PromptKernelRequest"; import { HUMANLOOP_FILE_KEY, HUMANLOOP_FILE_TYPE_KEY, + HUMANLOOP_FLOW_SPAN_NAME, HUMANLOOP_LOG_KEY, HUMANLOOP_META_FUNCTION_NAME, } from "./constants"; @@ -52,7 +53,7 @@ export class HumanloopSpanProcessor implements SpanProcessor { onStart(span: Span, _: Context): void { const spanId = span.spanContext().spanId; const parentSpanId = span.parentSpanId; - if (span.name === "humanloop.flow") { + if (span.name === HUMANLOOP_FLOW_SPAN_NAME) { this.prerequisites.set(spanId, []); } if (parentSpanId !== undefined && isHumanloopSpan(span)) { @@ -104,7 +105,7 @@ export class HumanloopSpanProcessor implements SpanProcessor { }).then((_) => { // All instrumentor spans have arrived, we can process the // Humanloop parent span owning them - if (span.name === "humanloop.flow") { + if (span.name === HUMANLOOP_FLOW_SPAN_NAME) { // If the span if a Flow Log, add attribute with all span IDs it // needs to wait before completion writeToOpenTelemetrySpan( diff --git a/src/utilities/flow.ts b/src/utilities/flow.ts index d083907..e0ba7f6 100644 --- a/src/utilities/flow.ts +++ b/src/utilities/flow.ts @@ -1,14 +1,12 @@ -import { context, createContextKey } from "@opentelemetry/api"; import { ReadableSpan, Tracer } from "@opentelemetry/sdk-trace-node"; import { FlowKernelRequest } from "../api/types/FlowKernelRequest"; import { HUMANLOOP_FILE_TYPE_KEY, + HUMANLOOP_FLOW_SPAN_NAME, HUMANLOOP_LOG_KEY, HUMANLOOP_META_FUNCTION_NAME, - HUMANLOOP_PARENT_SPAN_CTX_KEY, HUMANLOOP_PATH_KEY, - HUMANLOOP_TRACE_FLOW_CTX_KEY, NestedDict, jsonifyIfNotString, writeToOpenTelemetrySpan, @@ -35,55 +33,53 @@ export function flowUtilityFactory( ); } - const parentSpanContextKey = createContextKey(HUMANLOOP_PARENT_SPAN_CTX_KEY); - const flowMetadataKey = createContextKey(HUMANLOOP_TRACE_FLOW_CTX_KEY); // @ts-ignore - return opentelemetryTracer.startActiveSpan("humanloop.flow", async (span) => { - const ctx = context.active(); - const spanId = span.spanContext().spanId; + return opentelemetryTracer.startActiveSpan( + HUMANLOOP_FLOW_SPAN_NAME, + async (span) => { + // Add span attributes + span = span.setAttribute(HUMANLOOP_PATH_KEY, path || func.name); + span = span.setAttribute(HUMANLOOP_FILE_TYPE_KEY, "flow"); + span = span.setAttribute(HUMANLOOP_META_FUNCTION_NAME, func.name); - // Add span attributes - span = span.setAttribute(HUMANLOOP_PATH_KEY, path || func.name); - span = span.setAttribute(HUMANLOOP_FILE_TYPE_KEY, "flow"); - span = span.setAttribute(HUMANLOOP_META_FUNCTION_NAME, func.name); + if (version) { + writeToOpenTelemetrySpan( + span as unknown as ReadableSpan, + version as unknown as NestedDict, + "humanloop.file.flow", + ); + } - if (version) { - writeToOpenTelemetrySpan( - span as unknown as ReadableSpan, - version as unknown as NestedDict, - "humanloop.file.flow", - ); - } - - let output: O | null; - let error: string | null = null; - try { - output = await func(inputs, messages); - } catch (err: any) { - console.error(`Error calling ${func.name}:`, err); - output = null; - error = err.message || String(err); - } + let output: O | null; + let error: string | null = null; + try { + output = await func(inputs, messages); + } catch (err: any) { + console.error(`Error calling ${func.name}:`, err); + output = null; + error = err.message || String(err); + } - const outputStringified = jsonifyIfNotString(func, output); + const outputStringified = jsonifyIfNotString(func, output); - const flowLog = { - output: outputStringified, - inputs: inputs, - messages: messages, - error, - }; + const flowLog = { + output: outputStringified, + inputs: inputs, + messages: messages, + error, + }; - writeToOpenTelemetrySpan( - span as unknown as ReadableSpan, - // @ts-ignore - flowLog, - HUMANLOOP_LOG_KEY, - ); + writeToOpenTelemetrySpan( + span as unknown as ReadableSpan, + // @ts-ignore + flowLog, + HUMANLOOP_LOG_KEY, + ); - span.end(); - return output; - }); + span.end(); + return output; + }, + ); }; // @ts-ignore diff --git a/src/utilities/prompt.ts b/src/utilities/prompt.ts index a0aee37..dcd1d59 100644 --- a/src/utilities/prompt.ts +++ b/src/utilities/prompt.ts @@ -1,4 +1,3 @@ -import { context, createContextKey } from "@opentelemetry/api"; import { ReadableSpan, Tracer } from "@opentelemetry/sdk-trace-node"; import { PromptKernelRequest } from "../api/types/PromptKernelRequest"; @@ -7,9 +6,8 @@ import { HUMANLOOP_FILE_TYPE_KEY, HUMANLOOP_LOG_KEY, HUMANLOOP_META_FUNCTION_NAME, - HUMANLOOP_PARENT_SPAN_CTX_KEY, HUMANLOOP_PATH_KEY, - HUMANLOOP_TRACE_FLOW_CTX_KEY, + HUMANLOOP_PROMPT_SPAN_NAME, NestedDict, jsonifyIfNotString, writeToOpenTelemetrySpan, @@ -56,57 +54,53 @@ export function promptUtilityFactory( .reduce((obj, [key, value]) => ({ ...obj, [key]: value }), {}); } - const parentSpanContextKey = createContextKey(HUMANLOOP_PARENT_SPAN_CTX_KEY); - const flowMetadataKey = createContextKey(HUMANLOOP_TRACE_FLOW_CTX_KEY); - // @ts-ignore - return opentelemetryTracer.startActiveSpan("humanloop.prompt", async (span) => { - const ctx = context.active(); - const spanId = span.spanContext().spanId; + return opentelemetryTracer.startActiveSpan( + HUMANLOOP_PROMPT_SPAN_NAME, + async (span) => { + // Add span attributes + span = span.setAttribute(HUMANLOOP_PATH_KEY, path || func.name); + span = span.setAttribute(HUMANLOOP_FILE_TYPE_KEY, "prompt"); + span = span.setAttribute(HUMANLOOP_META_FUNCTION_NAME, func.name); + + if (version) { + writeToOpenTelemetrySpan( + span as unknown as ReadableSpan, + { + ...version, + } as unknown as NestedDict, + "humanloop.file.prompt", + ); + } + + // Execute the wrapped function in a child context + let output: O | null; + let error: string | null = null; + try { + output = await func(inputs, messages); + } catch (err: any) { + console.error(`Error calling ${func.name}:`, err); + output = null; + error = err.message || String(err); + } - // Add span attributes - span = span.setAttribute(HUMANLOOP_PATH_KEY, path || func.name); - span = span.setAttribute(HUMANLOOP_FILE_TYPE_KEY, "prompt"); - span = span.setAttribute(HUMANLOOP_META_FUNCTION_NAME, func.name); + const promptLog = { + output: jsonifyIfNotString(func, output), + error, + inputs: inputs, + messages: messages, + }; - if (version) { writeToOpenTelemetrySpan( span as unknown as ReadableSpan, - { - ...version, - } as unknown as NestedDict, - "humanloop.file.prompt", + // @ts-ignore + promptLog, + HUMANLOOP_LOG_KEY, ); - } - - // Execute the wrapped function in a child context - let output: O | null; - let error: string | null = null; - try { - output = await func(inputs, messages); - } catch (err: any) { - console.error(`Error calling ${func.name}:`, err); - // @ts-ignore - output = null; - error = err.message || String(err); - } - - const promptLog = { - output: jsonifyIfNotString(func, output), - error, - inputs: inputs, - messages: messages, - }; - - writeToOpenTelemetrySpan( - span as unknown as ReadableSpan, - // @ts-ignore - promptLog, - HUMANLOOP_LOG_KEY, - ); - span.end(); - return output; - }); + span.end(); + return output; + }, + ); }; } diff --git a/src/utilities/tool.ts b/src/utilities/tool.ts index accf314..930c9ff 100644 --- a/src/utilities/tool.ts +++ b/src/utilities/tool.ts @@ -1,4 +1,3 @@ -import { context, createContextKey } from "@opentelemetry/api"; import { ReadableSpan, Tracer } from "@opentelemetry/sdk-trace-node"; import { ToolKernelRequest } from "../api/types/ToolKernelRequest"; @@ -8,9 +7,8 @@ import { HUMANLOOP_FILE_TYPE_KEY, HUMANLOOP_LOG_KEY, HUMANLOOP_META_FUNCTION_NAME, - HUMANLOOP_PARENT_SPAN_CTX_KEY, HUMANLOOP_PATH_KEY, - HUMANLOOP_TRACE_FLOW_CTX_KEY, + HUMANLOOP_TOOL_SPAN_NAME, } from "../otel/constants"; import { ToolCallableType } from "./types"; @@ -44,53 +42,50 @@ export function toolUtilityFactory( ): O extends Promise ? Promise : Promise => { validateArgumentsAgainstSchema(version, inputs); - const parentSpanContextKey = createContextKey(HUMANLOOP_PARENT_SPAN_CTX_KEY); - const flowMetadataKey = createContextKey(HUMANLOOP_TRACE_FLOW_CTX_KEY); - // @ts-ignore - return opentelemetryTracer.startActiveSpan("humanloop.tool", async (span) => { - const ctx = context.active(); - const spanId = span.spanContext().spanId; - - // Add span attributes - span.setAttribute(HUMANLOOP_PATH_KEY, path || func.name); - span.setAttribute(HUMANLOOP_FILE_TYPE_KEY, "tool"); - span = span.setAttribute(HUMANLOOP_META_FUNCTION_NAME, func.name); - - // Execute the wrapped function in the appropriate context - let output: O | null; - let error: string | null = null; - try { - output = await func(inputs); - } catch (err: any) { - console.error(`Error calling ${func.name}:`, err); - output = null; - error = err.message || String(err); - } - - const toolLog = { - inputs: inputs, - output: jsonifyIfNotString(func, output), - error, - }; - - writeToOpenTelemetrySpan( - span as unknown as ReadableSpan, - toolLog as unknown as NestedDict, - HUMANLOOP_LOG_KEY, - ); - - writeToOpenTelemetrySpan( - span as unknown as ReadableSpan, - { - ...version, - } as unknown as NestedDict, - `${HUMANLOOP_FILE_KEY}.tool`, - ); - - span.end(); - return output; - }); + return opentelemetryTracer.startActiveSpan( + HUMANLOOP_TOOL_SPAN_NAME, + async (span) => { + // Add span attributes + span.setAttribute(HUMANLOOP_PATH_KEY, path || func.name); + span.setAttribute(HUMANLOOP_FILE_TYPE_KEY, "tool"); + span = span.setAttribute(HUMANLOOP_META_FUNCTION_NAME, func.name); + + // Execute the wrapped function in the appropriate context + let output: O | null; + let error: string | null = null; + try { + output = await func(inputs); + } catch (err: any) { + console.error(`Error calling ${func.name}:`, err); + output = null; + error = err.message || String(err); + } + + const toolLog = { + inputs: inputs, + output: jsonifyIfNotString(func, output), + error, + }; + + writeToOpenTelemetrySpan( + span as unknown as ReadableSpan, + toolLog as unknown as NestedDict, + HUMANLOOP_LOG_KEY, + ); + + writeToOpenTelemetrySpan( + span as unknown as ReadableSpan, + { + ...version, + } as unknown as NestedDict, + `${HUMANLOOP_FILE_KEY}.tool`, + ); + + span.end(); + return output; + }, + ); }; // @ts-ignore Adding jsonSchema property to utility-wrapped function From 8feeed59f536b7c7c9c36faf461fc7ca2b55c2b0 Mon Sep 17 00:00:00 2001 From: Andrei Bratu Date: Tue, 7 Jan 2025 18:45:57 +0000 Subject: [PATCH 3/3] renamed exporter span uploaded function --- src/otel/exporter.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/otel/exporter.ts b/src/otel/exporter.ts index ea14578..8c3148d 100644 --- a/src/otel/exporter.ts +++ b/src/otel/exporter.ts @@ -87,7 +87,7 @@ export class HumanloopSpanExporter implements SpanExporter { * * @param spanId - The ID of the span that has been uploaded. */ - private notifySpanUploaded(spanId: string) { + private markSpanCompleted(spanId: string) { for (const [flowLogSpanId, flowChildrenSpanIds] of this.prerequisites) { if (flowChildrenSpanIds.has(spanId)) { flowChildrenSpanIds.delete(spanId); @@ -161,7 +161,7 @@ export class HumanloopSpanExporter implements SpanExporter { } catch (error) { console.error(`Error exporting prompt: ${error}`); } - this.notifySpanUploaded(span.spanContext().spanId); + this.markSpanCompleted(span.spanContext().spanId); } private async exportTool(span: ReadableSpan): Promise { @@ -190,7 +190,7 @@ export class HumanloopSpanExporter implements SpanExporter { } catch (error) { console.error(`Error exporting tool: ${error}`); } - this.notifySpanUploaded(span.spanContext().spanId); + this.markSpanCompleted(span.spanContext().spanId); } private async exportFlow(span: ReadableSpan): Promise { @@ -233,6 +233,6 @@ export class HumanloopSpanExporter implements SpanExporter { } catch (error) { console.error("Error exporting flow: ", error, span.spanContext().spanId); } - this.notifySpanUploaded(span.spanContext().spanId); + this.markSpanCompleted(span.spanContext().spanId); } }