Skip to content

Commit

Permalink
complete support for composite primary keys on 'Rows' methods
Browse files Browse the repository at this point in the history
  • Loading branch information
sduchesneau committed Oct 31, 2023
1 parent 9035b4a commit 7902be4
Show file tree
Hide file tree
Showing 2 changed files with 103 additions and 19 deletions.
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,19 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

Better support for using composite primary keys:

* New enum in this crate: `tables::PrimaryKey`
* Single(string): `let sng_pk: PrimaryKey = "hello world".into()`
* Composite(BTreeMap<String, String>: `let cmp_pk: PrimaryKey = [("evt_tx_hash","hello".to_string()),("evt_index","world".to_string())].into()`

Breaking changes:

* The `Rows` struct now requires pks to be of that new `PrimaryKey` type.
* create_row(), update_row() and delete_row() now require a `PrimaryKey` instead of a String.

## [1.2.1]

* Changed imports in `substreams.yaml` definition so that packaged `.spkg` can you the expect path `sf/substreams/sink/database/v1` to exclude and generating from the `.spkg` will generate data on the right path.
Expand Down
109 changes: 90 additions & 19 deletions src/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@ impl Tables {
}
}

pub fn create_row<K: AsRef<str>>(&mut self, table: &str, key: K) -> &mut Row {
pub fn create_row(&mut self, table: &str, key: PrimaryKey) -> &mut Row {
let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
let row = rows
.pks
.entry(PrimaryKey::Single(key.as_ref().to_string()))
.or_insert(Row::new());
let key_debug = format!("{:?}", key);
let row = rows.pks.entry(key).or_insert(Row::new());
match row.operation {
Operation::Unspecified => {
row.operation = Operation::Create;
Expand All @@ -35,20 +33,17 @@ impl Tables {
Operation::Delete => {
panic!(
"cannot create a row after a scheduled delete operation - table: {} key: {}",
table,
key.as_ref().to_string()
table, key_debug,
)
}
}
row
}

pub fn update_row<K: AsRef<str>>(&mut self, table: &str, key: K) -> &mut Row {
pub fn update_row(&mut self, table: &str, key: PrimaryKey) -> &mut Row {
let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
let row = rows
.pks
.entry(PrimaryKey::Single(key.as_ref().to_string()))
.or_insert(Row::new());
let key_debug = format!("{:?}", key);
let row = rows.pks.entry(key).or_insert(Row::new());
match row.operation {
Operation::Unspecified => {
row.operation = Operation::Update;
Expand All @@ -58,20 +53,16 @@ impl Tables {
Operation::Delete => {
panic!(
"cannot update a row after a scheduled delete operation - table: {} key: {}",
table,
key.as_ref().to_string()
table, key_debug,
)
}
}
row
}

pub fn delete_row<K: AsRef<str>>(&mut self, table: &str, key: K) -> &mut Row {
pub fn delete_row(&mut self, table: &str, key: PrimaryKey) -> &mut Row {
let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
let row = rows
.pks
.entry(PrimaryKey::Single(key.as_ref().to_string()))
.or_insert(Row::new());
let row = rows.pks.entry(key).or_insert(Row::new());
match row.operation {
Operation::Unspecified => {
row.operation = Operation::Delete;
Expand Down Expand Up @@ -133,6 +124,29 @@ pub enum PrimaryKey {
Composite(BTreeMap<String, String>),
}

impl From<&str> for PrimaryKey {
fn from(x: &str) -> Self {
Self::Single(x.to_string())
}
}

impl From<String> for PrimaryKey {
fn from(x: String) -> Self {
Self::Single(x)
}
}

impl<K: AsRef<str>, const N: usize> From<[(K, String); N]> for PrimaryKey {
fn from(arr: [(K, String); N]) -> Self {
if N == 0 {
return Self::Composite(BTreeMap::new());
}

let string_arr = arr.map(|(k, v)| (k.as_ref().to_string(), v));
Self::Composite(BTreeMap::from(string_arr))
}
}

#[derive(Debug)]
pub struct Rows {
// Map of primary keys within this table, to the fields within
Expand Down Expand Up @@ -255,7 +269,12 @@ impl<T: AsRef<[u8]>> ToDatabaseValue for &Hex<T> {

#[cfg(test)]
mod test {
use crate::pb::database::table_change::PrimaryKey;
use crate::pb::database::CompositePrimaryKey;
use crate::pb::database::{DatabaseChanges, TableChange};
use crate::tables::Tables;
use crate::tables::ToDatabaseValue;
use std::collections::HashMap;

#[test]
fn to_database_value_proto_timestamp() {
Expand All @@ -267,4 +286,56 @@ mod test {
"1970-01-01T01:01:01.000000001Z"
);
}

#[test]
fn create_row_single_pk() {
let mut tables = Tables::new();
tables.create_row("myevent", "myhash".into());

assert_eq!(
tables.to_database_changes(),
DatabaseChanges {
table_changes: [TableChange {
table: "myevent".to_string(),
ordinal: 0,
operation: 1,
fields: [].into(),
primary_key: Some(PrimaryKey::Pk("myhash".to_string())),
}]
.to_vec(),
}
);
}

#[test]
fn create_row_composite_pk() {
let mut tables = Tables::new();
tables.create_row(
"myevent",
[
("evt_tx_hash", "hello".to_string()),
("evt_index", "world".to_string()),
]
.into(),
);

assert_eq!(
tables.to_database_changes(),
DatabaseChanges {
table_changes: [TableChange {
table: "myevent".to_string(),
ordinal: 0,
operation: 1,
fields: [].into(),
primary_key: Some(PrimaryKey::CompositePk(CompositePrimaryKey {
keys: HashMap::from([
("evt_tx_hash".to_string(), "hello".to_string()),
("evt_index".to_string(), "world".to_string())
])
}))
}]
.to_vec(),
}
);
}
}

0 comments on commit 7902be4

Please sign in to comment.