Skip to content

Commit

Permalink
Merge pull request #66 from big-data-lab-team/develop
Browse files Browse the repository at this point in the history
Release 0.2
  • Loading branch information
ValHayot authored Oct 19, 2017
2 parents 9b2af92 + 5360811 commit 63de3f1
Show file tree
Hide file tree
Showing 25 changed files with 448 additions and 112 deletions.
24 changes: 24 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Travis CI configuration: build the "simtool" Docker image and run the
# pytest suite inside it, then report coverage to Coveralls.
language: python

# Docker builds need the sudo-enabled (VM-based) Travis environment.
sudo: required

services:
- docker
# NOTE(review): "privileged: true" is not a valid entry under `services:`
# (it is a docker-run flag, already passed below via --privileged) —
# confirm whether this line is intentional.
- privileged: true

python:
- 2.7

# Nothing to install on the host; all dependencies live in the Docker image.
install: true

before_script:
- docker build -t simtool .

# Mount the Docker socket so the container can launch sibling containers,
# and mount the checkout so results land back on the host filesystem.
script:
- docker run --privileged --rm=false -v /var/run/docker.sock:/var/run/docker.sock -v $PWD:$PWD -w $PWD/sim simtool ./tests/execute_pytest.sh

after_success:
- coveralls

# NOTE(review): empty `deploy:` section — configure a provider or remove it.
deploy:

30 changes: 24 additions & 6 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,11 +1,29 @@
FROM mesosphere/spark:2.0.0-2.2.0-1-hadoop-2.6
FROM jupyter/pyspark-notebook:82b978b3ceeb

RUN apt-get -y install docker.io \
python-setuptools && \
easy_install pip
USER root

RUN apt-get update && \
echo 'Y' | apt-get install apt-utils && \
echo 'Y' | apt-get install curl && \
curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add - && \
echo 'Y' | apt install --reinstall base-files lsb-release lsb-base && \
echo 'Y' | apt-get install software-properties-common && \
echo 'Y' | apt-get install apt-transport-https && \
add-apt-repository "deb [arch=amd64] https://download.docker.com/linux/ubuntu $( lsb_release -cs ) stable" && \
apt-get update && \
apt-get install -y docker-ce && \
service docker start

RUN conda create -n simenv python=2.7 pytest py4j==0.10.4 pyspark pytest-cov

RUN pip install boutiques pytest pyspark pybids
ENV PATH /opt/conda/envs/simenv/bin:$PATH

ENTRYPOINT ["pytest"]
RUN /bin/bash -c "source activate simenv"

ENV PYTHONPATH /opt/conda/envs/python2/lib/python2.7/site-packages:\
/usr/local/spark-2.2.0-bin-hadoop2.7/python:\
/opt/conda/envs/python2/bin:$PYTHONPATH

RUN pip install boutiques pybids duecredit nipype

