Skip to content

Commit

Permalink
Artifact tree regression (#195)
Browse files Browse the repository at this point in the history
* resolving bug in artifact tree

* update

* adding changes to modifying artifact name

* adding try and except

* adding dataslice and step metrics for try except block

* update

---------

Co-authored-by: Abhinav Chobey <[email protected]>
  • Loading branch information
abhinavchobey and Abhinav Chobey authored Aug 1, 2024
1 parent 095659c commit 0a66528
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 33 deletions.
20 changes: 11 additions & 9 deletions server/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,21 +287,23 @@ async def display_artifact(
"items": None
}

@app.get("/display_arti_tree_lineage/{lineagetype}/{pipeline_name}")
async def display_arti_tree_lineage(request: Request,lineagetype, pipeline_name: str):
@app.get("/display_arti_tree_lineage/{pipeline_name}")
async def display_arti_tree_lineage(request: Request, pipeline_name: str)-> List[List[Dict[str, Any]]]:
'''
returns dictionary of nodes and links for given execution_type.
response = {
nodes: [{id:"",name:"",execution_uuid:""}],
links: [{source:1,target:4},{}],
}
Returns:
A nested list of dictionaries with 'id' and 'parents' keys.
response = [
[{'id': 'data.xml.gz:236d', 'parents': []}],
[{'id': 'parsed/train.tsv:32b7', 'parents': ['data.xml.gz:236d']}],
]
'''
# checks if mlmd file exists on server
response = None
if os.path.exists(server_store_path):
query = cmfquery.CmfQuery(server_store_path)
if (pipeline_name in query.get_pipeline_names()):
response = await query_artifact_tree_lineage(server_store_path, pipeline_name, dict_of_art_ids,lineagetype)
#response = "null"
response = await query_artifact_tree_lineage(server_store_path, pipeline_name, dict_of_art_ids)

return response

# This API returns a list of artifact types.
Expand Down
56 changes: 39 additions & 17 deletions server/app/query_artifact_tree_lineage.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,26 @@
import os
import os, re
from cmflib import cmfquery
from collections import deque, defaultdict
import pandas as pd
import json
from typing import List, Dict, Any

async def query_artifact_tree_lineage(mlmd_path,pipeline_name, dict_of_art_ids,lineagetype):
async def query_artifact_tree_lineage(mlmd_path: str,pipeline_name: str, dict_of_art_ids: Dict) -> List[List[Dict[str, Any]]]:
query = cmfquery.CmfQuery(mlmd_path)
pipeline_id = query.get_pipeline_id(pipeline_name)
id_name = {}
child_parent_artifact_id = {}
for type_, df in dict_of_art_ids[pipeline_name].items():
for index, row in df.iterrows():
id_name[row["id"]] = modify_arti_name(row["name"])
parent_artifacts = query.get_all_parent_artifacts(row["name"])
child_parent_artifact_id[row["id"]] = []
if not parent_artifacts.empty:
child_parent_artifact_id[row["id"]] = list(parent_artifacts["id"])
#creating a dictionary of id and artifact name {id:artifact name}
id_name[row["id"]] = modify_arti_name(row["name"],type_)
one_hop_parent_artifacts = query.get_one_hop_parent_artifacts(row["name"]) # get immediate artifacts
child_parent_artifact_id[row["id"]] = [] # assign empty dict for artifact with no parent artifact
if not one_hop_parent_artifacts.empty: # if artifact have parent artifacts
child_parent_artifact_id[row["id"]] = list(one_hop_parent_artifacts["id"])
data_organized = topological_sort(child_parent_artifact_id, id_name)
return data_organized

def topological_sort(input_data,artifact_name_id_dict):
def topological_sort(input_data,artifact_name_id_dict) -> List[Dict]:
# Initialize in-degree of all nodes to 0
in_degree = {node: 0 for node in input_data}
# Initialize adjacency list
Expand Down Expand Up @@ -51,11 +52,32 @@ def topological_sort(input_data,artifact_name_id_dict):
output_data= list(parent_dict.values())
return output_data

def modify_arti_name(arti_name):
if "metrics" in arti_name:
name = f"{arti_name.split(':')[0]}:{arti_name.split(':')[1][:4]}:{arti_name.split(':')[2]}"
else:
name = arti_name.split("artifacts/")[1].rsplit(":", 1)[0] + ":" + arti_name.rsplit(":", 1)[1][:4]
return name

#query_artifact_tree_lineage("/home/chobey/cmf-server/data/mlmd","Test-env",data,'Artifact_Tree')
def modify_arti_name(arti_name, type):
    """Return a shortened, display-friendly form of an artifact name.

    The shortening rule depends on the artifact type:

    * ``"Metrics"``: ``metrics:4ebdc980-...:388`` -> ``metrics:4ebd:388``
      (second colon-separated field cut to 4 characters).
    * ``"Model"``: ``Test-env/prepare:uuid:32`` -> ``prepare:uuid``
      (basename of the third-from-last field plus the first 4 characters
      of the second-from-last field).
    * ``"Dataset"``: ``artifacts/data.xml.gz:236d95...`` -> ``data.xml.gz``
      (basename of the second-from-last field; the trailing field is dropped).
    * ``"Dataslice"``:
      ``cmf_artifacts/dataslices/<uuid>/slice-1:e77e34...`` ->
      ``dataslices/<uuid>/slice-1:e77e``
      (leading path component removed, last field cut to 4 characters).
    * ``"Step_Metrics"``:
      ``cmf_artifacts/metrics/<uuid>/training_metrics:d7c32a...:3:2029c7...``
      -> ``metrics/<uuid>/training_metrics:d7c3:3:2029``
      (leading path component removed; of the last three fields the first
      and third are cut to 4 characters).
    * any other type: the name is returned unchanged.

    Args:
        arti_name: Full artifact name as stored in the metadata store.
        type: Artifact type label selecting the shortening rule. The name
            shadows the builtin but is kept so existing callers that pass
            it by keyword keep working.

    Returns:
        The shortened name, or ``arti_name`` itself when the type is not
        recognised or the name does not have the expected shape.
    """
    try:
        if type == "Metrics":
            fields = arti_name.split(":")
            name = f"{fields[0]}:{fields[1][:4]}:{fields[2]}"
        elif type == "Model":
            fields = arti_name.split(":")
            name = fields[-3].split("/")[-1] + ":" + fields[-2][:4]
        elif type == "Dataset":
            name = arti_name.split(":")[-2].split("/")[-1]
        elif type == "Dataslice":
            # Drop the leading "cmf_artifacts/" style path component.
            trimmed = arti_name.split("/", 1)[1]
            # Split on the LAST colon only, so a colon inside the path does
            # not truncate the name (a negative maxsplit means "no limit").
            path, last_field = trimmed.rsplit(":", 1)
            name = f"{path}:{last_field[:4]}"
        elif type == "Step_Metrics":
            # Drop the leading "cmf_artifacts/" style path component.
            trimmed = arti_name.split("/", 1)[1]
            # Peel exactly the last three colon-separated fields off the
            # right; everything before them is kept verbatim as the path.
            path, first_field, middle_field, last_field = trimmed.rsplit(":", 3)
            name = f"{path}:{first_field[:4]}:{middle_field}:{last_field[:4]}"
        else:
            name = arti_name
    except (AttributeError, IndexError, TypeError, ValueError) as e:
        # Best effort: a name that does not match the expected layout (too
        # few fields, no "/", or not a string) is shown as-is, not failed on.
        print(f"Error parsing artifact name: {e}")
        name = arti_name  # Fallback to the original arti_name in case of error
    return name
4 changes: 2 additions & 2 deletions ui/src/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ class FastAPIClient {
});
}

