Skip to content

Commit

Permalink
Partly implement DatabaseBackend.{get,remove,set}_meta()
Browse files Browse the repository at this point in the history
  • Loading branch information
khaeru committed Apr 6, 2021
1 parent d5c5371 commit 3665e9d
Show file tree
Hide file tree
Showing 3 changed files with 226 additions and 137 deletions.
309 changes: 188 additions & 121 deletions ixmp/backend/dbapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,30 +157,6 @@ def init(self, ts, annotation):

self.conn.commit()

def _get(self, model, scenario, version):
args = [model, scenario]
if version:
query = """
SELECT ts.id, ts.version FROM timeseries AS ts WHERE model_name = ?
AND scenario_name = ? AND version = ?
"""
args.append(version)
else:
query = """
SELECT ts.id, ts.version FROM timeseries AS ts JOIN annotation AS a
ON a.obj_id == ts.id WHERE ts.model_name = ? AND ts.scenario_name = ?
AND a.obj_class == 'timeseries' AND a.id == '__ixmp_default_version'
"""

cur = self.conn.cursor()
cur.execute(query, args)

result = cur.fetchone()
if result is None:
raise ValueError(f"model={model}, scenario={scenario}")
else:
return result

def get(self, ts):
id, version = self._get(ts.model, ts.scenario, ts.version)

Expand Down Expand Up @@ -227,9 +203,6 @@ def is_default(self, ts):
def run_id(self, ts):
return self._jindex[ts]

def _hash(self, identifiers):
return sha1(json.dumps(identifiers).encode()).hexdigest()

def get_data(self, ts, region, variable, unit, year):
# NB this is a simple implementation
# TODO use the __ixmp_ts_info annotation to filter before loading item data
Expand Down Expand Up @@ -261,35 +234,6 @@ def get_geo(self, ts):
item["value"] = item["data"]
yield tuple(item[f] for f in FIELDS["ts_get_geo"])

def _set_item_with_identifiers(self, ts, kind, data, dims, identifiers):
cur = self.conn.cursor()

# Compute a unique name for this combination of identifiers
name = self._hash(identifiers)
log.debug(f"hash {name} for {identifiers}")

# Create the entry in the database
self.init_item(ts, kind, name, dims, dims)

# Retrieve any existing data
id, existing = self._item_data(ts, name)

if existing:
raise NotImplementedError("set_data() with existing data")

all_data = identifiers.copy()
all_data["data"] = data

# Dump the data
cur.execute(
"INSERT OR REPLACE INTO item_data (item, value) VALUES (?, ?)",
(id, pickle.dumps(all_data)),
)
self.conn.commit()

# Store an annotation with the identifiers
self._annotate(("item", name), "__ixmp_ts_info", repr(identifiers))

