Skip to content

Commit

Permalink
Calculate tracer concentrations internally (#1849)
Browse files Browse the repository at this point in the history
Fixes #1105

This uses the intermediate flows from `update_cumulative_flows` to
update the mass balance in all basins using straightforward
addition/subtraction for each solver timestep. It is a shortcut, assuming piecewise constant concentrations over a timestep, that seems to work fine, but is not a good fit for very large timesteps.
  • Loading branch information
evetion authored Oct 16, 2024
1 parent 540174f commit d3a2b37
Show file tree
Hide file tree
Showing 20 changed files with 678 additions and 47 deletions.
3 changes: 3 additions & 0 deletions core/src/Ribasim.jl
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ using Tables: Tables, AbstractRow, columntable
# Wrapper around a vector of structs to easily retrieve the same field from all elements.
using StructArrays: StructVector

# OrderedSet is used to store the order of the substances in the network.
using DataStructures: OrderedSet

export libribasim

include("schema.jl")
Expand Down
235 changes: 230 additions & 5 deletions core/src/callback.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@ function create_callbacks(
u0::ComponentVector,
saveat,
)::Tuple{CallbackSet, SavedResults}
(; starttime, basin, tabulated_rating_curve) = parameters
(;
starttime,
basin,
flow_boundary,
level_boundary,
user_demand,
tabulated_rating_curve,
) = parameters
callbacks = SciMLBase.DECallback[]

# Check for negative storage
Expand All @@ -32,6 +39,18 @@ function create_callbacks(
basin_cb = PresetTimeCallback(tstops, update_basin!; save_positions = (false, false))
push!(callbacks, basin_cb)

# Update boundary concentrations
for (boundary, func) in (
(basin, update_basin_conc!),
(flow_boundary, update_flowb_conc!),
(level_boundary, update_levelb_conc!),
(user_demand, update_userd_conc!),
)
tstops = get_tstops(boundary.concentration_time.time, starttime)
conc_cb = PresetTimeCallback(tstops, func; save_positions = (false, false))
push!(callbacks, conc_cb)
end

# Update TabulatedRatingCurve Q(h) relationships
tstops = get_tstops(tabulated_rating_curve.time.time, starttime)
tabulated_rating_curve_cb = PresetTimeCallback(
Expand Down Expand Up @@ -88,18 +107,36 @@ Update with the latest timestep:
- Cumulative flows/forcings which are input for the allocation algorithm
- Cumulative flows/forcings which are realized demands in the allocation context
During these cumulative flow updates, we can also update the mass balance of the system,
as each flow carries mass, based on the concentrations of the flow source.
Specifically, we first use all the inflows to update the mass of the basins, recalculate
the basin concentration(s) and then remove the mass that is being lost to the outflows.
"""
function update_cumulative_flows!(u, t, integrator)::Nothing
(; p, tprev, dt) = integrator
(; basin, flow_boundary, allocation) = p
(; p, uprev, tprev, dt) = integrator
(;
basin,
state_inflow_edge,
state_outflow_edge,
user_demand,
level_boundary,
flow_boundary,
allocation,
) = p
(; vertical_flux) = basin

# Update tprev
p.tprev[] = t

# Reset cumulative flows, used to calculate the concentration
# of the basins after processing inflows only
fill!(basin.cumulative_in, 0.0)

# Update cumulative forcings which are integrated exactly
@. basin.cumulative_drainage += vertical_flux.drainage * dt
@. basin.cumulative_drainage_saveat += vertical_flux.drainage * dt
basin.mass .+= basin.concentration[1, :, :] .* vertical_flux.drainage * dt
basin.cumulative_in .= vertical_flux.drainage * dt

# Precipitation depends on fixed area
for node_id in basin.node_id
Expand All @@ -108,15 +145,26 @@ function update_cumulative_flows!(u, t, integrator)::Nothing

basin.cumulative_precipitation[node_id.idx] += added_precipitation
basin.cumulative_precipitation_saveat[node_id.idx] += added_precipitation
basin.mass[node_id.idx, :] .+=
basin.concentration[2, node_id.idx, :] .* added_precipitation
basin.cumulative_in[node_id.idx] += added_precipitation
end

# Exact boundary flow over time step
for (id, flow_rate, active) in
zip(flow_boundary.node_id, flow_boundary.flow_rate, flow_boundary.active)
for (id, flow_rate, active, edge) in zip(
flow_boundary.node_id,
flow_boundary.flow_rate,
flow_boundary.active,
flow_boundary.outflow_edges,
)
if active
outflow_id = edge[1].edge[2]
volume = integral(flow_rate, tprev, t)
flow_boundary.cumulative_flow[id.idx] += volume
flow_boundary.cumulative_flow_saveat[id.idx] += volume
basin.mass[outflow_id.idx, :] .+=
flow_boundary.concentration[id.idx, :] .* volume
basin.cumulative_in[outflow_id.idx] += volume
end
end

Expand All @@ -141,6 +189,132 @@ function update_cumulative_flows!(u, t, integrator)::Nothing
end
end
end

# Process mass updates for UserDemand separately
# as the inflow and outflow are decoupled in the states
for (inflow_edge, outflow_edge) in
zip(user_demand.inflow_edge, user_demand.outflow_edge)
from_node = inflow_edge.edge[1]
to_node = outflow_edge.edge[2]
userdemand_idx = outflow_edge.edge[1].idx
if from_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, inflow_edge.edge)
if flow < 0
basin.mass[from_node.idx, :] .-=
basin.concentration_state[to_node.idx, :] .* flow
basin.mass[from_node.idx, :] .-=
user_demand.concentration[userdemand_idx, :] .* flow
end
end
if to_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, outflow_edge.edge)
if flow > 0
basin.mass[to_node.idx, :] .+=
basin.concentration_state[from_node.idx, :] .* flow
basin.mass[to_node.idx, :] .+=
user_demand.concentration[userdemand_idx, :] .* flow
end
end
end

# Process all mass inflows to basins
for (inflow_edge, outflow_edge) in zip(state_inflow_edge, state_outflow_edge)
from_node = inflow_edge.edge[1]
to_node = outflow_edge.edge[2]
if from_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, inflow_edge.edge)
if flow < 0
basin.cumulative_in[from_node.idx] -= flow
if to_node.type == NodeType.Basin
basin.mass[from_node.idx, :] .-=
basin.concentration_state[to_node.idx, :] .* flow
elseif to_node.type == NodeType.LevelBoundary
basin.mass[from_node.idx, :] .-=
level_boundary.concentration[to_node.idx, :] .* flow
elseif to_node.type == NodeType.UserDemand
basin.mass[from_node.idx, :] .-=
user_demand.concentration[to_node.idx, :] .* flow
else
@warn "Unsupported outflow from $(to_node.type) #$(to_node.value) to $(from_node.type) #$(from_node.value) with flow $flow"
end
end
end

if to_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, outflow_edge.edge)
if flow > 0
basin.cumulative_in[to_node.idx] += flow
if from_node.type == NodeType.Basin
basin.mass[to_node.idx, :] .+=
basin.concentration_state[from_node.idx, :] .* flow
elseif from_node.type == NodeType.LevelBoundary
basin.mass[to_node.idx, :] .+=
level_boundary.concentration[from_node.idx, :] .* flow
elseif from_node.type == NodeType.UserDemand
basin.mass[to_node.idx, :] .+=
user_demand.concentration[from_node.idx, :] .* flow
elseif from_node.type == NodeType.Terminal && from_node.value == 0
# UserDemand outflow is discoupled from its inflow,
# and the unset flow edge defaults to Terminal #0
nothing
else
@warn "Unsupported outflow from $(from_node.type) #$(from_node.value) to $(to_node.type) #$(to_node.value) with flow $flow"
end
end
end
end

# Update the basin concentrations based on the added mass and flows
basin.concentration_state .= basin.mass ./ (basin.storage_prev .+ basin.cumulative_in)

# Process all mass outflows from basins
for (inflow_edge, outflow_edge) in zip(state_inflow_edge, state_outflow_edge)
from_node = inflow_edge.edge[1]
to_node = outflow_edge.edge[2]
if from_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, inflow_edge.edge)
if flow > 0
basin.mass[from_node.idx, :] .-=
basin.concentration_state[from_node.idx, :] .* flow
end
end
if to_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, outflow_edge.edge)
if flow < 0
basin.mass[to_node.idx, :] .+=
basin.concentration_state[to_node.idx, :] .* flow
end
end
end

# Evaporate mass to keep the mass balance, if enabled in model config
if basin.evaporate_mass
basin.mass .-= basin.concentration_state .* (u.evaporation - uprev.evaporation)
end
basin.mass .-= basin.concentration_state .* (u.infiltration - uprev.infiltration)

# Take care of infinitely small masses, possibly becoming negative due to truncation.
for I in eachindex(basin.mass)
if (-eps(Float64)) < basin.mass[I] < (eps(Float64))
basin.mass[I] = 0.0
end
end

# Check for negative masses
if any(<(0), basin.mass)
R = CartesianIndices(basin.mass)
locations = findall(<(0), basin.mass)
for I in locations
basin_idx, substance_idx = Tuple(R[I])
@error "$(basin.node_id[basin_idx]) has negative mass $(basin.mass[I]) for substance $(basin.substances[substance_idx])"
end
error("Negative mass(es) detected")
end

# Update the basin concentrations again based on the removed mass
basin.concentration_state .= basin.mass ./ basin.current_storage[parent(u)]
basin.storage_prev .= basin.current_storage[parent(u)]

return nothing
end

Expand Down Expand Up @@ -247,13 +421,15 @@ function save_flow(u, t, integrator)
@. basin.cumulative_precipitation_saveat = 0.0
@. basin.cumulative_drainage_saveat = 0.0

concentration = copy(basin.concentration_state)
saved_flow = SavedFlow(;
flow = flow_mean,
inflow = inflow_mean,
outflow = outflow_mean,
flow_boundary = flow_boundary_mean,
precipitation,
drainage,
concentration,
t,
)
check_water_balance_error!(saved_flow, integrator, Δt)
Expand Down Expand Up @@ -500,6 +676,10 @@ function get_value(subvariable::NamedTuple, p::Parameters, du::AbstractVector, t

elseif startswith(variable, "concentration_external.")
value = basin.concentration_external[listen_node_id.idx][variable](t)
elseif startswith(variable, "concentration.")
substance = Symbol(last(split(variable, ".")))
var_idx = findfirst(==(substance), basin.substances)
value = basin.concentration_state[listen_node_id.idx, var_idx]
else
error("Unsupported condition variable $variable.")
end
Expand Down Expand Up @@ -590,6 +770,51 @@ function update_basin!(integrator)::Nothing
return nothing
end

"Load updates from 'Basin / concentration' into the parameters"
function update_basin_conc!(integrator)::Nothing
(; p) = integrator
(; basin) = p
(; node_id, concentration, concentration_time, substances) = basin
t = datetime_since(integrator.t, integrator.p.starttime)

rows = searchsorted(concentration_time.time, t)
timeblock = view(concentration_time, rows)

for row in timeblock
i = searchsortedfirst(node_id, NodeID(NodeType.Basin, row.node_id, 0))
j = findfirst(==(Symbol(row.substance)), substances)
ismissing(row.drainage) || (concentration[1, i, j] = row.drainage)
ismissing(row.precipitation) || (concentration[2, i, j] = row.precipitation)
end
return nothing
end

"Load updates from 'concentration' tables into the parameters"
function update_conc!(integrator, parameter, nodetype)::Nothing
(; p) = integrator
node = getproperty(p, parameter)
(; basin) = p
(; node_id, concentration, concentration_time) = node
(; substances) = basin
t = datetime_since(integrator.t, integrator.p.starttime)

rows = searchsorted(concentration_time.time, t)
timeblock = view(concentration_time, rows)

for row in timeblock
i = searchsortedfirst(node_id, NodeID(nodetype, row.node_id, 0))
j = findfirst(==(Symbol(row.substance)), substances)
ismissing(row.concentration) || (concentration[i, j] = row.concentration)
end
return nothing
end
update_flowb_conc!(integrator)::Nothing =
update_conc!(integrator, :flow_boundary, NodeType.FlowBoundary)
update_levelb_conc!(integrator)::Nothing =
update_conc!(integrator, :level_boundary, NodeType.LevelBoundary)
update_userd_conc!(integrator)::Nothing =
update_conc!(integrator, :user_demand, NodeType.UserDemand)

"Solve the allocation problem for all demands and assign allocated abstractions."
function update_allocation!(integrator)::Nothing
(; p, t, u) = integrator
Expand Down
1 change: 1 addition & 0 deletions core/src/config.jl
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ const nodetypes = collect(keys(nodekinds))
maxiters::Int = 1e9
sparse::Bool = true
autodiff::Bool = false
evaporate_mass::Bool = true
end

# Separate struct, as basin clashes with nodetype
Expand Down
2 changes: 1 addition & 1 deletion core/src/main.jl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ function main(toml_path::AbstractString)::Cint
if config.ribasim_version != cli.ribasim_version
@warn "The Ribasim version in the TOML config file does not match the used Ribasim CLI version." config.ribasim_version cli.ribasim_version
end
@info "Starting a Ribasim simulation." cli.ribasim_version starttime endtime
@info "Starting a Ribasim simulation." toml_path cli.ribasim_version starttime endtime

try
model = run(config)
Expand Down
Loading

0 comments on commit d3a2b37

Please sign in to comment.