Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Calculate tracer concentrations internally #1849

Merged
merged 22 commits into from
Oct 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions core/src/Ribasim.jl
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ using Tables: Tables, AbstractRow, columntable
# Wrapper around a vector of structs to easily retrieve the same field from all elements.
using StructArrays: StructVector

# OrderedSet is used to store the order of the substances in the network.
using DataStructures: OrderedSet

export libribasim

include("schema.jl")
Expand Down
235 changes: 230 additions & 5 deletions core/src/callback.jl
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,14 @@
u0::ComponentVector,
saveat,
)::Tuple{CallbackSet, SavedResults}
(; starttime, basin, tabulated_rating_curve) = parameters
(;
starttime,
basin,
flow_boundary,
level_boundary,
user_demand,
tabulated_rating_curve,
) = parameters
callbacks = SciMLBase.DECallback[]

# Check for negative storage
Expand All @@ -32,6 +39,18 @@
basin_cb = PresetTimeCallback(tstops, update_basin!; save_positions = (false, false))
push!(callbacks, basin_cb)

# Update boundary concentrations
for (boundary, func) in (
(basin, update_basin_conc!),
(flow_boundary, update_flowb_conc!),
(level_boundary, update_levelb_conc!),
(user_demand, update_userd_conc!),
)
tstops = get_tstops(boundary.concentration_time.time, starttime)
conc_cb = PresetTimeCallback(tstops, func; save_positions = (false, false))
push!(callbacks, conc_cb)
end

