diff --git a/src/pages/blog/durable-execution.mdx b/src/pages/blog/durable-execution.mdx
new file mode 100644
index 00000000..d7fb116c
--- /dev/null
+++ b/src/pages/blog/durable-execution.mdx
@@ -0,0 +1,193 @@
+---
+author: 'pws'
+title: 'Durable Executions with Coffee Grinders'
+description: 'Durable Executions in Clojure'
+category: 'clojure'
+layout: '../../layouts/BlogPost.astro'
+publishedDate: '2023-12-01'
+heroImage: 'roots.jpg'
+---
+
+### Durable Executions
+
+Tools like [Temporal](https://temporal.io/) and
+[Azure Durable Functions](https://learn.microsoft.com/en-us/azure/azure-functions/durable/durable-functions-orchestrations?tabs=csharp-inproc)
+ offer guarantees for distributed processes similar to those that database transactions offer for your data. Plus a better dev
+ experience, i.e. no YAML 😉.
+
+These tools work by saving the execution history and replaying it when a process is restarted.
+This requires the workflow to be deterministic between runs. Various methods are provided to manage versioning as
+requirements evolve. Bounded history sizes add another concern to think about.
+
+To me, they provide clear advantages when implementing reliable sagas or schedules, but they come with associated costs
+ such as infrastructure tie-in and code evolution complexities.
+
+What could this concept look like with a little Clojure philosophy sprinkled on it?
+
+### Coffee Grinders
+
+[Coffee grinders](https://lambdaisland.com/blog/2020-03-29-coffee-grinders-2) are a generalization of
+ [interceptors](https://lambdaisland.com/episodes/interceptors-concepts) where the process definition
+ and process state are co-located.
+
+ With "code as data", the definition and the execution do not need to be separate.
+
+ This togetherness means the state and the remaining steps change atomically, as one value, providing the transactional outbox pattern for free.
+
+ It also makes it possible to pause and resume the process later, for example after a bug fix.
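+
+ To make the co-location concrete, here is a minimal sketch that is not taken from the coffee grinders post. The names `store` and `save!` are hypothetical, the steps are plain keywords, and an atom stands in for a single database row:
+
+```clojure
+(def store (atom nil)) ; hypothetical stand-in for one database row
+
+(defn save!
+  "Persist the state and the remaining steps together in a single write."
+  [ctx]
+  (reset! store ctx))
+
+(save! {:counter 1
+        :queue   [:charge-card :send-email]}) ; progress and pending work
+
+(:queue @store)
+;;=> [:charge-card :send-email]
+```
+
+ Because both live in one value, a reader never sees the progress without the matching pending steps.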
+
+ Here is the basic coffee grinder from the blog post:
+
+```clojure
+(defn inc-counter [ctx] (update ctx :counter inc))
+
+;; Run the step at the head of the queue, then recur with the rest.
+(defn run [{:keys [queue] :as ctx}]
+  (if-let [[step & next-queue] (seq queue)]
+    (run (step (assoc ctx :queue next-queue)))
+    ctx))
+
+(run {:counter 0
+      :queue [inc-counter inc-counter inc-counter]})
+
+;;=> {:counter 3 :queue nil}
+```
+
+### Making it durable
+
+We can make the process durable by saving the context after each step.
+
+Here I'm using an atom for simplicity but play along and imagine it is some database.
+
+I also switch to using Clojure's persistent queue because I like to pop things and I think they look cool when pretty printed 🤓.
+
+```clojure
+;; Setup assumed by the snippets below (not shown in the original post).
+(require '[helins.fdat :as fdat :refer [?]]) ; assumed Fdat namespace and alias
+(import 'clojure.lang.PersistentQueue)
+
+(def history (atom []))
+
+(? (defn inc-counter [ctx] (update ctx :counter inc)))
+
+(fdat/register [inc-counter])
+
+;; Same grinder as before, but every intermediate context is appended to history.
+(defn run [{:keys [queue] :as ctx}]
+  (if-let [step (peek queue)]
+    (let [result (-> ctx step (update :queue pop))]
+      (swap! history conj result)
+      (run result))
+    ctx))
+
+(run {:counter 0
+      :queue (conj PersistentQueue/EMPTY inc-counter inc-counter inc-counter)})
+
+(->> @history
+     (map pp-fns)            ; - pp-fns (helper not shown here) makes the functions readable in the output
+     clojure.pprint/pprint)  ; - makes queues look cool
+;; =>
+;; ({:counter 1, :queue <-("inc-counter" "inc-counter")-<}
+;;  {:counter 2, :queue <-("inc-counter")-<}
+;;  {:counter 3, :queue <-()-<})
+```
+
+If you're wondering what the [Fdat](https://github.com/helins/fdat.cljc) `?` macro is about, it adds a key to the metadata on `inc-counter`.
+I'm using this for now to make the queue more readable but it will come in handy later when we get to serialization.
+
+### Resuming after a problem
+
+If a step function returns an error, we won't update the persisted version, which opens up the possibility of resuming the process after a fix is deployed.
+
+```clojure
+(def latest-version (atom nil))
+
+(? (defn bang [ctx] (assoc ctx :error true)))
+
+(fdat/register [bang])
+
+;; Only a successful step overwrites the saved context.
+(defn run [{:keys [queue] :as ctx}]
+  (if-let [step (peek queue)]
+    (let [{:keys [error] :as result} (-> ctx step (update :queue pop))]
+      (if error
+        (println "Alert!") ; <- a cry for help
+        (do
+          (reset! latest-version result)
+          (run result))))
+    (dissoc ctx :queue)))
+
+(run {:counter 0
+      :queue (conj PersistentQueue/EMPTY inc-counter inc-counter bang inc-counter)})
+
+(->> @latest-version
+     pp-fns
+     clojure.pprint/pprint)
+;; => Alert!
+;; => {:counter 2, :queue <-("bang" "inc-counter")-<}
+```
+
+We can fix `bang` and restart the process:
+
+```clojure
+(defn bang [ctx] (assoc ctx :error false))
+
+(run @latest-version)
+;; => Alert!
+;; => {:counter 2, :queue <-("bang" "inc-counter")-<} NOPE! still broken
+```
+
+This retry doesn't work yet because the saved queue still holds the old function object, not the new definition.
+
+However, if the context is round-tripped through serialization (using Transit here), it does work.
+
+### Serialization
+
+> Code is data, this axiom holds true until one has to serialize some functions.
+
+Fdat isn't a requirement; another option would be to represent steps as vectors of events and their args (as re-frame does).
+
+But this is my post and I prefer to stick to good old functions.
+
+The metadata added by `?` is used as a placeholder during serialization to represent the function.
+When deserialized, the new function definition is picked up, which is why the following example now works.
+
+```clojure
+;; `serialize` and `deserialize` are helpers not shown in this post.
+(defn run [{:keys [queue] :as ctx}]
+  (if-let [step (peek queue)]
+    (let [{:keys [error] :as result} (-> ctx step (update :queue pop))]
+      (if error
+        (println "Alert!")
+        (do
+          (reset! latest-version (serialize result)) ; <- change
+          (run result))))
+    (dissoc ctx :queue)))
+
+(? (defn bang [ctx] (assoc ctx :error false))) ; <- bugfix here
+(fdat/register [bang])
+
+;; Assumes the failing run was repeated with this `run`, so latest-version holds serialized data.
+(run (deserialize @latest-version))
+; => {:counter 3, :error false} Yay!
+```
+
+There are still some limits to code evolution.
+A step function can't be deleted or have its signature changed without the risk of breaking saved processes.
+I think this fits in with Clojure philosophy. See Rich Hickey's view on [versioning](https://youtu.be/oyLBGkS5ICk?t=1335).
+
+But as we have access to the steps in the context, we can also change the workflow as needed. For example, starting again from the context saved before the fix, we can skip the broken step entirely:
+
+```clojure
+(-> @latest-version
+    deserialize
+    (update :queue pop)
+    run)
+;; => {:counter 3}
+```
+
+### Conclusion
+
+Using Coffee Grinders for durable execution removes the need for a replayed history log and for a deterministic workflow definition.
+There are probably other concerns for durable executions that I have not addressed here. Send me a message if you have any thoughts.
+
+Further Possibilities:
+
+- Throw in a scheduler to get durable timers for long-running or repeating processes.
+- Add a worker queue to distribute the processing.
+- Use the interceptor pattern to apply saga compensations when a process fails.
+- Use different versions of step functions in various contexts, e.g. in the browser or in tests.
+
+
+[gist](https://gist.github.com/peter-wilkins/98a2fe47aa5024b85cda7dbe5ace07f0)
\ No newline at end of file