Skip to content

Commit

Permalink
Add append function
Browse files Browse the repository at this point in the history
  • Loading branch information
yangcancai committed Apr 7, 2022
1 parent ae3289a commit 760f3f8
Show file tree
Hide file tree
Showing 6 changed files with 639 additions and 111 deletions.
339 changes: 334 additions & 5 deletions crates/big_data/core/src/big_data.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,62 @@
use ordermap::set::OrderSet;
use std::cmp::Ordering;

use anyhow::Error;
use std::collections::{BTreeMap, HashMap};
use std::ops::Add;
use std::ops::AddAssign;
use std::ops::Bound::Included;
pub type R<T> = anyhow::Result<T>;
#[cfg(not(feature = "nif"))]
use erlang_term::RawTerm;

pub struct Time(pub u128);

#[derive(Debug, Clone)]
pub struct RowOption {
pub t: Option<String>,
pub max_len: Option<i64>,
pub update: Option<String>,
// The replace cond
pub replace_cond: Option<i64>,
}
impl From<RowTerm> for RowOption {
fn from(r: RowTerm) -> Self {
let mut rs = RowOption::new();
if let RowTerm::List(list) = r {
for option in list {
if let RowTerm::Tuple(option) = option {
if let RowTerm::Atom(atom) = &option[0] {
match atom.as_str() {
"type" => {
if let RowTerm::Atom(t) = &option[1] {
rs.set_t(t.clone());
}
}
"max_len" => {
if let RowTerm::Integer(max) = &option[1] {
rs.set_max_len(*max);
}
}
"update" => {
if let RowTerm::Atom(update) = &option[1] {
rs.set_update(update.clone());
}
}
"replace_cond" => {
if let RowTerm::Integer(replace_cond) = &option[1] {
rs.set_replace_cond(*replace_cond);
}
}
_ => {}
}
}
}
}
}
rs
}
}
#[derive(Debug, Clone)]
pub struct RowData {
// id
Expand All @@ -24,6 +73,17 @@ pub struct BigData {
// BTreeMap to range sort rows
// The same time may be have mutiple RowId
time_index: BTreeMap<u128, Box<OrderSet<String>>>,
// 0 -> default
// Pos :: neg_integer() | string()
// eg. current pos 5, and parent pos ..1
// format: [
// {Pos :: neg_integer(), [
// {type, Type :: list | pos_integer()},
// {max_len, Max :: pos_integer()}]},
// {update, Compare :: gt | lt},
// {replace_cond, pos_integer()}
// ]
options: HashMap<i32, RowTerm>,
}
/// RowTerm is an enum that covers all the Erlang / Elixir term types that can be stored in
/// a BigData.
Expand Down Expand Up @@ -62,7 +122,34 @@ pub enum RowTerm {
Bin(Vec<u8>),
Float(f64),
Map(Vec<(RowTerm, RowTerm)>),
Other(RawTerm)
Other(RawTerm),
}
impl Default for RowOption {
fn default() -> Self {
RowOption::new()
}
}
impl RowOption {
pub fn new() -> Self {
Self {
max_len: None,
t: None,
replace_cond: None,
update: None,
}
}
pub fn set_max_len(&mut self, max: i64) {
self.max_len = Some(max);
}
pub fn set_t(&mut self, t: String) {
self.t = Some(t);
}
pub fn set_update(&mut self, update: String) {
self.update = Some(update);
}
pub fn set_replace_cond(&mut self, replace_cond: i64) {
self.replace_cond = Some(replace_cond);
}
}
impl RowData {
pub fn new(row_id: &str, row_term: RowTerm, time: u128) -> Self {
Expand Down Expand Up @@ -91,8 +178,242 @@ impl BigData {
BigData {
rows: HashMap::new(),
time_index: BTreeMap::new(),
options: HashMap::new(),
}
}
/// Append RowData to the BigData
///
/// # Example
///
///
///
pub fn append(&mut self, row_data: RowData, option: RowTerm) -> R<()> {
let time = row_data.time;
let row_id = row_data.row_id.clone();
match self.rows.get_mut(&row_id) {
// first insert
None => {
self.rows.insert(row_id.clone(), row_data);
}
// second insert
Some(r) => {
let old_time = r.time;
// clear old time_index
if let Some(set) = self.time_index.get_mut(&old_time) {
set.remove(&row_id);
}
let rs = match &option {
// If option is equal integer, then fetch option from options
RowTerm::Integer(index) => {
let index = *index as i32;
match self.options.get(&index) {
Some(inner) => BigData::process_append(r, &row_data, inner.clone()),
_ => Ok(()),
}
}
_ => BigData::process_append(r, &row_data, option),
};
match rs {
Ok(()) => {}
Err(e) => return Err(e),
}
}
}

// insert time index
if let Some(set) = self.time_index.get_mut(&time) {
set.insert(row_id);
} else {
let mut set = OrderSet::new();
set.insert(row_id);
self.time_index.insert(time, Box::new(set));
}
Ok(())
}

/// Process append, this is private function
///
///
fn process_append(rs: &mut RowData, new: &RowData, option: RowTerm) -> R<()> {
// Supported options
let mut filter = HashMap::new();
match option {
RowTerm::List(option) => {
// [{pos,option}]
for option in option {
match option {
RowTerm::Tuple(option) => {
// which element will be process with this option
let mut u = 0usize;
let pos = match &option[u] {
RowTerm::Integer(pos) => *pos as usize,
_ => return Err(Error::msg("Append option format error")),
};
u += 1;
let option: RowTerm = option[u].clone();
let option: RowOption = option.into();
// update
BigData::process_append_update(rs, new, pos, &option);
// replace_cond
// must before max_len
BigData::process_append_replace_cond(rs, new, pos, &option);
// max_len
BigData::process_append_max_len(rs, new, pos, &option);
filter.insert(pos, 1);
}
_ => return Err(Error::msg("Append option format error")),
}
}
}
_ => return Err(Error::msg("Append option format error")),
}
if let (RowTerm::Tuple(inner), RowTerm::Tuple(other)) = (&mut rs.term, &new.term) {
for i in 0..other.len() {
// the pos option not included
if filter.get(&i).is_none() {
if inner.len() > i {
inner[i] = other[i].clone();
} else {
// new elem append
inner.push(other[i].clone());
}
}
}
}
Ok(())
}

///
/// Process replace_cond
///
#[allow(clippy::all)]
fn process_append_replace_cond(
rs: &mut RowData,
new: &RowData,
pos: usize,
option: &RowOption,
) {
if let Some(replace_cond) = option.replace_cond {
let replace_cond = replace_cond as usize;
if let Some(t) = &option.t {
match (t.as_str(), &mut rs.term, &new.term) {
("list", RowTerm::Tuple(tuple), RowTerm::Tuple(new_tuple)) => {
if tuple.len() > pos && new_tuple.len() > pos {
match (&mut tuple[pos], &new_tuple[pos]) {
(RowTerm::List(elem_list), RowTerm::Tuple(elem)) => {
BigData::process_replace_elem(elem_list, replace_cond, elem);
}
(RowTerm::List(elem_list), RowTerm::List(elem)) => {
for elem in elem {
match elem {
RowTerm::Tuple(elem) => {
BigData::process_replace_elem(
elem_list,
replace_cond,
elem,
);
}
_ => {}
}
}
}
_ => {}
}
}
}
_ => {}
}
}
}
}
#[allow(clippy::needless_range_loop, clippy::ptr_arg)]
fn process_replace_elem(
elem_list: &mut Vec<RowTerm>,
replace_cond: usize,
elem_tuple: &Vec<RowTerm>,
) {
let mut replaced = false;
for i in 0..elem_list.len() {
let item = &mut elem_list[i];
if let RowTerm::Tuple(item_tuple) = item {
if item_tuple.len() > replace_cond
&& elem_tuple.len() > replace_cond
&& item_tuple[replace_cond] == elem_tuple[replace_cond]
{
*item = RowTerm::Tuple((*elem_tuple).clone());
replaced = true;
break;
}
}
}
if !replaced {
elem_list.push(RowTerm::Tuple((*elem_tuple).clone()));
}
}
///
/// Process update option
///
fn process_append_update(rs: &mut RowData, new: &RowData, pos: usize, option: &RowOption) {
if let Some(update) = &option.update {
match (&mut rs.term, &new.term, update.as_str()) {
(RowTerm::Tuple(tuple), RowTerm::Tuple(new_tuple), "gt") => {
if new_tuple.len() > pos && tuple.len() > pos && new_tuple[pos] > tuple[pos] {
tuple[pos] = new_tuple[pos].clone();
}
}
(RowTerm::Tuple(tuple), RowTerm::Tuple(new_tuple), "lt") => {
if new_tuple.len() > pos && tuple.len() > pos && new_tuple[pos] < tuple[pos] {
tuple[pos] = new_tuple[pos].clone();
}
}
(a, b, "gt") => {
if b > a {
*a = b.clone();
}
}

(a, b, "lt") => {
if b < a {
*a = b.clone();
}
}
_ => {}
}
}
}
///
/// list element max_len
///
fn process_append_max_len(rs: &mut RowData, new: &RowData, pos: usize, option: &RowOption) {
if let Some(max) = option.max_len {
if let Some(t) = &option.t {
if let "list" = t.as_str() {
if let RowTerm::Tuple(list) = &mut rs.term {
if let RowTerm::Tuple(new_list) = &new.term {
if new_list.len() > pos && list.len() > pos {
if let RowTerm::List(list) = &mut list[pos] {
if let RowTerm::List(new_list) = &new_list[pos] {
// If replace_cond not None,
// New elem will be replace with
// process_append_replace_cond function
if option.replace_cond.is_none() {
let mut temp = (*new_list).clone();
list.append(&mut temp);
}
if list.len() > max as usize {
let new_pos = list.len() - max as usize;
list.drain(..new_pos);
}
}
}
}
}
}
}
}
}
}
///
/// Insert RowData to the BigData
///
/// # Examples
Expand Down Expand Up @@ -714,7 +1035,15 @@ impl AddAssign for RowTerm {
}
}
}

impl PartialOrd for RowTerm {
fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
match (self, other) {
(RowTerm::Integer(inner), RowTerm::Integer(i)) => inner.partial_cmp(i),
(RowTerm::Float(inner), RowTerm::Float(i)) => inner.partial_cmp(i),
_ => None,
}
}
}
impl PartialEq for RowTerm {
fn eq(&self, other: &RowTerm) -> bool {
match self {
Expand Down Expand Up @@ -785,10 +1114,10 @@ impl PartialEq for RowTerm {
_ => false,
},
#[cfg(not(feature = "nif"))]
RowTerm::Other(self_inner) => match other{
RowTerm::Other(self_inner) => match other {
RowTerm::Other(inner) => self_inner == inner,
_ => false
}
_ => false,
},
}
}
}
Loading

0 comments on commit 760f3f8

Please sign in to comment.