# Update TabulatedRatingCurve Q(h) relationships
tstops = get_tstops(tabulated_rating_curve.time.time, starttime)
tabulated_rating_curve_cb = PresetTimeCallback(
Expand Down Expand Up @@ -88,18 +107,36 @@
- Cumulative flows/forcings which are input for the allocation algorithm
- Cumulative flows/forcings which are realized demands in the allocation context

During these cumulative flow updates, we can also update the mass balance of the system,
as each flow carries mass, based on the concentrations of the flow source.
Specifically, we first use all the inflows to update the mass of the basins, recalculate
the basin concentration(s) and then remove the mass that is being lost to the outflows.
"""
function update_cumulative_flows!(u, t, integrator)::Nothing
(; p, tprev, dt) = integrator
(; basin, flow_boundary, allocation) = p
(; p, uprev, tprev, dt) = integrator
(;
basin,
state_inflow_edge,
state_outflow_edge,
user_demand,
level_boundary,
flow_boundary,
allocation,
) = p
(; vertical_flux) = basin

# Update tprev
p.tprev[] = t

# Reset cumulative flows, used to calculate the concentration
# of the basins after processing inflows only
fill!(basin.cumulative_in, 0.0)

# Update cumulative forcings which are integrated exactly
@. basin.cumulative_drainage += vertical_flux.drainage * dt
@. basin.cumulative_drainage_saveat += vertical_flux.drainage * dt
basin.mass .+= basin.concentration[1, :, :] .* vertical_flux.drainage * dt
basin.cumulative_in .= vertical_flux.drainage * dt

# Precipitation depends on fixed area
for node_id in basin.node_id
Expand All @@ -108,15 +145,26 @@

basin.cumulative_precipitation[node_id.idx] += added_precipitation
basin.cumulative_precipitation_saveat[node_id.idx] += added_precipitation
basin.mass[node_id.idx, :] .+=
basin.concentration[2, node_id.idx, :] .* added_precipitation
basin.cumulative_in[node_id.idx] += added_precipitation
end

# Exact boundary flow over time step
for (id, flow_rate, active) in
zip(flow_boundary.node_id, flow_boundary.flow_rate, flow_boundary.active)
for (id, flow_rate, active, edge) in zip(
flow_boundary.node_id,
flow_boundary.flow_rate,
flow_boundary.active,
flow_boundary.outflow_edges,
)
if active
outflow_id = edge[1].edge[2]
volume = integral(flow_rate, tprev, t)
flow_boundary.cumulative_flow[id.idx] += volume
flow_boundary.cumulative_flow_saveat[id.idx] += volume
basin.mass[outflow_id.idx, :] .+=
flow_boundary.concentration[id.idx, :] .* volume
basin.cumulative_in[outflow_id.idx] += volume
end
end

Expand All @@ -141,6 +189,132 @@
end
end
end

evetion marked this conversation as resolved.
Show resolved Hide resolved
# Process mass updates for UserDemand separately
# as the inflow and outflow are decoupled in the states
for (inflow_edge, outflow_edge) in
zip(user_demand.inflow_edge, user_demand.outflow_edge)
from_node = inflow_edge.edge[1]
to_node = outflow_edge.edge[2]
userdemand_idx = outflow_edge.edge[1].idx
if from_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, inflow_edge.edge)
if flow < 0
basin.mass[from_node.idx, :] .-=

Check warning on line 203 in core/src/callback.jl

View check run for this annotation

Codecov / codecov/patch

core/src/callback.jl#L203

Added line #L203 was not covered by tests
basin.concentration_state[to_node.idx, :] .* flow
basin.mass[from_node.idx, :] .-=

Check warning on line 205 in core/src/callback.jl

View check run for this annotation

Codecov / codecov/patch

core/src/callback.jl#L205

Added line #L205 was not covered by tests
user_demand.concentration[userdemand_idx, :] .* flow
end
end
if to_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, outflow_edge.edge)
if flow > 0
basin.mass[to_node.idx, :] .+=
basin.concentration_state[from_node.idx, :] .* flow
basin.mass[to_node.idx, :] .+=
user_demand.concentration[userdemand_idx, :] .* flow
end
end
end

# Process all mass inflows to basins
for (inflow_edge, outflow_edge) in zip(state_inflow_edge, state_outflow_edge)
from_node = inflow_edge.edge[1]
to_node = outflow_edge.edge[2]
if from_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, inflow_edge.edge)
if flow < 0
basin.cumulative_in[from_node.idx] -= flow
if to_node.type == NodeType.Basin
basin.mass[from_node.idx, :] .-=
basin.concentration_state[to_node.idx, :] .* flow
elseif to_node.type == NodeType.LevelBoundary
basin.mass[from_node.idx, :] .-=
level_boundary.concentration[to_node.idx, :] .* flow
elseif to_node.type == NodeType.UserDemand
basin.mass[from_node.idx, :] .-=

Check warning on line 235 in core/src/callback.jl

View check run for this annotation

Codecov / codecov/patch

core/src/callback.jl#L235

Added line #L235 was not covered by tests
user_demand.concentration[to_node.idx, :] .* flow
else
@warn "Unsupported outflow from $(to_node.type) #$(to_node.value) to $(from_node.type) #$(from_node.value) with flow $flow"
end
end
end

if to_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, outflow_edge.edge)
if flow > 0
basin.cumulative_in[to_node.idx] += flow
if from_node.type == NodeType.Basin
basin.mass[to_node.idx, :] .+=
basin.concentration_state[from_node.idx, :] .* flow
elseif from_node.type == NodeType.LevelBoundary
basin.mass[to_node.idx, :] .+=
level_boundary.concentration[from_node.idx, :] .* flow
elseif from_node.type == NodeType.UserDemand
basin.mass[to_node.idx, :] .+=

Check warning on line 254 in core/src/callback.jl

View check run for this annotation

Codecov / codecov/patch

core/src/callback.jl#L254

Added line #L254 was not covered by tests
user_demand.concentration[from_node.idx, :] .* flow
elseif from_node.type == NodeType.Terminal && from_node.value == 0
# UserDemand outflow is discoupled from its inflow,
# and the unset flow edge defaults to Terminal #0
nothing
else
@warn "Unsupported outflow from $(from_node.type) #$(from_node.value) to $(to_node.type) #$(to_node.value) with flow $flow"

Check warning on line 261 in core/src/callback.jl

View check run for this annotation

Codecov / codecov/patch

core/src/callback.jl#L261

Added line #L261 was not covered by tests
end
end
end
end

# Update the basin concentrations based on the added mass and flows
basin.concentration_state .= basin.mass ./ (basin.storage_prev .+ basin.cumulative_in)

# Process all mass outflows from basins
for (inflow_edge, outflow_edge) in zip(state_inflow_edge, state_outflow_edge)
from_node = inflow_edge.edge[1]
to_node = outflow_edge.edge[2]
if from_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, inflow_edge.edge)
if flow > 0
basin.mass[from_node.idx, :] .-=
basin.concentration_state[from_node.idx, :] .* flow
end
end
if to_node.type == NodeType.Basin
flow = flow_update_on_edge(integrator, outflow_edge.edge)
if flow < 0
basin.mass[to_node.idx, :] .+=
basin.concentration_state[to_node.idx, :] .* flow
end
end
end

# Evaporate mass to keep the mass balance, if enabled in model config
if basin.evaporate_mass
basin.mass .-= basin.concentration_state .* (u.evaporation - uprev.evaporation)
end
basin.mass .-= basin.concentration_state .* (u.infiltration - uprev.infiltration)

# Take care of infinitely small masses, possibly becoming negative due to truncation.
for I in eachindex(basin.mass)
if (-eps(Float64)) < basin.mass[I] < (eps(Float64))
basin.mass[I] = 0.0
end
end

# Check for negative masses
if any(<(0), basin.mass)
R = CartesianIndices(basin.mass)
locations = findall(<(0), basin.mass)
for I in locations
basin_idx, substance_idx = Tuple(R[I])
@error "$(basin.node_id[basin_idx]) has negative mass $(basin.mass[I]) for substance $(basin.substances[substance_idx])"
end
error("Negative mass(es) detected")

Check warning on line 311 in core/src/callback.jl

View check run for this annotation

Codecov / codecov/patch

core/src/callback.jl#L305-L311

Added lines #L305 - L311 were not covered by tests
end

# Update the basin concentrations again based on the removed mass
basin.concentration_state .= basin.mass ./ basin.current_storage[parent(u)]
basin.storage_prev .= basin.current_storage[parent(u)]

return nothing
end

Expand Down Expand Up @@ -247,13 +421,15 @@
@. basin.cumulative_precipitation_saveat = 0.0
@. basin.cumulative_drainage_saveat = 0.0

concentration = copy(basin.concentration_state)
saved_flow = SavedFlow(;
flow = flow_mean,
inflow = inflow_mean,
outflow = outflow_mean,
flow_boundary = flow_boundary_mean,
precipitation,
drainage,
concentration,
t,
)
check_water_balance_error!(saved_flow, integrator, Δt)
Expand Down Expand Up @@ -500,6 +676,10 @@

elseif startswith(variable, "concentration_external.")
value = basin.concentration_external[listen_node_id.idx][variable](t)
elseif startswith(variable, "concentration.")
substance = Symbol(last(split(variable, ".")))
var_idx = findfirst(==(substance), basin.substances)
value = basin.concentration_state[listen_node_id.idx, var_idx]

Check warning on line 682 in core/src/callback.jl

View check run for this annotation

Codecov / codecov/patch

core/src/callback.jl#L679-L682

Added lines #L679 - L682 were not covered by tests
else
error("Unsupported condition variable $variable.")
end
Expand Down Expand Up @@ -590,6 +770,51 @@
return nothing
end

"Load updates from 'Basin / concentration' into the parameters"
function update_basin_conc!(integrator)::Nothing
(; p) = integrator
(; basin) = p
(; node_id, concentration, concentration_time, substances) = basin
t = datetime_since(integrator.t, integrator.p.starttime)

rows = searchsorted(concentration_time.time, t)
timeblock = view(concentration_time, rows)

for row in timeblock
i = searchsortedfirst(node_id, NodeID(NodeType.Basin, row.node_id, 0))
j = findfirst(==(Symbol(row.substance)), substances)
ismissing(row.drainage) || (concentration[1, i, j] = row.drainage)
ismissing(row.precipitation) || (concentration[2, i, j] = row.precipitation)
end
return nothing
end

"Load updates from 'concentration' tables into the parameters"
function update_conc!(integrator, parameter, nodetype)::Nothing
(; p) = integrator
node = getproperty(p, parameter)
(; basin) = p
(; node_id, concentration, concentration_time) = node
(; substances) = basin
t = datetime_since(integrator.t, integrator.p.starttime)

rows = searchsorted(concentration_time.time, t)
timeblock = view(concentration_time, rows)

for row in timeblock
i = searchsortedfirst(node_id, NodeID(nodetype, row.node_id, 0))
j = findfirst(==(Symbol(row.substance)), substances)
ismissing(row.concentration) || (concentration[i, j] = row.concentration)
end
return nothing
end
update_flowb_conc!(integrator)::Nothing =
update_conc!(integrator, :flow_boundary, NodeType.FlowBoundary)
update_levelb_conc!(integrator)::Nothing =
update_conc!(integrator, :level_boundary, NodeType.LevelBoundary)
update_userd_conc!(integrator)::Nothing =

Check warning on line 815 in core/src/callback.jl

View check run for this annotation

Codecov / codecov/patch

core/src/callback.jl#L815

Added line #L815 was not covered by tests
update_conc!(integrator, :user_demand, NodeType.UserDemand)

"Solve the allocation problem for all demands and assign allocated abstractions."
function update_allocation!(integrator)::Nothing
(; p, t, u) = integrator
Expand Down
1 change: 1 addition & 0 deletions core/src/config.jl
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ const nodetypes = collect(keys(nodekinds))
maxiters::Int = 1e9
sparse::Bool = true
autodiff::Bool = false
evaporate_mass::Bool = true
evetion marked this conversation as resolved.
Show resolved Hide resolved
end

# Separate struct, as basin clashes with nodetype
Expand Down
2 changes: 1 addition & 1 deletion core/src/main.jl
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ function main(toml_path::AbstractString)::Cint
if config.ribasim_version != cli.ribasim_version
@warn "The Ribasim version in the TOML config file does not match the used Ribasim CLI version." config.ribasim_version cli.ribasim_version
end
@info "Starting a Ribasim simulation." cli.ribasim_version starttime endtime
@info "Starting a Ribasim simulation." toml_path cli.ribasim_version starttime endtime
evetion marked this conversation as resolved.
Show resolved Hide resolved

try
model = run(config)
Expand Down
Loading