Skip to content

Commit

Permalink
INN-2882 Add selective header forwarding when sending events (#520)
Browse files Browse the repository at this point in the history
## Summary
<!-- Succinctly describe your change, providing context, what you've
changed, and why. -->

Read the incoming headers tracestate and traceparent. If it is set, copy
the header to any outgoing event-sending requests.

This only covers `step.sendEvent()`; we'll add later support for
`client.send()` as we need async hooks for this which has scattered
runtime support.

## Checklist
<!-- Tick these items off as you progress. -->
<!-- If an item isn't applicable, ideally please strikeout the item by
wrapping it in "~~"" and suffix it with "N/A My reason for skipping
this." -->
<!-- e.g. "- [ ] ~~Added tests~~ N/A Only touches docs" -->

- [ ] ~Added a [docs PR](https://github.com/inngest/website) that
references this PR~ N/A
- [ ] Added unit/integration tests
- [x] Added changesets if applicable

## Related
<!-- A space for any related links, issues, or PRs. -->
<!-- Linear issues are autolinked. -->
<!-- e.g. - INN-123 -->
<!-- GitHub issues/PRs can be linked using shorthand. -->
<!-- e.g. "- inngest/inngest#123" -->
<!-- Feel free to remove this section if there are no applicable related
links.-->
- INN-2882
  • Loading branch information
jpwilliams authored Apr 23, 2024
1 parent a639269 commit 0703740
Show file tree
Hide file tree
Showing 10 changed files with 101 additions and 8 deletions.
5 changes: 5 additions & 0 deletions .changeset/gold-crews-fly.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"inngest": patch
---

Add selective header forwarding when sending events, allowing context propagation when tracing fanouts 👀
16 changes: 15 additions & 1 deletion packages/inngest/src/components/Inngest.ts
Original file line number Diff line number Diff line change
Expand Up @@ -351,6 +351,20 @@ export class Inngest<TClientOpts extends ClientOptions = ClientOptions> {
public async send<Payload extends SendEventPayload<GetEvents<this>>>(
payload: Payload
): Promise<SendEventOutput<TClientOpts>> {
return this._send({ payload });
}

/**
* Internal method for sending an event, used to allow Inngest internals to
* further customize the request sent to an Inngest Server.
*/
private async _send<Payload extends SendEventPayload<GetEvents<this>>>({
payload,
headers,
}: {
payload: Payload;
headers?: Record<string, string>;
}): Promise<SendEventOutput<TClientOpts>> {
const hooks = await getHookStack(
this.middleware,
"onSendEvent",
Expand Down Expand Up @@ -465,7 +479,7 @@ export class Inngest<TClientOpts extends ClientOptions = ClientOptions> {
const response = await this.fetch(url, {
method: "POST",
body: stringify(payloads),
headers: { ...this.headers },
headers: { ...this.headers, ...headers },
});

let body: SendEventResponse | undefined;
Expand Down
38 changes: 37 additions & 1 deletion packages/inngest/src/components/InngestCommHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -735,12 +735,37 @@ export class InngestCommHandler<
(await getQuerystring("processing run request", queryKeys.StepId)) ||
null;

const headersToFetch = [headerKeys.TraceParent, headerKeys.TraceState];

const headerPromises = headersToFetch.map(async (header) => {
const value = await actions.headers(
`fetching ${header} for forwarding`,
header
);

return { header, value };
});

const fetchedHeaders = await Promise.all(headerPromises);

const headersToForward = fetchedHeaders.reduce<Record<string, string>>(
(acc, { header, value }) => {
if (value) {
acc[header] = value;
}

return acc;
},
{}
);

const { version, result } = this.runStep({
functionId: fnId,
data: body,
stepId,
timer,
reqArgs,
headers: headersToForward,
});
const stepOutput = await result;

Expand All @@ -759,6 +784,7 @@ export class InngestCommHandler<
status: result.retriable ? 500 : 400,
headers: {
"Content-Type": "application/json",
...headersToForward,
[headerKeys.NoRetry]: result.retriable ? "false" : "true",
...(typeof result.retriable === "string"
? { [headerKeys.RetryAfter]: result.retriable }
Expand All @@ -773,6 +799,7 @@ export class InngestCommHandler<
status: 200,
headers: {
"Content-Type": "application/json",
...headersToForward,
},
body: stringify(undefinedToNull(result.data)),
version,
Expand All @@ -783,6 +810,7 @@ export class InngestCommHandler<
status: 500,
headers: {
"Content-Type": "application/json",
...headersToForward,
[headerKeys.NoRetry]: "false",
},
body: stringify({
Expand All @@ -800,6 +828,7 @@ export class InngestCommHandler<
status: 206,
headers: {
"Content-Type": "application/json",
...headersToForward,
...(typeof result.retriable !== "undefined"
? {
[headerKeys.NoRetry]: result.retriable ? "false" : "true",
Expand All @@ -818,7 +847,10 @@ export class InngestCommHandler<

return {
status: 206,
headers: { "Content-Type": "application/json" },
headers: {
"Content-Type": "application/json",
...headersToForward,
},
body: stringify(steps),
version,
};
Expand Down Expand Up @@ -941,12 +973,14 @@ export class InngestCommHandler<
data,
timer,
reqArgs,
headers,
}: {
functionId: string;
stepId: string | null;
data: unknown;
timer: ServerTiming;
reqArgs: unknown[];
headers: Record<string, string>;
}): { version: ExecutionVersion; result: Promise<ExecutionResult> } {
const fn = this.fns[functionId];
if (!fn) {
Expand Down Expand Up @@ -1012,6 +1046,7 @@ export class InngestCommHandler<
isFailureHandler: fn.onFailure,
stepCompletionOrder: ctx?.stack?.stack ?? [],
reqArgs,
headers,
},
};
},
Expand Down Expand Up @@ -1047,6 +1082,7 @@ export class InngestCommHandler<
disableImmediateExecution: ctx?.disable_immediate_execution,
stepCompletionOrder: ctx?.stack?.stack ?? [],
reqArgs,
headers,
},
};
},
Expand Down
2 changes: 2 additions & 0 deletions packages/inngest/src/components/InngestFunction.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,7 @@ describe("runFn", () => {
stepState: {},
stepCompletionOrder: [],
reqArgs: [],
headers: {},
},
});

Expand Down Expand Up @@ -200,6 +201,7 @@ describe("runFn", () => {
runId: "run",
stepCompletionOrder: [],
reqArgs: [],
headers: {},
},
});

Expand Down
7 changes: 6 additions & 1 deletion packages/inngest/src/components/InngestStepTools.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import {
} from "./Inngest";
import { InngestFunction } from "./InngestFunction";
import { InngestFunctionReference } from "./InngestFunctionReference";
import { type InngestExecution } from "./execution/InngestExecution";

export interface FoundStep extends HashedOp {
hashedId: string;
Expand Down Expand Up @@ -113,6 +114,7 @@ export const createStepTools = <
TTriggers extends TriggersFromClient<TClient> = TriggersFromClient<TClient>,
>(
client: TClient,
execution: InngestExecution,
stepHandler: StepHandler
) => {
/**
Expand Down Expand Up @@ -190,7 +192,10 @@ export const createStepTools = <
},
{
fn: (idOrOptions, payload) => {
return client.send(payload);
return client["_send"]({
payload,
headers: execution["options"]["headers"],
});
},
}
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ export interface InngestExecutionOptions {
data: Omit<Context.Any, "step">;
stepState: Record<string, MemoizedOp>;
stepCompletionOrder: string[];

/**
* Headers to be sent with any request to Inngest during this execution.
*/
headers: Record<string, string>;
requestedRunStep?: string;
timer?: ServerTiming;
isFailureHandler?: boolean;
Expand All @@ -68,7 +73,6 @@ export class InngestExecution {
protected debug: Debugger;

constructor(protected options: InngestExecutionOptions) {
this.options = options;
this.debug = Debug(debugPrefix).extend(this.options.runId);
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/inngest/src/components/execution/v0.ts
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ export class V0InngestExecution
});
};

const step = createStepTools(this.options.client, stepHandler);
const step = createStepTools(this.options.client, this, stepHandler);

let fnArg = {
...(this.options.data as { event: EventPayload }),
Expand Down
2 changes: 1 addition & 1 deletion packages/inngest/src/components/execution/v1.ts
Original file line number Diff line number Diff line change
Expand Up @@ -854,7 +854,7 @@ class V1InngestExecution extends InngestExecution implements IInngestExecution {
return promise;
};

return createStepTools(this.options.client, stepHandler);
return createStepTools(this.options.client, this, stepHandler);
}

private getUserFnToRun(): Handler.Any {
Expand Down
2 changes: 2 additions & 0 deletions packages/inngest/src/helpers/consts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,8 @@ export enum headerKeys {
RetryAfter = "retry-after",
InngestServerKind = "x-inngest-server-kind",
InngestExpectedServerKind = "x-inngest-expected-server-kind",
TraceParent = "traceparent",
TraceState = "tracestate",
}

export const defaultInngestApiBaseUrl = "https://api.inngest.com/";
Expand Down
29 changes: 27 additions & 2 deletions packages/inngest/src/test/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import {
} from "@local/components/InngestStepTools";
import {
ExecutionVersion,
IInngestExecution,
InngestExecution,
InngestExecutionOptions,
PREFERRED_EXECUTION_VERSION,
} from "@local/components/execution/InngestExecution";
Expand Down Expand Up @@ -63,9 +65,31 @@ export const createClient = <T extends ConstructorParameters<typeof Inngest>>(
export const testClientId = "__test_client__";

export const getStepTools = (
client: Inngest.Any = createClient({ id: testClientId })
client: Inngest.Any = createClient({ id: testClientId }),
executionOptions: Partial<InngestExecutionOptions> = {}
) => {
const step = createStepTools(client, ({ args, matchOp }) => {
const execution = client
.createFunction({ id: "test" }, { event: "test" }, () => undefined)
["createExecution"]({
version: PREFERRED_EXECUTION_VERSION,
partialOptions: {
data: fromPartial({
event: { name: "foo", data: {} },
}),
runId: "run",
stepState: {},
stepCompletionOrder: [],
isFailureHandler: false,
requestedRunStep: undefined,
timer: new ServerTiming(),
disableImmediateExecution: false,
reqArgs: [],
headers: {},
...executionOptions,
},
}) as IInngestExecution & InngestExecution;

const step = createStepTools(client, execution, ({ args, matchOp }) => {
const stepOptions = getStepOptions(args[0]);
return Promise.resolve(matchOp(stepOptions, ...args.slice(1)));
});
Expand Down Expand Up @@ -105,6 +129,7 @@ export const runFnWithStack = (
timer: new ServerTiming(),
disableImmediateExecution: opts?.disableImmediateExecution,
reqArgs: [],
headers: {},
},
});

Expand Down

0 comments on commit 0703740

Please sign in to comment.