Skip to content

Commit

Permalink
Merge branch 'main' into 6-influx-client
Browse files Browse the repository at this point in the history
  • Loading branch information
suvayu authored Mar 16, 2024
2 parents 8a3d6d5 + bf5df78 commit cf460b0
Show file tree
Hide file tree
Showing 5 changed files with 191 additions and 81 deletions.
7 changes: 7 additions & 0 deletions src/TulipaIO.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
# Top-level package module.  It only stitches the source files together;
# the files are included in the order listed below.
module TulipaIO

# Custom exception types (e.g. `FileNotFoundError`); included first so the
# files that follow can throw them.
include("exceptions.jl")

# InfluxDB client
include("influx.jl")
# ESDL JSON parser
include("parsers.jl")

# DuckDB pipeline
# `fmtsql.jl` defines the `FmtSQL` submodule, which `pipeline.jl` imports via
# `using .FmtSQL`, so it has to come before `pipeline.jl`.
include("fmtsql.jl")
include("pipeline.jl")

end
45 changes: 45 additions & 0 deletions src/exceptions.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import DuckDB: DB

"""
    FileNotFoundError(file)

Exception for a path that was expected to be a regular file.

The message is chosen when the exception is constructed: if `file` exists on
disk the message says it is not a regular file, otherwise that it was not found.
"""
struct FileNotFoundError <: Exception
    file::String
    msg::String
    function FileNotFoundError(file)
        reason = ispath(file) ? "exists, but not a regular file" : "file not found"
        return new(file, "$(file): $(reason)")
    end
end

"""
    DirectoryNotFoundError(dir)

Exception for a path that was expected to be a directory.

The message is chosen when the exception is constructed: if `dir` exists on
disk the message says it is not a directory, otherwise that it was not found.
"""
struct DirectoryNotFoundError <: Exception
    dir::String
    msg::String
    function DirectoryNotFoundError(dir)
        reason = ispath(dir) ? "exists, but not a directory" : "directory not found"
        return new(dir, "$(dir): $(reason)")
    end
end

"""
    TableNotFoundError(con, tbl)

Exception for a table name `tbl` that could not be found through the DuckDB
connection `con`.  The connection is kept on the exception and also
interpolated into the message.
"""
struct TableNotFoundError <: Exception
    con::DB
    tbl::String
    msg::String
    function TableNotFoundError(con, tbl)
        text = "$(tbl): table not found in $(con)"
        return new(con, tbl, text)
    end
end

"""
    NeitherTableNorFileError(con, src)

Exception for a source `src` that is neither a table reachable through the
DuckDB connection `con` nor a file.

The message is always generated from `con` and `src`.  A three-argument form
`NeitherTableNorFileError(con, src, msg)` is still accepted for backward
compatibility; its third argument is ignored, exactly as it was before.
"""
struct NeitherTableNorFileError <: Exception
    con::DB
    src::String
    msg::String
    # Two-argument form: this is the arity the pipeline code throws with.
    function NeitherTableNorFileError(con, src)
        return new(con, src, "$(src): neither table ($con) nor file found")
    end
    # Legacy arity; the supplied message was never used.
    function NeitherTableNorFileError(con, src, _msg)
        return new(con, src, "$(src): neither table ($con) nor file found")
    end
end

# Error display: each custom exception carries its full, pre-formatted message
# in `msg`, so showing the error just prints that field.
function Base.showerror(io::IO, exc::FileNotFoundError)
    return print(io, exc.msg)
end

function Base.showerror(io::IO, exc::DirectoryNotFoundError)
    return print(io, exc.msg)
end

function Base.showerror(io::IO, exc::TableNotFoundError)
    return print(io, exc.msg)
end

function Base.showerror(io::IO, exc::NeitherTableNorFileError)
    return print(io, exc.msg)
end
64 changes: 64 additions & 0 deletions src/fmtsql.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Helpers that build SQL query strings.  Nothing in this module talks to a
# database; every function returns a `String`.
module FmtSQL

