Skip to content

Commit

Permalink
Implement ChRIS links for out-of-network zipfile and in-network swift…
Browse files Browse the repository at this point in the history
… storages
  • Loading branch information
jbernal0019 committed Feb 16, 2024
1 parent 94462c4 commit 2f5ebf4
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 27 deletions.
62 changes: 61 additions & 1 deletion pfcon/base_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,9 @@
"""

import logging
import os
import abc
import shutil
import shutil, errno


logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -38,3 +39,62 @@ def delete_data(self, job_dir):
Delete job data from the local storage.
"""
shutil.rmtree(job_dir)

def process_chrislink_files(self, job_incoming_dir):
    """
    Rearrange the local job incoming directory tree by creating folders that trace
    the source dirs pointed by ChRIS link files.

    Each '*.chrislink' file under job_incoming_dir holds a single
    storage-relative path (text). For every link, a sibling "trace" folder is
    created next to the link file (named after the linked path with '/'
    replaced by '_') containing a copy of the linked source, and the link
    file itself is removed. Top-level source folders that were only fetched
    to serve as copy sources are deleted afterwards.

    Returns the number of '.chrislink' files processed.
    """
    linked_paths = set()  # storage-relative paths that link files pointed to
    nlinks = 0

    for root, dirs, files in os.walk(job_incoming_dir):
        for filename in files:
            if filename.endswith('.chrislink'):
                link_file_path = os.path.join(root, filename)

                # the link file's entire content is the target path
                with open(link_file_path, 'rb') as f:
                    path = f.read().decode().strip()
                    # a link to a regular file traces its parent directory
                    if os.path.isfile(os.path.join(job_incoming_dir, path)):
                        path = os.path.dirname(path)

                    source_trace_dir = path.replace('/', '_')
                    dst_path = os.path.join(root, source_trace_dir)

                    # copy the linked source only once per destination dir
                    if not os.path.isdir(dst_path):
                        self.copysrc(os.path.join(job_incoming_dir, path), dst_path)

                    linked_paths.add(path)

                os.remove(link_file_path)
                nlinks += 1

    # top-level folder of every linked path (first path component)
    linked_path_top_folders = set()
    for path in linked_paths:
        linked_path_top_folders.add(path.split('/', 1)[0])

    # delete top-level folders that were only needed as copy sources; a top
    # folder that was itself directly linked (present in linked_paths) is kept
    for folder in linked_path_top_folders:
        if folder not in linked_paths:
            self.deletesrc(os.path.join(job_incoming_dir, folder))

    return nlinks

@staticmethod
def copysrc(src, dst):
try:
shutil.copytree(src, dst)
except OSError as e:
if e.errno in (errno.ENOTDIR, errno.EINVAL):
shutil.copy(src, dst)
else:
raise

@staticmethod
def deletesrc(src):
try:
shutil.rmtree(src)
except OSError as e:
if e.errno in (errno.ENOTDIR, errno.EINVAL):
os.remove(src)
else:
raise
6 changes: 4 additions & 2 deletions pfcon/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ def post(self):
if self.pfcon_innetwork:
if args.input_dirs is None:
abort(400, message='input_dirs: field is required')
if self.storage_env == 'filesystem' and args.output_dir is None:
if args.output_dir is None:
abort(400, message='output_dir: field is required')
else:
if request.files['data_file'] is None:
Expand All @@ -99,6 +99,7 @@ def post(self):
input_dir = args.input_dirs[0].strip('/')
output_dir = args.output_dir.strip('/')
incoming_dir = os.path.join(self.storebase_mount, input_dir)