def set_data(self, ts, region, variable, data, unit, subannual, meta):
self._set_item_with_identifiers(
ts=ts,
Expand Down Expand Up @@ -323,21 +267,21 @@ def set_geo(self, ts, region, variable, subannual, year, value, unit, meta):

# Methods for ixmp.Scenario

def _iter_items(self, s, type):
cur = self.conn.cursor()
cur.execute(
"""
SELECT i.name FROM timeseries AS ts
JOIN item as i ON ts.id == i.timeseries
WHERE ts.id = ? AND i.type = ?
""",
(self._index[s], type),
def clone(
self,
s,
platform_dest,
model,
scenario,
annotation,
keep_solution,
first_model_year=None,
):
log.warning(
f"clone({s}, {platform_dest}, {model}, {scenario}, {annotation}, "
f"{keep_solution}, {first_model_year}) has no effect"
)
while True:
result = cur.fetchone()
if result is None:
return
yield result
return s

def list_items(self, s, type):
return list(map(itemgetter(0), self._iter_items(s, type)))
Expand Down Expand Up @@ -366,31 +310,6 @@ def item_index(self, s, name, sets_or_names):
dims = eval(cur.fetchone()[0])
return list(dims.keys() if sets_or_names == "names" else dims.values())

def _item_data(self, s, name):
"""Retrieve and unpickle data for item `name` in TimeSeries `s`.
Returns
-------
int
integer ID of the item.
object
item data.
"""
cur = self.conn.cursor()

cur.execute(
"""
SELECT i.id, value FROM item AS i LEFT JOIN item_data
ON i.id == item_data.item WHERE i.timeseries = ? AND i.name = ?
""",
(self._index[s], name),
)
result = cur.fetchone()
if result[1]:
return result[0], pickle.loads(result[1])
else:
return result[0], None

def item_get_elements(self, s, type, name, filters):
id, data = self._item_data(s, name)

Expand Down Expand Up @@ -427,7 +346,6 @@ def item_get_elements(self, s, type, name, filters):
# Scalar equations and variables
result = dict(zip(("lvl", "mrg"), data))
else:
print(s, type, name, filters, data, idx_names, idx_sets)
raise NotImplementedError("non-indexed items")

return result
Expand All @@ -451,8 +369,159 @@ def item_set_elements(self, s, type, name, elements):
)
self.conn.commit()

def _get_meta(self, target):
cur = self.conn.cursor()
cur.execute(
"SELECT id, value FROM annotation WHERE obj_class = ? AND obj_id = ?",
target,
)
result = dict()

while True:
anno = cur.fetchone()
if anno is None:
break
result[anno[0]] = eval(anno[1])

return result

def get_meta(self, model, scenario, version, strict):
if strict:
targets = [self._meta_target(model, scenario, version)]
else:
raise NotImplementedError

result = dict()
for target in targets:
result.update(self._get_meta(target))

return result

def set_meta(self, meta, model, scenario, version):
target = self._meta_target(model, scenario, version)
for key, value in meta.items():
self._annotate(target, key, repr(value))

def remove_meta(self, categories, model, scenario, version):
cur = self.conn.cursor()
cur.execute(
f"""
DELETE FROM annotation WHERE obj_class = ? AND obj_id = ?
AND id IN ({', '.join('?' * len(categories))})
""",
list(self._meta_target(model, scenario, version)) + as_str_list(categories),
)

# Internal

def _annotate(self, obj, anno_id, value):
if isinstance(obj, TimeSeries):
data = ["timeseries", str(self._index[obj])]
elif isinstance(obj, tuple):
data = list(obj)
else:
raise NotImplementedError

self.conn.execute(
"INSERT OR ABORT INTO annotation VALUES (?, ?, ?, ?)",
data + [anno_id, value],
)
self.conn.commit()

def _annotate_code(self, codelist, code_id, anno_id, value):
self.conn.execute(
"INSERT OR ABORT INTO annotation VALUES (?, ?, ?, ?)",
("code", f"{codelist}:{code_id}", anno_id, value),
)
self.conn.commit()

def _get(self, model, scenario, version):
args = [model, scenario]
if version:
query = """
SELECT ts.id, ts.version FROM timeseries AS ts WHERE model_name = ?
AND scenario_name = ? AND version = ?
"""
args.append(version)
else:
query = """
SELECT ts.id, ts.version FROM timeseries AS ts JOIN annotation AS a
ON a.obj_id == ts.id WHERE ts.model_name = ? AND ts.scenario_name = ?
AND a.obj_class == 'timeseries' AND a.id == '__ixmp_default_version'
"""

cur = self.conn.cursor()
cur.execute(query, args)

result = cur.fetchone()
if result is None:
raise ValueError(f"model={model}, scenario={scenario}")
else:
return result

def _hash(self, identifiers):
return sha1(json.dumps(identifiers).encode()).hexdigest()

def _insert_code(self, codelist, id, parent=None):
self.conn.execute(
"INSERT OR ABORT INTO code VALUES (?, ?, ?)", (codelist, id, parent)
)
self.conn.commit()

def _item_data(self, s, name):
"""Retrieve and unpickle data for item `name` in TimeSeries `s`.
Returns
-------
int
integer ID of the item.
object
item data.
"""
cur = self.conn.cursor()

cur.execute(
"""
SELECT i.id, value FROM item AS i LEFT JOIN item_data
ON i.id == item_data.item WHERE i.timeseries = ? AND i.name = ?
""",
(self._index[s], name),
)
result = cur.fetchone()
if result[1]:
return result[0], pickle.loads(result[1])
else:
return result[0], None

def _iter_items(self, s, type):
cur = self.conn.cursor()
cur.execute(
"""
SELECT i.name FROM timeseries AS ts
JOIN item as i ON ts.id == i.timeseries
WHERE ts.id = ? AND i.type = ?
""",
(self._index[s], type),
)
while True:
result = cur.fetchone()
if result is None:
return
yield result

def _meta_target(self, model, scenario, version):
"""Return the target object to be annotated with metadata."""
if scenario is version is None:
return ("code", f"model_name:{model}")
elif model is version is None:
return ("code", f"scenario_name:{scenario}")
elif isinstance(version, int):
id, _version = self._get(model, scenario, version)
assert _version == version
return ("timeseries", id)
else:
return ("model/scenario", f"{model}/{scenario}")

def _select_anno(self, obj, anno_id):
if isinstance(obj, TimeSeries):
data = ["timeseries", str(self._index[obj])]
Expand Down Expand Up @@ -480,33 +549,35 @@ def _select_codes(self, codelist):
break
yield from results

def _insert_code(self, codelist, id, parent=None):
self.conn.execute(
"INSERT OR ABORT INTO code VALUES (?, ?, ?)", (codelist, id, parent)
)
self.conn.commit()
def _set_item_with_identifiers(self, ts, kind, data, dims, identifiers):
cur = self.conn.cursor()

def _annotate(self, obj, anno_id, value):
if isinstance(obj, TimeSeries):
data = ["timeseries", str(self._index[obj])]
elif isinstance(obj, tuple):
data = list(obj)
else:
raise NotImplementedError
# Compute a unique name for this combination of identifiers
name = self._hash(identifiers)
log.debug(f"hash {name} for {identifiers}")

self.conn.execute(
"INSERT OR ABORT INTO annotation VALUES (?, ?, ?, ?)",
data + [anno_id, value],
)
self.conn.commit()
# Create the entry in the database
self.init_item(ts, kind, name, dims, dims)

def _annotate_code(self, codelist, code_id, anno_id, value):
self.conn.execute(
"INSERT OR ABORT INTO annotation VALUES (?, ?, ?, ?)",
("code", f"{codelist}:{code_id}", anno_id, value),
# Retrieve any existing data
id, existing = self._item_data(ts, name)

if existing:
raise NotImplementedError("set_data() with existing data")

all_data = identifiers.copy()
all_data["data"] = data

# Dump the data
cur.execute(
"INSERT OR REPLACE INTO item_data (item, value) VALUES (?, ?)",
(id, pickle.dumps(all_data)),
)
self.conn.commit()

# Store an annotation with the identifiers
self._annotate(("item", name), "__ixmp_ts_info", repr(identifiers))

# Required methods that are not implemented
#
# Since base.Backend is an abstract base class with abstract methods, a subclass
Expand All @@ -520,20 +591,16 @@ def nie(self, *args, **kwargs):
cat_list = nie
cat_set_elements = nie
clear_solution = nie
clone = nie
delete = nie
delete_geo = nie
delete_item = nie
delete_meta = nie
discard_changes = nie
get_doc = nie
get_meta = nie
get_scenarios = nie
item_delete_elements = nie
last_update = nie
remove_meta = nie
set_doc = nie
set_meta = nie

# Class-specific methods

Expand Down
Loading

0 comments on commit 3665e9d

Please sign in to comment.