using Printf: format, Format

"""
    sprintf(fmt::String, args...)

Format `args` with the printf-style format string `fmt` and return the result.
"""
function sprintf(fmt::String, args...)
    return format(Format(fmt), args...)
end

"""
    fmt_opts(source::String; opts...)

Build the argument list of a reader call: the source followed by every keyword
option rendered as `key=value`, separated by `", "`.

A source containing `?` is treated as a prepared-statement placeholder and is
emitted verbatim.  Any other source is wrapped in single quotes, with embedded
single quotes doubled so that the result stays a valid SQL string literal.
"""
function fmt_opts(source::String; opts...)
    if '?' in source
        src = source
    else
        escaped = replace(source, "'" => "''")
        src = "'$(escaped)'"
    end
    return join([src; [join(p, "=") for p in opts]], ", ")
end

"""
    reader(source::String)

Return the name of the reader function for `source`, chosen from its file
extension (`.csv`, `.parquet` or `.json`).  A source without a supported
extension but containing `?` (a placeholder) falls back to the CSV reader.

Throws an `ErrorException` for any other source.
"""
function reader(source::String)
    _, ext = splitext(source)
    if ext in (".csv", ".parquet", ".json")
        # NOTE(review): the name is derived mechanically as read_<ext>_auto;
        # confirm that DuckDB provides such a function for parquet.
        return "read_$(ext[2:end])_auto"
    elseif '?' in source
        # FIXME: how to support other file formats?
        return "read_csv_auto"
    else
        error("$(ext[2:end]): unsupported input file '$(source)'")
    end
end

"""
    fmt_read(source::String; opts...)

Return the reader call for `source`, e.g. `read_csv_auto('f.csv', header=true)`.
"""
function fmt_read(source::String; opts...)
    return sprintf("%s(%s)", reader(source), fmt_opts(source; opts...))
end

"""
    fmt_select(source::String; opts...)

Return a `SELECT * FROM <reader call>` query for `source`.
"""
function fmt_select(source::String; opts...)
    return sprintf("SELECT * FROM %s", fmt_read(source; opts...))
end

"""
    fmt_join(from_subquery, join_subquery; on, cols, fill)

Build a `LEFT JOIN` query that takes every column from `from_subquery` (aliased
`t1`) except `cols`, and takes `cols` from `join_subquery` (aliased `t2`).

# Keywords
- `on`: column names used for the join condition (combined with `AND`).
- `cols`: column names to take from `t2`.
- `fill`: what to do where `t2` has no value:
  - `true`: fall back to the value in `t1`,
    e.g. `IFNULL(t2.investable, t1.investable) AS investable`;
  - `false`: leave the value missing (plain `t2.<col>`);
  - a vector: one default per entry of `cols`.  Each default is interpolated
    into the SQL verbatim, so string literals must carry their own quotes.

# Throws
- `DimensionMismatch` if `fill` is a vector whose length differs from `cols`.
"""
function fmt_join(
    from_subquery::String,
    join_subquery::String;
    on::Vector{String},
    cols::Vector{String},
    fill::Union{Bool,AbstractVector},
)
    exclude = join(cols, ", ")
    # `fill` may be a vector, so it must not be used directly as a condition.
    if fill === true # back fill
        selected = join(map(c -> "IFNULL(t2.$c, t1.$c) AS $c", cols), ", ")
    elseif fill === false # explicit missing
        selected = join(map(c -> "t2.$c", cols), ", ")
    else # fill with default
        if length(fill) != length(cols)
            msg = string(
                "number of default values does not match columns\n",
                "columns: $cols\n",
                "defaults: $fill",
            )
            throw(DimensionMismatch(msg))
        end
        # walk the columns and their defaults in lockstep
        selected = join(map((c, f) -> "IFNULL(t2.$c, $f) AS $c", cols, fill), ", ")
    end
    select_ = "SELECT t1.* EXCLUDE ($exclude), $selected"

    join_on = join(map(c -> "t1.$c = t2.$c", on), " AND ")
    from_ = "FROM $from_subquery t1 LEFT JOIN $join_subquery t2 ON ($join_on)"

    return "$(select_)\n$(from_)"