storage = FileSystemStorage(app.config)
try:
d_info = storage.store_data(job_id, incoming_dir)
Expand All @@ -117,7 +118,8 @@ def post(self):
if self.storage_env == 'swift':
storage = SwiftStorage(app.config)
try:
d_info = storage.store_data(job_id, incoming_dir, args.input_dirs)
d_info = storage.store_data(job_id, incoming_dir, args.input_dirs,
job_output_path=args.output_dir.strip('/'))
except ClientException as e:
logger.error(f'Error while fetching files from swift and '
f'storing job {job_id} data, detail: {str(e)}')
Expand Down
97 changes: 75 additions & 22 deletions pfcon/swift_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,30 +32,41 @@ def store_data(self, job_id, job_incoming_dir, data, **kwargs):
Fetch the files with prefixes in the data list from swift storage into the
specified incoming directory.
"""
nfiles = 0
self.job_id = job_id
self.job_output_path = kwargs['job_output_path']

all_obj_paths = set()

for swift_path in data:
try:
l_ls = self.swift_manager.ls(swift_path)
except ClientException as e:
logger.error(f'Error while listing swift storage files in {swift_path} '
f'for job {job_id}, detail: {str(e)}')
raise
for obj_path in l_ls:
try:
contents = self.swift_manager.download_obj(obj_path)
except ClientException as e:
logger.error(f'Error while downloading file {obj_path} from swift '
f'storage for job {job_id}, detail: {str(e)}')
raise

local_file_path = obj_path.replace(swift_path, '', 1).lstrip('/')
local_file_path = os.path.join(job_incoming_dir, local_file_path)
os.makedirs(os.path.dirname(local_file_path), exist_ok=True)
with open(local_file_path, 'wb') as f:
f.write(contents)
nfiles += 1
obj_paths = set()
visited_paths = set()

self._find_all_storage_object_paths(swift_path, obj_paths, visited_paths)

for obj_path in obj_paths:
if obj_path not in all_obj_paths: # download a given file only once
try:
contents = self.swift_manager.download_obj(obj_path)
except ClientException as e:
logger.error(f'Error while downloading file {obj_path} from swift '
f'storage for job {job_id}, detail: {str(e)}')
raise

local_file_path = obj_path.replace(swift_path, '', 1).lstrip('/')
local_file_path = os.path.join(job_incoming_dir, local_file_path)
os.makedirs(os.path.dirname(local_file_path), exist_ok=True)

with open(local_file_path, 'wb') as f:
f.write(contents)

all_obj_paths.add(obj_path)

nfiles = len(all_obj_paths)
logger.info(f'{nfiles} files fetched from swift storage for job {job_id}')

nlinks = self.process_chrislink_files(job_incoming_dir)
nfiles -= nlinks

return {
'jid': job_id,
'nfiles': nfiles,
Expand Down Expand Up @@ -83,7 +94,8 @@ def get_data(self, job_id, job_outgoing_dir, **kwargs):
for root, dirs, files in os.walk(job_outgoing_dir):
for filename in files:
local_file_path = os.path.join(root, filename)
if not os.path.islink(local_file_path):

if not os.path.islink(local_file_path) and not local_file_path.endswith('.chrislink'):
rel_file_path = os.path.relpath(local_file_path, job_outgoing_dir)
swift_file_path = os.path.join(swift_output_path, rel_file_path)

Expand All @@ -100,8 +112,49 @@ def get_data(self, job_id, job_outgoing_dir, **kwargs):
logger.error(f'Failed to read file {local_file_path} for '
f'job {job_id}, detail: {str(e)}')
raise

swift_rel_file_paths.append(rel_file_path)

data = {'job_output_path': swift_output_path,
'rel_file_paths': swift_rel_file_paths}
return io.BytesIO(json.dumps(data).encode())

def _find_all_storage_object_paths(self, storage_path, obj_paths, visited_paths):
    """
    Find all object storage paths from the passed storage path (prefix) by
    recursively following ChRIS links. The resulting set of object paths is given
    by the obj_paths set argument.

    Every object under storage_path is added to obj_paths, including the
    '.chrislink' objects themselves (presumably so they are downloaded
    locally and later resolved — verify against the caller). Link targets
    are visited recursively. visited_paths accumulates every prefix already
    explored; note the startswith() guard also skips any path that is a
    sub-path of a visited prefix, not just exact repeats.

    Raises ValueError when a link points at the job output dir or one of
    its ancestors; re-raises any storage client error after logging it.
    """
    if not storage_path.startswith(tuple(visited_paths)): # avoid infinite loops
        visited_paths.add(storage_path)
        # job_id and job_output_path are instance attrs set earlier
        # (assigned in store_data — confirm before reuse elsewhere)
        job_id = self.job_id
        job_output_path = self.job_output_path

        try:
            l_ls = self.swift_manager.ls(storage_path)
        except Exception as e:
            logger.error(f'Error while listing swift storage files in {storage_path} '
                         f'for job {job_id}, detail: {str(e)}')
            raise

        for obj_path in l_ls:
            if obj_path.endswith('.chrislink'):
                # the link object's content is the linked storage path (text)
                try:
                    linked_path = self.swift_manager.download_obj(
                        obj_path).decode().strip()
                except Exception as e:
                    logger.error(f'Error while downloading file {obj_path} from '
                                 f'swift storage for job {job_id}, detail: {str(e)}')
                    raise

                if f'{job_output_path}/'.startswith(linked_path.rstrip('/') + '/'):
                    # link files are not allowed to point to the job output dir or
                    # any of its ancestors
                    logger.error(f'Found invalid input path {linked_path} for job '
                                 f'{job_id} pointing to an ancestor of the job '
                                 f'output dir: {job_output_path}')
                    raise ValueError(f'Invalid input path: {linked_path}')

                self._find_all_storage_object_paths(linked_path, obj_paths,
                                                    visited_paths) # recursive call
            obj_paths.add(obj_path)
7 changes: 6 additions & 1 deletion pfcon/zip_file_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ def store_data(self, job_id, job_incoming_dir, data, **kwargs):
logger.info(f'{nfiles} files to decompress for job {job_id}')
job_zip.extractall(path=job_incoming_dir)

nlinks = self.process_chrislink_files(job_incoming_dir)
nfiles -= nlinks

return {
'jid': job_id,
'nfiles': nfiles,
Expand All @@ -51,7 +54,8 @@ def get_data(self, job_id, job_outgoing_dir, **kwargs):
for root, dirs, files in os.walk(job_outgoing_dir):
for filename in files:
local_file_path = os.path.join(root, filename)
if not os.path.islink(local_file_path):

if not os.path.islink(local_file_path) and not local_file_path.endswith('.chrislink'):
arc_file_path = os.path.relpath(local_file_path, job_outgoing_dir)
try:
with open(local_file_path, 'rb') as f:
Expand All @@ -61,6 +65,7 @@ def get_data(self, job_id, job_outgoing_dir, **kwargs):
f'job {job_id}, detail: {str(e)}')
raise
nfiles += 1

memory_zip_file.seek(0)

logger.info(f'{nfiles} files compressed for job {job_id}')
Expand Down
3 changes: 2 additions & 1 deletion tests/test_resources_innetwork.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ def test_post(self):
'gpu_limit': '0',
'image': 'fnndsc/pl-simplefsapp',
'type': 'fs',
'input_dirs': [self.swift_input_path]
'input_dirs': [self.swift_input_path],
'output_dir': self.swift_output_path
}
# make the POST request
response = self.client.post(self.url, data=data, headers=self.headers)
Expand Down

0 comments on commit 2f5ebf4

Please sign in to comment.