Skip to content

Commit

Permalink
Add initial integration with Explorer
Browse files Browse the repository at this point in the history
See: #36
  • Loading branch information
Philip Sampaio committed Aug 22, 2024
1 parent 42f1f61 commit 670a179
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 101 deletions.
85 changes: 79 additions & 6 deletions lib/req_athena.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@ defmodule ReqAthena do
@moduledoc """
`Req` plugin for [AWS Athena](https://docs.aws.amazon.com/athena/latest/APIReference/Welcome.html).
ReqAthena makes it easy to make Athena queries. Query results are decoded into the `ReqAthena.Result` struct.
The struct implements the `Table.Reader` protocol and thus can be efficiently traversed by rows or columns.
ReqAthena makes it easy to make Athena queries and save the results into S3 buckets.
By default, `ReqAthena` will save results using the Apache Parquet format, and return a
`Explorer.DataFrame` as a lazy frame pointing to all the partition files. These partitions
are sorted independently, but we cannot guarantee ordering as a whole.
See the limitations in the [`UNLOAD` command docs](https://docs.aws.amazon.com/athena/latest/ug/unload.html#unload-considerations-and-limitations).
"""
require Logger

Expand All @@ -19,8 +24,11 @@ defmodule ReqAthena do
athena
output_location
cache_query
no_explorer
)a

@credential_keys ~w(access_key_id secret_access_key region token)a

defguardp is_empty(value) when value in [nil, ""]

