Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow complete fix #9

Merged
merged 3 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 30 additions & 30 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -12,56 +12,56 @@
"test": "jest"
},
"dependencies": {
"url-join": "4.0.1",
"form-data": "^4.0.0",
"formdata-node": "^6.0.3",
"node-fetch": "2.7.0",
"qs": "6.11.2",
"readable-stream": "^4.5.2",
"form-data-encoder": "^4.0.2",
"@opentelemetry/api": "1.9.0",
"@opentelemetry/auto-instrumentations-node": "0.53.0",
"@opentelemetry/sdk-metrics": "1.28.0",
"@opentelemetry/sdk-node": "0.55.0",
"@opentelemetry/sdk-trace-node": "1.28.0",
"@traceloop/ai-semantic-conventions": "0.11.6",
"@traceloop/instrumentation-anthropic": "0.11.1",
"@traceloop/instrumentation-cohere": "0.11.1",
"@traceloop/instrumentation-openai": "0.11.3",
"@traceloop/ai-semantic-conventions": "0.11.6",
"uuid": "11.0.3",
"nanoid": "5.0.9",
"cli-progress": "^3.12.0",
"form-data": "^4.0.0",
"form-data-encoder": "^4.0.2",
"formdata-node": "^6.0.3",
"lodash": "4.17.21",
"nanoid": "5.0.9",
"node-fetch": "2.7.0",
"p-map": "7.0.3",
"stable-hash": "0.0.4"
"qs": "6.11.2",
"readable-stream": "^4.5.2",
"stable-hash": "0.0.4",
"url-join": "4.0.1",
"uuid": "11.0.3"
},
"devDependencies": {
"@types/url-join": "4.0.1",
"@types/qs": "6.9.8",
"@anthropic-ai/sdk": "^0.32.1",
"@babel/core": "^7.26.0",
"@babel/plugin-transform-modules-commonjs": "^7.26.3",
"@babel/preset-env": "^7.26.0",
"@trivago/prettier-plugin-sort-imports": "4.3.0",
"@types/cli-progress": "^3.11.6",
"@types/jest": "29.5.5",
"@types/lodash": "4.14.74",
"@types/node": "17.0.33",
"@types/node-fetch": "2.6.9",
"@types/qs": "6.9.8",
"@types/readable-stream": "^4.0.15",
"@types/url-join": "4.0.1",
"babel-jest": "^29.7.0",
"cohere-ai": "^7.15.0",
"dotenv": "^16.4.6",
"fetch-mock-jest": "^1.5.1",
"webpack": "^5.94.0",
"ts-loader": "^9.3.1",
"jest": "29.7.0",
"@types/jest": "29.5.5",
"ts-jest": "29.1.1",
"jest-environment-jsdom": "29.7.0",
"@types/node": "17.0.33",
"jsonschema": "^1.4.1",
"openai": "^4.74.0",
"prettier": "^3.4.2",
"ts-jest": "29.1.1",
"ts-loader": "^9.3.1",
"typescript": "4.6.4",
"openai": "^4.74.0",
"@anthropic-ai/sdk": "^0.32.1",
"cohere-ai": "^7.15.0",
"dotenv": "^16.4.6",
"jsonschema": "^1.4.1",
"@types/cli-progress": "^3.11.6",
"babel-jest": "^29.7.0",
"@babel/core": "^7.26.0",
"@babel/plugin-transform-modules-commonjs": "^7.26.3",
"@babel/preset-env": "^7.26.0",
"@types/lodash": "4.14.74",
"@trivago/prettier-plugin-sort-imports": "4.3.0"
"webpack": "^5.94.0"
},
"browser": {
"fs": false,
Expand Down
8 changes: 6 additions & 2 deletions src/otel/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,9 @@ export const HUMANLOOP_LOG_KEY = "humanloop.log";
export const HUMANLOOP_FILE_TYPE_KEY = "humanloop.file_type";
export const HUMANLOOP_PATH_KEY = "humanloop.file.path";
export const HUMANLOOP_META_FUNCTION_NAME = "humanloop.meta.function_name";
export const HUMANLOOP_PARENT_SPAN_CTX_KEY = "humanloop.context.parentSpanId";
export const HUMANLOOP_TRACE_FLOW_CTX_KEY = "humanloop.context.traceFlow";
export const HUMANLOOP_FLOW_PREREQUISITES_KEY = "humanloop.flow.prerequisites";

export const HUMANLOOP_SPAN_PREFIX = "humanloop.";
export const HUMANLOOP_FLOW_SPAN_NAME = `${HUMANLOOP_SPAN_PREFIX}flow`;
export const HUMANLOOP_PROMPT_SPAN_NAME = `${HUMANLOOP_SPAN_PREFIX}prompt`;
export const HUMANLOOP_TOOL_SPAN_NAME = `${HUMANLOOP_SPAN_PREFIX}tool`;
46 changes: 46 additions & 0 deletions src/otel/exporter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import { HumanloopClient } from "../humanloop.client";
import {
HUMANLOOP_FILE_KEY,
HUMANLOOP_FILE_TYPE_KEY,
HUMANLOOP_FLOW_PREREQUISITES_KEY,
HUMANLOOP_LOG_KEY,
HUMANLOOP_PATH_KEY,
} from "./constants";
Expand All @@ -30,13 +31,17 @@ export class HumanloopSpanExporter implements SpanExporter {
private shutdownFlag: boolean;
private readonly uploadPromises: Promise<void>[];
private readonly exportedSpans: ReadableSpan[];
// List of spans that must be uploaded before completing the Flow log
// This maps [flow log span ID] -> [set of child span IDs]
private readonly prerequisites: Map<string, Set<string>>;

constructor(client: HumanloopClient) {
this.client = client;
this.spanIdToUploadedLogId = new Map();
this.shutdownFlag = false;
this.uploadPromises = [];
this.exportedSpans = [];
this.prerequisites = new Map();
}

export(spans: ReadableSpan[]): ExportResult {
Expand Down Expand Up @@ -69,6 +74,32 @@ export class HumanloopSpanExporter implements SpanExporter {
await this.shutdown();
}

/**
* Mark a span as uploaded to the Humanloop.
*
* A Log might be contained inside a Flow trace, which must be marked as complete
* when all its children are uploaded. Each Flow Log span contains a
* 'humanloop.flow.prerequisites' attribute, which is a list of all spans that must
* be uploaded before the Flow Log is marked as complete.
*
* This method finds the trace the Span belongs to and removes the Span from the list.
* Once all prerequisites are uploaded, the method marks the Flow Log as complete.
*
* @param spanId - The ID of the span that has been uploaded.
*/
private markSpanCompleted(spanId: string) {
for (const [flowLogSpanId, flowChildrenSpanIds] of this.prerequisites) {
if (flowChildrenSpanIds.has(spanId)) {
flowChildrenSpanIds.delete(spanId);
if (flowChildrenSpanIds.size === 0) {
const flowLogId = this.spanIdToUploadedLogId.get(flowLogSpanId)!;
this.client.flows.updateLog(flowLogId, { traceStatus: "complete" });
}
break;
}
}
}

private async exportSpanDispatch(span: ReadableSpan): Promise<void> {
const fileType = span.attributes[HUMANLOOP_FILE_TYPE_KEY];
const parentSpanId = span.parentSpanId;
Expand Down Expand Up @@ -130,6 +161,7 @@ export class HumanloopSpanExporter implements SpanExporter {
} catch (error) {
console.error(`Error exporting prompt: ${error}`);
}
this.markSpanCompleted(span.spanContext().spanId);
}

private async exportTool(span: ReadableSpan): Promise<void> {
Expand Down Expand Up @@ -158,6 +190,7 @@ export class HumanloopSpanExporter implements SpanExporter {
} catch (error) {
console.error(`Error exporting tool: ${error}`);
}
this.markSpanCompleted(span.spanContext().spanId);
}

private async exportFlow(span: ReadableSpan): Promise<void> {
Expand All @@ -168,6 +201,18 @@ export class HumanloopSpanExporter implements SpanExporter {
logObject.startTime = hrTimeToDate(span.startTime);
logObject.endTime = hrTimeToDate(span.endTime);
logObject.createdAt = hrTimeToDate(span.endTime);
// Spans that must be uploaded before the Flow Span is completed
let prerequisites: string[] | undefined = undefined;
try {
prerequisites = readFromOpenTelemetrySpan(
span,
HUMANLOOP_FLOW_PREREQUISITES_KEY,
) as unknown as string[];
} catch (error) {
prerequisites = [];
}

this.prerequisites.set(span.spanContext().spanId, new Set(prerequisites));

const spanParentId = span.parentSpanId;
const traceParentId = spanParentId
Expand All @@ -188,5 +233,6 @@ export class HumanloopSpanExporter implements SpanExporter {
} catch (error) {
console.error("Error exporting flow: ", error, span.spanContext().spanId);
}
this.markSpanCompleted(span.spanContext().spanId);
}
}
16 changes: 3 additions & 13 deletions src/otel/helpers.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
import { AttributeValue, SpanKind } from "@opentelemetry/api";
import { ReadableSpan } from "@opentelemetry/sdk-trace-base";
import { v4 as uuidv4 } from "uuid";

// Constants for Humanloop attributes
import { HUMANLOOP_FILE_TYPE_KEY } from "./constants";
import { HUMANLOOP_SPAN_PREFIX } from "./constants";

export type NestedDict = { [key: string]: NestedDict | AttributeValue };
export type NestedList = Array<NestedDict | AttributeValue>;
Expand Down Expand Up @@ -34,7 +33,7 @@ function _listToOtelFormat(lst: NestedList): NestedDict {
*/
export function writeToOpenTelemetrySpan(
span: ReadableSpan,
value: NestedDict | NestedList | AttributeValue,
value: NestedDict | NestedList | AttributeValue | any[],
key: string,
): void {
let toWriteCopy: NestedDict;
Expand Down Expand Up @@ -191,16 +190,7 @@ export function isLLMProviderCall(span: ReadableSpan): boolean {
* @returns True if the span was created by the Humanloop SDK, false otherwise
*/
export function isHumanloopSpan(span: ReadableSpan): boolean {
return span.attributes[HUMANLOOP_FILE_TYPE_KEY] !== undefined;
}

/**
* Generates a unique span ID.
*
* @returns A UUID string
*/
export function generateSpanId(): string {
return uuidv4();
return span.name.startsWith(HUMANLOOP_SPAN_PREFIX);
}

/**
Expand Down
43 changes: 40 additions & 3 deletions src/otel/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { PromptKernelRequest } from "../api/types/PromptKernelRequest";
import {
HUMANLOOP_FILE_KEY,
HUMANLOOP_FILE_TYPE_KEY,
HUMANLOOP_FLOW_SPAN_NAME,
HUMANLOOP_LOG_KEY,
HUMANLOOP_META_FUNCTION_NAME,
} from "./constants";
Expand All @@ -36,15 +37,37 @@ interface CompletableSpan {
export class HumanloopSpanProcessor implements SpanProcessor {
private spanExporter: SpanExporter;
private children: Map<string, CompletableSpan[]>;
// List of all span IDs that are contained in a Flow trace
// They are passed to the Exporter as a span attribute
// so the Exporter knows when to complete a trace
private prerequisites: Map<string, string[]>;

constructor(exporter: SpanExporter) {
this.spanExporter = exporter;
this.children = new Map();
this.prerequisites = new Map();
}

async forceFlush(): Promise<void> {}

onStart(span: Span, _: Context): void {
const spanId = span.spanContext().spanId;
const parentSpanId = span.parentSpanId;
if (span.name === HUMANLOOP_FLOW_SPAN_NAME) {
this.prerequisites.set(spanId, []);
}
if (parentSpanId !== undefined && isHumanloopSpan(span)) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

noob question - (parentSpanId !== undefined && isHumanloopSpan(span)) this is to check if it's a span from flow child ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep - i am building a flat list of all logs found inside a trace. this list is passed as an attribute on the flow log span to the expoter. when they're all uploaded, the flow log is also marked as complete

for (const [traceHead, allTraceNodes] of this.prerequisites) {
if (
parentSpanId === traceHead ||
allTraceNodes.includes(parentSpanId)
) {
allTraceNodes.push(spanId);
this.prerequisites.set(traceHead, allTraceNodes);
break;
}
}
}
// Handle stream case: when Prompt instrumented function calls a provider with streaming: true
// The instrumentor span will end only when the ChunksResponse is consumed, which can happen
// after the span created by the Prompt utility finishes. To handle this, we register all instrumentor
Expand All @@ -66,6 +89,7 @@ export class HumanloopSpanProcessor implements SpanProcessor {
*/
onEnd(span: ReadableSpan): void {
if (isHumanloopSpan(span)) {
// Wait for children to complete asynchronously
new Promise<void>((resolve) => {
const checkChildrenSpans = () => {
const childrenSpans = this.children.get(span.spanContext().spanId);
Expand All @@ -79,15 +103,28 @@ export class HumanloopSpanProcessor implements SpanProcessor {
};
checkChildrenSpans();
}).then((_) => {
// All children/ instrumentor spans have arrived, we can process the
// All instrumentor spans have arrived, we can process the
// Humanloop parent span owning them
if (span.name === HUMANLOOP_FLOW_SPAN_NAME) {
// If the span if a Flow Log, add attribute with all span IDs it
// needs to wait before completion
writeToOpenTelemetrySpan(
span,
this.prerequisites.get(span.spanContext().spanId) || [],
HUMANLOOP_LOG_KEY,
);
this.prerequisites.delete(span.spanContext().spanId);
}

this.processSpanDispatch(
span,
this.children.get(span.spanContext().spanId) || [],
);

// Release references
this.children.delete(span.spanContext().spanId);
// Export the Humanloop span

// Pass Humanloop span to Exporter
this.spanExporter.export([span], (result: ExportResult) => {
if (result.code !== ExportResultCode.SUCCESS) {
console.error("Failed to export span:", result.error);
Expand Down Expand Up @@ -182,7 +219,7 @@ export class HumanloopSpanProcessor implements SpanProcessor {
// Placeholder for processing other file types
break;
default:
console.error("Unknown Humanloop File Span", span);
console.error("Unknown Humanloop File span", span);
}
}

Expand Down
Loading
Loading