diff --git a/.env.example b/.env.example index 943896d..9862c40 100644 --- a/.env.example +++ b/.env.example @@ -1 +1,2 @@ -RUST_LOG=info,tx_sitter=debug,fake_rpc=debug,tower_http=debug +RUST_LOG=info,tx_sitter=debug +COLOR_EYRE=true diff --git a/Cargo.lock b/Cargo.lock index d6bd4a0..53c24f0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -27,6 +27,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "aead" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0" +dependencies = [ + "crypto-common", + "generic-array", +] + [[package]] name = "aes" version = "0.8.3" @@ -38,6 +48,20 @@ dependencies = [ "cpufeatures", ] +[[package]] +name = "aes-gcm" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "831010a0f742e1209b3bcea8fab6a8e149051ba6099432c8cb2cc117dec3ead1" +dependencies = [ + "aead", + "aes", + "cipher", + "ctr", + "ghash", + "subtle", +] + [[package]] name = "ahash" version = "0.7.7" @@ -186,6 +210,12 @@ dependencies = [ "num-traits", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "auto_impl" version = "1.1.0" @@ -226,7 +256,7 @@ dependencies = [ "fastrand", "hex", "http 0.2.11", - "hyper", + "hyper 0.14.27", "ring 0.17.5", "time", "tokio", @@ -444,11 +474,11 @@ dependencies = [ "aws-smithy-types", "bytes", "fastrand", - "h2", + "h2 0.3.22", "http 0.2.11", "http-body 0.4.5", "http-body 1.0.0", - "hyper", + "hyper 0.14.27", "hyper-rustls", "once_cell", "pin-project-lite", @@ -525,56 +555,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "axum" -version = "0.6.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" -dependencies = [ - "async-trait", - "axum-core", - "bitflags 1.3.2", - "bytes", - "futures-util", - "headers", - "http 0.2.11", - "http-body 0.4.5", - "hyper", - "itoa", - "matchit", - "memchr", - "mime", - "percent-encoding", - "pin-project-lite", - "rustversion", - "serde", - "serde_json", - "serde_path_to_error", - "serde_urlencoded", - "sync_wrapper", - "tokio", - "tower", - "tower-layer", - "tower-service", -] - -[[package]] -name = "axum-core" -version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "759fa577a247914fd3f7f76d62972792636412fbfd634cd452f6a385a74d2d2c" -dependencies = [ - "async-trait", - "bytes", - "futures-util", - "http 0.2.11", - "http-body 0.4.5", - "mime", - "rustversion", - "tower-layer", - "tower-service", -] - [[package]] name = "backtrace" version = "0.3.69" @@ -608,6 +588,12 @@ version = "0.21.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35636a1494ede3b646cc98f74f8e62c773a38a659ebc777a2cf26b9b74171df9" +[[package]] +name = "base64" +version = "0.22.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" + [[package]] name = "base64-simd" version = "0.8.0" @@ -830,6 +816,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" + [[package]] name = "chrono" version = "0.4.31" @@ -873,7 +865,7 @@ dependencies = [ "anstream", "anstyle", "clap_lex", - "strsim", + "strsim 0.10.0", ] [[package]] @@ -946,6 +938,33 @@ dependencies = [ "thiserror", ] +[[package]] +name = "color-eyre" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55146f5e46f237f7423d74111267d4597b59b0dad0ffaf7303bce9945d843ad5" +dependencies = [ + "backtrace", + "color-spantrace", + "eyre", + "indenter", + "once_cell", + "owo-colors", + "tracing-error", +] + +[[package]] +name = "color-spantrace" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cd6be1b2a7e382e2b98b43b2adcca6bb0e465af0bdd38123873ae61eb17a72c2" +dependencies = [ + "once_cell", + "owo-colors", + "tracing-core", + "tracing-error", +] + [[package]] name = "colorchoice" version = "1.0.0" @@ -996,6 +1015,30 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "245097e9a4535ee1e3e3931fcfcd55a796a44c643e8596ff6566d68f09b87bbc" +[[package]] +name = "convert_case" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6245d59a3e82a7fc217c5828a6692dbc6dfb63a0c8c90495621f7b9d79704a0e" + +[[package]] +name = "cookie" +version = "0.18.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ddef33a339a91ea89fb53151bd0a4689cfce27055c291dfa69945475d22c747" +dependencies = [ + "aes-gcm", + "base64 0.22.1", + "hkdf", + "hmac", + "percent-encoding", + "rand", + "sha2", + "subtle", + "time", + "version_check", +] + [[package]] name = "core-foundation" version = "0.9.3" @@ -1123,6 +1166,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" dependencies = [ "generic-array", + "rand_core", "typenum", ] @@ -1135,6 +1179,41 @@ dependencies = [ "cipher", ] +[[package]] +name = "darling" +version = "0.20.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83b2eb4d90d12bdda5ed17de686c2acb4c57914f8f921b8da7e112b5a36f3fe1" +dependencies = [ + "darling_core", + "darling_macro", +] + +[[package]] +name = "darling_core" +version = "0.20.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "622687fe0bac72a04e5599029151f5796111b90f1baaa9b544d807a5e31cd120" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim 0.11.1", + "syn 2.0.39", +] + +[[package]] +name = "darling_macro" +version = "0.20.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "733cabb43482b1a1b53eee8583c2b9e8684d592215ea83efd305dd31bc2f0178" +dependencies = [ + "darling_core", + "quote", + "syn 2.0.39", +] + [[package]] name = "data-encoding" version = "2.4.0" @@ -1167,8 +1246,10 @@ version = "0.99.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fb810d30a7c1953f91334de7244731fc3f3c10d7fe163338a35b9f640960321" dependencies = [ + "convert_case", "proc-macro2", "quote", + "rustc_version", "syn 1.0.109", ] @@ -1954,6 +2035,16 @@ dependencies = [ "wasi", ] +[[package]] +name = "ghash" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0d8a4362ccb29cb0b265253fb0a2728f592895ee6854fd9bc13f2ffda266ff1" +dependencies = [ + "opaque-debug", + "polyval", +] + [[package]] name = "gimli" version = "0.28.0" @@ -2008,6 +2099,25 @@ dependencies = [ "tracing", ] +[[package]] +name = "h2" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa82e28a107a8cc405f0839610bdc9b15f1e25ec7d696aa5cf173edbcb1486ab" +dependencies = [ + "atomic-waker", + "bytes", + "fnv", + "futures-core", + "futures-sink", + "http 1.1.0", + "indexmap 2.1.0", + "slab", + "tokio", + "tokio-util", + "tracing", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -2062,13 +2172,28 @@ checksum = "06683b93020a07e3dbcf5f8c0f6d40080d725bea7936fc01ad345c01b97dc270" dependencies = [ "base64 0.21.5", "bytes", - "headers-core", + "headers-core 0.2.0", "http 0.2.11", "httpdate", "mime", "sha1", ] +[[package]] +name = "headers" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "322106e6bd0cba2d5ead589ddb8150a13d7c4217cf80d7c4f682ca994ccc6aa9" +dependencies = [ + "base64 0.21.5", + "bytes", + "headers-core 0.3.0", + "http 1.1.0", + "httpdate", + "mime", + "sha1", +] + [[package]] name = "headers-core" version = "0.2.0" @@ -2078,6 +2203,15 @@ dependencies = [ "http 0.2.11", ] +[[package]] +name = "headers-core" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "54b4a22553d4242c49fddb9ba998a99962b5cc6f22cb5a3482bec22522403ce4" +dependencies = [ + "http 1.1.0", +] + [[package]] name = "heck" version = "0.4.1" @@ -2188,12 +2322,6 @@ dependencies = [ "pin-project-lite", ] -[[package]] -name = "http-range-header" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "add0ab9360ddbd88cfeb3bd9574a1d85cfdfa14db10b3e21d3700dbc4328758f" - [[package]] name = "httparse" version = "1.8.0" @@ -2232,7 +2360,7 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", + "h2 0.3.22", "http 0.2.11", "http-body 0.4.5", "httparse", @@ -2246,6 +2374,26 @@ dependencies = [ "want", ] +[[package]] +name = "hyper" +version = "1.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe575dd17d0862a9a33781c8c4696a55c320909004a67a00fb286ba8b1bc496d" +dependencies = [ + "bytes", + "futures-channel", + "futures-util", + "h2 0.4.5", + "http 1.1.0", + "http-body 1.0.0", + "httparse", + "httpdate", + "itoa", + "pin-project-lite", + "smallvec", + "tokio", +] + [[package]] name = "hyper-rustls" version = "0.24.2" @@ -2254,7 +2402,7 @@ checksum = "ec3efd23720e2049821a693cbc7e65ea87c72f1c58ff2f9522ff332b1491e590" dependencies = [ "futures-util", "http 0.2.11", - "hyper", + "hyper 0.14.27", "log", "rustls", "rustls-native-certs", @@ -2269,12 +2417,27 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ "bytes", - "hyper", + "hyper 0.14.27", "native-tls", "tokio", "tokio-native-tls", ] +[[package]] +name = "hyper-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b875924a60b96e5d7b9ae7b066540b1dd1cbd90d1828f54c92e02a283351c56" +dependencies = [ + "bytes", + "futures-util", + "http 1.1.0", + "http-body 1.0.0", + "hyper 1.3.1", + "pin-project-lite", + "tokio", +] + [[package]] name = "iana-time-zone" version = "0.1.58" @@ -2298,6 +2461,12 @@ dependencies = [ "cc", ] +[[package]] +name = "ident_case" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9e0384b61958566e926dc50660321d12159025e767c18e043daf26b70104c39" + [[package]] name = "idna" version = "0.4.0" @@ -2551,9 +2720,9 @@ dependencies = [ [[package]] name = "libc" -version = "0.2.150" +version = "0.2.155" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89d92a4743f9a61002fae18374ed11e7973f530cb3a3255fb354818118b2203c" +checksum = "97b3888a4aecf77e811145cadf6eef5901f4782c53886191b2f693f24761847c" [[package]] name = "libm" @@ -2620,12 +2789,6 @@ dependencies = [ "regex-automata 0.1.10", ] -[[package]] -name = "matchit" -version = "0.7.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" - [[package]] name = "md-5" version = "0.10.6" @@ -2668,7 +2831,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "83a4c4718a371ddfb7806378f23617876eea8b82e5ff1324516bcd283249d9ea" dependencies = [ "base64 0.21.5", - "hyper", + "hyper 0.14.27", "hyper-tls", "indexmap 1.9.3", "ipnet", @@ -2738,6 +2901,24 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "multer" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83e87776546dc87511aa5ee218730c92b666d7264ab6ed41f9d215af9cd5224b" +dependencies = [ + "bytes", + "encoding_rs", + "futures-util", + "http 1.1.0", + "httparse", + "memchr", + "mime", + "spin 0.9.8", + "tokio", + "version_check", +] + [[package]] name = "native-tls" version = "0.2.11" @@ -2762,6 +2943,18 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4a24736216ec316047a1fc4252e27dabb04218aa4a3f37c6e7ddbf1f9782b54" +[[package]] +name = "nix" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" +dependencies = [ + "bitflags 2.4.1", + "cfg-if", + "cfg_aliases", + "libc", +] + [[package]] name = "nom" version = "7.1.3" @@ -2887,6 +3080,12 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +[[package]] +name = "opaque-debug" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381" + [[package]] name = "open-fastrlp" version = "0.1.4" @@ -3074,6 +3273,12 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" +[[package]] +name = "owo-colors" +version = "3.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" + [[package]] name = "parity-scale-codec" version = "3.6.5" @@ -3373,6 +3578,115 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "poem" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e88b6912ed1e8833d7c22c9c986c517f4518d7d37e3c04566d917c789aaea591" +dependencies = [ + "bytes", + "chrono", + "cookie", + "eyre", + "futures-util", + "headers 0.4.0", + "http 1.1.0", + "http-body-util", + "hyper 1.3.1", + "hyper-util", + "mime", + "multer", + "nix", + "parking_lot", + "percent-encoding", + "pin-project-lite", + "poem-derive", + "quick-xml", + "regex", + "rfc7239", + "serde", + "serde_json", + "serde_urlencoded", + "serde_yaml", + "smallvec", + "sync_wrapper 1.0.1", + "tempfile", + "thiserror", + "time", + "tokio", + "tokio-stream", + "tokio-util", + "tracing", + "wildmatch", +] + +[[package]] +name = "poem-derive" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2b961d58a6c53380c20236394381d9292fda03577f902b158f1638932964dcf" +dependencies = [ + "proc-macro-crate 3.1.0", + "proc-macro2", + "quote", + "syn 2.0.39", +] + +[[package]] +name = "poem-openapi" +version = "5.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6445b50be2e26f142d4e554d15773fc1e7510b994083c9625a65eba0d3f4287" +dependencies = [ + "base64 0.22.1", + "bytes", + "derive_more", + "futures-util", + "indexmap 2.1.0", + "mime", + "num-traits", + "poem", + "poem-openapi-derive", + "quick-xml", + "regex", + "serde", + "serde_json", + "serde_urlencoded", + "serde_yaml", + "thiserror", + "tokio", +] + +[[package]] +name = "poem-openapi-derive" +version = "5.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e890165626ff447a1ff3d6f2293e6ccacbf7fcbdd4c94086aa548de655735b03" +dependencies = [ + "darling", + "http 1.1.0", + "indexmap 2.1.0", + "mime", + "proc-macro-crate 3.1.0", + "proc-macro2", + "quote", + "regex", + "syn 2.0.39", + "thiserror", +] + +[[package]] +name = "polyval" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d1fe60d06143b2430aa532c94cfe9e29783047f06c0d7fd359a9a51b729fa25" +dependencies = [ + "cfg-if", + "cpufeatures", + "opaque-debug", + "universal-hash", +] + [[package]] name = "portable-atomic" version = "1.6.0" @@ -3449,6 +3763,15 @@ dependencies = [ "toml_edit 0.20.7", ] +[[package]] +name = "proc-macro-crate" +version = "3.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d37c51ca738a55da99dc0c4a34860fd675453b8b36209178c2249bb13651284" +dependencies = [ + "toml_edit 0.21.0", +] + [[package]] name = "proc-macro-error" version = "1.0.4" @@ -3513,6 +3836,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "quick-xml" +version = "0.31.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1004a344b30a54e2ee58d66a71b32d2db2feb0a31f9a2d302bf0536f15de2a33" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.33" @@ -3618,13 +3951,13 @@ dependencies = [ [[package]] name = "regex" -version = "1.10.2" +version = "1.10.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "380b951a9c5e80ddfd6136919eef32310721aa4aacd4889a8d39124b026ab343" +checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.3", + "regex-automata 0.4.6", "regex-syntax 0.8.2", ] @@ -3639,9 +3972,9 @@ dependencies = [ [[package]] name = "regex-automata" -version = "0.4.3" +version = "0.4.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5f804c7828047e88b2d32e2d7fe5a105da8ee3264f01902f796c8e067dc2483f" +checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" dependencies = [ "aho-corasick", "memchr", @@ -3683,10 +4016,10 @@ dependencies = [ "encoding_rs", "futures-core", "futures-util", - "h2", + "h2 0.3.22", "http 0.2.11", "http-body 0.4.5", - "hyper", + "hyper 0.14.27", "hyper-rustls", "hyper-tls", "ipnet", @@ -3702,7 +4035,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", - "sync_wrapper", + "sync_wrapper 0.1.2", "system-configuration", "tokio", "tokio-native-tls", @@ -3726,6 +4059,15 @@ dependencies = [ "subtle", ] +[[package]] +name = "rfc7239" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b106a85eeb5b0336d16d6a20eab857f92861d4fbb1eb9a239866fb98fb6a1063" +dependencies = [ + "uncased", +] + [[package]] name = "ring" version = "0.16.20" @@ -4095,16 +4437,6 @@ dependencies = [ "serde", ] -[[package]] -name = "serde_path_to_error" -version = "0.1.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4beec8bce849d58d06238cb50db2e1c417cfeafa4c63f692b15c82b7c80f8335" -dependencies = [ - "itoa", - "serde", -] - [[package]] name = "serde_spanned" version = "0.6.4" @@ -4126,6 +4458,19 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_yaml" +version = "0.9.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a15e0ef66bf939a7c890a0bf6d5a733c70202225f9888a89ed5c62298b019129" +dependencies = [ + "indexmap 2.1.0", + "itoa", + "ryu", + "serde", + "unsafe-libyaml", +] + [[package]] name = "sha1" version = "0.10.6" @@ -4221,9 +4566,9 @@ dependencies = [ [[package]] name = "smallvec" -version = "1.11.2" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dccd0940a2dcdf68d092b8cbab7dc0ad8fa938bf95787e1b916b0e3d0e8e970" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" [[package]] name = "socket2" @@ -4541,6 +4886,12 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strsim" +version = "0.11.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" + [[package]] name = "strum" version = "0.25.0" @@ -4617,6 +4968,15 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2047c6ded9c721764247e62cd3b03c09ffc529b2ba5b10ec482ae507a4a70160" +[[package]] +name = "sync_wrapper" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7065abeca94b6a8a577f9bd45aa0867a2238b74e8eb67cf10d492bc39351394" +dependencies = [ + "futures-core", +] + [[package]] name = "system-configuration" version = "0.5.1" @@ -4968,49 +5328,6 @@ dependencies = [ "winnow", ] -[[package]] -name = "tower" -version = "0.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" -dependencies = [ - "futures-core", - "futures-util", - "pin-project", - "pin-project-lite", - "tokio", - "tower-layer", - "tower-service", - "tracing", -] - -[[package]] -name = "tower-http" -version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" -dependencies = [ - "base64 0.21.5", - "bitflags 2.4.1", - "bytes", - "futures-core", - "futures-util", - "http 0.2.11", - "http-body 0.4.5", - "http-range-header", - "mime", - "pin-project-lite", - "tower-layer", - "tower-service", - "tracing", -] - -[[package]] -name = "tower-layer" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c20c8dbed6283a09604c3e69b4b7eeb54e298b8a600d4d5ecb5ad39de609f1d0" - [[package]] name = "tower-service" version = "0.3.2" @@ -5062,6 +5379,16 @@ dependencies = [ "valuable", ] +[[package]] +name = "tracing-error" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d686ec1c0f384b1277f097b2f279a2ecc11afe8c133c1aabf036a27cb4cd206e" +dependencies = [ + "tracing", + "tracing-subscriber", +] + [[package]] name = "tracing-futures" version = "0.2.5" @@ -5169,28 +5496,31 @@ dependencies = [ "aws-smithy-runtime-api", "aws-smithy-types", "aws-types", - "axum", "base64 0.21.5", "bigdecimal 0.4.2", "chrono", "clap", + "color-eyre", "config", "dotenv", "ethers", "eyre", "futures", - "headers", + "headers 0.3.9", "hex", "hex-literal", "humantime", "humantime-serde", - "hyper", + "hyper 0.14.27", "indoc", "itertools 0.12.0", "metrics", "num-bigint", + "poem", + "poem-openapi", "postgres-docker-utils", "rand", + "regex", "reqwest", "serde", "serde_json", @@ -5203,11 +5533,12 @@ dependencies = [ "thiserror", "tokio", "toml 0.8.8", - "tower-http", "tracing", + "tracing-error", "tracing-subscriber", "url", "uuid 0.8.2", + "version", ] [[package]] @@ -5240,6 +5571,15 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eaea85b334db583fe3274d12b4cd1880032beab409c0d774be044d4480ab9a94" +[[package]] +name = "uncased" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e1b88fcfe09e89d3866a5c11019378088af2d24c3fbd4f0543f96b479ec90697" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.13" @@ -5279,6 +5619,22 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" +[[package]] +name = "universal-hash" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea" +dependencies = [ + "crypto-common", + "subtle", +] + +[[package]] +name = "unsafe-libyaml" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" + [[package]] name = "untrusted" version = "0.7.1" @@ -5348,6 +5704,12 @@ version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" +[[package]] +name = "version" +version = "3.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a449064fee414fcc201356a3e6c1510f6c8829ed28bb06b91c54ebe208ce065" + [[package]] name = "version_check" version = "0.9.4" @@ -5492,6 +5854,12 @@ version = "1.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22fc3756b8a9133049b26c7f61ab35416c130e8c09b660f5b3958b446f52cc50" +[[package]] +name = "wildmatch" +version = "2.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3928939971918220fed093266b809d1ee4ec6c1a2d72692ff6876898f3b16c19" + [[package]] name = "winapi" version = "0.3.9" diff --git a/Cargo.toml b/Cargo.toml index 8ab0051..780be4d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,7 +26,6 @@ telemetry-batteries = { git = "https://github.com/worldcoin/telemetry-batteries" ## External async-trait = "0.1.74" -axum = { version = "0.6.20", features = ["headers"] } base64 = "0.21.5" bigdecimal = "0.4.2" chrono = "0.4" @@ -34,7 +33,8 @@ clap = { version = "4.3.0", features = ["env", "derive"] } config = "0.13.3" dotenv = "0.15.0" ethers = { version = "2.0.11", features = ["ws"] } -eyre = "0.6.5" +eyre = "0.6.3" +color-eyre = "0.6.3" futures = "0.3" headers = "0.3.9" hex = "0.4.3" @@ -45,6 +45,13 @@ hyper = "0.14.27" itertools = "0.12.0" metrics = "0.22.1" num-bigint = "0.4.4" +poem = { version = "3", features = ["eyre06"] } +poem-openapi = { version = "5", features = [ + "openapi-explorer", + "rapidoc", + "redoc", + "swagger-ui", +] } rand = "0.8.5" reqwest = { version = "0.11.24", default-features = false, features = [ "rustls-tls", @@ -66,7 +73,6 @@ strum = { version = "0.25.0", features = ["derive"] } thiserror = "1.0.50" tokio = { version = "1", features = ["macros", "rt-multi-thread"] } toml = "0.8.8" -tower-http = { version = "0.4.4", features = ["trace", "auth"] } tracing = { version = "0.1", features = ["log"] } tracing-subscriber = { version = "0.3", default-features = false, features = [ "env-filter", @@ -75,8 +81,11 @@ tracing-subscriber = { version = "0.3", default-features = false, features = [ "json", "ansi", ] } +tracing-error = "0.2.0" url = "2.4.1" uuid = { version = "0.8", features = ["v4"] } +version = "3.0.0" +regex = "1.10.4" [dev-dependencies] indoc = "2.0.3" diff --git a/README.md b/README.md index 960855d..70ef8c1 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,23 @@ # Tx Sitter Monolith -A monolithized version of the [tx-sitter](https://github.com/worldcoin/tx-sitter-aws/). +An easy to run transaction relayer. + +## Quickstart +Copy `.env.example` to `.env` or set `RUST_LOG=info,service=debug` to have logging. + +1. Spin up the database `docker run --rm -e POSTGRES_HOST_AUTH_METHOD=trust -p 5432:5432 postgres` +2. Spin up a chain `anvil --chain-id 31337 --block-time 2` +3. Start the service `cargo run` +4. Visit http://localhost:3000/swagger or http://localhost:3000/rapidoc to interact with the api. Redoc ui is also available at http://localhost:3000/redoc but it's not interactive. + +API schema can be downloaded from http://localhost:3000/schema.json or http://localhost:3000/schema.yml + +This will use the `config.toml` configuration. + +### Error reporting & debugging +For a better local development experience the `.env.example` enables color-eyre reporting. + +But that by default doesn't include the backtrace or code snippets. In order to enable snippets run with `RUST_LIB_BACKTRACE=full`. ## Configuration The Tx Sitter can be configured in 2 ways: @@ -31,17 +48,6 @@ The Tx Sitter can be configured in 2 ways: TX_SITTER__KEYS__KIND="local" ``` -## Testing locally -Copy `.env.example` to `.env` or set `RUST_LOG=info,service=debug` to have logging. - -1. Spin up the database `docker run --rm -e POSTGRES_HOST_AUTH_METHOD=trust -p 5432:5432 postgres` -2. Spin up the chain `anvil --chain-id 31337 --block-time 2` -3. Start the service `cargo run` - -This will use the `config.toml` configuration. - -If you have [nushell](https://www.nushell.sh/) installed, `nu manual_test.nu` can be run to execute a basic test. - ## Running tests While you obviously can run tests with ``` diff --git a/config.toml b/config.toml index 68a897d..94f8ff9 100644 --- a/config.toml +++ b/config.toml @@ -17,6 +17,7 @@ api_key = "G5CKNF3BTS2hRl60bpdYMNPqXvXsP-QZd2lrtmgctsk=" [server] host = "127.0.0.1:3000" disable_auth = false +server_address = "http://localhost:3000" [database] kind = "connection_string" diff --git a/db/migrations/005_network_rpc_contraints.sql b/db/migrations/005_network_rpc_contraints.sql new file mode 100644 index 0000000..4900137 --- /dev/null +++ b/db/migrations/005_network_rpc_contraints.sql @@ -0,0 +1,20 @@ +-- Step 1: Identify and remove duplicate rpcs +WITH ranked_rpcs AS ( + SELECT + id, + chain_id, + kind, + ROW_NUMBER() OVER (PARTITION BY chain_id, kind ORDER BY id DESC) AS rank + FROM + rpcs +) +DELETE FROM rpcs +WHERE id IN ( + SELECT id + FROM ranked_rpcs + WHERE rank > 1 +); + +-- Step 2: Add the uniqueness constraint +ALTER TABLE rpcs +ADD CONSTRAINT unique_chain_id_kind UNIQUE (chain_id, kind); diff --git a/src/api_key.rs b/src/api_key.rs index 06a1216..286e0f5 100644 --- a/src/api_key.rs +++ b/src/api_key.rs @@ -2,6 +2,10 @@ use std::borrow::Cow; use std::str::FromStr; use base64::Engine; +use poem_openapi::registry::{MetaSchema, MetaSchemaRef}; +use poem_openapi::types::{ + ParseError, ParseFromJSON, ParseFromParameter, ToJSON, +}; use rand::rngs::OsRng; use rand::Rng; use serde::Serialize; @@ -121,6 +125,64 @@ impl FromStr for ApiKey { } } +impl poem_openapi::types::Type for ApiKey { + const IS_REQUIRED: bool = true; + + type RawValueType = ApiKey; + + type RawElementValueType = ApiKey; + + fn name() -> std::borrow::Cow<'static, str> { + "string(api-key)".into() + } + + fn schema_ref() -> MetaSchemaRef { + let mut schema_ref = MetaSchema::new_with_format("string", "api-key"); + + schema_ref.example = Some(serde_json::Value::String( + "G5CKNF3BTS2hRl60bpdYMNPqXvXsP-QZd2lrtmgctsk=".to_string(), + )); + schema_ref.title = Some("Api Key".to_string()); + schema_ref.description = Some("Base64 encoded API key"); + + MetaSchemaRef::Inline(Box::new(schema_ref)) + } + + fn as_raw_value(&self) -> Option<&Self::RawValueType> { + Some(self) + } + + fn raw_element_iter<'a>( + &'a self, + ) -> Box + 'a> { + Box::new(self.as_raw_value().into_iter()) + } +} + +impl ParseFromJSON for ApiKey { + fn parse_from_json( + value: Option, + ) -> poem_openapi::types::ParseResult { + let value = value.ok_or_else(ParseError::expected_input)?; + + serde_json::from_value(value).map_err(ParseError::custom) + } +} + +impl ToJSON for ApiKey { + fn to_json(&self) -> Option { + serde_json::to_value(self).ok() + } +} + +impl ParseFromParameter for ApiKey { + fn parse_from_parameter( + value: &str, + ) -> poem_openapi::types::ParseResult { + value.parse().map_err(|_| ParseError::expected_input()) + } +} + impl ApiKey { pub fn reveal(&self) -> eyre::Result { let relayer_id = uuid::Uuid::parse_str(&self.relayer_id) diff --git a/src/broadcast_utils.rs b/src/broadcast_utils.rs index f1d791d..04d206c 100644 --- a/src/broadcast_utils.rs +++ b/src/broadcast_utils.rs @@ -34,7 +34,7 @@ pub async fn should_send_relayer_transactions( return Ok(false); } - for gas_limit in &relayer.gas_price_limits.0 { + for gas_limit in &relayer.gas_price_limits { let chain_fees = app .db .get_latest_block_fees_by_chain_id(relayer.chain_id) diff --git a/src/client.rs b/src/client.rs index c92f639..75e8581 100644 --- a/src/client.rs +++ b/src/client.rs @@ -2,19 +2,17 @@ use reqwest::Response; use thiserror::Error; use crate::api_key::ApiKey; -use crate::server::routes::network::NewNetworkInfo; -use crate::server::routes::relayer::{ +use crate::types::{ CreateApiKeyResponse, CreateRelayerRequest, CreateRelayerResponse, + GetTxResponse, NewNetworkInfo, RelayerUpdate, SendTxRequest, + SendTxResponse, }; -use crate::server::routes::transaction::{ - GetTxResponse, SendTxRequest, SendTxResponse, -}; -use crate::server::ApiError; -use crate::types::RelayerUpdate; pub struct TxSitterClient { client: reqwest::Client, url: String, + + credentials: Option<(String, String)>, } #[derive(Debug, Error)] @@ -26,7 +24,7 @@ pub enum ClientError { Serde(#[from] serde_json::Error), #[error("API error: {0}")] - TxSitter(#[from] ApiError), + TxSitter(String), #[error("Invalid API key: {0}")] InvalidApiKey(eyre::Error), @@ -37,14 +35,38 @@ impl TxSitterClient { Self { client: reqwest::Client::new(), url: url.to_string(), + credentials: None, } } + pub fn with_credentials( + mut self, + username: impl ToString, + password: impl ToString, + ) -> Self { + self.credentials = Some((username.to_string(), password.to_string())); + self + } + + fn creds(&self) -> (&str, &str) { + self.credentials + .as_ref() + .map(|(u, p)| (u.as_str(), p.as_str())) + .unwrap_or_default() + } + async fn post(&self, url: &str) -> Result where R: serde::de::DeserializeOwned, { - let response = self.client.post(url).send().await?; + let (username, password) = self.creds(); + + let response = self + .client + .post(url) + .basic_auth(username, Some(password)) + .send() + .await?; let response = Self::validate_response(response).await?; @@ -60,7 +82,15 @@ impl TxSitterClient { T: serde::Serialize, R: serde::de::DeserializeOwned, { - let response = self.client.post(url).json(&body).send().await?; + let (username, password) = self.creds(); + + let response = self + .client + .post(url) + .json(&body) + .basic_auth(username, Some(password)) + .send() + .await?; let response = Self::validate_response(response).await?; @@ -71,7 +101,14 @@ impl TxSitterClient { where R: serde::de::DeserializeOwned, { - let response = self.client.get(url).send().await?; + let (username, password) = self.creds(); + + let response = self + .client + .get(url) + .basic_auth(username, Some(password)) + .send() + .await?; let response = Self::validate_response(response).await?; @@ -82,7 +119,7 @@ impl TxSitterClient { response: Response, ) -> Result { if !response.status().is_success() { - let body: ApiError = response.json().await?; + let body: String = response.text().await?; return Err(ClientError::TxSitter(body)); } @@ -110,11 +147,21 @@ impl TxSitterClient { relayer_id: &str, relayer_update: RelayerUpdate, ) -> Result<(), ClientError> { - self.json_post( - &format!("{}/1/admin/relayer/{relayer_id}", self.url), - relayer_update, - ) - .await + let url: &str = &format!("{}/1/admin/relayer/{relayer_id}", self.url); + + let (username, password) = self.creds(); + + let response = self + .client + .post(url) + .json(&relayer_update) + .basic_auth(username, Some(password)) + .send() + .await?; + + let _response = Self::validate_response(response).await?; + + Ok(()) } pub async fn send_tx( @@ -164,3 +211,12 @@ impl TxSitterClient { Ok(()) } } + +impl ClientError { + pub fn tx_sitter(&self) -> Option<&str> { + match self { + Self::TxSitter(s) => Some(s), + _ => None, + } + } +} diff --git a/src/config.rs b/src/config.rs index 2bbde93..35dcf90 100644 --- a/src/config.rs +++ b/src/config.rs @@ -102,6 +102,9 @@ pub struct ServerConfig { pub username: Option, pub password: Option, + + // Optional address to show in API explorer + pub server_address: Option, } impl ServerConfig { @@ -292,6 +295,7 @@ mod tests { host: SocketAddr::from(([127, 0, 0, 1], 3000)), username: None, password: None, + server_address: None, }, database: DatabaseConfig::connection_string( "postgres://postgres:postgres@127.0.0.1:52804/database" @@ -319,6 +323,7 @@ mod tests { host: SocketAddr::from(([127, 0, 0, 1], 3000)), username: None, password: None, + server_address: None, }, database: DatabaseConfig::Parts(DbParts { host: "host".to_string(), diff --git a/src/db.rs b/src/db.rs index f4c0613..c28e971 100644 --- a/src/db.rs +++ b/src/db.rs @@ -11,12 +11,15 @@ use tracing::instrument; use crate::broadcast_utils::gas_estimation::FeesEstimate; use crate::config::DatabaseConfig; -use crate::types::{RelayerInfo, RelayerUpdate, TransactionPriority}; +use crate::types::wrappers::h256::H256Wrapper; +use crate::types::{ + NetworkInfo, RelayerInfo, RelayerUpdate, TransactionPriority, TxStatus, +}; pub mod data; -use self::data::{BlockFees, H256Wrapper, NetworkStats, ReadTxData, RpcKind}; -pub use self::data::{TxForEscalation, TxStatus, UnsentTx}; +use self::data::{BlockFees, NetworkStats, ReadTxData, RpcKind}; +pub use self::data::{TxForEscalation, UnsentTx}; // Statically link in migration files static MIGRATOR: Migrator = sqlx::migrate!("db/migrations"); @@ -214,7 +217,10 @@ impl Database { } #[instrument(skip(self), level = "debug")] - pub async fn get_relayer(&self, id: &str) -> eyre::Result { + pub async fn get_relayer( + &self, + id: &str, + ) -> eyre::Result> { Ok(sqlx::query_as( r#" SELECT @@ -234,7 +240,7 @@ impl Database { "#, ) .bind(id) - .fetch_one(&self.pool) + .fetch_optional(&self.pool) .await?) } @@ -963,7 +969,7 @@ impl Database { } #[instrument(skip(self), level = "debug")] - pub async fn create_network( + pub async fn upsert_network( &self, chain_id: u64, name: &str, @@ -976,7 +982,8 @@ impl Database { r#" INSERT INTO networks (chain_id, name) VALUES ($1, $2) - ON CONFLICT DO NOTHING + ON CONFLICT (chain_id) DO UPDATE + SET name = EXCLUDED.name "#, ) .bind(chain_id as i64) @@ -988,14 +995,27 @@ impl Database { r#" INSERT INTO rpcs (chain_id, url, kind) VALUES - ($1, $2, $3), - ($1, $4, $5) - ON CONFLICT DO NOTHING + ($1, $2, $3) + ON CONFLICT (chain_id, kind) DO UPDATE + SET url = EXCLUDED.url "#, ) .bind(chain_id as i64) .bind(http_rpc) .bind(RpcKind::Http) + .execute(tx.as_mut()) + .await?; + + sqlx::query( + r#" + INSERT INTO rpcs (chain_id, url, kind) + VALUES + ($1, $2, $3) + ON CONFLICT (chain_id, kind) DO UPDATE + SET url = EXCLUDED.url + "#, + ) + .bind(chain_id as i64) .bind(ws_rpc) .bind(RpcKind::Ws) .execute(tx.as_mut()) @@ -1043,7 +1063,40 @@ impl Database { } #[instrument(skip(self), level = "debug")] - pub async fn create_api_key( + pub async fn get_networks(&self) -> eyre::Result> { + Ok(sqlx::query_as( + r#" + SELECT networks.chain_id, name, http.url as http_rpc, ws.url as ws_rpc + FROM networks + INNER JOIN rpcs http ON networks.chain_id = http.chain_id AND http.kind = 'http' + INNER JOIN rpcs ws ON networks.chain_id = ws.chain_id AND ws.kind = 'ws' + "#, + ) + .fetch_all(&self.pool) + .await?) + } + + #[instrument(skip(self), level = "debug")] + pub async fn get_network( + &self, + chain_id: u64, + ) -> eyre::Result> { + Ok(sqlx::query_as( + r#" + SELECT networks.chain_id, name, http.url as http_rpc, ws.url as ws_rpc + FROM networks + INNER JOIN rpcs http ON networks.chain_id = http.chain_id AND http.kind = 'http' + INNER JOIN rpcs ws ON networks.chain_id = ws.chain_id AND ws.kind = 'ws' + WHERE networks.chain_id = $1 + "#, + ) + .bind(chain_id as i64) + .fetch_optional(&self.pool) + .await?) + } + + #[instrument(skip(self), level = "debug")] + pub async fn upsert_api_key( &self, relayer_id: &str, api_key_hash: [u8; 32], @@ -1201,7 +1254,6 @@ mod tests { use postgres_docker_utils::DockerContainerGuard; use super::*; - use crate::db::data::U256Wrapper; use crate::types::RelayerGasPriceLimit; async fn setup_db() -> eyre::Result<(Database, DockerContainerGuard)> { @@ -1255,7 +1307,7 @@ mod tests { let http_rpc = "http_rpc"; let ws_rpc = "ws_rpc"; - db.create_network(chain_id, network_name, http_rpc, ws_rpc) + db.upsert_network(chain_id, network_name, http_rpc, ws_rpc) .await?; let relayer_id = uuid(); @@ -1314,7 +1366,7 @@ mod tests { let http_rpc = "http_rpc"; let ws_rpc = "ws_rpc"; - db.create_network(chain_id, network_name, http_rpc, ws_rpc) + db.upsert_network(chain_id, network_name, http_rpc, ws_rpc) .await?; let relayer_id = uuid(); @@ -1333,7 +1385,10 @@ mod tests { ) .await?; - let relayer = db.get_relayer(relayer_id).await?; + let relayer = db + .get_relayer(relayer_id) + .await? + .context("Missing relayer")?; assert_eq!(relayer.id, relayer_id); assert_eq!(relayer.name, relayer_name); @@ -1343,7 +1398,7 @@ mod tests { assert_eq!(relayer.nonce, 0); assert_eq!(relayer.current_nonce, 0); assert_eq!(relayer.max_inflight_txs, 5); - assert_eq!(relayer.gas_price_limits.0, vec![]); + assert_eq!(relayer.gas_price_limits, vec![]); db.update_relayer( relayer_id, @@ -1353,14 +1408,17 @@ mod tests { max_queued_txs: Some(20), gas_price_limits: Some(vec![RelayerGasPriceLimit { chain_id: 1, - value: U256Wrapper(U256::from(10_123u64)), + value: U256::from(10_123u64).into(), }]), enabled: None, }, ) .await?; - let relayer = db.get_relayer(relayer_id).await?; + let relayer = db + .get_relayer(relayer_id) + .await? + .context("Missing relayer")?; assert_eq!(relayer.id, relayer_id); assert_eq!(relayer.name, relayer_name); @@ -1372,10 +1430,10 @@ mod tests { assert_eq!(relayer.max_inflight_txs, 10); assert_eq!(relayer.max_queued_txs, 20); assert_eq!( - relayer.gas_price_limits.0, + relayer.gas_price_limits, vec![RelayerGasPriceLimit { chain_id: 1, - value: U256Wrapper(U256::from(10_123u64)), + value: U256::from(10_123u64).into(), }] ); @@ -1391,7 +1449,7 @@ mod tests { let http_rpc = "http_rpc"; let ws_rpc = "ws_rpc"; - db.create_network(chain_id, network_name, http_rpc, ws_rpc) + db.upsert_network(chain_id, network_name, http_rpc, ws_rpc) .await?; let relayer_id = uuid(); diff --git a/src/db/data.rs b/src/db/data.rs index 8720a6a..b92f1c8 100644 --- a/src/db/data.rs +++ b/src/db/data.rs @@ -1,12 +1,12 @@ -use ethers::types::{Address, H256, U256}; +use ethers::types::U256; use serde::{Deserialize, Serialize}; -use sqlx::database::{HasArguments, HasValueRef}; -use sqlx::postgres::{PgHasArrayType, PgTypeInfo}; use sqlx::prelude::FromRow; -use sqlx::Database; use crate::broadcast_utils::gas_estimation::FeesEstimate; -use crate::types::TransactionPriority; +use crate::types::wrappers::address::AddressWrapper; +use crate::types::wrappers::h256::H256Wrapper; +use crate::types::wrappers::hex_u256::HexU256; +use crate::types::{TransactionPriority, TxStatus}; #[derive(Debug, Clone, FromRow)] pub struct UnsentTx { @@ -14,8 +14,8 @@ pub struct UnsentTx { pub id: String, pub tx_to: AddressWrapper, pub data: Vec, - pub value: U256Wrapper, - pub gas_limit: U256Wrapper, + pub value: HexU256, + pub gas_limit: HexU256, pub priority: TransactionPriority, #[sqlx(try_from = "i64")] pub nonce: u64, @@ -31,16 +31,16 @@ pub struct TxForEscalation { pub id: String, pub tx_to: AddressWrapper, pub data: Vec, - pub value: U256Wrapper, - pub gas_limit: U256Wrapper, + pub value: HexU256, + pub gas_limit: HexU256, #[sqlx(try_from = "i64")] pub nonce: u64, pub blobs: Option>>, pub key_id: String, #[sqlx(try_from = "i64")] pub chain_id: u64, - pub initial_max_fee_per_gas: U256Wrapper, - pub initial_max_priority_fee_per_gas: U256Wrapper, + pub initial_max_fee_per_gas: HexU256, + pub initial_max_priority_fee_per_gas: HexU256, #[sqlx(try_from = "i64")] pub escalation_count: usize, } @@ -50,8 +50,8 @@ pub struct ReadTxData { pub tx_id: String, pub to: AddressWrapper, pub data: Vec, - pub value: U256Wrapper, - pub gas_limit: U256Wrapper, + pub value: HexU256, + pub gas_limit: HexU256, #[sqlx(try_from = "i64")] pub nonce: u64, pub blobs: Option>>, @@ -76,173 +76,6 @@ pub struct BlockFees { pub gas_price: U256, } -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(transparent)] -pub struct AddressWrapper(pub Address); - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(transparent)] -pub struct U256Wrapper(pub U256); - -#[derive(Debug, Clone, PartialEq, Eq)] -pub struct H256Wrapper(pub H256); - -impl<'r, DB> sqlx::Decode<'r, DB> for AddressWrapper -where - DB: Database, - Vec: sqlx::Decode<'r, DB>, -{ - fn decode( - value: >::ValueRef, - ) -> Result { - let bytes = as sqlx::Decode>::decode(value)?; - - let address = Address::from_slice(&bytes); - - Ok(Self(address)) - } -} - -impl sqlx::Type for AddressWrapper -where - Vec: sqlx::Type, -{ - fn type_info() -> DB::TypeInfo { - as sqlx::Type>::type_info() - } - - fn compatible(ty: &DB::TypeInfo) -> bool { - *ty == Self::type_info() - } -} - -impl From
for AddressWrapper { - fn from(value: Address) -> Self { - Self(value) - } -} - -impl<'r, DB> sqlx::Decode<'r, DB> for U256Wrapper -where - DB: Database, - [u8; 32]: sqlx::Decode<'r, DB>, -{ - fn decode( - value: >::ValueRef, - ) -> Result { - let bytes = <[u8; 32] as sqlx::Decode>::decode(value)?; - - let value = U256::from_big_endian(&bytes); - - Ok(Self(value)) - } -} - -impl sqlx::Type for U256Wrapper -where - [u8; 32]: sqlx::Type, -{ - fn type_info() -> DB::TypeInfo { - <[u8; 32] as sqlx::Type>::type_info() - } - - fn compatible(ty: &DB::TypeInfo) -> bool { - *ty == Self::type_info() - } -} - -impl<'q, DB> sqlx::Encode<'q, DB> for U256Wrapper -where - DB: Database, - [u8; 32]: sqlx::Encode<'q, DB>, -{ - fn encode_by_ref( - &self, - buf: &mut >::ArgumentBuffer, - ) -> sqlx::encode::IsNull { - let mut bytes = [0u8; 32]; - self.0.to_big_endian(&mut bytes); - - <[u8; 32] as sqlx::Encode>::encode_by_ref(&bytes, buf) - } -} - -impl From for U256Wrapper { - fn from(value: U256) -> Self { - Self(value) - } -} - -impl<'r, DB> sqlx::Decode<'r, DB> for H256Wrapper -where - DB: Database, - [u8; 32]: sqlx::Decode<'r, DB>, -{ - fn decode( - value: >::ValueRef, - ) -> Result { - let bytes = <[u8; 32] as sqlx::Decode>::decode(value)?; - - let value = H256::from_slice(&bytes); - - Ok(Self(value)) - } -} - -impl<'q, DB> sqlx::Encode<'q, DB> for H256Wrapper -where - DB: Database, - [u8; 32]: sqlx::Encode<'q, DB>, -{ - fn encode_by_ref( - &self, - buf: &mut >::ArgumentBuffer, - ) -> sqlx::encode::IsNull { - <[u8; 32] as sqlx::Encode>::encode_by_ref(&self.0 .0, buf) - } -} - -impl PgHasArrayType for H256Wrapper { - fn array_type_info() -> PgTypeInfo { - <[u8; 32] as PgHasArrayType>::array_type_info() - } -} - -impl sqlx::Type for H256Wrapper -where - [u8; 32]: sqlx::Type, -{ - fn type_info() -> DB::TypeInfo { - <[u8; 32] as sqlx::Type>::type_info() - } - - fn compatible(ty: &DB::TypeInfo) -> bool { - *ty == Self::type_info() - } -} - -#[derive( - Debug, Clone, Serialize, Deserialize, Copy, PartialEq, Eq, sqlx::Type, -)] -#[sqlx(rename_all = "camelCase")] -#[sqlx(type_name = "tx_status")] -#[serde(rename_all = "camelCase")] -pub enum TxStatus { - Pending, - Mined, - Finalized, -} - -impl TxStatus { - pub fn previous(self) -> Self { - match self { - Self::Pending => Self::Pending, - Self::Mined => Self::Pending, - Self::Finalized => Self::Mined, - } - } -} - #[derive( Debug, Clone, Serialize, Deserialize, Copy, PartialEq, Eq, sqlx::Type, )] diff --git a/src/main.rs b/src/main.rs index 947d452..d9cb7a9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -23,6 +23,13 @@ struct Args { #[clap(short, long)] env_file: Vec, + + /// Installs color-eyre hooks for better messages + /// + /// Useful for local testing and debugging + /// Not very useful for production deployments + #[clap(short = 'E', long, env)] + color_eyre: bool, } #[tokio::main] @@ -30,11 +37,17 @@ async fn main() -> eyre::Result<()> { let args = Args::parse(); dotenv::dotenv().ok(); - for path in &args.env_file { dotenv::from_path(path)?; } + // Reparse the args to account for newly loaded env vars + let args = Args::parse(); + + if args.color_eyre { + color_eyre::install()?; + } + let config = load_config(args.config.iter().map(PathBuf::as_ref))?; let _tracing_shutdown_handle = @@ -61,6 +74,7 @@ async fn main() -> eyre::Result<()> { tracing_subscriber::registry() .with(tracing_subscriber::fmt::layer().pretty().compact()) .with(tracing_subscriber::EnvFilter::from_default_env()) + .with(tracing_error::ErrorLayer::default()) .init(); TracingShutdownHandle diff --git a/src/serde_utils.rs b/src/serde_utils.rs index 596953b..b54f388 100644 --- a/src/serde_utils.rs +++ b/src/serde_utils.rs @@ -1,2 +1 @@ pub mod base64_binary; -pub mod decimal_u256; diff --git a/src/serde_utils/decimal_u256.rs b/src/serde_utils/decimal_u256.rs deleted file mode 100644 index f37e802..0000000 --- a/src/serde_utils/decimal_u256.rs +++ /dev/null @@ -1,42 +0,0 @@ -use ethers::types::U256; - -pub fn serialize(u256: &U256, serializer: S) -> Result -where - S: serde::Serializer, -{ - let s = u256.to_string(); - serializer.serialize_str(&s) -} - -pub fn deserialize<'de, D>(deserializer: D) -> Result -where - D: serde::Deserializer<'de>, -{ - let s: &str = serde::Deserialize::deserialize(deserializer)?; - let u256 = U256::from_dec_str(s).map_err(serde::de::Error::custom)?; - Ok(u256) -} - -#[cfg(test)] -mod tests { - use serde::{Deserialize, Serialize}; - - use super::*; - - #[derive(Debug, Clone, Serialize, Deserialize)] - struct Test { - #[serde(with = "super")] - v: U256, - } - - #[test] - fn test_u256_serde() { - let test = Test { v: U256::from(123) }; - - let s = serde_json::to_string(&test).unwrap(); - assert_eq!(s, r#"{"v":"123"}"#); - - let test: Test = serde_json::from_str(&s).unwrap(); - assert_eq!(test.v, U256::from(123)); - } -} diff --git a/src/server.rs b/src/server.rs index 3a9b9ac..020c909 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,77 +1,464 @@ +use std::net::SocketAddr; use std::sync::Arc; -use axum::routing::{get, post, IntoMakeService}; -use axum::Router; -use hyper::server::conn::AddrIncoming; -use tower_http::validate_request::ValidateRequestHeaderLayer; +use ethers::signers::Signer; +use eyre::ContextCompat; +use poem::http::StatusCode; +use poem::listener::{Acceptor, Listener, TcpListener}; +use poem::middleware::Cors; +use poem::web::{Data, LocalAddr}; +use poem::{EndpointExt, Result, Route}; +use poem_openapi::param::{Path, Query}; +use poem_openapi::payload::Json; +use poem_openapi::{ApiResponse, OpenApi, OpenApiService}; +use security::BasicAuth; +use serde_json::Value; +use url::Url; -use self::routes::relayer::{ - create_relayer, create_relayer_api_key, get_relayer, get_relayers, - purge_unsent_txs, relayer_rpc, update_relayer, -}; -use self::routes::transaction::{get_tx, get_txs, send_tx}; -use self::trace_layer::MatchedPathMakeSpan; +use crate::api_key::ApiKey; use crate::app::App; +use crate::service::Service; +use crate::task_runner::TaskRunner; +use crate::types::{ + CreateApiKeyResponse, CreateRelayerRequest, CreateRelayerResponse, + GetTxResponse, NetworkInfo, NewNetworkInfo, RelayerInfo, RelayerUpdate, + RpcRequest, SendTxRequest, SendTxResponse, TxStatus, +}; + +mod security; +mod trace_middleware; + +struct AdminApi; + +#[OpenApi(prefix_path = "/1/admin/")] +impl AdminApi { + /// Create Relayer + #[oai(path = "/relayer", method = "post")] + async fn create_relayer( + &self, + basic_auth: BasicAuth, + Data(app): Data<&Arc>, + Json(req): Json, + ) -> Result> { + basic_auth.validate(app).await?; + + let (key_id, signer) = app.keys_source.new_signer(&req.name).await?; + + let address = signer.address(); + + let relayer_id = uuid::Uuid::new_v4(); + let relayer_id = relayer_id.to_string(); + + app.db + .create_relayer( + &relayer_id, + &req.name, + req.chain_id, + &key_id, + address, + ) + .await?; + + Ok(Json(CreateRelayerResponse { + relayer_id, + address: address.into(), + })) + } + + /// Get Relayers + #[oai(path = "/relayers", method = "get")] + async fn get_relayers( + &self, + basic_auth: BasicAuth, + Data(app): Data<&Arc>, + ) -> Result>> { + basic_auth.validate(app).await?; + + let relayer_info = app.db.get_relayers().await?; + + Ok(Json(relayer_info)) + } + + /// Get Relayer + #[oai(path = "/relayer/:relayer_id", method = "get")] + async fn get_relayer( + &self, + basic_auth: BasicAuth, + Data(app): Data<&Arc>, + Path(relayer_id): Path, + ) -> Result> { + basic_auth.validate(app).await?; + + let relayer_info = app.db.get_relayer(&relayer_id).await?; + + match relayer_info { + Some(relayer_info) => Ok(Json(relayer_info)), + None => Err(poem::error::Error::from_string( + "Relayer not found".to_string(), + StatusCode::NOT_FOUND, + )), + } + } + + /// Update Relayer + #[oai(path = "/relayer/:relayer_id", method = "post")] + async fn update_relayer( + &self, + basic_auth: BasicAuth, + Data(app): Data<&Arc>, + Path(relayer_id): Path, + Json(req): Json, + ) -> Result<()> { + basic_auth.validate(app).await?; + + app.db.update_relayer(&relayer_id, &req).await?; -mod error; -mod middleware; -pub mod routes; -mod trace_layer; + Ok(()) + } + + /// Reset Relayer transactions + /// + /// Purges unsent transactions, useful for unstucking the relayer + #[oai(path = "/relayer/:relayer_id/reset", method = "post")] + async fn purge_unsent_txs( + &self, + basic_auth: BasicAuth, + Data(app): Data<&Arc>, + Path(relayer_id): Path, + ) -> Result<()> { + basic_auth.validate(app).await?; + + app.db.purge_unsent_txs(&relayer_id).await?; + + Ok(()) + } + + /// Create Relayer API Key + #[oai(path = "/relayer/:relayer_id/key", method = "post")] + async fn create_relayer_api_key( + &self, + basic_auth: BasicAuth, + Data(app): Data<&Arc>, + Path(relayer_id): Path, + ) -> Result> { + basic_auth.validate(app).await?; + + let api_key = ApiKey::random(&relayer_id); + + app.db + .upsert_api_key(&relayer_id, api_key.api_key_secret_hash()) + .await?; + + Ok(Json(CreateApiKeyResponse { api_key })) + } + + /// Create Network + #[oai(path = "/network/:chain_id", method = "post")] + async fn create_network( + &self, + basic_auth: BasicAuth, + Data(app): Data<&Arc>, + Path(chain_id): Path, + Json(network): Json, + ) -> Result<()> { + basic_auth.validate(app).await?; -pub use self::error::ApiError; + let http_url: Url = network + .http_rpc + .parse::() + .map_err(poem::error::BadRequest)?; -pub async fn serve(app: Arc) -> eyre::Result<()> { - let server = spawn_server(app).await?; + let ws_url: Url = network + .ws_rpc + .parse::() + .map_err(poem::error::BadRequest)?; - tracing::info!("Listening on {}", server.local_addr()); + app.db + .upsert_network( + chain_id, + &network.name, + http_url.as_str(), + ws_url.as_str(), + ) + .await?; - server.await?; + let task_runner = TaskRunner::new(app.clone()); + Service::spawn_chain_tasks(&task_runner, chain_id)?; - Ok(()) + Ok(()) + } + + /// Get Networks + #[oai(path = "/networks", method = "get")] + async fn list_networks( + &self, + basic_auth: BasicAuth, + Data(app): Data<&Arc>, + ) -> Result>> { + basic_auth.validate(app).await?; + + let networks = app.db.get_networks().await?; + + Ok(Json(networks)) + } } -pub async fn spawn_server( - app: Arc, -) -> eyre::Result>> { - let api_routes = Router::new() - .route("/:api_token/tx", post(send_tx)) - .route("/:api_token/tx/:tx_id", get(get_tx)) - .route("/:api_token/txs", get(get_txs)) - .route("/:api_token/rpc", post(relayer_rpc)) - .with_state(app.clone()); - - let mut admin_routes = Router::new() - .route("/relayer", post(create_relayer)) - .route("/relayer/:relayer_id/reset", post(purge_unsent_txs)) - .route("/relayers", get(get_relayers)) - .route( - "/relayer/:relayer_id", - post(update_relayer).get(get_relayer), - ) - .route("/relayer/:relayer_id/key", post(create_relayer_api_key)) - .route("/network/:chain_id", post(routes::network::create_network)) - .with_state(app.clone()); - - if let Some((username, password)) = app.config.server.credentials() { - admin_routes = admin_routes - .layer(ValidateRequestHeaderLayer::basic(username, password)); +struct RelayerApi; + +#[OpenApi(prefix_path = "/1/api/")] +impl RelayerApi { + /// Send Transaction + #[oai(path = "/:api_token/tx", method = "post")] + async fn send_tx( + &self, + Data(app): Data<&Arc>, + Path(api_token): Path, + Json(req): Json, + ) -> Result> { + api_token.validate(app).await?; + + tracing::info!(?req, "Send tx"); + + let tx_id = if let Some(id) = req.tx_id { + id + } else { + uuid::Uuid::new_v4().to_string() + }; + + let relayer = app + .db + .get_relayer(api_token.relayer_id()) + .await? + .context("Missing relayer")?; + + if !relayer.enabled { + return Err(poem::error::Error::from_string( + "Relayer is disabled".to_string(), + StatusCode::FORBIDDEN, + )); + } + + let relayer_queued_tx_count = app + .db + .get_relayer_pending_txs(api_token.relayer_id()) + .await?; + + if relayer_queued_tx_count > relayer.max_queued_txs as usize { + return Err(poem::error::Error::from_string( + "Relayer queue is full".to_string(), + StatusCode::TOO_MANY_REQUESTS, + )); + } + + app.db + .create_transaction( + &tx_id, + req.to.0, + req.data.as_ref().map(|d| &d.0[..]).unwrap_or(&[]), + req.value.0, + req.gas_limit.0, + req.priority, + req.blobs, + api_token.relayer_id(), + ) + .await?; + + tracing::info!(tx_id, "Transaction created"); + + Ok(Json(SendTxResponse { tx_id })) } - let v1_routes = Router::new() - .nest("/api", api_routes) - .nest("/admin", admin_routes); + /// Get Transaction + #[oai(path = "/:api_token/tx/:tx_id", method = "get")] + async fn get_tx( + &self, + Data(app): Data<&Arc>, + Path(api_token): Path, + Path(tx_id): Path, + ) -> Result> { + api_token.validate(app).await?; + + let tx = app.db.read_tx(&tx_id).await?.ok_or_else(|| { + poem::error::Error::from_string( + "Transaction not found".to_string(), + StatusCode::NOT_FOUND, + ) + })?; + + let get_tx_response = GetTxResponse { + tx_id: tx.tx_id, + to: tx.to, + data: if tx.data.is_empty() { + None + } else { + Some(tx.data.into()) + }, + value: tx.value.into(), + gas_limit: tx.gas_limit.into(), + nonce: tx.nonce, + tx_hash: tx.tx_hash, + status: tx.status, + }; + + Ok(Json(get_tx_response)) + } + + /// Get Transactions + #[oai(path = "/:api_token/txs", method = "get")] + async fn get_txs( + &self, + Data(app): Data<&Arc>, + Path(api_token): Path, + /// Optional tx status to filter by + Query(status): Query>, + /// Fetch unsent txs, overrides the status query + #[oai(default = "default_false")] + Query(unsent): Query, + ) -> Result>> { + api_token.validate(app).await?; + + let txs = if unsent { + app.db.read_txs(api_token.relayer_id(), Some(None)).await? + } else if let Some(status) = status { + app.db + .read_txs(api_token.relayer_id(), Some(Some(status))) + .await? + } else { + app.db.read_txs(api_token.relayer_id(), None).await? + }; + + let txs = txs + .into_iter() + .map(|tx| GetTxResponse { + tx_id: tx.tx_id, + to: tx.to, + data: if tx.data.is_empty() { + None + } else { + Some(tx.data.into()) + }, + value: tx.value.into(), + gas_limit: tx.gas_limit.into(), + nonce: tx.nonce, + tx_hash: tx.tx_hash, + status: tx.status, + }) + .collect(); + + Ok(Json(txs)) + } + + /// Relayer RPC + #[oai(path = "/:api_token/rpc", method = "post")] + async fn relayer_rpc( + &self, + Data(app): Data<&Arc>, + Path(api_token): Path, + Json(req): Json, + ) -> Result> { + api_token.validate(app).await?; + + let relayer_info = app + .db + .get_relayer(api_token.relayer_id()) + .await? + .context("Missing relayer")?; + + // TODO: Cache? + let http_provider = app.http_provider(relayer_info.chain_id).await?; + let url = http_provider.url(); + + let response = reqwest::Client::new() + .post(url.clone()) + .json(&req) + .send() + .await + .map_err(|err| { + eyre::eyre!("Error sending request to {}: {}", url, err) + })?; + + let response: Value = response.json().await.map_err(|err| { + eyre::eyre!("Error parsing response from {}: {}", url, err) + })?; + + Ok(Json(response)) + } +} + +fn default_false() -> bool { + false +} + +struct ServiceApi; + +#[derive(ApiResponse)] +enum ServiceResponse { + #[oai(status = 200)] + Healthy, +} + +#[OpenApi] +impl ServiceApi { + /// Health + #[oai(path = "/", method = "get")] + async fn health(&self) -> ServiceResponse { + ServiceResponse::Healthy + } +} + +pub struct ServerHandle { + pub local_addrs: Vec, + pub server_handle: tokio::task::JoinHandle>, +} + +impl ServerHandle { + pub fn local_addr(&self) -> SocketAddr { + self.local_addrs + .iter() + .filter_map(|addr| addr.as_socket_addr()) + .next() + .cloned() + .expect("Not bound to any address") + } +} + +pub async fn spawn_server(app: Arc) -> eyre::Result { + let mut api_service = OpenApiService::new( + (AdminApi, RelayerApi, ServiceApi), + "Tx Sitter", + version::version!(), + ) + .description(include_str!("./server/description.md")); + + if let Some(server_address) = app.config.server.server_address.as_ref() { + api_service = api_service.server(server_address.clone()); + } + + let router = Route::new() + .nest("/rapidoc", api_service.rapidoc()) + .nest("/swagger", api_service.swagger_ui()) + .nest("/redoc", api_service.redoc()) + .nest("/schema.json", api_service.spec_endpoint()) + .nest("/schema.yml", api_service.spec_endpoint_yaml()) + .nest("/", api_service) + .with(Cors::new()) + .with(trace_middleware::TraceMiddleware) + .data(app.clone()); + + let listener = TcpListener::bind(app.config.server.host); + let acceptor = listener.into_acceptor().await?; + + let local_addrs = acceptor.local_addr(); - let router = Router::new() - .nest("/1", v1_routes) - .route("/health", get(routes::health)) - .layer( - tower_http::trace::TraceLayer::new_for_http() - .make_span_with(MatchedPathMakeSpan), - ) - .layer(axum::middleware::from_fn(middleware::log_response)); + let server = poem::Server::new_with_acceptor(acceptor); - let server = axum::Server::bind(&app.config.server.host) - .serve(router.into_make_service()); + let server_handle = tokio::spawn(async move { + server.run(router).await?; + Ok(()) + }); - Ok(server) + Ok(ServerHandle { + local_addrs, + server_handle, + }) } diff --git a/src/server/description.md b/src/server/description.md new file mode 100644 index 0000000..da9f44f --- /dev/null +++ b/src/server/description.md @@ -0,0 +1,22 @@ +A transaction relayer service! + +## Operating a relayer +Below is a guide on using this service. Note that septs 1 through 4 require authentication using HTTP Basic auth. Using swagger explorer make sure to click the authorize button and use the correct credentials. Default dev creds are `admin:admin`. + +### 1. Setup a network +tx-sitter keeps track of supported networks in its internal database. In order to be able to create any relayers at least one network must be present. To add a network use the `POST /1/admin/networks/:chain_id` endpoint. + +To see the list of currently added networks use the `GET /1/admin/networks` endpoint. + +### 2. Create a relayer +A relayer is an abstraction layer on top of a private key stored locally (for testing purposes only!) or using a secrets manager (currently only AWS KMS is supported). + +To create a relayer use the `POST /1/admin/relayer` endpoint. The data returned will contain a relayer id, make sure to copy it to the clipboard. + +### 3. Create an API key +By itself a relayer is not very useful. In order to send transactions one must create an API key. To do that use the `POST /1/admin/relayer/:relayer_id/key` endpoint. **Make sure to copy the API key from the response. It's not possible to recover it!** But it's always possible to create a new one. + +### 4. Use the API key +Once an API keys has been created it's possible to use the relayer api to, among other things, send transactions. + +You can use the `POST /1/api/:api_token/tx` endpoint to create a transaction. diff --git a/src/server/error.rs b/src/server/error.rs deleted file mode 100644 index 2b2456b..0000000 --- a/src/server/error.rs +++ /dev/null @@ -1,123 +0,0 @@ -use axum::response::IntoResponse; -use hyper::StatusCode; -use serde::{Deserialize, Serialize}; -use thiserror::Error; - -#[derive(Debug, Error, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum ApiError { - #[error("Invalid key encoding")] - KeyEncoding, - - #[error("Invalid key length")] - KeyLength, - - #[error("Unauthorized")] - Unauthorized, - - #[error("Invalid format")] - InvalidFormat, - - #[error("Missing tx")] - MissingTx, - - #[error("Relayer is disabled")] - RelayerDisabled, - - #[error("Too many queued transactions, max: {max}, current: {current}")] - TooManyTransactions { max: usize, current: usize }, - - #[error("Internal error {0}")] - #[serde(with = "serde_eyre")] - Other(#[from] eyre::Report), -} - -impl IntoResponse for ApiError { - fn into_response(self) -> axum::response::Response { - let status_code = match self { - Self::KeyLength | Self::KeyEncoding => StatusCode::BAD_REQUEST, - Self::Unauthorized => StatusCode::UNAUTHORIZED, - Self::Other(_) => StatusCode::INTERNAL_SERVER_ERROR, - Self::InvalidFormat => StatusCode::BAD_REQUEST, - Self::MissingTx => StatusCode::NOT_FOUND, - Self::RelayerDisabled => StatusCode::FORBIDDEN, - Self::TooManyTransactions { .. } => StatusCode::TOO_MANY_REQUESTS, - }; - - let message = serde_json::to_string(&self) - .expect("Failed to serialize error message"); - - (status_code, message).into_response() - } -} - -// Mostly used for tests -impl PartialEq for ApiError { - fn eq(&self, other: &Self) -> bool { - match (self, other) { - ( - Self::TooManyTransactions { - max: l_max, - current: l_current, - }, - Self::TooManyTransactions { - max: r_max, - current: r_current, - }, - ) => l_max == r_max && l_current == r_current, - (Self::Other(l0), Self::Other(r0)) => { - l0.to_string() == r0.to_string() - } - _ => { - core::mem::discriminant(self) == core::mem::discriminant(other) - } - } - } -} - -mod serde_eyre { - use std::borrow::Cow; - - use serde::Deserialize; - - pub fn serialize( - error: &eyre::Report, - serializer: S, - ) -> Result - where - S: serde::Serializer, - { - let error = error.to_string(); - serializer.serialize_str(&error) - } - - pub fn deserialize<'de, D>( - deserializer: D, - ) -> Result - where - D: serde::Deserializer<'de>, - { - let error = Cow::<'static, str>::deserialize(deserializer)?; - Ok(eyre::eyre!(error)) - } -} - -#[cfg(test)] -mod tests { - use test_case::test_case; - - use super::*; - - #[test_case(ApiError::KeyLength, r#""keyLength""# ; "Key length")] - #[test_case(ApiError::Other(eyre::eyre!("Test error")), r#"{"other":"Test error"}"# ; "Other error")] - #[test_case(ApiError::TooManyTransactions { max: 10, current: 20 }, r#"{"tooManyTransactions":{"max":10,"current":20}}"# ; "Too many transactions")] - fn serialization(error: ApiError, expected: &str) { - let serialized = serde_json::to_string(&error).unwrap(); - - assert_eq!(serialized, expected); - - let deserialized = serde_json::from_str::(expected).unwrap(); - - assert_eq!(error, deserialized); - } -} diff --git a/src/server/middleware.rs b/src/server/middleware.rs deleted file mode 100644 index a56ef0c..0000000 --- a/src/server/middleware.rs +++ /dev/null @@ -1,3 +0,0 @@ -mod log_response_middleware; - -pub use self::log_response_middleware::log_response; diff --git a/src/server/middleware/log_response_middleware.rs b/src/server/middleware/log_response_middleware.rs deleted file mode 100644 index ecd8f42..0000000 --- a/src/server/middleware/log_response_middleware.rs +++ /dev/null @@ -1,24 +0,0 @@ -use axum::http::Request; -use axum::middleware::Next; -use axum::response::Response; -use hyper::Body; - -pub async fn log_response(request: Request, next: Next) -> Response { - let mut response = next.run(request).await; - - if !response.status().is_success() { - let body_bytes = hyper::body::to_bytes(response.body_mut()) - .await - .expect("Failed to read body"); - - let body_as_text = std::str::from_utf8(&body_bytes) - .unwrap_or("Failed to parse body as text"); - - let status_code = response.status(); - tracing::error!(?status_code, "{body_as_text}"); - - *response.body_mut() = axum::body::boxed(Body::from(body_bytes)); - } - - response -} diff --git a/src/server/routes.rs b/src/server/routes.rs deleted file mode 100644 index de17332..0000000 --- a/src/server/routes.rs +++ /dev/null @@ -1,9 +0,0 @@ -use hyper::StatusCode; - -pub mod network; -pub mod relayer; -pub mod transaction; - -pub async fn health() -> StatusCode { - StatusCode::OK -} diff --git a/src/server/routes/network.rs b/src/server/routes/network.rs deleted file mode 100644 index 2c0b349..0000000 --- a/src/server/routes/network.rs +++ /dev/null @@ -1,75 +0,0 @@ -use std::sync::Arc; - -use axum::extract::{Json, Path, State}; -use eyre::Result; -use serde::{Deserialize, Serialize}; -use url::Url; - -use crate::app::App; -use crate::server::ApiError; -use crate::service::Service; -use crate::task_runner::TaskRunner; - -#[derive(Debug, Default, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct NewNetworkInfo { - pub name: String, - pub http_rpc: String, - pub ws_rpc: String, -} - -#[derive(Debug, Default, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct NetworkInfo { - pub chain_id: u64, - pub name: String, - pub http_rpc: String, - pub ws_rpc: String, -} - -#[tracing::instrument(skip(app))] -pub async fn create_network( - State(app): State>, - Path(chain_id): Path, - Json(network): Json, -) -> Result<(), ApiError> { - let http_url: Url = network.http_rpc.parse().map_err(|err| { - tracing::error!("Invalid http rpc url: {}", err); - ApiError::InvalidFormat - })?; - - let ws_url: Url = network.ws_rpc.parse().map_err(|err| { - tracing::error!("Invalid ws rpc url: {}", err); - ApiError::InvalidFormat - })?; - - app.db - .create_network( - chain_id, - &network.name, - http_url.as_str(), - ws_url.as_str(), - ) - .await?; - - let task_runner = TaskRunner::new(app.clone()); - Service::spawn_chain_tasks(&task_runner, chain_id)?; - - Ok(()) -} - -#[tracing::instrument(skip(_app))] -pub async fn _get_network( - State(_app): State>, - Path(_chain_id): Path, -) -> &'static str { - "Hello, World!" -} - -#[tracing::instrument(skip(_app))] -pub async fn _get_networks( - State(_app): State>, - Path(_chain_id): Path, -) -> &'static str { - "Hello, World!" -} diff --git a/src/server/routes/relayer.rs b/src/server/routes/relayer.rs deleted file mode 100644 index ad87b09..0000000 --- a/src/server/routes/relayer.rs +++ /dev/null @@ -1,168 +0,0 @@ -use std::sync::Arc; - -use axum::extract::{Json, Path, State}; -use ethers::signers::Signer; -use ethers::types::Address; -use eyre::Result; -use serde::{Deserialize, Serialize}; -use serde_json::Value; - -use crate::api_key::ApiKey; -use crate::app::App; -use crate::server::ApiError; -use crate::types::{RelayerInfo, RelayerUpdate}; - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct CreateRelayerRequest { - pub name: String, - pub chain_id: u64, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct CreateRelayerResponse { - pub relayer_id: String, - pub address: Address, -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct RpcRequest { - pub id: i32, - pub method: String, - #[serde(default)] - pub params: Value, - pub jsonrpc: JsonRpcVersion, -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct RpcResponse { - pub id: i32, - pub result: Value, - pub jsonrpc: JsonRpcVersion, -} - -#[derive(Debug, Serialize, Deserialize)] -pub enum JsonRpcVersion { - #[serde(rename = "2.0")] - V2, -} - -#[derive(Debug, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct CreateApiKeyResponse { - pub api_key: ApiKey, -} - -#[tracing::instrument(skip(app))] -pub async fn create_relayer( - State(app): State>, - Json(req): Json, -) -> Result, ApiError> { - let (key_id, signer) = app.keys_source.new_signer(&req.name).await?; - - let address = signer.address(); - - let relayer_id = uuid::Uuid::new_v4(); - let relayer_id = relayer_id.to_string(); - - app.db - .create_relayer(&relayer_id, &req.name, req.chain_id, &key_id, address) - .await?; - - Ok(Json(CreateRelayerResponse { - relayer_id, - address, - })) -} - -#[tracing::instrument(skip(app))] -pub async fn update_relayer( - State(app): State>, - Path(relayer_id): Path, - Json(req): Json, -) -> Result, ApiError> { - app.db.update_relayer(&relayer_id, &req).await?; - - Ok(Json(())) -} - -#[tracing::instrument(skip(app))] -pub async fn get_relayers( - State(app): State>, -) -> Result>, ApiError> { - let relayer_info = app.db.get_relayers().await?; - - Ok(Json(relayer_info)) -} - -#[tracing::instrument(skip(app))] -pub async fn get_relayer( - State(app): State>, - Path(relayer_id): Path, -) -> Result, ApiError> { - let relayer_info = app.db.get_relayer(&relayer_id).await?; - - Ok(Json(relayer_info)) -} - -/// Resets the relayer -/// deletes all unsent txs -/// and resets nonce to the current confirmed nonce -#[tracing::instrument(skip(app))] -pub async fn purge_unsent_txs( - State(app): State>, - Path(relayer_id): Path, -) -> Result, ApiError> { - app.db.purge_unsent_txs(&relayer_id).await?; - - Ok(Json(())) -} - -#[tracing::instrument(skip(app, api_token))] -pub async fn relayer_rpc( - State(app): State>, - Path(api_token): Path, - Json(req): Json, -) -> Result, ApiError> { - if !app.is_authorized(&api_token).await? { - return Err(ApiError::Unauthorized); - } - - let relayer_info = app.db.get_relayer(api_token.relayer_id()).await?; - - // TODO: Cache? - let http_provider = app.http_provider(relayer_info.chain_id).await?; - let url = http_provider.url(); - - let response = reqwest::Client::new() - .post(url.clone()) - .json(&req) - .send() - .await - .map_err(|err| { - eyre::eyre!("Error sending request to {}: {}", url, err) - })?; - - let response: Value = response.json().await.map_err(|err| { - eyre::eyre!("Error parsing response from {}: {}", url, err) - })?; - - Ok(Json(response)) -} - -#[tracing::instrument(skip(app))] -pub async fn create_relayer_api_key( - State(app): State>, - Path(relayer_id): Path, -) -> Result, ApiError> { - let api_key = ApiKey::random(&relayer_id); - - app.db - .create_api_key(&relayer_id, api_key.api_key_secret_hash()) - .await?; - - Ok(Json(CreateApiKeyResponse { api_key })) -} diff --git a/src/server/routes/transaction.rs b/src/server/routes/transaction.rs deleted file mode 100644 index fef6d5e..0000000 --- a/src/server/routes/transaction.rs +++ /dev/null @@ -1,225 +0,0 @@ -use std::sync::Arc; - -use axum::extract::{Json, Path, Query, State}; -use ethers::types::{Address, Bytes, H256, U256}; -use eyre::Result; -use serde::{Deserialize, Serialize}; - -use crate::api_key::ApiKey; -use crate::app::App; -use crate::db::TxStatus; -use crate::server::ApiError; -use crate::types::TransactionPriority; - -#[derive(Debug, Default, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct SendTxRequest { - pub to: Address, - #[serde(with = "crate::serde_utils::decimal_u256")] - pub value: U256, - #[serde(default)] - pub data: Option, - #[serde(with = "crate::serde_utils::decimal_u256")] - pub gas_limit: U256, - #[serde(default)] - pub priority: TransactionPriority, - #[serde(default)] - pub tx_id: Option, - #[serde(default, with = "crate::serde_utils::base64_binary")] - pub blobs: Option>>, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct SendTxResponse { - pub tx_id: String, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct GetTxQuery { - #[serde(default)] - pub status: Option, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub struct GetTxResponse { - pub tx_id: String, - pub to: Address, - #[serde(default, skip_serializing_if = "Option::is_none")] - pub data: Option, - #[serde(with = "crate::serde_utils::decimal_u256")] - pub value: U256, - #[serde(with = "crate::serde_utils::decimal_u256")] - pub gas_limit: U256, - pub nonce: u64, - - // Sent tx data - #[serde(default, skip_serializing_if = "Option::is_none")] - pub tx_hash: Option, - pub status: GetTxResponseStatus, -} - -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(untagged)] -#[serde(rename_all = "camelCase")] -pub enum GetTxResponseStatus { - TxStatus(TxStatus), - Unsent(UnsentStatus), -} - -// We need this status as a separate enum to avoid manual serialization -#[derive(Debug, Clone, Serialize, Deserialize)] -#[serde(rename_all = "camelCase")] -pub enum UnsentStatus { - Unsent, -} - -#[tracing::instrument(skip(app, api_token))] -pub async fn send_tx( - State(app): State>, - Path(api_token): Path, - Json(req): Json, -) -> Result, ApiError> { - if !app.is_authorized(&api_token).await? { - return Err(ApiError::Unauthorized); - } - - let tx_id = if let Some(id) = req.tx_id { - id - } else { - uuid::Uuid::new_v4().to_string() - }; - - let relayer = app.db.get_relayer(api_token.relayer_id()).await?; - - if !relayer.enabled { - return Err(ApiError::RelayerDisabled); - } - - let relayer_queued_tx_count = app - .db - .get_relayer_pending_txs(api_token.relayer_id()) - .await?; - - if relayer_queued_tx_count > relayer.max_queued_txs as usize { - return Err(ApiError::TooManyTransactions { - max: relayer.max_queued_txs as usize, - current: relayer_queued_tx_count, - }); - } - - app.db - .create_transaction( - &tx_id, - req.to, - req.data.as_ref().map(|d| &d[..]).unwrap_or(&[]), - req.value, - req.gas_limit, - req.priority, - req.blobs, - api_token.relayer_id(), - ) - .await?; - - tracing::info!(tx_id, "Transaction created"); - - Ok(Json(SendTxResponse { tx_id })) -} - -#[tracing::instrument(skip(app, api_token))] -pub async fn get_txs( - State(app): State>, - Path(api_token): Path, - Query(query): Query, -) -> Result>, ApiError> { - if !app.is_authorized(&api_token).await? { - return Err(ApiError::Unauthorized); - } - - let txs = match query.status { - Some(GetTxResponseStatus::TxStatus(status)) => { - app.db - .read_txs(api_token.relayer_id(), Some(Some(status))) - .await? - } - Some(GetTxResponseStatus::Unsent(_)) => { - app.db.read_txs(api_token.relayer_id(), Some(None)).await? - } - None => app.db.read_txs(api_token.relayer_id(), None).await?, - }; - - let txs = - txs.into_iter() - .map(|tx| GetTxResponse { - tx_id: tx.tx_id, - to: tx.to.0, - data: if tx.data.is_empty() { - None - } else { - Some(tx.data.into()) - }, - value: tx.value.0, - gas_limit: tx.gas_limit.0, - nonce: tx.nonce, - tx_hash: tx.tx_hash.map(|h| h.0), - status: tx.status.map(GetTxResponseStatus::TxStatus).unwrap_or( - GetTxResponseStatus::Unsent(UnsentStatus::Unsent), - ), - }) - .collect(); - - Ok(Json(txs)) -} - -#[tracing::instrument(skip(app, api_token))] -pub async fn get_tx( - State(app): State>, - Path((api_token, tx_id)): Path<(ApiKey, String)>, -) -> Result, ApiError> { - if !app.is_authorized(&api_token).await? { - return Err(ApiError::Unauthorized); - } - - let tx = app.db.read_tx(&tx_id).await?.ok_or(ApiError::MissingTx)?; - - let get_tx_response = GetTxResponse { - tx_id: tx.tx_id, - to: tx.to.0, - data: if tx.data.is_empty() { - None - } else { - Some(tx.data.into()) - }, - value: tx.value.0, - gas_limit: tx.gas_limit.0, - nonce: tx.nonce, - tx_hash: tx.tx_hash.map(|h| h.0), - status: tx - .status - .map(GetTxResponseStatus::TxStatus) - .unwrap_or(GetTxResponseStatus::Unsent(UnsentStatus::Unsent)), - }; - - Ok(Json(get_tx_response)) -} - -#[cfg(test)] -mod tests { - use test_case::test_case; - - use super::*; - - #[test_case(GetTxResponseStatus::TxStatus(TxStatus::Pending) => "pending")] - #[test_case(GetTxResponseStatus::Unsent(UnsentStatus::Unsent) => "unsent")] - fn get_tx_response_status_serialization( - status: GetTxResponseStatus, - ) -> &'static str { - let json = serde_json::to_string(&status).unwrap(); - - let s = json.trim_start_matches('\"').trim_end_matches('\"'); - - Box::leak(s.to_owned().into_boxed_str()) - } -} diff --git a/src/server/security.rs b/src/server/security.rs new file mode 100644 index 0000000..e22f92d --- /dev/null +++ b/src/server/security.rs @@ -0,0 +1,48 @@ +use poem::Result; +use poem_openapi::{auth, SecurityScheme}; + +use crate::api_key::ApiKey; +use crate::app::App; + +#[derive(SecurityScheme)] +#[oai(ty = "basic")] +pub struct BasicAuth(auth::Basic); + +impl BasicAuth { + pub async fn validate(&self, app: impl AsRef) -> Result<()> { + let app = app.as_ref(); + + if let Some((username, password)) = app.config.server.credentials() { + if username != self.0.username && password != self.0.password { + return Err(poem::error::Error::from_string( + "Unauthorized".to_string(), + poem::http::StatusCode::UNAUTHORIZED, + )); + } + } + + Ok(()) + } +} + +impl ApiKey { + pub async fn validate(&self, app: impl AsRef) -> Result<()> { + let app = app.as_ref(); + + let is_authorized = app.is_authorized(self).await.map_err(|err| { + poem::error::Error::from_string( + err.to_string(), + poem::http::StatusCode::INTERNAL_SERVER_ERROR, + ) + })?; + + if !is_authorized { + return Err(poem::error::Error::from_string( + "Unauthorized".to_string(), + poem::http::StatusCode::UNAUTHORIZED, + )); + } + + Ok(()) + } +} diff --git a/src/server/trace_layer.rs b/src/server/trace_layer.rs deleted file mode 100644 index 85987b9..0000000 --- a/src/server/trace_layer.rs +++ /dev/null @@ -1,25 +0,0 @@ -use axum::extract::MatchedPath; -use hyper::Request; -use tower_http::trace::MakeSpan; -use tracing::{Level, Span}; - -/// MakeSpan to remove api keys from logs -#[derive(Clone)] -pub(crate) struct MatchedPathMakeSpan; - -impl MakeSpan for MatchedPathMakeSpan { - fn make_span(&mut self, request: &Request) -> Span { - let matched_path = request - .extensions() - .get::() - .map(MatchedPath::as_str); - - tracing::span!( - Level::DEBUG, - "request", - method = %request.method(), - matched_path, - version = ?request.version(), - ) - } -} diff --git a/src/server/trace_middleware.rs b/src/server/trace_middleware.rs new file mode 100644 index 0000000..9df203b --- /dev/null +++ b/src/server/trace_middleware.rs @@ -0,0 +1,69 @@ +use poem::http::uri::Builder; +use poem::{Endpoint, IntoResponse, Middleware, Request, Response, Result}; +use tracing::{Instrument, Level}; + +pub struct TraceMiddleware; + +impl Middleware for TraceMiddleware { + type Output = TraceMiddlwareImpl; + + fn transform(&self, ep: E) -> Self::Output { + TraceMiddlwareImpl(ep) + } +} + +pub struct TraceMiddlwareImpl(E); + +impl Endpoint for TraceMiddlwareImpl { + type Output = Response; + + async fn call(&self, req: Request) -> Result { + let obfuscated_uri_path = if req.uri().path().starts_with("/1/api") { + // Regex to hide the api token in a url like /1/api//... + let re = regex::Regex::new(r"/1/api/([^/]+)/").unwrap(); + re.replace(req.uri().path(), "/1/api/.../") + } else { + req.uri().path().into() + }; + + let mut req_uri_builder = Builder::from(req.uri().clone()); + if let Some(path_and_query) = req.uri().path_and_query() { + let q = path_and_query.query(); + + let new_p_and_q = if let Some(q) = q { + format!("{obfuscated_uri_path}?{q}") + } else { + obfuscated_uri_path.to_string() + }; + + req_uri_builder = req_uri_builder.path_and_query(new_p_and_q); + } + + let req_uri = req_uri_builder.build().expect("Invalid URI"); + + let span = tracing::span!(Level::DEBUG, "request", method = %req.method(), uri = %req_uri); + + let res = async move { + // TODO: Propagate span from request headers + tracing::debug!("started processing request"); + + let res = self.0.call(req).await; + let response = match res { + Ok(r) => r.into_response(), + Err(e) => e.into_response(), + }; + + if response.status().is_server_error() { + tracing::error!(status = %response.status(), "finished processing request"); + } else if response.status().is_client_error() { + tracing::warn!(status = %response.status(), "finished processing request"); + } else { + tracing::debug!(status = %response.status(), "finished processing request"); + } + + response + }.instrument(span).await; + + Ok(res) + } +} diff --git a/src/service.rs b/src/service.rs index 22537d1..2a510d4 100644 --- a/src/service.rs +++ b/src/service.rs @@ -13,7 +13,7 @@ use crate::tasks; pub struct Service { _app: Arc, local_addr: SocketAddr, - server_handle: JoinHandle>, + server_handle: JoinHandle>, } impl Service { @@ -45,10 +45,7 @@ impl Service { let server = crate::server::spawn_server(app.clone()).await?; let local_addr = server.local_addr(); - let server_handle = tokio::spawn(async move { - server.await?; - Ok(()) - }); + let server_handle = server.server_handle; initialize_predefined_values(&app).await?; @@ -100,35 +97,44 @@ async fn initialize_predefined_values( tracing::warn!("Running with predefined values is not recommended in a production environment"); - app.db - .create_network( - predefined.network.chain_id, - &predefined.network.name, - &predefined.network.http_rpc, - &predefined.network.ws_rpc, - ) - .await?; + if app + .db + .get_network(predefined.network.chain_id) + .await? + .is_none() + { + app.db + .upsert_network( + predefined.network.chain_id, + &predefined.network.name, + &predefined.network.http_rpc, + &predefined.network.ws_rpc, + ) + .await?; - let task_runner = TaskRunner::new(app.clone()); - Service::spawn_chain_tasks(&task_runner, predefined.network.chain_id)?; + let task_runner = TaskRunner::new(app.clone()); + Service::spawn_chain_tasks(&task_runner, predefined.network.chain_id)?; + } let secret_key = signing_key_from_hex(&predefined.relayer.key_id)?; let signer = Wallet::from(secret_key); let address = signer.address(); - app.db - .create_relayer( - &predefined.relayer.id, - &predefined.relayer.name, - predefined.relayer.chain_id, - &predefined.relayer.key_id, - address, - ) - .await?; + if app.db.get_relayer(&predefined.relayer.id).await?.is_none() { + app.db + .create_relayer( + &predefined.relayer.id, + &predefined.relayer.name, + predefined.relayer.chain_id, + &predefined.relayer.key_id, + address, + ) + .await?; + } app.db - .create_api_key( + .upsert_api_key( predefined.relayer.api_key.relayer_id(), predefined.relayer.api_key.api_key_secret_hash(), ) diff --git a/src/tasks/broadcast.rs b/src/tasks/broadcast.rs index e2e97e8..41ae204 100644 --- a/src/tasks/broadcast.rs +++ b/src/tasks/broadcast.rs @@ -54,7 +54,11 @@ async fn broadcast_relayer_txs( return Ok(()); } - let relayer = app.db.get_relayer(&relayer_id).await?; + let relayer = app + .db + .get_relayer(&relayer_id) + .await? + .context("Missing relayer")?; if !should_send_relayer_transactions(app, &relayer).await? { tracing::warn!(relayer_id = relayer_id, "Skipping relayer broadcasts"); diff --git a/src/tasks/escalate.rs b/src/tasks/escalate.rs index 55367f6..c863d2a 100644 --- a/src/tasks/escalate.rs +++ b/src/tasks/escalate.rs @@ -56,7 +56,11 @@ async fn escalate_relayer_txs( relayer_id: String, txs: Vec, ) -> eyre::Result<()> { - let relayer = app.db.get_relayer(&relayer_id).await?; + let relayer = app + .db + .get_relayer(&relayer_id) + .await? + .context("Missing relayer")?; if txs.is_empty() { tracing::info!("No transactions to escalate"); diff --git a/src/tasks/index.rs b/src/tasks/index.rs index 440542b..bdf79c0 100644 --- a/src/tasks/index.rs +++ b/src/tasks/index.rs @@ -22,25 +22,31 @@ const GAS_PRICE_FOR_METRICS_FACTOR: f64 = 1e-9; pub async fn index_chain(app: Arc, chain_id: u64) -> eyre::Result<()> { loop { - let ws_rpc = app.ws_provider(chain_id).await?; - let rpc = app.http_provider(chain_id).await?; - - tracing::info!("Subscribing to new blocks"); - // Subscribe to new block with the WS client which uses an unbounded receiver, buffering the stream - let mut blocks_stream = ws_rpc.subscribe_blocks().await?; - - // Get the first block from the stream, backfilling any missing blocks from the latest block in the db to the chain head - tracing::info!("Backfilling blocks"); - if let Some(latest_block) = blocks_stream.next().await { - backfill_to_block(app.clone(), chain_id, &rpc, latest_block) - .await?; - } + index_inner(app.clone(), chain_id).await?; + } +} - // Index incoming blocks from the stream - while let Some(block) = blocks_stream.next().await { - index_block(app.clone(), chain_id, &rpc, block).await?; - } +#[tracing::instrument(skip(app), level = "info")] +async fn index_inner(app: Arc, chain_id: u64) -> eyre::Result<()> { + let ws_rpc = app.ws_provider(chain_id).await?; + let rpc = app.http_provider(chain_id).await?; + + tracing::info!("Subscribing to new blocks"); + // Subscribe to new block with the WS client which uses an unbounded receiver, buffering the stream + let mut blocks_stream = ws_rpc.subscribe_blocks().await?; + + // Get the first block from the stream, backfilling any missing blocks from the latest block in the db to the chain head + tracing::info!("Backfilling blocks"); + if let Some(latest_block) = blocks_stream.next().await { + backfill_to_block(app.clone(), chain_id, &rpc, latest_block).await?; + } + + // Index incoming blocks from the stream + while let Some(block) = blocks_stream.next().await { + index_block(app.clone(), chain_id, &rpc, block).await?; } + + Ok(()) } #[tracing::instrument(skip(app, rpc, block))] @@ -77,6 +83,7 @@ pub async fn index_block( let metric_labels: [(&str, String); 1] = [("chain_id", chain_id.to_string())]; + for tx in mined_txs { tracing::info!( tx_id = tx.0, @@ -94,7 +101,7 @@ pub async fn index_block( Ok(()) } -#[tracing::instrument(skip(app, rpc, latest_block))] +#[tracing::instrument(skip(app, rpc, latest_block), level = "info")] pub async fn backfill_to_block( app: Arc, chain_id: u64, @@ -102,35 +109,45 @@ pub async fn backfill_to_block( latest_block: Block, ) -> eyre::Result<()> { // Get the latest block from the db - if let Some(latest_db_block_number) = + let Some(latest_db_block_number) = app.db.get_latest_block_number(chain_id).await? - { - let next_block_number: u64 = latest_db_block_number + 1; - - // Get the first block from the stream and backfill any missing blocks - let latest_block_number = latest_block - .number - .context("Missing block number")? - .as_u64(); - - if latest_block_number > next_block_number { - // Backfill blocks between the last synced block and the chain head, non inclusive - for block_number in next_block_number..latest_block_number { - let block = rpc - .get_block::(block_number.into()) - .await? - .context(format!( - "Could not get block at height {}", - block_number - ))?; - - index_block(app.clone(), chain_id, rpc, block).await?; - } + else { + tracing::info!(chain_id, "No latest block"); + return Ok(()); + }; + + let next_block_number: u64 = latest_db_block_number + 1; + + // Get the first block from the stream and backfill any missing blocks + let latest_block_number = latest_block + .number + .context("Missing block number")? + .as_u64(); + + tracing::info!( + latest_block_number, + next_block_number, + "Backfilling to block" + ); + + if latest_block_number > next_block_number { + // Backfill blocks between the last synced block and the chain head, non inclusive + for block_number in next_block_number..latest_block_number { + let block = rpc + .get_block::(block_number.into()) + .await? + .context(format!( + "Could not get block at height {}", + block_number + ))?; + + index_block(app.clone(), chain_id, rpc, block).await?; } + } + + // Index the latest block after backfilling + index_block(app.clone(), chain_id, rpc, latest_block).await?; - // Index the latest block after backfilling - index_block(app.clone(), chain_id, rpc, latest_block).await?; - }; Ok(()) } diff --git a/src/types.rs b/src/types.rs index 657e245..c56e4b1 100644 --- a/src/types.rs +++ b/src/types.rs @@ -1,11 +1,21 @@ +use poem_openapi::{Enum, Object}; use serde::{Deserialize, Serialize}; +use serde_json::Value; use sqlx::prelude::FromRow; -use sqlx::types::Json; +use wrappers::address::AddressWrapper; +use wrappers::decimal_u256::DecimalU256; +use wrappers::h256::H256Wrapper; +use wrappers::hex_bytes::HexBytes; -use crate::db::data::{AddressWrapper, U256Wrapper}; +use crate::api_key::ApiKey; -#[derive(Deserialize, Serialize, Debug, Clone, Copy, Default, sqlx::Type)] +pub mod wrappers; + +#[derive( + Deserialize, Serialize, Debug, Clone, Copy, Default, sqlx::Type, Enum, +)] #[serde(rename_all = "camelCase")] +#[oai(rename_all = "camelCase")] #[sqlx(type_name = "transaction_priority", rename_all = "camelCase")] pub enum TransactionPriority { // 5th percentile @@ -27,8 +37,9 @@ impl TransactionPriority { } } -#[derive(Deserialize, Serialize, Debug, Clone, FromRow)] +#[derive(Deserialize, Serialize, Debug, Clone, FromRow, Object)] #[serde(rename_all = "camelCase")] +#[oai(rename_all = "camelCase")] pub struct RelayerInfo { pub id: String, pub name: String, @@ -44,12 +55,14 @@ pub struct RelayerInfo { pub max_inflight_txs: u64, #[sqlx(try_from = "i64")] pub max_queued_txs: u64, - pub gas_price_limits: Json>, + #[sqlx(json)] + pub gas_price_limits: Vec, pub enabled: bool, } -#[derive(Deserialize, Serialize, Debug, Clone, Default)] +#[derive(Deserialize, Serialize, Debug, Clone, Default, Object)] #[serde(rename_all = "camelCase")] +#[oai(rename_all = "camelCase")] pub struct RelayerUpdate { #[serde(default)] pub relayer_name: Option, @@ -63,13 +76,181 @@ pub struct RelayerUpdate { pub enabled: Option, } -#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq)] +#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Object)] #[serde(rename_all = "camelCase")] +#[oai(rename_all = "camelCase")] pub struct RelayerGasPriceLimit { - pub value: U256Wrapper, + pub value: DecimalU256, pub chain_id: i64, } +#[derive(Debug, Serialize, Deserialize, Object)] +#[serde(rename_all = "camelCase")] +#[oai(rename_all = "camelCase")] +pub struct CreateApiKeyResponse { + pub api_key: ApiKey, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize, Object)] +#[serde(rename_all = "camelCase")] +#[oai(rename_all = "camelCase")] +pub struct NewNetworkInfo { + pub name: String, + pub http_rpc: String, + pub ws_rpc: String, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize, FromRow, Object)] +#[serde(rename_all = "camelCase")] +#[oai(rename_all = "camelCase")] +pub struct NetworkInfo { + #[sqlx(try_from = "i64")] + pub chain_id: u64, + pub name: String, + pub http_rpc: String, + pub ws_rpc: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Object)] +#[serde(rename_all = "camelCase")] +#[oai(rename_all = "camelCase")] +pub struct CreateRelayerRequest { + /// New relayer name + pub name: String, + /// The chain id of the relayer + pub chain_id: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Object)] +#[serde(rename_all = "camelCase")] +#[oai(rename_all = "camelCase")] +pub struct CreateRelayerResponse { + /// ID of the created relayer + pub relayer_id: String, + /// Address of the created relayer + pub address: AddressWrapper, +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize, Object)] +#[serde(rename_all = "camelCase")] +#[oai(rename_all = "camelCase")] +pub struct SendTxRequest { + pub to: AddressWrapper, + /// Transaction value + pub value: DecimalU256, + #[serde(default)] + #[oai(default)] + pub data: Option, + /// Transaction gas limit + pub gas_limit: DecimalU256, + /// Transaction priority + /// + /// The values map to the following percentiles: + /// + /// slowest -> 5th percentile + /// + /// slow -> 25th percentile + /// + /// regular -> 50th percentile + /// + /// fast -> 75th percentile + /// + /// fastest -> 95th percentile + /// + /// i.e. a transaction with priority `fast` will have a gas price that is higher than 75% of the gas prices of other transactions (based on fee estimates from previous blocks). + #[serde(default)] + #[oai(default)] + pub priority: TransactionPriority, + /// An optional transaction id. If not provided tx-sitter will generate a UUID. + /// + /// Can be used to provide idempotency for the transaction. + #[serde(default)] + #[oai(default)] + pub tx_id: Option, + // TODO: poem_openapi thinks this is a nested array of numbers + #[serde(default, with = "crate::serde_utils::base64_binary")] + #[oai(default)] + pub blobs: Option>>, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Object)] +#[serde(rename_all = "camelCase")] +#[oai(rename_all = "camelCase")] +pub struct SendTxResponse { + pub tx_id: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize, Object)] +#[serde(rename_all = "camelCase")] +#[oai(rename_all = "camelCase")] +pub struct GetTxResponse { + pub tx_id: String, + pub to: AddressWrapper, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub data: Option, + pub value: DecimalU256, + pub gas_limit: DecimalU256, + pub nonce: u64, + + // Sent tx data + #[serde(default, skip_serializing_if = "Option::is_none")] + pub tx_hash: Option, + #[serde(default)] + #[oai(default)] + pub status: Option, +} + +#[derive( + Debug, Clone, Serialize, Deserialize, Copy, PartialEq, Eq, sqlx::Type, Enum, +)] +#[sqlx(rename_all = "camelCase")] +#[sqlx(type_name = "tx_status")] +#[serde(rename_all = "camelCase")] +#[oai(rename_all = "camelCase")] +pub enum TxStatus { + Pending, + Mined, + Finalized, +} + +#[derive(Debug, Serialize, Deserialize, Object)] +#[serde(rename_all = "camelCase")] +#[oai(rename_all = "camelCase")] +pub struct RpcRequest { + pub id: i32, + pub method: String, + #[serde(default)] + #[oai(default)] + pub params: Value, + pub jsonrpc: JsonRpcVersion, +} + +#[derive(Debug, Serialize, Deserialize, Object)] +#[serde(rename_all = "camelCase")] +#[oai(rename_all = "camelCase")] +pub struct RpcResponse { + pub id: i32, + pub result: Value, + pub jsonrpc: JsonRpcVersion, +} + +#[derive(Debug, Serialize, Deserialize, Enum)] +pub enum JsonRpcVersion { + #[serde(rename = "2.0")] + #[oai(rename = "2.0")] + V2, +} + +impl TxStatus { + pub fn previous(self) -> Self { + match self { + Self::Pending => Self::Pending, + Self::Mined => Self::Pending, + Self::Finalized => Self::Mined, + } + } +} + impl RelayerUpdate { pub fn with_relayer_name(mut self, relayer_name: String) -> Self { self.relayer_name = Some(relayer_name); @@ -103,6 +284,7 @@ impl RelayerUpdate { #[cfg(test)] mod tests { use ethers::types::{Address, U256}; + use ethers::utils::parse_units; use super::*; @@ -118,10 +300,10 @@ mod tests { current_nonce: 0, max_inflight_txs: 0, max_queued_txs: 0, - gas_price_limits: Json(vec![RelayerGasPriceLimit { - value: U256Wrapper(U256::zero()), + gas_price_limits: vec![RelayerGasPriceLimit { + value: U256::zero().into(), chain_id: 1, - }]), + }], enabled: true, }; @@ -140,7 +322,7 @@ mod tests { "maxQueuedTxs": 0, "gasPriceLimits": [ { - "value": "0x0", + "value": "0", "chainId": 1 } ], @@ -150,4 +332,37 @@ mod tests { assert_eq!(json.trim(), expected.trim()); } + + #[test] + fn send_tx_request() { + let value: U256 = parse_units("1", "ether").unwrap().into(); + + let request = SendTxRequest { + to: AddressWrapper(Address::zero()), + value: value.into(), + data: Some(HexBytes::from(vec![0])), + gas_limit: U256::zero().into(), + priority: TransactionPriority::Regular, + tx_id: Some("tx_id".to_string()), + blobs: Some(vec![vec![0]]), + }; + + let json = serde_json::to_string_pretty(&request).unwrap(); + + let expected = indoc::indoc! {r#" + { + "to": "0x0000000000000000000000000000000000000000", + "value": "1000000000000000000", + "data": "0x00", + "gasLimit": "0", + "priority": "regular", + "txId": "tx_id", + "blobs": [ + "AA==" + ] + } + "#}; + + assert_eq!(json.trim(), expected.trim()); + } } diff --git a/src/types/wrappers.rs b/src/types/wrappers.rs new file mode 100644 index 0000000..acff1d0 --- /dev/null +++ b/src/types/wrappers.rs @@ -0,0 +1,22 @@ +use decimal_u256::DecimalU256; +use hex_u256::HexU256; + +pub mod address; +pub mod h256; +pub mod hex_bytes; + +// TODO: Remove repeated code in these 2 modules +pub mod decimal_u256; +pub mod hex_u256; + +impl From for DecimalU256 { + fn from(value: HexU256) -> DecimalU256 { + DecimalU256::from(value.0) + } +} + +impl From for HexU256 { + fn from(value: DecimalU256) -> HexU256 { + HexU256::from(value.0) + } +} diff --git a/src/types/wrappers/address.rs b/src/types/wrappers/address.rs new file mode 100644 index 0000000..a0eccd5 --- /dev/null +++ b/src/types/wrappers/address.rs @@ -0,0 +1,118 @@ +use ethers::types::Address; +use poem_openapi::registry::{MetaSchema, MetaSchemaRef}; +use poem_openapi::types::{ParseError, ParseFromJSON, ToJSON}; +use serde::{Deserialize, Serialize}; +use sqlx::database::HasValueRef; +use sqlx::Database; + +#[derive(Debug, Default, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(transparent)] +pub struct AddressWrapper(pub Address); + +impl<'r, DB> sqlx::Decode<'r, DB> for AddressWrapper +where + DB: Database, + Vec: sqlx::Decode<'r, DB>, +{ + fn decode( + value: >::ValueRef, + ) -> Result { + let bytes = as sqlx::Decode>::decode(value)?; + + let address = Address::from_slice(&bytes); + + Ok(Self(address)) + } +} + +impl sqlx::Type for AddressWrapper +where + Vec: sqlx::Type, +{ + fn type_info() -> DB::TypeInfo { + as sqlx::Type>::type_info() + } + + fn compatible(ty: &DB::TypeInfo) -> bool { + *ty == Self::type_info() + } +} + +impl From
for AddressWrapper { + fn from(value: Address) -> Self { + Self(value) + } +} + +impl poem_openapi::types::Type for AddressWrapper { + const IS_REQUIRED: bool = true; + + type RawValueType = Address; + + type RawElementValueType = Address; + + fn name() -> std::borrow::Cow<'static, str> { + "string(address)".into() + } + + fn schema_ref() -> MetaSchemaRef { + let mut schema_ref = MetaSchema::new_with_format("string", "address"); + + schema_ref.example = Some(serde_json::Value::String( + "0x000000000000000000000000000000000000000f".to_string(), + )); + schema_ref.title = Some("Address".to_string()); + schema_ref.description = Some("Hex encoded ethereum address"); + + MetaSchemaRef::Inline(Box::new(schema_ref)) + } + + fn as_raw_value(&self) -> Option<&Self::RawValueType> { + Some(&self.0) + } + + fn raw_element_iter<'a>( + &'a self, + ) -> Box + 'a> { + Box::new(self.as_raw_value().into_iter()) + } +} + +impl ParseFromJSON for AddressWrapper { + fn parse_from_json( + value: Option, + ) -> poem_openapi::types::ParseResult { + let value = value.ok_or_else(ParseError::expected_input)?; + + let value = + serde_json::from_value(value).map_err(ParseError::custom)?; + + Ok(value) + } +} + +impl ToJSON for AddressWrapper { + fn to_json(&self) -> Option { + serde_json::to_value(self).ok() + } +} + +#[cfg(test)] +mod tests { + use ethers::types::H160; + use hex_literal::hex; + + use super::*; + + #[test] + fn deserialize() { + let address: AddressWrapper = serde_json::from_str( + r#""1Ed53d680B8890DAe2a63f673a85fFDE1FD5C7a2""#, + ) + .unwrap(); + + let expected = H160(hex!("1Ed53d680B8890DAe2a63f673a85fFDE1FD5C7a2")); + + assert_eq!(address.0, expected); + } +} diff --git a/src/types/wrappers/decimal_u256.rs b/src/types/wrappers/decimal_u256.rs new file mode 100644 index 0000000..d32aa65 --- /dev/null +++ b/src/types/wrappers/decimal_u256.rs @@ -0,0 +1,159 @@ +use std::borrow::Cow; + +use ethers::types::U256; +use poem_openapi::registry::{MetaSchema, MetaSchemaRef}; +use poem_openapi::types::{ParseError, ParseFromJSON, ToJSON}; +use serde::{Deserialize, Serialize}; +use sqlx::database::{HasArguments, HasValueRef}; +use sqlx::Database; + +#[derive(Debug, Default, Clone, PartialEq, Eq)] +pub struct DecimalU256(pub U256); + +impl Serialize for DecimalU256 { + fn serialize( + &self, + serializer: S, + ) -> Result { + let s = self.0.to_string(); + serializer.serialize_str(&s) + } +} + +impl<'de> Deserialize<'de> for DecimalU256 { + fn deserialize(deserializer: D) -> Result + where + D: serde::Deserializer<'de>, + { + let s: Cow = serde::Deserialize::deserialize(deserializer)?; + + let u256 = U256::from_dec_str(&s).map_err(serde::de::Error::custom)?; + + Ok(Self(u256)) + } +} + +impl<'r, DB> sqlx::Decode<'r, DB> for DecimalU256 +where + DB: Database, + [u8; 32]: sqlx::Decode<'r, DB>, +{ + fn decode( + value: >::ValueRef, + ) -> Result { + let bytes = <[u8; 32] as sqlx::Decode>::decode(value)?; + + let value = U256::from_big_endian(&bytes); + + Ok(Self(value)) + } +} + +impl sqlx::Type for DecimalU256 +where + [u8; 32]: sqlx::Type, +{ + fn type_info() -> DB::TypeInfo { + <[u8; 32] as sqlx::Type>::type_info() + } + + fn compatible(ty: &DB::TypeInfo) -> bool { + *ty == Self::type_info() + } +} + +impl<'q, DB> sqlx::Encode<'q, DB> for DecimalU256 +where + DB: Database, + [u8; 32]: sqlx::Encode<'q, DB>, +{ + fn encode_by_ref( + &self, + buf: &mut >::ArgumentBuffer, + ) -> sqlx::encode::IsNull { + let mut bytes = [0u8; 32]; + self.0.to_big_endian(&mut bytes); + + <[u8; 32] as sqlx::Encode>::encode_by_ref(&bytes, buf) + } +} + +impl From for DecimalU256 { + fn from(value: U256) -> Self { + Self(value) + } +} + +impl poem_openapi::types::Type for DecimalU256 { + const IS_REQUIRED: bool = true; + + type RawValueType = Self; + + type RawElementValueType = Self; + + fn name() -> std::borrow::Cow<'static, str> { + "string(decimal-u256)".into() + } + + fn schema_ref() -> MetaSchemaRef { + let mut schema_ref = + MetaSchema::new_with_format("string", "decimal-u256"); + + schema_ref.example = Some(serde_json::Value::String("0".to_string())); + schema_ref.default = Some(serde_json::Value::String("0".to_string())); + schema_ref.title = Some("Decimal U256".to_string()); + schema_ref.description = Some("A decimal 256-bit unsigned integer"); + + MetaSchemaRef::Inline(Box::new(schema_ref)) + } + + fn as_raw_value(&self) -> Option<&Self::RawValueType> { + Some(self) + } + + fn raw_element_iter<'a>( + &'a self, + ) -> Box + 'a> { + Box::new(self.as_raw_value().into_iter()) + } +} + +impl ParseFromJSON for DecimalU256 { + fn parse_from_json( + value: Option, + ) -> poem_openapi::types::ParseResult { + let value = value.ok_or_else(ParseError::expected_input)?; + + let value = + serde_json::from_value(value).map_err(ParseError::custom)?; + + Ok(value) + } +} + +impl ToJSON for DecimalU256 { + fn to_json(&self) -> Option { + serde_json::to_value(self).ok() + } +} + +#[cfg(test)] +mod tests { + use test_case::test_case; + + use super::*; + + #[test_case("10", 10)] + #[test_case("255", 255)] + #[test_case("10000000000000000000", 10000000000000000000)] + fn deserialize_string(s: &str, v: u64) { + let s = format!("\"{s}\""); + let actual: DecimalU256 = serde_json::from_str(&s).unwrap(); + + assert_eq!(actual.0, U256::from(v)); + + let reserialized = serde_json::to_string(&actual).unwrap(); + + assert_eq!(reserialized, s); + } +} diff --git a/src/types/wrappers/h256.rs b/src/types/wrappers/h256.rs new file mode 100644 index 0000000..069f536 --- /dev/null +++ b/src/types/wrappers/h256.rs @@ -0,0 +1,116 @@ +use ethers::types::H256; +use poem_openapi::registry::{MetaSchema, MetaSchemaRef}; +use poem_openapi::types::{ParseError, ParseFromJSON, ToJSON}; +use serde::{Deserialize, Serialize}; +use sqlx::database::{HasArguments, HasValueRef}; +use sqlx::postgres::{PgHasArrayType, PgTypeInfo}; +use sqlx::Database; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(transparent)] +pub struct H256Wrapper(pub H256); + +impl<'r, DB> sqlx::Decode<'r, DB> for H256Wrapper +where + DB: Database, + [u8; 32]: sqlx::Decode<'r, DB>, +{ + fn decode( + value: >::ValueRef, + ) -> Result { + let bytes = <[u8; 32] as sqlx::Decode>::decode(value)?; + + let value = H256::from_slice(&bytes); + + Ok(Self(value)) + } +} + +impl<'q, DB> sqlx::Encode<'q, DB> for H256Wrapper +where + DB: Database, + [u8; 32]: sqlx::Encode<'q, DB>, +{ + fn encode_by_ref( + &self, + buf: &mut >::ArgumentBuffer, + ) -> sqlx::encode::IsNull { + <[u8; 32] as sqlx::Encode>::encode_by_ref(&self.0 .0, buf) + } +} + +impl PgHasArrayType for H256Wrapper { + fn array_type_info() -> PgTypeInfo { + <[u8; 32] as PgHasArrayType>::array_type_info() + } +} + +impl sqlx::Type for H256Wrapper +where + [u8; 32]: sqlx::Type, +{ + fn type_info() -> DB::TypeInfo { + <[u8; 32] as sqlx::Type>::type_info() + } + + fn compatible(ty: &DB::TypeInfo) -> bool { + *ty == Self::type_info() + } +} + +impl poem_openapi::types::Type for H256Wrapper { + const IS_REQUIRED: bool = true; + + type RawValueType = Self; + + type RawElementValueType = Self; + + fn name() -> std::borrow::Cow<'static, str> { + "string(h256)".into() + } + + fn schema_ref() -> MetaSchemaRef { + let mut schema_ref = MetaSchema::new_with_format("string", "h256"); + + schema_ref.example = Some(serde_json::Value::String( + "0x46239dbfe5502b9f82c3dff992927d8d9b3168e732b4fd5771288569f5a1813d".to_string(), + )); + schema_ref.default = Some(serde_json::Value::String( + "0x0000000000000000000000000000000000000000000000000000000000000000".to_string(), + )); + schema_ref.title = Some("H256".to_string()); + schema_ref.description = Some("A hex encoded 256-bit hash"); + + MetaSchemaRef::Inline(Box::new(schema_ref)) + } + + fn as_raw_value(&self) -> Option<&Self::RawValueType> { + Some(self) + } + + fn raw_element_iter<'a>( + &'a self, + ) -> Box + 'a> { + Box::new(self.as_raw_value().into_iter()) + } +} + +impl ParseFromJSON for H256Wrapper { + fn parse_from_json( + value: Option, + ) -> poem_openapi::types::ParseResult { + // TODO: Better error handling + let value = value.ok_or_else(ParseError::expected_input)?; + + let inner = + serde_json::from_value(value).map_err(ParseError::custom)?; + + Ok(Self(inner)) + } +} + +impl ToJSON for H256Wrapper { + fn to_json(&self) -> Option { + serde_json::to_value(self.0).ok() + } +} diff --git a/src/types/wrappers/hex_bytes.rs b/src/types/wrappers/hex_bytes.rs new file mode 100644 index 0000000..02dbbe8 --- /dev/null +++ b/src/types/wrappers/hex_bytes.rs @@ -0,0 +1,90 @@ +use ethers::types::Bytes; +use poem_openapi::registry::{MetaSchema, MetaSchemaRef}; +use poem_openapi::types::{ParseError, ParseFromJSON, ToJSON}; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(transparent)] +pub struct HexBytes(pub Bytes); + +impl From for HexBytes { + fn from(value: Bytes) -> Self { + Self(value) + } +} + +impl From> for HexBytes { + fn from(value: Vec) -> Self { + Self(Bytes::from(value)) + } +} + +impl poem_openapi::types::Type for HexBytes { + const IS_REQUIRED: bool = true; + + type RawValueType = Self; + + type RawElementValueType = Self; + + fn name() -> std::borrow::Cow<'static, str> { + "string(bytes)".into() + } + + fn schema_ref() -> MetaSchemaRef { + let mut schema_ref = MetaSchema::new_with_format("string", "bytes"); + + schema_ref.example = + Some(serde_json::Value::String("0xffffff".to_string())); + schema_ref.title = Some("Bytes".to_string()); + schema_ref.description = Some("Hex encoded binary blob"); + + MetaSchemaRef::Inline(Box::new(MetaSchema::new_with_format( + "string", "bytes", + ))) + } + + fn as_raw_value(&self) -> Option<&Self::RawValueType> { + Some(self) + } + + fn raw_element_iter<'a>( + &'a self, + ) -> Box + 'a> { + Box::new(self.as_raw_value().into_iter()) + } +} + +impl ParseFromJSON for HexBytes { + fn parse_from_json( + value: Option, + ) -> poem_openapi::types::ParseResult { + let value = value.ok_or_else(ParseError::expected_input)?; + + let inner = + serde_json::from_value(value).map_err(ParseError::custom)?; + + Ok(Self(inner)) + } +} + +impl ToJSON for HexBytes { + fn to_json(&self) -> Option { + serde_json::to_value(&self.0).ok() + } +} + +#[cfg(test)] +mod tests { + use test_case::test_case; + + use super::*; + + #[test_case("0xff", vec![255])] + #[test_case("0xffff", vec![255, 255])] + #[test_case("0x0101", vec![1, 1])] + fn deserialize_string(s: &str, v: Vec) { + let value = serde_json::Value::String(s.to_string()); + let result = HexBytes::parse_from_json(Some(value)).unwrap(); + assert_eq!(result.0, Bytes::from(v)); + } +} diff --git a/src/types/wrappers/hex_u256.rs b/src/types/wrappers/hex_u256.rs new file mode 100644 index 0000000..63f4a5f --- /dev/null +++ b/src/types/wrappers/hex_u256.rs @@ -0,0 +1,130 @@ +use ethers::types::U256; +use poem_openapi::registry::{MetaSchema, MetaSchemaRef}; +use poem_openapi::types::{ParseError, ParseFromJSON, ToJSON}; +use serde::{Deserialize, Serialize}; +use sqlx::database::{HasArguments, HasValueRef}; +use sqlx::Database; + +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] +#[serde(transparent)] +pub struct HexU256(pub U256); + +impl<'r, DB> sqlx::Decode<'r, DB> for HexU256 +where + DB: Database, + [u8; 32]: sqlx::Decode<'r, DB>, +{ + fn decode( + value: >::ValueRef, + ) -> Result { + let bytes = <[u8; 32] as sqlx::Decode>::decode(value)?; + + let value = U256::from_big_endian(&bytes); + + Ok(Self(value)) + } +} + +impl sqlx::Type for HexU256 +where + [u8; 32]: sqlx::Type, +{ + fn type_info() -> DB::TypeInfo { + <[u8; 32] as sqlx::Type>::type_info() + } + + fn compatible(ty: &DB::TypeInfo) -> bool { + *ty == Self::type_info() + } +} + +impl<'q, DB> sqlx::Encode<'q, DB> for HexU256 +where + DB: Database, + [u8; 32]: sqlx::Encode<'q, DB>, +{ + fn encode_by_ref( + &self, + buf: &mut >::ArgumentBuffer, + ) -> sqlx::encode::IsNull { + let mut bytes = [0u8; 32]; + self.0.to_big_endian(&mut bytes); + + <[u8; 32] as sqlx::Encode>::encode_by_ref(&bytes, buf) + } +} + +impl From for HexU256 { + fn from(value: U256) -> Self { + Self(value) + } +} + +impl poem_openapi::types::Type for HexU256 { + const IS_REQUIRED: bool = true; + + type RawValueType = Self; + + type RawElementValueType = Self; + + fn name() -> std::borrow::Cow<'static, str> { + "string(hex-u256)".into() + } + + fn schema_ref() -> MetaSchemaRef { + let mut schema_ref = MetaSchema::new_with_format("string", "hex-u256"); + + schema_ref.example = + Some(serde_json::Value::String("0xff".to_string())); + schema_ref.default = Some(serde_json::Value::String("0x0".to_string())); + schema_ref.title = Some("Hex U256".to_string()); + schema_ref.description = Some("A hex encoded 256-bit unsigned integer"); + + MetaSchemaRef::Inline(Box::new(schema_ref)) + } + + fn as_raw_value(&self) -> Option<&Self::RawValueType> { + Some(self) + } + + fn raw_element_iter<'a>( + &'a self, + ) -> Box + 'a> { + Box::new(self.as_raw_value().into_iter()) + } +} + +impl ParseFromJSON for HexU256 { + fn parse_from_json( + value: Option, + ) -> poem_openapi::types::ParseResult { + let value = value.ok_or_else(ParseError::expected_input)?; + + let value = + serde_json::from_value(value).map_err(ParseError::custom)?; + + Ok(value) + } +} + +impl ToJSON for HexU256 { + fn to_json(&self) -> Option { + serde_json::to_value(self).ok() + } +} + +#[cfg(test)] +mod tests { + use test_case::test_case; + + use super::*; + + #[test_case("0xff", 255)] + #[test_case("0x10", 16)] + fn deserialize_string(s: &str, v: u64) { + let s = format!("\"{s}\""); + let actual: HexU256 = serde_json::from_str(&s).unwrap(); + + assert_eq!(actual.0, U256::from(v)); + } +} diff --git a/tests/common/mod.rs b/tests/common/mod.rs index c514749..b6e007f 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -34,10 +34,10 @@ pub mod prelude { pub use futures::StreamExt; pub use tx_sitter::api_key::ApiKey; pub use tx_sitter::client::TxSitterClient; - pub use tx_sitter::server::routes::relayer::{ + pub use tx_sitter::types::{ CreateApiKeyResponse, CreateRelayerRequest, CreateRelayerResponse, + SendTxRequest, }; - pub use tx_sitter::server::routes::transaction::SendTxRequest; pub use url::Url; pub use super::*; @@ -70,8 +70,7 @@ pub fn setup_tracing() { .with( EnvFilter::builder() .with_default_directive(LevelFilter::INFO.into()) - // Logging from fake_rpc can get very messy so we set it to warn only - .parse_lossy("info,tx_sitter=debug,fake_rpc=warn"), + .parse_lossy("info,tx_sitter=debug"), ) .init(); } @@ -114,7 +113,12 @@ pub async fn await_balance( for _ in 0..50 { let balance = provider.get_balance(address, None).await?; - tracing::info!(?balance, ?expected_balance, "Checking balance"); + tracing::info!( + ?address, + ?balance, + ?expected_balance, + "Checking balance" + ); if balance >= expected_balance { return Ok(()); diff --git a/tests/common/service_builder.rs b/tests/common/service_builder.rs index 45496b3..9b21d9a 100644 --- a/tests/common/service_builder.rs +++ b/tests/common/service_builder.rs @@ -23,9 +23,9 @@ pub struct ServiceBuilder { impl Default for ServiceBuilder { fn default() -> Self { Self { - escalation_interval: Duration::from_secs(30), - soft_reorg_interval: Duration::from_secs(45), - hard_reorg_interval: Duration::from_secs(60), + escalation_interval: Duration::from_secs(5), + soft_reorg_interval: Duration::from_secs(10), + hard_reorg_interval: Duration::from_secs(15), } } } @@ -83,6 +83,7 @@ impl ServiceBuilder { )), username: None, password: None, + server_address: None, }, database: DatabaseConfig::connection_string(db_url), keys: KeysConfig::Local(LocalKeysConfig::default()), diff --git a/tests/disabled_relayer.rs b/tests/disabled_relayer.rs index 20a1953..acd05d8 100644 --- a/tests/disabled_relayer.rs +++ b/tests/disabled_relayer.rs @@ -39,9 +39,9 @@ async fn disabled_relayer() -> eyre::Result<()> { .send_tx( &api_key, &SendTxRequest { - to: ARBITRARY_ADDRESS, - value, - gas_limit: U256::from(21_000), + to: ARBITRARY_ADDRESS.into(), + value: value.into(), + gas_limit: U256::from(21_000).into(), ..Default::default() }, ) diff --git a/tests/escalation.rs b/tests/escalation.rs index cab10c6..ceeeb65 100644 --- a/tests/escalation.rs +++ b/tests/escalation.rs @@ -31,9 +31,9 @@ async fn escalation() -> eyre::Result<()> { .send_tx( &api_key, &SendTxRequest { - to: ARBITRARY_ADDRESS, - value, - gas_limit: U256::from(21_000), + to: ARBITRARY_ADDRESS.into(), + value: value.into(), + gas_limit: U256::from(21_000).into(), ..Default::default() }, ) @@ -78,7 +78,7 @@ async fn get_tx_hash( let tx = client.get_tx(api_key, tx_id).await?; if let Some(tx_hash) = tx.tx_hash { - return Ok(tx_hash); + return Ok(tx_hash.0); } else { tokio::time::sleep(Duration::from_secs(3)).await; } diff --git a/tests/reorg.rs b/tests/reorg.rs index a25ae9a..32973eb 100644 --- a/tests/reorg.rs +++ b/tests/reorg.rs @@ -26,9 +26,9 @@ async fn reorg() -> eyre::Result<()> { .send_tx( &api_key, &SendTxRequest { - to: ARBITRARY_ADDRESS, - value, - gas_limit: U256::from(21_000), + to: ARBITRARY_ADDRESS.into(), + value: value.into(), + gas_limit: U256::from(21_000).into(), ..Default::default() }, ) diff --git a/tests/send_many_txs.rs b/tests/send_many_txs.rs index bb273f6..c4a4ba3 100644 --- a/tests/send_many_txs.rs +++ b/tests/send_many_txs.rs @@ -19,6 +19,7 @@ async fn send_many_txs() -> eyre::Result<()> { // Send a transaction let value: U256 = parse_units("10", "ether")?.into(); + let num_transfers = 10; let mut tasks = FuturesUnordered::new(); @@ -29,9 +30,9 @@ async fn send_many_txs() -> eyre::Result<()> { .send_tx( &api_key, &SendTxRequest { - to: ARBITRARY_ADDRESS, - value, - gas_limit: U256::from(21_000), + to: ARBITRARY_ADDRESS.into(), + value: value.into(), + gas_limit: U256::from(21_000).into(), ..Default::default() }, ) diff --git a/tests/send_too_many_txs.rs b/tests/send_too_many_txs.rs index d93b3fa..f9f322a 100644 --- a/tests/send_too_many_txs.rs +++ b/tests/send_too_many_txs.rs @@ -1,7 +1,5 @@ mod common; -use tx_sitter::client::ClientError; -use tx_sitter::server::ApiError; use tx_sitter::types::{RelayerUpdate, TransactionPriority}; use crate::common::prelude::*; @@ -35,6 +33,7 @@ async fn send_too_many_txs() -> eyre::Result<()> { api_key: secondary_api_key, } = client.create_relayer_api_key(&secondary_relayer_id).await?; + tracing::info!("Updating relayer"); // Set max queued txs client .update_relayer( @@ -48,15 +47,16 @@ async fn send_too_many_txs() -> eyre::Result<()> { // Send a transaction let value: U256 = parse_units("0.01", "ether")?.into(); + tracing::info!("Sending txs"); for _ in 0..=MAX_QUEUED_TXS { client .send_tx( &secondary_api_key, &SendTxRequest { - to: ARBITRARY_ADDRESS, - value, + to: ARBITRARY_ADDRESS.into(), + value: value.into(), data: None, - gas_limit: U256::from(21_000), + gas_limit: U256::from(21_000).into(), priority: TransactionPriority::Regular, tx_id: None, blobs: None, @@ -70,10 +70,10 @@ async fn send_too_many_txs() -> eyre::Result<()> { .send_tx( &secondary_api_key, &SendTxRequest { - to: ARBITRARY_ADDRESS, - value, + to: ARBITRARY_ADDRESS.into(), + value: value.into(), data: None, - gas_limit: U256::from(21_000), + gas_limit: U256::from(21_000).into(), priority: TransactionPriority::Regular, tx_id: None, blobs: None, @@ -81,11 +81,10 @@ async fn send_too_many_txs() -> eyre::Result<()> { ) .await; - assert!( - matches!( - result, - Err(ClientError::TxSitter(ApiError::TooManyTransactions { .. })) - ), + // TODO: Fix checking errors by string + assert_eq!( + result.as_ref().err().and_then(|e| e.tx_sitter()), + Some("Relayer queue is full"), "Result {:?} should be too many transactions", result ); @@ -98,10 +97,10 @@ async fn send_too_many_txs() -> eyre::Result<()> { .send_tx( &api_key, &SendTxRequest { - to: secondary_relayer_address, - value: total_required_value, + to: secondary_relayer_address.clone(), + value: total_required_value.into(), data: None, - gas_limit: U256::from(21_000), + gas_limit: U256::from(21_000).into(), priority: TransactionPriority::Regular, tx_id: None, blobs: None, @@ -110,7 +109,7 @@ async fn send_too_many_txs() -> eyre::Result<()> { .await?; tracing::info!("Waiting for secondary relayer balance"); - await_balance(&provider, total_required_value, secondary_relayer_address) + await_balance(&provider, total_required_value, secondary_relayer_address.0) .await?; tracing::info!("Waiting for queued up txs to be processed"); diff --git a/tests/send_tx.rs b/tests/send_tx.rs index 1a8fe62..c3f76c1 100644 --- a/tests/send_tx.rs +++ b/tests/send_tx.rs @@ -22,9 +22,9 @@ async fn send_tx() -> eyre::Result<()> { .send_tx( &api_key, &SendTxRequest { - to: ARBITRARY_ADDRESS, - value, - gas_limit: U256::from(21_000), + to: ARBITRARY_ADDRESS.into(), + value: value.into(), + gas_limit: U256::from(21_000).into(), ..Default::default() }, )