-
Notifications
You must be signed in to change notification settings - Fork 7
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
1 parent
680a601
commit 67aaabc
Showing
1 changed file
with
193 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,193 @@ | ||
--- | ||
author: 'pws' | ||
title: 'Durable Executions with Coffee Grinders' | ||
description: 'Durable Executions in Clojure' | ||
category: 'clojure' | ||
layout: '../../layouts/BlogPost.astro' | ||
publishedDate: '2023-12-01' | ||
heroImage: 'roots.jpg' | ||
--- | ||
|
||
### Durable Executions | ||
|
||
Tools like [Temporal](https://temporal.io/) and | ||
[Azure Durable Functions](https://learn.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-orchestrations?tabs=csharp-inproc) | ||
offer similar guarantees for distributed processes that database transactions offer for your data. Plus a better dev | ||
experience i.e. no yaml 😉. | ||
|
||
These tools work by saving the execution history and replaying it when a process is restarted. | ||
This requires the workflow to be deterministic between runs. Various methods are provided to manage versioning as | ||
requirements evolve. Bounded history sizes add another concern to think about. | ||
|
||
To me, they provide clear advantages when implementing reliable sagas or schedules but they come with associated costs | ||
such as infrastruture tie in and code evolution complexities. | ||
|
||
What could this concept look like with a little clojure philosophy sprinkled on it? | ||
|
||
### Coffee Grinders | ||
|
||
[Coffee grinders](https://lambdaisland.com/blog/2020-03-29-coffee-grinders-2) are a generalization of | ||
[interceptors](https://lambdaisland.com/episodes/interceptors-concepts) where the process definition | ||
and process state are co-located. | ||
|
||
With "code as data", the definition and the execution do not need to be seperate. | ||
|
||
This togetherness guarantees atomic changes, providing the transactional outbox pattern for free. | ||
|
||
It also makes it possible to pause and resume the process later, for example after a bug fix. | ||
|
||
Here is the basic coffee grinder from the blog post: | ||
|
||
```clojure | ||
(defn inc-counter [ctx] (update ctx :counter inc)) | ||
|
||
(defn run [{:keys [queue] :as ctx}] | ||
(if-let [[step & next-queue] (seq queue)] | ||
(run (step (assoc ctx :queue next-queue))) | ||
ctx)) | ||
|
||
(run {:counter 0 | ||
:queue [inc-counter inc-counter inc-counter]}) | ||
|
||
;;=> {:counter 3 :queue nil} | ||
``` | ||
|
||
### Making it durable | ||
|
||
We can make the process durable by saving the context after each step. | ||
|
||
Here I'm using an atom for simplicity but play along and imagine it is some database. | ||
|
||
I also switch to using clojure's persistent queue because I like to pop things and I think they look cool when pretty printed 🤓. | ||
|
||
```clojure | ||
(def history (atom [])) | ||
|
||
(? (defn inc-counter [ctx] (update ctx :counter inc))) | ||
|
||
(fdat/register [inc-counter]) | ||
|
||
(defn run [{:keys [queue] :as ctx}] | ||
(if-let [step (peek queue)] | ||
(let [result (-> ctx step (update :queue pop))] | ||
(swap! history conj result) | ||
(run result)) | ||
ctx)) | ||
|
||
(run {:counter 0 | ||
:queue (conj PersistentQueue/EMPTY inc-counter inc-counter inc-counter)}) | ||
|
||
(->> @history | ||
(map pp-fns) ; - just making the functions more readable in the output | ||
clojure.pprint/pprint) ; - makes queues look cool | ||
;; => | ||
;; ({:counter 1, :queue <-("inc-counter" "inc-counter")-<} | ||
;; {:counter 2, :queue <-("inc-counter")-<} | ||
;; {:counter 3, :queue <-()-<} | ||
``` | ||
|
||
If your wondering what the [Fdat](https://github.com/helins/fdat.cljc) `?` macro is about, it adds a key to the metadata on `inc-counter`. | ||
I'm using this for now to make the queue more readable but it will come in handy later when we get to serialization. | ||
|
||
### Resuming after a problem | ||
|
||
If a step function returns an error, we won't update the persisted version, which opens up the possiblity for resuming the process after a fix is deployed. | ||
|
||
```clojure | ||
(def latest-version (atom nil)) | ||
|
||
(? (defn bang [ctx] (assoc ctx :error true))) | ||
|
||
(fdat/register [bang]) | ||
|
||
(defn run [{:keys [queue] :as ctx}] | ||
(if-let [step (peek queue)] | ||
(let [{:keys [error] :as result} (-> ctx step (update :queue pop))] | ||
(if error | ||
(println "Alert!") ; <- a cry for help | ||
(do | ||
(reset! latest-version result) | ||
(run result)))) | ||
(dissoc ctx :queue))) | ||
|
||
(run {:counter 0 | ||
:queue (conj PersistentQueue/EMPTY inc-counter inc-counter bang inc-counter)}) | ||
|
||
(->> @latest-version | ||
pp-fns | ||
clojure.pprint/pprint) | ||
;; => Alert! | ||
;; => {:counter 2, :queue <-("bang" "inc-counter")-<} | ||
``` | ||
|
||
We can fix `bang` and restart the process: | ||
|
||
```clojure | ||
(defn bang [ctx] (assoc ctx :error false)) | ||
|
||
(run @latest-version) | ||
;; => Alert! | ||
;; => {:counter 2, :queue <-("bang" "inc-counter")-<} NOPE! still broken | ||
``` | ||
|
||
This retry doesn't work yet because the function on the queue has closed over the old definition. | ||
|
||
However, if the context is round tripped through serialization (using transit here) it does work. | ||
|
||
### Serialization | ||
|
||
> Code is data, this axiom holds true until one has to serialize some functions. | ||
Fdat isn't a requirement, another option would be to represents steps as vectors of events and their args (as ReFrame does). | ||
|
||
But this is my post and I prefer to stick to good old functions. | ||
|
||
The metadata added by `?` is used as a place holder during serialization to represent the function. | ||
When deserialized the new function definition is picked up which is why the following example now works. | ||
|
||
```clojure | ||
(defn run [{:keys [queue] :as ctx}] | ||
(if-let [step (peek queue)] | ||
(let [{:keys [error] :as result} (-> ctx step (update :queue pop))] | ||
(if error | ||
(println "Alert!") | ||
(do | ||
(reset! latest-version (serialize result)) ; <- change | ||
(run result)))) | ||
(dissoc ctx :queue))) | ||
|
||
(? (defn bang [ctx] (assoc ctx :error false))) ; <- bugfix here | ||
(fdat/register [bang]) | ||
|
||
(run (deserialize @latest-version)) | ||
; => {:counter 3, :error false} Yay! | ||
``` | ||
|
||
There are still some limits to code evolution. | ||
A step function can't be deleted or have it's signature changed without the risk of breaking saved processes. | ||
I think this fits in with clojure philosophy. See Rich Hickey's view on [versioning](https://youtu.be/oyLBGkS5ICk?t=1335) | ||
|
||
But as we have access to the steps in the context we can also change the workflow as needed. For example, skipping the broken step entirely: | ||
|
||
```clojure | ||
(-> @latest-version | ||
deserialize | ||
(update :queue pop) | ||
run) | ||
;; => {:counter 3} | ||
``` | ||
|
||
### Conclusion | ||
|
||
Using Coffee Grinders for durable execution has removed the need for a history log and determinism in the workflow definition. | ||
There are probably other concerns for durable exections that I have not addressed here. Ping me a message if you have any thoughts. | ||
|
||
Further Possiblities: | ||
|
||
- Throw in a scheduler to get durable timers for long running or repeating processes. | ||
- Add a worker queue to distribute the processing. | ||
- Use the interceptor pattern to apply saga compensations when a process fails. | ||
- Use different versions of step functions in various contexts e.g. in the browser or in tests. | ||
|
||
|
||
[gist](https://gist.github.com/peter-wilkins/98a2fe47aa5024b85cda7dbe5ace07f0) |