@doc """
Expand All @@ -39,15 +47,23 @@ defmodule ReqAthena do
* `:database` - Required. The AWS Athena database name.
* `:output_location` - Conditional. The S3 URL location to output AWS Athena query results.
Results will be saved as Parquet and loaded with Explorer only if this option is given.
* `:workgroup` - Conditional. The AWS Athena workgroup.
* `:cache_query` - Optional. Forces a non-cached result from AWS Athena.
* `:athena` - Required. The query to execute. It can be a plain sql string or
* `:no_explorer` - Disable output as an Explorer dataframe. Defaults to `nil`, which
enables Explorer integration by default.
* `:athena` - Required. The query to execute. It can be a plain SQL string or
a `{query, params}` tuple, where `query` can contain `?` placeholders and `params`
is a list of corresponding values.
There is a limitation of Athena that requires the `:output_location` to be empty
for every query. So we append "results" to the `:output_location`, so the partition
files are saved there.
Conditional fields must always be defined, and can be one of the fields or both.
If you want to set any of these options when attaching the plugin, pass them as the second argument.
Expand Down Expand Up @@ -163,6 +179,18 @@ defmodule ReqAthena do
%{WorkGroup: workgroup, ResultConfiguration: %{OutputLocation: output}}
end

query =
if not (!!request.options[:no_explorer]) and is_binary(request.options[:output_location]) do
ReqAthena.Query.with_unload(
query,
# We need to add this "subdirectory" because Athena expects the results directory
# to be empty.
to: Path.join(request.options[:output_location], "results")
)
else
query
end

body =
Map.merge(output_config, %{
QueryExecutionContext: %{Database: fetch_option!(request, :database)},
Expand Down Expand Up @@ -205,12 +233,59 @@ defmodule ReqAthena do
execute_prepared_query(request)

{"GetQueryResults", _} ->
decode_result(request, response)
if ReqAthena.Query.is_select(query) and not is_nil(query.unload) do
build_explorer_lazy_frame(request, response)
else
decode_result(request, response)
end
end
end

defp handle_athena_result(request_response), do: request_response

defp build_explorer_lazy_frame(request, response) do
body = Jason.decode!(response.body)

result =
if Map.has_key?(body, "ResultSet") do
manifest_csv_location =
Request.get_private(request, :athena_output_location) <> "-manifest.csv"

aws_credentials =
for key <- @credential_keys,
value = request.options[key],
not is_nil(value),
do: {key, value}

# This private field is only meant to be used in tests.
fetcher_and_builder =
Request.get_private(request, :athena_dataframe_builder, &fetch_and_build_dataframe/2)

fetcher_and_builder.(manifest_csv_location, aws_credentials)
else
body
end

Request.halt(request, %{response | body: result})
end

@doc false
def fetch_and_build_dataframe(manifest_csv_location, aws_credentials) do
# TODO: Should we handle errors here?
manifest_df =
Explorer.DataFrame.from_csv!(manifest_csv_location,
header: false,
config: aws_credentials
)

manifest_df[0]
|> Explorer.Series.to_list()
|> Enum.map(fn parquet_location ->
Explorer.DataFrame.from_parquet!(parquet_location, lazy: true, config: aws_credentials)
end)
|> Explorer.DataFrame.concat_rows()
end

defp get_query_state(request, response) do
response =
%{request | body: response.body}
Expand Down Expand Up @@ -379,8 +454,6 @@ defmodule ReqAthena do
)
end

@credential_keys ~w(access_key_id secret_access_key region token)a

defp maybe_put_aws_credentials(request) do
case aws_credentials() do
:undefined ->
Expand Down
11 changes: 9 additions & 2 deletions lib/req_athena/query.ex
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,17 @@ defmodule ReqAthena.Query do

defp encode_value(value), do: value

defp maybe_around_unload(%{query: query_string, unload: [_ | _] = opts})
def is_select(%{query: query_string})
when is_binary(query_string) do
query_string =~ ~r/^[\s]*select/i
end

def can_use_unload?(_), do: false

defp maybe_around_unload(%{query: query_string, unload: [_ | _] = opts} = query)
when is_binary(query_string) do
# UNLOAD works only with SELECT
if query_string =~ ~r/^[\s]*select/i do
if is_select(query) do
{to, props} = Keyword.pop!(opts, :to)

props =
Expand Down
1 change: 1 addition & 0 deletions mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ defmodule ReqAthena.MixProject do
[
{:req, "~> 0.5.0"},
{:aws_signature, "~> 0.3.0"},
{:explorer, "~> 0.9.0"},
{:aws_credentials, "~> 0.2", optional: true},
{:table, "~> 0.1.1", optional: true},
{:tzdata, "~> 1.1.1", only: :test},
Expand Down
4 changes: 4 additions & 0 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
"earmark_parser": {:hex, :earmark_parser, "1.4.41", "ab34711c9dc6212dda44fcd20ecb87ac3f3fce6f0ca2f28d4a00e4154f8cd599", [:mix], [], "hexpm", "a81a04c7e34b6617c2792e291b5a2e57ab316365c2644ddc553bb9ed863ebefa"},
"eini": {:hex, :eini_beam, "2.2.4", "02143b1dce4dda4243248e7d9b3d8274b8d9f5a666445e3d868e2cce79e4ff22", [:rebar3], [], "hexpm", "12de479d144b19e09bb92ba202a7ea716739929afdf9dff01ad802e2b1508471"},
"ex_doc": {:hex, :ex_doc, "0.34.2", "13eedf3844ccdce25cfd837b99bea9ad92c4e511233199440488d217c92571e8", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "5ce5f16b41208a50106afed3de6a2ed34f4acfd65715b82a0b84b49d995f95c1"},
"explorer": {:hex, :explorer, "0.9.1", "9c6f175dfd2fa2f432d5fe9a86b81875438a9a1110af5b952c284842bee434e4", [:mix], [{:adbc, "~> 0.1", [hex: :adbc, repo: "hexpm", optional: true]}, {:aws_signature, "~> 0.3", [hex: :aws_signature, repo: "hexpm", optional: false]}, {:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:flame, "~> 0.3", [hex: :flame, repo: "hexpm", optional: true]}, {:fss, "~> 0.1", [hex: :fss, repo: "hexpm", optional: false]}, {:nx, "~> 0.4", [hex: :nx, repo: "hexpm", optional: true]}, {:rustler, "~> 0.34.0", [hex: :rustler, repo: "hexpm", optional: true]}, {:rustler_precompiled, "~> 0.7", [hex: :rustler_precompiled, repo: "hexpm", optional: false]}, {:table, "~> 0.1.2", [hex: :table, repo: "hexpm", optional: false]}, {:table_rex, "~> 3.1.1 or ~> 4.0.0", [hex: :table_rex, repo: "hexpm", optional: false]}], "hexpm", "d88ec0e78f904c5eaf0b37c4a0ce4632de133515f3740a29fbddd2c0d0a78e77"},
"finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"},
"fss": {:hex, :fss, "0.1.1", "9db2344dbbb5d555ce442ac7c2f82dd975b605b50d169314a20f08ed21e08642", [:mix], [], "hexpm", "78ad5955c7919c3764065b21144913df7515d52e228c09427a004afe9c1a16b0"},
"hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~> 2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"},
"hpax": {:hex, :hpax, "1.0.0", "28dcf54509fe2152a3d040e4e3df5b265dcb6cb532029ecbacf4ce52caea3fd2", [:mix], [], "hexpm", "7f1314731d711e2ca5fdc7fd361296593fc2542570b3105595bb0bc6d0fad601"},
"idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"},
Expand All @@ -25,8 +27,10 @@
"nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"},
"parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"},
"req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"},
"rustler_precompiled": {:hex, :rustler_precompiled, "0.7.2", "097f657e401f02e7bc1cab808cfc6abdc1f7b9dc5e5adee46bf2fd8fdcce9ecf", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:rustler, "~> 0.23", [hex: :rustler, repo: "hexpm", optional: true]}], "hexpm", "7663faaeadc9e93e605164dcf9e69168e35f2f8b7f2b9eb4e400d1a8e0fe2999"},
"ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"},
"table": {:hex, :table, "0.1.2", "87ad1125f5b70c5dea0307aa633194083eb5182ec537efc94e96af08937e14a8", [:mix], [], "hexpm", "7e99bc7efef806315c7e65640724bf165c3061cdc5d854060f74468367065029"},
"table_rex": {:hex, :table_rex, "4.0.0", "3c613a68ebdc6d4d1e731bc973c233500974ec3993c99fcdabb210407b90959b", [:mix], [], "hexpm", "c35c4d5612ca49ebb0344ea10387da4d2afe278387d4019e4d8111e815df8f55"},
"telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"},
"tzdata": {:hex, :tzdata, "1.1.1", "20c8043476dfda8504952d00adac41c6eda23912278add38edc140ae0c5bcc46", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "a69cec8352eafcd2e198dea28a34113b60fdc6cb57eb5ad65c10292a6ba89787"},
"unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"},
Expand Down
68 changes: 45 additions & 23 deletions test/integration_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -53,31 +53,49 @@ defmodule IntegrationTest do

assert query_response.status == 200

assert query_response.body.columns == [
"id",
"type",
"tags",
"members",
"timestamp",
"visible"
]

refute query_response.body.statement_name
assert is_binary(query_response.body.query_execution_id)
assert %Explorer.DataFrame{} = ldf = query_response.body
assert Explorer.DataFrame.lazy?(ldf)

df = Explorer.DataFrame.collect(ldf)

names = [
"id",
"type",
"tags",
"members",
"timestamp",
"visible"
]

assert query_response.body.output_location ==
"#{opts[:output_location]}/#{query_response.body.query_execution_id}.csv"
values = [
470_454,
"relation",
[
%{
"key" => "source",
"value" => "©IGN 2010 dans le cadre de la cartographie réglementaire"
},
%{"key" => "site", "value" => "geodesic"},
%{
"key" => "url",
"value" =>
"http://geodesie.ign.fr/fiches/index.php?module=e&action=fichepdf&source=carte&sit_no=17229A"
},
%{"key" => "name", "value" => "Mérignac A"},
%{"key" => "network", "value" => "NTF-5"},
%{"key" => "ref", "value" => "17229A"},
%{"key" => "type", "value" => "site"}
],
[
%{"ref" => 670_007_839, "role" => "", "type" => "node"},
%{"ref" => 670_007_840, "role" => "", "type" => "node"}
],
~N[2017-01-21 12:51:34.000000],
true
]

assert query_response.body.rows == [
[
470_454,
"relation",
"{ref=17229A, site=geodesic, name=Mérignac A, source=©IGN 2010 dans le cadre de la cartographie réglementaire, type=site, url=http://geodesie.ign.fr/fiches/index.php?module=e&action=fichepdf&source=carte&sit_no=17229A, network=NTF-5}",
"[{type=node, ref=670007839, role=}, {type=node, ref=670007840, role=}]",
~N[2017-01-21 12:51:34.000],
true
]
]
assert Explorer.DataFrame.names(df) == names
assert Explorer.DataFrame.to_rows(df) == [Map.new(Enum.zip(names, values))]
end

test "returns the response from AWS Athena's API with parameterized query" do
Expand All @@ -86,6 +104,7 @@ defmodule IntegrationTest do
secret_access_key: System.fetch_env!("AWS_SECRET_ACCESS_KEY"),
region: System.fetch_env!("AWS_REGION"),
database: "default",
no_explorer: true,
output_location: System.fetch_env!("AWS_ATHENA_OUTPUT_LOCATION")
]

Expand Down Expand Up @@ -121,6 +140,7 @@ defmodule IntegrationTest do
secret_access_key: System.fetch_env!("AWS_SECRET_ACCESS_KEY"),
region: System.fetch_env!("AWS_REGION"),
database: "default",
no_explorer: true,
output_location: System.fetch_env!("AWS_ATHENA_OUTPUT_LOCATION")
]

Expand Down Expand Up @@ -265,6 +285,7 @@ defmodule IntegrationTest do
secret_access_key: System.fetch_env!("AWS_SECRET_ACCESS_KEY"),
region: System.fetch_env!("AWS_REGION"),
database: "default",
no_explorer: true,
output_location: System.fetch_env!("AWS_ATHENA_OUTPUT_LOCATION")
]

Expand Down Expand Up @@ -301,6 +322,7 @@ defmodule IntegrationTest do
secret_access_key: System.fetch_env!("AWS_SECRET_ACCESS_KEY"),
region: System.fetch_env!("AWS_REGION"),
database: "default",
no_explorer: true,
output_location: System.fetch_env!("AWS_ATHENA_OUTPUT_LOCATION"),
cache_query: false
]
Expand Down
34 changes: 34 additions & 0 deletions test/req_athena/query_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -69,5 +69,39 @@ defmodule ReqAthena.QueryTest do
assert Query.to_query_string(query) ==
"EXECUTE test_statement USING 420"
end

test "unload attributes and a create command does not use the unload command" do
create = """
CREATE EXTERNAL TABLE IF NOT EXISTS planet (
id BIGINT,
type STRING,
tags MAP<STRING,STRING>,
lat DECIMAL(9,7),
lon DECIMAL(10,7),
nds ARRAY<STRUCT<REF:BIGINT>>,
members ARRAY<STRUCT<TYPE:STRING,REF:BIGINT,ROLE:STRING>>,
changeset BIGINT,
timestamp TIMESTAMP,
uid BIGINT,
user STRING,
version BIGINT,
visible BOOLEAN
)
STORED AS ORCFILE
LOCATION 's3://osm-pds/planet/';\
"""

query = %Query{
query: create,
params: [420],
prepared: true,
statement_name: "test_statement"
}

query = Query.with_unload(query, to: "s3://my-bucket/my-dir")

assert Query.to_query_string(query) ==
"EXECUTE test_statement USING 420"
end
end
end
Loading

0 comments on commit 670a179

Please sign in to comment.