ENTRYPOINT ["/bin/bash"]
4 changes: 3 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
[![CircleCI](https://circleci.com/gh/big-data-lab-team/sim/tree/master.svg?style=svg)](https://circleci.com/gh/big-data-lab-team/sim/tree/master)
[![PyPI](https://img.shields.io/pypi/v/simtools.svg)](https://pypi.python.org/pypi/simtools)
[![Build Status](https://travis-ci.org/big-data-lab-team/sim.svg?branch=master)](https://travis-ci.org/big-data-lab-team/sim)
[![Coverage Status](https://coveralls.io/repos/github/big-data-lab-team/sim/badge.svg?branch=master)](https://coveralls.io/github/big-data-lab-team/sim?branch=master)

# Spark for neuroIMaging (sim)

Expand Down
21 changes: 0 additions & 21 deletions circle.yml

This file was deleted.

2 changes: 2 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
[metadata]
# NOTE(review): "description-file" is the legacy dash-separated spelling;
# newer setuptools expects "description_file" — confirm against the pinned
# setuptools version.
description-file = README.md
# Let "python setup.py test" delegate to pytest (requires pytest-runner,
# which setup.py lists in its dependencies).
[aliases]
test=pytest
7 changes: 4 additions & 3 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@
DEPS = [
"boutiques",
"pybids",
"pyspark"
"pyspark",
"pytest-runner"
]

setup(name="simtools",
Expand All @@ -16,8 +17,8 @@
license="GPL3.0",
packages=["sim"],
include_package_data=True,
test_suite="nose.collector",
tests_require=["nose"],
test_suite="pytest",
tests_require=["pytest"],
setup_requires=DEPS,
install_requires=DEPS,
entry_points = {
Expand Down
68 changes: 68 additions & 0 deletions sim/Sim.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
import boutiques, os, time, errno, tarfile, json

class Sim(object):
    """Base helper for running a Boutiques tool over a dataset.

    Holds the descriptor/input/output configuration and provides utilities
    shared by the Spark-based runners (tarring inputs, writing invocation
    files, executing the tool via the Boutiques API, and reporting results).
    """

    def __init__(self, boutiques_descriptor, input_path, output_dir):
        # Path to the Boutiques JSON descriptor of the tool to run.
        self.boutiques_descriptor = boutiques_descriptor
        # Location of the input dataset (e.g. a BIDS directory).
        self.input_path = input_path
        # Name of the directory where the tool writes its results.
        self.output_dir = output_dir

    def create_tar_file(self, out_dir, tar_name, files):
        """Create ``out_dir`` if needed and archive ``files`` into
        ``out_dir/tar_name`` (uncompressed tar)."""
        try:
            os.makedirs(out_dir)
        except OSError as e:
            # An already-existing directory is fine; re-raise anything else.
            if e.errno != errno.EEXIST:
                raise
        with tarfile.open(os.path.join(out_dir, tar_name), "w") as tar:
            for f in files:
                tar.add(f)

    def write_invocation_file(self, invocation, invocation_file):
        """Serialize ``invocation`` (a dict) as JSON into ``invocation_file``."""
        # Note: the invocation file format will change soon
        json_invocation = json.dumps(invocation)
        with open(invocation_file, "w") as f:
            f.write(json_invocation)

    def bosh_exec(self, invocation_file, mount=None):
        """Launch the tool through the Boutiques API.

        ``mount``, when given, is a host path bind-mounted read/write into the
        container at the same location. Returns ``(returncode, log)``; the log
        is a placeholder until the Boutiques API exposes it.
        """
        try:
            if mount is None:
                boutiques.execute("launch", self.boutiques_descriptor,
                                  invocation_file, "-x")
            else:
                boutiques.execute("launch", self.boutiques_descriptor,
                                  invocation_file, "-v",
                                  "{0}:{0}".format(mount), "-x")
            result = 0
        except SystemExit as e:
            # boutiques.execute exits with the tool's return code.
            result = e.code
        return (result, "Empty log, Boutiques API doesn't return it yet.\n")

    def pretty_print(self, result):
        """Write the log of a ``(label, (returncode, log))`` result to a
        timestamped file and print a one-line status summary."""
        (label, (returncode, log)) = result
        status = "SUCCESS" if returncode == 0 else "ERROR"
        timestamp = str(int(time.time() * 1000))
        filename = "{0}.{1}.log".format(timestamp, label)
        with open(filename, "w") as f:
            f.write(log)
        print(" [ {0} ({1}) ] {2} - {3}".format(status, returncode, label, filename))

    def check_failure(self, result):
        """Return True when a ``(label, (returncode, log))`` result failed."""
        (label, (returncode, log)) = result
        # Idiomatic boolean instead of the redundant `True if ... else False`.
        return returncode != 0

    def write_BIDS_invocation(self, analysis_level, participant_label, invocation_file):
        """Build a BIDS-app invocation for ``analysis_level`` ("participant"
        or "group") and write it to ``invocation_file``. ``participant_label``
        is only used for participant-level analyses."""
        invocation = {}
        invocation["bids_dir"] = self.input_path
        invocation["output_dir_name"] = self.output_dir
        if analysis_level == "participant":
            invocation["analysis_level"] = "participant"
            invocation["participant_label"] = participant_label
        elif analysis_level == "group":
            invocation["analysis_level"] = "group"

        self.write_invocation_file(invocation, invocation_file)
88 changes: 18 additions & 70 deletions sim/SparkBIDS.py
100644 → 100755
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from bids.grabbids import BIDSLayout
import json, os, errno, subprocess, time, tarfile, shutil
import boutiques, errno, json, os, shutil, subprocess, tarfile, time
from Sim import Sim

class SparkBIDS(object):
class SparkBIDS(Sim):

def __init__(self, boutiques_descriptor, bids_dataset, output_dir, options={}):

self.boutiques_descriptor = os.path.join(os.path.abspath(boutiques_descriptor))
self.bids_dataset = bids_dataset
self.output_dir = output_dir
super(SparkBIDS, self).__init__(os.path.abspath(boutiques_descriptor), bids_dataset, output_dir)


# Includes: use_hdfs, skip_participant_analysis,
# skip_group_analysis, skip_participants_file
Expand All @@ -20,6 +20,8 @@ def __init__(self, boutiques_descriptor, bids_dataset, output_dir, options={}):
and not self.skip_group_analysis
self.skipped_participants = self.skip_participants_file.read().split() if self.skip_participants_file else []

print(self.skipped_participants)

# Print analysis summary
print("Computed Analyses: Participant [ {0} ] - Group [ {1} ]".format(str(self.do_participant_analysis).upper(),
str(self.do_group_analysis).upper()))
Expand All @@ -42,6 +44,11 @@ def run(self, sc):

for result in mapped.collect():
self.pretty_print(result)
if self.check_failure(result):
# Disable Group Analysis if Participant Analysis Fails
self.do_group_analysis = False
print("ERROR# Participant analysis failed. Group analysis will be aborted.")


# Group analysis
if self.do_group_analysis:
Expand All @@ -66,7 +73,7 @@ def create_RDD(self, sc):

sub_dir="tar_files"

layout = BIDSLayout(self.bids_dataset)
layout = BIDSLayout(self.input_path)
participants = layout.get_subjects()

# Create RDD of file paths as key and tarred subject data as value
Expand All @@ -84,46 +91,6 @@ def create_RDD(self, sc):

return sc.parallelize(list_participants)

def create_tar_file(self, out_dir, tar_name, files):
    """Archive ``files`` into ``out_dir/tar_name``, creating ``out_dir`` first."""
    try:
        os.makedirs(out_dir)
    except OSError as err:
        # A pre-existing output directory is not an error.
        if err.errno != errno.EEXIST:
            raise
    archive_path = os.path.join(out_dir, tar_name)
    with tarfile.open(archive_path, "w") as archive:
        for member in files:
            archive.add(member)

def pretty_print(self, result):
    """Persist the run log of a ``(label, (log, returncode))`` result to a
    timestamped file and print a one-line status summary."""
    label, (log, returncode) = result
    status = "ERROR" if returncode != 0 else "SUCCESS"
    stamp = str(int(time.time() * 1000))
    filename = "{0}.{1}.log".format(stamp, label)
    with open(filename, "w") as log_file:
        log_file.write(log)
    print(" [ {3} ({0}) ] {1} - {2}".format(returncode, label, filename, status))

def write_invocation_file(self, analysis_level, participant_label, invocation_file):
    """Build a Boutiques invocation for ``analysis_level`` ("participant" or
    "group") and save it as JSON to ``invocation_file``."""
    # Note: the invocation file format will change soon
    inputs = [
        {"bids_dir": self.bids_dataset},
        {"output_dir_name": self.output_dir},
    ]
    if analysis_level == "participant":
        inputs.append({"analysis_level": "participant"})
        inputs.append({"participant_label": participant_label})
    elif analysis_level == "group":
        inputs.append({"analysis_level": "group"})

    with open(invocation_file, "w") as out:
        out.write(json.dumps({"inputs": inputs}))

def get_bids_dataset(self, data, participant_label):

filename = 'sub-{0}.tar'.format(participant_label)
Expand All @@ -141,7 +108,7 @@ def get_bids_dataset(self, data, participant_label):

os.remove(filename)

return os.path.join(tmp_dataset, os.path.abspath(self.bids_dataset))
return os.path.join(tmp_dataset, os.path.abspath(self.input_path))

def run_participant_analysis(self, participant_label, data):

Expand All @@ -158,42 +125,23 @@ def run_participant_analysis(self, participant_label, data):
raise

invocation_file = "./invocation-{0}.json".format(participant_label)
self.write_invocation_file("participant",
self.write_BIDS_invocation("participant",
participant_label,
invocation_file)

exec_result = self.bosh_exec(invocation_file)
exec_result = self.bosh_exec(invocation_file, os.path.dirname(os.path.abspath(self.input_path)))
os.remove(invocation_file)
return (participant_label, exec_result)

def run_group_analysis(self):
invocation_file = "./invocation-group.json"
self.write_invocation_file("group",
self.write_BIDS_invocation("group",
None,
invocation_file)
exec_result = self.bosh_exec(invocation_file)
exec_result = self.bosh_exec(invocation_file, os.path.dirname(os.path.abspath(self.input_path)))
os.remove(invocation_file)
return ("group", exec_result)

def bosh_exec(self, invocation_file):
    """Run the Boutiques descriptor with ``invocation_file`` via the ``bosh``
    CLI and return ``(log, returncode)``."""
    # NOTE(review): shell=True with interpolated paths is shell-injection
    # prone if descriptor/invocation paths are ever user-controlled — confirm.
    run_command = "bosh {0} -i {1} -e -d".format(self.boutiques_descriptor, invocation_file)
    result = None
    try:
        log = subprocess.check_output(run_command, shell=True, stderr=subprocess.STDOUT)
        result = (log, 0)
    except subprocess.CalledProcessError as e:
        # Non-zero exit: keep the captured output and the return code.
        result = (e.output, e.returncode)
    try:
        # NOTE(review): `label` is not defined in this scope, so this cleanup
        # always raises NameError and is silently swallowed by the bare
        # except below — confirm what was meant to be removed here.
        shutil.rmtree(label)
    except:
        pass
    return result

def is_valid_file(parser, arg):
    """Return an open read handle on ``arg`` if it exists; otherwise report
    the problem through ``parser.error`` (which exits the argument parser)."""
    if os.path.exists(arg):
        return open(arg, 'r')
    parser.error("The file %s does not exist!" % arg)

def get_participant_from_fn(self,filename):
if filename.endswith(".tar"): return filename.split('-')[-1][:-4]
Expand Down
Loading

0 comments on commit 63de3f1

Please sign in to comment.