Skip to content

Commit

Permalink
Add automated tests for the new ChRIS link feature and fix bug when e…
Browse files Browse the repository at this point in the history
…xpanding links
  • Loading branch information
jbernal0019 committed Feb 29, 2024
1 parent 02d2f1c commit 47b73b7
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 26 deletions.
60 changes: 37 additions & 23 deletions pfcon/base_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,39 +45,53 @@ def process_chrislink_files(self, job_incoming_dir):
Rearrange the local job incoming directory tree by creating folders that trace
the source dirs pointed by ChRIS link files.
"""
linked_paths = set()
nlinks = 0
self.job_incoming_dir = job_incoming_dir
self._linked_paths = set()
self._nlinks = 0
self._already_copied_src_set = set()

for root, dirs, files in os.walk(job_incoming_dir):
self._process_chrislink_files(job_incoming_dir)

linked_path_top_folders = set()
for path in self._linked_paths:
linked_path_top_folders.add(path.split('/', 1)[0])

for folder in linked_path_top_folders:
if folder not in self._linked_paths:
self.deletesrc(os.path.join(job_incoming_dir, folder))

return self._nlinks

def _process_chrislink_files(self, dir):
"""
Recursively expand (substitute by actual folders) and remove ChRIS link files.

Walks ``dir`` with ``os.walk`` and handles every file whose name ends with
``.chrislink``. The content of a link file is read as a path relative to
``self.job_incoming_dir``; when that path is a regular file its parent
directory is used instead. Bookkeeping is kept in ``self._linked_paths``,
``self._nlinks`` and ``self._already_copied_src_set``.
"""
for root, dirs, files in os.walk(dir):
for filename in files:
if filename.endswith('.chrislink'):
link_file_path = os.path.join(root, filename)

with open(link_file_path, 'rb') as f:
path = f.read().decode().strip()
if os.path.isfile(os.path.join(job_incoming_dir, path)):
path = os.path.dirname(path)

source_trace_dir = path.replace('/', '_')
dst_path = os.path.join(root, source_trace_dir)

if not os.path.isdir(dst_path):
self.copysrc(os.path.join(job_incoming_dir, path), dst_path)
# NOTE(review): str.startswith does a plain string-prefix match, so a copied
# source '/x/home/bob' also matches a link under '/x/home/bobby/' -- consider
# comparing against the source path plus a trailing separator.
if not link_file_path.startswith(tuple(self._already_copied_src_set)): # only expand a link once
with open(link_file_path, 'rb') as f:
rel_path = f.read().decode().strip()
abs_path = os.path.join(self.job_incoming_dir, rel_path)

linked_paths.add(path)
# a link that points to a single file is treated as a link to its parent folder
if os.path.isfile(abs_path):
rel_path = os.path.dirname(rel_path)
abs_path = os.path.dirname(abs_path)

os.remove(link_file_path)
nlinks += 1
# the destination folder name traces the source path, with '/' replaced by '_'
source_trace_dir = rel_path.replace('/', '_')
dst_path = os.path.join(root, source_trace_dir)

linked_path_top_folders = set()
for path in linked_paths:
linked_path_top_folders.add(path.split('/', 1)[0])
if not os.path.isdir(dst_path): # only copy once to a dest path
self.copysrc(abs_path, dst_path)
self._already_copied_src_set.add(abs_path)
self._process_chrislink_files(dst_path) # recursive call

for folder in linked_path_top_folders:
if folder not in linked_paths:
self.deletesrc(os.path.join(job_incoming_dir, folder))
self._linked_paths.add(rel_path)

return nlinks
os.remove(link_file_path)
self._nlinks += 1

@staticmethod
def copysrc(src, dst):
Expand Down
53 changes: 53 additions & 0 deletions tests/test_resources_innetwork.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,59 @@ def test_post(self):
# cleanup swarm job
pman.delete_job(job_id)

def test_post_with_chris_links(self):
    """
    POST a job whose input dir holds a ChRIS link file pointing to a folder that
    itself holds a second link file, then check that the plugin output mirrors
    the fully expanded tree.
    """
    job_id = 'chris-jid-1'
    self.job_dir = os.path.join(self.storebase_mount, 'key-' + job_id)

    data = {
        'jid': job_id,
        'entrypoint': ['python3', '/usr/local/bin/simpledsapp'],
        'args': ['--saveinputmeta', '--saveoutputmeta', '--prefix', 'lo'],
        'auid': 'cube',
        'number_of_workers': '1',
        'cpu_limit': '1000',
        'memory_limit': '200',
        'gpu_limit': '0',
        'image': 'fnndsc/pl-simpledsapp',
        'type': 'ds',
        'input_dirs': [self.swift_input_path],
        'output_dir': self.swift_output_path
    }

    # PIPELINES/bob holds the only regular file of the linked tree
    pipeline_dir = 'PIPELINES/bob'
    self.swift_manager.upload_obj(pipeline_dir + '/pipeline.yml', 'Test pipeline',
                                  content_type='text/plain')

    # home/bob holds a link to PIPELINES/bob (second level link)
    link_dir = 'home/bob'
    self.swift_manager.upload_obj(link_dir + '/PIPELINES_bob.chrislink',
                                  'PIPELINES/bob', content_type='text/plain')

    # the job input dir holds a link to home/bob (first level link)
    self.swift_manager.upload_obj(self.swift_input_path + '/home_bob.chrislink',
                                  'home/bob', content_type='text/plain')

    # make the POST request
    response = self.client.post(self.url, data=data, headers=self.headers)
    self.assertEqual(response.status_code, 201)
    self.assertIn('compute', response.json)
    self.assertIn('data', response.json)
    self.assertEqual(response.json['data']['nfiles'], 2)

    with self.app.test_request_context():
        pman = PmanService.get_service_obj()
        try:
            # poll the compute service until the job is done (10 tries, 3 s apart)
            for _ in range(10):
                time.sleep(3)
                d_compute_response = pman.get_job(job_id)
                if d_compute_response['status'] == 'finishedSuccessfully':
                    break
            self.assertEqual(d_compute_response['status'], 'finishedSuccessfully')

            self.assertTrue(os.path.isfile(f'{self.storebase_mount}/key-{job_id}/outgoing/home_bob/PIPELINES_bob/lopipeline.yml'))
        finally:
            # cleanup swarm job even when one of the assertions above fails
            pman.delete_job(job_id)


class TestJob(ResourceTests):
"""
Expand Down
61 changes: 58 additions & 3 deletions tests/test_resources_innetwork_fslink.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def setUp(self):
logging.disable(logging.WARNING)

