Commit 520834f

Merge remote-tracking branch 'refs/remotes/origin/main' into feature/multi-asset-StorageScheduler

Flix6x committed Feb 1, 2025
2 parents 5834faa + c3c61bd commit 520834f
Showing 9 changed files with 269 additions and 64 deletions.
1 change: 1 addition & 0 deletions documentation/changelog.rst
@@ -13,6 +13,7 @@ New features
Infrastructure / Support
------------------------
* Enhance reporting infrastructure by ensuring that all ``Sensor.search_beliefs`` filters can be used as report parameters [see `PR #1318 <https://github.com/FlexMeasures/flexmeasures/pull/1318>`_]
* Improve searching for multi-sourced data by returning data from only the latest version of a data generator (e.g. forecaster or scheduler) by default, when using ``Sensor.search_beliefs`` [see `PR #1306 <https://github.com/FlexMeasures/flexmeasures/pull/1306>`_]

Bugfixes
-----------
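A minimal sketch of what the new default could look like from client code; the opt-out flag name `use_latest_version_per_event` is taken from the deprecation notice further down in this commit, so treat it as an assumption to verify against the released API:

    # Hypothetical usage sketch, not part of this commit
    from flexmeasures.data.models.time_series import Sensor

    sensor = Sensor.query.get(1)  # assumes a sensor with id 1 exists

    # New default: only beliefs from the latest version of each data generator
    bdf = sensor.search_beliefs()

    # Opt out, to get beliefs from all versions of each source
    bdf_all = sensor.search_beliefs(use_latest_version_per_event=False)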
105 changes: 83 additions & 22 deletions flexmeasures/data/models/data_sources.py
@@ -4,6 +4,7 @@
from typing import TYPE_CHECKING, Any, ClassVar
from sqlalchemy.ext.mutable import MutableDict

import pandas as pd
import timely_beliefs as tb

from packaging.version import Version
@@ -319,17 +320,23 @@ def label(self):

@property
def description(self):
"""Extended description
"""Extended description.
For example:
>>> DataSource("Seita", type="forecaster", model="naive", version="1.2").description
<<< "Seita's naive model v1.2.0"
<<< "Seita's naive forecaster v1.2"
>>> DataSource("Seita", type="scheduler", model="StorageScheduler", version="2").description
<<< "Seita's StorageScheduler model v2"
"""
descr = self.name
if self.model:
descr += f"'s {self.model} model"
descr += f"'s {self.model} "
# Mention the data source type unless the model name already mentions it
descr += (
self.type if self.type.lower() not in self.model.lower() else "model"
)
if self.version:
descr += f" v{self.version}"
return descr
@@ -373,25 +380,79 @@ def set_attribute(self, attribute: str, value):
self.attributes[attribute] = value


def keep_latest_version(
bdf: tb.BeliefsDataFrame,
one_deterministic_belief_per_event: bool = False,
) -> tb.BeliefsDataFrame:
"""Filters the BeliefsDataFrame to keep the latest version of each source, for each event.
The function performs the following steps:
1. Resets the index to flatten the DataFrame.
2. Adds columns for the source's name, type, model, and version.
3. Sorts the rows by event_start and source.version in descending order.
4. Removes duplicates based on event_start, source.name, source.type, and source.model, keeping the latest version.
5. Drops the temporary columns added for source attributes.
6. Restores the original index.
Parameters:
-----------
bdf : tb.BeliefsDataFrame
The input BeliefsDataFrame containing event_start and source information.
Returns:
--------
tb.BeliefsDataFrame
A new BeliefsDataFrame containing only the latest version of each source
for each event_start, with the original index restored.
"""
if bdf.empty:
return bdf

# Remember the original index, then reset it
index_levels = bdf.index.names
bdf = bdf.reset_index()
belief_column = "belief_time"
if belief_column not in index_levels:
belief_column = "belief_horizon"
event_column = "event_start"
if event_column not in index_levels:
event_column = "event_end"

# Add columns derived from each row's source (name, type, model and version)
bdf[["source.name", "source.type", "source.model", "source.version"]] = bdf[
"source"
].apply(
lambda s: pd.Series(
{
"source.name": s.name,
"source.type": s.type,
"source.model": s.model,
"source.version": Version(
s.version if s.version is not None else "0.0.0"
),
}
)
)

# Sort by event_start and version, keeping only the latest version
bdf = bdf.sort_values(by=[event_column, "source.version"], ascending=[True, False])

# Drop duplicates based on the event, cumulative probability and source identifiers, keeping the latest version
unique_columns = [
event_column,
"cumulative_probability",
"source.name",
"source.type",
"source.model",
]
if not one_deterministic_belief_per_event:
unique_columns += [belief_column]
bdf = bdf.drop_duplicates(unique_columns)

# Remove temporary columns and restore the original index
bdf = bdf.drop(
columns=["source.name", "source.type", "source.model", "source.version"]
)
bdf = bdf.set_index(index_levels)

return bdf
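The heart of the new `keep_latest_version` is a sort-then-deduplicate pass. A self-contained pandas sketch of that technique, using a plain DataFrame with made-up columns rather than the BeliefsDataFrame machinery:

    import pandas as pd
    from packaging.version import Version

    # Two versions of the same source reporting on the same event
    df = pd.DataFrame(
        {
            "event_start": ["2023-04-24 23:00", "2023-04-24 23:00"],
            "source.name": ["source2", "source2"],
            "source.version": [Version("0.0.0"), Version("0.2")],
            "event_value": [-1, 3],
        }
    )

    # Sort the latest version first within each event, then keep the first row
    latest = df.sort_values(
        by=["event_start", "source.version"], ascending=[True, False]
    ).drop_duplicates(["event_start", "source.name"])

    print(latest.event_value.tolist())  # [3]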
44 changes: 35 additions & 9 deletions flexmeasures/data/models/reporting/aggregator.py
@@ -6,6 +6,7 @@
import pandas as pd

from flexmeasures.data.models.reporting import Reporter
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.data.schemas.reporting.aggregation import (
AggregatorConfigSchema,
AggregatorParametersSchema,
@@ -42,22 +43,23 @@ def _compute_report(
"""

method: str = self._config.get("method")
weights: dict = self._config.get("weights", {})

dataframes = []

if belief_time is None:
belief_time = server_now()

for input_description in input:
sensor = input_description["sensor"]
sensor: Sensor = input_description.pop("sensor")
# if no name is given in the belief search config, use the sensor id instead
column_name = input_description.pop("name", f"sensor_{sensor.id}")

source = input_description.get("source")
source = input_description.get("sources", source)
source = input_description.pop(
"source", input_description.pop("sources", None)
)
if source is not None and not isinstance(source, list):
source = [source]

df = sensor.search_beliefs(
event_starts_after=start,
@@ -66,10 +68,34 @@
beliefs_before=belief_time,
source=source,
one_deterministic_belief_per_event=True,
**input_description,
)

# Check for multi-sourced events (i.e. multiple sources for a single event)
if len(df.lineage.events) != len(df):
duplicate_events = df[
df.index.get_level_values("event_start").duplicated()
]
raise ValueError(
f"{len(duplicate_events)} event(s) are duplicate. First duplicate: {duplicate_events[0]}. Consider using (more) source filters."
)

# Check for multiple sources within the entire frame (excluding different versions of the same source)
unique_sources = df.lineage.sources
properties = [
"name",
"type",
"model",
] # properties to identify different versions of the same source
if (
len(unique_sources) > 1
and not all(
getattr(src, prop) == getattr(unique_sources[0], prop)
for prop in properties
for src in unique_sources
)
and (source is None or len(source) == 0)
):
raise ValueError(
"Missing attribute source or sources. The fields `source` or `sources` is required when having multiple sources within the time window."
)
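The version-insensitive comparison above reduces to checking whether all sources agree on name, type and model. A standalone sketch, with a hypothetical `Src` stand-in for `DataSource`:

    from collections import namedtuple

    # Hypothetical stand-in for DataSource, for illustration only
    Src = namedtuple("Src", ["name", "type", "model", "version"])

    unique_sources = [
        Src("source2", "B", None, None),   # unversioned
        Src("source2", "B", None, "0.2"),  # same source, newer version
    ]
    properties = ["name", "type", "model"]

    # True: the versions differ, but these count as one and the same source
    print(
        all(
            getattr(src, prop) == getattr(unique_sources[0], prop)
            for prop in properties
            for src in unique_sources
        )
    )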
30 changes: 14 additions & 16 deletions flexmeasures/data/models/reporting/pandas_reporter.py
@@ -14,7 +14,6 @@
PandasReporterParametersSchema,
)
from flexmeasures.data.models.time_series import Sensor
from flexmeasures.utils.time_utils import server_now


@@ -52,18 +51,29 @@ def fetch_data(
input: dict,
resolution: timedelta | None = None,
belief_time: datetime | None = None,
use_latest_version_only: bool | None = None, # deprecated
):
"""
Fetches the timed_beliefs from the database
"""

# todo: remove the 'use_latest_version_only' argument (deprecation announced in v0.25.0)
if use_latest_version_only is not None:
current_app.logger.warning(
"""The `use_latest_version_only` argument to `PandasReporter.compute()` is deprecated. By default, data is sourced by the latest version of a data generator by default. You can still override this behaviour by calling `PandasReporter().compute(input=[dict(use_latest_version_per_event=False)])` instead."""
)

droplevels = self._config.get("droplevels", False)

self.data = {}
for input_search_parameters in input:
_input_search_parameters = input_search_parameters.copy()

if use_latest_version_only is not None:
_input_search_parameters["use_latest_version_per_event"] = (
use_latest_version_only
)

sensor: Sensor = _input_search_parameters.pop("sensor", None)

name = _input_search_parameters.pop("name", f"sensor_{sensor.id}")
@@ -79,18 +89,6 @@
"source", _input_search_parameters.pop("sources", None)
)

bdf = sensor.search_beliefs(
event_starts_after=event_starts_after,
event_ends_before=event_ends_before,
@@ -113,7 +111,7 @@
event_resolution=sensor.event_resolution,
)
if droplevels:
# dropping belief_time, source and cumulative_probability columns
bdf = bdf.droplevel([1, 2, 3])
assert (
bdf.index.is_unique
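Put together, the per-input override mentioned in the warning above might be used like this when computing a report; a sketch only, with `reporter`, `sensor` and `report_sensor` assumed to exist and the parameter name taken from this commit:

    from datetime import datetime, timezone

    # Hypothetical sketch; construction of the reporter and sensors is elided
    result = reporter.compute(
        start=datetime(2023, 4, 24, 18, tzinfo=timezone.utc),
        end=datetime(2023, 4, 25, tzinfo=timezone.utc),
        input=[
            dict(
                sensor=sensor,
                # Per-input override: keep beliefs from all source versions
                use_latest_version_per_event=False,
            )
        ],
        output=[dict(sensor=report_sensor)],
    )[0]["data"]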
20 changes: 15 additions & 5 deletions flexmeasures/data/models/reporting/tests/conftest.py
@@ -166,10 +166,11 @@ def setup_dummy_data(db, app, generic_report):
db.session.add(daily_report_sensor)

"""
Create 3 DataSources
"""
source1 = DataSource("source1")
source2 = DataSource("source2")
source1 = DataSource("source1", type="A")
source2 = DataSource("source2", type="B")
source2v02 = DataSource("source2", type="B", version="0.2")

"""
Create TimedBeliefs
@@ -250,8 +251,7 @@ def setup_dummy_data(db, app, generic_report):
source=source2,
)
)

# add a belief belonging to Source 1 in the second half of the day
beliefs.append(
TimedBelief(
event_start=datetime(2023, 4, 24, tzinfo=utc) + timedelta(hours=12),
@@ -261,6 +261,16 @@
source=source1,
)
)
# add a belief belonging to version 0.2 of Source 2 around the end of the day, recorded 25 instead of 24 hours in advance
beliefs.append(
TimedBelief(
event_start=datetime(2023, 4, 24, tzinfo=utc) + timedelta(hours=23),
belief_horizon=timedelta(hours=25),
event_value=3,
sensor=sensor3,
source=source2v02,
)
)

# add data for sensor 4
for t in range(24 * 3):
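Background for the assertions below: sources without a version fall back to v0.0.0 in `keep_latest_version`, and `packaging.version.Version` ordering decides which source wins. A quick standalone check:

    from packaging.version import Version

    # An unversioned source is treated as "0.0.0", so v0.2 of the same source wins
    assert Version("0.2") > Version("0.0.0")
    assert max(Version("0.0.0"), Version("0.2")) == Version("0.2")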
53 changes: 51 additions & 2 deletions flexmeasures/data/models/reporting/tests/test_aggregator.py
@@ -161,7 +161,7 @@ def test_source_transition(setup_dummy_data, db):
From 12:00 to 24:00, there are events belonging to Source 2 with value -1.
We expect the reporter to use only the values defined in the `sources` array in the `input` field.
In case of encountering more than one source per event, the first source defined in the sources
array is prioritized.
"""
Expand Down Expand Up @@ -214,7 +214,7 @@ def test_source_transition(setup_dummy_data, db):
assert (result == -1).all().event_value

# if no source is passed, the reporter should raise a ValueError
# as there are events with different data sources in the report time period.
# This is important, for instance, for sensors containing power and scheduled values
# where we could get beliefs from both sources.
with pytest.raises(ValueError):
@@ -225,3 +225,52 @@
output=[dict(sensor=report_sensor)],
belief_time=tz.localize(datetime(2023, 12, 1)),
)[0]["data"]

# The exception to the above is when a new version of the same source recorded a value,
# in which case the latest version takes precedence. This happened in the last hour of the day.
result = agg_reporter.compute(
start=tz.localize(datetime(2023, 4, 24, 18, 0)),
end=tz.localize(datetime(2023, 4, 25)),
input=[dict(sensor=s3)],
output=[dict(sensor=report_sensor)],
belief_time=tz.localize(datetime(2023, 12, 1)),
)[0]["data"]

assert (result[:5] == -1).all().event_value # beliefs from the older version
assert (result[5:] == 3).all().event_value # belief from the latest version

# If we exclude source type "A" (source 1 is of that type) we should get the same result.
same_result = agg_reporter.compute(
start=tz.localize(datetime(2023, 4, 24, 18, 0)),
end=tz.localize(datetime(2023, 4, 25)),
input=[dict(sensor=s3, exclude_source_types=["A"])],
output=[dict(sensor=report_sensor)],
belief_time=tz.localize(datetime(2023, 12, 1)),
)[0]["data"]

assert (same_result == result).all().event_value

# If we exclude source type "B" (both versions of source 2 are of that type) we should get an empty result
result = agg_reporter.compute(
start=tz.localize(datetime(2023, 4, 24, 18, 0)),
end=tz.localize(datetime(2023, 4, 25)),
input=[dict(sensor=s3, exclude_source_types=["B"])],
output=[dict(sensor=report_sensor)],
belief_time=tz.localize(datetime(2023, 12, 1)),
)[0]["data"]

assert result.empty

# If we set use_latest_version_per_event=False, we should get both versions of source 2,
# and one_deterministic_belief_per_event=True kicks in to give back the most recent version
result = agg_reporter.compute(
start=tz.localize(datetime(2023, 4, 24, 18, 0)),
end=tz.localize(datetime(2023, 4, 25)),
input=[dict(sensor=s3, use_latest_version_per_event=False)],
output=[dict(sensor=report_sensor)],
belief_time=tz.localize(datetime(2023, 12, 1)),
)[0]["data"]

assert len(result) == 6
assert (result[:5] == -1).all().event_value # beliefs from the older version
assert (result[5:] == 3).all().event_value # belief from the latest version