end

end # module FmtSQL
138 changes: 63 additions & 75 deletions src/pipeline.jl
Original file line number Diff line number Diff line change
@@ -1,115 +1,103 @@
import DataFrames as DF
import DuckDB: DB, DBInterface, Stmt
import Printf: format, Format
using DataFrames: DataFrames as DF
using DuckDB: DB, DBInterface, Stmt

function sprintf(fmt::String, args...)
format(Format(fmt), args...)
end
using .FmtSQL: fmt_join, fmt_read, fmt_select

function fmt_opts(source::String; opts...)
_src = '?' in source ? "$source" : "'$(source)'"
join(["$(_src)"; [join(p, "=") for p in opts]], ", ")
end
export create_tbl

function reader(source::String)
_, ext = splitext(source)
if ext in (".csv", ".parquet", ".json")
return "read_$(ext[2:end])_auto"
elseif '?' in source
# FIXME: how to support other file formats?
return "read_csv_auto"
else
error("$(ext[2:end]): unsupported input file '$(source)'")
end
end
# Default reader options (for now): `header = true, skip = 1`, splatted into
# the `fmt_read`/`fmt_select` calls in this file.  Declared `const` so the
# functions reading it do not go through an untyped, rebindable global.
const _read_opts = pairs((header = true, skip = 1))

function fmt_query(source::String; opts...)
sprintf("SELECT * FROM %s(%s)", reader(source), fmt_opts(source; opts...))
"""
    check_file(source::String)

Return `true` if `source` is an existing regular file, `false` otherwise.
"""
check_file(source::String) = isfile(source) # FIXME: handle globs

function fmt_join(subquery1::String, subquery2::String; on::Vector{String}, cols::Vector{String})
exclude = join(cols, ", ")
include = join(map(c -> "t2.$c", cols), ", ")
select_ = "SELECT t1.* EXCLUDE ($exclude), $include"

join_on = join(map(c -> "t1.$c = t2.$c", on), " AND ")
from_ = "FROM $subquery1 t1 LEFT JOIN $subquery2 t2 ON ($join_on)"

"$(select_)\n$(from_)"
"""
    check_tbl(con::DB, source::String)

Return `true` if `source` is among the names reported by `SHOW TABLES` on the
connection `con`, `false` otherwise.
"""
function check_tbl(con::DB, source::String)
    res = DBInterface.execute(con, "SHOW TABLES")
    # NOTE(review): reads the `name` column through the result's `tbl` field;
    # confirm this field is meant to be accessed directly.
    tbls = res.tbl[:name]
    return source in tbls
end

function check_file(source::String)
# FIXME: handle globs
isfile(source) || throw(ArgumentError("$(source): is not a regular file"))
source
"""
    fmt_source(con::DB, source::String)

Turn `source` into something usable in a `FROM` clause.  A name that is an
existing table on `con` is returned unchanged; a path to an existing file is
turned into a reader call with the default read options.  Tables take
precedence over files.

Throws `NeitherTableNorFileError` when `source` is neither.
"""
function fmt_source(con::DB, source::String)
    check_tbl(con, source) && return source
    check_file(source) && return fmt_read(source; _read_opts...)
    throw(NeitherTableNorFileError(con, source))
end

## User facing functions below

