From 3665e9d630d9608dbba3337d46360745cb99881a Mon Sep 17 00:00:00 2001 From: Paul Natsuo Kishimoto Date: Tue, 6 Apr 2021 22:50:40 +0200 Subject: [PATCH] Partly implement DatabaseBackend.{get,remove,set}_meta() --- ixmp/backend/dbapi.py | 309 +++++++++++++++++++------------ ixmp/core.py | 48 +++-- ixmp/tests/backend/test_dbapi.py | 6 +- 3 files changed, 226 insertions(+), 137 deletions(-) diff --git a/ixmp/backend/dbapi.py b/ixmp/backend/dbapi.py index 66d267560..65370d212 100644 --- a/ixmp/backend/dbapi.py +++ b/ixmp/backend/dbapi.py @@ -157,30 +157,6 @@ def init(self, ts, annotation): self.conn.commit() - def _get(self, model, scenario, version): - args = [model, scenario] - if version: - query = """ - SELECT ts.id, ts.version FROM timeseries AS ts WHERE model_name = ? - AND scenario_name = ? AND version = ? - """ - args.append(version) - else: - query = """ - SELECT ts.id, ts.version FROM timeseries AS ts JOIN annotation AS a - ON a.obj_id == ts.id WHERE ts.model_name = ? AND ts.scenario_name = ? - AND a.obj_class == 'timeseries' AND a.id == '__ixmp_default_version' - """ - - cur = self.conn.cursor() - cur.execute(query, args) - - result = cur.fetchone() - if result is None: - raise ValueError(f"model={model}, scenario={scenario}") - else: - return result - def get(self, ts): id, version = self._get(ts.model, ts.scenario, ts.version) @@ -227,9 +203,6 @@ def is_default(self, ts): def run_id(self, ts): return self._jindex[ts] - def _hash(self, identifiers): - return sha1(json.dumps(identifiers).encode()).hexdigest() - def get_data(self, ts, region, variable, unit, year): # NB this is a simple implementation # TODO use the __ixmp_ts_info annotation to filter before loading item data @@ -261,35 +234,6 @@ def get_geo(self, ts): item["value"] = item["data"] yield tuple(item[f] for f in FIELDS["ts_get_geo"]) - def _set_item_with_identifiers(self, ts, kind, data, dims, identifiers): - cur = self.conn.cursor() - - # Compute a unique name for this combination of identifiers - name = self._hash(identifiers) - log.debug(f"hash {name} for {identifiers}") - - # Create the entry in the database - self.init_item(ts, kind, name, dims, dims) - - # Retrieve any existing data - id, existing = self._item_data(ts, name) - - if existing: - raise NotImplementedError("set_data() with existing data") - - all_data = identifiers.copy() - all_data["data"] = data - - # Dump the data - cur.execute( - "INSERT OR REPLACE INTO item_data (item, value) VALUES (?, ?)", - (id, pickle.dumps(all_data)), - ) - self.conn.commit() - - # Store an annotation with the identifiers - self._annotate(("item", name), "__ixmp_ts_info", repr(identifiers)) - def set_data(self, ts, region, variable, data, unit, subannual, meta): self._set_item_with_identifiers( ts=ts, @@ -323,21 +267,21 @@ def set_geo(self, ts, region, variable, subannual, year, value, unit, meta): # Methods for ixmp.Scenario - def _iter_items(self, s, type): - cur = self.conn.cursor() - cur.execute( - """ - SELECT i.name FROM timeseries AS ts - JOIN item as i ON ts.id == i.timeseries - WHERE ts.id = ? AND i.type = ? - """, - (self._index[s], type), + def clone( + self, + s, + platform_dest, + model, + scenario, + annotation, + keep_solution, + first_model_year=None, + ): + log.warning( + f"clone({s}, {platform_dest}, {model}, {scenario}, {annotation}, " + f"{keep_solution}, {first_model_year}) has no effect" ) - while True: - result = cur.fetchone() - if result is None: - return - yield result + return s def list_items(self, s, type): return list(map(itemgetter(0), self._iter_items(s, type))) @@ -366,31 +310,6 @@ def item_index(self, s, name, sets_or_names): dims = eval(cur.fetchone()[0]) return list(dims.keys() if sets_or_names == "names" else dims.values()) - def _item_data(self, s, name): - """Retrieve and unpickle data for item `name` in TimeSeries `s`. - - Returns - ------- - int - integer ID of the item. - object - item data. - """ - cur = self.conn.cursor() - - cur.execute( - """ - SELECT i.id, value FROM item AS i LEFT JOIN item_data - ON i.id == item_data.item WHERE i.timeseries = ? AND i.name = ? - """, - (self._index[s], name), - ) - result = cur.fetchone() - if result[1]: - return result[0], pickle.loads(result[1]) - else: - return result[0], None - def item_get_elements(self, s, type, name, filters): id, data = self._item_data(s, name) @@ -427,7 +346,6 @@ def item_get_elements(self, s, type, name, filters): # Scalar equations and variables result = dict(zip(("lvl", "mrg"), data)) else: - print(s, type, name, filters, data, idx_names, idx_sets) raise NotImplementedError("non-indexed items") return result @@ -451,8 +369,159 @@ def item_set_elements(self, s, type, name, elements): ) self.conn.commit() + def _get_meta(self, target): + cur = self.conn.cursor() + cur.execute( + "SELECT id, value FROM annotation WHERE obj_class = ? AND obj_id = ?", + target, + ) + result = dict() + + while True: + anno = cur.fetchone() + if anno is None: + break + result[anno[0]] = eval(anno[1]) + + return result + + def get_meta(self, model, scenario, version, strict): + if strict: + targets = [self._meta_target(model, scenario, version)] + else: + raise NotImplementedError + + result = dict() + for target in targets: + result.update(self._get_meta(target)) + + return result + + def set_meta(self, meta, model, scenario, version): + target = self._meta_target(model, scenario, version) + for key, value in meta.items(): + self._annotate(target, key, repr(value)) + + def remove_meta(self, categories, model, scenario, version): + cur = self.conn.cursor() + cur.execute( + f""" + DELETE FROM annotation WHERE obj_class = ? AND obj_id = ? + AND id IN ({', '.join('?' * len(categories))}) + """, + list(self._meta_target(model, scenario, version)) + as_str_list(categories), + ) + # Internal + def _annotate(self, obj, anno_id, value): + if isinstance(obj, TimeSeries): + data = ["timeseries", str(self._index[obj])] + elif isinstance(obj, tuple): + data = list(obj) + else: + raise NotImplementedError + + self.conn.execute( + "INSERT OR ABORT INTO annotation VALUES (?, ?, ?, ?)", + data + [anno_id, value], + ) + self.conn.commit() + + def _annotate_code(self, codelist, code_id, anno_id, value): + self.conn.execute( + "INSERT OR ABORT INTO annotation VALUES (?, ?, ?, ?)", + ("code", f"{codelist}:{code_id}", anno_id, value), + ) + self.conn.commit() + + def _get(self, model, scenario, version): + args = [model, scenario] + if version: + query = """ + SELECT ts.id, ts.version FROM timeseries AS ts WHERE model_name = ? + AND scenario_name = ? AND version = ? + """ + args.append(version) + else: + query = """ + SELECT ts.id, ts.version FROM timeseries AS ts JOIN annotation AS a + ON a.obj_id == ts.id WHERE ts.model_name = ? AND ts.scenario_name = ? + AND a.obj_class == 'timeseries' AND a.id == '__ixmp_default_version' + """ + + cur = self.conn.cursor() + cur.execute(query, args) + + result = cur.fetchone() + if result is None: + raise ValueError(f"model={model}, scenario={scenario}") + else: + return result + + def _hash(self, identifiers): + return sha1(json.dumps(identifiers).encode()).hexdigest() + + def _insert_code(self, codelist, id, parent=None): + self.conn.execute( + "INSERT OR ABORT INTO code VALUES (?, ?, ?)", (codelist, id, parent) + ) + self.conn.commit() + + def _item_data(self, s, name): + """Retrieve and unpickle data for item `name` in TimeSeries `s`. + + Returns + ------- + int + integer ID of the item. + object + item data. + """ + cur = self.conn.cursor() + + cur.execute( + """ + SELECT i.id, value FROM item AS i LEFT JOIN item_data + ON i.id == item_data.item WHERE i.timeseries = ? AND i.name = ? + """, + (self._index[s], name), + ) + result = cur.fetchone() + if result[1]: + return result[0], pickle.loads(result[1]) + else: + return result[0], None + + def _iter_items(self, s, type): + cur = self.conn.cursor() + cur.execute( + """ + SELECT i.name FROM timeseries AS ts + JOIN item as i ON ts.id == i.timeseries + WHERE ts.id = ? AND i.type = ? + """, + (self._index[s], type), + ) + while True: + result = cur.fetchone() + if result is None: + return + yield result + + def _meta_target(self, model, scenario, version): + """Return the target object to be annotated with metadata.""" + if scenario is version is None: + return ("code", f"model_name:{model}") + elif model is version is None: + return ("code", f"scenario_name:{scenario}") + elif isinstance(version, int): + id, _version = self._get(model, scenario, version) + assert _version == version + return ("timeseries", id) + else: + return ("model/scenario", f"{model}/{scenario}") + def _select_anno(self, obj, anno_id): if isinstance(obj, TimeSeries): data = ["timeseries", str(self._index[obj])] @@ -480,33 +549,35 @@ def _select_codes(self, codelist): break yield from results - def _insert_code(self, codelist, id, parent=None): - self.conn.execute( - "INSERT OR ABORT INTO code VALUES (?, ?, ?)", (codelist, id, parent) - ) - self.conn.commit() + def _set_item_with_identifiers(self, ts, kind, data, dims, identifiers): + cur = self.conn.cursor() - def _annotate(self, obj, anno_id, value): - if isinstance(obj, TimeSeries): - data = ["timeseries", str(self._index[obj])] - elif isinstance(obj, tuple): - data = list(obj) - else: - raise NotImplementedError + # Compute a unique name for this combination of identifiers + name = self._hash(identifiers) + log.debug(f"hash {name} for {identifiers}") - self.conn.execute( - "INSERT OR ABORT INTO annotation VALUES (?, ?, ?, ?)", - data + [anno_id, value], - ) - self.conn.commit() + # Create the entry in the database + self.init_item(ts, kind, name, dims, dims) - def _annotate_code(self, codelist, code_id, anno_id, value): - self.conn.execute( - "INSERT OR ABORT INTO annotation VALUES (?, ?, ?, ?)", - ("code", f"{codelist}:{code_id}", anno_id, value), + # Retrieve any existing data + id, existing = self._item_data(ts, name) + + if existing: + raise NotImplementedError("set_data() with existing data") + + all_data = identifiers.copy() + all_data["data"] = data + + # Dump the data + cur.execute( + "INSERT OR REPLACE INTO item_data (item, value) VALUES (?, ?)", + (id, pickle.dumps(all_data)), ) self.conn.commit() + # Store an annotation with the identifiers + self._annotate(("item", name), "__ixmp_ts_info", repr(identifiers)) + # Required methods that are not implemented # # Since base.Backend is an abstract base class with abstract methods, a subclass @@ -520,20 +591,16 @@ def nie(self, *args, **kwargs): cat_list = nie cat_set_elements = nie clear_solution = nie - clone = nie delete = nie delete_geo = nie delete_item = nie delete_meta = nie discard_changes = nie get_doc = nie - get_meta = nie get_scenarios = nie item_delete_elements = nie last_update = nie - remove_meta = nie set_doc = nie - set_meta = nie # Class-specific methods diff --git a/ixmp/core.py b/ixmp/core.py index 9eaa4f905..62a2ea6ef 100644 --- a/ixmp/core.py +++ b/ixmp/core.py @@ -363,7 +363,32 @@ def set_meta( .. todo:: Complete docstring before merging. """ - validate_meta_args(model, scenario, version) + kind = validate_meta_args(model, scenario, version) + + if len(data) == 0: + return + + # Arguments for checking other + other_args = [ + (model, None, None), + (None, scenario, None), + (model, scenario, None), + ] + other_data = list( + self._backend.get_meta(*args, strict=True) if i != kind else dict() + for i, args in enumerate(other_args) + ) + + # Check for existing meta key + for key, value in data.items(): + exists = list(key in other for other in other_data) + if any(exists): + args = other_args[exists.index(True)] + raise RuntimeError( + f"The meta category {repr(key)} is already used at another level: " + "model {!r}, scenario {!r}, version {!r}".format(*args) + ) + self._backend.set_meta(data, model, scenario, version) def timeslices(self): @@ -1759,15 +1784,9 @@ def set_meta(self, name_or_dict, value=None): if isinstance(name_or_dict, str): name_or_dict = {name_or_dict: value} else: - msg = ( - "Unsupported parameter type of name_or_dict: %s. " - "Supported parameter types for name_or_dict are " - "String and Dictionary" - ) % type(name_or_dict) - raise ValueError(msg) - self.platform._backend.set_meta( - name_or_dict, self.model, self.scenario, self.version - ) + raise TypeError(f"{type(name_or_dict)}; expected str or dict") + + self.platform.set_meta(name_or_dict, self.model, self.scenario, self.version) def delete_meta(self, *args, **kwargs): """Remove scenario meta. @@ -1916,9 +1935,12 @@ def validate_meta_args( model: Optional[str], scenario: Optional[str], version: Optional[str] ): """Helper for :meth:`.Platform.set_meta` and :meth:`.Platform.get_meta`.""" - mask = (model is not None, scenario is not None, version is not None) - if mask not in [(1, 0, 0), (0, 1, 0), (1, 1, 0), (1, 1, 1)]: + try: + return [(1, 0, 0), (0, 1, 0), (1, 1, 0), (1, 1, 1)].index( + (model is not None, scenario is not None, version is not None) + ) + except IndexError: raise ValueError( "Invalid arguments. Valid combinations are: (model), (scenario), " "(model, scenario), (model, scenario, version)" - ) + ) from None diff --git a/ixmp/tests/backend/test_dbapi.py b/ixmp/tests/backend/test_dbapi.py index a616d5ff3..b717f13f5 100644 --- a/ixmp/tests/backend/test_dbapi.py +++ b/ixmp/tests/backend/test_dbapi.py @@ -229,7 +229,7 @@ def test_unique_meta(mp_, meta): mp.set_meta(meta, model=DANTZIG["model"]) expected = ( r"The meta category .* is already used at another level: " - r"model canning problem, scenario null, version null" + r"model 'canning problem', scenario None, version None" ) with pytest.raises(Exception, match=expected): mp.set_meta(meta, **DANTZIG, version=scenario.version) @@ -242,7 +242,7 @@ def test_unique_meta(mp_, meta): meta["sample_entry"] = "test-string" expected = ( r"The meta category .* is already used at another level: " - r"model canning problem, scenario standard, version null" + r"model 'canning problem', scenario 'standard', version None" ) with pytest.raises(Exception, match=expected): mp.set_meta(meta, **DANTZIG, version=scenario.version) @@ -395,7 +395,7 @@ def test_unique_meta_scenario(mp_, meta): expected = ( r"The meta category .* is already used at another level: " - r"model canning problem, scenario standard, " + r"model 'canning problem', scenario 'standard', " ) with pytest.raises(Exception, match=expected): mp.set_meta(meta, **DANTZIG)