Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

durable execution with coffee grinders #47

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
193 changes: 193 additions & 0 deletions src/pages/blog/durable-execution.mdx
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
---
author: 'pws'
title: 'Durable Executions with Coffee Grinders'
description: 'Durable Executions in Clojure'
category: 'clojure'
layout: '../../layouts/BlogPost.astro'
publishedDate: '2023-12-01'
heroImage: 'roots.jpg'
---

### Durable Executions

Tools like [Temporal](https://temporal.io/) and
[Azure Durable Functions](https://learn.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-orchestrations?tabs=csharp-inproc)
offer similar guarantees for distributed processes that database transactions offer for your data. Plus a better dev
experience i.e. no yaml 😉.

These tools work by saving the execution history and replaying it when a process is restarted.
This requires the workflow to be deterministic between runs. Various methods are provided to manage versioning as
requirements evolve. Bounded history sizes add another concern to think about.

To me, they provide clear advantages when implementing reliable sagas or schedules but they come with associated costs
such as infrastruture tie in and code evolution complexities.

What could this concept look like with a little clojure philosophy sprinkled on it?

### Coffee Grinders

[Coffee grinders](https://lambdaisland.com/blog/2020-03-29-coffee-grinders-2) are a generalization of
[interceptors](https://lambdaisland.com/episodes/interceptors-concepts) where the process definition
and process state are co-located.

With "code as data", the definition and the execution do not need to be seperate.

This togetherness guarantees atomic changes, providing the transactional outbox pattern for free.

It also makes it possible to pause and resume the process later, for example after a bug fix.

Here is the basic coffee grinder from the blog post:

```clojure
(defn inc-counter [ctx] (update ctx :counter inc))

(defn run [{:keys [queue] :as ctx}]
(if-let [[step & next-queue] (seq queue)]
(run (step (assoc ctx :queue next-queue)))
ctx))

(run {:counter 0
:queue [inc-counter inc-counter inc-counter]})

;;=> {:counter 3 :queue nil}
```

### Making it durable

We can make the process durable by saving the context after each step.

Here I'm using an atom for simplicity but play along and imagine it is some database.

I also switch to using clojure's persistent queue because I like to pop things and I think they look cool when pretty printed 🤓.

```clojure
(def history (atom []))

(? (defn inc-counter [ctx] (update ctx :counter inc)))

(fdat/register [inc-counter])

(defn run [{:keys [queue] :as ctx}]
(if-let [step (peek queue)]
(let [result (-> ctx step (update :queue pop))]
(swap! history conj result)
(run result))
ctx))

(run {:counter 0
:queue (conj PersistentQueue/EMPTY inc-counter inc-counter inc-counter)})

(->> @history
(map pp-fns) ; - just making the functions more readable in the output
clojure.pprint/pprint) ; - makes queues look cool
;; =>
;; ({:counter 1, :queue <-("inc-counter" "inc-counter")-<}
;; {:counter 2, :queue <-("inc-counter")-<}
;; {:counter 3, :queue <-()-<}
```

If your wondering what the [Fdat](https://github.com/helins/fdat.cljc) `?` macro is about, it adds a key to the metadata on `inc-counter`.
I'm using this for now to make the queue more readable but it will come in handy later when we get to serialization.

### Resuming after a problem

If a step function returns an error, we won't update the persisted version, which opens up the possiblity for resuming the process after a fix is deployed.

```clojure
(def latest-version (atom nil))

(? (defn bang [ctx] (assoc ctx :error true)))

(fdat/register [bang])

(defn run [{:keys [queue] :as ctx}]
(if-let [step (peek queue)]
(let [{:keys [error] :as result} (-> ctx step (update :queue pop))]
(if error
(println "Alert!") ; <- a cry for help
(do
(reset! latest-version result)
(run result))))
(dissoc ctx :queue)))

(run {:counter 0
:queue (conj PersistentQueue/EMPTY inc-counter inc-counter bang inc-counter)})

(->> @latest-version
pp-fns
clojure.pprint/pprint)
;; => Alert!
;; => {:counter 2, :queue <-("bang" "inc-counter")-<}
```

We can fix `bang` and restart the process:

```clojure
(defn bang [ctx] (assoc ctx :error false))

(run @latest-version)
;; => Alert!
;; => {:counter 2, :queue <-("bang" "inc-counter")-<} NOPE! still broken
```

This retry doesn't work yet because the function on the queue has closed over the old definition.

However, if the context is round tripped through serialization (using transit here) it does work.

### Serialization

> Code is data, this axiom holds true until one has to serialize some functions.

Fdat isn't a requirement, another option would be to represents steps as vectors of events and their args (as ReFrame does).

But this is my post and I prefer to stick to good old functions.

The metadata added by `?` is used as a place holder during serialization to represent the function.
When deserialized the new function definition is picked up which is why the following example now works.

```clojure
(defn run [{:keys [queue] :as ctx}]
(if-let [step (peek queue)]
(let [{:keys [error] :as result} (-> ctx step (update :queue pop))]
(if error
(println "Alert!")
(do
(reset! latest-version (serialize result)) ; <- change
(run result))))
(dissoc ctx :queue)))

(? (defn bang [ctx] (assoc ctx :error false))) ; <- bugfix here
(fdat/register [bang])

(run (deserialize @latest-version))
; => {:counter 3, :error false} Yay!
```

There are still some limits to code evolution.
A step function can't be deleted or have it's signature changed without the risk of breaking saved processes.
I think this fits in with clojure philosophy. See Rich Hickey's view on [versioning](https://youtu.be/oyLBGkS5ICk?t=1335)

But as we have access to the steps in the context we can also change the workflow as needed. For example, skipping the broken step entirely:

```clojure
(-> @latest-version
deserialize
(update :queue pop)
run)
;; => {:counter 3}
```

### Conclusion

Using Coffee Grinders for durable execution has removed the need for a history log and determinism in the workflow definition.
There are probably other concerns for durable exections that I have not addressed here. Ping me a message if you have any thoughts.

Further Possiblities:

- Throw in a scheduler to get durable timers for long running or repeating processes.
- Add a worker queue to distribute the processing.
- Use the interceptor pattern to apply saga compensations when a process fails.
- Use different versions of step functions in various contexts e.g. in the browser or in tests.


[gist](https://gist.github.com/peter-wilkins/98a2fe47aa5024b85cda7dbe5ace07f0)