Skip to content

Commit

Permalink
sql_context init
Browse files Browse the repository at this point in the history
  • Loading branch information
ceyhunkerti committed Oct 2, 2024
1 parent ed18e53 commit f0b2547
Show file tree
Hide file tree
Showing 7 changed files with 250 additions and 0 deletions.
13 changes: 13 additions & 0 deletions lib/explorer/backend/sql_context.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
defmodule Explorer.Backend.SQLContext do
@type t :: struct()
@type c :: Explorer.SQLContext.t()
@type df :: Explorer.DataFrame.t()
@type result(t) :: {:ok, t} | {:error, term()}

@callback register(c, String.t(), df) :: c
@callback unregister(c, String.t()) :: c
@callback execute(c, String.t()) :: result(df)
@callback get_tables(c) :: list(String.t())

def new(ctx), do: %Explorer.SQLContext{ctx: ctx}
end
6 changes: 6 additions & 0 deletions lib/explorer/polars_backend/native.ex
Original file line number Diff line number Diff line change
Expand Up @@ -465,5 +465,11 @@ defmodule Explorer.PolarsBackend.Native do
def message_on_gc(_pid, _payload), do: err()
def is_message_on_gc(_term), do: err()

def sql_context_new(), do: err()
def sql_context_register(_ctx, _name, _df), do: err()
def sql_context_unregister(_ctx, _name), do: err()
def sql_context_execute(_ctx, _query), do: err()
def sql_context_get_tables(_ctx), do: err()

defp err, do: :erlang.nif_error(:nif_not_loaded)
end
37 changes: 37 additions & 0 deletions lib/explorer/polars_backend/sql_context.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
defmodule Explorer.PolarsBackend.SQLContext do
@moduledoc false

defstruct resource: nil

alias Explorer.Native
alias Explorer.PolarsBackend.Native
alias Explorer.PolarsBackend.Shared

@type t :: %__MODULE__{resource: reference()}

@behaviour Explorer.Backend.SQLContext

def new() do
ctx = Native.sql_context_new()
Explorer.Backend.SQLContext.new(ctx)
end

def register(%Explorer.SQLContext{ctx: ctx} = context, name, %Explorer.DataFrame{data: df}) do
Native.sql_context_register(ctx, name, df)
context
end

def unregister(%Explorer.SQLContext{ctx: ctx} = context, name) do
Native.sql_context_unregister(ctx, name)
context
end

def execute(%Explorer.SQLContext{ctx: ctx}, query) do
case Native.sql_context_execute(ctx, query) do
{:ok, polars_ldf} -> Shared.create_dataframe(polars_ldf)
{:error, error} -> {:error, RuntimeError.exception(error)}
end
end

def get_tables(%Explorer.SQLContext{ctx: ctx}), do: Native.sql_context_get_tables(ctx)
end
33 changes: 33 additions & 0 deletions lib/explorer/sql_context.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
defmodule Explorer.SQLContext do
@enforce_keys [:ctx]
defstruct [:ctx]

alias __MODULE__, as: SQLContext

@type t :: %SQLContext{ctx: Explorer.Backend.SQLContext.t()}

alias Explorer.Backend.SQLContext
alias Explorer.Shared

def new(args \\ [], opts \\ []), do: Shared.apply_init(backend(), :new, args, opts)

def register(ctx, name, df, opts \\ []) do
Shared.apply_init(backend(), :register, [ctx, name, df], opts)
end

def unregister(ctx, name, opts \\ []) do
Shared.apply_init(backend(), :unregister, [ctx, name], opts)
end

def execute(ctx, query, opts \\ []) do
Shared.apply_init(backend(), :execute, [ctx, query], opts)
end

def get_tables(ctx, opts \\ []) do
Shared.apply_init(backend(), :get_tables, [ctx], opts)
end

defp backend do
Module.concat([Explorer.Backend.get(), "SQLContext"])
end
end
2 changes: 2 additions & 0 deletions native/explorer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ mod expressions;
mod lazyframe;
mod local_message;
mod series;
mod sql_context;