# TODO: prepared statements; not used for now
"""
    Store(store::String)

Bundle a DuckDB connection with a prepared statement for reading a CSV source.

The constructor connects to the database `store` and prepares a
`SELECT * FROM <csv reader>` query whose source is the placeholder `(?)`, with
options `header = true, skip = 1`.

# Fields
- `con`: the database connection.
- `read_csv`: the statement prepared on `con`.
"""
struct Store
    con::DB
    read_csv::Stmt

    function Store(store::String)
        con = DBInterface.connect(DB, store)
        # "(?)" is a placeholder for the source; it is left unquoted in the query
        query = fmt_select("(?)"; header = true, skip = 1)
        stmt = DBInterface.prepare(con, query)
        return new(con, stmt)
    end
end

Store() = Store(":memory:")

function read_csv(con::DB, source::String)
check_file(source)
query = fmt_query("(?)"; header = true, skip = 1)
res = DBInterface.execute(con, query, [source])
return DF.DataFrame(res)
end

function read_csv_alt_cols(
con::DB,
source1::String,
source2::String;
on::Vector{String},
cols::Vector{String},
)
check_file(source1)
check_file(source2)
subquery = sprintf("read_csv_auto(%s)", fmt_opts("(?)"; header = true, skip = 1))
query = fmt_join(subquery, subquery; on = on, cols = cols)
res = DBInterface.execute(con, query, [source1, source2])
return DF.DataFrame(res)
end
DEFAULT = Store()

"""
    tmp_tbl_name(source::String)

Derive a table name from the path `source`: take the file name without its
extension, collapse every run of spaces, brackets, braces, parentheses,
backslashes, `+`, `,`, `.` and `-` into a single `_`, and prefix it with `t_`.
"""
function tmp_tbl_name(source::String)
    stem, _ = splitext(basename(source))
    # The substitution has to be applied to the stem itself: calling `replace`
    # on the tuple returned by `splitext` never matches the pattern against the
    # text.  `-` is part of the set because it is not valid in a bare identifier.
    sanitized = replace(stem, r"[ ()\[\]{}\\+,.-]+" => "_")
    return "t_$(sanitized)"
end

function create_tbl(con::DB, name::String, source::String; tmp::Bool = false)
check_file(source)
query = fmt_query(source; header = true, skip = 1)
TEMP = tmp ? "TEMP" : ""
DBInterface.execute(con, "CREATE $TEMP TABLE $name AS $query")
# TODO: support "CREATE OR REPLACE" & "IF NOT EXISTS" for all create_* functions

"""
    _create_tbl_impl(con::DB, query::String; name, tmp, show)

Run `query` on `con`, optionally materialising it as a table.

- Non-empty `name`: create the table `name` (a `TEMP` table when `tmp` is
  true) from `query`.  Return its contents as a `DataFrame` when `show` is
  true, otherwise return `name`.
- Empty `name` and `show` true: only run `query` and return the result as a
  `DataFrame`; no table is created.
- Empty `name` and `show` false: the result is stored in a temporary table
  whose name is generated from a hash of `query`; that name is returned.
"""
function _create_tbl_impl(con::DB, query::String; name::String, tmp::Bool, show::Bool)
    if isempty(name) && !show
        # Neither a name nor `show` was requested, so the result must go to a
        # temporary table.  The source path is not available here, so the name
        # is derived from the query text instead.
        tmp = true
        name = "t_$(string(hash(query); base = 16))"
    end

    if isempty(name) # only show
        return DF.DataFrame(DBInterface.execute(con, query))
    end

    DBInterface.execute(con, "CREATE $(tmp ? "TEMP" : "") TABLE $name AS $query")
    return show ? DF.DataFrame(DBInterface.execute(con, "SELECT * FROM $name")) : name
end

