Skip to content
This repository has been archived by the owner on Oct 5, 2024. It is now read-only.

Starting the scaffolding of the redis-based locking mechanism #4

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 12 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,32 @@ edition = "2021"
[dependencies]
async-trait = "^0.1.68"
bytes = "*"
object_store = "^0.5.0"
object_store = "0.9.0"
futures = "*"
thiserror = "1"
tokio = { version = "^1.25.0", default-features = false}
tracing = { version = "0.1.40", features = ["log"] }
serde = { version = "1", features = [ "derive" ]}
url = { version = "2.5.0", features = ["serde"] }

# dynamodb feature
#dynamodb_lock = { version = "^0.5.0", optional = true }
dynamodb_lock = { git = "https://github.com/delta-incubator/dynamodb-lock-rs", branch = "web-identity", optional = true }
dynamodb_lock = { version = "^0.6.1", optional = true }

# postres feature
sqlx = {version = "^0.6.3", default-features = false, optional = true }

[dev-dependencies]
object_store = { version = "^0.5.0", features = ["aws"]}
# redis features
rslock = { version = "0.3.0", default-features = false, features = ["tokio-comp"], optional = true}

[features]
default = []
default = ["redis", "integration-test"]
# Integration test is a feature that just requires docker-compose to be running
integration-test = []
dynamodb = ["dynamodb_lock"]
postgres = ["sqlx/postgres"]
redis = [
"dep:rslock",
]
tokio-native-tls = ["sqlx/runtime-tokio-native-tls"]
tokio-rustls = ["sqlx/runtime-tokio-rustls"]
actix-native-tls = ["sqlx/runtime-actix-native-tls"]
Expand Down
22 changes: 22 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
version: "3.9"
services:
redis:
image: redis
ports:
- 6380:6380
localstack:
image: localstack/localstack:0.14
ports:
- 4566:4566
- 8080:8080
environment:
- SERVICES=s3,dynamodb
- DEBUG=1
- DATA_DIR=/tmp/localstack/data
- PORT_WEB_UI=8080
- DOCKER_HOST=unix:///var/run/docker.sock
- HOST_TMP_FOLDER=${TMPDIR}
- AWS_ACCESS_KEY_ID=deltalake
- AWS_SECRET_ACCESS_KEY=weloverust
healthcheck:
test: [ "CMD", "curl", "-f", "http://localhost:4566/health" ]
5 changes: 2 additions & 3 deletions src/dynamodb.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
/*
* This module defines the DynamoDB backed object_store
*/
//!
//! This module defines the DynamoDB backed object_store

use dynamodb_lock::DynamoDbLockClient;
pub use dynamodb_lock::DynamoDbOptions;
Expand Down
101 changes: 81 additions & 20 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,44 @@ mod errors;
pub mod dynamodb;
#[cfg(feature = "postgres")]
pub mod postgres;
#[cfg(feature = "redis")]
pub mod redis;

use crate::errors::LockedObjectStoreError;
use async_trait::async_trait;
use object_store::{GetResult, ListResult, MultipartId, ObjectMeta, ObjectStore, Result};
use serde::{Deserialize, Serialize};
use std::env;
use std::fmt::Debug;
use std::sync::Arc;

///
/// The `LockedObjectStore` provides the wrapper needed to lock on specific [ObjectStore]
/// operations
///
#[derive(Clone, Debug)]
pub struct LockedObjectStore<T> {
inner: Arc<dyn ObjectStore>,
lock: Arc<dyn LockClient<T>>,
}

///
/// Definitions of operations on [LockedObjectStore] Which can be locked
///
#[derive(Debug)]
pub enum LockOperations {
Copy,
CopyIfNotExists,
Delete,
Put,
Rename,
RenameIfNotExists,
}

