Skip to content

Commit

Permalink
Add initial SQL support
Browse files Browse the repository at this point in the history
Additionally, this commit introduces(a.k.a. mixes) a number of changes.

1. `TabletClient::scope` to namespace keys, and `KvClient` to unify kv
   operations. Resolves #16.
2. Use `Status::details` to carry `BatchError`. We probably should
   enhance it to full featured grpc compatible solution. Resolves #17.

Resolves #24.
  • Loading branch information
kezhuw committed Dec 11, 2024
1 parent 9336944 commit 8003c86
Show file tree
Hide file tree
Showing 47 changed files with 6,535 additions and 798 deletions.
25 changes: 21 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ documentation = "https://docs.rs/seamdb"

[dependencies]
anyhow = { version = "1.0", features = ["backtrace"] }
async-trait = "0.1.72"
async-trait = "0.1.81"
bytesize = "1.2.0"
compact_str = "0.7.1"
derivative = "2.2.0"
hashlink = "0.8.3"
rdkafka = "0.34.0"
tokio = {version = "1.30.0", features = ["full"]}
Expand All @@ -55,20 +54,38 @@ bytemuck = { version = "1.13.1", features = ["derive"] }
futures = "0.3.28"
thiserror = "1.0.48"
tokio-stream = "0.1.15"
asyncs = "0.3.0"
datafusion = "43.0.0"
pgwire = "0.27.0"
derive-where = "1.2.7"
asyncs = { version = "0.3.0", features = ["tokio"] }
async-io = "2.3.4"
bytes = "1.7.2"
pg_query = "5.1.1"
lazy_static = "1.5.0"
lazy-init = "0.5.1"
enum_dispatch = "0.3.13"
jiff = "0.1.15"
clap = { version = "4.5.23", features = ["derive"] }
tracing-appender = "0.2.3"
tracing-subscriber = { version = "0.3.19", features = ["tracing-log", "env-filter", "std"] }

[dev-dependencies]
assertor = "0.0.2"
asyncs = { version = "0.3.0", features = ["test", "tokio"] }
env_logger = "0.10.0"
env_logger = "0.11.5"
serial_test = "2.0.0"
speculoos = "0.11.0"
test-case = "3.1.0"
test-log = "0.2.12"
testcontainers = "0.14.0"
tracing-test = "0.2.4"

[profile.dev]
lto = "thin"

[profile.release]
lto = "thin"

[workspace]
members = ["src/protos/build"]

Expand Down
6 changes: 5 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ lint:
cargo clippy --no-deps -- -D clippy::all

build:
cargo build --tests
cargo build --tests --bins
release:
cargo build --tests --bins --release

test:
cargo test
clean:
cargo clean
104 changes: 104 additions & 0 deletions src/bin/seamdbd.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright 2023 The SeamDB Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::net::SocketAddr;
use std::time::Duration;

use anyhow::{anyhow, Result};
use clap::Parser;
use pgwire::tokio::process_socket;
use seamdb::cluster::{ClusterEnv, EtcdClusterMetaDaemon, EtcdNodeRegistry, NodeId};
use seamdb::endpoint::{Endpoint, Params, ServiceUri};
use seamdb::log::{KafkaLogFactory, LogManager, MemoryLogFactory};
use seamdb::protos::TableDescriptor;
use seamdb::sql::postgres::PostgresqlHandlerFactory;
use seamdb::tablet::{TabletClient, TabletNode};
use tokio::net::{TcpListener, TcpStream};
use tracing::{info, instrument};
use tracing_subscriber::prelude::*;
use tracing_subscriber::{fmt, EnvFilter};

async fn new_log_manager(uri: ServiceUri<'_>) -> Result<LogManager> {
match uri.scheme() {
"memory" => LogManager::new(MemoryLogFactory::new(), &MemoryLogFactory::ENDPOINT, &Params::default()).await,
"kafka" => LogManager::new(KafkaLogFactory {}, &uri.endpoint(), uri.params()).await,
scheme => Err(anyhow!("unsupported log schema: {}, supported: memory, kafka", scheme)),
}
}

#[instrument(skip_all, fields(addr = %addr))]
async fn serve_connection(factory: PostgresqlHandlerFactory, stream: TcpStream, addr: SocketAddr) {
match process_socket(stream, None, factory).await {
Ok(_) => info!("connection terminated"),
Err(err) => info!("connection terminated: {err}"),
}
}

#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
pub struct Args {
/// Meta cluster uri to store cluster wide metadata, e.g. etcd://etcd-cluster/scope.
#[arg(long = "cluster.uri")]
cluster_uri: String,
/// Cluster name.
#[arg(long = "cluster.name", default_value = "seamdb")]
cluster_name: String,
/// Log cluster uri to store WAL logs, e.g. kafka://kafka-cluster.
#[arg(long = "log.uri")]
log_uri: String,
/// Port to serve PostgreSQL compatible SQL statements.
#[arg(long = "sql.postgresql.port", default_value_t = 5432)]
pgsql_port: u16,
}

