Use pyarrow dtype_backend (#1781)
Fixes #1721
Fixes #1394

The `dtype_backend` part of this change is not breaking. The only part that is
technically breaking is that we now specify a unit of milliseconds for the
Arrow time type. Previously we used the default nanosecond precision,
which was then truncated to milliseconds in the core. I think it is
better to disallow precision higher than milliseconds if we cannot
distinguish it in the core.
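
In practice this means two things for code that works with Ribasim output: read Arrow result files with pandas' pyarrow dtype backend, and construct timestamps at millisecond precision. A minimal sketch (the result path is illustrative, and both features require pandas >= 2.0):

    import pandas as pd

    # Arrow-backed dtypes preserve nullable integer columns instead of
    # upcasting them to float64 when missing values are present.
    df_basin = pd.read_feather("results/basin.arrow", dtype_backend="pyarrow")

    # Build timestamps at millisecond precision, matching the unit now set on
    # the Arrow time type; finer precision cannot be represented in the core.
    times = pd.date_range("2020-01-01", "2021-01-01", periods=100, unit="ms")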
visr authored Sep 26, 2024 · 1 parent c9948a2 · commit f5bfb50
Showing 25 changed files with 674 additions and 264 deletions.
core/src/schema.jl (6 additions, 6 deletions)
@@ -286,7 +286,7 @@ end
     demand::Union{Missing, Float64}
     return_factor::Float64
     min_level::Float64
-    priority::Int32
+    priority::Union{Missing, Int32}
 end
 
 @version UserDemandTimeV1 begin
@@ -295,33 +295,33 @@ end
     demand::Float64
     return_factor::Float64
     min_level::Float64
-    priority::Int32
+    priority::Union{Missing, Int32}
 end
 
 @version LevelDemandStaticV1 begin
     node_id::Int32
     min_level::Union{Missing, Float64}
     max_level::Union{Missing, Float64}
-    priority::Int32
+    priority::Union{Missing, Int32}
 end
 
 @version LevelDemandTimeV1 begin
     node_id::Int32
     time::DateTime
     min_level::Union{Missing, Float64}
     max_level::Union{Missing, Float64}
-    priority::Int32
+    priority::Union{Missing, Int32}
 end
 
 @version FlowDemandStaticV1 begin
     node_id::Int
     demand::Float64
-    priority::Int32
+    priority::Union{Missing, Int32}
 end
 
 @version FlowDemandTimeV1 begin
     node_id::Int
     time::DateTime
     demand::Float64
-    priority::Int32
+    priority::Union{Missing, Int32}
 end
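
With priority now optional in these schemas, the corresponding pandas columns can contain missing values; the pyarrow backend keeps such columns as nullable integers instead of upcasting them to float64. A small illustration with made-up values (not part of the commit):

    import pandas as pd

    # Hypothetical demand table; priority may be missing, per the schema above.
    df = pd.DataFrame(
        {
            "node_id": pd.array([1, 2, 3], dtype="int32[pyarrow]"),
            "priority": pd.array([1, None, 2], dtype="int32[pyarrow]"),
        }
    )
    print(df["priority"].dtype)  # int32[pyarrow], not float64
    print(df["priority"].isna().sum())  # 1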
core/src/util.jl (4 additions, 5 deletions)
@@ -446,11 +446,10 @@ function get_all_priorities(db::DB, config::Config)::Vector{Int32}
         (FlowDemandStaticV1, "FlowDemand / static"),
         (FlowDemandTimeV1, "FlowDemand / time"),
     ]
-        if valid_priorities(
-            load_structvector(db, config, type).priority,
-            config.allocation.use_allocation,
-        )
-            union!(priorities, load_structvector(db, config, type).priority)
+        priority_col = load_structvector(db, config, type).priority
+        priority_col = Int32.(coalesce.(priority_col, Int32(0)))
+        if valid_priorities(priority_col, config.allocation.use_allocation)
+            union!(priorities, priority_col)
         else
             is_valid = false
             @error "Missing priority parameter(s) for a $name node in the allocation problem."
core/src/validation.jl (5 additions, 5 deletions)
@@ -673,10 +673,10 @@ function valid_sources(
     return !errors
 end
 
-function valid_priorities(priorities::Vector, use_allocation::Bool)::Bool
-    errors = false
-    if 0 in priorities && use_allocation
-        errors = true
+function valid_priorities(priorities::Vector{Int32}, use_allocation::Bool)::Bool
+    if use_allocation && any(iszero, priorities)
+        return false
+    else
+        return true
     end
-    return !errors
 end
core/test/validation_test.jl (1 addition, 3 deletions)
@@ -460,11 +460,9 @@ end
         normpath(@__DIR__, "../../generated_testmodels/invalid_priorities/ribasim.toml")
     @test ispath(toml_path)
 
-    config = Ribasim.Config(toml_path; allocation_use_allocation = true)
-
     logger = TestLogger()
     with_logger(logger) do
-        @test_throws "Priority parameter is missing" Ribasim.run(config)
+        @test_throws "Priority parameter is missing" Ribasim.run(toml_path)
     end
     @test length(logger.logs) == 3
     @test logger.logs[1].level == Error
docs/guide/examples.ipynb (27 additions, 11 deletions)
@@ -414,7 +414,9 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "df_basin = pd.read_feather(datadir / \"basic/results/basin.arrow\")\n",
+    "df_basin = pd.read_feather(\n",
+    "    datadir / \"basic/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
+    ")\n",
     "df_basin_wide = df_basin.pivot_table(\n",
     "    index=\"time\", columns=\"node_id\", values=[\"storage\", \"level\"]\n",
     ")\n",
@@ -428,7 +430,7 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "df_flow = pd.read_feather(datadir / \"basic/results/flow.arrow\")\n",
+    "df_flow = pd.read_feather(datadir / \"basic/results/flow.arrow\", dtype_backend=\"pyarrow\")\n",
     "df_flow[\"edge\"] = list(zip(df_flow.from_node_id, df_flow.to_node_id))\n",
     "df_flow[\"flow_m3d\"] = df_flow.flow_rate * 86400\n",
     "ax = df_flow.pivot_table(index=\"time\", columns=\"edge\", values=\"flow_m3d\").plot()\n",
@@ -709,7 +711,9 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "df_basin = pd.read_feather(datadir / \"level_range/results/basin.arrow\")\n",
+    "df_basin = pd.read_feather(\n",
+    "    datadir / \"level_range/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
+    ")\n",
     "df_basin_wide = df_basin.pivot_table(\n",
     "    index=\"time\", columns=\"node_id\", values=[\"storage\", \"level\"]\n",
     ")\n",
@@ -991,7 +995,9 @@
    "source": [
     "from matplotlib.dates import date2num\n",
     "\n",
-    "df_basin = pd.read_feather(datadir / \"pid_control/results/basin.arrow\")\n",
+    "df_basin = pd.read_feather(\n",
+    "    datadir / \"pid_control/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
+    ")\n",
     "df_basin_wide = df_basin.pivot_table(\n",
     "    index=\"time\", columns=\"node_id\", values=[\"storage\", \"level\"]\n",
     ")\n",
@@ -1272,7 +1278,9 @@
    "source": [
     "import matplotlib.ticker as plticker\n",
     "\n",
-    "df_allocation = pd.read_feather(datadir / \"allocation_example/results/allocation.arrow\")\n",
+    "df_allocation = pd.read_feather(\n",
+    "    datadir / \"allocation_example/results/allocation.arrow\", dtype_backend=\"pyarrow\"\n",
+    ")\n",
     "df_allocation_wide = df_allocation.pivot_table(\n",
     "    index=\"time\",\n",
     "    columns=[\"node_type\", \"node_id\", \"priority\"],\n",
@@ -1318,7 +1326,9 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "df_basin = pd.read_feather(datadir / \"allocation_example/results/basin.arrow\")\n",
+    "df_basin = pd.read_feather(\n",
+    "    datadir / \"allocation_example/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
+    ")\n",
     "df_basin_wide = df_basin.pivot_table(\n",
     "    index=\"time\", columns=\"node_id\", values=[\"storage\", \"level\"]\n",
     ")\n",
@@ -1557,7 +1567,9 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "df_basin = pd.read_feather(datadir / \"level_demand/results/basin.arrow\")\n",
+    "df_basin = pd.read_feather(\n",
+    "    datadir / \"level_demand/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
+    ")\n",
     "df_basin = df_basin[df_basin.node_id == 2]\n",
     "df_basin_wide = df_basin.pivot_table(\n",
     "    index=\"time\", columns=\"node_id\", values=[\"storage\", \"level\"]\n",
@@ -1953,7 +1965,7 @@
    "outputs": [],
    "source": [
     "datadir_flow = datadir / \"local_pidcontrolled_cascade/results/flow.arrow\"\n",
-    "df_flow = pd.read_feather(datadir_flow)\n",
+    "df_flow = pd.read_feather(datadir_flow, dtype_backend=\"pyarrow\")\n",
     "df_flow[\"edge\"] = list(zip(df_flow.from_node_id, df_flow.to_node_id))\n",
     "df_flow[\"flow_m3d\"] = df_flow.flow_rate * 86400\n",
     "\n",
@@ -1995,7 +2007,7 @@
    "outputs": [],
    "source": [
     "datadir_basin = datadir / \"local_pidcontrolled_cascade/results/basin.arrow\"\n",
-    "df_basin = pd.read_feather(datadir_basin)\n",
+    "df_basin = pd.read_feather(datadir_basin, dtype_backend=\"pyarrow\")\n",
     "df_basin[\"vertical_flux\"] = (\n",
     "    df_basin[\"precipitation\"]\n",
     "    - df_basin[\"evaporation\"]\n",
@@ -2060,7 +2072,9 @@
     "    Node(1, Point(0, 0)),\n",
     "    [\n",
     "        level_boundary.Time(\n",
-    "            time=pd.date_range(start=\"2020-01-01\", end=\"2021-01-01\", periods=100),\n",
+    "            time=pd.date_range(\n",
+    "                start=\"2020-01-01\", end=\"2021-01-01\", periods=100, unit=\"ms\"\n",
+    "            ),\n",
     "            level=6.0 + np.sin(np.linspace(0, 6 * np.pi, 100)),\n",
     "        )\n",
     "    ],\n",
@@ -2286,7 +2300,9 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "df_flow = pd.read_feather(datadir / \"outlet_continuous_control/results/flow.arrow\")\n",
+    "df_flow = pd.read_feather(\n",
+    "    datadir / \"outlet_continuous_control/results/flow.arrow\", dtype_backend=\"pyarrow\"\n",
+    ")\n",
     "fig, ax = plt.subplots()\n",
     "\n",
     "\n",
docs/tutorial/irrigation-demand.ipynb (6 additions, 2 deletions)
@@ -309,7 +309,9 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "df_basin = pd.read_feather(base_dir / \"Crystal-2/results/basin.arrow\")\n",
+    "df_basin = pd.read_feather(\n",
+    "    base_dir / \"Crystal-2/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
+    ")\n",
     "\n",
     "# Create pivot tables and plot for basin data\n",
     "df_basin_wide = df_basin.pivot_table(\n",
@@ -393,7 +395,9 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "df_flow = pd.read_feather(base_dir / \"Crystal-2/results/flow.arrow\")\n",
+    "df_flow = pd.read_feather(\n",
+    "    base_dir / \"Crystal-2/results/flow.arrow\", dtype_backend=\"pyarrow\"\n",
+    ")\n",
     "# Add the edge names and then remove unnamed edges\n",
     "df_flow[\"name\"] = model.edge.df[\"name\"].loc[df_flow[\"edge_id\"]].to_numpy()\n",
     "df_flow = df_flow[df_flow[\"name\"].astype(bool)]\n",
docs/tutorial/natural-flow.ipynb (6 additions, 2 deletions)
@@ -404,7 +404,9 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "df_basin = pd.read_feather(base_dir / \"Crystal-1/results/basin.arrow\")\n",
+    "df_basin = pd.read_feather(\n",
+    "    base_dir / \"Crystal-1/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
+    ")\n",
     "\n",
     "# Create pivot tables and plot for Basin data\n",
     "df_basin_wide = df_basin.pivot_table(\n",
@@ -453,7 +455,9 @@
    "source": [
     "# Plot flow data\n",
     "# Read the flow results\n",
-    "df_flow = pd.read_feather(base_dir / \"Crystal-1/results/flow.arrow\")\n",
+    "df_flow = pd.read_feather(\n",
+    "    base_dir / \"Crystal-1/results/flow.arrow\", dtype_backend=\"pyarrow\"\n",
+    ")\n",
     "# Add the edge names and then remove unnamed edges\n",
     "df_flow[\"name\"] = model.edge.df[\"name\"].loc[df_flow[\"edge_id\"]].to_numpy()\n",
     "df_flow = df_flow[df_flow[\"name\"].astype(bool)]\n",
docs/tutorial/reservoir.ipynb (6 additions, 2 deletions)
@@ -283,7 +283,9 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "df_basin = pd.read_feather(base_dir / \"Crystal-3/results/basin.arrow\")\n",
+    "df_basin = pd.read_feather(\n",
+    "    base_dir / \"Crystal-3/results/basin.arrow\", dtype_backend=\"pyarrow\"\n",
+    ")\n",
     "\n",
     "# Create pivot tables and plot for Basin data\n",
     "df_basin_wide = df_basin.pivot_table(\n",
@@ -329,7 +331,9 @@
    "metadata": {},
    "outputs": [],
    "source": [
-    "df_flow = pd.read_feather(base_dir / \"Crystal-3/results/flow.arrow\")\n",
+    "df_flow = pd.read_feather(\n",
+    "    base_dir / \"Crystal-3/results/flow.arrow\", dtype_backend=\"pyarrow\"\n",
+    ")\n",
     "# Add the edge names and then remove unnamed edges\n",
     "df_flow[\"name\"] = model.edge.df[\"name\"].loc[df_flow[\"edge_id\"]].to_numpy()\n",
     "df_flow = df_flow[df_flow[\"name\"].astype(bool)]\n",
pixi.toml (2 additions)
@@ -88,6 +88,7 @@ test-ribasim-regression = { cmd = "julia --project=core --eval 'using Pkg; Pkg.t
 generate-testmodels = { cmd = "python utils/generate-testmodels.py", inputs = [
     "python/ribasim",
     "python/ribasim_testmodels",
+    "utils/generate-testmodels.py",
 ], outputs = [
     "generated_testmodels",
 ] }
@@ -99,6 +100,7 @@ codegen = { cmd = "julia --project utils/gen_python.jl && ruff format python/rib
     "initialize-julia",
 ], inputs = [
     "core",
+    "utils",
 ], outputs = [
     "python/ribasim/ribasim/schemas.py",
     "python/ribasim/ribasim/validation.py",
python/ribasim/ribasim/config.py (6 additions, 7 deletions)
@@ -51,7 +51,7 @@
     UserDemandStaticSchema,
     UserDemandTimeSchema,
 )
-from ribasim.utils import _pascal_to_snake
+from ribasim.utils import _concat, _pascal_to_snake
 
 
 class Allocation(ChildModel):
@@ -242,11 +242,10 @@ def add(
         )
         assert table.df is not None
         table_to_append = table.df.assign(node_id=node_id)
-        setattr(
-            self,
-            member_name,
-            pd.concat([existing_table, table_to_append], ignore_index=True),
-        )
+        if isinstance(table_to_append, GeoDataFrame):
+            table_to_append.set_crs(self._parent.crs, inplace=True)
+        new_table = _concat([existing_table, table_to_append], ignore_index=True)
+        setattr(self, member_name, new_table)
 
         node_table = node.into_geodataframe(
             node_type=self.__class__.__name__, node_id=node_id
@@ -255,7 +254,7 @@ def add(
         if self.node.df is None:
             self.node.df = node_table
         else:
-            df = pd.concat([self.node.df, node_table])
+            df = _concat([self.node.df, node_table])
             self.node.df = df
 
         self._parent._used_node_ids.add(node_id)
python/ribasim/ribasim/delwaq/generate.py (15 additions, 10 deletions)
@@ -5,7 +5,7 @@
 from datetime import timedelta
 from pathlib import Path
 
-from ribasim.utils import MissingOptionalModule
+from ribasim.utils import MissingOptionalModule, _concat
 
 try:
     import networkx as nx
@@ -70,17 +70,18 @@ def _make_boundary(data, boundary_type):
     """
     bid = _boundary_name(data.node_id.iloc[0], boundary_type)
     piv = (
-        data.pivot_table(index="time", columns="substance", values="concentration")
+        data.pivot_table(
+            index="time", columns="substance", values="concentration", fill_value=-999
+        )
         .reset_index()
         .reset_index(drop=True)
     )
-    piv.time = piv.time.dt.strftime("%Y/%m/%d-%H:%M:%S")
+    # Convert Arrow time to Numpy to avoid needing tzdata somehow
+    piv.time = piv.time.astype("datetime64[ns]").dt.strftime("%Y/%m/%d-%H:%M:%S")
     boundary = {
         "name": bid,
         "substances": list(map(_quote, piv.columns[1:])),
-        "df": piv.to_string(
-            formatters={"time": _quote}, header=False, index=False, na_rep=-999
-        ),
+        "df": piv.to_string(formatters={"time": _quote}, header=False, index=False),
     }
     substances = data.substance.unique()
     return boundary, substances
@@ -181,7 +182,7 @@ def _setup_graph(nodes, edge, use_evaporation=True):
             boundary_id -= 1
             node_mapping[node_id] = boundary_id
         else:
-            raise Exception("Found unexpected node $node_id in delwaq graph.")
+            raise Exception(f"Found unexpected node {node_id} in delwaq graph.")
 
     nx.relabel_nodes(G, node_mapping, copy=False)
 
@@ -281,8 +282,12 @@ def generate(
 
     # Read in model and results
     model = ribasim.Model.read(toml_path)
-    basins = pd.read_feather(toml_path.parent / "results" / "basin.arrow")
-    flows = pd.read_feather(toml_path.parent / "results" / "flow.arrow")
+    basins = pd.read_feather(
+        toml_path.parent / "results" / "basin.arrow", dtype_backend="pyarrow"
+    )
+    flows = pd.read_feather(
+        toml_path.parent / "results" / "flow.arrow", dtype_backend="pyarrow"
+    )
 
     output_folder.mkdir(exist_ok=True)
 
@@ -359,7 +364,7 @@ def generate(
                 columns={boundary_type: "flow_rate"}
             )
             df["edge_id"] = edge_id
-            nflows = pd.concat([nflows, df], ignore_index=True)
+            nflows = _concat([nflows, df], ignore_index=True)
 
     # Save flows to Delwaq format
     nflows.sort_values(by=["time", "edge_id"], inplace=True)
(Diffs for the remaining 14 changed files are not shown.)