Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Osdf cache support #221

Merged
merged 4 commits into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 12 additions & 4 deletions cmflib/cmf.py
Original file line number Diff line number Diff line change
Expand Up @@ -2137,6 +2137,7 @@ def cmf_init(type: str = "",
password: str = "",
port: int = 0,
osdf_path: str = "",
osdf_cache: str = "",
key_id: str = "",
key_path: str = "",
key_issuer: str = "",
Expand Down Expand Up @@ -2169,7 +2170,12 @@ def cmf_init(type: str = "",
session_token: Session token for AmazonS3.
user: SSH remote username.
password: SSH remote password.
port: SSH remote port
port: SSH remote port.
osdf_path: OSDF Origin Path.
osdf_cache: OSDF Cache Path (Optional).
key_id: OSDF Key ID.
key_path: OSDF Private Key Path.
key_issuer: OSDF Key Issuer URL.
Returns:
Output based on the initialized repository type.
"""
Expand All @@ -2196,6 +2202,7 @@ def cmf_init(type: str = "",
'user': user,
'password': password,
'osdf_path': osdf_path,
'osdf_cache': osdf_cache,
'key_id': key_id,
'key_path': key_path,
'key-issuer': key_issuer,
Expand Down Expand Up @@ -2265,10 +2272,11 @@ def cmf_init(type: str = "",

return output

elif type == "osdfremote" and osdf_path != "" and key_id != "" and key_path != 0 and key_issuer != "" and git_remote_url != "":
elif type == "osdfremote" and osdf_path != "" and key_id != "" and key_path != "" and key_issuer != "" and git_remote_url != "":
"""Initialize osdfremote repository"""
output = _init_osdfremote(
osdf_path,
osdf_cache,
key_id,
key_path,
key_issuer,
Expand All @@ -2293,10 +2301,10 @@ def non_related_args(type : str, args : dict):
minioS3=["url", "endpoint_url", "access_key_id", "secret_key", "git_remote_url"]
amazonS3=["url", "access_key_id", "secret_key", "session_token", "git_remote_url"]
sshremote=["path", "user", "port", "password", "git_remote_url"]
osdfremote=["osdf_path", "key_id", "key_path", "key-issuer", "git_remote_url"]
osdfremote=["osdf_path", "osdf_cache", "key_id", "key_path", "key-issuer", "git_remote_url"]


dict_repository_args={"local" : local, "minioS3" : minioS3, "amazonS3" : amazonS3, "sshremote" : sshremote}
dict_repository_args={"local" : local, "minioS3" : minioS3, "amazonS3" : amazonS3, "sshremote" : sshremote, "osdfremote": osdfremote}

for repo,arg in dict_repository_args.items():
if repo ==type:
Expand Down
4 changes: 3 additions & 1 deletion cmflib/cmf_commands_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -252,13 +252,15 @@ def _init_sshremote(path,user, port, password, git_remote_url, cmf_server_url, n
print(msg)
return msg

def _init_osdfremote(path, key_id, key_path, key_issuer, git_remote_url, cmf_server_url, neo4j_user, neo4j_password, neo4j_uri):
def _init_osdfremote(path, cache, key_id, key_path, key_issuer, git_remote_url, cmf_server_url, neo4j_user, neo4j_password, neo4j_uri):
cli_args = cli.parse_args(
[
"init",
"osdf",
"--path",
path,
"--cache",
cache,
"--key-id",
key_id,
"--key-path",
Expand Down
20 changes: 16 additions & 4 deletions cmflib/commands/artifact/pull.py
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,11 @@ def search_artifact(self, input_dict):
continue
# Splitting the 'name' using ':' as the delimiter and storing the first argument in the 'name' variable.
name = name.split(":")[0]
artifact_hash = name = name.split(":")[1]
# Splitting the path on '/' to extract the file name, excluding the directory structure.
file_name = name.split('/')[-1]
if file_name == self.args.artifact_name:
return name, url
return name, url, artifact_hash
else:
pass

Expand Down Expand Up @@ -201,7 +202,8 @@ def run(self):
) # getting all artifacts with id
temp_dict = dict(zip(get_artifacts['name'], get_artifacts['url'])) # getting dictionary of name and url pair
name_url_dict.update(temp_dict) # updating name_url_dict with temp_dict
# print(name_url_dict)

#print(name_url_dict)
# name_url_dict = ('artifacts/parsed/test.tsv:6f597d341ceb7d8fbbe88859a892ef81', 'Test-env:/home/sharvark/local-storage/6f/597d341ceb7d8fbbe88859a892ef81'
# name_url_dict = ('artifacts/parsed/test.tsv:6f597d341ceb7d8fbbe88859a892ef81', 'Test-env:/home/sharvark/local-storage/6f/597d341ceb7d8fbbe88859a892ef81,Second-env:/home/sharvark/local-storage/6f/597d341ceb7d8fbbe88859a892ef81')

Expand All @@ -216,6 +218,7 @@ def run(self):
output = self.search_artifact(name_url_dict)
# output[0] = name
# output[1] = url
# output[2] = hash
if output is None:
print(f"{self.args.artifact_name} doesn't exist.")
else:
Expand Down Expand Up @@ -322,36 +325,45 @@ def run(self):
#Need to write to cmfconfig with new credentials
#CmfConfig.write_config(cmf_config, "osdf", attr_dict, True)
#Now Ready to do dvc pull
cache_path=cmf_config["osdf-cache"]

osdfremote_class_obj = osdf_artifacts.OSDFremoteArtifacts()
if self.args.artifact_name:
output = self.search_artifact(name_url_dict)
# output[0] = name
# output[1] = url
# output[3]=artifact_hash
if output is None:
print(f"{self.args.artifact_name} doesn't exist.")
else:
args = self.extract_repo_args("osdf", output[0], output[1], current_directory)
#print(f"Hash for the artifact {self.args.artifact_name} is {output[3]}")
stmt = osdfremote_class_obj.download_artifacts(
dvc_config_op,
args[0], # s_url of the artifact
cache_path,
current_directory,
args[1], # download_loc of the artifact
args[2] # name of the artifact
args[2], # name of the artifact
output[3] #Artifact Hash
)
print(stmt)
else:
for name, url in name_url_dict.items():
#print(name, url)
if not isinstance(url, str):
continue
artifact_hash = name.split(':')[1] #Extract Hash of the artifact from name
#print(f"Hash for the artifact {name} is {artifact_hash}")
args = self.extract_repo_args("osdf", name, url, current_directory)
stmt = osdfremote_class_obj.download_artifacts(
dvc_config_op,
args[0], # host,
cache_path,
current_directory,
args[1], # remote_loc of the artifact
args[2] # name
args[2], # name
artifact_hash #Artifact Hash
)
print(stmt)
return "Done"
Expand Down
9 changes: 9 additions & 0 deletions cmflib/commands/init/osdfremote.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ def run(self):

attr_dict = {}
attr_dict["path"] = self.args.path
attr_dict["cache"] = self.args.cache
attr_dict["key_id"] = self.args.key_id
attr_dict["key_path"] = self.args.key_path
attr_dict["key_issuer"] = self.args.key_issuer
Expand Down Expand Up @@ -127,6 +128,14 @@ def add_parser(subparsers, parent_parser):
default=argparse.SUPPRESS,
)

parser.add_argument(
"--cache",
help="Specify FQDN for OSDF cache path including port and path. For Ex. https://osdf-director.osg-htc.org/nrp/fdp/",
metavar="<cache>",
#default="https://osdf-director.osg-htc.org/nrp/fdp/",
default="",
)

required_arguments.add_argument(
"--key-id",
required=True,
Expand Down
146 changes: 119 additions & 27 deletions cmflib/storage_backends/osdf_artifacts.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,91 @@
import requests
#import urllib3
#urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
import hashlib
import time
from urllib.parse import urlparse

def generate_cached_url(url, cache):
#This takes host URL as supplied from MLMD records and generates cached URL=cache_path + path
#Example Input: https://sdsc-origin.nationalresearchplatform.org:8443/nrp/fdp/23/6d9502e0283d91f689d7038b8508a2
#Example Output: https://osdf-director.osg-htc.org/nrp/fdp/23/6d9502e0283d91f689d7038b8508a2
#The assumption is that url obtained from MLMD is more accurate. So we use the path from this URL and append it to cache path
#but we clean up the cache path to only its scheme + netloc: https://osdf-director.osg-htc.org
parsed_url = urlparse(url)
parsed_cache_url= urlparse(cache)
cached_url= parsed_cache_url.scheme + "://" + parsed_cache_url.netloc + parsed_url.path
return cached_url

def calculate_md5_from_file(file_path, chunk_size=8192):
md5 = hashlib.md5()
try:
with open(file_path, 'rb') as f:
while chunk := f.read(chunk_size):
md5.update(chunk)
except Exception as e:
print(f"An error occurred while reading the file: {e}")
return None
return md5.hexdigest()

def download_and_verify_file(host, headers, remote_file_path, local_path, artifact_hash, timeout):
print(f"Fetching artifact={local_path}, surl={host} to {remote_file_path}")
data= None
try:
response = requests.get(host, headers=headers, timeout=timeout, verify=True) # This should be made True. otherwise this will produce Insecure SSL Warning
if response.status_code == 200 and response.content:
data = response.content
else:
return False, "No data received from the server."
#pass
except requests.exceptions.Timeout:
return False, "The request timed out."
#pass
except Exception as exception:
return False, str(exception)

if data is not None:
try:
with open(remote_file_path, 'wb') as file:
file.write(data)
if os.path.exists(remote_file_path) and os.path.getsize(remote_file_path) > 0:
# Calculate MD5 hash of the downloaded file
start_time = time.time()
md5_hash = calculate_md5_from_file(remote_file_path)
end_time = time.time()
time_taken = end_time - start_time
if md5_hash:
#print(f"MD5 hash of the downloaded file is: {md5_hash}")
#print(f"Time taken to calculate MD5 hash: {time_taken:.2f} seconds")
if artifact_hash == md5_hash:
#print("MD5 hash of the downloaded file matches the hash in MLMD records.")
stmt = f"object {local_path} downloaded at {remote_file_path} in {time_taken:.2f} seconds and matches MLMD records."
success=True
else:
#print("Error: MD5 hash of the downloaded file does not match the hash in MLMD records.")
stmt = f"object {local_path} downloaded at {remote_file_path} in {time_taken:.2f} seconds and does NOT match MLMD records."
success=False
return success, stmt
else:
print("Failed to calculate MD5 hash of the downloaded file.")
except Exception as e:
print(f"An error occurred while writing to the file: {e}")
return False, f"An error occurred while writing to the file: {e}"

return False, "Data is None."


class OSDFremoteArtifacts:
def download_artifacts(
self,
dvc_config_op,
host: str, #s_url
cache: str, #cache_path from cmfconfig
current_directory: str, #current_directory where cmf artifact pull is executed
remote_file_path: str, # download_loc of the artifact
local_path: str, #name of the artifact
artifact_hash: str, #hash of the artifact from MLMD records
):
#print(f"Configured Host from MLMD record={host}. User configured cache redirector={cache}")
output = ""
remote_repo = dvc_config_op["remote.osdf.url"]
user = "nobody"
Expand All @@ -36,33 +111,50 @@ def download_artifacts(
#print(f"dynamic password from download_artifacts={dynamic_password}")
#print(f"Fetching artifact={local_path}, surl={host} to {remote_file_path} when this has been called at {current_directory}")

try:
headers={dvc_config_op["remote.osdf.custom_auth_header"]: dvc_config_op["remote.osdf.password"]}
temp = local_path.split("/")
temp.pop()
dir_path = "/".join(temp)
dir_to_create = os.path.join(current_directory, dir_path)
os.makedirs(
dir_to_create, mode=0o777, exist_ok=True
) # creates subfolders needed as per artifacts folder structure
local_file_path = os.path.join(current_directory, local_path)
local_file_path = os.path.abspath(local_file_path)

response = requests.get(host, headers=headers, verify=True) #This should be made True. otherwise this will produce Insecure SSL Warning
if response.status_code == 200 and response.content:
data = response.content
# Prepare directories and file paths
headers={dvc_config_op["remote.osdf.custom_auth_header"]: dvc_config_op["remote.osdf.password"]}
temp = local_path.split("/")
temp.pop()
dir_path = "/".join(temp)
dir_to_create = os.path.join(current_directory, dir_path)
os.makedirs(
dir_to_create, mode=0o777, exist_ok=True
) # creates subfolders needed as per artifacts folder structure
local_file_path = os.path.join(current_directory, local_path)
local_file_path = os.path.abspath(local_file_path)

#Cache can be Blank. If so, fetch from Origin
if cache == "":
#Fetch from Origin
success, result = download_and_verify_file(host, headers, remote_file_path, local_file_path, artifact_hash, timeout=10)
if success:
#print(result)
return result
else:
return "No data received from the server."
#print(f"Failed to download and verify file: {result}")
return f"Failed to download and verify file"
else:
#Generate Cached path for artifact
cached_s_url=generate_cached_url(host,cache)
#Try to fetch from cache first
success, cached_result = download_and_verify_file(cached_s_url, headers, remote_file_path, local_path, artifact_hash,timeout=5)
if success:
#print(cached_result)
return cached_result
else:
print(f"Failed to download and verify file from cache: {cached_result}")
print(f"Trying Origin at {host}")
#Fetch from Origin
success, origin_result = download_and_verify_file(host, headers, remote_file_path, local_path, artifact_hash, timeout=10)
if success:
#print(origin_result)
return origin_result
else:
#print(f"Failed to download and verify file: {result}")
return f"Failed to download and verify file"


except Exception as exception:
return exception



try:
with open(remote_file_path, 'wb') as file:
file.write(data)
if os.path.exists(remote_file_path) and os.path.getsize(remote_file_path) > 0:
#print(f"object {local_path} downloaded at {remote_file_path}")
stmt = f"object {local_path} downloaded at {remote_file_path}."
return stmt
except Exception as e:
print(f"An error occurred while writing to the file: {e}")

6 changes: 4 additions & 2 deletions docs/cmf_client/cmf_client.md
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ Optional Arguments
### cmf init osdfremote
```
Usage: cmf init osdfremote [-h] --path [path]
--cache [cache]
--key-id [key_id]
--key-path [key_path]
--key-issuer [key_issuer]
Expand All @@ -196,18 +197,19 @@ Usage: cmf init osdfremote [-h] --path [path]
```
`cmf init osdfremote` configures a OSDF Origin as a cmf artifact repository.
```
cmf init osdfremote --path https://[Some Origin]:8443/nrp/fdp/ --key-id c2a5 --key-path ~/.ssh/fdp.pem --key-issuer https://[Token Issuer]] --git-remote-url https://github.com/user/experiment-repo.git --git-remote-url https://github.com/user/experiment-repo.git --cmf-server-url http://127.0.0.1:80 --neo4j-user neo4j --neo4j-password password --neo4j-uri bolt://localhost:7687
cmf init osdfremote --path https://[Some Origin]:8443/nrp/fdp/ --cache http://[Some Redirector]/nrp/fdp --key-id c2a5 --key-path ~/.ssh/fdp.pem --key-issuer https://[Token Issuer]] --git-remote-url https://github.com/user/experiment-repo.git --git-remote-url https://github.com/user/experiment-repo.git --cmf-server-url http://127.0.0.1:80 --neo4j-user neo4j --neo4j-password password --neo4j-uri bolt://localhost:7687
```
Required Arguments
```
--path [path] Specify FQDN for OSDF origin including including port and directory path
--path [path] Specify FQDN for OSDF origin including including port and directory path if any
--key-id [key_id] Specify key_id for provided private key. eg. b2d3
--key-path [key_path] Specify path for private key on local filesystem. eg. ~/.ssh/XXX.pem
--key-issuer [key_issuer] Specify URL for Key Issuer. eg. https://t.nationalresearchplatform.org/XXX
--git-remote-url [git_remote_url] Specify git repo url. eg: https://github.com/XXX/example.git
```
Optional Arguments
```
--cache [cache] Specify FQDN for OSDF cache including including port and directory path if any
-h, --help show this help message and exit
--cmf-server-url [cmf_server_url] Specify cmf-server url. (default: http://127.0.0.1:80)
--neo4j-user [neo4j_user] Specify neo4j user. (default: None)
Expand Down