From 31add6bc82cd963982fa5695c697cd1f10f86bdf Mon Sep 17 00:00:00 2001 From: KiKoS0 <22998716+KiKoS0@users.noreply.github.com> Date: Thu, 12 Sep 2024 00:55:21 +0100 Subject: [PATCH] Implement `onFailure` handler. (#63) * Implement an interface for functions that need to handle failures. I took a more OOP approach for this feature which is slightly different than the other SDKs but we can discuss this further in the PR review. The `WithFailureHandler` interface has a single method `onFailure` that is called when the function fails. An example of how to use this: ```kotlin class MyFunction : InngestFunction(), WithFailureHandler { override fun execute(ctx: FunctionContext, step: Step): Any? { // Do something } override fun onFailure(ctx: FunctionContext, step: Step): Any? { // Handle failure } } ``` * Add a function with an onFailure handler and a test for it * Add a Kotlin function example that has a failure handler * Make name in InngestFunctionConfigBuilder internal * Run formatting * Switch to an overridable `onFailure` method in `InngestFunction` This removes the `WithFailureHandler` interface and instead provides an overridable `onFailure` method in `InngestFunction` that can be used to define a function to be called when the function fails. It's used in the same way but without the need to implement an interface ```kotlin class MyFunction : InngestFunction() { override fun execute(ctx: FunctionContext, step: Step): Any? { // Do something } override fun onFailure(ctx: FunctionContext, step: Step): Any? { // Handle failure } } ``` * Address PR comments related to code clarity --- .../springbootdemo/DevServerComponent.java | 8 +++ .../springbootdemo/EventsResponse.java | 39 +++++++++++++ .../testfunctions/WithOnFailureFunction.java | 31 ++++++++++ .../springbootdemo/DemoTestConfiguration.java | 1 + .../WithOnFailureIntegrationTest.java | 57 +++++++++++++++++++ .../main/kotlin/com/inngest/testserver/App.kt | 1 + .../inngest/testserver/SlackFailureReport.kt | 34 +++++++++++ inngest/src/main/kotlin/com/inngest/Comm.kt | 13 ++++- .../kotlin/com/inngest/InngestFunction.kt | 42 ++++++++++++-- .../inngest/InngestFunctionConfigBuilder.kt | 12 +++- 10 files changed, 230 insertions(+), 8 deletions(-) create mode 100644 inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/EventsResponse.java create mode 100644 inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/WithOnFailureFunction.java create mode 100644 inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/WithOnFailureIntegrationTest.java create mode 100644 inngest-test-server/src/main/kotlin/com/inngest/testserver/SlackFailureReport.kt diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/DevServerComponent.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/DevServerComponent.java index d61d8d04..272c0abf 100644 --- a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/DevServerComponent.java +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/DevServerComponent.java @@ -55,6 +55,14 @@ EventRunsResponse runsByEvent(String eventId) throws Exception { }); } + EventsResponse listEvents() throws Exception { + Request request = new Request.Builder() + .url(String.format("%s/v1/events", baseUrl)) + .build(); + return makeRequest(request, new TypeReference() { + }); + } + RunResponse runById(String eventId, Class outputType) throws Exception { Request request = new Request.Builder() .url(String.format("%s/v1/runs/%S", baseUrl, eventId)) diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/EventsResponse.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/EventsResponse.java new file mode 100644 index 00000000..7cde30ed --- /dev/null +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/EventsResponse.java @@ -0,0 +1,39 @@ +package com.inngest.springbootdemo; + +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +@JsonIgnoreProperties(ignoreUnknown = true) +class EventsResponse { + EventEntry[] data; +} + +@Getter +@Setter +@JsonIgnoreProperties(ignoreUnknown = true) +class EventEntry { + String id; + String name; + + String internal_id; + + EventEntryData data; +} + +@Getter +@Setter +@JsonIgnoreProperties(ignoreUnknown = true) +class EventEntryData{ + EventData event; +} + +@Getter +@Setter +@JsonIgnoreProperties(ignoreUnknown = true) +class EventData { + String name; +} + diff --git a/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/WithOnFailureFunction.java b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/WithOnFailureFunction.java new file mode 100644 index 00000000..4e9825db --- /dev/null +++ b/inngest-spring-boot-demo/src/main/java/com/inngest/springbootdemo/testfunctions/WithOnFailureFunction.java @@ -0,0 +1,31 @@ +package com.inngest.springbootdemo.testfunctions; + +import com.inngest.*; +import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; + +public class WithOnFailureFunction extends InngestFunction { + @NotNull + @Override + public InngestFunctionConfigBuilder config(InngestFunctionConfigBuilder builder) { + return builder + .id("with-on-failure-function") + .name("With On Failure Function") + .triggerEvent("test/with-on-failure"); + } + + @Override + public String execute(FunctionContext ctx, Step step) { + step.run("fail-step", () -> { + throw new NonRetriableError("something fatally went wrong"); + }, String.class); + + return "Success"; + } + + @Nullable + @Override + public String onFailure(@NotNull FunctionContext ctx, @NotNull Step step) { + return "On Failure Success"; + } +} diff --git a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java index cbada3ea..cff6a8c3 100644 --- a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java +++ b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/DemoTestConfiguration.java @@ -29,6 +29,7 @@ protected HashMap functions() { addInngestFunction(functions, new IdempotentFunction()); addInngestFunction(functions, new Scale2DObjectFunction()); addInngestFunction(functions, new MultiplyMatrixFunction()); + addInngestFunction(functions, new WithOnFailureFunction()); return functions; } diff --git a/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/WithOnFailureIntegrationTest.java b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/WithOnFailureIntegrationTest.java new file mode 100644 index 00000000..6a9647d7 --- /dev/null +++ b/inngest-spring-boot-demo/src/test/java/com/inngest/springbootdemo/WithOnFailureIntegrationTest.java @@ -0,0 +1,57 @@ +package com.inngest.springbootdemo; + +import com.inngest.CommHandler; +import com.inngest.Inngest; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.springframework.beans.factory.annotation.Autowired; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.Optional; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; + +@IntegrationTest +@Execution(ExecutionMode.CONCURRENT) +class WithOnFailureIntegrationTest { + @Autowired + private DevServerComponent devServer; + + static int sleepTime = 5000; + + @Autowired + private Inngest client; + + @Test + void testWithOnFailureShouldCallOnFailure() throws Exception { + String eventName = "test/with-on-failure"; + String eventId = InngestFunctionTestHelpers.sendEvent(client, eventName).getIds()[0]; + + Thread.sleep(sleepTime); + + // Check that the original function failed + RunEntry run = devServer.runsByEvent(eventId).first(); + LinkedHashMap output = (LinkedHashMap) run.getOutput(); + + assertEquals("Failed", run.getStatus()); + assertNotNull(run.getEnded_at()); + assert output.get("name").contains("NonRetriableError"); + + // Check that the onFailure function was called + Optional event = Arrays.stream(devServer.listEvents().getData()) + .filter(e -> "inngest/function.failed".equals(e.getName()) && eventName.equals(e.getData().getEvent().getName())) + .findFirst(); + + assert event.isPresent(); + + RunEntry onFailureRun = devServer.runsByEvent(event.get().getInternal_id()).first(); + + assertEquals("Completed", onFailureRun.getStatus()); + assertEquals("On Failure Success", (String) onFailureRun.getOutput()); + } +} diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/App.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/App.kt index 5e947fd1..d0bb969d 100644 --- a/inngest-test-server/src/main/kotlin/com/inngest/testserver/App.kt +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/App.kt @@ -29,6 +29,7 @@ fun Application.module() { TranscodeVideo(), ImageFromPrompt(), PushToSlackChannel(), + SlackFailureReport(), WeeklyCleanup(), ), ) diff --git a/inngest-test-server/src/main/kotlin/com/inngest/testserver/SlackFailureReport.kt b/inngest-test-server/src/main/kotlin/com/inngest/testserver/SlackFailureReport.kt new file mode 100644 index 00000000..e0ed2ef5 --- /dev/null +++ b/inngest-test-server/src/main/kotlin/com/inngest/testserver/SlackFailureReport.kt @@ -0,0 +1,34 @@ +package com.inngest.testserver + +import com.inngest.* + +class SlackFailureReport : InngestFunction() { + override fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder = + builder + .id("always-fail-fn") + .name("Always Fail Function") + .triggerEvent("always-fail-fn") + + override fun execute( + ctx: FunctionContext, + step: Step, + ): String { + step.run("throw exception") { + throw RuntimeException("This function always fails") + "Step result" + } + + return "Success" + } + + override fun onFailure( + ctx: FunctionContext, + step: Step, + ): String { + step.run("send slack message") { + "Sending a message to Slack" + } + + return "onFailure Success" + } +} diff --git a/inngest/src/main/kotlin/com/inngest/Comm.kt b/inngest/src/main/kotlin/com/inngest/Comm.kt index d1d4251b..e27e3cc2 100644 --- a/inngest/src/main/kotlin/com/inngest/Comm.kt +++ b/inngest/src/main/kotlin/com/inngest/Comm.kt @@ -54,6 +54,15 @@ data class CommError( private val stepTerminalStatusCodes = setOf(ResultStatusCode.StepComplete, ResultStatusCode.StepError) +private fun generateFailureFunctions( + functions: Map, + client: Inngest, +): Map = + functions + .mapNotNull { (_, fn) -> + fn.toFailureHandler(client.appId)?.let { it.id()!! to it } + }.toMap() + class CommHandler( functions: Map, val client: Inngest, @@ -61,7 +70,9 @@ class CommHandler( private val framework: SupportedFrameworkName, ) { val headers = Environment.inngestHeaders(framework).plus(client.headers) - private val functions = functions.mapValues { (_, fn) -> fn.toInngestFunction() } + + private val failureFunctions = generateFailureFunctions(functions, client) + private val functions = functions.mapValues { (_, fn) -> fn.toInngestFunction() }.plus(failureFunctions) fun callFunction( functionId: String, diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunction.kt b/inngest/src/main/kotlin/com/inngest/InngestFunction.kt index a64dc6ab..64bb733a 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunction.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunction.kt @@ -1,5 +1,7 @@ package com.inngest +const val FUNCTION_FAILED = "inngest/function.failed" + abstract class InngestFunction { open fun config(builder: InngestFunctionConfigBuilder): InngestFunctionConfigBuilder = builder @@ -19,9 +21,7 @@ abstract class InngestFunction { return this.config(builder) } - fun id(): String { - return buildConfig().id!! - } + fun id(): String = buildConfig().id!! internal fun toInngestFunction(): InternalInngestFunction { val builder = InngestFunctionConfigBuilder() @@ -29,5 +29,39 @@ abstract class InngestFunction { return InternalInngestFunction(configBuilder, this::execute) } - // TODO - Add toFailureHandler method to generate a second function if configured + internal fun toFailureHandler(appId: String): InternalInngestFunction? { + val onFailureMethod = this.javaClass.getMethod("onFailure", FunctionContext::class.java, Step::class.java) + + // Only generate the failure handler if the onFailure method was overridden + if (onFailureMethod.declaringClass != InngestFunction::class.java) { + val fnConfig = buildConfig() + val fullyQualifiedId = "$appId-${fnConfig.id}" + val fnName = fnConfig.name ?: fnConfig.id + + val configBuilder = + InngestFunctionConfigBuilder() + .id("${fnConfig.id}-failure") + .name("$fnName (failure)") + .triggerEventIf(FUNCTION_FAILED, "event.data.function_id == '$fullyQualifiedId'") + + return InternalInngestFunction(configBuilder, this::onFailure) + } + return null + } + + /** + * Provide a function to be called if your function fails, meaning + * that it ran out of retries and was unable to complete successfully. + * + * This is useful for sending warning notifications or cleaning up + * after a failure and supports all the same functionality as a + * regular handler. + * + * @param ctx The function context including event(s) that triggered the function + * @param step A class with methods to define steps within the function + */ + open fun onFailure( + ctx: FunctionContext, + step: Step, + ): Any? = null } diff --git a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt index 04b480f6..f4b136ae 100644 --- a/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt +++ b/inngest/src/main/kotlin/com/inngest/InngestFunctionConfigBuilder.kt @@ -9,7 +9,7 @@ import java.time.Duration // TODO: Throw illegal argument exception class InngestFunctionConfigBuilder { var id: String? = null - private var name: String? = null + internal var name: String? = null private var triggers: MutableList = mutableListOf() private var concurrency: MutableList? = null private var retries = 3 @@ -115,8 +115,14 @@ class InngestFunctionConfigBuilder { scope: ConcurrencyScope? = null, ): InngestFunctionConfigBuilder { when (scope) { - ConcurrencyScope.ENVIRONMENT -> if (key == null) throw InngestInvalidConfigurationException("Concurrency key required with environment scope") - ConcurrencyScope.ACCOUNT -> if (key == null) throw InngestInvalidConfigurationException("Concurrency key required with account scope") + ConcurrencyScope.ENVIRONMENT -> + if (key == null) { + throw InngestInvalidConfigurationException("Concurrency key required with environment scope") + } + ConcurrencyScope.ACCOUNT -> + if (key == null) { + throw InngestInvalidConfigurationException("Concurrency key required with account scope") + } ConcurrencyScope.FUNCTION -> {} null -> {} }