Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use ADBC for Google BigQuery and update Athena code #84

Merged
merged 5 commits into from
Jan 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions lib/assets/connection_cell/main.js
Original file line number Diff line number Diff line change
Expand Up @@ -691,8 +691,9 @@ export function init(ctx, info) {
const reader = new FileReader();

reader.onload = (res) => {
const value = JSON.parse(res.target.result);
ctx.pushEvent("update_field", { field: "credentials", value });
// Reformat the JSON into a compact form
const value = JSON.stringify(JSON.parse(res.target.result));
ctx.pushEvent("update_field", { field: "credentials_json", value });
};

reader.readAsText(file);
Expand Down
126 changes: 45 additions & 81 deletions lib/kino_db/connection_cell.ex
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ defmodule KinoDB.ConnectionCell do
"database" => attrs["database"] || "",
"project_id" => attrs["project_id"] || "",
"default_dataset_id" => attrs["default_dataset_id"] || "",
"credentials" => attrs["credentials"] || %{},
"credentials_json" => attrs["credentials_json"] || "",
"access_key_id" => attrs["access_key_id"] || "",
"secret_access_key" => secret_access_key,
"use_secret_access_key_secret" =>
Expand Down Expand Up @@ -128,7 +128,7 @@ defmodule KinoDB.ConnectionCell do
~w|database_path|

"bigquery" ->
~w|project_id default_dataset_id credentials|
~w|project_id default_dataset_id credentials_json|

"athena" ->
if fields["use_secret_access_key_secret"],
Expand Down Expand Up @@ -267,6 +267,37 @@ defmodule KinoDB.ConnectionCell do
end
end

defp to_quoted(%{"type" => "bigquery"} = attrs) do
var = quoted_var(attrs["variable"])

opts =
[
driver: :bigquery,
"adbc.bigquery.sql.project_id": attrs["project_id"]
] ++
case attrs["default_dataset_id"] do
"" -> []
dataset_id -> ["adbc.bigquery.sql.dataset_id": dataset_id]
end ++
case attrs["credentials_json"] do
"" ->
[]

credentials_json ->
[
"adbc.bigquery.sql.auth_type": "adbc.bigquery.sql.auth_type.json_credential_string",
"adbc.bigquery.sql.auth_credentials":
{:sigil_S, [delimiter: ~s["""]], [{:<<>>, [], [credentials_json <> "\n"]}, []]}
]
end

quote do
:ok = Adbc.download_driver!(:bigquery)
{:ok, db} = Kino.start_child({Adbc.Database, unquote(opts)})
{:ok, unquote(var)} = Kino.start_child({Adbc.Connection, database: db})
end
end

defp to_quoted(%{"type" => "postgres"} = attrs) do
quote do
opts = unquote(trim_opts(shared_options(attrs) ++ postgres_and_mysql_options(attrs)))
Expand All @@ -291,40 +322,18 @@ defmodule KinoDB.ConnectionCell do
end
end

defp to_quoted(%{"type" => "bigquery"} = attrs) do
goth_opts_block = check_bigquery_credentials(attrs)

conn_block =
quote do
{:ok, _pid} = Kino.start_child({Goth, opts})

unquote(quoted_var(attrs["variable"])) =
Req.new(http_errors: :raise)
|> ReqBigQuery.attach(
goth: ReqBigQuery.Goth,
project_id: unquote(attrs["project_id"]),
default_dataset_id: unquote(attrs["default_dataset_id"])
)

:ok
end

join_quoted([goth_opts_block, conn_block])
end

defp to_quoted(%{"type" => "athena"} = attrs) do
quote do
unquote(quoted_var(attrs["variable"])) =
Req.new(http_errors: :raise)
|> ReqAthena.attach(
format: :explorer,
ReqAthena.new(
access_key_id: unquote(attrs["access_key_id"]),
database: unquote(attrs["database"]),
output_location: unquote(attrs["output_location"]),
region: unquote(attrs["region"]),
secret_access_key: unquote(quoted_access_key(attrs)),
token: unquote(attrs["token"]),
workgroup: unquote(attrs["workgroup"])
workgroup: unquote(attrs["workgroup"]),
http_errors: :raise
)

:ok
Expand Down Expand Up @@ -354,37 +363,6 @@ defmodule KinoDB.ConnectionCell do
end
end

defp check_bigquery_credentials(attrs) do
case attrs["credentials"] do
%{"type" => "service_account"} ->
quote do
credentials = unquote(Macro.escape(attrs["credentials"]))

opts = [
name: ReqBigQuery.Goth,
http_client: &Req.request/1,
source: {:service_account, credentials}
]
end

%{"type" => "authorized_user"} ->
quote do
credentials = unquote(Macro.escape(attrs["credentials"]))

opts = [
name: ReqBigQuery.Goth,
http_client: &Req.request/1,
source: {:refresh_token, credentials}
]
end

_empty_map ->
quote do
opts = [name: ReqBigQuery.Goth, http_client: &Req.request/1]
end
end
end

defp shared_options(attrs) do
opts = [
hostname: attrs["hostname"],
Expand Down Expand Up @@ -516,15 +494,20 @@ defmodule KinoDB.ConnectionCell do
Code.ensure_loaded?(Postgrex) -> "postgres"
Code.ensure_loaded?(MyXQL) -> "mysql"
Code.ensure_loaded?(Exqlite) -> "sqlite"
Code.ensure_loaded?(ReqBigQuery) -> "bigquery"
Code.ensure_loaded?(ReqAthena) -> "athena"
Code.ensure_loaded?(ReqCH) -> "clickhouse"
Code.ensure_loaded?(Adbc) -> "duckdb"
Code.ensure_loaded?(Adbc) -> adbc_default_db_type()
Code.ensure_loaded?(Tds) -> "sqlserver"
true -> "postgres"
end
end

defp adbc_default_db_type() do
drivers = Application.get_env(:adbc, :drivers, [])
driver = Enum.find([:duckdb, :snowflake, :bigquery], :duckdb, &(&1 in drivers))
Atom.to_string(driver)
end

defp missing_dep(%{"type" => "postgres"}) do
unless Code.ensure_loaded?(Postgrex) do
~s/{:postgrex, "~> 0.18"}/
Expand All @@ -543,20 +526,14 @@ defmodule KinoDB.ConnectionCell do
end
end

defp missing_dep(%{"type" => "bigquery"}) do
unless Code.ensure_loaded?(ReqBigQuery) do
~s|{:req_bigquery, "~> 0.1"}|
end
end

defp missing_dep(%{"type" => "athena"}) do
missing_many_deps([
{ReqAthena, ~s|{:req_athena, "~> 0.1"}|},
{ReqAthena, ~s|{:req_athena, "~> 0.3"}|},
{Explorer, ~s|{:explorer, "~> 0.10"}|}
])
end

defp missing_dep(%{"type" => adbc}) when adbc in ~w[snowflake duckdb] do
defp missing_dep(%{"type" => adbc}) when adbc in ~w[duckdb snowflake bigquery] do
unless Code.ensure_loaded?(Adbc) do
~s|{:adbc, "~> 0.3"}|
end
Expand Down Expand Up @@ -585,19 +562,6 @@ defmodule KinoDB.ConnectionCell do
end
end

defp join_quoted(quoted_blocks) do
asts =
Enum.flat_map(quoted_blocks, fn
{:__block__, _meta, nodes} -> nodes
node -> [node]
end)

case asts do
[node] -> node
nodes -> {:__block__, [], nodes}
end
end

defp help_box(%{"type" => "bigquery"}) do
if Code.ensure_loaded?(Mint.HTTP) do
if running_on_google_metadata?() do
Expand Down
84 changes: 34 additions & 50 deletions lib/kino_db/sql_cell.ex
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ defmodule KinoDB.SQLCell do
result_variable: Kino.SmartCell.prefixed_var_name("result", attrs["result_variable"]),
query: query,
timeout: attrs["timeout"],
cache_query: attrs["cache_query"] || true,
cache_query: Map.get(attrs, "cache_query", true),
data_frame_alias: Explorer.DataFrame,
missing_dep: missing_dep(connection)
)
Expand Down Expand Up @@ -165,7 +165,6 @@ defmodule KinoDB.SQLCell do

defp connection_type(connection) when is_struct(connection, Req.Request) do
cond do
Keyword.has_key?(connection.request_steps, :bigquery_run) -> "bigquery"
Keyword.has_key?(connection.request_steps, :athena_run) -> "athena"
Keyword.has_key?(connection.request_steps, :clickhouse_run) -> "clickhouse"
true -> nil
Expand Down Expand Up @@ -220,55 +219,39 @@ defmodule KinoDB.SQLCell do
to_quoted(attrs, quote(do: Tds), fn n -> "@#{n}" end)
end

# query!/4 based that returns a Req response.
defp to_quoted(%{"connection" => %{"type" => "clickhouse"}} = attrs) do
to_quoted_query_req(attrs, quote(do: ReqCH), fn n, inner ->
name =
if String.match?(inner, ~r/[^a-z0-9_]/) do
"param_#{n}"
else
inner
end

"{#{name}:String}"
end)
end

# Explorer-based
defp to_quoted(%{"connection" => %{"type" => "snowflake"}} = attrs) do
to_explorer_quoted(attrs, fn n -> "?#{n}" end)
to_quoted_explorer(attrs, fn n -> "?#{n}" end)
end

defp to_quoted(%{"connection" => %{"type" => "duckdb"}} = attrs) do
to_explorer_quoted(attrs, fn n -> "?#{n}" end)
to_quoted_explorer(attrs, fn n -> "?#{n}" end)
end

# Req-based
defp to_quoted(%{"connection" => %{"type" => "bigquery"}} = attrs) do
to_req_quoted(attrs, fn _n -> "?" end, :bigquery)
to_quoted_explorer(attrs, fn _n -> "?" end)
end

# Req-based
defp to_quoted(%{"connection" => %{"type" => "athena"}} = attrs) do
to_req_quoted(attrs, fn _n -> "?" end, :athena)
to_quoted_req_query(attrs, quote(do: ReqAthena), fn _n -> "?" end)
end

defp to_quoted(_ctx) do
quote do
end
end
defp to_quoted(%{"connection" => %{"type" => "clickhouse"}} = attrs) do
to_quoted_req_query(attrs, quote(do: ReqCH), fn n, inner ->
name =
if String.match?(inner, ~r/[^a-z0-9_]/) do
"param_#{n}"
else
inner
end

defp to_quoted_query_req(attrs, quoted_module, next) do
{query, params} = parameterize(attrs["query"], attrs["connection"]["type"], next)
opts_args = query_opts_args(attrs)
"{#{name}:String}"
end)
end

defp to_quoted(_ctx) do
quote do
unquote(quoted_var(attrs["result_variable"])) =
unquote(quoted_module).query!(
unquote(quoted_var(attrs["connection"]["variable"])),
unquote(quoted_query(query)),
unquote(params),
unquote_splicing(opts_args)
).body
end
end

Expand All @@ -287,32 +270,32 @@ defmodule KinoDB.SQLCell do
end
end

defp to_req_quoted(attrs, next, req_key) do
defp to_quoted_explorer(attrs, next) do
{query, params} = parameterize(attrs["query"], attrs["connection"]["type"], next)
query = {quoted_query(query), params}
opts = query_opts_args(attrs)
req_opts = opts |> Enum.at(0, []) |> Keyword.put(req_key, query)
data_frame_alias = attrs["data_frame_alias"]

quote do
unquote(quoted_var(attrs["result_variable"])) =
Req.post!(
unquote(data_frame_alias).from_query!(
unquote(quoted_var(attrs["connection"]["variable"])),
unquote(req_opts)
).body
unquote(quoted_query(query)),
unquote(params)
)
end
end

defp to_explorer_quoted(attrs, next) do
defp to_quoted_req_query(attrs, quoted_module, next) do
{query, params} = parameterize(attrs["query"], attrs["connection"]["type"], next)
data_frame_alias = attrs["data_frame_alias"]
opts_args = query_opts_args(attrs)

quote do
unquote(quoted_var(attrs["result_variable"])) =
unquote(data_frame_alias).from_query!(
unquote(quoted_module).query!(
unquote(quoted_var(attrs["connection"]["variable"])),
unquote(quoted_query(query)),
unquote(params)
)
unquote(params),
unquote_splicing(opts_args)
).body
end
end

Expand All @@ -333,8 +316,9 @@ defmodule KinoDB.SQLCell do
when timeout != nil and type in @connection_types_with_timeout,
do: [[timeout: timeout * 1000]]

defp query_opts_args(%{"connection" => %{"type" => "athena"}, "cache_query" => cache_query}),
do: [[cache_query: cache_query]]
defp query_opts_args(%{"connection" => %{"type" => "athena"}} = attrs) do
[[format: :explorer] ++ if(attrs["cache_query"], do: [], else: [cache_query: false])]
end

defp query_opts_args(%{"connection" => %{"type" => "clickhouse"}}),
do: [[format: :explorer]]
Expand Down Expand Up @@ -418,7 +402,7 @@ defmodule KinoDB.SQLCell do
end
end

defp missing_dep(%{type: adbc}) when adbc in ~w[snowflake duckdb] do
defp missing_dep(%{type: adbc}) when adbc in ~w[snowflake duckdb bigquery] do
unless Code.ensure_loaded?(Explorer) do
~s|{:explorer, "~> 0.10"}|
end
Expand Down
3 changes: 1 addition & 2 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,7 @@ defmodule KinoDB.MixProject do
{:explorer, "~> 0.10", optional: true},

# Those dependencies are new, so we use stricter versions
{:req_bigquery, "~> 0.1.0", optional: true},
{:req_athena, "~> 0.2.0", optional: true},
{:req_athena, "~> 0.3.0", optional: true},
{:req_ch, "~> 0.1.0", optional: true},

# Dev only
Expand Down
Loading
Loading