#[tokio::main]
async fn main() {
let (non_blocking, _guard) = tracing_appender::non_blocking(std::io::stdout());

tracing_subscriber::registry()
.with(fmt::layer().with_writer(non_blocking).with_level(true).with_file(true).with_line_number(true))
.with(EnvFilter::from_default_env())
.init();

let args = Args::parse();
let cluster_uri = ServiceUri::parse(&args.cluster_uri).unwrap();
let log_uri = ServiceUri::parse(&args.log_uri).unwrap();

let node_id = NodeId::new_random();
info!("Starting node {node_id}");

let listener = TcpListener::bind(("127.0.0.1", 0)).await.unwrap();
let address = format!("http://{}", listener.local_addr().unwrap());
let endpoint = Endpoint::try_from(address.as_str()).unwrap();
let (nodes, lease) =
EtcdNodeRegistry::join(cluster_uri.clone(), node_id.clone(), Some(endpoint.to_owned())).await.unwrap();
let log_manager = new_log_manager(log_uri).await.unwrap();
let cluster_env = ClusterEnv::new(log_manager.into(), nodes).with_replicas(1);
let mut cluster_meta_handle =
EtcdClusterMetaDaemon::start(args.cluster_name, cluster_uri.clone(), cluster_env.clone()).await.unwrap();
let descriptor_watcher = cluster_meta_handle.watch_descriptor(None).await.unwrap();
let deployment_watcher = cluster_meta_handle.watch_deployment(None).await.unwrap();
let cluster_env = cluster_env.with_descriptor(descriptor_watcher).with_deployment(deployment_watcher.monitor());
let _node = TabletNode::start(node_id, listener, lease, cluster_env.clone());
let client = TabletClient::new(cluster_env).scope(TableDescriptor::POSTGRESQL_DIALECT_PREFIX);
tokio::time::sleep(Duration::from_secs(20)).await;

let factory = PostgresqlHandlerFactory::new(client);
let listener = TcpListener::bind(format!("0.0.0.0:{}", args.pgsql_port)).await.unwrap();
info!("Listening on {} ...", listener.local_addr().unwrap());
loop {
let (stream, addr) = listener.accept().await.unwrap();
tokio::spawn(serve_connection(factory.clone(), stream, addr));
}
}
18 changes: 17 additions & 1 deletion src/clock.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::ops::{Add, Sub};
use std::sync::Arc;
use std::time::{Duration, SystemTime};

use jiff::Timestamp as JiffTimestamp;
use static_assertions::{assert_impl_all, assert_not_impl_any};

pub use crate::protos::Timestamp;
Expand Down Expand Up @@ -88,7 +89,15 @@ impl SystemTimeClock {

impl std::fmt::Display for Timestamp {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
if let Some(sequence) = self.get_txn_sequence() {
return write!(f, "txn-seq-{}", sequence);
}
let ts = JiffTimestamp::new(self.seconds as i64, self.nanoseconds as i32).unwrap();
if self.logical == 0 {
write!(f, "{}", ts)
} else {
write!(f, "{}-{}", ts, self.logical)
}
}
}

Expand All @@ -101,6 +110,13 @@ impl Timestamp {
self.seconds == 0 && self.nanoseconds == 0 && self.logical == 0
}

pub const fn get_txn_sequence(&self) -> Option<u32> {
match self.seconds & 0x8000000000000000 != 0 {
true => Some(self.seconds as u32),
false => None,
}
}

pub const fn txn_sequence(sequence: u32) -> Self {
Self { seconds: 0x8000000000000000 + sequence as u64, nanoseconds: 0, logical: 0 }
}
Expand Down
4 changes: 3 additions & 1 deletion src/cluster/etcd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl EtcdLease {
}
}

pub(super) enum EtcdHelper {}
pub enum EtcdHelper {}

impl EtcdHelper {
pub async fn connect(endpoint: Endpoint<'_>, params: &Params<'_>) -> Result<Client> {
Expand Down Expand Up @@ -129,6 +129,8 @@ pub mod tests {
container: Container<'static, GenericImage>,
}

unsafe impl Send for EtcdContainer {}

impl EtcdContainer {
pub fn uri(&self) -> ServiceUri<'static> {
let cluster = format!("etcd://127.0.0.1:{}", self.container.get_host_port_ipv4(2379));
Expand Down
2 changes: 1 addition & 1 deletion src/cluster/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

mod env;
mod etcd;
pub mod etcd;
mod meta;
mod node;

Expand Down
2 changes: 1 addition & 1 deletion src/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -443,7 +443,7 @@ pub struct Params<'a> {
/// Owned version of [Params].
pub type OwnedParams = Params<'static>;

impl<'a> Params<'a> {
impl Params<'_> {
fn new(map: LinkedHashMap<CompactString, CompactString>) -> Self {
Self { map, _marker: std::marker::PhantomData }
}
Expand Down
Loading

0 comments on commit 8003c86

Please sign in to comment.