self.app = create_app({'PFCON_INNETWORK': True,
'STORAGE_ENV': 'filesystem'
'STORAGE_ENV': 'fslink'
})
self.client = self.app.test_client()
with self.app.test_request_context():
Expand All @@ -37,7 +37,7 @@ def setUp(self):
self.headers = {'Authorization': 'Bearer ' + response.json['token']}

self.storebase_mount = self.app.config.get('STOREBASE_MOUNT')
self.user_dir = os.path.join(self.storebase_mount, 'foo')
self.user_dir = os.path.join(self.storebase_mount, 'home/foo')

# copy a file to the filesystem storage input path
self.fs_input_path = os.path.join(self.user_dir, 'feed/input')
Expand All @@ -52,6 +52,10 @@ def tearDown(self):
if os.path.isdir(self.user_dir):
shutil.rmtree(self.user_dir)

pipeline_dir = os.path.join(self.storebase_mount, 'PIPELINES')
if os.path.isdir(pipeline_dir):
shutil.rmtree(pipeline_dir)

# re-enable logging
logging.disable(logging.NOTSET)

Expand All @@ -71,7 +75,7 @@ def test_get(self):
self.assertEqual(response.status_code, 200)
self.assertTrue('server_version' in response.json)
self.assertTrue(response.json['pfcon_innetwork'])
self.assertEqual(response.json['storage_env'], 'filesystem')
self.assertEqual(response.json['storage_env'], 'fslink')

