From 417e8017190c0f28f3bb896eb33ae7857aa2a34b Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Tue, 1 Oct 2024 21:02:17 -0700 Subject: [PATCH 01/39] move Inngest into client module --- inngest/examples/axum/main.rs | 6 +- inngest/examples/send_events/main.rs | 2 +- inngest/src/client.rs | 119 +++++++++++++++++++++++++++ inngest/src/config.rs | 4 +- inngest/src/handler.rs | 4 +- inngest/src/lib.rs | 86 +------------------ inngest/src/serve/axum.rs | 3 +- 7 files changed, 129 insertions(+), 95 deletions(-) create mode 100644 inngest/src/client.rs diff --git a/inngest/examples/axum/main.rs b/inngest/examples/axum/main.rs index 590be11..d664e3e 100644 --- a/inngest/examples/axum/main.rs +++ b/inngest/examples/axum/main.rs @@ -1,16 +1,16 @@ use axum::{ routing::{get, put}, - Json, Router, + Router, }; use inngest::{ + client::Inngest, event::Event, function::{create_function, FunctionOps, Input, ServableFn, Trigger}, handler::Handler, into_dev_result, - result::{DevError, Error, InngestResult}, + result::{DevError, Error}, serve, step_tool::{InvokeFunctionOpts, Step as StepTool, WaitForEventOpts}, - Inngest, }; use serde::{Deserialize, Serialize}; use serde_json::{json, Value}; diff --git a/inngest/examples/send_events/main.rs b/inngest/examples/send_events/main.rs index 12c9d6a..51c45f8 100644 --- a/inngest/examples/send_events/main.rs +++ b/inngest/examples/send_events/main.rs @@ -1,4 +1,4 @@ -use inngest::{event::Event, Inngest}; +use inngest::{event::Event, client::Inngest}; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Clone, Debug)] diff --git a/inngest/src/client.rs b/inngest/src/client.rs new file mode 100644 index 0000000..d76cfde --- /dev/null +++ b/inngest/src/client.rs @@ -0,0 +1,119 @@ +use crate::{ + config::Config, + event::{Event, InngestEvent}, + result::DevError, +}; + +const EVENT_API_ORIGIN: &str = "https://inn.gs"; + +#[derive(Clone)] +pub struct Inngest { + pub app_id: String, + api_origin: Option, + event_api_origin: Option, + event_key: Option, + env: Option, + dev: Option, + http: reqwest::Client, +} + +impl Inngest { + pub fn new(app_id: &str) -> Self { + // initialize variable values here using environment variables + let api_origin = Config::api_origin(); + let event_api_origin = Config::event_api_origin(); + let event_key = Config::event_key(); + let env = Config::env(); + // TODO: allow updating dev server url here + // https://www.inngest.com/docs/sdk/environment-variables#inngest-dev + let dev = Config::dev().map(|v| v); + + Inngest { + app_id: app_id.to_string(), + api_origin, + event_api_origin, + event_key, + env, + dev, + http: reqwest::Client::new(), + } + } + + pub fn api_origin(mut self, url: &str) -> Self { + self.api_origin = Some(url.to_string()); + self + } + + pub fn event_api_origin(mut self, url: &str) -> Self { + self.event_api_origin = Some(url.to_string()); + self + } + + pub fn event_key(mut self, key: &str) -> Self { + self.event_key = Some(key.to_string()); + self + } + + pub fn env(mut self, e: &str) -> Self { + self.env = Some(e.to_string()); + self + } + + pub fn dev(mut self, dev: &str) -> Self { + self.dev = Some(dev.to_string()); + self + } + + // TODO: make the result return something properly + pub async fn send_event(&self, evt: &Event) -> Result<(), DevError> { + self.http + .post(format!( + "{}/e/{}", + self.evt_api_origin(), + self.evt_api_key() + )) + .json(&evt) + .send() + .await + .map(|_| ()) + .map_err(|err| DevError::Basic(format!("{}", err))) + } + + // TODO: make the result return something properly + pub async fn send_events(&self, evts: &[&Event]) -> Result<(), DevError> { + self.http + .post(format!( + "{}/e/{}", + self.evt_api_origin(), + self.evt_api_key() + )) + .json(&evts) + .send() + .await + .map(|_| ()) + .map_err(|err| DevError::Basic(format!("{}", err))) + } + + fn evt_api_origin(&self) -> String { + if let Some(dev) = self.dev.clone() { + return dev; + } + + if let Some(endpoint) = self.event_api_origin.clone() { + return endpoint; + } + + EVENT_API_ORIGIN.to_string() + } + + fn evt_api_key(&self) -> String { + if let Some(_) = self.dev.clone() { + return "test".to_string(); + } + + match self.event_key.clone() { + Some(key) => key, + None => "test".to_string(), + } + } +} diff --git a/inngest/src/config.rs b/inngest/src/config.rs index 451aacd..d165a35 100644 --- a/inngest/src/config.rs +++ b/inngest/src/config.rs @@ -33,8 +33,8 @@ impl Config { Self::read_env_str(INNGEST_ENV) } - pub fn is_dev() -> Option { - Self::read_env_bool(INNGEST_DEV) + pub fn dev() -> Option { + Self::read_env_str(INNGEST_DEV) } pub fn signing_key() -> Option { diff --git a/inngest/src/handler.rs b/inngest/src/handler.rs index e35c94e..394445b 100644 --- a/inngest/src/handler.rs +++ b/inngest/src/handler.rs @@ -6,13 +6,13 @@ use serde_json::{json, Value}; use crate::{ basic_error, + client::Inngest, config::Config, event::Event, function::{Function, Input, InputCtx, ServableFn, Step, StepRetry, StepRuntime}, - result::{DevError, Error, FlowControlError, FlowControlVariant, SdkResponse}, + result::{Error, FlowControlVariant, SdkResponse}, sdk::Request, step_tool::Step as StepTool, - Inngest, }; pub struct Handler { diff --git a/inngest/src/lib.rs b/inngest/src/lib.rs index 2358000..367a699 100644 --- a/inngest/src/lib.rs +++ b/inngest/src/lib.rs @@ -1,3 +1,4 @@ +pub mod client; pub(crate) mod config; pub mod event; pub mod function; @@ -7,88 +8,3 @@ pub mod sdk; pub mod serve; pub mod step_tool; pub(crate) mod utils; - -use config::Config; -use event::{Event, InngestEvent}; -use result::DevError; - -#[derive(Clone)] -pub struct Inngest { - app_id: String, - api_origin: Option, - event_api_origin: Option, - event_key: Option, - env: Option, - is_dev: Option, - http: reqwest::Client, -} - -impl Inngest { - pub fn new(app_id: &str) -> Self { - // initialize variable values here using environment variables - let api_origin = Config::api_origin(); - let event_api_origin = Config::event_api_origin(); - let event_key = Config::event_key(); - let env = Config::env(); - // TODO: allow updating dev server url here - // https://www.inngest.com/docs/sdk/environment-variables#inngest-dev - let is_dev = Config::is_dev(); - - Inngest { - app_id: app_id.to_string(), - api_origin, - event_api_origin, - event_key, - env, - is_dev, - http: reqwest::Client::new(), - } - } - - pub fn api_origin(mut self, url: &str) -> Self { - self.api_origin = Some(url.to_string()); - self - } - - pub fn event_api_origin(mut self, url: &str) -> Self { - self.event_api_origin = Some(url.to_string()); - self - } - - pub fn event_key(mut self, key: &str) -> Self { - self.event_key = Some(key.to_string()); - self - } - - pub fn env(mut self, e: &str) -> Self { - self.env = Some(e.to_string()); - self - } - - pub fn is_dev(mut self, dev: bool) -> Self { - self.is_dev = Some(dev); - self - } - - // TODO: make the result return something properly - pub async fn send_event(&self, evt: &Event) -> Result<(), DevError> { - self.http - .post("http://127.0.0.1:8288/e/test") - .json(&evt) - .send() - .await - .map(|_| ()) - .map_err(|err| DevError::Basic(format!("{}", err))) - } - - // TODO: make the result return something properly - pub async fn send_events(&self, evts: &[&Event]) -> Result<(), DevError> { - self.http - .post("http://127.0.0.1:8288/e/test") - .json(&evts) - .send() - .await - .map(|_| ()) - .map_err(|err| DevError::Basic(format!("{}", err))) - } -} diff --git a/inngest/src/serve/axum.rs b/inngest/src/serve/axum.rs index 89062ec..77b0998 100644 --- a/inngest/src/serve/axum.rs +++ b/inngest/src/serve/axum.rs @@ -1,7 +1,6 @@ use crate::{ - basic_error, handler::{Handler, RunQueryParams}, - result::{DevError, Error, SdkResponse}, + result::{Error, SdkResponse}, }; use axum::{ From d6f0b30fb82e55fffcf9bf064ffa65670ab68b39 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Tue, 1 Oct 2024 21:10:06 -0700 Subject: [PATCH 02/39] set dev server url using INNGEST_DEVc --- inngest/Cargo.toml | 1 + inngest/src/client.rs | 17 +++++++++++++---- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/inngest/Cargo.toml b/inngest/Cargo.toml index 00b6eb4..353c21b 100644 --- a/inngest/Cargo.toml +++ b/inngest/Cargo.toml @@ -25,3 +25,4 @@ futures = "0.3.30" # used for step encoding in SDK sha1 = "0.10.6" base16 = "0.2.1" +url = "2.5.2" diff --git a/inngest/src/client.rs b/inngest/src/client.rs index d76cfde..0b11ed9 100644 --- a/inngest/src/client.rs +++ b/inngest/src/client.rs @@ -1,9 +1,12 @@ +use url::Url; + use crate::{ config::Config, event::{Event, InngestEvent}, result::DevError, }; +const EVENT_API_ORIGIN_DEV: &str = "http://127.0.0.1:8288"; const EVENT_API_ORIGIN: &str = "https://inn.gs"; #[derive(Clone)] @@ -24,9 +27,11 @@ impl Inngest { let event_api_origin = Config::event_api_origin(); let event_key = Config::event_key(); let env = Config::env(); - // TODO: allow updating dev server url here - // https://www.inngest.com/docs/sdk/environment-variables#inngest-dev - let dev = Config::dev().map(|v| v); + // if the value is a URL, use it. otherwise set a default URL + let dev = Config::dev().map(|v| match Url::parse(&v) { + Ok(val) => val.to_string(), + Err(_) => EVENT_API_ORIGIN_DEV.to_string() + }); Inngest { app_id: app_id.to_string(), @@ -60,7 +65,11 @@ impl Inngest { } pub fn dev(mut self, dev: &str) -> Self { - self.dev = Some(dev.to_string()); + let url = match Url::parse(dev) { + Ok(val) => Some(val.to_string()), + Err(_) => Some(EVENT_API_ORIGIN_DEV.to_string()) + }; + self.dev = url; self } From 1d3a9b7ec62d3a4b0a4c3c7b9772fe985459f09c Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Tue, 1 Oct 2024 21:20:56 -0700 Subject: [PATCH 03/39] add internal function to API_ORIGIN --- inngest/src/client.rs | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/inngest/src/client.rs b/inngest/src/client.rs index 0b11ed9..f619743 100644 --- a/inngest/src/client.rs +++ b/inngest/src/client.rs @@ -8,6 +8,7 @@ use crate::{ const EVENT_API_ORIGIN_DEV: &str = "http://127.0.0.1:8288"; const EVENT_API_ORIGIN: &str = "https://inn.gs"; +const API_ORIGIN: &str = "https://api.inngest.com"; #[derive(Clone)] pub struct Inngest { @@ -78,8 +79,8 @@ impl Inngest { self.http .post(format!( "{}/e/{}", - self.evt_api_origin(), - self.evt_api_key() + self.inngest_evt_api_origin(), + self.inngest_evt_api_key() )) .json(&evt) .send() @@ -93,8 +94,8 @@ impl Inngest { self.http .post(format!( "{}/e/{}", - self.evt_api_origin(), - self.evt_api_key() + self.inngest_evt_api_origin(), + self.inngest_evt_api_key() )) .json(&evts) .send() @@ -103,7 +104,19 @@ impl Inngest { .map_err(|err| DevError::Basic(format!("{}", err))) } - fn evt_api_origin(&self) -> String { + pub(crate) fn inngest_api_origin(&self) -> String { + if let Some(dev) = self.dev.clone() { + return dev; + } + + if let Some(endpoint) = self.api_origin.clone() { + return endpoint; + } + + API_ORIGIN.to_string() + } + + fn inngest_evt_api_origin(&self) -> String { if let Some(dev) = self.dev.clone() { return dev; } @@ -115,7 +128,7 @@ impl Inngest { EVENT_API_ORIGIN.to_string() } - fn evt_api_key(&self) -> String { + fn inngest_evt_api_key(&self) -> String { if let Some(_) = self.dev.clone() { return "test".to_string(); } From 26ec18e397290a04ea24d1afd7803d8b3544d1c3 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Tue, 1 Oct 2024 21:25:49 -0700 Subject: [PATCH 04/39] changing const name for dev api origin --- inngest/src/client.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/inngest/src/client.rs b/inngest/src/client.rs index f619743..31e3798 100644 --- a/inngest/src/client.rs +++ b/inngest/src/client.rs @@ -6,7 +6,7 @@ use crate::{ result::DevError, }; -const EVENT_API_ORIGIN_DEV: &str = "http://127.0.0.1:8288"; +const API_ORIGIN_DEV: &str = "http://127.0.0.1:8288"; const EVENT_API_ORIGIN: &str = "https://inn.gs"; const API_ORIGIN: &str = "https://api.inngest.com"; @@ -31,7 +31,7 @@ impl Inngest { // if the value is a URL, use it. otherwise set a default URL let dev = Config::dev().map(|v| match Url::parse(&v) { Ok(val) => val.to_string(), - Err(_) => EVENT_API_ORIGIN_DEV.to_string() + Err(_) => API_ORIGIN_DEV.to_string() }); Inngest { @@ -68,7 +68,7 @@ impl Inngest { pub fn dev(mut self, dev: &str) -> Self { let url = match Url::parse(dev) { Ok(val) => Some(val.to_string()), - Err(_) => Some(EVENT_API_ORIGIN_DEV.to_string()) + Err(_) => Some(API_ORIGIN_DEV.to_string()) }; self.dev = url; self From db45ef30a7542dec0e151f5d76b747658dd9f57c Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Tue, 1 Oct 2024 22:03:01 -0700 Subject: [PATCH 05/39] add builder for handlerc --- inngest/src/handler.rs | 35 ++++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/inngest/src/handler.rs b/inngest/src/handler.rs index 394445b..b2061ad 100644 --- a/inngest/src/handler.rs +++ b/inngest/src/handler.rs @@ -45,13 +45,46 @@ impl Handler { } } + pub fn signing_key(mut self, key: &str) -> Self { + self.signing_key = Some(key.to_string()); + self + } + + pub fn serve_origin(mut self, origin: &str) -> Self { + self.serve_origin = Some(origin.to_string()); + self + } + + pub fn serve_path(mut self, path: &str) -> Self { + self.serve_path = Some(path.to_string()); + self + } + pub fn register_fn(&mut self, func: ServableFn) { self.funcs.insert(func.slug(), func); } + fn app_serve_origin(&self, _headers: &HashMap) -> String { + if let Some(origin) = self.serve_origin.clone() { + return origin; + } + // if let Some(host) = headers.get("host") { + // return host.to_string(); + // } + + "http://127.0.0.1:3000".to_string() + } + + fn app_serve_path(&self) -> String { + if let Some(path) = self.serve_path.clone() { + return path; + } + "/api/inngest".to_string() + } + pub async fn sync( &self, - _headers: &HashMap, + headers: &HashMap, framework: &str, ) -> Result<(), String> { let functions: Vec = self From ca7b90849e5a7c3386a8eee701f9d70bcb44f6d0 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Tue, 1 Oct 2024 22:27:43 -0700 Subject: [PATCH 06/39] replace hardcoded urls with proper values --- inngest/src/client.rs | 11 ++++++----- inngest/src/config.rs | 2 -- inngest/src/handler.rs | 27 +++++++++++++++++++++------ inngest/src/header.rs | 1 + inngest/src/lib.rs | 1 + 5 files changed, 29 insertions(+), 13 deletions(-) create mode 100644 inngest/src/header.rs diff --git a/inngest/src/client.rs b/inngest/src/client.rs index 31e3798..46e06de 100644 --- a/inngest/src/client.rs +++ b/inngest/src/client.rs @@ -1,9 +1,7 @@ use url::Url; use crate::{ - config::Config, - event::{Event, InngestEvent}, - result::DevError, + config::Config, event::{Event, InngestEvent}, handler::Kind, result::DevError }; const API_ORIGIN_DEV: &str = "http://127.0.0.1:8288"; @@ -104,7 +102,7 @@ impl Inngest { .map_err(|err| DevError::Basic(format!("{}", err))) } - pub(crate) fn inngest_api_origin(&self) -> String { + pub(crate) fn inngest_api_origin(&self, kind: Kind) -> String { if let Some(dev) = self.dev.clone() { return dev; } @@ -113,7 +111,10 @@ impl Inngest { return endpoint; } - API_ORIGIN.to_string() + match kind { + Kind::Dev => API_ORIGIN_DEV.to_string(), + Kind::Cloud => API_ORIGIN.to_string() + } } fn inngest_evt_api_origin(&self) -> String { diff --git a/inngest/src/config.rs b/inngest/src/config.rs index d165a35..af27eb8 100644 --- a/inngest/src/config.rs +++ b/inngest/src/config.rs @@ -12,8 +12,6 @@ const INNGEST_SIGNING_KEY: &str = "INNGEST_SIGNING_KEY"; const INNGEST_SERVE_ORIGIN: &str = "INNGEST_SERVE_ORIGIN"; const INNGEST_SERVE_PATH: &str = "INNGEST_SERVE_PATH"; -// TODO: default values - pub(crate) struct Config {} impl Config { diff --git a/inngest/src/handler.rs b/inngest/src/handler.rs index b2061ad..5c4d289 100644 --- a/inngest/src/handler.rs +++ b/inngest/src/handler.rs @@ -13,6 +13,7 @@ use crate::{ result::{Error, FlowControlVariant, SdkResponse}, sdk::Request, step_tool::Step as StepTool, + header }; pub struct Handler { @@ -87,6 +88,14 @@ impl Handler { headers: &HashMap, framework: &str, ) -> Result<(), String> { + let kind = match headers.get(header::SERVER_KIND) { + Some(val) => match val.as_str() { + "cloud" => Kind::Cloud, + _ => Kind::Dev + }, + None => Kind::Dev + }; + let functions: Vec = self .funcs .iter() @@ -99,8 +108,9 @@ impl Handler { name: "step".to_string(), runtime: StepRuntime { url: format!( - // TODO: fix the URL - "http://127.0.0.1:3000/api/inngest?fnId={}&step=step", + "{}{}?fnId={}&step=step", + self.app_serve_origin(headers), + self.app_serve_path(), f.slug() ), method: "http".to_string(), @@ -122,14 +132,13 @@ impl Handler { app_name: self.inngest.app_id.clone(), framework: framework.to_string(), functions, - // TODO: fix the URL - url: "http://127.0.0.1:3000/api/inngest".to_string(), + url: format!("{}{}", self.app_serve_origin(headers), self.app_serve_path()), ..Default::default() }; reqwest::Client::new() - // TODO: fix the URL - .post("http://127.0.0.1:8288/fn/register") + .post(format!("{}/fn/register", self.inngest.inngest_api_origin(kind))) + // .header("x-inngest-kind", "dev") .json(&req) .send() .await @@ -264,3 +273,9 @@ struct RunRequestCtxStack { current: u32, stack: Vec, } + +#[derive(Clone)] +pub(crate) enum Kind { + Dev, + Cloud +} diff --git a/inngest/src/header.rs b/inngest/src/header.rs new file mode 100644 index 0000000..170001b --- /dev/null +++ b/inngest/src/header.rs @@ -0,0 +1 @@ +pub(crate) const SERVER_KIND: &str = "x-inngest-server-kind"; diff --git a/inngest/src/lib.rs b/inngest/src/lib.rs index 367a699..b4141b9 100644 --- a/inngest/src/lib.rs +++ b/inngest/src/lib.rs @@ -3,6 +3,7 @@ pub(crate) mod config; pub mod event; pub mod function; pub mod handler; +pub(crate) mod header; pub mod result; pub mod sdk; pub mod serve; From 563c67d1e04c6fb06d2ec61c7034571c4e9c7564 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Tue, 1 Oct 2024 22:33:07 -0700 Subject: [PATCH 07/39] add headers --- inngest/src/handler.rs | 2 +- inngest/src/header.rs | 16 +++++++++++++++- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/inngest/src/handler.rs b/inngest/src/handler.rs index 5c4d289..4e07ced 100644 --- a/inngest/src/handler.rs +++ b/inngest/src/handler.rs @@ -88,7 +88,7 @@ impl Handler { headers: &HashMap, framework: &str, ) -> Result<(), String> { - let kind = match headers.get(header::SERVER_KIND) { + let kind = match headers.get(header::INNGEST_SERVER_KIND) { Some(val) => match val.as_str() { "cloud" => Kind::Cloud, _ => Kind::Dev diff --git a/inngest/src/header.rs b/inngest/src/header.rs index 170001b..1d4eb89 100644 --- a/inngest/src/header.rs +++ b/inngest/src/header.rs @@ -1 +1,15 @@ -pub(crate) const SERVER_KIND: &str = "x-inngest-server-kind"; +pub(crate) const CONTENT_TYPE: &str = "content-type"; +pub(crate) const RETRY_AFTER: &str = "retry-after"; +pub(crate) const SERVER_TIMING: &str = "server-timing"; +pub(crate) const USE_AGENT: &str = "user-agent"; + +// Inngest specific ones +pub(crate) const INNGEST_ENV: &str = "x-inngest-env"; +pub(crate) const INNGEST_EXPECTED_SERVER_KIND: &str = "x-inngest-expected-server-kind"; +pub(crate) const INNGEST_FRAMEWORK: &str = "x-inngest-framework"; +pub(crate) const INNGEST_NO_RETRY: &str = "x-inngest-no-retry"; +pub(crate) const INNGEST_REQ_VERSION: &str = "x-inngest-req-version"; +pub(crate) const INNGEST_SDK: &str = "x-inngest-sdk"; +pub(crate) const INNGEST_SERVER_KIND: &str = "x-inngest-server-kind"; +pub(crate) const INNGEST_SIGNATURE: &str = "x-inngest-signature"; +pub(crate) const INNGEST_SYNC_KIND: &str = "x-inngest-sync-kind"; From f8571050e41f04237e49d82cad989c210e76d40e Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Tue, 1 Oct 2024 23:38:41 -0700 Subject: [PATCH 08/39] replace header keys with constants --- inngest/src/handler.rs | 1 - inngest/src/result.rs | 13 ++++++++----- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/inngest/src/handler.rs b/inngest/src/handler.rs index 4e07ced..abbd289 100644 --- a/inngest/src/handler.rs +++ b/inngest/src/handler.rs @@ -138,7 +138,6 @@ impl Handler { reqwest::Client::new() .post(format!("{}/fn/register", self.inngest.inngest_api_origin(kind))) - // .header("x-inngest-kind", "dev") .json(&req) .send() .await diff --git a/inngest/src/result.rs b/inngest/src/result.rs index f87b640..0618702 100644 --- a/inngest/src/result.rs +++ b/inngest/src/result.rs @@ -1,13 +1,15 @@ use std::fmt::{Debug, Display}; use axum::{ - http::{header, HeaderMap, HeaderValue, StatusCode}, + http::{HeaderMap, HeaderValue, StatusCode}, response::IntoResponse, Json, }; use serde::Serialize; use serde_json::{json, Value}; +use crate::header; + #[derive(Serialize)] pub struct SdkResponse { pub status: u16, @@ -22,8 +24,9 @@ impl IntoResponse for SdkResponse { header::CONTENT_TYPE, HeaderValue::from_static("application/json"), ); - headers.insert("x-inngest-framework", HeaderValue::from_static("axum")); - headers.insert("x-inngest-sdk", HeaderValue::from_str(&sdk).unwrap()); + headers.insert(header::INNGEST_FRAMEWORK, HeaderValue::from_static("axum")); + headers.insert(header::INNGEST_SDK, HeaderValue::from_str(&sdk).unwrap()); + headers.insert(header::INNGEST_REQ_VERSION, HeaderValue::from_static("1")); match self.status { 200 => (StatusCode::OK, headers, Json(self.body)).into_response(), @@ -129,8 +132,8 @@ impl Drop for FlowControlError { // TODO: also add error! level tracing here println!("Flow control error was not acknowledged"); } else { - panic!("Flow control error was not acknowledged. - This is a developer error. + panic!("Flow control error was not acknowledged. + This is a developer error. You should ensure that you return the flow control error within Inngest funcitons to the caller as soon as they're received."); } } From 02f7784d3cb9caadf2d71ac2cd0112cd12cd2066 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Wed, 2 Oct 2024 00:23:31 -0700 Subject: [PATCH 09/39] add header struct as hashmap wrapper --- inngest/examples/send_events/main.rs | 2 +- inngest/src/client.rs | 13 +++++--- inngest/src/handler.rs | 48 ++++++++++++++++------------ inngest/src/header.rs | 36 +++++++++++++++++++++ inngest/src/lib.rs | 2 +- inngest/src/serve/axum.rs | 16 ++++------ 6 files changed, 80 insertions(+), 37 deletions(-) diff --git a/inngest/examples/send_events/main.rs b/inngest/examples/send_events/main.rs index 51c45f8..1d76b22 100644 --- a/inngest/examples/send_events/main.rs +++ b/inngest/examples/send_events/main.rs @@ -1,4 +1,4 @@ -use inngest::{event::Event, client::Inngest}; +use inngest::{client::Inngest, event::Event}; use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize, Clone, Debug)] diff --git a/inngest/src/client.rs b/inngest/src/client.rs index 46e06de..ae9f5a9 100644 --- a/inngest/src/client.rs +++ b/inngest/src/client.rs @@ -1,7 +1,10 @@ use url::Url; use crate::{ - config::Config, event::{Event, InngestEvent}, handler::Kind, result::DevError + config::Config, + event::{Event, InngestEvent}, + handler::Kind, + result::DevError, }; const API_ORIGIN_DEV: &str = "http://127.0.0.1:8288"; @@ -29,7 +32,7 @@ impl Inngest { // if the value is a URL, use it. otherwise set a default URL let dev = Config::dev().map(|v| match Url::parse(&v) { Ok(val) => val.to_string(), - Err(_) => API_ORIGIN_DEV.to_string() + Err(_) => API_ORIGIN_DEV.to_string(), }); Inngest { @@ -65,8 +68,8 @@ impl Inngest { pub fn dev(mut self, dev: &str) -> Self { let url = match Url::parse(dev) { - Ok(val) => Some(val.to_string()), - Err(_) => Some(API_ORIGIN_DEV.to_string()) + Ok(val) => Some(val.to_string()), + Err(_) => Some(API_ORIGIN_DEV.to_string()), }; self.dev = url; self @@ -113,7 +116,7 @@ impl Inngest { match kind { Kind::Dev => API_ORIGIN_DEV.to_string(), - Kind::Cloud => API_ORIGIN.to_string() + Kind::Cloud => API_ORIGIN.to_string(), } } diff --git a/inngest/src/handler.rs b/inngest/src/handler.rs index abbd289..7458082 100644 --- a/inngest/src/handler.rs +++ b/inngest/src/handler.rs @@ -10,10 +10,10 @@ use crate::{ config::Config, event::Event, function::{Function, Input, InputCtx, ServableFn, Step, StepRetry, StepRuntime}, + header::Headers, result::{Error, FlowControlVariant, SdkResponse}, sdk::Request, step_tool::Step as StepTool, - header }; pub struct Handler { @@ -65,7 +65,7 @@ impl Handler { self.funcs.insert(func.slug(), func); } - fn app_serve_origin(&self, _headers: &HashMap) -> String { + fn app_serve_origin(&self, _headers: &Headers) -> String { if let Some(origin) = self.serve_origin.clone() { return origin; } @@ -83,18 +83,8 @@ impl Handler { "/api/inngest".to_string() } - pub async fn sync( - &self, - headers: &HashMap, - framework: &str, - ) -> Result<(), String> { - let kind = match headers.get(header::INNGEST_SERVER_KIND) { - Some(val) => match val.as_str() { - "cloud" => Kind::Cloud, - _ => Kind::Dev - }, - None => Kind::Dev - }; + pub async fn sync(&self, headers: &Headers, framework: &str) -> Result<(), String> { + let kind = headers.server_kind(); let functions: Vec = self .funcs @@ -132,12 +122,19 @@ impl Handler { app_name: self.inngest.app_id.clone(), framework: framework.to_string(), functions, - url: format!("{}{}", self.app_serve_origin(headers), self.app_serve_path()), + url: format!( + "{}{}", + self.app_serve_origin(headers), + self.app_serve_path() + ), ..Default::default() }; reqwest::Client::new() - .post(format!("{}/fn/register", self.inngest.inngest_api_origin(kind))) + .post(format!( + "{}/fn/register", + self.inngest.inngest_api_origin(kind) + )) .json(&req) .send() .await @@ -145,11 +142,22 @@ impl Handler { .map_err(|_err| "error registering".to_string()) } - pub async fn run(&self, query: RunQueryParams, body: &Value) -> Result + pub async fn run( + &self, + headers: &Headers, + query: RunQueryParams, + body: &Value, + ) -> Result where T: for<'de> Deserialize<'de> + Debug, E: Into, { + let sig = headers.signature(); + let kind = headers.server_kind(); + if kind == Kind::Cloud && sig.is_none() { + return Err(basic_error!("no signature provided for SDK in Cloud mode")); + } + let data = match serde_json::from_value::>(body.clone()) { Ok(res) => res, Err(err) => { @@ -273,8 +281,8 @@ struct RunRequestCtxStack { stack: Vec, } -#[derive(Clone)] -pub(crate) enum Kind { +#[derive(Clone, PartialEq, Eq)] +pub enum Kind { Dev, - Cloud + Cloud, } diff --git a/inngest/src/header.rs b/inngest/src/header.rs index 1d4eb89..0329819 100644 --- a/inngest/src/header.rs +++ b/inngest/src/header.rs @@ -1,3 +1,7 @@ +use std::collections::HashMap; + +use crate::handler::Kind; + pub(crate) const CONTENT_TYPE: &str = "content-type"; pub(crate) const RETRY_AFTER: &str = "retry-after"; pub(crate) const SERVER_TIMING: &str = "server-timing"; @@ -13,3 +17,35 @@ pub(crate) const INNGEST_SDK: &str = "x-inngest-sdk"; pub(crate) const INNGEST_SERVER_KIND: &str = "x-inngest-server-kind"; pub(crate) const INNGEST_SIGNATURE: &str = "x-inngest-signature"; pub(crate) const INNGEST_SYNC_KIND: &str = "x-inngest-sync-kind"; + +pub struct Headers(HashMap); + +impl Headers { + pub fn server_kind(&self) -> Kind { + match self.0.get(INNGEST_SERVER_KIND) { + Some(val) => match val.as_str() { + "cloud" => Kind::Cloud, + _ => Kind::Dev, + }, + None => Kind::Dev, + } + } + + pub fn signature(&self) -> Option { + self.0.get(INNGEST_SIGNATURE).map(|v| v.clone()) + } +} + +impl From for Headers { + fn from(hmap: axum::http::HeaderMap) -> Self { + let mut headers = HashMap::new(); + for head in hmap.iter() { + let key = head.0.to_string().to_lowercase(); + if let Ok(v) = head.1.to_str() { + headers.insert(key, v.to_string()); + } + } + + Headers(headers) + } +} diff --git a/inngest/src/lib.rs b/inngest/src/lib.rs index b4141b9..6a99974 100644 --- a/inngest/src/lib.rs +++ b/inngest/src/lib.rs @@ -3,7 +3,7 @@ pub(crate) mod config; pub mod event; pub mod function; pub mod handler; -pub(crate) mod header; +pub mod header; pub mod result; pub mod sdk; pub mod serve; diff --git a/inngest/src/serve/axum.rs b/inngest/src/serve/axum.rs index 77b0998..397a503 100644 --- a/inngest/src/serve/axum.rs +++ b/inngest/src/serve/axum.rs @@ -1,5 +1,6 @@ use crate::{ handler::{Handler, RunQueryParams}, + header::Headers, result::{Error, SdkResponse}, }; @@ -10,7 +11,7 @@ use axum::{ }; use serde::Deserialize; use serde_json::Value; -use std::{collections::HashMap, fmt::Debug, sync::Arc}; +use std::{fmt::Debug, sync::Arc}; // TODO: // provide a macro for simple import into Axum routes @@ -20,18 +21,12 @@ pub async fn register( State(handler): State>>, ) -> Result<(), String> { // convert the http headers into a generic hashmap - let mut headers: HashMap = HashMap::new(); - for head in hmap.iter() { - let key = head.0.to_string().to_lowercase(); - if let Ok(v) = head.1.to_str() { - headers.insert(key, v.to_string()); - } - } - + let headers = Headers::from(hmap); handler.sync(&headers, "axum").await } pub async fn invoke( + hmap: HeaderMap, Query(query): Query, State(handler): State>>, Json(body): Json, @@ -40,5 +35,6 @@ where T: for<'de> Deserialize<'de> + Debug, E: Into, { - handler.run(query, &body).await + let headers = Headers::from(hmap); + handler.run(&headers, query, &body).await } From 10ae438db03daecbe5a7faf50cdc122eadcbbc7d Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Wed, 2 Oct 2024 00:36:25 -0700 Subject: [PATCH 10/39] add skeleton for signature verification --- inngest/src/handler.rs | 15 +++++++++++++++ inngest/src/lib.rs | 1 + inngest/src/signature.rs | 24 ++++++++++++++++++++++++ 3 files changed, 40 insertions(+) create mode 100644 inngest/src/signature.rs diff --git a/inngest/src/handler.rs b/inngest/src/handler.rs index 7458082..52a88af 100644 --- a/inngest/src/handler.rs +++ b/inngest/src/handler.rs @@ -13,6 +13,7 @@ use crate::{ header::Headers, result::{Error, FlowControlVariant, SdkResponse}, sdk::Request, + signature::Signature, step_tool::Step as StepTool, }; @@ -170,6 +171,20 @@ impl Handler { // TODO: retrieve data from API on flag if data.use_api {} + // Verify the signature if provided + if let Some(sig) = sig.clone() { + let Some(key) = self.signing_key.clone() else { + return Err(basic_error!( + "no signing key available for verifying request signature" + )); + }; + + let signature = Signature::new(&sig, &key, &body); + if let Err(err) = signature.verify() { + return Err(err); + } + } + // find the specified function let Some(func) = self.funcs.get(&query.fn_id) else { return Err(basic_error!( diff --git a/inngest/src/lib.rs b/inngest/src/lib.rs index 6a99974..3996b1c 100644 --- a/inngest/src/lib.rs +++ b/inngest/src/lib.rs @@ -7,5 +7,6 @@ pub mod header; pub mod result; pub mod sdk; pub mod serve; +pub mod signature; pub mod step_tool; pub(crate) mod utils; diff --git a/inngest/src/signature.rs b/inngest/src/signature.rs new file mode 100644 index 0000000..6ef1dcb --- /dev/null +++ b/inngest/src/signature.rs @@ -0,0 +1,24 @@ +use serde_json::Value; + +use crate::result::Error; + +pub struct Signature { + sig: String, + key: String, + body: Value, +} + +impl Signature { + pub fn new(sig: &str, key: &str, body: &Value) -> Self { + Signature { + sig: sig.to_string(), + key: key.to_string(), + body: body.clone(), + } + } + + // TODO: implement signature validation + pub fn verify(&self) -> Result<(), Error> { + Ok(()) + } +} From 4dbab805bc83419d31a4a83443d5713f0405cd10 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Fri, 4 Oct 2024 21:53:33 -0700 Subject: [PATCH 11/39] add tests samples --- inngest/src/handler.rs | 2 +- inngest/src/signature.rs | 88 +++++++++++++++++++++++++++++++++++++++- 2 files changed, 88 insertions(+), 2 deletions(-) diff --git a/inngest/src/handler.rs b/inngest/src/handler.rs index 52a88af..d684780 100644 --- a/inngest/src/handler.rs +++ b/inngest/src/handler.rs @@ -180,7 +180,7 @@ impl Handler { }; let signature = Signature::new(&sig, &key, &body); - if let Err(err) = signature.verify() { + if let Err(err) = signature.verify(false) { return Err(err); } } diff --git a/inngest/src/signature.rs b/inngest/src/signature.rs index 6ef1dcb..9ed0c8b 100644 --- a/inngest/src/signature.rs +++ b/inngest/src/signature.rs @@ -17,8 +17,94 @@ impl Signature { } } + fn hash(key: &str) -> String { + "".to_string() + } + // TODO: implement signature validation - pub fn verify(&self) -> Result<(), Error> { + pub fn verify(&self, ignore_ts: bool) -> Result<(), Error> { Ok(()) } } + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + const SIGNING_KEY: &str = + "signkey-test-8ee2262a15e8d3c42d6a840db7af3de2aab08ef632b32a37a687f24b34dba3ff"; + const HASHED_SIGNING_KEY: &str = + "signkey-test-e4bf4a2e7f55c7eb954b6e72f8f69628fbc409fe7da6d0f6958770987dcf0e02"; + const SIGNATURE: &str = + "t=1689920619&s=31df77f5b1b029de4bfce3a77e0517aa4ce0f5e2195a6467fc126a489ca2330b"; + + #[test] + fn test_hashed_signing_key() { + let hashed = Signature::hash(SIGNING_KEY); + assert_eq!(hashed, HASHED_SIGNING_KEY); + } + + fn event() -> Value { + json!({ + "id": "", + "name": "inngest/scheduled.timer", + "data": {}, + "user": {}, + "ts": 1_674_082_830_001 as i64, + "v": "1" + }) + } + + fn body() -> Value { + let evt = event(); + + json!({ + "ctx": { + "fn_id": "local-testing-local-cron", + "run_id": "01GQ3HTEZ01M7R8Z9PR1DMHDN1", + "step_id": "step" + }, + "event": &evt, + "events": [&evt], + "steps": {}, + "use_api": false + }) + } + + #[test] + fn test_verify_if_signature_is_valid() { + let body = body(); + let sig = Signature::new(SIGNATURE, SIGNING_KEY, &body); + let res = sig.verify(true); + assert!(res.is_ok()); + } + + #[ignore] + #[test] + fn test_verify_if_signature_is_expired() { + let body = body(); + let sig = Signature::new(SIGNATURE, SIGNING_KEY, &body); + let res = sig.verify(false); + assert!(res.is_err()); + } + + #[ignore] + #[test] + fn test_verify_if_signature_is_invalid() { + let body = body(); + let invalid_sig = format!("{}hello", SIGNATURE); + let sig = Signature::new(&invalid_sig, SIGNING_KEY, &body); + let res = sig.verify(true); + assert!(res.is_err()); + } + + #[ignore] + #[test] + fn test_verify_for_random_input() { + let body = body(); + let sig = Signature::new("10", SIGNING_KEY, &body); + let res = sig.verify(true); + assert!(res.is_err()); + } +} From 15cf4c53dd1d8a0dbb9ca1b2dcac3b06aac9dd43 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Fri, 4 Oct 2024 21:58:33 -0700 Subject: [PATCH 12/39] add regex --- inngest/Cargo.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/inngest/Cargo.toml b/inngest/Cargo.toml index 353c21b..703303a 100644 --- a/inngest/Cargo.toml +++ b/inngest/Cargo.toml @@ -21,6 +21,7 @@ slug = "0.1.4" typetag = "0.2.13" inngest_macros = { path = "../macros" } futures = "0.3.30" +regex = "1.11.0" # used for step encoding in SDK sha1 = "0.10.6" From 65d121df55f071732bdeb116c9e86c33523f7dbd Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Fri, 4 Oct 2024 22:29:29 -0700 Subject: [PATCH 13/39] add signing key hashing --- inngest/Cargo.toml | 1 + inngest/src/signature.rs | 45 +++++++++++++++++++++++++++++++++++----- 2 files changed, 41 insertions(+), 5 deletions(-) diff --git a/inngest/Cargo.toml b/inngest/Cargo.toml index 703303a..cf0051c 100644 --- a/inngest/Cargo.toml +++ b/inngest/Cargo.toml @@ -27,3 +27,4 @@ regex = "1.11.0" sha1 = "0.10.6" base16 = "0.2.1" url = "2.5.2" +sha2 = "0.10.8" diff --git a/inngest/src/signature.rs b/inngest/src/signature.rs index 9ed0c8b..5b79452 100644 --- a/inngest/src/signature.rs +++ b/inngest/src/signature.rs @@ -1,30 +1,62 @@ +use regex::Regex; use serde_json::Value; +use sha1::Digest; +use sha2::Sha256; -use crate::result::Error; +use crate::{basic_error, result::Error}; pub struct Signature { sig: String, key: String, body: Value, + re: Regex, } impl Signature { pub fn new(sig: &str, key: &str, body: &Value) -> Self { + let re = Regex::new(r"^signkey-.+-").unwrap(); + Signature { sig: sig.to_string(), key: key.to_string(), body: body.clone(), + re, } } - fn hash(key: &str) -> String { - "".to_string() + fn hash(&self) -> Result { + match self.re.find(&self.key) { + Some(mat) => { + let prefix = mat.as_str(); + let key = self.normalize_key(); + + match base16::decode(key.as_bytes()) { + Ok(de) => { + let mut hasher = Sha256::new(); + hasher.update(de); + + let sum = hasher.finalize(); + let enc = base16::encode_lower(&sum); + + Ok(format!("{}{}", prefix, enc)) + } + // TODO: handle errors better + Err(_err) => Err(basic_error!("decode error")), + } + } + // TODO: handle errors better + None => Err(basic_error!("hash failure")), + } } // TODO: implement signature validation pub fn verify(&self, ignore_ts: bool) -> Result<(), Error> { Ok(()) } + + fn normalize_key(&self) -> String { + self.re.replace(&self.key, "").to_string() + } } #[cfg(test)] @@ -41,8 +73,11 @@ mod tests { #[test] fn test_hashed_signing_key() { - let hashed = Signature::hash(SIGNING_KEY); - assert_eq!(hashed, HASHED_SIGNING_KEY); + let body = body(); + let sig = Signature::new(SIGNATURE, SIGNING_KEY, &body); + let hashed = sig.hash(); + assert!(hashed.is_ok()); + assert_eq!(HASHED_SIGNING_KEY, hashed.unwrap()); } fn event() -> Value { From 79baf1bd9c6bb0d89f7701b3791626720931e83c Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Fri, 4 Oct 2024 23:31:25 -0700 Subject: [PATCH 14/39] implement signature checks --- inngest/Cargo.toml | 1 + inngest/src/handler.rs | 28 +++++----- inngest/src/serve/axum.rs | 13 +++-- inngest/src/signature.rs | 109 ++++++++++++++++++++++++++++++++------ 4 files changed, 117 insertions(+), 34 deletions(-) diff --git a/inngest/Cargo.toml b/inngest/Cargo.toml index cf0051c..dd0c8c8 100644 --- a/inngest/Cargo.toml +++ b/inngest/Cargo.toml @@ -28,3 +28,4 @@ sha1 = "0.10.6" base16 = "0.2.1" url = "2.5.2" sha2 = "0.10.8" +hmac = "0.12.1" diff --git a/inngest/src/handler.rs b/inngest/src/handler.rs index d684780..5bbdafe 100644 --- a/inngest/src/handler.rs +++ b/inngest/src/handler.rs @@ -147,6 +147,7 @@ impl Handler { &self, headers: &Headers, query: RunQueryParams, + raw_body: &str, body: &Value, ) -> Result where @@ -159,6 +160,19 @@ impl Handler { return Err(basic_error!("no signature provided for SDK in Cloud mode")); } + // Verify the signature if provided + if let Some(sig) = sig.clone() { + let Some(key) = self.signing_key.clone() else { + return Err(basic_error!( + "no signing key available for verifying request signature" + )); + }; + + let signature = Signature::new(&sig, &key, raw_body); + _ = signature.verify(false)?; + } + + let data = match serde_json::from_value::>(body.clone()) { Ok(res) => res, Err(err) => { @@ -171,20 +185,6 @@ impl Handler { // TODO: retrieve data from API on flag if data.use_api {} - // Verify the signature if provided - if let Some(sig) = sig.clone() { - let Some(key) = self.signing_key.clone() else { - return Err(basic_error!( - "no signing key available for verifying request signature" - )); - }; - - let signature = Signature::new(&sig, &key, &body); - if let Err(err) = signature.verify(false) { - return Err(err); - } - } - // find the specified function let Some(func) = self.funcs.get(&query.fn_id) else { return Err(basic_error!( diff --git a/inngest/src/serve/axum.rs b/inngest/src/serve/axum.rs index 397a503..bc98731 100644 --- a/inngest/src/serve/axum.rs +++ b/inngest/src/serve/axum.rs @@ -1,7 +1,5 @@ use crate::{ - handler::{Handler, RunQueryParams}, - header::Headers, - result::{Error, SdkResponse}, + basic_error, handler::{Handler, RunQueryParams}, header::Headers, result::{Error, SdkResponse} }; use axum::{ @@ -29,12 +27,17 @@ pub async fn invoke( hmap: HeaderMap, Query(query): Query, State(handler): State>>, - Json(body): Json, + raw: String, + // Json(body): Json, ) -> Result where T: for<'de> Deserialize<'de> + Debug, E: Into, { let headers = Headers::from(hmap); - handler.run(&headers, query, &body).await + match serde_json::from_str(&raw) { + Ok(body) => handler.run(&headers, query, raw.as_str(), &body).await, + Err(_err) => Err(basic_error!("failed to parse body as JSON")) + } + } diff --git a/inngest/src/signature.rs b/inngest/src/signature.rs index 5b79452..8bea6e8 100644 --- a/inngest/src/signature.rs +++ b/inngest/src/signature.rs @@ -1,25 +1,32 @@ +use std::{ + collections::HashMap, + time::{self, Duration, SystemTime}, +}; + +use hmac::{Hmac, Mac}; use regex::Regex; -use serde_json::Value; use sha1::Digest; use sha2::Sha256; use crate::{basic_error, result::Error}; +type HmacSha256 = Hmac; + pub struct Signature { sig: String, key: String, - body: Value, + body: String, re: Regex, } impl Signature { - pub fn new(sig: &str, key: &str, body: &Value) -> Self { + pub fn new(sig: &str, key: &str, body: &str) -> Self { let re = Regex::new(r"^signkey-.+-").unwrap(); Signature { sig: sig.to_string(), key: key.to_string(), - body: body.clone(), + body: body.to_string(), re, } } @@ -28,7 +35,7 @@ impl Signature { match self.re.find(&self.key) { Some(mat) => { let prefix = mat.as_str(); - let key = self.normalize_key(); + let key = self.normalize_key(&self.key); match base16::decode(key.as_bytes()) { Ok(de) => { @@ -51,18 +58,91 @@ impl Signature { // TODO: implement signature validation pub fn verify(&self, ignore_ts: bool) -> Result<(), Error> { - Ok(()) + let smap = self.sig_map(); + + match smap.get("t") { + Some(tsstr) => match tsstr.parse::() { + Ok(ts) => { + let now = SystemTime::now(); + let from = self.unix_ts_to_systime(ts); + + match now.duration_since(from) { + Ok(dur) => { + if ignore_ts || dur <= Duration::from_secs(60 * 5) { + match self.sign(ts, &self.key, &self.body) { + Ok(sig) => { + if sig == self.sig { + Ok(()) + } else { + // TODO: handle errors better + Err(basic_error!("signature doesn't match")) + } + } + // TODO: handle errors better + Err(_err) => Err(basic_error!("error signing body")), + } + } else { + // TODO: handle errors better + Err(basic_error!("invalid sig")) + } + } + // TODO: handle errors better + Err(_err) => Err(basic_error!("error parsing time elasped for sig")), + } + } + // TODO: handle errors better + Err(_err) => Err(basic_error!("error parsing ts as integer")), + }, + // TODO: handle errors better + None => Err(basic_error!("no ts field for signature")), + } + } + + fn normalize_key(&self, key: &str) -> String { + self.re.replace(key, "").to_string() + } + + fn sig_map(&self) -> HashMap { + let mut map = HashMap::new(); + + for attrs in self.sig.split("&") { + let mut parts = attrs.split("="); + + if let (Some(key), Some(val)) = (parts.next(), parts.next()) { + map.insert(key.to_string(), val.to_string()); + } + } + + map + } + + fn unix_ts_to_systime(&self, ts: i64) -> SystemTime { + if ts >= 0 { + time::UNIX_EPOCH + Duration::from_millis(ts as u64) + } else { + // handle negative timestamp + let nts = Duration::from_millis(-ts as u64); + time::UNIX_EPOCH - nts + } } - fn normalize_key(&self) -> String { - self.re.replace(&self.key, "").to_string() + fn sign(&self, unix_ts: i64, signing_key: &str, body: &str) -> Result { + let key = self.normalize_key(signing_key); + let mut mac = + HmacSha256::new_from_slice(&key.as_bytes()).expect("HMAC can take key of any size"); + mac.update(format!("{}{}", body, unix_ts).as_bytes()); + + let sum = mac.finalize(); + let sig = base16::encode_lower(&sum.into_bytes()); + + Ok(format!("t={}&s={}", unix_ts, sig)) } } #[cfg(test)] mod tests { use super::*; - use serde_json::json; + use serde_json::{json, Value}; const SIGNING_KEY: &str = "signkey-test-8ee2262a15e8d3c42d6a840db7af3de2aab08ef632b32a37a687f24b34dba3ff"; @@ -91,10 +171,10 @@ mod tests { }) } - fn body() -> Value { + fn body() -> String { let evt = event(); - json!({ + let body = json!({ "ctx": { "fn_id": "local-testing-local-cron", "run_id": "01GQ3HTEZ01M7R8Z9PR1DMHDN1", @@ -104,7 +184,9 @@ mod tests { "events": [&evt], "steps": {}, "use_api": false - }) + }); + + serde_json::to_string(&body).expect("JSON should serialize to string") } #[test] @@ -115,7 +197,6 @@ mod tests { assert!(res.is_ok()); } - #[ignore] #[test] fn test_verify_if_signature_is_expired() { let body = body(); @@ -124,7 +205,6 @@ mod tests { assert!(res.is_err()); } - #[ignore] #[test] fn test_verify_if_signature_is_invalid() { let body = body(); @@ -134,7 +214,6 @@ mod tests { assert!(res.is_err()); } - #[ignore] #[test] fn test_verify_for_random_input() { let body = body(); From c20eb42c952cadb0850307bfbd9768e61f76c086 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Fri, 4 Oct 2024 23:32:13 -0700 Subject: [PATCH 15/39] format --- inngest/src/handler.rs | 1 - inngest/src/serve/axum.rs | 8 +++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/inngest/src/handler.rs b/inngest/src/handler.rs index 5bbdafe..b242095 100644 --- a/inngest/src/handler.rs +++ b/inngest/src/handler.rs @@ -172,7 +172,6 @@ impl Handler { _ = signature.verify(false)?; } - let data = match serde_json::from_value::>(body.clone()) { Ok(res) => res, Err(err) => { diff --git a/inngest/src/serve/axum.rs b/inngest/src/serve/axum.rs index bc98731..343827b 100644 --- a/inngest/src/serve/axum.rs +++ b/inngest/src/serve/axum.rs @@ -1,5 +1,8 @@ use crate::{ - basic_error, handler::{Handler, RunQueryParams}, header::Headers, result::{Error, SdkResponse} + basic_error, + handler::{Handler, RunQueryParams}, + header::Headers, + result::{Error, SdkResponse}, }; use axum::{ @@ -37,7 +40,6 @@ where let headers = Headers::from(hmap); match serde_json::from_str(&raw) { Ok(body) => handler.run(&headers, query, raw.as_str(), &body).await, - Err(_err) => Err(basic_error!("failed to parse body as JSON")) + Err(_err) => Err(basic_error!("failed to parse body as JSON")), } - } From ab6d841c418a86bcf84391738587d2fb5c35928f Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Mon, 7 Oct 2024 21:43:26 -0700 Subject: [PATCH 16/39] attempt docker image build --- .github/workflows/deploy.yml | 49 ++++++++++++++++++++++++++++++++ flake.nix | 1 + inngest/examples/axum/Dockerfile | 16 +++++++++++ 3 files changed, 66 insertions(+) create mode 100644 .github/workflows/deploy.yml create mode 100644 inngest/examples/axum/Dockerfile diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml new file mode 100644 index 0000000..7ece501 --- /dev/null +++ b/.github/workflows/deploy.yml @@ -0,0 +1,49 @@ +name: Deploy + +on: + push: + branches: + - main + pull_request: + +env: + CARGO_TERM_COLOR: always + +jobs: + build: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v4 + + - name: Setup Docker Buildx + uses: docker/setup-buildx-action@v3 + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + role-to-assume: ${{ secret.AWS_GH_ACTION_ACCESS }} + aws-region: ${{ env.AWS_REGION }} + + - name: Login to AWS ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v2 + + - name: Image metadata + id: meta + uses: docker/metadata-action@v5 + with: + images: ${{ steps.login-ecr.outputs }}/rust-sdk + tags: | + type=sha,prefix= + + - name: Build and Push + uses: docker/build-push-action@v5 + with: + context: . + push: true + provenance: false + file: inngest/examples/axum/Dockerfile + tags: ${{ steps.meta.outputs.tags }} + platform: linux/amd64 + cache-from: type=gha + cache-to: type=gha,mode=max diff --git a/flake.nix b/flake.nix index c926a14..1f63daf 100644 --- a/flake.nix +++ b/flake.nix @@ -29,6 +29,7 @@ # LSP rust-analyzer + nodePackages.yaml-language-server ]; RUST_SRC_PATH = diff --git a/inngest/examples/axum/Dockerfile b/inngest/examples/axum/Dockerfile new file mode 100644 index 0000000..44dc9e5 --- /dev/null +++ b/inngest/examples/axum/Dockerfile @@ -0,0 +1,16 @@ +FROM 1.79-bookworm AS build + +RUN apt-get update && apt-get install -y adduser ca-certificates tzdata curl dnsutils && update-ca-certificates + +WORKDIR /app +COPY . . + +RUN cargo build --example axum --release + +FROM ubuntu:24.04 AS runner +RUN apt update && apt install -y adduser wget ca-certificates tzdata curl dnsutils && update-ca-certificates + +COPY --from=build /app/target/release/examples/axum /bin/run + +USER ubuntu +CMD ["run"] From d950d61c0d51d6b14593594ba45a52e15d260380 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Mon, 7 Oct 2024 21:44:17 -0700 Subject: [PATCH 17/39] typo --- .github/workflows/deploy.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 7ece501..febbf88 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -21,7 +21,7 @@ jobs: - name: Configure AWS credentials uses: aws-actions/configure-aws-credentials@v4 with: - role-to-assume: ${{ secret.AWS_GH_ACTION_ACCESS }} + role-to-assume: ${{ secrets.AWS_GH_ACTION_ACCESS }} aws-region: ${{ env.AWS_REGION }} - name: Login to AWS ECR From 7044d4a7ae90c956ab36203ca477ea9ca58e502b Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Mon, 7 Oct 2024 21:46:10 -0700 Subject: [PATCH 18/39] change to vars --- .github/workflows/deploy.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index febbf88..8851567 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -22,7 +22,7 @@ jobs: uses: aws-actions/configure-aws-credentials@v4 with: role-to-assume: ${{ secrets.AWS_GH_ACTION_ACCESS }} - aws-region: ${{ env.AWS_REGION }} + aws-region: ${{ vars.AWS_REGION }} - name: Login to AWS ECR id: login-ecr From 87b762cd02757799dff4307f63f0649f2485e842 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Mon, 7 Oct 2024 21:46:50 -0700 Subject: [PATCH 19/39] add permissions --- .github/workflows/deploy.yml | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 8851567..a115fae 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -12,6 +12,9 @@ env: jobs: build: runs-on: ubuntu-latest + permissions: + id-token: write + contents: read steps: - uses: actions/checkout@v4 From e223337132ff8180e0635a6e922535f3ebd0f2fa Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Mon, 7 Oct 2024 21:54:58 -0700 Subject: [PATCH 20/39] fix image ref --- inngest/examples/axum/Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/inngest/examples/axum/Dockerfile b/inngest/examples/axum/Dockerfile index 44dc9e5..465f6b7 100644 --- a/inngest/examples/axum/Dockerfile +++ b/inngest/examples/axum/Dockerfile @@ -1,4 +1,4 @@ -FROM 1.79-bookworm AS build +FROM rust:1.79-bookworm AS build RUN apt-get update && apt-get install -y adduser ca-certificates tzdata curl dnsutils && update-ca-certificates From e8948c8cf52e4c7a725a2e13e6efe4038f0ec519 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Mon, 7 Oct 2024 21:59:51 -0700 Subject: [PATCH 21/39] use proper reference --- .github/workflows/deploy.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index a115fae..2d972a1 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -35,7 +35,7 @@ jobs: id: meta uses: docker/metadata-action@v5 with: - images: ${{ steps.login-ecr.outputs }}/rust-sdk + images: ${{ steps.login-ecr.outputs.registry }}/rust-sdk tags: | type=sha,prefix= From 765cf8228af6ea7cfacaecb3b90c00442bcf1f15 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Mon, 7 Oct 2024 22:14:45 -0700 Subject: [PATCH 22/39] setup deploy --- deploy/deployment.yaml | 55 ++++++++++++++++++++++++++++++++++++++ deploy/kustomization.yaml | 21 +++++++++++++++ deploy/namespace.yaml | 7 +++++ deploy/service.yaml | 19 +++++++++++++ deploy/serviceaccount.yaml | 6 +++++ 5 files changed, 108 insertions(+) create mode 100644 deploy/deployment.yaml create mode 100644 deploy/kustomization.yaml create mode 100644 deploy/namespace.yaml create mode 100644 deploy/service.yaml create mode 100644 deploy/serviceaccount.yaml diff --git a/deploy/deployment.yaml b/deploy/deployment.yaml new file mode 100644 index 0000000..ed0a98e --- /dev/null +++ b/deploy/deployment.yaml @@ -0,0 +1,55 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: rust-sdk +spec: + replicas: 2 + revisionHistoryLimit: 5 + template: + spec: + # allow enough time for graceful termination + terminationGracePeriodSeconds: 600 + topologySpreadConstraints: + - topologyKey: "topology.kubernetes.io/zone" + maxSkew: 1 + whenUnsatisfiable: ScheduleAnyway + labelSelector: + matchLabels: + app: rust-sdk + serviceAccountName: rust-sdk + containers: + - image: rust-sdk:TO_BE_REPLACED + imagePullPolicy: IfNotPresent + name: axum + ports: + - containerPort: 3000 + resources: + limits: + memory: 100Mi + requests: + cpu: 0.5 + envFrom: + - configMapRef: + name: common + livenessProbe: + httpGet: + path: / + port: 3000 + initialDelaySeconds: 3 + periodSeconds: 10 + successThreshold: 1 + failureThreshold: 3 + readinessProbe: + httpGet: + path: / + port: 3000 + initialDelaySeconds: 3 + periodSeconds: 10 + successThreshold: 3 + failureThreshold: 3 + lifecycle: + # required for LB to register and drain target + # ref: https://aws.github.io/aws-eks-best-practices/networking/loadbalancing/loadbalancing/#ensure-pods-are-deregistered-from-load-balancers-before-termination + preStop: + exec: + command: ["/bin/sh", "-c", "sleep 180"] diff --git a/deploy/kustomization.yaml b/deploy/kustomization.yaml new file mode 100644 index 0000000..ce723ad --- /dev/null +++ b/deploy/kustomization.yaml @@ -0,0 +1,21 @@ +apiVersion: kustomize.config.k8s.io/v1beta1 +kind: Kustomization + +namespace: rust-sdk + +commonLabels: + app: rust-sdk + app.kubernetes.io/managed-by: kustomize + +configMapGenerator: + - name: common + literals: + - INNGEST_SERVE_ORIGIN=https://rust-sdk.inngest.net + - INNGEST_SIGNING_KEY=REPLACE_SIGNING_KEY + - INNGEST_EVENT_KEY=REPLACE_EVENT_KEY + +resources: + - namespace.yaml + - serviceaccount.yaml + - service.yaml + - deployment.yaml diff --git a/deploy/namespace.yaml b/deploy/namespace.yaml new file mode 100644 index 0000000..a96d240 --- /dev/null +++ b/deploy/namespace.yaml @@ -0,0 +1,7 @@ +apiVersion: v1 +kind: Namespace +metadata: + name: rust-sdk + labels: + # required to make sure pods are ready after LB has completed their health checks + elbv2.k8s.aws/pod-readiness-gate-inject: enabled diff --git a/deploy/service.yaml b/deploy/service.yaml new file mode 100644 index 0000000..2d808d0 --- /dev/null +++ b/deploy/service.yaml @@ -0,0 +1,19 @@ +apiVersion: v1 +kind: Service +metadata: + name: rust-sdk + annotations: + service.beta.kubernetes.io/aws-load-balancer-name: rust-sdk + service.beta.kubernetes.io/aws-load-balancer-scheme: internet-facing + service.beta.kubernetes.io/aws-load-balancer-ip-address-type: dualstack + service.beta.kubernetes.io/aws-load-balancer-nlb-target-type: ip + service.beta.kubernetes.io/aws-load-balancer-target-group-attributes: preserve_client_ip.enabled=true + service.beta.kubernetes.io/aws-load-balancer-ssl-negotiation-policy: ELBSecurityPolicy-TLS13-1-2-2021-06 + service.beta.kubernetes.io/aws-load-balancer-attributes: load_balancing.cross_zone.enabled=true +spec: + type: LoadBalancer + ports: + - name: tcp + port: 443 + targetPort: 3000 + protocol: TCP diff --git a/deploy/serviceaccount.yaml b/deploy/serviceaccount.yaml new file mode 100644 index 0000000..aa298d6 --- /dev/null +++ b/deploy/serviceaccount.yaml @@ -0,0 +1,6 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: rust-sdk + labels: + service: rust-sdk From 861f8044b6356e7cb21ad6d3d506298988271b54 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Mon, 7 Oct 2024 22:44:04 -0700 Subject: [PATCH 23/39] change binary name --- inngest/examples/axum/Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/inngest/examples/axum/Dockerfile b/inngest/examples/axum/Dockerfile index 465f6b7..3989ef3 100644 --- a/inngest/examples/axum/Dockerfile +++ b/inngest/examples/axum/Dockerfile @@ -10,7 +10,7 @@ RUN cargo build --example axum --release FROM ubuntu:24.04 AS runner RUN apt update && apt install -y adduser wget ca-certificates tzdata curl dnsutils && update-ca-certificates -COPY --from=build /app/target/release/examples/axum /bin/run +COPY --from=build /app/target/release/examples/axum /bin/main USER ubuntu -CMD ["run"] +CMD ["main"] From 649216c36d131c57329818981e27050ccf8f3db6 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Mon, 7 Oct 2024 22:59:31 -0700 Subject: [PATCH 24/39] add deployment step --- .github/workflows/deploy.yml | 49 ++++++++++++++++++++++++++++++++++++ deploy/kustomization.yaml | 8 ++++++ deploy/service.yaml | 4 +++ 3 files changed, 61 insertions(+) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 2d972a1..059c655 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -50,3 +50,52 @@ jobs: platform: linux/amd64 cache-from: type=gha cache-to: type=gha,mode=max + + deploy: + needs: [build] + runs-on: ubuntu-latest + permissions: + id-token: write + contents: read + steps: + - uses: actions/checkout@v4 + + - name: Configure AWS credentials + uses: aws-actions/configure-aws-credentials@v4 + with: + role-to-assume: ${{ secrets.AWS_GH_ACTION_ACCESS }} + aws-region: ${{ vars.AWS_REGION }} + + - name: Login to AWS ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v2 + + - name: Image metadata + id: meta + uses: docker/metadata-action@v5 + with: + images: ${{ steps.login-ecr.outputs.registry }}/rust-sdk + tags: | + type=sha,prefix= + + - name: Replace placeholders in kustomize + run: | + # image name + sed -i "s/IMAGE_NAME/${{ steps.login-ecr.outputs.registry }}/g" kustomization.yaml + + # image tag + sed -i "s/IMAGE_TAG/${{ env.DOCKER_METADATA_OUTPUT_VERSION }}/g" kustomization.yaml + + # siging key + sed -i "s/REPLACE_SIGNING_KEY/${{ secrets.INNGEST_SIGNING_KEY }}/g" kustomization.yaml + + # event key + sed -i "s/REPLACE_EVENT_KEY/${{ secrets.INNGEST_EVENT_KEY }}/g" kustomization.yaml + + - name: Deploy + uses: ianbelcher/eks-kubectl-action@master # ALERT: master...? + with: + cluster_name: ${{ secrets.EKS_CLUSTER_NAME }} + kubernetes_version: v1.28.4 + eks_role_arn: ${{ secrets.EKS_ACCESS_ARN }} + args: apply -k ./opt/deploy/overlays/staging diff --git a/deploy/kustomization.yaml b/deploy/kustomization.yaml index ce723ad..da09f73 100644 --- a/deploy/kustomization.yaml +++ b/deploy/kustomization.yaml @@ -11,6 +11,9 @@ configMapGenerator: - name: common literals: - INNGEST_SERVE_ORIGIN=https://rust-sdk.inngest.net + - INNGEST_EVENT_API_ORIGIN=https://stage.inn.gs + - INNGEST_API_ORIGIN=https://api.inngest.net + - INNGEST_SIGNING_KEY=REPLACE_SIGNING_KEY - INNGEST_EVENT_KEY=REPLACE_EVENT_KEY @@ -19,3 +22,8 @@ resources: - serviceaccount.yaml - service.yaml - deployment.yaml + +images: + - name: rust-sdk + newName: "IMAGE_NAME" + newTag: "IMAGE_TAG" diff --git a/deploy/service.yaml b/deploy/service.yaml index 2d808d0..101f4c6 100644 --- a/deploy/service.yaml +++ b/deploy/service.yaml @@ -10,6 +10,10 @@ metadata: service.beta.kubernetes.io/aws-load-balancer-target-group-attributes: preserve_client_ip.enabled=true service.beta.kubernetes.io/aws-load-balancer-ssl-negotiation-policy: ELBSecurityPolicy-TLS13-1-2-2021-06 service.beta.kubernetes.io/aws-load-balancer-attributes: load_balancing.cross_zone.enabled=true + service.beta.kubernetes.io/aws-load-balancer-additional-resource-tags: env=staging,app=rust-sdk + service.beta.kubernetes.io/aws-load-balancer-ssl-cert: arn:aws:acm:us-east-2:909933634258:certificate/56142e36-5123-47fc-90de-a2865439c6aa + # external DNS integration test + external-dns.alpha.kubernetes.io/hostname: rust-sdk.inngest.net spec: type: LoadBalancer ports: From 83203fcad87d3f584fc0c3784f6ef42bd2d4e750 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Mon, 7 Oct 2024 23:01:23 -0700 Subject: [PATCH 25/39] consolidate --- .github/workflows/deploy.yml | 31 +++---------------------------- 1 file changed, 3 insertions(+), 28 deletions(-) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 059c655..b542691 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -10,7 +10,8 @@ env: CARGO_TERM_COLOR: always jobs: - build: + build_and_deploy: + name: Build and Deploy runs-on: ubuntu-latest permissions: id-token: write @@ -51,34 +52,8 @@ jobs: cache-from: type=gha cache-to: type=gha,mode=max - deploy: - needs: [build] - runs-on: ubuntu-latest - permissions: - id-token: write - contents: read - steps: - - uses: actions/checkout@v4 - - - name: Configure AWS credentials - uses: aws-actions/configure-aws-credentials@v4 - with: - role-to-assume: ${{ secrets.AWS_GH_ACTION_ACCESS }} - aws-region: ${{ vars.AWS_REGION }} - - - name: Login to AWS ECR - id: login-ecr - uses: aws-actions/amazon-ecr-login@v2 - - - name: Image metadata - id: meta - uses: docker/metadata-action@v5 - with: - images: ${{ steps.login-ecr.outputs.registry }}/rust-sdk - tags: | - type=sha,prefix= - - name: Replace placeholders in kustomize + working-directory: deploy run: | # image name sed -i "s/IMAGE_NAME/${{ steps.login-ecr.outputs.registry }}/g" kustomization.yaml From 81eaf7933a4afc7ed318e7b59583dcb6246162dc Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Mon, 7 Oct 2024 23:07:50 -0700 Subject: [PATCH 26/39] change to ipv6 binding --- inngest/examples/axum/main.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/inngest/examples/axum/main.rs b/inngest/examples/axum/main.rs index d664e3e..008bf67 100644 --- a/inngest/examples/axum/main.rs +++ b/inngest/examples/axum/main.rs @@ -29,7 +29,7 @@ async fn main() { let inngest_state = Arc::new(inngest_handler); let app = Router::new() - .route("/", get(|| async { "Hello, World!" })) + .route("/", get(|| async { "OK!" })) .route( "/api/inngest", put(serve::axum::register).post(serve::axum::invoke), @@ -37,7 +37,7 @@ async fn main() { .with_state(inngest_state); // run it with hyper on localhost:3000 - axum::Server::bind(&"0.0.0.0:3000".parse().unwrap()) + axum::Server::bind(&"[::]:3000".parse().unwrap()) .serve(app.into_make_service()) .await .unwrap(); From c00db277fc1050485227a30e650111c582aa0df7 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Mon, 7 Oct 2024 23:09:53 -0700 Subject: [PATCH 27/39] change to parsed --- inngest/examples/axum/main.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/inngest/examples/axum/main.rs b/inngest/examples/axum/main.rs index 008bf67..a040754 100644 --- a/inngest/examples/axum/main.rs +++ b/inngest/examples/axum/main.rs @@ -29,15 +29,17 @@ async fn main() { let inngest_state = Arc::new(inngest_handler); let app = Router::new() - .route("/", get(|| async { "OK!" })) + .route("/", get(|| async { "OK!\n" })) .route( "/api/inngest", put(serve::axum::register).post(serve::axum::invoke), ) .with_state(inngest_state); + let addr = "[::]:3000".parse::().unwrap(); + // run it with hyper on localhost:3000 - axum::Server::bind(&"[::]:3000".parse().unwrap()) + axum::Server::bind(&addr) .serve(app.into_make_service()) .await .unwrap(); From 2f723bc576e6ee38c130a15ab2d24335f83370b5 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Mon, 7 Oct 2024 23:10:33 -0700 Subject: [PATCH 28/39] update args --- .github/workflows/deploy.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index b542691..15d48c4 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -73,4 +73,4 @@ jobs: cluster_name: ${{ secrets.EKS_CLUSTER_NAME }} kubernetes_version: v1.28.4 eks_role_arn: ${{ secrets.EKS_ACCESS_ARN }} - args: apply -k ./opt/deploy/overlays/staging + args: apply -k ./deploy From 091534ddf92c385781970995ce4c9a41f546e6c6 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Mon, 7 Oct 2024 23:14:55 -0700 Subject: [PATCH 29/39] fix syntax error --- .github/workflows/deploy.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index 15d48c4..a808216 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -56,7 +56,7 @@ jobs: working-directory: deploy run: | # image name - sed -i "s/IMAGE_NAME/${{ steps.login-ecr.outputs.registry }}/g" kustomization.yaml + sed -i "s/IMAGE_NAME/${{ steps.login-ecr.outputs.registry }}\/rust-sdk/g" kustomization.yaml # image tag sed -i "s/IMAGE_TAG/${{ env.DOCKER_METADATA_OUTPUT_VERSION }}/g" kustomization.yaml From d22fbdab238c0080869b91558ecb7c842162eb67 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Mon, 7 Oct 2024 23:30:04 -0700 Subject: [PATCH 30/39] remove grace period --- .github/workflows/deploy.yml | 3 +++ deploy/deployment.yaml | 2 -- deploy/service.yaml | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index a808216..a9da512 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -67,6 +67,9 @@ jobs: # event key sed -i "s/REPLACE_EVENT_KEY/${{ secrets.INNGEST_EVENT_KEY }}/g" kustomization.yaml + # TLS cert + sed -i "s/REPLACE_TLS_CERT_ARN/${{ secrets.TLS_CERT_ARN }}/g" service.yaml + - name: Deploy uses: ianbelcher/eks-kubectl-action@master # ALERT: master...? with: diff --git a/deploy/deployment.yaml b/deploy/deployment.yaml index ed0a98e..a481b5a 100644 --- a/deploy/deployment.yaml +++ b/deploy/deployment.yaml @@ -7,8 +7,6 @@ spec: revisionHistoryLimit: 5 template: spec: - # allow enough time for graceful termination - terminationGracePeriodSeconds: 600 topologySpreadConstraints: - topologyKey: "topology.kubernetes.io/zone" maxSkew: 1 diff --git a/deploy/service.yaml b/deploy/service.yaml index 101f4c6..dbbc049 100644 --- a/deploy/service.yaml +++ b/deploy/service.yaml @@ -11,7 +11,7 @@ metadata: service.beta.kubernetes.io/aws-load-balancer-ssl-negotiation-policy: ELBSecurityPolicy-TLS13-1-2-2021-06 service.beta.kubernetes.io/aws-load-balancer-attributes: load_balancing.cross_zone.enabled=true service.beta.kubernetes.io/aws-load-balancer-additional-resource-tags: env=staging,app=rust-sdk - service.beta.kubernetes.io/aws-load-balancer-ssl-cert: arn:aws:acm:us-east-2:909933634258:certificate/56142e36-5123-47fc-90de-a2865439c6aa + service.beta.kubernetes.io/aws-load-balancer-ssl-cert: REPLACE_TLS_CERT_ARN # external DNS integration test external-dns.alpha.kubernetes.io/hostname: rust-sdk.inngest.net spec: From 00f07b5e0955bbb4bc906104431fa9942a3dedee Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Mon, 7 Oct 2024 23:36:18 -0700 Subject: [PATCH 31/39] change to single quotes --- .github/workflows/deploy.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index a9da512..adad5a2 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -68,7 +68,7 @@ jobs: sed -i "s/REPLACE_EVENT_KEY/${{ secrets.INNGEST_EVENT_KEY }}/g" kustomization.yaml # TLS cert - sed -i "s/REPLACE_TLS_CERT_ARN/${{ secrets.TLS_CERT_ARN }}/g" service.yaml + sed -i 's/REPLACE_TLS_CERT_ARN/${{ secrets.TLS_CERT_ARN }}/g' service.yaml - name: Deploy uses: ianbelcher/eks-kubectl-action@master # ALERT: master...? From a636abba23f27d434df4466808cdb491e0d9f46a Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Mon, 7 Oct 2024 23:42:41 -0700 Subject: [PATCH 32/39] change separator to be used --- .github/workflows/deploy.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index adad5a2..0b97ccf 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -67,8 +67,8 @@ jobs: # event key sed -i "s/REPLACE_EVENT_KEY/${{ secrets.INNGEST_EVENT_KEY }}/g" kustomization.yaml - # TLS cert - sed -i 's/REPLACE_TLS_CERT_ARN/${{ secrets.TLS_CERT_ARN }}/g' service.yaml + # TLS cert - use % as separator instead + sed -i 's%REPLACE_TLS_CERT_ARN%${{ secrets.TLS_CERT_ARN }}%g' service.yaml - name: Deploy uses: ianbelcher/eks-kubectl-action@master # ALERT: master...? From 1f06d63577ee2859995b98d1fd025a866a6eebfe Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Tue, 8 Oct 2024 00:14:25 -0700 Subject: [PATCH 33/39] add signature to header on sync --- inngest/src/handler.rs | 22 ++++++++++++++++++---- inngest/src/signature.rs | 29 +++++++++++++++++++---------- 2 files changed, 37 insertions(+), 14 deletions(-) diff --git a/inngest/src/handler.rs b/inngest/src/handler.rs index b242095..2c9e274 100644 --- a/inngest/src/handler.rs +++ b/inngest/src/handler.rs @@ -131,13 +131,27 @@ impl Handler { ..Default::default() }; - reqwest::Client::new() + let mut req = reqwest::Client::new() .post(format!( "{}/fn/register", self.inngest.inngest_api_origin(kind) )) - .json(&req) - .send() + .json(&req); + + if let Some(key) = &self.signing_key { + let sig = Signature::new(&key); + match sig.hash() { + Ok(hashed) => { + req = req.header("authorization", format!("Bearer {}", hashed)); + }, + Err(_err) => { + return Err("error hashing signing key".to_string()); + } + } + } + + + req.send() .await .map(|_| ()) .map_err(|_err| "error registering".to_string()) @@ -168,7 +182,7 @@ impl Handler { )); }; - let signature = Signature::new(&sig, &key, raw_body); + let signature = Signature::new(&key).sig(&sig).body(raw_body); _ = signature.verify(false)?; } diff --git a/inngest/src/signature.rs b/inngest/src/signature.rs index 8bea6e8..6162e4b 100644 --- a/inngest/src/signature.rs +++ b/inngest/src/signature.rs @@ -20,18 +20,28 @@ pub struct Signature { } impl Signature { - pub fn new(sig: &str, key: &str, body: &str) -> Self { + pub fn new(key: &str) -> Self { let re = Regex::new(r"^signkey-.+-").unwrap(); Signature { - sig: sig.to_string(), + sig: String::new(), key: key.to_string(), - body: body.to_string(), + body: String::new(), re, } } - fn hash(&self) -> Result { + pub fn sig(mut self, sig: &str) -> Self { + self.sig = sig.to_string(); + self + } + + pub fn body(mut self, body: &str) -> Self { + self.body = body.to_string(); + self + } + + pub fn hash(&self) -> Result { match self.re.find(&self.key) { Some(mat) => { let prefix = mat.as_str(); @@ -153,8 +163,7 @@ mod tests { #[test] fn test_hashed_signing_key() { - let body = body(); - let sig = Signature::new(SIGNATURE, SIGNING_KEY, &body); + let sig = Signature::new(SIGNING_KEY); let hashed = sig.hash(); assert!(hashed.is_ok()); assert_eq!(HASHED_SIGNING_KEY, hashed.unwrap()); @@ -192,7 +201,7 @@ mod tests { #[test] fn test_verify_if_signature_is_valid() { let body = body(); - let sig = Signature::new(SIGNATURE, SIGNING_KEY, &body); + let sig = Signature::new(SIGNING_KEY).sig(SIGNATURE).body(&body); let res = sig.verify(true); assert!(res.is_ok()); } @@ -200,7 +209,7 @@ mod tests { #[test] fn test_verify_if_signature_is_expired() { let body = body(); - let sig = Signature::new(SIGNATURE, SIGNING_KEY, &body); + let sig = Signature::new(SIGNING_KEY).sig(SIGNATURE).body(&body); let res = sig.verify(false); assert!(res.is_err()); } @@ -209,7 +218,7 @@ mod tests { fn test_verify_if_signature_is_invalid() { let body = body(); let invalid_sig = format!("{}hello", SIGNATURE); - let sig = Signature::new(&invalid_sig, SIGNING_KEY, &body); + let sig = Signature::new(SIGNING_KEY).sig(&invalid_sig).body(&body); let res = sig.verify(true); assert!(res.is_err()); } @@ -217,7 +226,7 @@ mod tests { #[test] fn test_verify_for_random_input() { let body = body(); - let sig = Signature::new("10", SIGNING_KEY, &body); + let sig = Signature::new(SIGNING_KEY).sig("10").body(&body); let res = sig.verify(true); assert!(res.is_err()); } From 609e917047c4b35dcd316e0777e10d1936ee24e2 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Tue, 8 Oct 2024 00:16:20 -0700 Subject: [PATCH 34/39] fmt --- inngest/src/handler.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/inngest/src/handler.rs b/inngest/src/handler.rs index 2c9e274..941236a 100644 --- a/inngest/src/handler.rs +++ b/inngest/src/handler.rs @@ -143,14 +143,13 @@ impl Handler { match sig.hash() { Ok(hashed) => { req = req.header("authorization", format!("Bearer {}", hashed)); - }, + } Err(_err) => { return Err("error hashing signing key".to_string()); } } } - req.send() .await .map(|_| ()) From 2ba4033866bf32ac7a78fe54da69b63e7940c366 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Tue, 8 Oct 2024 00:31:43 -0700 Subject: [PATCH 35/39] test more error messages --- inngest/examples/axum/main.rs | 2 +- inngest/src/result.rs | 27 ++++++++++++++++++++++----- 2 files changed, 23 insertions(+), 6 deletions(-) diff --git a/inngest/examples/axum/main.rs b/inngest/examples/axum/main.rs index a040754..6a8c815 100644 --- a/inngest/examples/axum/main.rs +++ b/inngest/examples/axum/main.rs @@ -140,7 +140,7 @@ fn hello_fn() -> ServableFn { let evt = &input.event; println!("Event: {}", evt.name); - step.sleep_until("sleep", 1727245659000)?; + step.sleep("wait-5s", Duration::from_secs(5))?; Ok(json!("test hello")) }, diff --git a/inngest/src/result.rs b/inngest/src/result.rs index 0618702..0e7bd97 100644 --- a/inngest/src/result.rs +++ b/inngest/src/result.rs @@ -142,11 +142,28 @@ impl Drop for FlowControlError { impl IntoResponse for Error { fn into_response(self) -> axum::response::Response { - ( - StatusCode::INTERNAL_SERVER_ERROR, - Json(json!("NOT IMPLEMENTED")), - ) - .into_response() + match self { + Error::Dev(err) => match err { + DevError::Basic(msg) => (StatusCode::INTERNAL_SERVER_ERROR, Json(json!(msg))), + DevError::RetryAt(_err) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!("retry after error")), + ), + DevError::NoRetry(_err) => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!("no retry error")), + ), + }, + Error::NoInvokeFunctionResponseError => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!("No invoke response")), + ), + _ => ( + StatusCode::INTERNAL_SERVER_ERROR, + Json(json!("NOT IMPLEMENTED")), + ), + } + .into_response() } } From a7f1cacb2d868c4df78345a60fd111a07516f755 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Tue, 8 Oct 2024 00:43:28 -0700 Subject: [PATCH 36/39] debug --- inngest/src/signature.rs | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/inngest/src/signature.rs b/inngest/src/signature.rs index 6162e4b..400ac91 100644 --- a/inngest/src/signature.rs +++ b/inngest/src/signature.rs @@ -78,6 +78,10 @@ impl Signature { match now.duration_since(from) { Ok(dur) => { + println!( + "DEBUG:\n Timestamp: {}\n Now: {:#?}\n From: {:#?}\n Duration: {:#?}\n SMap: {:#?}\n Ignore: {}\n Within Range: {}", + ts, now, from, dur, smap, ignore_ts, dur <= Duration::from_secs(60 * 5) + ); if ignore_ts || dur <= Duration::from_secs(60 * 5) { match self.sign(ts, &self.key, &self.body) { Ok(sig) => { From b7e5f91c925e5283083c5b240ef1e9b3dcffc798 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Tue, 8 Oct 2024 01:19:04 -0700 Subject: [PATCH 37/39] change validation of timestamp --- inngest/src/signature.rs | 68 +++++++++++++++++---------------------- inngest/src/utils/mod.rs | 1 + inngest/src/utils/time.rs | 9 ++++++ 3 files changed, 40 insertions(+), 38 deletions(-) create mode 100644 inngest/src/utils/time.rs diff --git a/inngest/src/signature.rs b/inngest/src/signature.rs index 400ac91..7c20377 100644 --- a/inngest/src/signature.rs +++ b/inngest/src/signature.rs @@ -1,14 +1,11 @@ -use std::{ - collections::HashMap, - time::{self, Duration, SystemTime}, -}; +use std::{collections::HashMap, time::Duration}; use hmac::{Hmac, Mac}; use regex::Regex; use sha1::Digest; use sha2::Sha256; -use crate::{basic_error, result::Error}; +use crate::{basic_error, result::Error, utils::time}; type HmacSha256 = Hmac; @@ -73,35 +70,30 @@ impl Signature { match smap.get("t") { Some(tsstr) => match tsstr.parse::() { Ok(ts) => { - let now = SystemTime::now(); - let from = self.unix_ts_to_systime(ts); - - match now.duration_since(from) { - Ok(dur) => { - println!( - "DEBUG:\n Timestamp: {}\n Now: {:#?}\n From: {:#?}\n Duration: {:#?}\n SMap: {:#?}\n Ignore: {}\n Within Range: {}", - ts, now, from, dur, smap, ignore_ts, dur <= Duration::from_secs(60 * 5) - ); - if ignore_ts || dur <= Duration::from_secs(60 * 5) { - match self.sign(ts, &self.key, &self.body) { - Ok(sig) => { - if sig == self.sig { - Ok(()) - } else { - // TODO: handle errors better - Err(basic_error!("signature doesn't match")) - } - } + let now = time::now(); + let valid = + Duration::from_millis((now - ts) as u64) <= Duration::from_secs(60 * 5); + + println!( + "DEBUG:\n Timestamp: {}\n Now: {}\n SMap: {:#?}\n Ignore: {}\n Within Range: {}\n", + ts, now, smap, ignore_ts, valid + ); + if ignore_ts || valid { + match self.sign(ts, &self.key, &self.body) { + Ok(sig) => { + if sig == self.sig { + Ok(()) + } else { // TODO: handle errors better - Err(_err) => Err(basic_error!("error signing body")), + Err(basic_error!("signature doesn't match")) } - } else { - // TODO: handle errors better - Err(basic_error!("invalid sig")) } + // TODO: handle errors better + Err(_err) => Err(basic_error!("error signing body")), } + } else { // TODO: handle errors better - Err(_err) => Err(basic_error!("error parsing time elasped for sig")), + Err(basic_error!("invalid sig")) } } // TODO: handle errors better @@ -130,15 +122,15 @@ impl Signature { map } - fn unix_ts_to_systime(&self, ts: i64) -> SystemTime { - if ts >= 0 { - time::UNIX_EPOCH + Duration::from_millis(ts as u64) - } else { - // handle negative timestamp - let nts = Duration::from_millis(-ts as u64); - time::UNIX_EPOCH - nts - } - } + // fn unix_ts_to_systime(&self, ts: i64) -> SystemTime { + // if ts >= 0 { + // UNIX_EPOCH + Duration::from_millis(ts as u64) + // } else { + // // handle negative timestamp + // let nts = Duration::from_millis(-ts as u64); + // UNIX_EPOCH - nts + // } + // } fn sign(&self, unix_ts: i64, signing_key: &str, body: &str) -> Result { let key = self.normalize_key(signing_key); diff --git a/inngest/src/utils/mod.rs b/inngest/src/utils/mod.rs index 20ef437..c64a680 100644 --- a/inngest/src/utils/mod.rs +++ b/inngest/src/utils/mod.rs @@ -1 +1,2 @@ pub(crate) mod duration; +pub(crate) mod time; diff --git a/inngest/src/utils/time.rs b/inngest/src/utils/time.rs new file mode 100644 index 0000000..312587d --- /dev/null +++ b/inngest/src/utils/time.rs @@ -0,0 +1,9 @@ +use std::time::SystemTime; + +pub(crate) fn now() -> i64 { + let start = SystemTime::now(); + match start.duration_since(std::time::UNIX_EPOCH) { + Ok(dur) => dur.as_millis() as i64, + Err(_) => 0, + } +} From 0ad750a797c7904f338fa4cf72f9759be72948d0 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Tue, 8 Oct 2024 01:25:28 -0700 Subject: [PATCH 38/39] change to second based unix ts --- inngest/src/signature.rs | 3 +-- inngest/src/utils/time.rs | 10 +++++++++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/inngest/src/signature.rs b/inngest/src/signature.rs index 7c20377..b09c16d 100644 --- a/inngest/src/signature.rs +++ b/inngest/src/signature.rs @@ -71,8 +71,7 @@ impl Signature { Some(tsstr) => match tsstr.parse::() { Ok(ts) => { let now = time::now(); - let valid = - Duration::from_millis((now - ts) as u64) <= Duration::from_secs(60 * 5); + let valid = (now - ts) <= (60 * 5); // within 5m println!( "DEBUG:\n Timestamp: {}\n Now: {}\n SMap: {:#?}\n Ignore: {}\n Within Range: {}\n", diff --git a/inngest/src/utils/time.rs b/inngest/src/utils/time.rs index 312587d..f381963 100644 --- a/inngest/src/utils/time.rs +++ b/inngest/src/utils/time.rs @@ -1,9 +1,17 @@ use std::time::SystemTime; -pub(crate) fn now() -> i64 { +pub(crate) fn now_ms() -> i64 { let start = SystemTime::now(); match start.duration_since(std::time::UNIX_EPOCH) { Ok(dur) => dur.as_millis() as i64, Err(_) => 0, } } + +pub(crate) fn now() -> i64 { + let start = SystemTime::now(); + match start.duration_since(std::time::UNIX_EPOCH) { + Ok(dur) => dur.as_secs() as i64, + Err(_) => 0, + } +} From 54bff0b3c09b6f7a506f37c30131e4288da7b735 Mon Sep 17 00:00:00 2001 From: Darwin D Wu Date: Tue, 8 Oct 2024 01:35:03 -0700 Subject: [PATCH 39/39] check body --- inngest/src/handler.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/inngest/src/handler.rs b/inngest/src/handler.rs index 941236a..479569e 100644 --- a/inngest/src/handler.rs +++ b/inngest/src/handler.rs @@ -188,6 +188,7 @@ impl Handler { let data = match serde_json::from_value::>(body.clone()) { Ok(res) => res, Err(err) => { + println!("BODY: {:#?}", &body); // TODO: need to surface this error better let msg = basic_error!("error parsing run request: {}", err); return Err(msg);