Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/table row support composite keys #6

Merged
merged 3 commits into from
Oct 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,19 @@ All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [1.3.0]

Better support for using composite primary keys:

* New enum in this crate: `tables::PrimaryKey`
* `Single(String)`: `let single: PrimaryKey = "hello world".into()`
* `Composite(BTreeMap<String, String>)`: `let composite: PrimaryKey = [("evt_tx_hash","hello".to_string()),("evt_index","world".to_string())].into()`

Breaking changes:

* The `Rows.pks` field is not public anymore.
* `create_row()`, `update_row()` and `delete_row()` now require a `PrimaryKey` instead of a `String`. This should work directly with a `String`, `&String` or `&str`.

## [1.2.1]

* Changed imports in `substreams.yaml` definition so that a packaged `.spkg` can use the expected path `sf/substreams/sink/database/v1` to exclude, and generating from the `.spkg` will generate data on the right path.
Expand Down
158 changes: 136 additions & 22 deletions src/tables.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use crate::pb::database::{table_change::Operation, DatabaseChanges, Field, TableChange};
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use substreams::{
scalar::{BigDecimal, BigInt},
Hex,
Expand All @@ -18,12 +18,11 @@ impl Tables {
}
}

pub fn create_row<K: AsRef<str>>(&mut self, table: &str, key: K) -> &mut Row {
pub fn create_row<K: Into<PrimaryKey>>(&mut self, table: &str, key: K) -> &mut Row {
let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
let row = rows
.pks
.entry(key.as_ref().to_string())
.or_insert(Row::new());
let k = key.into();
let key_debug = format!("{:?}", k);
let row = rows.pks.entry(k).or_insert(Row::new());
match row.operation {
Operation::Unspecified => {
row.operation = Operation::Create;
Expand All @@ -35,20 +34,18 @@ impl Tables {
Operation::Delete => {
panic!(
"cannot create a row after a scheduled delete operation - table: {} key: {}",
table,
key.as_ref().to_string()
table, key_debug,
)
}
}
row
}

pub fn update_row<K: AsRef<str>>(&mut self, table: &str, key: K) -> &mut Row {
pub fn update_row<K: Into<PrimaryKey>>(&mut self, table: &str, key: K) -> &mut Row {
let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
let row = rows
.pks
.entry(key.as_ref().to_string())
.or_insert(Row::new());
let k = key.into();
let key_debug = format!("{:?}", k);
let row = rows.pks.entry(k).or_insert(Row::new());
match row.operation {
Operation::Unspecified => {
row.operation = Operation::Update;
Expand All @@ -58,20 +55,16 @@ impl Tables {
Operation::Delete => {
panic!(
"cannot update a row after a scheduled delete operation - table: {} key: {}",
table,
key.as_ref().to_string()
table, key_debug,
)
}
}
row
}

pub fn delete_row<K: AsRef<str>>(&mut self, table: &str, key: K) -> &mut Row {
pub fn delete_row<K: Into<PrimaryKey>>(&mut self, table: &str, key: PrimaryKey) -> &mut Row {
let rows = self.tables.entry(table.to_string()).or_insert(Rows::new());
let row = rows
.pks
.entry(key.as_ref().to_string())
.or_insert(Row::new());
let row = rows.pks.entry(key.into()).or_insert(Row::new());
match row.operation {
Operation::Unspecified => {
row.operation = Operation::Delete;
Expand Down Expand Up @@ -101,7 +94,16 @@ impl Tables {
continue;
}

let mut change = TableChange::new(table.clone(), pk, 0, row.operation);
let mut change = match pk {
PrimaryKey::Single(pk) => TableChange::new(table.clone(), pk, 0, row.operation),
PrimaryKey::Composite(keys) => TableChange::new_composite(
table.clone(),
keys.into_iter().collect(),
0,
row.operation,
),
};

for (field, value) in row.columns.into_iter() {
change.fields.push(Field {
name: field,
Expand All @@ -118,10 +120,45 @@ impl Tables {
}
}

/// Primary key identifying a row within a table.
///
/// Values are normally built through the `From` conversions rather than by
/// naming a variant directly, e.g. `"myhash".into()` for a single key or
/// `[("evt_tx_hash", "hello".to_string()), ("evt_index", "world".to_string())].into()`
/// for a composite one.
///
/// `Clone` is derived so a caller can keep its own copy of a key after handing
/// one to an API that takes the key by value.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum PrimaryKey {
    /// Primary key made of a single string value.
    Single(String),
    /// Primary key made of several named parts (part name -> value).
    ///
    /// A `BTreeMap` is used because, unlike `HashMap`, it implements `Hash`,
    /// which lets the whole enum be used as a hash-map key.
    Composite(BTreeMap<String, String>),
}

impl From<&str> for PrimaryKey {
fn from(x: &str) -> Self {
Self::Single(x.to_string())
}
}

impl From<&String> for PrimaryKey {
fn from(x: &String) -> Self {
Self::Single(x.clone())
}
}

impl From<String> for PrimaryKey {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You got a problem with AsRef<str>?

fn from(x: String) -> Self {
Self::Single(x)
}
}

impl<K: AsRef<str>, const N: usize> From<[(K, String); N]> for PrimaryKey {
fn from(arr: [(K, String); N]) -> Self {
if N == 0 {
return Self::Composite(BTreeMap::new());
}

let string_arr = arr.map(|(k, v)| (k.as_ref().to_string(), v));
Self::Composite(BTreeMap::from(string_arr))
}
}

#[derive(Debug)]
pub struct Rows {
// Map of primary keys within this table, to the fields within
pub pks: HashMap<String, Row>,
pks: HashMap<PrimaryKey, Row>,
}

impl Rows {
Expand Down Expand Up @@ -240,7 +277,13 @@ impl<T: AsRef<[u8]>> ToDatabaseValue for &Hex<T> {

#[cfg(test)]
mod test {
use crate::pb::database::table_change::PrimaryKey;
use crate::pb::database::CompositePrimaryKey;
use crate::pb::database::{DatabaseChanges, TableChange};
use crate::tables::PrimaryKey as TablesPrimaryKey;
use crate::tables::Tables;
use crate::tables::ToDatabaseValue;
use std::collections::HashMap;

#[test]
fn to_database_value_proto_timestamp() {
Expand All @@ -252,4 +295,75 @@ mod test {
"1970-01-01T01:01:01.000000001Z"
);
}

#[test]
fn create_row_single_pk_direct() {
    // Hand over an explicit `Single` variant instead of relying on a
    // string-to-key conversion.
    let mut tables = Tables::new();
    let key = TablesPrimaryKey::Single(String::from("myhash"));
    tables.create_row("myevent", key);

    let expected = DatabaseChanges {
        table_changes: vec![TableChange {
            table: String::from("myevent"),
            ordinal: 0,
            operation: 1,
            fields: Vec::new(),
            primary_key: Some(PrimaryKey::Pk(String::from("myhash"))),
        }],
    };

    assert_eq!(tables.to_database_changes(), expected);
}

#[test]
fn create_row_single_pk() {
    // A plain `&str` key must be accepted and end up as a single primary key.
    let mut tables = Tables::new();
    tables.create_row("myevent", "myhash");

    let expected = DatabaseChanges {
        table_changes: vec![TableChange {
            table: String::from("myevent"),
            ordinal: 0,
            operation: 1,
            fields: Vec::new(),
            primary_key: Some(PrimaryKey::Pk(String::from("myhash"))),
        }],
    };

    assert_eq!(tables.to_database_changes(), expected);
}

#[test]
fn create_row_composite_pk() {
    // An array of (name, value) pairs must be accepted as a key and be
    // emitted as a composite primary key.
    let mut tables = Tables::new();
    let key = [
        ("evt_tx_hash", String::from("hello")),
        ("evt_index", String::from("world")),
    ];
    tables.create_row("myevent", key);

    let expected_keys = HashMap::from([
        (String::from("evt_tx_hash"), String::from("hello")),
        (String::from("evt_index"), String::from("world")),
    ]);
    let expected = DatabaseChanges {
        table_changes: vec![TableChange {
            table: String::from("myevent"),
            ordinal: 0,
            operation: 1,
            fields: Vec::new(),
            primary_key: Some(PrimaryKey::CompositePk(CompositePrimaryKey {
                keys: expected_keys,
            })),
        }],
    };

    assert_eq!(tables.to_database_changes(), expected);
}
}
Loading