Skip to content

Commit

Permalink
Update docs and make Explorer optional
Browse files Browse the repository at this point in the history
  • Loading branch information
Philip Sampaio committed Sep 10, 2024
1 parent 5f86fea commit fe5d7b4
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 45 deletions.
84 changes: 40 additions & 44 deletions lib/req_athena.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,10 @@ defmodule ReqAthena do
then will lazy load these parquet files into an Explorer dataframe.
There are some limitations when using the `:json` and `:explorer` format.
See more about it reading the [`UNLOAD` command docs](https://docs.aws.amazon.com/athena/latest/ug/unload.html#unload-considerations-and-limitations).
First, you need to install Explorer in order to use the `:explorer` format.
Second, when using these format, you always need to provide a different output location.
See the [`UNLOAD` command docs](https://docs.aws.amazon.com/athena/latest/ug/unload.html#unload-considerations-and-limitations)
for more details.
* `:output_compression` - Optional. Sets the Parquet compression format and level
for the output when using the Explorer output format. This can be a string, like `"gzip"`,
Expand Down Expand Up @@ -96,18 +99,25 @@ defmodule ReqAthena do
...> ]
iex> query = "SELECT id, type, tags, members, timestamp, visible FROM planet WHERE id = 470454 and type = 'relation'"
iex> req = Req.new() |> ReqAthena.attach(opts)
iex> Req.post!(req, athena: query).body
%ReqAthena.Result{
columns: ["id", "type", "tags", "members", "timestamp", "visible"],
output_location: "s3://my-bucket/c594d5df-9879-4bf7-8796-780e0b87a673.csv",
query_execution_id: "c594d5df-9879-4bf7-8796-780e0b87a673",
rows: [
[470454, "relation",
"{ref=17229A, site=geodesic, name=Mérignac A, source=©IGN 2010 dans le cadre de la cartographie réglementaire, type=site, url=http://geodesie.ign.fr/fiches/index.php?module=e&action=fichepdf&source=carte&sit_no=17229A, network=NTF-5}",
"[{type=node, ref=670007839, role=}, {type=node, ref=670007840, role=}]",
~N[2017-01-21 12:51:34.000], true]
iex> Req.post!(req, athena: query, format: :json).body
%{
"id" => 470454,
"members" => [
%{"ref" => 670007839, "role" => "", "type" => "node"},
%{"ref" => 670007840, "role" => "", "type" => "node"}
],
statement_name: nil
"tags" => %{
"name" => "Mérignac A",
"network" => "NTF-5",
"ref" => "17229A",
"site" => "geodesic",
"source" => "©IGN 2010 dans le cadre de la cartographie réglementaire",
"type" => "site",
"url" => "http://geodesie.ign.fr/fiches/index.php?module=e&action=fichepdf&source=carte&sit_no=17229A"
},
"timestamp" => "2017-01-21 12:51:34",
"type" => "relation",
"visible" => true
}
With parameterized query:
Expand All @@ -121,14 +131,8 @@ defmodule ReqAthena do
...> ]
iex> query = "SELECT id, type FROM planet WHERE id = ? and type = ?"
iex> req = Req.new() |> ReqAthena.attach(opts)
iex> Req.post!(req, athena: {query, [239_970_142, "node"]}).body
%ReqAthena.Result{
columns: ["id", "type"],
output_location: "s3://my-bucket/dda41d66-1eea-4588-850a-945c9def9163.csv",
query_execution_id: "dda41d66-1eea-4588-850a-945c9def9163",
rows: [[239_970_142, "node"]],
statement_name: "query_C71EF77B8B7B92D9846C6D7E70136448"
}
iex> Req.post!(req, athena: {query, [239_970_142, "node"]}, format: :json).body
[%{"id" => 239970142, "type" => "node"}]
"""
def attach(%Request{} = request, options \\ []) do
Expand All @@ -143,10 +147,10 @@ defmodule ReqAthena do

defp run(request) do
if query = request.options[:athena] do
region = fetch_option!(request, :region)
region = Request.fetch_option!(request, :region)

url = "https://athena.#{region}.amazonaws.com"
cache_query = get_option(request, :cache_query, true)
cache_query = Request.get_option(request, :cache_query, true)

%{request | url: URI.parse(url)}
|> put_request_body(query, cache_query)
Expand Down Expand Up @@ -227,7 +231,7 @@ defmodule ReqAthena do

body =
Map.merge(output_config, %{
QueryExecutionContext: %{Database: fetch_option!(request, :database)},
QueryExecutionContext: %{Database: Request.fetch_option!(request, :database)},
QueryString: ReqAthena.Query.to_query_string(query)
})

Expand Down Expand Up @@ -364,13 +368,24 @@ defmodule ReqAthena do
locations = ReqAthena.S3.get_locations(req_s3, output_location)

if decode_body do
build_lazy_frame(locations, aws_credentials)
else
locations
end
end

if Code.ensure_loaded?(Explorer) do
defp build_lazy_frame(parquet_locations, aws_credentials) do
parquet_locations
|> Enum.map(fn parquet_location ->
Explorer.DataFrame.from_parquet!(parquet_location, lazy: true, config: aws_credentials)
end)
|> Explorer.DataFrame.concat_rows()
else
locations
end
else
defp build_lazy_frame(parquet_locations, aws_credentials) do
raise ArgumentError,
"format: :explorer - you need to install Explorer as a dependency in order to use this format"
end
end

Expand Down Expand Up @@ -523,23 +538,4 @@ defmodule ReqAthena do
end

defp now, do: NaiveDateTime.utc_now() |> NaiveDateTime.to_erl()

# TODO: Use Req.Request.get_option/3 when Req 0.4.0 is out.
defp get_option(request, key, default) when is_atom(key) do
Map.get(request.options, key, default)
end

# TODO: Use Req.Request.fetch_option!/2 when Req 0.4.0 is out.
def fetch_option!(request, key) when is_atom(key) do
case Map.fetch(request.options, key) do
{:ok, value} ->
value

:error ->
raise KeyError,
term: request.options,
key: key,
message: "option #{inspect(key)} is not set"
end
end
end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ defmodule ReqAthena.MixProject do
{:req, "~> 0.5.0"},
{:aws_signature, "~> 0.3.0"},
{:req_s3, "~> 0.2"},
{:explorer, "~> 0.9"},
{:explorer, "~> 0.9", optional: true},
{:aws_credentials, "~> 0.2", optional: true},
{:table, "~> 0.1.1", optional: true},
{:tzdata, "~> 1.1.1", only: :test},
Expand Down

0 comments on commit fe5d7b4

Please sign in to comment.