diff --git a/lib/req_athena.ex b/lib/req_athena.ex index 9234f53..e220ec3 100644 --- a/lib/req_athena.ex +++ b/lib/req_athena.ex @@ -2,8 +2,13 @@ defmodule ReqAthena do @moduledoc """ `Req` plugin for [AWS Athena](https://docs.aws.amazon.com/athena/latest/APIReference/Welcome.html). - ReqAthena makes it easy to make Athena queries. Query results are decoded into the `ReqAthena.Result` struct. - The struct implements the `Table.Reader` protocol and thus can be efficiently traversed by rows or columns. + ReqAthena makes it easy to make Athena queries and save the results into S3 buckets. + + By default, `ReqAthena` will save results using the Apache Parquet format, and return a + `Explorer.DataFrame` as a lazy frame pointing to all the partition files. These partitions + are sorted independently, but we cannot guarantee ordering as a whole. + + See the limitations in the [`UNLOAD` command docs](https://docs.aws.amazon.com/athena/latest/ug/unload.html#unload-considerations-and-limitations). """ require Logger @@ -19,8 +24,11 @@ defmodule ReqAthena do athena output_location cache_query + no_explorer )a + @credential_keys ~w(access_key_id secret_access_key region token)a + defguardp is_empty(value) when value in [nil, ""] @doc """ @@ -39,15 +47,23 @@ defmodule ReqAthena do * `:database` - Required. The AWS Athena database name. * `:output_location` - Conditional. The S3 URL location to output AWS Athena query results. + Results will be saved as Parquet and loaded with Explorer only if this option is given. * `:workgroup` - Conditional. The AWS Athena workgroup. * `:cache_query` - Optional. Forces a non-cached result from AWS Athena. - * `:athena` - Required. The query to execute. It can be a plain sql string or + * `:no_explorer` - Disable output as an Explorer dataframe. Defaults to `nil`, which + enables Explorer integration by default. + + * `:athena` - Required. The query to execute. It can be a plain SQL string or a `{query, params}` tuple, where `query` can contain `?` placeholders and `params` is a list of corresponding values. + There is a limitation of Athena that requires the `:output_location` to be empty + for every query. So we append "results" to the `:output_location`, so the partition + files are saved there. + Conditional fields must always be defined, and can be one of the fields or both. If you want to set any of these options when attaching the plugin, pass them as the second argument. @@ -163,6 +179,18 @@ defmodule ReqAthena do %{WorkGroup: workgroup, ResultConfiguration: %{OutputLocation: output}} end + query = + if not (!!request.options[:no_explorer]) and is_binary(request.options[:output_location]) do + ReqAthena.Query.with_unload( + query, + # We need to add this "subdirectory" because Athena expects the results directory + # to be empty. + to: Path.join(request.options[:output_location], "results") + ) + else + query + end + body = Map.merge(output_config, %{ QueryExecutionContext: %{Database: fetch_option!(request, :database)}, @@ -205,12 +233,59 @@ defmodule ReqAthena do execute_prepared_query(request) {"GetQueryResults", _} -> - decode_result(request, response) + if ReqAthena.Query.is_select(query) and not is_nil(query.unload) do + build_explorer_lazy_frame(request, response) + else + decode_result(request, response) + end end end defp handle_athena_result(request_response), do: request_response + defp build_explorer_lazy_frame(request, response) do + body = Jason.decode!(response.body) + + result = + if Map.has_key?(body, "ResultSet") do + manifest_csv_location = + Request.get_private(request, :athena_output_location) <> "-manifest.csv" + + aws_credentials = + for key <- @credential_keys, + value = request.options[key], + not is_nil(value), + do: {key, value} + + # This private field is only meant to be used in tests. + fetcher_and_builder = + Request.get_private(request, :athena_dataframe_builder, &fetch_and_build_dataframe/2) + + fetcher_and_builder.(manifest_csv_location, aws_credentials) + else + body + end + + Request.halt(request, %{response | body: result}) + end + + @doc false + def fetch_and_build_dataframe(manifest_csv_location, aws_credentials) do + # TODO: Should we handle errors here? + manifest_df = + Explorer.DataFrame.from_csv!(manifest_csv_location, + header: false, + config: aws_credentials + ) + + manifest_df[0] + |> Explorer.Series.to_list() + |> Enum.map(fn parquet_location -> + Explorer.DataFrame.from_parquet!(parquet_location, lazy: true, config: aws_credentials) + end) + |> Explorer.DataFrame.concat_rows() + end + defp get_query_state(request, response) do response = %{request | body: response.body} @@ -379,8 +454,6 @@ defmodule ReqAthena do ) end - @credential_keys ~w(access_key_id secret_access_key region token)a - defp maybe_put_aws_credentials(request) do case aws_credentials() do :undefined -> diff --git a/lib/req_athena/query.ex b/lib/req_athena/query.ex index 874b8c0..1658615 100644 --- a/lib/req_athena/query.ex +++ b/lib/req_athena/query.ex @@ -55,10 +55,17 @@ defmodule ReqAthena.Query do defp encode_value(value), do: value - defp maybe_around_unload(%{query: query_string, unload: [_ | _] = opts}) + def is_select(%{query: query_string}) + when is_binary(query_string) do + query_string =~ ~r/^[\s]*select/i + end + + def can_use_unload?(_), do: false + + defp maybe_around_unload(%{query: query_string, unload: [_ | _] = opts} = query) when is_binary(query_string) do # UNLOAD works only with SELECT - if query_string =~ ~r/^[\s]*select/i do + if is_select(query) do {to, props} = Keyword.pop!(opts, :to) props = diff --git a/mix.exs b/mix.exs index 7c15780..4861a0e 100644 --- a/mix.exs +++ b/mix.exs @@ -43,6 +43,7 @@ defmodule ReqAthena.MixProject do [ {:req, "~> 0.5.0"}, {:aws_signature, "~> 0.3.0"}, + {:explorer, "~> 0.9.0"}, {:aws_credentials, "~> 0.2", optional: true}, {:table, "~> 0.1.1", optional: true}, {:tzdata, "~> 1.1.1", only: :test}, diff --git a/mix.lock b/mix.lock index c847fdc..6887fcc 100644 --- a/mix.lock +++ b/mix.lock @@ -6,7 +6,9 @@ "earmark_parser": {:hex, :earmark_parser, "1.4.41", "ab34711c9dc6212dda44fcd20ecb87ac3f3fce6f0ca2f28d4a00e4154f8cd599", [:mix], [], "hexpm", "a81a04c7e34b6617c2792e291b5a2e57ab316365c2644ddc553bb9ed863ebefa"}, "eini": {:hex, :eini_beam, "2.2.4", "02143b1dce4dda4243248e7d9b3d8274b8d9f5a666445e3d868e2cce79e4ff22", [:rebar3], [], "hexpm", "12de479d144b19e09bb92ba202a7ea716739929afdf9dff01ad802e2b1508471"}, "ex_doc": {:hex, :ex_doc, "0.34.2", "13eedf3844ccdce25cfd837b99bea9ad92c4e511233199440488d217c92571e8", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.0", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14 or ~> 1.0", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1 or ~> 1.0", [hex: :makeup_erlang, repo: "hexpm", optional: false]}, {:makeup_html, ">= 0.1.0", [hex: :makeup_html, repo: "hexpm", optional: true]}], "hexpm", "5ce5f16b41208a50106afed3de6a2ed34f4acfd65715b82a0b84b49d995f95c1"}, + "explorer": {:hex, :explorer, "0.9.1", "9c6f175dfd2fa2f432d5fe9a86b81875438a9a1110af5b952c284842bee434e4", [:mix], [{:adbc, "~> 0.1", [hex: :adbc, repo: "hexpm", optional: true]}, {:aws_signature, "~> 0.3", [hex: :aws_signature, repo: "hexpm", optional: false]}, {:castore, "~> 1.0", [hex: :castore, repo: "hexpm", optional: true]}, {:flame, "~> 0.3", [hex: :flame, repo: "hexpm", optional: true]}, {:fss, "~> 0.1", [hex: :fss, repo: "hexpm", optional: false]}, {:nx, "~> 0.4", [hex: :nx, repo: "hexpm", optional: true]}, {:rustler, "~> 0.34.0", [hex: :rustler, repo: "hexpm", optional: true]}, {:rustler_precompiled, "~> 0.7", [hex: :rustler_precompiled, repo: "hexpm", optional: false]}, {:table, "~> 0.1.2", [hex: :table, repo: "hexpm", optional: false]}, {:table_rex, "~> 3.1.1 or ~> 4.0.0", [hex: :table_rex, repo: "hexpm", optional: false]}], "hexpm", "d88ec0e78f904c5eaf0b37c4a0ce4632de133515f3740a29fbddd2c0d0a78e77"}, "finch": {:hex, :finch, "0.18.0", "944ac7d34d0bd2ac8998f79f7a811b21d87d911e77a786bc5810adb75632ada4", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "69f5045b042e531e53edc2574f15e25e735b522c37e2ddb766e15b979e03aa65"}, + "fss": {:hex, :fss, "0.1.1", "9db2344dbbb5d555ce442ac7c2f82dd975b605b50d169314a20f08ed21e08642", [:mix], [], "hexpm", "78ad5955c7919c3764065b21144913df7515d52e228c09427a004afe9c1a16b0"}, "hackney": {:hex, :hackney, "1.18.1", "f48bf88f521f2a229fc7bae88cf4f85adc9cd9bcf23b5dc8eb6a1788c662c4f6", [:rebar3], [{:certifi, "~> 2.9.0", [hex: :certifi, repo: "hexpm", optional: false]}, {:idna, "~> 6.1.0", [hex: :idna, repo: "hexpm", optional: false]}, {:metrics, "~> 1.0.0", [hex: :metrics, repo: "hexpm", optional: false]}, {:mimerl, "~> 1.1", [hex: :mimerl, repo: "hexpm", optional: false]}, {:parse_trans, "3.3.1", [hex: :parse_trans, repo: "hexpm", optional: false]}, {:ssl_verify_fun, "~> 1.1.0", [hex: :ssl_verify_fun, repo: "hexpm", optional: false]}, {:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "a4ecdaff44297e9b5894ae499e9a070ea1888c84afdd1fd9b7b2bc384950128e"}, "hpax": {:hex, :hpax, "1.0.0", "28dcf54509fe2152a3d040e4e3df5b265dcb6cb532029ecbacf4ce52caea3fd2", [:mix], [], "hexpm", "7f1314731d711e2ca5fdc7fd361296593fc2542570b3105595bb0bc6d0fad601"}, "idna": {:hex, :idna, "6.1.1", "8a63070e9f7d0c62eb9d9fcb360a7de382448200fbbd1b106cc96d3d8099df8d", [:rebar3], [{:unicode_util_compat, "~> 0.7.0", [hex: :unicode_util_compat, repo: "hexpm", optional: false]}], "hexpm", "92376eb7894412ed19ac475e4a86f7b413c1b9fbb5bd16dccd57934157944cea"}, @@ -25,8 +27,10 @@ "nimble_pool": {:hex, :nimble_pool, "1.1.0", "bf9c29fbdcba3564a8b800d1eeb5a3c58f36e1e11d7b7fb2e084a643f645f06b", [:mix], [], "hexpm", "af2e4e6b34197db81f7aad230c1118eac993acc0dae6bc83bac0126d4ae0813a"}, "parse_trans": {:hex, :parse_trans, "3.3.1", "16328ab840cc09919bd10dab29e431da3af9e9e7e7e6f0089dd5a2d2820011d8", [:rebar3], [], "hexpm", "07cd9577885f56362d414e8c4c4e6bdf10d43a8767abb92d24cbe8b24c54888b"}, "req": {:hex, :req, "0.5.6", "8fe1eead4a085510fe3d51ad854ca8f20a622aae46e97b302f499dfb84f726ac", [:mix], [{:brotli, "~> 0.3.1", [hex: :brotli, repo: "hexpm", optional: true]}, {:ezstd, "~> 1.0", [hex: :ezstd, repo: "hexpm", optional: true]}, {:finch, "~> 0.17", [hex: :finch, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}, {:mime, "~> 2.0.6 or ~> 2.1", [hex: :mime, repo: "hexpm", optional: false]}, {:nimble_csv, "~> 1.0", [hex: :nimble_csv, repo: "hexpm", optional: true]}, {:plug, "~> 1.0", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cfaa8e720945d46654853de39d368f40362c2641c4b2153c886418914b372185"}, + "rustler_precompiled": {:hex, :rustler_precompiled, "0.7.2", "097f657e401f02e7bc1cab808cfc6abdc1f7b9dc5e5adee46bf2fd8fdcce9ecf", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:rustler, "~> 0.23", [hex: :rustler, repo: "hexpm", optional: true]}], "hexpm", "7663faaeadc9e93e605164dcf9e69168e35f2f8b7f2b9eb4e400d1a8e0fe2999"}, "ssl_verify_fun": {:hex, :ssl_verify_fun, "1.1.7", "354c321cf377240c7b8716899e182ce4890c5938111a1296add3ec74cf1715df", [:make, :mix, :rebar3], [], "hexpm", "fe4c190e8f37401d30167c8c405eda19469f34577987c76dde613e838bbc67f8"}, "table": {:hex, :table, "0.1.2", "87ad1125f5b70c5dea0307aa633194083eb5182ec537efc94e96af08937e14a8", [:mix], [], "hexpm", "7e99bc7efef806315c7e65640724bf165c3061cdc5d854060f74468367065029"}, + "table_rex": {:hex, :table_rex, "4.0.0", "3c613a68ebdc6d4d1e731bc973c233500974ec3993c99fcdabb210407b90959b", [:mix], [], "hexpm", "c35c4d5612ca49ebb0344ea10387da4d2afe278387d4019e4d8111e815df8f55"}, "telemetry": {:hex, :telemetry, "1.2.1", "68fdfe8d8f05a8428483a97d7aab2f268aaff24b49e0f599faa091f1d4e7f61c", [:rebar3], [], "hexpm", "dad9ce9d8effc621708f99eac538ef1cbe05d6a874dd741de2e689c47feafed5"}, "tzdata": {:hex, :tzdata, "1.1.1", "20c8043476dfda8504952d00adac41c6eda23912278add38edc140ae0c5bcc46", [:mix], [{:hackney, "~> 1.17", [hex: :hackney, repo: "hexpm", optional: false]}], "hexpm", "a69cec8352eafcd2e198dea28a34113b60fdc6cb57eb5ad65c10292a6ba89787"}, "unicode_util_compat": {:hex, :unicode_util_compat, "0.7.0", "bc84380c9ab48177092f43ac89e4dfa2c6d62b40b8bd132b1059ecc7232f9a78", [:rebar3], [], "hexpm", "25eee6d67df61960cf6a794239566599b09e17e668d3700247bc498638152521"}, diff --git a/test/integration_test.exs b/test/integration_test.exs index 458de3f..e916722 100644 --- a/test/integration_test.exs +++ b/test/integration_test.exs @@ -53,31 +53,49 @@ defmodule IntegrationTest do assert query_response.status == 200 - assert query_response.body.columns == [ - "id", - "type", - "tags", - "members", - "timestamp", - "visible" - ] - - refute query_response.body.statement_name - assert is_binary(query_response.body.query_execution_id) + assert %Explorer.DataFrame{} = ldf = query_response.body + assert Explorer.DataFrame.lazy?(ldf) + + df = Explorer.DataFrame.collect(ldf) + + names = [ + "id", + "type", + "tags", + "members", + "timestamp", + "visible" + ] - assert query_response.body.output_location == - "#{opts[:output_location]}/#{query_response.body.query_execution_id}.csv" + values = [ + 470_454, + "relation", + [ + %{ + "key" => "source", + "value" => "©IGN 2010 dans le cadre de la cartographie réglementaire" + }, + %{"key" => "site", "value" => "geodesic"}, + %{ + "key" => "url", + "value" => + "http://geodesie.ign.fr/fiches/index.php?module=e&action=fichepdf&source=carte&sit_no=17229A" + }, + %{"key" => "name", "value" => "Mérignac A"}, + %{"key" => "network", "value" => "NTF-5"}, + %{"key" => "ref", "value" => "17229A"}, + %{"key" => "type", "value" => "site"} + ], + [ + %{"ref" => 670_007_839, "role" => "", "type" => "node"}, + %{"ref" => 670_007_840, "role" => "", "type" => "node"} + ], + ~N[2017-01-21 12:51:34.000000], + true + ] - assert query_response.body.rows == [ - [ - 470_454, - "relation", - "{ref=17229A, site=geodesic, name=Mérignac A, source=©IGN 2010 dans le cadre de la cartographie réglementaire, type=site, url=http://geodesie.ign.fr/fiches/index.php?module=e&action=fichepdf&source=carte&sit_no=17229A, network=NTF-5}", - "[{type=node, ref=670007839, role=}, {type=node, ref=670007840, role=}]", - ~N[2017-01-21 12:51:34.000], - true - ] - ] + assert Explorer.DataFrame.names(df) == names + assert Explorer.DataFrame.to_rows(df) == [Map.new(Enum.zip(names, values))] end test "returns the response from AWS Athena's API with parameterized query" do @@ -86,6 +104,7 @@ defmodule IntegrationTest do secret_access_key: System.fetch_env!("AWS_SECRET_ACCESS_KEY"), region: System.fetch_env!("AWS_REGION"), database: "default", + no_explorer: true, output_location: System.fetch_env!("AWS_ATHENA_OUTPUT_LOCATION") ] @@ -121,6 +140,7 @@ defmodule IntegrationTest do secret_access_key: System.fetch_env!("AWS_SECRET_ACCESS_KEY"), region: System.fetch_env!("AWS_REGION"), database: "default", + no_explorer: true, output_location: System.fetch_env!("AWS_ATHENA_OUTPUT_LOCATION") ] @@ -265,6 +285,7 @@ defmodule IntegrationTest do secret_access_key: System.fetch_env!("AWS_SECRET_ACCESS_KEY"), region: System.fetch_env!("AWS_REGION"), database: "default", + no_explorer: true, output_location: System.fetch_env!("AWS_ATHENA_OUTPUT_LOCATION") ] @@ -301,6 +322,7 @@ defmodule IntegrationTest do secret_access_key: System.fetch_env!("AWS_SECRET_ACCESS_KEY"), region: System.fetch_env!("AWS_REGION"), database: "default", + no_explorer: true, output_location: System.fetch_env!("AWS_ATHENA_OUTPUT_LOCATION"), cache_query: false ] diff --git a/test/req_athena/query_test.exs b/test/req_athena/query_test.exs index f065067..d276b81 100644 --- a/test/req_athena/query_test.exs +++ b/test/req_athena/query_test.exs @@ -69,5 +69,39 @@ defmodule ReqAthena.QueryTest do assert Query.to_query_string(query) == "EXECUTE test_statement USING 420" end + + test "unload attributes and a create command does not use the unload command" do + create = """ + CREATE EXTERNAL TABLE IF NOT EXISTS planet ( + id BIGINT, + type STRING, + tags MAP, + lat DECIMAL(9,7), + lon DECIMAL(10,7), + nds ARRAY>, + members ARRAY>, + changeset BIGINT, + timestamp TIMESTAMP, + uid BIGINT, + user STRING, + version BIGINT, + visible BOOLEAN + ) + STORED AS ORCFILE + LOCATION 's3://osm-pds/planet/';\ + """ + + query = %Query{ + query: create, + params: [420], + prepared: true, + statement_name: "test_statement" + } + + query = Query.with_unload(query, to: "s3://my-bucket/my-dir") + + assert Query.to_query_string(query) == + "EXECUTE test_statement USING 420" + end end end diff --git a/test/req_athena_test.exs b/test/req_athena_test.exs index d81a872..95b0a53 100644 --- a/test/req_athena_test.exs +++ b/test/req_athena_test.exs @@ -2,7 +2,7 @@ defmodule ReqAthenaTest do use ExUnit.Case, async: true @moduletag :capture_log - test "executes a query string" do + test "executes a query string returning a data frame" do opts = [ access_key_id: "some key", secret_access_key: "dummy", @@ -11,47 +11,53 @@ defmodule ReqAthenaTest do output_location: "s3://foo" ] + request_validations = %{ + "StartQueryExecution" => fn request -> + decoded = Jason.decode!(request.body) + + assert %{ + "ClientRequestToken" => client_req_token, + "QueryExecutionContext" => %{ + "Database" => "my_awesome_database" + }, + "QueryString" => + "UNLOAD (select * from iris)\nTO 's3://foo/results'\nWITH (compression = 'SNAPPY', format = 'PARQUET')", + "ResultConfiguration" => %{"OutputLocation" => "s3://foo"} + } = decoded + + assert is_binary(client_req_token) + end + } + + me = self() + assert response = - Req.new(adapter: fake_athena()) + Req.new(adapter: fake_athena(request_validations)) |> Req.Request.put_header("x-auth", "my awesome auth header") + |> Req.Request.put_private(:athena_dataframe_builder, fn manifest_location, + credentials -> + assert manifest_location == "s3://foo-manifest.csv" + + assert Enum.sort(Keyword.take(opts, [:access_key_id, :secret_access_key, :region])) == + Enum.sort(credentials) + + send(me, {:explorer_built, manifest_location}) + + Explorer.DataFrame.new(id: [1, 2], name: ["Ale", "Wojtek"]) + end) |> ReqAthena.attach(opts) |> Req.post!(athena: "select * from iris") assert response.status == 200 - assert response.body == %ReqAthena.Result{ - columns: ["id", "name"], - output_location: "s3://foo", - query_execution_id: "an uuid", - rows: [[1, "Ale"], [2, "Wojtek"]], - statement_name: nil, - metadata: [ - %{ - "CaseSensitive" => false, - "CatalogName" => "hive", - "Label" => "id", - "Name" => "id", - "Nullable" => "UNKNOWN", - "Precision" => 10, - "Scale" => 0, - "SchemaName" => "", - "TableName" => "", - "Type" => "integer" - }, - %{ - "CaseSensitive" => true, - "CatalogName" => "hive", - "Label" => "name", - "Name" => "name", - "Nullable" => "UNKNOWN", - "Precision" => 2_147_483_647, - "Scale" => 0, - "SchemaName" => "", - "TableName" => "", - "Type" => "varchar" - } - ] + assert df = %Explorer.DataFrame{} = response.body + + assert Explorer.DataFrame.to_columns(df, atom_keys: true) == %{ + id: [1, 2], + name: ["Ale", "Wojtek"] } + + assert_received {:explorer_built, _output_location} end test "parses a response with a datum object missing" do @@ -60,6 +66,7 @@ defmodule ReqAthenaTest do secret_access_key: "dummy", region: "us-east-1", database: "my_awesome_database", + no_explorer: true, output_location: "s3://foo" ] @@ -308,6 +315,7 @@ defmodule ReqAthenaTest do secret_access_key: "dummy", region: "us-east-1", database: "my_awesome_database", + no_explorer: true, output_location: "s3://foo" ] @@ -375,46 +383,33 @@ defmodule ReqAthenaTest do output_location: "s3://foo" ] - assert response = - Req.new(adapter: fake_athena(validations)) - |> ReqAthena.attach(opts) - |> Req.post!(athena: "select * from iris") + me = self() + + response = + Req.new(adapter: fake_athena(validations)) + |> ReqAthena.attach(opts) + |> Req.Request.put_private(:athena_dataframe_builder, fn manifest_location, credentials -> + assert manifest_location == "s3://foo-manifest.csv" + + assert Enum.sort( + Keyword.take(opts, [:access_key_id, :secret_access_key, :region, :token]) + ) == + Enum.sort(credentials) + + send(me, :explorer_built) + + Explorer.DataFrame.new(id: [1, 2], name: ["Ale", "Wojtek"]) + end) + |> Req.post!(athena: "select * from iris") assert response.status == 200 - assert response.body == %ReqAthena.Result{ - columns: ["id", "name"], - output_location: "s3://foo", - query_execution_id: "an uuid", - rows: [[1, "Ale"], [2, "Wojtek"]], - statement_name: nil, - metadata: [ - %{ - "CaseSensitive" => false, - "CatalogName" => "hive", - "Label" => "id", - "Name" => "id", - "Nullable" => "UNKNOWN", - "Precision" => 10, - "Scale" => 0, - "SchemaName" => "", - "TableName" => "", - "Type" => "integer" - }, - %{ - "CaseSensitive" => true, - "CatalogName" => "hive", - "Label" => "name", - "Name" => "name", - "Nullable" => "UNKNOWN", - "Precision" => 2_147_483_647, - "Scale" => 0, - "SchemaName" => "", - "TableName" => "", - "Type" => "varchar" - } - ] + assert Explorer.DataFrame.to_columns(response.body, atom_keys: true) == %{ + id: [1, 2], + name: ["Ale", "Wojtek"] } + + assert_received :explorer_built end test "executes a query with workgroup" do