pub use datatypes::{
ExDataFrame, ExDataFrameRef, ExExpr, ExExprRef, ExLazyFrame, ExLazyFrameRef, ExSeries,
Expand All @@ -33,6 +34,7 @@ pub use datatypes::{
pub use error::ExplorerError;
use expressions::*;
use series::*;
pub use sql_context::*;

mod atoms {
rustler::atoms! {
Expand Down
70 changes: 70 additions & 0 deletions native/explorer/src/sql_context.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
use crate::{ExDataFrame, ExLazyFrame, ExplorerError};
use polars::{prelude::IntoLazy, sql::SQLContext};
use rustler::{NifStruct, Resource, ResourceArc};
use std::sync::{Arc, Mutex};
pub struct ExSQLContextRef(pub Arc<Mutex<SQLContext>>);

#[rustler::resource_impl]
impl Resource for ExSQLContextRef {}

#[derive(NifStruct)]
#[module = "Explorer.PolarsBackend.SQLContext"]
pub struct ExSQLContext {
pub resource: ResourceArc<ExSQLContextRef>,
}

impl ExSQLContextRef {
pub fn new(ctx: SQLContext) -> Self {
Self(Arc::new(Mutex::new(ctx)))
}
}

impl ExSQLContext {
pub fn new(ctx: SQLContext) -> Self {
Self {
resource: ResourceArc::new(ExSQLContextRef::new(ctx)),
}
}

// Function to get a lock on the inner SQLContext
pub fn lock_inner(&self) -> std::sync::MutexGuard<SQLContext> {
self.resource.0.lock().unwrap()
}
}

#[rustler::nif]
fn sql_context_new() -> ExSQLContext {
let ctx = SQLContext::new();
return ExSQLContext::new(ctx);
}

#[rustler::nif]
fn sql_context_register(context: ExSQLContext, name: &str, df: ExDataFrame) {
let mut ctx = context.lock_inner();
let ldf = df.clone_inner().lazy();
ctx.register(name, ldf);
}

#[rustler::nif]
fn sql_context_unregister(context: ExSQLContext, name: &str) {
let mut ctx = context.lock_inner();
ctx.unregister(name);
}

#[rustler::nif]
fn sql_context_execute(context: ExSQLContext, query: &str) -> Result<ExLazyFrame, ExplorerError> {
let mut ctx = context.lock_inner();
match ctx.execute(query) {
Ok(lazy_frame) => Ok(ExLazyFrame::new(lazy_frame)),
Err(e) => Err(ExplorerError::Other(format!(
"Failed to execute query: {}",
e
))),
}
}

#[rustler::nif]
fn sql_context_get_tables(context: ExSQLContext) -> Vec<String> {
let ctx = context.lock_inner();
return ctx.get_tables();
}
89 changes: 89 additions & 0 deletions test/explorer/sql_context_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
defmodule Explorer.SQLContextTest do
use ExUnit.Case, async: true

require Explorer.DataFrame

alias Explorer.DataFrame, as: DF
alias Explorer.SQLContext

describe "execute" do
test "execute without any data frame registered" do
case SQLContext.new()
|> SQLContext.execute("select 1 as column_a union all select 2 as column_a") do
{:ok, result} ->
assert result != nil
assert DF.compute(result) |> DF.to_columns(atom_keys: true) == %{column_a: [1, 2]}

{:error, reason} ->
flunk("SQL query execution failed with reason: #{inspect(reason)}")
end
end

test "execute with registering single data frame" do
df = DF.new(%{column_a: [1, 2, 3]})

case SQLContext.new()
|> SQLContext.register("t1", df)
|> SQLContext.execute(
"select 2 * t.column_a as column_2a from t1 as t where t.column_a < 3"
) do
{:ok, result} ->
assert result != nil
assert DF.compute(result) |> DF.to_columns(atom_keys: true) == %{column_2a: [2, 4]}

{:error, reason} ->
flunk("SQL query execution failed with reason: #{inspect(reason)}")
end
end

test "execute with registering multiple data frames" do
df1 = DF.new(%{column_1a: [1, 2, 3]})

df2 =
DF.new(%{
column_2a: [1, 2, 4],
column_2b: ["a", "b", "c"]
})

case SQLContext.new()
|> SQLContext.register("t1", df1)
|> SQLContext.register("t2", df2)
|> SQLContext.execute(
"select t2.column_2b as col from t1 join t2 on t1.column_1a = t2.column_2a"
) do
{:ok, result} ->
assert result != nil
assert DF.compute(result) |> DF.to_columns(atom_keys: true) == %{col: ["a", "b"]}

{:error, reason} ->
flunk("SQL query execution failed with reason: #{inspect(reason)}")
end
end

test "get_tables get registered tables" do
df = DF.new(%{col: [1]})

tables =
SQLContext.new()
|> SQLContext.register("t1", df)
|> SQLContext.register("t2", df)
|> SQLContext.get_tables()

assert tables == ["t1", "t2"]
end

test "unregister" do
df = DF.new(%{col: [1]})

tables =
SQLContext.new()
|> SQLContext.register("t1", df)
|> SQLContext.register("t2", df)
|> SQLContext.register("t3", df)
|> SQLContext.unregister("t1")
|> SQLContext.get_tables()

assert tables == ["t2", "t3"]
end
end
end

0 comments on commit f0b2547

Please sign in to comment.