Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support for various metadata store backends [need feedback and review] #45

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
134 changes: 57 additions & 77 deletions cmflib/cmf.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,13 @@
# This import is needed for the JupyterLab environment
import dvc
from ml_metadata.proto import metadata_store_pb2 as mlpb
from ml_metadata.metadata_store import metadata_store
from cmflib.dvc_wrapper import dvc_get_url, dvc_get_hash, git_get_commit, \
commit_output, git_get_repo, commit_dvc_lock_file, \
git_checkout_new_branch, \
check_git_repo, check_default_remote, check_git_remote,git_commit
from cmflib import graph_wrapper
from cmflib.metadata_helper import get_or_create_parent_context, \
get_or_create_run_context, associate_child_to_parent_context, \
create_new_execution_in_existing_run_context, link_execution_to_artifact, \
create_new_artifact_event_and_attribution, get_artifacts_by_id, \
put_artifact, link_execution_to_input_artifact
from cmflib.metadata.store import MetadataStore
from cmflib.metadata.mlmd_store import MlmdStore


class Cmf:
Expand Down Expand Up @@ -78,10 +74,7 @@ def __init__(self, filename: str = "mlmd",
Cmf.__prechecks()
if custom_properties is None:
custom_properties = {}
config = mlpb.ConnectionConfig()
config.sqlite.filename_uri = filename
self.store = metadata_store.MetadataStore(config)
self.filename = filename
self.store: MetadataStore = MlmdStore(backend='sqlite', config={'filename': filename})
self.child_context = None
self.execution = None
self.execution_name = ""
Expand All @@ -94,8 +87,9 @@ def __init__(self, filename: str = "mlmd",

if is_server is False:
git_checkout_new_branch(self.branch_name)
self.parent_context = get_or_create_parent_context(
store=self.store, pipeline=pipeline_name, custom_properties=custom_properties)
self.parent_context = self.store.get_or_create_parent_context(
pipeline=pipeline_name, custom_properties=custom_properties
)
if graph is True:
self.driver = graph_wrapper.GraphDriver(
Cmf.__neo4j_uri, Cmf.__neo4j_user, Cmf.__neo4j_password)
Expand Down Expand Up @@ -159,27 +153,28 @@ def __del__(self):
def create_context(self, pipeline_stage: str, custom_properties: {} = None) -> mlpb.Context:
custom_props = {} if custom_properties is None else custom_properties
pipeline_stage = self.parent_context.name+'/'+pipeline_stage
ctx = get_or_create_run_context(self.store, pipeline_stage, custom_props)

ctx = self.store.get_or_create_run_context(pipeline_stage, custom_props)
self.child_context = ctx
associate_child_to_parent_context(store=self.store, parent_context=self.parent_context,
child_context=ctx)
self.store.associate_child_to_parent_context(parent_context=self.parent_context, child_context=ctx)

if self.graph:
self.driver.create_stage_node(
pipeline_stage, self.parent_context, ctx.id, custom_props)
return ctx

def merge_created_context(self, pipeline_stage: str, custom_properties: {} = None) -> mlpb.Context:
custom_props = {} if custom_properties is None else custom_properties
ctx = get_or_create_run_context(self.store, pipeline_stage, custom_props)

ctx = self.store.get_or_create_run_context(pipeline_stage, custom_props)
self.child_context = ctx
associate_child_to_parent_context(store=self.store, parent_context=self.parent_context,
child_context=ctx)
self.store.associate_child_to_parent_context(parent_context=self.parent_context, child_context=ctx)

if self.graph:
self.driver.create_stage_node(
pipeline_stage, self.parent_context, ctx.id, custom_props)
return ctx


def create_execution(self, execution_type: str,
custom_properties: t.Optional[t.Dict] = None) -> mlpb.Execution:
"""Create execution.
Expand Down Expand Up @@ -217,8 +212,7 @@ def create_execution(self, execution_type: str,
custom_props = {} if custom_properties is None else custom_properties
git_repo = git_get_repo()
git_start_commit = git_get_commit()
self.execution = create_new_execution_in_existing_run_context(
store=self.store,
self.execution = self.store.create_new_execution_in_existing_run_context(
execution_type_name=execution_type,
context_id=self.child_context.id,
execution=str(sys.argv),
Expand Down Expand Up @@ -251,8 +245,7 @@ def update_execution(self, execution_id: int, custom_properties: t.Optional[t.Di
if self.execution is None:
print("Error - no execution id")
sys.exit(1)
execution_type = self.store.get_execution_types_by_id(
[self.execution.type_id])[0]
execution_type = self.store.get_execution_types_by_id([self.execution.type_id])[0]

if custom_properties:
for key, value in custom_properties.items():
Expand All @@ -261,7 +254,7 @@ def update_execution(self, execution_id: int, custom_properties: t.Optional[t.Di
else:
self.execution.custom_properties[key].string_value = str(
value)
self.store.put_executions([self.execution])
self.store.put_execution(self.execution)
c_props = {}
for k, v in self.execution.custom_properties.items():
key = re.sub('-', '_', k)
Expand Down Expand Up @@ -300,17 +293,16 @@ def merge_created_execution(self, execution_type: str, execution_cmd: str, prope
# print(custom_props)
git_repo = properties.get("Git_Repo", "")
git_start_commit = properties.get("Git_Start_Commit", "")
self.execution = create_new_execution_in_existing_run_context \
(store=self.store,
execution_type_name=execution_type,
context_id=self.child_context.id,
execution=execution_cmd,
pipeline_id=self.parent_context.id,
pipeline_type=self.parent_context.name,
git_repo=git_repo,
git_start_commit=git_start_commit,
custom_properties=custom_props
)
self.execution = self.store.create_new_execution_in_existing_run_context(
execution_type_name=execution_type,
context_id=self.child_context.id,
execution=execution_cmd,
pipeline_id=self.parent_context.id,
pipeline_type=self.parent_context.name,
git_repo=git_repo,
git_start_commit=git_start_commit,
custom_properties=custom_props
)
self.execution_name = str(self.execution.id) + "," + execution_type
self.execution_command = execution_cmd
for k, v in custom_props.items():
Expand Down Expand Up @@ -372,20 +364,18 @@ def log_dataset(self, url: str, event: str, custom_properties: t.Optional[t.Dict
self.update_existing_artifact(
existing_artifact, custom_properties)
uri = c_hash
# update url for existing artifact
self.update_dataset_url(existing_artifact, dvc_url_with_pipeline)
artifact = link_execution_to_artifact(
store=self.store,
artifact = self.store.link_execution_to_artifact(
execution_id=self.execution.id,
uri=uri,
input_name=url,
event_type=event_type)
event_type=event_type
)
else:
# if((existing_artifact and len(existing_artifact )!= 0) and c_hash != ""):
# url = url + ":" + str(self.execution.id)
uri = c_hash if c_hash and c_hash.strip() else str(uuid.uuid1())
artifact = create_new_artifact_event_and_attribution(
store=self.store,
artifact = self.store.create_new_artifact_event_and_attribution(
execution_id=self.execution.id,
context_id=self.child_context.id,
uri=uri,
Expand Down Expand Up @@ -451,7 +441,7 @@ def update_dataset_url(self, artifact: mlpb.Artifact, updated_url: str):
if updated_url not in old_url:
new_url = f"{old_url},{updated_url}"
artifact.properties[key].string_value = new_url
put_artifact(self.store, artifact)
self.store.put_artifact(artifact)

def update_model_url(self, dup_artifact: list, updated_url: str):
for art in dup_artifact:
Expand All @@ -462,7 +452,7 @@ def update_model_url(self, dup_artifact: list, updated_url: str):
if updated_url not in old_url:
new_url = f"{old_url},{updated_url}"
dup_art.properties[key].string_value = new_url
put_artifact(self.store, dup_art)
self.store.put_artifact(dup_art)
return dup_artifact

def log_dataset_with_version(self, url: str, version: str, event: str, props: dict,
Expand Down Expand Up @@ -493,10 +483,8 @@ def log_dataset_with_version(self, url: str, version: str, event: str, props: di
self.update_existing_artifact(
existing_artifact, custom_properties)
uri = c_hash
# update url for existing artifact
self.update_dataset_url(existing_artifact, props['url'])
artifact = link_execution_to_artifact(
store=self.store,
artifact = self.store.link_execution_to_artifact(
execution_id=self.execution.id,
uri=uri,
input_name=url,
Expand All @@ -505,8 +493,7 @@ def log_dataset_with_version(self, url: str, version: str, event: str, props: di
# if((existing_artifact and len(existing_artifact )!= 0) and c_hash != ""):
# url = url + ":" + str(self.execution.id)
uri = c_hash if c_hash and c_hash.strip() else str(uuid.uuid1())
artifact = create_new_artifact_event_and_attribution(
store=self.store,
artifact = self.store.create_new_artifact_event_and_attribution(
execution_id=self.execution.id,
context_id=self.child_context.id,
uri=uri,
Expand Down Expand Up @@ -621,22 +608,21 @@ def log_model(self, path: str, event: str, model_framework: str = "Default",

if existing_artifact and len(
existing_artifact) != 0 and event_type == mlpb.Event.Type.INPUT:
# update url for existing artifact

print(type(existing_artifact))
existing_artifact = self.update_model_url(existing_artifact, url_with_pipeline)
artifact = link_execution_to_artifact(
store=self.store,
artifact = self.store.link_execution_to_artifact(
execution_id=self.execution.id,
uri=c_hash,
input_name=model_uri,
event_type=event_type)
event_type=event_type
)
model_uri = artifact.name
else:

uri = c_hash if c_hash and c_hash.strip() else str(uuid.uuid1())
model_uri = model_uri + ":" + str(self.execution.id)
artifact = create_new_artifact_event_and_attribution(
store=self.store,
artifact = self.store.create_new_artifact_event_and_attribution(
execution_id=self.execution.id,
context_id=self.child_context.id,
uri=uri,
Expand Down Expand Up @@ -726,18 +712,18 @@ def log_model_with_version(self, path: str, event: str, props=None,
if existing_artifact and len(existing_artifact) != 0 and event_type == mlpb.Event.Type.INPUT:
# update url for existing artifact
existing_artifact = self.update_model_url(existing_artifact, url)
artifact = link_execution_to_artifact(store=self.store,
execution_id=self.execution.id,
uri=c_hash,
input_name=model_uri,
event_type=event_type)
artifact = self.store.link_execution_to_artifact(
execution_id=self.execution.id,
uri=c_hash,
input_name=model_uri,
event_type=event_type
)
model_uri = artifact.name
else:

uri = c_hash if c_hash and c_hash.strip() else str(uuid.uuid1())
model_uri = model_uri + ":" + str(self.execution.id)
artifact = create_new_artifact_event_and_attribution(
store=self.store,
artifact = self.store.create_new_artifact_event_and_attribution(
execution_id=self.execution.id,
context_id=self.child_context.id,
uri=uri,
Expand Down Expand Up @@ -801,8 +787,7 @@ def log_execution_metrics(self, metrics_name: str, custom_properties: t.Optional
custom_props = {} if custom_properties is None else custom_properties
uri = str(uuid.uuid1())
metrics_name = metrics_name + ":" + uri + ":" + str(self.execution.id)
metrics = create_new_artifact_event_and_attribution(
store=self.store,
metrics = self.store.create_new_artifact_event_and_attribution(
execution_id=self.execution.id,
context_id=self.child_context.id,
uri=uri,
Expand Down Expand Up @@ -876,10 +861,9 @@ def commit_metrics(self, metrics_name: str):
uri = dvc_get_hash(metrics_name)
metrics_commit = uri
name = metrics_name + ":" + uri + ":" + \
str(self.execution.id) + ":" + str(uuid.uuid1())
str(self.execution.id) + ":" + str(uuid.uuid1())
custom_props = {"Name": metrics_name, "Commit": metrics_commit} #passing uri value to commit
metrics = create_new_artifact_event_and_attribution(
store=self.store,
metrics = self.store.create_new_artifact_event_and_attribution(
execution_id=self.execution.id,
context_id=self.child_context.id,
uri=uri,
Expand Down Expand Up @@ -911,8 +895,7 @@ def commit_metrics(self, metrics_name: str):

def log_validation_output(self, version: str, custom_properties: t.Optional[t.Dict] = None) -> object:
uri = str(uuid.uuid1())
return create_new_artifact_event_and_attribution(
store=self.store,
return self.store.create_new_artifact_event_and_attribution(
execution_id=self.execution.id,
context_id=self.child_context.id,
uri=uri,
Expand All @@ -933,15 +916,15 @@ def update_existing_artifact(self, artifact: mlpb.Artifact, custom_properties: t
artifact.custom_properties[key].int_value = value
else:
artifact.custom_properties[key].string_value = str(value)
put_artifact(self.store, artifact)
self.store.put_artifact(artifact)

def get_artifact(self, artifact_id: int) -> mlpb.Artifact:
"""Gets the artifact object from mlmd"""
return get_artifacts_by_id(self.store, [artifact_id])[0]
return self.store.get_artifacts_by_id(artifact_id)[0]

def update_model_output(self, artifact: mlpb.Artifact):
"""updates an artifact"""
put_artifact(self.store, artifact)
self.store.put_artifact(artifact)

def create_dataslice(self, name: str) -> "Cmf.DataSlice":
"""Creates a dataslice object.
Expand Down Expand Up @@ -1040,8 +1023,7 @@ def commit(self, custom_properties: t.Optional[t.Dict] = None) -> None:
self.writer.store.get_artifacts_by_uri(c_hash))
if existing_artifact and len(existing_artifact) != 0:
print("Adding to existing data slice")
_ = link_execution_to_input_artifact(
store=self.writer.store,
_ = self.writer.store.link_execution_to_input_artifact(
execution_id=self.writer.execution.id,
uri=c_hash,
input_name=self.name + ":" + c_hash)
Expand All @@ -1050,10 +1032,8 @@ def commit(self, custom_properties: t.Optional[t.Dict] = None) -> None:
"Commit": dataslice_commit, #passing c_hash value to commit
"git_repo": git_repo,
"Remote": remote}
custom_properties = props.update(
custom_properties) if custom_properties else props
create_new_artifact_event_and_attribution(
store=self.writer.store,
custom_properties = props.update(custom_properties) if custom_properties else props
self.writer.store.create_new_artifact_event_and_attribution(
execution_id=self.writer.execution.id,
context_id=self.writer.child_context.id,
uri=c_hash,
Expand Down
Loading