1.7.5 (#280)
* Feature/ben/null chain (#276)

* added minimum of 3 hours, estimate was too low for concurrency=128

* adapting scripts for null chain

* data reading and aggregation appear to work, at least for 1-node concurrency levels

* added aggregation logic

* fixing syntax bugs

* all data appears to be aggregating correctly

* fixed read in place

* added to CHANGELOG

* fixed typo

* fixed style

* added 'time' to 'merlin run' command, for better future provenance

* celery 5 (#279)

* upgraded to celery 5

* updated docs
ben-bay authored Sep 25, 2020
1 parent ca82cd9 commit 07efd80
Showing 11 changed files with 221 additions and 5 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
@@ -4,6 +4,12 @@ All notable changes to Merlin will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Changed
- Further improvements to the `null_spec` example.
- Now requiring Celery version 5.x.

## [1.7.4]

### Fixed
2 changes: 1 addition & 1 deletion docs/source/merlin_commands.rst
@@ -300,7 +300,7 @@ An example of launching a simple celery worker using srun:

.. code:: bash
- $ srun -n 1 celery worker -A merlin -l INFO
+ $ srun -n 1 celery -A merlin worker -l INFO
A parallel batch allocation launch is configured to run a single worker
process per node. This worker process will then launch a number of worker
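Celery 5 moved global options such as -A/--app in front of the subcommand, which is why the documented srun example changes. A minimal sketch of the two orderings (the Celery 4 form is shown only for contrast; the Celery 5 CLI no longer accepts it):

    $ srun -n 1 celery worker -A merlin -l INFO   # Celery 4 ordering (old)
    $ srun -n 1 celery -A merlin worker -l INFO   # Celery 5 ordering used throughout this commit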
2 changes: 1 addition & 1 deletion merlin/examples/workflows/null_spec/null_chain.yaml
@@ -45,7 +45,7 @@ study:
if ((NEXT_SAMPLES <= $(MAX_SAMPLES))) ; then
echo "Starting next iteration"
cd $(MERLIN_WORKSPACE)/../
- merlin run $(SPECROOT)/null_chain.yaml --vars N_SAMPLES=$NEXT_SAMPLES CONC=$(CONC) RUN_ID=$(RUN_ID) N_WORK=$(N_WORK) QUEUE=$(QUEUE) TIME=$(TIME) MAX_SAMPLES=$(MAX_SAMPLES)
+ time merlin run $(SPECROOT)/null_chain.yaml --vars N_SAMPLES=$NEXT_SAMPLES CONC=$(CONC) RUN_ID=$(RUN_ID) N_WORK=$(N_WORK) QUEUE=$(QUEUE) TIME=$(TIME) MAX_SAMPLES=$(MAX_SAMPLES)
else
echo "Chain complete"
fi
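Prefixing merlin run with time makes the shell report wall-clock duration on stderr, which the new read_output_chain.py parser later greps for the "real" line. A sketch of what the prefix adds (timing values are illustrative only, and the remaining --vars are omitted for brevity):

    $ time merlin run null_chain.yaml --vars N_SAMPLES=10 ...
    real    0m2.003s
    user    0m1.100s
    sys     0m0.200s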
@@ -0,0 +1,44 @@

# $1 read_path

samples=(1 10 100 1000 10000 100000)

ALL_DIR=all_data
mkdir ${ALL_DIR}
runs=( $(find $1 -type d -name "run_*" -maxdepth 1) )
for run in "${runs[@]}"
do
r=${run: -1}
read_path="$1/run_${r}"
LOGS=( $(find $read_path -type f -name "*.log" -maxdepth 1) )
DATA_DIR=${ALL_DIR}/my_data${r}
mkdir ${DATA_DIR}
cat $read_path/*.log > ${DATA_DIR}/extra_lines.log
grep '^\[' ${DATA_DIR}/extra_lines.log > ${DATA_DIR}/all_nodes.log
sort ${DATA_DIR}/all_nodes.log -o ${DATA_DIR}/all_nodes.log
csplit -f ${DATA_DIR}/split_log_ -z ${DATA_DIR}/all_nodes.log /"Step 'verify' in "/ '{*}'

SPLIT_LOGS=( $(find $DATA_DIR -type f -name "split_log_*_*" -maxdepth 1) )
# samples
# 0 1
# 1 10
# 2 100
# 3 1000
# 4 10000
# 5 100000
# 6 n/a

DATA=${DATA_DIR}/my_data${r}.yaml
touch ${DATA}

s=1
i=0
c=$((2**$(($r-1))))
echo "${c}"
python3 read_output_chain.py ${read_path}/ ${DATA_DIR}/ $c >> ${DATA}
perl -pi -e 's/ : \n/ : /g' ${DATA}
done

cat all_data/my_data*/my_data*.yaml > all_data/all_data.yaml
sort all_data/all_data.yaml -o all_data/all_data.yaml

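A usage sketch for the aggregation script above, assuming it is saved as read_data_chain.sh (the filename is not visible in this view); the single argument is the directory that contains the run_* subdirectories:

    $ bash read_data_chain.sh /path/to/null_chain_output   # hypothetical path
    $ cat all_data/all_data.yaml                           # combined, sorted results land here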
6 changes: 6 additions & 0 deletions merlin/examples/workflows/null_spec/scripts/kill_all.sh
@@ -0,0 +1,6 @@
# 1 run id
# 2 concurrency

scancel -u ${USER}
merlin stop-workers
merlin purge -f ../null_chain.yaml --vars QUEUE=queue_c${2}_r${1}_chain
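A usage sketch for the teardown script; the run id and concurrency values are illustrative and must match the study being killed so that the purged queue name (queue_c${2}_r${1}_chain) lines up:

    $ cd merlin/examples/workflows/null_spec/scripts
    $ bash kill_all.sh 3 4   # scancel this user's jobs, stop workers, purge queue_c4_r3_chain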
@@ -55,6 +55,8 @@
partition = "pbatch"
if total_time > 1440:
total_time = 1440
if total_time < 180:
total_time = 180

output_path = os.path.join(args.output_path, f"run_{args.run_id}")
os.makedirs(output_path, exist_ok=True)
155 changes: 155 additions & 0 deletions merlin/examples/workflows/null_spec/scripts/read_output_chain.py
@@ -0,0 +1,155 @@
import argparse
import datetime
import glob
import os
import re
import subprocess
import sys


SAMPLES = [1, 10, 100, 1000, 10000, 100000]

# argument parsing
parser = argparse.ArgumentParser(description="Make some samples (names of people).")
parser.add_argument("output_path", type=str, help="path to original output")
parser.add_argument("split_data_path", type=str, help="path to split data")
parser.add_argument("c", type=int, help="concurrency")
args = parser.parse_args()

filled_c = str(args.c).zfill(3)

logpattern = os.path.join(args.split_data_path, "split_log_*")
errpattern = os.path.join(args.output_path, "*.err")
args.logfile = glob.glob(logpattern)
args.errfile = glob.glob(errpattern)

if len(args.logfile) == 0:
print(f"{logpattern} returned no glob matches!")
sys.exit()
if len(args.errfile) == 0:
print(f"{errpattern} returned no glob matches!")
sys.exit()

logmap = {}
i = 1
for log in args.logfile:
if i == 1000000:
break
logmap[i] = log
i *= 10


def single_task_times():
for k, v in logmap.items():
task_durations = []
try:
pre_lines = subprocess.check_output(
f'grep " succeeded in " {v}', shell=True
).decode("ascii")

pre_list = pre_lines.strip().split("\n")

for line in pre_list:
matches = re.search(r"\d+.\d+s:", line)
if matches:
match = matches.group(0)
match = float(match.strip("s:"))
task_durations.append(match)
except:
print(f"c{filled_c}_s{k} task times : ERROR")
continue

print(f"c{filled_c}_s{k} task times : " + str(task_durations))


def merlin_run_time():
total = 0
for err in args.errfile:
pre_line = subprocess.check_output(f'grep "real" {err}', shell=True).decode(
"ascii"
)
pre_line = pre_line.strip()
matches = re.search(r"\d\.\d\d\d", pre_line)
match = matches[0]
result = float(match)
total += result
try:
print(f"c{filled_c} merlin run : " + str(result))
except:
result = None
print(
f"c{filled_c} merlin run : ERROR -- result={result}, args.errfile={args.errfile}"
)


def start_verify_time():
for k, v in logmap.items():
all_timestamps = []
try:
pre_line = subprocess.check_output(
f'grep -m2 "verify" {v} | tail -n1', shell=True
).decode("ascii")
pre_line = pre_line.strip()
matches = re.search(r"\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d,\d\d\d", pre_line)
match = matches[0]
element = datetime.datetime.strptime(match, "%Y-%m-%d %H:%M:%S,%f")
timestamp = datetime.datetime.timestamp(element)
all_timestamps.append(timestamp)
except:
print(f"c{filled_c}_s{k} start verify : ERROR")
continue
try:
print(f"c{filled_c}_s{k} start verify : " + str(all_timestamps[0]))
except BaseException:
print(f"c{filled_c}_s{k} start verify : ERROR")


def start_run_workers_time():
for k, v in logmap.items():
all_timestamps = []
try:
pre_line = subprocess.check_output(f'grep -m1 "" {v}', shell=True).decode(
"ascii"
)
pre_line = pre_line.strip()
matches = re.search(r"\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d,\d\d\d", pre_line)
match = matches[0]
element = datetime.datetime.strptime(match, "%Y-%m-%d %H:%M:%S,%f")
timestamp = datetime.datetime.timestamp(element)
all_timestamps.append(timestamp)
except:
continue
earliest = min(all_timestamps)
print(f"c{filled_c}_s{k} start run-workers : " + str(earliest))


def start_sample1_time():
for k, v in logmap.items():
all_timestamps = []
try:
pre_line = subprocess.check_output(
f"grep -m1 \"Executing step 'null_step'\" {v}", shell=True
).decode("ascii")
pre_line = pre_line.strip()
matches = re.search(r"\d\d\d\d-\d\d-\d\d \d\d:\d\d:\d\d,\d\d\d", pre_line)
match = matches[0]
element = datetime.datetime.strptime(match, "%Y-%m-%d %H:%M:%S,%f")
timestamp = datetime.datetime.timestamp(element)
all_timestamps.append(timestamp)
except:
print(f"c{filled_c}_s{k} start samp1 : ERROR")
continue
earliest = min(all_timestamps)
print(f"c{filled_c}_s{k} start samp1 : " + str(earliest))


def main():
single_task_times()
merlin_run_time()
start_verify_time()
start_run_workers_time()
start_sample1_time()


if __name__ == "__main__":
main()
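A usage sketch matching the way the aggregation shell script invokes this parser (paths and concurrency are illustrative):

    # positional args: output_path split_data_path concurrency
    $ python3 read_output_chain.py /path/to/run_3/ all_data/my_data3/ 4 >> all_data/my_data3/my_data3.yaml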
3 changes: 3 additions & 0 deletions merlin/examples/workflows/null_spec/scripts/search.sh
@@ -0,0 +1,3 @@
# 1 path to search

find $1 -name MERLIN_FINISHED -type f | wc -l
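A usage sketch; the command prints how many MERLIN_FINISHED marker files exist under the given path (the count shown is illustrative):

    $ bash search.sh /path/to/null_chain_output
    100000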
2 changes: 1 addition & 1 deletion merlin/study/celeryadapter.py
@@ -362,7 +362,7 @@ def launch_celery_workers(spec, steps=None, worker_args="", just_return_command=
just_return_command Don't execute, just return the command
"""
queues = spec.make_queue_string(steps)
- worker_command = " ".join(["celery worker -A merlin", worker_args, "-Q", queues])
+ worker_command = " ".join(["celery -A merlin worker", worker_args, "-Q", queues])
if just_return_command:
return worker_command
else:
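With the reordered arguments, the string assembled here follows the Celery 5 convention: app option first, then the worker subcommand, then worker arguments and queues. A sketch of the resulting command with illustrative worker args and queue name:

    celery -A merlin worker --concurrency 4 -Q queue_c4_r3_chain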
2 changes: 1 addition & 1 deletion requirements/release.txt
@@ -1,5 +1,5 @@
cached_property
- celery[redis,sqlalchemy]>=4.4.5
+ celery[redis,sqlalchemy]>=5.0.0
coloredlogs
cryptography
importlib_resources; python_version < '3.7'
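To pick up the new version floor in an existing environment, an upgrade along these lines should suffice:

    $ pip install --upgrade "celery[redis,sqlalchemy]>=5.0.0"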
2 changes: 1 addition & 1 deletion tests/integration/run_tests.py
@@ -391,7 +391,7 @@ def define_tests():
is the test's name, and the value is a tuple
of (shell command, condition(s) to satisfy).
"""
- celery_regex = r"(srun\s+.*)?celery\s+worker\s+(-A|--app)\s+merlin\s+.*"
+ celery_regex = r"(srun\s+.*)?celery\s+(-A|--app)\s+merlin\s+worker\s+.*"

# shortcut string variables
workers = "merlin run-workers"
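The updated pattern expects the app option before the worker subcommand. A quick sanity check of the regex against the new command form (illustrative one-liner, not part of the test suite):

    $ python3 -c "import re; print(bool(re.search(r'(srun\s+.*)?celery\s+(-A|--app)\s+merlin\s+worker\s+.*', 'srun -n 1 celery -A merlin worker -l INFO')))"
    True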