def test_post(self):
job_id = 'chris-jid-1'
Expand All @@ -90,6 +94,7 @@ def test_post(self):
'input_dirs': [os.path.relpath(self.fs_input_path, self.storebase_mount)],
'output_dir': os.path.relpath(self.fs_output_path, self.storebase_mount)
}

# make the POST request
response = self.client.post(self.url, data=data, headers=self.headers)
self.assertEqual(response.status_code, 201)
Expand All @@ -108,6 +113,56 @@ def test_post(self):
# cleanup swarm job
pman.delete_job(job_id)

def test_post_with_chris_links(self):
    """
    POST a job whose input dir holds a ChRIS link file pointing to a folder that
    itself holds a second link file, then check that the plugin output mirrors
    the fully expanded tree.
    """
    job_id = 'chris-jid-1'

    data = {
        'jid': job_id,
        'entrypoint': ['python3', '/usr/local/bin/simpledsapp'],
        'args': ['--saveinputmeta', '--saveoutputmeta', '--prefix', 'lo'],
        'auid': 'cube',
        'number_of_workers': '1',
        'cpu_limit': '1000',
        'memory_limit': '200',
        'gpu_limit': '0',
        'image': 'fnndsc/pl-simpledsapp',
        'type': 'ds',
        'input_dirs': [os.path.relpath(self.fs_input_path, self.storebase_mount)],
        'output_dir': os.path.relpath(self.fs_output_path, self.storebase_mount)
    }

    pipeline_dir = os.path.join(self.storebase_mount, 'PIPELINES/bob')
    os.makedirs(pipeline_dir, exist_ok=True)
    link_dir = os.path.join(self.storebase_mount, 'home/bob')
    os.makedirs(link_dir, exist_ok=True)
    # tearDown only removes self.user_dir and PIPELINES, so drop home/bob here
    self.addCleanup(shutil.rmtree, link_dir, ignore_errors=True)

    # PIPELINES/bob holds the only regular file of the linked tree
    with open(os.path.join(pipeline_dir, 'pipeline.yml'), 'w') as f:
        f.write('Test pipeline')
    # home/bob holds a link to PIPELINES/bob (second level link)
    with open(os.path.join(link_dir, 'PIPELINES_bob.chrislink'), 'w') as f:
        f.write('PIPELINES/bob')
    # the job input dir holds a link to home/bob (first level link)
    with open(os.path.join(self.fs_input_path, 'home_bob.chrislink'), 'w') as f:
        f.write('home/bob')

    # make the POST request
    response = self.client.post(self.url, data=data, headers=self.headers)
    self.assertEqual(response.status_code, 201)
    self.assertIn('compute', response.json)
    self.assertIn('data', response.json)
    self.assertEqual(response.json['data']['nfiles'], 2)

    with self.app.test_request_context():
        pman = PmanService.get_service_obj()
        try:
            # poll the compute service until the job is done (10 tries, 3 s apart)
            for _ in range(10):
                time.sleep(3)
                d_compute_response = pman.get_job(job_id)
                if d_compute_response['status'] == 'finishedSuccessfully':
                    break
            self.assertEqual(d_compute_response['status'], 'finishedSuccessfully')

            self.assertTrue(os.path.isfile(f'{self.fs_output_path}/home_bob/PIPELINES_bob/lopipeline.yml'))
        finally:
            # cleanup swarm job even when one of the assertions above fails
            pman.delete_job(job_id)


class TestJob(ResourceTests):
"""
Expand Down

0 comments on commit 47b73b7

Please sign in to comment.