"""
    create_tbl(con::DB, source::String; name = "", tmp = false, show = false)

Read the file `source` with the default read options and create a table from it.

# Keywords
- `name`: name of the table to create.  When empty and `show` is false, a
  temporary table named after the file (see `tmp_tbl_name`) is created.
- `tmp`: create a `TEMP` table.
- `show`: return the data as a `DataFrame` instead of the table name; with an
  empty `name` no table is created at all.

# Throws
- `FileNotFoundError` if `source` is not an existing regular file.
"""
function create_tbl(
    con::DB,
    source::String;
    name::String = "",
    tmp::Bool = false,
    show::Bool = false,
)
    check_file(source) || throw(FileNotFoundError(source))
    query = fmt_select(source; _read_opts...)

    # Resolve the implicit temporary table name here, where the source path is
    # known.
    if isempty(name) && !show
        tmp = true
        name = tmp_tbl_name(source)
    end

    return _create_tbl_impl(con, query; name = name, tmp = tmp, show = show)
end

"""
    create_tbl(con::DB, base_source::String, alt_source::String;
               variant = "", on, cols, fill = true, tmp = false, show = false)

Create a table by left-joining `base_source` with `alt_source`, taking the
columns `cols` from `alt_source`.  Each source may be an existing table on
`con` or a file (see `fmt_source`).

# Keywords
- `variant`: name of the table to create.  When empty and `show` is false, a
  temporary table named after `alt_source` (see `tmp_tbl_name`) is created.
- `on`: column names to join on.
- `cols`: column names to take from `alt_source`.
- `fill`: passed through to `fmt_join`; `true`, `false`, or a vector of
  per-column defaults.
- `tmp`: create a `TEMP` table.
- `show`: return the data as a `DataFrame` instead of the table name; with an
  empty `variant` no table is created at all.

# Throws
- `NeitherTableNorFileError` if a source is neither a table nor a file.
"""
function create_tbl(
    con::DB,
    base_source::String,
    alt_source::String;
    variant::String = "",
    on::Vector{String},
    cols::Vector{String},
    fill::Union{Bool,Vector} = true,
    tmp::Bool = false,
    show::Bool = false,
)
    sources = [fmt_source(con, src) for src in (base_source, alt_source)]
    query = fmt_join(sources...; on = on, cols = cols, fill = fill)

    # Resolve the implicit temporary table name here, where the sources are
    # known.
    if isempty(variant) && !show
        tmp = true
        variant = tmp_tbl_name(alt_source)
    end

    return _create_tbl_impl(con, query; name = variant, tmp = tmp, show = show)
end

# TODO:
Expand Down
18 changes: 12 additions & 6 deletions test/test-pipeline.jl
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,18 @@ end
con = DBInterface.connect(DB)

df_org = DF.DataFrame(CSV.File(csv_path; header = 2))
df_res = TulipaIO.read_csv(con, csv_path)
df_res = TulipaIO.create_tbl(con, csv_path; show = true)
@test shape(df_org) == shape(df_res)

csv_copy = replace(csv_path, "data.csv" => "data-copy.csv")
df_res =
TulipaIO.read_csv_alt_cols(con, csv_path, csv_copy; on = ["name"], cols = ["investable"])
df_res = TulipaIO.create_tbl(
con,
csv_path,
csv_copy;
on = ["name"],
cols = ["investable"],
show = true,
)
df_exp = DF.DataFrame(CSV.File(csv_copy; header = 2))
@test df_exp.investable == df_res.investable
@test df_org.investable != df_res.investable
Expand All @@ -27,16 +33,16 @@ end
con = DBInterface.connect(DB)

df_org = DF.DataFrame(CSV.File(csv_path; header = 2))
TulipaIO.create_tbl(con, "no_assets", csv_path)
TulipaIO.create_tbl(con, csv_path; name = "no_assets")
df_res = DF.DataFrame(DBInterface.execute(con, "SELECT * FROM no_assets"))
@test shape(df_org) == shape(df_res)

csv_copy = replace(csv_path, "data.csv" => "data-copy.csv")
TulipaIO.create_alt_tbl(
TulipaIO.create_tbl(
con,
"no_assets",
"alt_assets",
csv_copy;
variant = "alt_assets",
on = ["name"],
cols = ["investable"],
)
Expand Down

0 comments on commit cf460b0

Please sign in to comment.