async getArtiTreeLineage(pipeline,lineagetype) {
return this.apiClient.get(`/display_arti_tree_lineage/${lineagetype}/${pipeline}`)
async getArtiTreeLineage(pipeline) {
return this.apiClient.get(`/display_arti_tree_lineage/${pipeline}`)
.then(({ data }) => {
return data;
});
Expand Down
9 changes: 4 additions & 5 deletions ui/src/pages/lineage/index.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ const Lineage = () => {
}
else {

fetchArtifactTree(pipeline, selectedLineageType);
fetchArtifactTree(pipeline);
}}
};

Expand All @@ -95,7 +95,7 @@ const Lineage = () => {
fetchExecutionTypes(selectedPipeline, lineageType);
}
else {
fetchArtifactTree(selectedPipeline, lineageType);
fetchArtifactTree(selectedPipeline);
}
};

Expand All @@ -110,9 +110,8 @@ const Lineage = () => {
setLineageArtifactsKey((prevKey) => prevKey + 1);
};

const fetchArtifactTree = (pipelineName,lineageType) => {
client.getArtiTreeLineage(pipelineName,lineageType).then((data) => {
console.log(data,"fetchArtifact");
const fetchArtifactTree = (pipelineName) => {
client.getArtiTreeLineage(pipelineName).then((data) => {
if (data === null) {
setArtiTreeData(null);
}
Expand Down

0 comments on commit 0a66528

Please sign in to comment.