diff --git a/.env b/.env index 61a8e65..c6f516a 100644 --- a/.env +++ b/.env @@ -1,3 +1,7 @@ DATABASE_URL=postgres://postgres:holaplex@localhost:5437/analytics POSTGRES_DB=analytics POSTGRES_PASSWORD=holaplex +CUBE_AUTH_TOKEN=SECRET +CUBE_BASE_URL="http://127.0.0.1:4000/cubejs-api" +KAFKA_BROKERS=127.0.0.1:9092 +KAFKA_SSL=false diff --git a/.github/workflows/cargo_test.yml b/.github/workflows/cargo_test.yml index 8b41a8b..7e72828 100644 --- a/.github/workflows/cargo_test.yml +++ b/.github/workflows/cargo_test.yml @@ -49,7 +49,7 @@ jobs: - uses: actions-rs/toolchain@v1 with: - toolchain: 1.69.0 + toolchain: 1.71.0 override: true components: cargo, rustc diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 49e6e0b..12f73d2 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -42,7 +42,7 @@ jobs: uses: docker/setup-buildx-action@v2 with: driver-opts: image=moby/buildkit:master - version: v0.10.4 + version: v0.11.2 endpoint: ci - uses: aws-actions/configure-aws-credentials@v1 diff --git a/.gitignore b/.gitignore index b94a6f7..76186ef 100644 --- a/.gitignore +++ b/.gitignore @@ -1,7 +1,7 @@ # Generated by Cargo # will have compiled files and executables /target/ - +/schema # These are backup files generated by rustfmt **/*.rs.bk diff --git a/Cargo.lock b/Cargo.lock index cd1799f..416b6cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7,6 +7,10 @@ name = "Inflector" version = "0.11.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fe438c63458706e03479442743baae6c88256498e6431708f6dfc520a26515d3" +dependencies = [ + "lazy_static", + "regex", +] [[package]] name = "addr2line" @@ -23,6 +27,41 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "aead" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0" +dependencies = [ + "crypto-common", + "generic-array", +] + +[[package]] +name = "aes" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac1f845298e95f983ff1944b728ae08b8cebab80d684f0a832ed0fc74dfa27e2" +dependencies = [ + "cfg-if", + "cipher", + "cpufeatures", +] + +[[package]] +name = "aes-gcm" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "209b47e8954a928e1d72e86eca7000ebb6655fe1436d33eefc2201cad027e237" +dependencies = [ + "aead", + "aes", + "cipher", + "ctr", + "ghash", + "subtle", +] + [[package]] name = "ahash" version = "0.7.6" @@ -145,6 +184,104 @@ version = "0.7.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d30a06541fbafbc7f82ed10c06164cfbd2c401138f6addd8404629c4b16711" +[[package]] +name = "ascii_utils" +version = "0.9.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "71938f30533e4d95a6d17aa530939da3842c2ab6f4f84b9dae68447e4129f74a" + +[[package]] +name = "async-graphql" +version = "5.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b35ef8f9be23ee30fe1eb1cf175c689bc33517c6c6d0fd0669dade611e5ced7f" +dependencies = [ + "async-graphql-derive", + "async-graphql-parser", + "async-graphql-value", + "async-stream", + "async-trait", + "base64 0.13.1", + "bytes 1.4.0", + "chrono", + "fast_chemail", + "fnv", + "futures-channel", + "futures-timer", + "futures-util", + "handlebars", + "http", + "indexmap 1.9.3", + "log", + "lru", + "mime", + "multer", + "num-traits", + "once_cell", + "pin-project-lite", + "regex", + "serde", + "serde_json", + "serde_urlencoded", + "static_assertions", + "tempfile", + "thiserror", + "uuid", +] + +[[package]] +name = "async-graphql-derive" +version = "5.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a0f6ceed3640b4825424da70a5107e79d48d9b2bc6318dfc666b2fc4777f8c4" +dependencies = [ + "Inflector", + "async-graphql-parser", + "darling 0.14.4", + "proc-macro-crate 1.3.1", + "proc-macro2", + "quote", + "syn 1.0.109", + "thiserror", +] + +[[package]] +name = "async-graphql-parser" +version = "5.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc308cd3bc611ee86c9cf19182d2b5ee583da40761970e41207f088be3db18f" +dependencies = [ + "async-graphql-value", + "pest", + "serde", + "serde_json", +] + +[[package]] +name = "async-graphql-poem" +version = "5.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68f818938d4e47dcc40bc383e9ddec373e9aab1db29e5ad9706b29621afe3b3f" +dependencies = [ + "async-graphql", + "futures-util", + "poem", + "serde_json", + "tokio-util", +] + +[[package]] +name = "async-graphql-value" +version = "5.0.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d461325bfb04058070712296601dfe5e5bd6cdff84780a0a8c569ffb15c87eb3" +dependencies = [ + "bytes 1.4.0", + "indexmap 1.9.3", + "serde", + "serde_json", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -238,6 +375,12 @@ version = "0.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" +[[package]] +name = "base64" +version = "0.20.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ea22880d78093b0cbe17c89f64a7d457941e65759157ec6cb31a31d652b05e5" + [[package]] name = "base64" version = "0.21.2" @@ -356,11 +499,20 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +[[package]] +name = "bytes" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" + [[package]] name = "bytes" version = "1.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "89b2fd2a0dcf38d7971e2194b6b6eebab45ae01067456a7fd93d5547a61b70be" +dependencies = [ + "serde", +] [[package]] name = "cc" @@ -394,6 +546,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "cipher" +version = "0.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad" +dependencies = [ + "crypto-common", + "inout", +] + [[package]] name = "clap" version = "3.2.25" @@ -481,6 +643,24 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" +[[package]] +name = "cookie" +version = "0.16.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e859cd57d0710d9e06c381b550c06e76992472a8c6d527aecd2fc673dcc231fb" +dependencies = [ + "aes-gcm", + "base64 0.20.0", + "hkdf", + "hmac", + "percent-encoding", + "rand", + "sha2", + "subtle", + "time 0.3.25", + "version_check", +] + [[package]] name = "core-foundation" version = "0.9.3" @@ -547,17 +727,70 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" dependencies = [ "generic-array", + "rand_core", "typenum", ] +[[package]] +name = "ctr" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835" +dependencies = [ + "cipher", +] + +[[package]] +name = "cube-client" +version = "0.1.2" +source = "git+https://github.com/holaplex/cube-client?branch=dev#45e5debd92b97f56e9627d5c2b3b0a24cd752645" +dependencies = [ + "anyhow", + "async-trait", + "either", + "log", + "reqwest", + "reqwest-middleware", + "serde", + "serde_derive", + "serde_json", + "tokio", + "url", + "uuid", +] + +[[package]] +name = "darling" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b750cb3417fd1b327431a470f388520309479ab0bf5e323505daf0290cd3850" +dependencies = [ + "darling_core 0.14.4", + "darling_macro 0.14.4", +] + [[package]] name = "darling" version = "0.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0209d94da627ab5605dcccf08bb18afa5009cfbef48d8a8b7d7bdbc79be25c5e" dependencies = [ - "darling_core", - "darling_macro", + "darling_core 0.20.3", + "darling_macro 0.20.3", +] + +[[package]] +name = "darling_core" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "109c1ca6e6b7f82cc233a97004ea8ed7ca123a9af07a8230878fcfda9b158bf0" +dependencies = [ + "fnv", + "ident_case", + "proc-macro2", + "quote", + "strsim", + "syn 1.0.109", ] [[package]] @@ -574,17 +807,34 @@ dependencies = [ "syn 2.0.28", ] +[[package]] +name = "darling_macro" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4aab4dbc9f7611d8b55048a3a16d2d010c2c8334e46304b40ac1cc14bf3b48e" +dependencies = [ + "darling_core 0.14.4", + "quote", + "syn 1.0.109", +] + [[package]] name = "darling_macro" version = "0.20.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "836a9bbc7ad63342d6d6e7b815ccab164bc77a2d95d84bc3117a8c0d5c98e2d5" dependencies = [ - "darling_core", + "darling_core 0.20.3", "quote", "syn 2.0.28", ] +[[package]] +name = "data-encoding" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2e66c9d817f1720209181c316d28635c050fa304f9c79e47a520882661b7308" + [[package]] name = "deranged" version = "0.3.7" @@ -697,6 +947,15 @@ version = "2.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" +[[package]] +name = "fast_chemail" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "495a39d30d624c2caabe6312bfead73e7717692b44e0b32df168c275a2e8e9e4" +dependencies = [ + "ascii_utils", +] + [[package]] name = "fastrand" version = "2.0.0" @@ -827,6 +1086,12 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +[[package]] +name = "futures-timer" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" + [[package]] name = "futures-util" version = "0.3.28" @@ -845,6 +1110,18 @@ dependencies = [ "slab", ] +[[package]] +name = "futures_codec" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce54d63f8b0c75023ed920d46fd71d0cbbb830b0ee012726b5b4f506fb6dea5b" +dependencies = [ + "bytes 0.5.6", + "futures", + "memchr", + "pin-project", +] + [[package]] name = "generic-array" version = "0.14.7" @@ -866,6 +1143,16 @@ dependencies = [ "wasi 0.11.0+wasi-snapshot-preview1", ] +[[package]] +name = "ghash" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d930750de5717d2dd0b8c0d42c076c0e884c81a73e6cab859bbd2339c71e3e40" +dependencies = [ + "opaque-debug", + "polyval", +] + [[package]] name = "gimli" version = "0.27.3" @@ -878,7 +1165,7 @@ version = "0.3.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "97ec8491ebaf99c8eaa73058b045fe58073cd6be7f596ac993ced0b0a0c01049" dependencies = [ - "bytes", + "bytes 1.4.0", "fnv", "futures-core", "futures-sink", @@ -891,6 +1178,20 @@ dependencies = [ "tracing", ] +[[package]] +name = "handlebars" +version = "4.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "83c3372087601b532857d332f5957cbae686da52bb7810bf038c3e3c3cc2fa0d" +dependencies = [ + "log", + "pest", + "pest_derive", + "serde", + "serde_json", + "thiserror", +] + [[package]] name = "hashbrown" version = "0.12.3" @@ -928,6 +1229,31 @@ dependencies = [ "hashbrown 0.14.0", ] +[[package]] +name = "headers" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3e372db8e5c0d213e0cd0b9be18be2aca3d44cf2fe30a9d46a65581cd454584" +dependencies = [ + "base64 0.13.1", + "bitflags 1.3.2", + "bytes 1.4.0", + "headers-core", + "http", + "httpdate", + "mime", + "sha1", +] + +[[package]] +name = "headers-core" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e7f66481bfee273957b1f20485a4ff3362987f85b2c236580d81b4eb7a326429" +dependencies = [ + "http", +] + [[package]] name = "heck" version = "0.3.3" @@ -992,8 +1318,13 @@ dependencies = [ name = "holaplex-hub-analytics" version = "0.1.0" dependencies = [ + "async-graphql", + "async-graphql-poem", + "cube-client", + "either", "holaplex-hub-core", "holaplex-hub-core-build", + "poem", "prost", "sea-orm", "serde", @@ -1075,7 +1406,7 @@ version = "0.2.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bd6effc99afb63425aff9b05836f029929e345a6148a14b7ecd5ab67af944482" dependencies = [ - "bytes", + "bytes 1.4.0", "fnv", "itoa", ] @@ -1086,7 +1417,7 @@ version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ - "bytes", + "bytes 1.4.0", "http", "pin-project-lite", ] @@ -1109,7 +1440,7 @@ version = "0.14.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffb1cfd654a8219eaef89881fdb3bb3b1cdc5fa75ded05d6933b2b382e395468" dependencies = [ - "bytes", + "bytes 1.4.0", "futures-channel", "futures-core", "futures-util", @@ -1127,13 +1458,27 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d78e1e73ec14cf7375674f74d7dde185c8206fd9dea6fb6295e8a98098aaa97" +dependencies = [ + "futures-util", + "http", + "hyper", + "rustls 0.21.6", + "tokio", + "tokio-rustls 0.24.1", +] + [[package]] name = "hyper-tls" version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" dependencies = [ - "bytes", + "bytes 1.4.0", "hyper", "native-tls", "tokio", @@ -1200,6 +1545,15 @@ dependencies = [ "hashbrown 0.14.0", ] +[[package]] +name = "inout" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5" +dependencies = [ + "generic-array", +] + [[package]] name = "instant" version = "0.1.12" @@ -1315,6 +1669,15 @@ dependencies = [ "prost-types", ] +[[package]] +name = "lru" +version = "0.7.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e999beba7b6e8345721bd280141ed958096a2e4abdf74f67ff4ce49b4b54e47a" +dependencies = [ + "hashbrown 0.12.3", +] + [[package]] name = "match_cfg" version = "0.1.0" @@ -1359,6 +1722,16 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "mime_guess" +version = "2.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4192263c238a5f0d0c6bfd21f336a313a4ce1c450542449ca191bb657b4642ef" +dependencies = [ + "mime", + "unicase", +] + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -1385,6 +1758,24 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "multer" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "01acbdc23469fd8fe07ab135923371d5f5a422fbf9c522158677c8eb15bc51c2" +dependencies = [ + "bytes 1.4.0", + "encoding_rs", + "futures-util", + "http", + "httparse", + "log", + "memchr", + "mime", + "spin 0.9.8", + "version_check", +] + [[package]] name = "multimap" version = "0.8.3" @@ -1505,6 +1896,12 @@ version = "1.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd8b5dd2ae5ed71462c540258bedcb51965123ad7e7ccf4b9a8cafaa4a63576d" +[[package]] +name = "opaque-debug" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" + [[package]] name = "openssl" version = "0.10.55" @@ -1654,6 +2051,50 @@ version = "2.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b2a4787296e9989611394c33f193f676704af1686e70b8f8033ab5ba9a35a94" +[[package]] +name = "pest" +version = "2.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1acb4a4365a13f749a93f1a094a7805e5cfa0955373a9de860d962eaa3a5fe5a" +dependencies = [ + "thiserror", + "ucd-trie", +] + +[[package]] +name = "pest_derive" +version = "2.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "666d00490d4ac815001da55838c500eafb0320019bbaa44444137c48b443a853" +dependencies = [ + "pest", + "pest_generator", +] + +[[package]] +name = "pest_generator" +version = "2.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68ca01446f50dbda87c1786af8770d535423fa8a53aec03b8f4e3d7eb10e0929" +dependencies = [ + "pest", + "pest_meta", + "proc-macro2", + "quote", + "syn 2.0.28", +] + +[[package]] +name = "pest_meta" +version = "2.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56af0a30af74d0445c0bf6d9d051c979b516a1a5af790d251daee76005420a48" +dependencies = [ + "once_cell", + "pest", + "sha2", +] + [[package]] name = "petgraph" version = "0.6.3" @@ -1664,6 +2105,26 @@ dependencies = [ "indexmap 1.9.3", ] +[[package]] +name = "pin-project" +version = "0.4.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ef0f924a5ee7ea9cbcea77529dba45f8a9ba9f622419fe3386ca581a3ae9d5a" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "0.4.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "851c8d0ce9bebe43790dedfc86614c23494ac9f423dd618d3a61fc693eafe61e" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "pin-project-lite" version = "0.2.10" @@ -1682,6 +2143,67 @@ version = "0.3.27" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26072860ba924cbfa98ea39c8c19b4dd6a4a25423dbdf219c1eca91aa0cf6964" +[[package]] +name = "poem" +version = "1.3.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f0d92c532a37a9e98c0e9a0411e6852b8acccf9ec07d5e6e450b01cbf947d90b" +dependencies = [ + "anyhow", + "async-trait", + "base64 0.21.2", + "bytes 1.4.0", + "chrono", + "cookie", + "futures-util", + "headers", + "http", + "hyper", + "mime", + "parking_lot 0.12.1", + "percent-encoding", + "pin-project-lite", + "poem-derive", + "regex", + "rfc7239", + "serde", + "serde_json", + "serde_urlencoded", + "smallvec", + "sse-codec", + "thiserror", + "time 0.3.25", + "tokio", + "tokio-stream", + "tokio-tungstenite", + "tokio-util", + "tracing", +] + +[[package]] +name = "poem-derive" +version = "1.3.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f5dd58846a1f582215370384c3090c62c9ef188e9d798ffc67ea90d0a1a8a3b8" +dependencies = [ + "proc-macro-crate 1.3.1", + "proc-macro2", + "quote", + "syn 2.0.28", +] + +[[package]] +name = "polyval" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d52cff9d1d4dee5fe6d03729099f4a310a41179e0a10dbf542039873f2e826fb" +dependencies = [ + "cfg-if", + "cpufeatures", + "opaque-debug", + "universal-hash", +] + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -1756,7 +2278,7 @@ version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" dependencies = [ - "bytes", + "bytes 1.4.0", "prost-derive", ] @@ -1766,7 +2288,7 @@ version = "0.11.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" dependencies = [ - "bytes", + "bytes 1.4.0", "heck 0.4.1", "itertools", "lazy_static", @@ -1991,7 +2513,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cde824a14b7c14f85caff81225f411faacc04a2013f41670f41443742b1c1c55" dependencies = [ "base64 0.21.2", - "bytes", + "bytes 1.4.0", "encoding_rs", "futures-core", "futures-util", @@ -1999,20 +2521,25 @@ dependencies = [ "http", "http-body", "hyper", + "hyper-rustls", "hyper-tls", "ipnet", "js-sys", "log", "mime", + "mime_guess", "native-tls", "once_cell", "percent-encoding", "pin-project-lite", + "rustls 0.21.6", + "rustls-pemfile", "serde", "serde_json", "serde_urlencoded", "tokio", "tokio-native-tls", + "tokio-rustls 0.24.1", "tokio-util", "tower-service", "url", @@ -2020,9 +2547,34 @@ dependencies = [ "wasm-bindgen-futures", "wasm-streams", "web-sys", + "webpki-roots", "winreg", ] +[[package]] +name = "reqwest-middleware" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ff44108c7925d082f2861e683a88618b68235ad9cdc60d64d9d1188efc951cdb" +dependencies = [ + "anyhow", + "async-trait", + "http", + "reqwest", + "serde", + "task-local-extensions", + "thiserror", +] + +[[package]] +name = "rfc7239" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "087317b3cf7eb481f13bd9025d729324b7cd068d6f470e2d76d049e191f5ba47" +dependencies = [ + "uncased", +] + [[package]] name = "ring" version = "0.16.20" @@ -2032,7 +2584,7 @@ dependencies = [ "cc", "libc", "once_cell", - "spin", + "spin 0.5.2", "untrusted", "web-sys", "winapi", @@ -2075,7 +2627,7 @@ dependencies = [ "arrayvec", "borsh", "byteorder", - "bytes", + "bytes 1.4.0", "num-traits", "rand", "rkyv", @@ -2114,6 +2666,18 @@ dependencies = [ "webpki", ] +[[package]] +name = "rustls" +version = "0.21.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d1feddffcfcc0b33f5c6ce9a29e341e4cd59c3f78e7ee45f4a40c038b1d6cbb" +dependencies = [ + "log", + "ring", + "rustls-webpki", + "sct", +] + [[package]] name = "rustls-pemfile" version = "1.0.3" @@ -2123,6 +2687,16 @@ dependencies = [ "base64 0.21.2", ] +[[package]] +name = "rustls-webpki" +version = "0.101.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "261e9e0888cba427c3316e6322805653c9425240b6fd96cee7cb671ab70ab8d0" +dependencies = [ + "ring", + "untrusted", +] + [[package]] name = "rustversion" version = "1.0.14" @@ -2426,7 +3000,7 @@ version = "2.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "881b6f881b17d13214e5d494c939ebab463d01264ce1811e9d4ac3a882e7695f" dependencies = [ - "darling", + "darling 0.20.3", "proc-macro2", "quote", "syn 2.0.28", @@ -2473,6 +3047,15 @@ dependencies = [ "winapi", ] +[[package]] +name = "signal-hook-registry" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d8229b473baa5980ac72ef434c4415e70c4b5e71b423043adb4ba059f89c99a1" +dependencies = [ + "libc", +] + [[package]] name = "simdutf8" version = "0.1.4" @@ -2516,6 +3099,12 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d" +[[package]] +name = "spin" +version = "0.9.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" + [[package]] name = "sqlformat" version = "0.2.1" @@ -2548,7 +3137,7 @@ dependencies = [ "base64 0.13.1", "bitflags 1.3.2", "byteorder", - "bytes", + "bytes 1.4.0", "chrono", "crc", "crossbeam-queue", @@ -2576,7 +3165,7 @@ dependencies = [ "percent-encoding", "rand", "rust_decimal", - "rustls", + "rustls 0.20.8", "rustls-pemfile", "serde", "serde_json", @@ -2623,9 +3212,27 @@ checksum = "804d3f245f894e61b1e6263c84b23ca675d96753b5abfd5cc8597d86806e8024" dependencies = [ "once_cell", "tokio", - "tokio-rustls", + "tokio-rustls 0.23.4", +] + +[[package]] +name = "sse-codec" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "84a59f811350c44b4a037aabeb72dc6a9591fc22aa95a036db9a96297c58085a" +dependencies = [ + "bytes 0.5.6", + "futures-io", + "futures_codec", + "memchr", ] +[[package]] +name = "static_assertions" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" + [[package]] name = "stringprep" version = "0.1.3" @@ -2676,6 +3283,15 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "55937e1799185b12863d447f42597ed69d9928686b8d88a1df17376a097d8369" +[[package]] +name = "task-local-extensions" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba323866e5d033818e3240feeb9f7db2c4296674e4d9e16b97b7bf8f490434e8" +dependencies = [ + "pin-utils", +] + [[package]] name = "tempfile" version = "3.7.0" @@ -2796,12 +3412,13 @@ checksum = "532826ff75199d5833b9d2c5fe410f29235e25704ee5f0ef599fb51c21f4a4da" dependencies = [ "autocfg", "backtrace", - "bytes", + "bytes 1.4.0", "libc", "mio", "num_cpus", "parking_lot 0.12.1", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys", @@ -2834,11 +3451,21 @@ version = "0.23.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c43ee83903113e03984cb9e5cebe6c04a5116269e900e3ddba8f068a62adda59" dependencies = [ - "rustls", + "rustls 0.20.8", "tokio", "webpki", ] +[[package]] +name = "tokio-rustls" +version = "0.24.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c28327cf380ac148141087fbfb9de9d7bd4e84ab5d2c28fbc911d753de8a7081" +dependencies = [ + "rustls 0.21.6", + "tokio", +] + [[package]] name = "tokio-stream" version = "0.1.14" @@ -2850,14 +3477,27 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-tungstenite" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec509ac96e9a0c43427c74f003127d953a265737636129424288d27cb5c4b12c" +dependencies = [ + "futures-util", + "log", + "tokio", + "tungstenite", +] + [[package]] name = "tokio-util" version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" dependencies = [ - "bytes", + "bytes 1.4.0", "futures-core", + "futures-io", "futures-sink", "pin-project-lite", "tokio", @@ -2997,12 +3637,55 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3528ecfd12c466c6f163363caf2d02a71161dd5e1cc6ae7b34207ea2d42d81ed" +[[package]] +name = "tungstenite" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "15fba1a6d6bb030745759a9a2a588bfe8490fc8b4751a277db3a0be1c9ebbf67" +dependencies = [ + "byteorder", + "bytes 1.4.0", + "data-encoding", + "http", + "httparse", + "log", + "rand", + "sha1", + "thiserror", + "url", + "utf-8", +] + [[package]] name = "typenum" version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "497961ef93d974e23eb6f433eb5fe1b7930b659f06d12dec6fc44a8f554c0bba" +[[package]] +name = "ucd-trie" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed646292ffc8188ef8ea4d1e0e0150fb15a5c2e12ad9b8fc191ae7a8a7f3c4b9" + +[[package]] +name = "uncased" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b9bc53168a4be7402ab86c3aad243a84dd7381d09be0eddc81280c1da95ca68" +dependencies = [ + "version_check", +] + +[[package]] +name = "unicase" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50f37be617794602aabbeee0be4f259dc1778fabe05e2d67ee8f79326d5cb4f6" +dependencies = [ + "version_check", +] + [[package]] name = "unicode-bidi" version = "0.3.13" @@ -3036,6 +3719,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39ec24b3121d976906ece63c9daad25b85969647682eee313cb5779fdd69e14e" +[[package]] +name = "universal-hash" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea" +dependencies = [ + "crypto-common", + "subtle", +] + [[package]] name = "untrusted" version = "0.7.1" @@ -3054,6 +3747,12 @@ dependencies = [ "serde", ] +[[package]] +name = "utf-8" +version = "0.7.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09cc8ee72d2a9becf2f2febe0205bbed8fc6615b7cb429ad062dc7b7ddd036a9" + [[package]] name = "utf8parse" version = "0.2.1" diff --git a/Dockerfile b/Dockerfile index 9a4e164..eb4117d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM rust:1.70.0-bookworm as chef +FROM rust:1.71.0-bookworm as chef RUN cargo install cargo-chef --locked WORKDIR /app diff --git a/app/Cargo.toml b/app/Cargo.toml index d20f035..5acdae9 100644 --- a/app/Cargo.toml +++ b/app/Cargo.toml @@ -18,9 +18,20 @@ sea-orm = { version = "^0.10.0", features = [ "runtime-tokio-rustls", "sqlx-postgres", ] } +poem = { version = "1.3.50", features = ["anyhow", "test", "cookie"] } +async-graphql = { version = "5.0.4", features = [ + "chrono", + "uuid", + "log", + "dataloader", + "apollo_tracing", +] } +async-graphql-poem = "5.0.3" serde = { version = "1.0.152", features = ["derive"] } serde_json = { version = "1.0.91" } prost = "0.11.6" +cube-client = { version = "0.1.2", git = "https://github.com/holaplex/cube-client", branch = "dev" } +either = "1.9.0" [dependencies.hub-core] package = "holaplex-hub-core" diff --git a/app/proto.lock b/app/proto.lock index 3cca701..e6a3610 100644 --- a/app/proto.lock +++ b/app/proto.lock @@ -3,6 +3,16 @@ subject = "credential" version = 2 sha512 = "0e01b47d7892ee2c5d7b21e5f7da6bb196404d5e5beeb8ac5c14100736d12252988df47a8924f7e3edf7205755423553f2c561623ff3644989a3f1450a798909" +[[schemas]] +subject = "credits" +version = 6 +sha512 = "20ed618976876db672f7cb1aa86fc73332107bbfd998786673b17c0a244ee47973f92e0530768babddb52d6a601096c3c95333835a9006430f40d32ffddd4fe7" + +[[schemas]] +subject = "credits_mpsc" +version = 2 +sha512 = "627b4a73d6f37f87c9c828ac2b094d7b267246fbd2eee627760bfe48bb0a40ab36073b163f85ace639cd83fcf04ef4d3ca38ecb3345ae0b68e30f14b548c45e8" + [[schemas]] subject = "customer" version = 2 @@ -10,8 +20,8 @@ sha512 = "d75800df0d4744c6b0f4d9a9952d3bfd0bb6b24a8babd19104cc11b54a525f85551b3c [[schemas]] subject = "nfts" -version = 22 -sha512 = "c9920f6a5792b067396c88e40b9bd2adfcb55b582734aff924a67a9d5841a5e2839fc734c1bbff66f402f9a9d8852ca5fef1339aaaa3d5b05aa7868ddfa375c1" +version = 26 +sha512 = "8191626dbd6e222f24914589a77855e5f0f5122b7adc655ef7e84af0d9ef104715408db97cdf7843882aaa65cfcacc436a01e01b336f6758b0863ec5e007b709" [[schemas]] subject = "organization" diff --git a/app/proto.toml b/app/proto.toml index 8ad2931..08b07be 100644 --- a/app/proto.toml +++ b/app/proto.toml @@ -3,9 +3,11 @@ endpoint = "https://schemas.holaplex.tools/" [schemas] organization = 5 -nfts = 22 +nfts = 26 customer = 2 treasury = 16 +credits = 6 +credits_mpsc = 2 credential = 2 webhook = 2 solana_nfts = 7 diff --git a/app/src/cube_client.rs b/app/src/cube_client.rs new file mode 100644 index 0000000..8a89e4d --- /dev/null +++ b/app/src/cube_client.rs @@ -0,0 +1,61 @@ +use cube_client::apis::{ + configuration::Configuration as CubeConfig, default_api as cube_api, Error as CubeApiError, +}; +pub use cube_client::models::{ + v1_load_request::V1LoadRequest, v1_query::Query, v1_time::TimeGranularity, +}; +use hub_core::{ + anyhow::{Context, Result}, + clap, thiserror, + url::Url, +}; + +/// Arguments for establishing a database connection +#[derive(Clone, Debug, clap::Args)] +pub struct CubeArgs { + #[arg(long, env, default_value = "http://127.0.0.1:4000")] + cube_base_url: String, + #[arg(long, env)] + cube_auth_token: String, +} + +#[derive(Clone, Debug)] +pub struct Client(CubeConfig); + +#[derive(Debug, thiserror::Error)] +pub enum CubeClientError { + #[error("Cube API error: {0}")] + CubeApiError(#[from] CubeApiError), + #[error("Serialization error: {0}")] + SerializationError(#[from] serde_json::Error), +} + +impl Client { + /// Res + /// Constructs a new `Client` instance from the provided arguments. + /// # Errors + /// This function fails if unable to parse Url from the provided arguments. + pub fn from_args(args: &CubeArgs) -> Result { + // It would be a good practice to validate the URL and maybe even normalize it. + let base_url = Url::parse(&args.cube_base_url).context("Invalid Cube base URL provided")?; + + Ok(Client(CubeConfig { + bearer_access_token: Some(args.cube_auth_token.clone()), + base_path: base_url.to_string(), + ..Default::default() + })) + } + /// Res + /// + /// # Errors + /// This function fails if query parameters are invalid or Cube is not responding + pub async fn query(&self, query: Query) -> Result { + let request = V1LoadRequest { + query: Some(query.build()), + query_type: Some("multi".to_string()), + }; + + let response = cube_api::load_v1(&self.0, Some(request)).await?; + Ok(serde_json::to_string(&response)?) + } +} diff --git a/app/src/entities/credits.rs b/app/src/entities/credits.rs new file mode 100644 index 0000000..3fd7a13 --- /dev/null +++ b/app/src/entities/credits.rs @@ -0,0 +1,18 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.1 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "credits")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: Uuid, + pub amount: u64, + pub organization_id: Uuid, + pub timestamp: DateTime, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/app/src/entities/mints.rs b/app/src/entities/mints.rs index e2971bb..19c8d54 100644 --- a/app/src/entities/mints.rs +++ b/app/src/entities/mints.rs @@ -8,7 +8,6 @@ pub struct Model { #[sea_orm(primary_key)] pub id: Uuid, pub project_id: Uuid, - pub owner: String, pub collection_id: Uuid, pub timestamp: DateTime, } diff --git a/app/src/entities/mod.rs b/app/src/entities/mod.rs index 40a750d..4459d90 100644 --- a/app/src/entities/mod.rs +++ b/app/src/entities/mod.rs @@ -3,8 +3,11 @@ pub mod prelude; pub mod collections; +pub mod credits; pub mod customers; pub mod mints; pub mod organizations; pub mod projects; +pub mod transfers; pub mod wallets; +pub mod webhooks; diff --git a/app/src/entities/projects.rs b/app/src/entities/projects.rs index 7d2b280..d6e777c 100644 --- a/app/src/entities/projects.rs +++ b/app/src/entities/projects.rs @@ -10,6 +10,7 @@ pub struct Model { #[sea_orm(column_type = "Text")] pub name: String, pub organization_id: Uuid, + pub timestamp: DateTime, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] diff --git a/app/src/entities/transfers.rs b/app/src/entities/transfers.rs new file mode 100644 index 0000000..257dcd1 --- /dev/null +++ b/app/src/entities/transfers.rs @@ -0,0 +1,17 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.1 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "transfers")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: Uuid, + pub project_id: Uuid, + pub timestamp: DateTime, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/app/src/entities/webhooks.rs b/app/src/entities/webhooks.rs new file mode 100644 index 0000000..c0c9e40 --- /dev/null +++ b/app/src/entities/webhooks.rs @@ -0,0 +1,18 @@ +//! `SeaORM` Entity. Generated by sea-orm-codegen 0.12.1 + +use sea_orm::entity::prelude::*; + +#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq)] +#[sea_orm(table_name = "webhooks")] +pub struct Model { + #[sea_orm(primary_key)] + pub id: Uuid, + pub project_id: Uuid, + pub organization_id: Uuid, + pub timestamp: DateTime, +} + +#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] +pub enum Relation {} + +impl ActiveModelBehavior for ActiveModel {} diff --git a/app/src/events.rs b/app/src/events.rs index 006432d..6d64374 100644 --- a/app/src/events.rs +++ b/app/src/events.rs @@ -3,7 +3,7 @@ use sea_orm::{prelude::*, Set}; use crate::{ db::Connection, - entities::{collections, customers, mints, organizations, projects, wallets}, + entities::{collections, customers, mints, organizations, projects, transfers, wallets}, proto::{customer_events, nft_events, organization_events, solana_nft_events, treasury_events}, Services, }; @@ -30,6 +30,7 @@ pub async fn process(msg: Services, db: Connection) -> Result<()> { id: Set(Uuid::parse_str(&k.id)?), name: Set(v.name), organization_id: Set(Uuid::parse_str(&v.organization_id)?), + timestamp: Set(Utc::now().naive_utc()), } .insert(db.get()) .await?; @@ -65,6 +66,7 @@ pub async fn process(msg: Services, db: Connection) -> Result<()> { }, Some(_) | None => Ok(()), }, + Services::Webhooks(..) => Ok(()), //TODO Services::Nfts(k, v) => match v.event { Some(nft_events::Event::SolanaCreateDrop(v)) => { collections::ActiveModel { @@ -94,7 +96,6 @@ pub async fn process(msg: Services, db: Connection) -> Result<()> { mints::ActiveModel { id: Set(Uuid::parse_str(&k.id)?), collection_id: Set(Uuid::parse_str(&v.collection_id)?), - owner: Set(v.recipient_address), project_id: Set(Uuid::parse_str(&k.project_id)?), timestamp: Set(Utc::now().naive_utc()), } @@ -106,7 +107,16 @@ pub async fn process(msg: Services, db: Connection) -> Result<()> { mints::ActiveModel { id: Set(Uuid::parse_str(&k.id)?), collection_id: Set(Uuid::parse_str(&v.collection_id)?), - owner: Set(v.receiver), + project_id: Set(Uuid::parse_str(&k.project_id)?), + timestamp: Set(Utc::now().naive_utc()), + } + .insert(db.get()) + .await?; + Ok(()) + }, + Some(nft_events::Event::TransferMint(_)) => { + transfers::ActiveModel { + id: Set(Uuid::parse_str(&k.id)?), project_id: Set(Uuid::parse_str(&k.project_id)?), timestamp: Set(Utc::now().naive_utc()), } @@ -132,7 +142,6 @@ pub async fn process(msg: Services, db: Connection) -> Result<()> { Some(solana_nft_events::Event::ImportedExternalMint(v)) => { mints::ActiveModel { id: Set(Uuid::parse_str(&k.id)?), - owner: Set(v.owner), collection_id: Set(Uuid::parse_str(&v.collection_id)?), project_id: Set(Uuid::parse_str(&k.project_id)?), timestamp: Set(Utc::now().naive_utc()), diff --git a/app/src/graphql/mod.rs b/app/src/graphql/mod.rs new file mode 100644 index 0000000..b65bf6f --- /dev/null +++ b/app/src/graphql/mod.rs @@ -0,0 +1,3 @@ +pub mod objects; +pub mod queries; +pub mod schema; diff --git a/app/src/graphql/objects/collection.rs b/app/src/graphql/objects/collection.rs new file mode 100644 index 0000000..4745973 --- /dev/null +++ b/app/src/graphql/objects/collection.rs @@ -0,0 +1,38 @@ +use async_graphql::{ComplexObject, Context, Result, SimpleObject}; +use hub_core::uuid::Uuid; + +use crate::graphql::{ + objects::{DataPoint, Interval, Order}, + queries::analytics::Query, +}; + +#[derive(Debug, Clone, SimpleObject)] +#[graphql(complex)] +pub struct Collection { + #[graphql(external)] + pub id: Uuid, +} + +#[ComplexObject] +impl Collection { + #[allow(clippy::too_many_arguments)] + async fn analytics( + &self, + ctx: &Context<'_>, + interval: Option, + order: Option, + limit: Option, + ) -> Result> { + Query::analytics( + &Query, + ctx, + None, + None, + Some(self.id), + interval, + order, + limit, + ) + .await + } +} diff --git a/app/src/graphql/objects/datapoint.rs b/app/src/graphql/objects/datapoint.rs new file mode 100644 index 0000000..ed621e9 --- /dev/null +++ b/app/src/graphql/objects/datapoint.rs @@ -0,0 +1,422 @@ +use std::{fmt, str::FromStr}; + +use async_graphql::{Enum, Error, InputObject, SimpleObject}; +pub use cube_client::models::{v1_time::TimeGranularity, V1LoadResponse}; +use hub_core::{ + anyhow::Result, + chrono::{NaiveDate, NaiveDateTime}, + uuid::Uuid, +}; +use serde::{Deserialize, Serialize}; +use serde_json::Value; + +/// A `DataPoint` object containing analytics information. +#[derive(Debug, Default, Clone, Serialize, Deserialize, SimpleObject)] +pub struct DataPoint { + /// Analytics data for mints. + #[serde(skip_serializing_if = "Option::is_none")] + pub mints: Option>, + /// Analytics data for customers. + #[serde(skip_serializing_if = "Option::is_none")] + pub customers: Option>, + /// Analytics data for collections. + #[serde(skip_serializing_if = "Option::is_none")] + pub collections: Option>, + /// Analytics data for wallets. + #[serde(skip_serializing_if = "Option::is_none")] + pub wallets: Option>, + /// Analytics data for projects. + #[serde(skip_serializing_if = "Option::is_none")] + pub projects: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub webhooks: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub credits: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub transfers: Option>, + #[graphql(visible = false)] + pub timestamp: Option, +} + +macro_rules! merge_fields { + ($self:expr, $other:expr, $($field:ident),+) => { + $( + if let Some(ref mut dest) = $self.$field { + if let Some(src) = &$other.$field { + dest.extend_from_slice(src); + } + } else { + $self.$field = $other.$field.clone(); + } + )+ + }; +} + +macro_rules! set_field { + ($self:expr, $resource:expr, $data:expr, $(($enum_variant:ident, $field:ident)),+ ) => { + match $resource { + $( + Resource::$enum_variant => { + $self.$field.get_or_insert_with(Vec::new).push($data.clone()); + } + ),+ + } + }; +} + +impl DataPoint { + #[must_use] + pub fn new() -> Self { + Self { + mints: None, + customers: None, + collections: None, + wallets: None, + projects: None, + transfers: None, + webhooks: None, + credits: None, + timestamp: None, + } + } + + pub fn set(&mut self, resource: Resource, data: &Data, timestamp: Option) { + self.timestamp = timestamp; + set_field!( + self, + resource, + data, + (Mints, mints), + (Customers, customers), + (Wallets, wallets), + (Collections, collections), + (Projects, projects), + (Transfers, transfers), + (Webhooks, webhooks), + (Credits, credits) + ); + } + pub fn merge(&mut self, other: &DataPoint) { + merge_fields!( + self, + other, + mints, + customers, + wallets, + collections, + projects, + transfers, + webhooks, + credits + ); + } +} + +#[derive(Debug, Default, Clone, Serialize, Deserialize, SimpleObject)] +pub struct Data { + /// Count for the metric. + #[serde(skip_serializing_if = "Option::is_none")] + pub count: Option, + /// The ID of the organization the data belongs to. + #[serde(skip_serializing_if = "Option::is_none")] + pub organization_id: Option, + /// The ID of the collection the data belongs to. + #[serde(skip_serializing_if = "Option::is_none")] + pub collection_id: Option, + /// The ID of the project the data belongs to. + #[serde(skip_serializing_if = "Option::is_none")] + pub project_id: Option, + /// the timestamp associated with the data point. + #[serde(skip_serializing_if = "Option::is_none")] + pub timestamp: Option, +} + +#[derive(Enum, Copy, Clone, Eq, PartialEq)] +pub enum Granularity { + Hour, + Day, + Week, + Month, + Year, +} + +impl fmt::Display for Granularity { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self { + Granularity::Hour => "hour", + Granularity::Day => "day", + _ => "week", + }; + write!(f, "{s}") + } +} + +#[derive(InputObject)] +pub struct Measure { + pub resource: Resource, + pub operation: Operation, +} + +impl Measure { + #[must_use] + pub fn new(resource: Resource, operation: Operation) -> Self { + Self { + resource, + operation, + } + } + #[must_use] + pub fn as_string(&self) -> String { + format!("{}.{}", self.resource, self.operation) + } +} + +#[derive(Enum, Copy, Clone, Eq, PartialEq)] +pub enum Operation { + Count, + Change, +} + +impl fmt::Display for Operation { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self { + Operation::Count => "count", + Operation::Change => "change", + }; + write!(f, "{s}") + } +} + +#[derive(Debug, Enum, Copy, Clone, Eq, PartialEq)] +pub enum Resource { + Mints, + Customers, + Wallets, + Collections, + Projects, + Transfers, + Webhooks, + Credits, +} + +impl fmt::Display for Resource { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self { + Resource::Mints => "mints", + Resource::Customers => "customers", + Resource::Wallets => "wallets", + Resource::Collections => "collections", + Resource::Projects => "projects", + Resource::Transfers => "transfers", + Resource::Webhooks => "webhooks", + Resource::Credits => "credits", + }; + write!(f, "{s}") + } +} + +impl FromStr for Resource { + type Err = (); + + fn from_str(s: &str) -> Result { + match s { + "mints" => Ok(Resource::Mints), + "customers" => Ok(Resource::Customers), + "wallets" => Ok(Resource::Wallets), + "collections" => Ok(Resource::Collections), + "projects" => Ok(Resource::Projects), + "transfers" => Ok(Resource::Transfers), + "webhooks" => Ok(Resource::Webhooks), + "credits" => Ok(Resource::Credits), + _ => Err(()), + } + } +} + +#[derive(Enum, Copy, Clone, Eq, PartialEq)] +pub enum Order { + Asc, + Desc, +} + +impl fmt::Display for Order { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self { + Order::Asc => "asc", + Order::Desc => "desc", + }; + write!(f, "{s}") + } +} + +#[derive(Enum, Copy, Clone, Eq, PartialEq)] +pub enum Dimension { + Collections, + Projects, + Organizations, +} + +impl fmt::Display for Dimension { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self { + Dimension::Collections => "collections", + Dimension::Projects => "projects", + Dimension::Organizations => "organizations", + }; + write!(f, "{s}") + } +} + +#[derive(InputObject)] +pub struct DateRange { + pub start: Option, + pub end: Option, + pub interval: Option, +} + +#[derive(Default, Enum, Copy, Clone, Eq, PartialEq)] +pub enum Interval { + All, + #[default] + Today, + Yesterday, + ThisWeek, + ThisMonth, + ThisYear, + Last7Days, + Last30Days, + LastWeek, + LastMonth, + LastQuarter, + LastYear, +} + +impl fmt::Display for Interval { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let s = match self { + Interval::All => "all", + Interval::Today => "today", + Interval::Yesterday => "yesterday", + Interval::ThisWeek => "this week", + Interval::ThisMonth => "this month", + Interval::ThisYear => "this year", + Interval::Last7Days => "last 7 days", + Interval::Last30Days => "last 30 days", + Interval::LastWeek => "last week", + Interval::LastMonth => "last month", + Interval::LastQuarter => "last quarter", + Interval::LastYear => "last year", + }; + write!(f, "{s}") + } +} +impl Interval { + #[must_use] + pub fn to_granularity(&self) -> Granularity { + match self { + Interval::Today | Interval::Yesterday => Granularity::Hour, + Interval::ThisWeek + | Interval::All + | Interval::Last7Days + | Interval::LastWeek + | Interval::ThisMonth + | Interval::Last30Days + | Interval::LastMonth => Granularity::Day, + Interval::LastQuarter => Granularity::Week, + Interval::ThisYear | Interval::LastYear => Granularity::Month, + } + } +} +impl From for Vec { + fn from(date_range: DateRange) -> Self { + vec![ + date_range.start.unwrap().format("%Y-%m-%d").to_string(), + date_range.end.unwrap().format("%Y-%m-%d").to_string(), + ] + } +} + +impl From for TimeGranularity { + fn from(input: Granularity) -> Self { + match input { + Granularity::Hour => TimeGranularity::Minute, + Granularity::Day => TimeGranularity::Hour, + Granularity::Week | Granularity::Month => TimeGranularity::Day, + Granularity::Year => TimeGranularity::Month, + } + } +} + +pub struct DataPoints(Vec); +impl DataPoints { + #[must_use] + pub fn into_vec(self) -> Vec { + self.0 + } +} +impl DataPoints { + /// Helper function to get a field and parse it as u64. + fn parse_count(value: &Value, resource: &str) -> Option { + value + .get(&format!("{resource}.count")) + .and_then(Value::as_str) + .and_then(|s| s.parse().ok()) + } + + /// Helper function to get a field and parse it as Uuid. + fn parse_uuid(value: &Value, field: &str) -> Option { + value + .get(field) + .and_then(Value::as_str) + .and_then(|s| Uuid::parse_str(s).ok()) + } + + /// Helper function to get a field and parse it as `NaiveDateTime`. + fn parse_timestamp(value: &Value, field: &str) -> Option { + value + .get(field) + .and_then(Value::as_str) + .and_then(|s| NaiveDateTime::parse_from_str(s, "%Y-%m-%dT%H:%M:%S%.f").ok()) + } + + /// # Returns + /// a vector of datapoints parsed from the response coming from Cube API + /// + /// # Errors + /// This function returns an error if there was a problem with retrieving the data points. + pub fn from_response(response: &str, resource: Resource) -> Result { + let response: V1LoadResponse = + serde_json::from_str(response).map_err(|e| Error::new(e.to_string()))?; + + hub_core::tracing::info!("Res: {:#?}", response); + let data = response + .results + .first() + .ok_or_else(|| Error::new("No results found"))? + .data + .iter() + .map(|v| { + let mut data_point = DataPoint::new(); + let data = Self::parse_data(v, &resource.to_string()); + data_point.set( + resource, + &data, + Self::parse_timestamp(v, &format!("{resource}.timestamp")), + ); + data_point + }) + .collect(); + + Ok(DataPoints(data)) + } + + fn parse_data(value: &Value, resource: &str) -> Data { + Data { + count: Self::parse_count(value, resource), + organization_id: Self::parse_uuid(value, "projects.organization_id"), + project_id: Self::parse_uuid(value, &format!("{resource}.project_id")), + collection_id: Self::parse_uuid(value, "mints.collection_id"), + timestamp: Self::parse_timestamp(value, &format!("{resource}.timestamp")), + } + } +} diff --git a/app/src/graphql/objects/mod.rs b/app/src/graphql/objects/mod.rs new file mode 100644 index 0000000..1c2c6f0 --- /dev/null +++ b/app/src/graphql/objects/mod.rs @@ -0,0 +1,15 @@ +mod collection; +mod datapoint; +mod organization; +mod project; + +pub use collection::Collection; +pub use cube_client::models::{ + V1LoadRequestQueryFilterItem, V1LoadRequestQueryTimeDimension, V1LoadResponse, +}; +pub use datapoint::{ + DataPoint, DataPoints, DateRange, Dimension, Granularity, Interval, Measure, Operation, Order, + Resource, TimeGranularity, +}; +pub use organization::Organization; +pub use project::Project; diff --git a/app/src/graphql/objects/organization.rs b/app/src/graphql/objects/organization.rs new file mode 100644 index 0000000..cb4c51f --- /dev/null +++ b/app/src/graphql/objects/organization.rs @@ -0,0 +1,37 @@ +use async_graphql::{ComplexObject, Context, Result, SimpleObject}; +use hub_core::uuid::Uuid; + +use crate::graphql::{ + objects::{DataPoint, Interval, Order}, + queries::analytics::Query, +}; + +#[derive(Debug, Clone, SimpleObject)] +#[graphql(complex)] +pub struct Organization { + #[graphql(external)] + pub id: Uuid, +} + +#[ComplexObject] +impl Organization { + async fn analytics( + &self, + ctx: &Context<'_>, + interval: Option, + order: Option, + limit: Option, + ) -> Result> { + Query::analytics( + &Query, + ctx, + Some(self.id), + None, + None, + interval, + order, + limit, + ) + .await + } +} diff --git a/app/src/graphql/objects/project.rs b/app/src/graphql/objects/project.rs new file mode 100644 index 0000000..b857dab --- /dev/null +++ b/app/src/graphql/objects/project.rs @@ -0,0 +1,37 @@ +use async_graphql::{ComplexObject, Context, Result, SimpleObject}; +use hub_core::uuid::Uuid; + +use crate::graphql::{ + objects::{DataPoint, Interval, Order}, + queries::analytics::Query, +}; + +#[derive(Debug, Clone, SimpleObject)] +#[graphql(complex)] +pub struct Project { + #[graphql(external)] + pub id: Uuid, +} + +#[ComplexObject] +impl Project { + async fn analytics( + &self, + ctx: &Context<'_>, + interval: Option, + order: Option, + limit: Option, + ) -> Result> { + Query::analytics( + &Query, + ctx, + None, + Some(self.id), + None, + interval, + order, + limit, + ) + .await + } +} diff --git a/app/src/graphql/queries/analytics.rs b/app/src/graphql/queries/analytics.rs new file mode 100644 index 0000000..2ef3949 --- /dev/null +++ b/app/src/graphql/queries/analytics.rs @@ -0,0 +1,186 @@ +use std::collections::BTreeMap; + +use async_graphql::{Context, Object, Result}; +use either::Either; +use hub_core::{ + chrono::{NaiveDate, NaiveDateTime}, + uuid::Uuid, +}; + +use crate::{ + cube_client::{Client, Query as CubeQuery}, + graphql::objects::{ + DataPoint, DataPoints, Interval, Measure, Operation, Order, Resource, TimeGranularity, + V1LoadRequestQueryFilterItem as Filter, V1LoadRequestQueryTimeDimension as TimeDimension, + }, +}; + +#[derive(Debug, Clone, Default)] +pub struct Query; + +#[Object(name = "AnalyticsQuery")] +impl Query { + /// Returns a list of data points for a specific collection and timeframe. + /// + /// # Arguments + /// * `organizationId` - The ID of the organization + /// * `projectId` - The ID of the project. + /// * `collectionId` - The ID of the collection. + /// * `measures` - An map array of resources to query (resource, operation). + /// * `interval` - The timeframe interval. `TODAY` | `YESTERDAY` | `THIS_MONTH` | `LAST_MONTH` + /// * `order` - order the results by ASC or DESC. + /// * `limit` - Optional limit on the number of data points to retrieve. + /// + /// # Returns + /// A vector of Analytics objects representing the analytics data. + /// + /// # Errors + /// This function returns an error if there was a problem with retrieving the data points. + #[allow(clippy::too_many_arguments)] + pub async fn analytics( + &self, + ctx: &Context<'_>, + organization_id: Option, + project_id: Option, + collection_id: Option, + interval: Option, + order: Option, + limit: Option, + ) -> Result> { + let cube = ctx.data::()?; + let mut datapoints = Vec::new(); + + let selections = Selection::from_context(ctx); + + let (id, root) = parse_id_and_root(organization_id, project_id, collection_id)?; + + let order = order.unwrap_or(Order::Desc); + let mut use_ts = false; + for selection in &selections { + let resource = selection.resource.to_string(); + let ts_dimension = format!("{resource}.timestamp"); + let mut td = TimeDimension::new(ts_dimension.clone()); + td.date_range(Either::Left(interval.unwrap_or_default().to_string())); + + if selection.has_ts { + use_ts = true; + td.granularity = Some(interval.unwrap_or_default().to_granularity()) + .map(|g| TimeGranularity::from(g).to_string()); + } + + let filter = Filter::new() + .member(&format!("{resource}.{root}")) + .operator("equals") + .values(vec![id.clone()]); + + let query = CubeQuery::new() + .limit(limit.unwrap_or(100)) + .order(&ts_dimension, &order.to_string()) + .measures(selection.measures.iter().map(Measure::as_string).collect()) + .dimensions(selection.dimensions.clone()) + .time_dimensions(Some(td.clone())) + .filter_member(filter); + + hub_core::tracing::info!("Query: {query:#?}"); + + datapoints.extend( + DataPoints::from_response(&cube.query(query).await?, selection.resource)? + .into_vec(), + ); + } + + let dummy_ts: NaiveDateTime = NaiveDate::from_ymd_opt(1900, 1, 1) + .unwrap() + .and_hms_opt(0, 0, 0) + .unwrap(); + + let response = if use_ts { + let mut merged: BTreeMap = BTreeMap::new(); + + for dp in &datapoints { + let timestamp = dp.timestamp.unwrap_or(dummy_ts); + merged + .entry(timestamp) + .and_modify(|existing_dp| existing_dp.merge(dp)) + .or_insert_with(|| dp.clone()); + } + + let mut datapoints: Vec = merged.into_values().collect(); + + for dp in &mut datapoints { + if dp.timestamp == Some(dummy_ts) { + dp.timestamp = None; + } + } + + if matches!(order, Order::Desc) { + datapoints.reverse(); + } + + datapoints + } else { + let mut merged = DataPoint::new(); + datapoints.iter().for_each(|dp| merged.merge(dp)); + vec![merged] + }; + Ok(response) + } +} + +pub struct Selection { + pub resource: Resource, + pub measures: Vec, + pub dimensions: Vec, + pub has_ts: bool, +} + +impl Selection { + #[must_use] + pub fn from_context(ctx: &Context<'_>) -> Vec { + let mut selections: Vec = Vec::new(); + + for field in ctx.field().selection_set() { + if let Ok(resource) = field.name().parse::() { + let mut dimensions = Vec::new(); + let mut measures = Vec::new(); + let mut has_ts = false; + for nested_field in field.selection_set() { + match nested_field.name() { + "count" => measures.push(Measure::new(resource, Operation::Count)), + "organizationId" => dimensions.push("projects.organization_id".to_string()), + "projectId" => dimensions.push(format!("{resource}.project_id")), + "collectionId" => dimensions.push(format!("{resource}.collection_id")), + "timestamp" => has_ts = true, + _ => {}, + } + } + + let selection = Selection { + resource, + measures, + dimensions, + has_ts, + }; + + selections.push(selection); + } + } + + selections + } +} + +fn parse_id_and_root( + organization_id: Option, + project_id: Option, + collection_id: Option, +) -> Result<(String, &'static str), async_graphql::Error> { + match (organization_id, project_id, collection_id) { + (Some(organization_id), None, None) => Ok((organization_id.to_string(), "organization_id")), + (None, Some(project_id), None) => Ok((project_id.to_string(), "project_id")), + (None, None, Some(collection_id)) => Ok((collection_id.to_string(), "collection_id")), + _ => Err(async_graphql::Error::new( + "No valid [project,organization,collection] ID or multiple IDs provided", + )), + } +} diff --git a/app/src/graphql/queries/collection.rs b/app/src/graphql/queries/collection.rs new file mode 100644 index 0000000..6a30521 --- /dev/null +++ b/app/src/graphql/queries/collection.rs @@ -0,0 +1,19 @@ +use async_graphql::{Context, Object, Result}; +use hub_core::uuid::Uuid; + +use crate::graphql::objects::Collection; + +#[derive(Default)] +pub struct Query; + +#[Object(name = "CollectionQuery")] +impl Query { + #[graphql(entity)] + async fn find_collection_by_id( + &self, + _ctx: &Context<'_>, + #[graphql(key)] id: Uuid, + ) -> Result { + Ok(Collection { id }) + } +} diff --git a/app/src/graphql/queries/mod.rs b/app/src/graphql/queries/mod.rs new file mode 100644 index 0000000..2ce1de1 --- /dev/null +++ b/app/src/graphql/queries/mod.rs @@ -0,0 +1,15 @@ +#![allow(clippy::unused_async)] + +pub mod analytics; +mod collection; +mod organization; +mod project; + +// // Add your other ones here to create a unified Query object +#[derive(async_graphql::MergedObject, Default)] +pub struct Query( + analytics::Query, + organization::Query, + project::Query, + collection::Query, +); diff --git a/app/src/graphql/queries/organization.rs b/app/src/graphql/queries/organization.rs new file mode 100644 index 0000000..d92361e --- /dev/null +++ b/app/src/graphql/queries/organization.rs @@ -0,0 +1,19 @@ +use async_graphql::{Context, Object, Result}; +use hub_core::uuid::Uuid; + +use crate::graphql::objects::Organization; + +#[derive(Default)] +pub struct Query; + +#[Object(name = "OrganizationQuery")] +impl Query { + #[graphql(entity)] + async fn find_organization_by_id( + &self, + _ctx: &Context<'_>, + #[graphql(key)] id: Uuid, + ) -> Result { + Ok(Organization { id }) + } +} diff --git a/app/src/graphql/queries/project.rs b/app/src/graphql/queries/project.rs new file mode 100644 index 0000000..759c29f --- /dev/null +++ b/app/src/graphql/queries/project.rs @@ -0,0 +1,19 @@ +use async_graphql::{Context, Object, Result}; +use hub_core::uuid::Uuid; + +use crate::graphql::objects::Project; + +#[derive(Default)] +pub struct Query; + +#[Object(name = "ProjectQuery")] +impl Query { + #[graphql(entity)] + async fn find_project_by_id( + &self, + _ctx: &Context<'_>, + #[graphql(key)] id: Uuid, + ) -> Result { + Ok(Project { id }) + } +} diff --git a/app/src/graphql/schema.rs b/app/src/graphql/schema.rs new file mode 100644 index 0000000..96ef348 --- /dev/null +++ b/app/src/graphql/schema.rs @@ -0,0 +1,18 @@ +use async_graphql::{ + extensions::{ApolloTracing, Logger}, + EmptyMutation, EmptySubscription, Schema, +}; + +use crate::graphql::queries::Query; + +pub type AppSchema = Schema; + +/// Builds the GraphQL Schema, attaching the Database to the context +#[must_use] +pub fn build_schema() -> AppSchema { + Schema::build(Query::default(), EmptyMutation, EmptySubscription) + .extension(ApolloTracing) + .extension(Logger) + .enable_federation() + .finish() +} diff --git a/app/src/handlers.rs b/app/src/handlers.rs new file mode 100644 index 0000000..8c38b95 --- /dev/null +++ b/app/src/handlers.rs @@ -0,0 +1,35 @@ +use async_graphql::http::{playground_source, GraphQLPlaygroundConfig}; +use async_graphql_poem::{GraphQLRequest, GraphQLResponse}; +use poem::{ + handler, + web::{Data, Html}, + IntoResponse, Result, +}; + +use crate::{AppContext, AppState, UserID}; + +#[handler] +pub fn health() {} + +#[handler] +pub fn playground() -> impl IntoResponse { + Html(playground_source(GraphQLPlaygroundConfig::new("/graphql"))) +} + +#[handler] +pub async fn graphql_handler( + Data(state): Data<&AppState>, + user_id: UserID, + req: GraphQLRequest, +) -> Result { + let cube_client = &state.cube; + let UserID(user_id) = user_id; + + let context = AppContext::new(user_id); + + Ok(state + .schema + .execute(req.0.data(context).data(cube_client.clone())) + .await + .into()) +} diff --git a/app/src/lib.rs b/app/src/lib.rs index 7e5c7ae..ece4421 100644 --- a/app/src/lib.rs +++ b/app/src/lib.rs @@ -2,11 +2,16 @@ #![warn(clippy::pedantic, clippy::cargo)] #![allow(clippy::module_name_repetitions)] +pub mod cube_client; pub mod db; +#[allow(clippy::pedantic)] pub mod entities; pub mod events; -use hub_core::{clap, consumer::RecvError, prelude::*}; - +pub mod graphql; +pub mod handlers; +use db::Connection; +use hub_core::{clap, consumer::RecvError, prelude::*, uuid::Uuid}; +use poem::{async_trait, FromRequest, Request, RequestBody}; #[allow(clippy::pedantic)] pub mod proto { include!(concat!(env!("OUT_DIR"), "/organization.proto.rs")); @@ -24,6 +29,7 @@ pub enum Services { Organizations(proto::OrganizationEventKey, proto::OrganizationEvents), Customers(proto::CustomerEventKey, proto::CustomerEvents), Treasuries(proto::TreasuryEventKey, proto::TreasuryEvents), + Webhooks(proto::WebhookEventKey, proto::WebhookEvents), Nfts(proto::NftEventKey, proto::NftEvents), SolanaNfts(proto::SolanaNftEventKey, proto::SolanaNftEvents), } @@ -33,6 +39,8 @@ impl hub_core::consumer::MessageGroup for Services { "hub-orgs", "hub-customers", "hub-treasuries", + "hub-webhooks", + "hub-credits", "hub-nfts", "hub-nfts-solana", "hub-nfts-polygon", @@ -63,6 +71,8 @@ impl hub_core::consumer::MessageGroup for Services { Ok(Services::Treasuries(key, val)) }, + "hub-webhooks" => todo!(), + "hub-credits" => todo!(), "hub-nfts" => { let key = proto::NftEventKey::decode(key)?; let val = proto::NftEvents::decode(val)?; @@ -83,6 +93,71 @@ impl hub_core::consumer::MessageGroup for Services { #[derive(Debug, clap::Args)] #[command(version, author, about)] pub struct Args { + #[arg(short, long, env, default_value_t = 3008)] + pub port: u16, + #[command(flatten)] pub db: db::DbArgs, + + #[command(flatten)] + pub cube: cube_client::CubeArgs, +} + +#[derive(Debug, Clone, Copy)] +pub struct UserID(Option); + +impl TryFrom<&str> for UserID { + type Error = Error; + + fn try_from(value: &str) -> Result { + let id = Uuid::from_str(value)?; + + Ok(Self(Some(id))) + } +} + +#[async_trait] +impl<'a> FromRequest<'a> for UserID { + async fn from_request(req: &'a Request, _body: &mut RequestBody) -> poem::Result { + let id = req + .headers() + .get("X-USER-ID") + .and_then(|value| value.to_str().ok()) + .map_or(Ok(Self(None)), Self::try_from)?; + + Ok(id) + } +} + +#[derive(Clone)] +pub struct AppState { + pub schema: graphql::schema::AppSchema, + pub connection: Connection, + pub cube: cube_client::Client, +} + +impl AppState { + #[must_use] + pub fn new( + schema: graphql::schema::AppSchema, + connection: Connection, + cube: cube_client::Client, + ) -> Self { + Self { + schema, + connection, + cube, + } + } +} + +pub struct AppContext { + pub user_id: Option, +} + +impl AppContext { + #[must_use] + pub fn new(user_id: Option) -> Self { + Self { user_id } + } } diff --git a/app/src/main.rs b/app/src/main.rs index 104b8bc..6cd0d2f 100644 --- a/app/src/main.rs +++ b/app/src/main.rs @@ -1,10 +1,16 @@ -//! - -use holaplex_hub_analytics::{db::Connection, events, Args, Services}; +use holaplex_hub_analytics::{ + cube_client::Client, + db::Connection, + events, + graphql::schema::build_schema, + handlers::{graphql_handler, health, playground}, + AppState, Args, Services, +}; use hub_core::{ prelude::*, tokio::{self, task}, }; +use poem::{get, listener::TcpListener, middleware::AddData, post, EndpointExt, Route, Server}; pub fn main() { let opts = hub_core::StartConfig { @@ -12,29 +18,50 @@ pub fn main() { }; hub_core::run(opts, |common, args| { - let Args { db } = args; + let Args { port, db, cube } = args; common.rt.block_on(async move { - let cons = common.consumer_cfg.build::().await?; let connection = Connection::new(db) .await .context("failed to get database connection")?; - let mut stream = cons.stream(); - loop { - let connection = connection.clone(); - match stream.next().await { - Some(Ok(msg)) => { - info!(?msg, "message received"); - tokio::spawn(async move { events::process(msg, connection.clone()).await }); - task::yield_now().await; - }, - None => (), - Some(Err(e)) => { - warn!("failed to get message {:?}", e); - }, + let schema = build_schema(); + let cube_client = Client::from_args(&cube)?; + let state = AppState::new(schema, connection.clone(), cube_client.clone()); + let cons = common.consumer_cfg.build::().await?; + + tokio::spawn(async move { + { + let mut stream = cons.stream(); + loop { + let connection = connection.clone(); + match stream.next().await { + Some(Ok(msg)) => { + info!(?msg, "message received"); + + tokio::spawn(async move { + events::process(msg, connection.clone()).await + }); + task::yield_now().await; + }, + None => (), + Some(Err(e)) => { + warn!("failed to get message {:?}", e); + }, + } + } } - } + }); + + Server::new(TcpListener::bind(format!("0.0.0.0:{port}"))) + .run( + Route::new() + .at("/graphql", post(graphql_handler).with(AddData::new(state))) + .at("/playground", get(playground)) + .at("/health", get(health)), + ) + .await + .context("failed to build graphql server") }) }); } diff --git a/migration/src/lib.rs b/migration/src/lib.rs index 36f68eb..294f915 100644 --- a/migration/src/lib.rs +++ b/migration/src/lib.rs @@ -6,6 +6,9 @@ mod m20230804_212603_create_customers_table; mod m20230804_213809_create_collections_table; mod m20230804_214701_create_mints_table; mod m20230805_140311_create_wallets_table; +mod m20230818_030012_create_webhooks_table; +mod m20230818_031112_create_credits_table; +mod m20231804_024905_create_transfers_table; pub struct Migrator; @@ -19,6 +22,9 @@ impl MigratorTrait for Migrator { Box::new(m20230804_213809_create_collections_table::Migration), Box::new(m20230804_214701_create_mints_table::Migration), Box::new(m20230805_140311_create_wallets_table::Migration), + Box::new(m20231804_024905_create_transfers_table::Migration), + Box::new(m20230818_030012_create_webhooks_table::Migration), + Box::new(m20230818_031112_create_credits_table::Migration), ] } } diff --git a/migration/src/m20230804_212530_create_projects_table.rs b/migration/src/m20230804_212530_create_projects_table.rs index 00de9f6..0eb85df 100644 --- a/migration/src/m20230804_212530_create_projects_table.rs +++ b/migration/src/m20230804_212530_create_projects_table.rs @@ -24,6 +24,7 @@ impl MigrationTrait for Migration { .on_delete(ForeignKeyAction::Cascade) .on_update(ForeignKeyAction::Cascade), ) + .col(ColumnDef::new(Projects::Timestamp).timestamp().not_null()) .to_owned(), ) .await?; @@ -53,4 +54,5 @@ pub enum Projects { Id, Name, OrganizationId, + Timestamp, } diff --git a/migration/src/m20230804_212603_create_customers_table.rs b/migration/src/m20230804_212603_create_customers_table.rs index bb31764..3edc4f3 100644 --- a/migration/src/m20230804_212603_create_customers_table.rs +++ b/migration/src/m20230804_212603_create_customers_table.rs @@ -3,6 +3,8 @@ use sea_orm_migration::prelude::*; #[derive(DeriveMigrationName)] pub struct Migration; +use crate::m20230804_212530_create_projects_table::Projects; + #[async_trait::async_trait] impl MigrationTrait for Migration { async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { @@ -18,6 +20,14 @@ impl MigrationTrait for Migration { .primary_key(), ) .col(ColumnDef::new(Customers::ProjectId).uuid().not_null()) + .foreign_key( + ForeignKey::create() + .name("fk-customers_project_id-projects") + .from(Customers::Table, Customers::ProjectId) + .to(Projects::Table, Projects::Id) + .on_delete(ForeignKeyAction::Cascade) + .on_update(ForeignKeyAction::Cascade), + ) .col(ColumnDef::new(Customers::Timestamp).timestamp().not_null()) .to_owned(), ) diff --git a/migration/src/m20230804_213809_create_collections_table.rs b/migration/src/m20230804_213809_create_collections_table.rs index ab7db3c..081270f 100644 --- a/migration/src/m20230804_213809_create_collections_table.rs +++ b/migration/src/m20230804_213809_create_collections_table.rs @@ -19,7 +19,6 @@ impl MigrationTrait for Migration { .not_null() .primary_key(), ) - .col(ColumnDef::new(Collections::Name).string().not_null()) .col(ColumnDef::new(Collections::ProjectId).uuid().not_null()) .foreign_key( ForeignKey::create() @@ -62,7 +61,6 @@ impl MigrationTrait for Migration { pub enum Collections { Table, Id, - Name, Blockchain, ProjectId, Timestamp, diff --git a/migration/src/m20230804_214701_create_mints_table.rs b/migration/src/m20230804_214701_create_mints_table.rs index 8e6f5b7..8d39652 100644 --- a/migration/src/m20230804_214701_create_mints_table.rs +++ b/migration/src/m20230804_214701_create_mints_table.rs @@ -17,7 +17,6 @@ impl MigrationTrait for Migration { .table(Mints::Table) .if_not_exists() .col(ColumnDef::new(Mints::Id).uuid().not_null().primary_key()) - .col(ColumnDef::new(Mints::Owner).string().not_null()) .col(ColumnDef::new(Mints::CollectionId).uuid().not_null()) .foreign_key( ForeignKey::create() @@ -44,12 +43,12 @@ impl MigrationTrait for Migration { manager .create_index( IndexCreateStatement::new() - .name("mints_collection_id_idx") - .table(Mints::Table) - .col(Mints::CollectionId) - .index_type(IndexType::Hash) - .if_not_exists() // Adding this line to conditionally create the index - .to_owned(), + .name("mints_collection_id_idx") + .table(Mints::Table) + .col(Mints::CollectionId) + .index_type(IndexType::Hash) + .if_not_exists() + .to_owned(), ) .await } @@ -66,7 +65,6 @@ enum Mints { Table, Id, CollectionId, - Owner, ProjectId, Timestamp, } diff --git a/migration/src/m20230805_140311_create_wallets_table.rs b/migration/src/m20230805_140311_create_wallets_table.rs index 42a65a5..6b4ea9d 100644 --- a/migration/src/m20230805_140311_create_wallets_table.rs +++ b/migration/src/m20230805_140311_create_wallets_table.rs @@ -1,6 +1,9 @@ use sea_orm_migration::prelude::*; -use crate::m20230804_212530_create_projects_table::Projects; +use crate::{ + m20230804_212530_create_projects_table::Projects, + m20230804_212603_create_customers_table::Customers, +}; #[derive(DeriveMigrationName)] pub struct Migration; @@ -23,6 +26,15 @@ impl MigrationTrait for Migration { .on_delete(ForeignKeyAction::Cascade) .on_update(ForeignKeyAction::Cascade), ) + .col(ColumnDef::new(Wallets::CustomerId).uuid().not_null()) + .foreign_key( + ForeignKey::create() + .name("fk-wallets_customer_id-customers") + .from(Wallets::Table, Wallets::CustomerId) + .to(Customers::Table, Customers::Id) + .on_delete(ForeignKeyAction::Cascade) + .on_update(ForeignKeyAction::Cascade), + ) .col(ColumnDef::new(Wallets::Blockchain).string().not_null()) .col(ColumnDef::new(Wallets::Timestamp).timestamp().not_null()) .to_owned(), @@ -52,6 +64,7 @@ impl MigrationTrait for Migration { pub enum Wallets { Table, Id, + CustomerId, ProjectId, Blockchain, Timestamp, diff --git a/migration/src/m20230818_030012_create_webhooks_table.rs b/migration/src/m20230818_030012_create_webhooks_table.rs new file mode 100644 index 0000000..46eb662 --- /dev/null +++ b/migration/src/m20230818_030012_create_webhooks_table.rs @@ -0,0 +1,69 @@ +use sea_orm_migration::prelude::*; + +use crate::{ + m20230804_212412_create_organizations_table::Organizations, + m20230804_212530_create_projects_table::Projects, +}; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(Webhooks::Table) + .if_not_exists() + .col(ColumnDef::new(Webhooks::Id).uuid().not_null().primary_key()) + .col(ColumnDef::new(Webhooks::OrganizationId).uuid().not_null()) + .foreign_key( + ForeignKey::create() + .name("fk-webhooks_organization_id-organizations") + .from(Webhooks::Table, Webhooks::ProjectId) + .to(Organizations::Table, Organizations::Id) + .on_delete(ForeignKeyAction::Cascade) + .on_update(ForeignKeyAction::Cascade), + ) + .col(ColumnDef::new(Webhooks::ProjectId).uuid().not_null()) + .foreign_key( + ForeignKey::create() + .name("fk-webhooks_project_id-projects") + .from(Webhooks::Table, Webhooks::ProjectId) + .to(Projects::Table, Projects::Id) + .on_delete(ForeignKeyAction::Cascade) + .on_update(ForeignKeyAction::Cascade), + ) + .col(ColumnDef::new(Webhooks::Timestamp).timestamp().not_null()) + .to_owned(), + ) + .await?; + + manager + .create_index( + IndexCreateStatement::new() + .name("webhooks_project_id_idx") + .table(Webhooks::Table) + .col(Webhooks::OrganizationId) + .index_type(IndexType::Hash) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(Webhooks::Table).to_owned()) + .await + } +} + +#[derive(Iden)] +pub enum Webhooks { + Table, + Id, + ProjectId, + OrganizationId, + Timestamp, +} diff --git a/migration/src/m20230818_031112_create_credits_table.rs b/migration/src/m20230818_031112_create_credits_table.rs new file mode 100644 index 0000000..d797957 --- /dev/null +++ b/migration/src/m20230818_031112_create_credits_table.rs @@ -0,0 +1,58 @@ +use sea_orm_migration::prelude::*; + +use crate::m20230804_212412_create_organizations_table::Organizations; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(Credits::Table) + .if_not_exists() + .col(ColumnDef::new(Credits::Id).uuid().not_null().primary_key()) + .col(ColumnDef::new(Credits::OrganizationId).uuid().not_null()) + .col(ColumnDef::new(Credits::Amount).big_integer().not_null()) + .foreign_key( + ForeignKey::create() + .name("fk-credits_organization_id-organizations") + .from(Credits::Table, Credits::OrganizationId) + .to(Organizations::Table, Organizations::Id) + .on_delete(ForeignKeyAction::Cascade) + .on_update(ForeignKeyAction::Cascade), + ) + .col(ColumnDef::new(Credits::Timestamp).timestamp().not_null()) + .to_owned(), + ) + .await?; + + manager + .create_index( + IndexCreateStatement::new() + .name("credits_organization_id_idx") + .table(Credits::Table) + .col(Credits::OrganizationId) + .index_type(IndexType::Hash) + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(Credits::Table).to_owned()) + .await + } +} + +#[derive(Iden)] +pub enum Credits { + Table, + Id, + Amount, + OrganizationId, + Timestamp, +} diff --git a/migration/src/m20231804_024905_create_transfers_table.rs b/migration/src/m20231804_024905_create_transfers_table.rs new file mode 100644 index 0000000..94f8b55 --- /dev/null +++ b/migration/src/m20231804_024905_create_transfers_table.rs @@ -0,0 +1,62 @@ +use sea_orm_migration::prelude::*; + +use crate::m20230804_212530_create_projects_table::Projects; + +#[derive(DeriveMigrationName)] +pub struct Migration; + +#[async_trait::async_trait] +impl MigrationTrait for Migration { + async fn up(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .create_table( + Table::create() + .table(Transfers::Table) + .if_not_exists() + .col( + ColumnDef::new(Transfers::Id) + .uuid() + .not_null() + .primary_key(), + ) + .col(ColumnDef::new(Transfers::ProjectId).uuid().not_null()) + .foreign_key( + ForeignKey::create() + .name("fk-transfers_project_id-projects") + .from(Transfers::Table, Transfers::ProjectId) + .to(Projects::Table, Projects::Id) + .on_delete(ForeignKeyAction::Cascade) + .on_update(ForeignKeyAction::Cascade), + ) + .col(ColumnDef::new(Transfers::Timestamp).timestamp().not_null()) + .to_owned(), + ) + .await?; + + manager + .create_index( + IndexCreateStatement::new() + .name("transfers_project_id_idx") + .table(Transfers::Table) + .col(Transfers::ProjectId) + .index_type(IndexType::Hash) + .if_not_exists() + .to_owned(), + ) + .await + } + + async fn down(&self, manager: &SchemaManager) -> Result<(), DbErr> { + manager + .drop_table(Table::drop().table(Transfers::Table).to_owned()) + .await + } +} + +#[derive(Iden)] +enum Transfers { + Table, + Id, + ProjectId, + Timestamp, +} diff --git a/rust-toolchain.toml b/rust-toolchain.toml index b4c2b62..db8508b 100644 --- a/rust-toolchain.toml +++ b/rust-toolchain.toml @@ -1,5 +1,5 @@ [toolchain] -channel = "1.70.0" +channel = "1.71.0" components = [ "cargo", "clippy",