From 9035b4a6dcabd508e4216442027c38bc55d0700d Mon Sep 17 00:00:00 2001 From: Matthieu Vachon Date: Tue, 31 Oct 2023 11:04:34 -0400 Subject: [PATCH 1/3] Enable primary key to be either Single or Composite via a new `enum` --- src/tables.rs | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/src/tables.rs b/src/tables.rs index bf81d06..46eac17 100644 --- a/src/tables.rs +++ b/src/tables.rs @@ -1,5 +1,5 @@ use crate::pb::database::{table_change::Operation, DatabaseChanges, Field, TableChange}; -use std::collections::HashMap; +use std::collections::{BTreeMap, HashMap}; use substreams::{ scalar::{BigDecimal, BigInt}, Hex, @@ -22,7 +22,7 @@ impl Tables { let rows = self.tables.entry(table.to_string()).or_insert(Rows::new()); let row = rows .pks - .entry(key.as_ref().to_string()) + .entry(PrimaryKey::Single(key.as_ref().to_string())) .or_insert(Row::new()); match row.operation { Operation::Unspecified => { @@ -47,7 +47,7 @@ impl Tables { let rows = self.tables.entry(table.to_string()).or_insert(Rows::new()); let row = rows .pks - .entry(key.as_ref().to_string()) + .entry(PrimaryKey::Single(key.as_ref().to_string())) .or_insert(Row::new()); match row.operation { Operation::Unspecified => { @@ -70,7 +70,7 @@ impl Tables { let rows = self.tables.entry(table.to_string()).or_insert(Rows::new()); let row = rows .pks - .entry(key.as_ref().to_string()) + .entry(PrimaryKey::Single(key.as_ref().to_string())) .or_insert(Row::new()); match row.operation { Operation::Unspecified => { @@ -101,7 +101,16 @@ impl Tables { continue; } - let mut change = TableChange::new(table.clone(), pk, 0, row.operation); + let mut change = match pk { + PrimaryKey::Single(pk) => TableChange::new(table.clone(), pk, 0, row.operation), + PrimaryKey::Composite(keys) => TableChange::new_composite( + table.clone(), + keys.into_iter().collect(), + 0, + row.operation, + ), + }; + for (field, value) in row.columns.into_iter() { change.fields.push(Field { name: field, @@ -118,10 +127,16 @@ impl Tables { } } +#[derive(Hash, Debug, Eq, PartialEq)] +pub enum PrimaryKey { + Single(String), + Composite(BTreeMap), +} + #[derive(Debug)] pub struct Rows { // Map of primary keys within this table, to the fields within - pub pks: HashMap, + pub pks: HashMap, } impl Rows { From 7902be42394c9a4d2b1d0bfe8319b5ae60f2a204 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Tue, 31 Oct 2023 14:16:32 -0400 Subject: [PATCH 2/3] complete support for composite primary keys on 'Rows' methods --- CHANGELOG.md | 13 ++++++ src/tables.rs | 109 +++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 103 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 487fd93..d70a0d3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,6 +4,19 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## Unreleased + +Better support for using composite primary keys: + +* New enum in this crate: `tables::PrimaryKey` + * Single(string): `let sng_pk: PrimaryKey = "hello world".into()` + * Composite(BTreeMap: `let cmp_pk: PrimaryKey = [("evt_tx_hash","hello".to_string()),("evt_index","world".to_string())].into()` + +Breaking changes: + +* The `Rows` struct now requires pks to be of that new `PrimaryKey` type. +* create_row(), update_row() and delete_row() now require a `PrimaryKey` instead of a String. + ## [1.2.1] * Changed imports in `substreams.yaml` definition so that packaged `.spkg` can you the expect path `sf/substreams/sink/database/v1` to exclude and generating from the `.spkg` will generate data on the right path. diff --git a/src/tables.rs b/src/tables.rs index 46eac17..9e84df4 100644 --- a/src/tables.rs +++ b/src/tables.rs @@ -18,12 +18,10 @@ impl Tables { } } - pub fn create_row>(&mut self, table: &str, key: K) -> &mut Row { + pub fn create_row(&mut self, table: &str, key: PrimaryKey) -> &mut Row { let rows = self.tables.entry(table.to_string()).or_insert(Rows::new()); - let row = rows - .pks - .entry(PrimaryKey::Single(key.as_ref().to_string())) - .or_insert(Row::new()); + let key_debug = format!("{:?}", key); + let row = rows.pks.entry(key).or_insert(Row::new()); match row.operation { Operation::Unspecified => { row.operation = Operation::Create; @@ -35,20 +33,17 @@ impl Tables { Operation::Delete => { panic!( "cannot create a row after a scheduled delete operation - table: {} key: {}", - table, - key.as_ref().to_string() + table, key_debug, ) } } row } - pub fn update_row>(&mut self, table: &str, key: K) -> &mut Row { + pub fn update_row(&mut self, table: &str, key: PrimaryKey) -> &mut Row { let rows = self.tables.entry(table.to_string()).or_insert(Rows::new()); - let row = rows - .pks - .entry(PrimaryKey::Single(key.as_ref().to_string())) - .or_insert(Row::new()); + let key_debug = format!("{:?}", key); + let row = rows.pks.entry(key).or_insert(Row::new()); match row.operation { Operation::Unspecified => { row.operation = Operation::Update; @@ -58,20 +53,16 @@ impl Tables { Operation::Delete => { panic!( "cannot update a row after a scheduled delete operation - table: {} key: {}", - table, - key.as_ref().to_string() + table, key_debug, ) } } row } - pub fn delete_row>(&mut self, table: &str, key: K) -> &mut Row { + pub fn delete_row(&mut self, table: &str, key: PrimaryKey) -> &mut Row { let rows = self.tables.entry(table.to_string()).or_insert(Rows::new()); - let row = rows - .pks - .entry(PrimaryKey::Single(key.as_ref().to_string())) - .or_insert(Row::new()); + let row = rows.pks.entry(key).or_insert(Row::new()); match row.operation { Operation::Unspecified => { row.operation = Operation::Delete; @@ -133,6 +124,29 @@ pub enum PrimaryKey { Composite(BTreeMap), } +impl From<&str> for PrimaryKey { + fn from(x: &str) -> Self { + Self::Single(x.to_string()) + } +} + +impl From for PrimaryKey { + fn from(x: String) -> Self { + Self::Single(x) + } +} + +impl, const N: usize> From<[(K, String); N]> for PrimaryKey { + fn from(arr: [(K, String); N]) -> Self { + if N == 0 { + return Self::Composite(BTreeMap::new()); + } + + let string_arr = arr.map(|(k, v)| (k.as_ref().to_string(), v)); + Self::Composite(BTreeMap::from(string_arr)) + } +} + #[derive(Debug)] pub struct Rows { // Map of primary keys within this table, to the fields within @@ -255,7 +269,12 @@ impl> ToDatabaseValue for &Hex { #[cfg(test)] mod test { + use crate::pb::database::table_change::PrimaryKey; + use crate::pb::database::CompositePrimaryKey; + use crate::pb::database::{DatabaseChanges, TableChange}; + use crate::tables::Tables; use crate::tables::ToDatabaseValue; + use std::collections::HashMap; #[test] fn to_database_value_proto_timestamp() { @@ -267,4 +286,56 @@ mod test { "1970-01-01T01:01:01.000000001Z" ); } + + #[test] + fn create_row_single_pk() { + let mut tables = Tables::new(); + tables.create_row("myevent", "myhash".into()); + + assert_eq!( + tables.to_database_changes(), + DatabaseChanges { + table_changes: [TableChange { + table: "myevent".to_string(), + ordinal: 0, + operation: 1, + fields: [].into(), + primary_key: Some(PrimaryKey::Pk("myhash".to_string())), + }] + .to_vec(), + } + ); + } + + #[test] + fn create_row_composite_pk() { + let mut tables = Tables::new(); + tables.create_row( + "myevent", + [ + ("evt_tx_hash", "hello".to_string()), + ("evt_index", "world".to_string()), + ] + .into(), + ); + + assert_eq!( + tables.to_database_changes(), + DatabaseChanges { + table_changes: [TableChange { + table: "myevent".to_string(), + ordinal: 0, + operation: 1, + fields: [].into(), + primary_key: Some(PrimaryKey::CompositePk(CompositePrimaryKey { + keys: HashMap::from([ + ("evt_tx_hash".to_string(), "hello".to_string()), + ("evt_index".to_string(), "world".to_string()) + ]) + })) + }] + .to_vec(), + } + ); + } } From 8200a41ca5e559d3271c4233a5252e8007cfd5b3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?St=C3=A9phane=20Duchesneau?= Date: Tue, 31 Oct 2023 15:47:22 -0400 Subject: [PATCH 3/3] prep. 1.3.0, simplify CreateRow without explicit '.into()' --- CHANGELOG.md | 10 +++++----- src/tables.rs | 52 +++++++++++++++++++++++++++++++++++++++------------ 2 files changed, 45 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d70a0d3..14e4d9b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,18 +4,18 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). -## Unreleased +## [1.3.0] Better support for using composite primary keys: * New enum in this crate: `tables::PrimaryKey` - * Single(string): `let sng_pk: PrimaryKey = "hello world".into()` - * Composite(BTreeMap: `let cmp_pk: PrimaryKey = [("evt_tx_hash","hello".to_string()),("evt_index","world".to_string())].into()` + * `Single(String)`: `let single: PrimaryKey = "hello world".into()` + * `Composite(BTreeMap)`: `let composite: PrimaryKey = [("evt_tx_hash","hello".to_string()),("evt_index","world".to_string())].into()` Breaking changes: -* The `Rows` struct now requires pks to be of that new `PrimaryKey` type. -* create_row(), update_row() and delete_row() now require a `PrimaryKey` instead of a String. +* The `Rows.pks` field is not public anymore. +* `create_row()`, `update_row()` and `delete_row()` now require a `PrimaryKey` instead of a `String`. This should work directly with a `String`, `&String` or `&str`. ## [1.2.1] diff --git a/src/tables.rs b/src/tables.rs index 9e84df4..39ec211 100644 --- a/src/tables.rs +++ b/src/tables.rs @@ -18,10 +18,11 @@ impl Tables { } } - pub fn create_row(&mut self, table: &str, key: PrimaryKey) -> &mut Row { + pub fn create_row>(&mut self, table: &str, key: K) -> &mut Row { let rows = self.tables.entry(table.to_string()).or_insert(Rows::new()); - let key_debug = format!("{:?}", key); - let row = rows.pks.entry(key).or_insert(Row::new()); + let k = key.into(); + let key_debug = format!("{:?}", k); + let row = rows.pks.entry(k).or_insert(Row::new()); match row.operation { Operation::Unspecified => { row.operation = Operation::Create; @@ -40,10 +41,11 @@ impl Tables { row } - pub fn update_row(&mut self, table: &str, key: PrimaryKey) -> &mut Row { + pub fn update_row>(&mut self, table: &str, key: K) -> &mut Row { let rows = self.tables.entry(table.to_string()).or_insert(Rows::new()); - let key_debug = format!("{:?}", key); - let row = rows.pks.entry(key).or_insert(Row::new()); + let k = key.into(); + let key_debug = format!("{:?}", k); + let row = rows.pks.entry(k).or_insert(Row::new()); match row.operation { Operation::Unspecified => { row.operation = Operation::Update; @@ -60,9 +62,9 @@ impl Tables { row } - pub fn delete_row(&mut self, table: &str, key: PrimaryKey) -> &mut Row { + pub fn delete_row>(&mut self, table: &str, key: PrimaryKey) -> &mut Row { let rows = self.tables.entry(table.to_string()).or_insert(Rows::new()); - let row = rows.pks.entry(key).or_insert(Row::new()); + let row = rows.pks.entry(key.into()).or_insert(Row::new()); match row.operation { Operation::Unspecified => { row.operation = Operation::Delete; @@ -130,6 +132,12 @@ impl From<&str> for PrimaryKey { } } +impl From<&String> for PrimaryKey { + fn from(x: &String) -> Self { + Self::Single(x.clone()) + } +} + impl From for PrimaryKey { fn from(x: String) -> Self { Self::Single(x) @@ -150,7 +158,7 @@ impl, const N: usize> From<[(K, String); N]> for PrimaryKey { #[derive(Debug)] pub struct Rows { // Map of primary keys within this table, to the fields within - pub pks: HashMap, + pks: HashMap, } impl Rows { @@ -272,6 +280,7 @@ mod test { use crate::pb::database::table_change::PrimaryKey; use crate::pb::database::CompositePrimaryKey; use crate::pb::database::{DatabaseChanges, TableChange}; + use crate::tables::PrimaryKey as TablesPrimaryKey; use crate::tables::Tables; use crate::tables::ToDatabaseValue; use std::collections::HashMap; @@ -287,10 +296,30 @@ mod test { ); } + #[test] + fn create_row_single_pk_direct() { + let mut tables = Tables::new(); + tables.create_row("myevent", TablesPrimaryKey::Single("myhash".to_string())); + + assert_eq!( + tables.to_database_changes(), + DatabaseChanges { + table_changes: [TableChange { + table: "myevent".to_string(), + ordinal: 0, + operation: 1, + fields: [].into(), + primary_key: Some(PrimaryKey::Pk("myhash".to_string())), + }] + .to_vec(), + } + ); + } + #[test] fn create_row_single_pk() { let mut tables = Tables::new(); - tables.create_row("myevent", "myhash".into()); + tables.create_row("myevent", "myhash"); assert_eq!( tables.to_database_changes(), @@ -315,8 +344,7 @@ mod test { [ ("evt_tx_hash", "hello".to_string()), ("evt_index", "world".to_string()), - ] - .into(), + ], ); assert_eq!(