/// A lock that has been successfully acquired
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct LockItem<T> {
/// The name of the owner that owns this lock.
pub owner_name: String,
/// Current version number of the lock in DynamoDB. This is what tells the lock client
/// when the lock is stale.
pub record_version_number: String,
/// The amount of time (in seconds) that the owner has this lock for.
/// If lease_duration is None then the lock is non-expirable.
pub lease_duration: Option<u64>,
Expand All @@ -34,40 +58,77 @@ pub struct LockItem<T> {
pub is_non_acquirable: bool,
}

impl<T> Default for LockItem<T> {
fn default() -> Self {
let owner_name = env::var("LOCK_OWNER_NAME").unwrap_or("locking-object-store".into());
let lease_duration = env::var("LOCK_LEASE_SECONDS").unwrap_or("20".into());

Self {
owner_name,
acquired_expired_lock: false,
is_non_acquirable: false,
is_released: false,
// TODO bring lease_duration through
lease_duration: None,
data: None,
lookup_time: 0,
}
}
}

impl<T> LockItem<T> {
///
/// Create a standard [LockItem] with default parameters set by environment variables
fn from(data: T) -> Self {
let mut item = LockItem::default();
item.data = Some(data);
item
}
}

/// Abstraction over a distributive lock provider
#[async_trait]
pub trait LockClient: Send + Sync + Debug {
#[async_trait::async_trait]
pub trait LockClient<T: Serialize + Send>: Send + Sync + Debug {
/// Attempts to acquire lock for data. If successful, returns the lock.
/// Otherwise returns [`Option::None`] which is retryable action.
/// Visit implementation docs for more details.
async fn try_acquire_lock<T: Serialize + Send>(
async fn try_acquire_lock(
&self,
data: T,
) -> Result<Option<LockItem<T>>, LockedObjectStoreError>;
) -> Result<Option<LockItem<T>>, LockedObjectStoreError>
where
T: 'async_trait;

/// Returns current lock for data (if any).
// the original implementation of this was returning the top lock in the system, since our lock
// is unique based on data, we require that same piece of data in order to return the current lock
// again though, this is just lock information - it does not mean you're safe to act on the lock
async fn get_lock<T: Serialize + Send>(
&self,
data: T,
) -> Result<Option<LockItem<T>>, LockedObjectStoreError>;
async fn get_lock(&self, data: T) -> Result<Option<LockItem<T>>, LockedObjectStoreError>
where
T: 'async_trait;

/// Update data in the upstream lock of the current user still has it.
/// The returned lock will have a new `rvn` so it'll increase the lease duration
/// as this method is usually called when the work with a lock is extended.
async fn update_data<T: Serialize>(
&self,
lock: &LockItem<T>,
) -> Result<LockItem<T>, LockedObjectStoreError>;
async fn update_data(&self, lock: &LockItem<T>) -> Result<LockItem<T>, LockedObjectStoreError>
where
T: 'async_trait;

/// Releases the given lock if the current user still has it, returning true if the lock was
/// successfully released, and false if someone else already stole the lock
async fn release_lock<T: Serialize>(
&self,
lock: &LockItem<T>,
) -> Result<bool, LockedObjectStoreError>;
async fn release_lock(&self, lock: &LockItem<T>) -> Result<bool, LockedObjectStoreError>
where
T: 'async_trait;
}

pub const DEFAULT_MAX_RETRY_ACQUIRE_LOCK_ATTEMPTS: u32 = 100;

#[cfg(test)]
mod tests {
use super::*;

#[test]
fn lock_item_detault() {
let _item = LockItem::<&str>::default();
}
}
87 changes: 87 additions & 0 deletions src/redis.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
//!
//! The redis module uses redis hash functions to implement a locking mechanism

use crate::errors::LockedObjectStoreError;
use crate::{LockClient, LockItem};
use serde::Serialize;

///
/// A Redis-based lock, which relies on the key functionality in redis
///
/// ```rust
/// # use locking_object_store::redis::*;
/// let manager = rslock::LockManager::new(vec!["redis://127.0.0.1"]);
/// let lock = RedisLock::with(manager);
/// ````
#[derive(Clone, Debug)]
pub struct RedisLock {
manager: rslock::LockManager,
}

impl RedisLock {
pub fn with(manager: rslock::LockManager) -> Self {
Self { manager }
}
}

#[async_trait::async_trait]
impl<T: Send + Serialize> LockClient<T> for RedisLock {
async fn try_acquire_lock(&self, data: T) -> Result<Option<LockItem<T>>, LockedObjectStoreError>
where
T: 'async_trait,
{
let item = LockItem::from(data);
Ok(Some(item))
}

/// Returns current lock for data (if any).
// the original implementation of this was returning the top lock in the system, since our lock
// is unique based on data, we require that same piece of data in order to return the current lock
// again though, this is just lock information - it does not mean you're safe to act on the lock
async fn get_lock(&self, data: T) -> Result<Option<LockItem<T>>, LockedObjectStoreError>
where
T: 'async_trait,
{
todo!()
}

/// Update data in the upstream lock of the current user still has it.
/// The returned lock will have a new `rvn` so it'll increase the lease duration
/// as this method is usually called when the work with a lock is extended.
async fn update_data(&self, lock: &LockItem<T>) -> Result<LockItem<T>, LockedObjectStoreError>
where
T: 'async_trait,
{
todo!()
}

/// Releases the given lock if the current user still has it, returning true if the lock was
/// successfully released, and false if someone else already stole the lock
async fn release_lock(&self, lock: &LockItem<T>) -> Result<bool, LockedObjectStoreError>
where
T: 'async_trait,
{
todo!()
}
}

#[cfg(test)]
mod tests {
use super::*;

#[cfg(feature = "integration-test")]
mod integration_tests {
use super::*;

#[tokio::test]
async fn test_construct() -> Result<(), crate::errors::LockedObjectStoreError> {
let manager = rslock::LockManager::new(vec!["redis://127.0.0.1/"]);
let lock = RedisLock::with(manager);
let data = "hello rust";
let result = lock.try_acquire_lock(data).await?;
assert!(result.is_some(), "Should receive a valid lock item");

Ok(())
}
}
}
Loading