Skip to content

Commit

Permalink
check to see if the output files actually exist and warn appropriately
Browse files Browse the repository at this point in the history
  • Loading branch information
lacker committed Oct 19, 2022
1 parent 2ee455a commit ca4bea4
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 67 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
automator.egg-info/
build/
dist/
*~
*~
__pycache__
46 changes: 39 additions & 7 deletions automator/automator.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import subprocess
import sys

from automator import redis_util
from .logger import log
from .subarray import Subarray
from .proc_hpguppi import ProcHpguppi
Expand Down Expand Up @@ -103,7 +104,7 @@ def __init__(self, redis_endpoint, redis_chan, margin,
self.nshot_msg = nshot_msg
self.active_subarrays = {}
self.paused = False
self.alert("starting the automator at " +
self.alert("starting at " +
datetime.now(timezone.utc).astimezone().strftime("%Y-%m-%d %H:%M:%S %Z"))

def start(self):
Expand Down Expand Up @@ -358,7 +359,7 @@ def not_tracking(self, subarray_name):


def pause(self, message):
full_message = message + "; pausing automator for debugging."
full_message = message + "; pausing for debugging."
self.alert(full_message)
self.paused = True

Expand Down Expand Up @@ -393,12 +394,43 @@ def processing(self, subarray_name):
self.redis_server.llen(host_key))
# Format for host name (rather than instance name):
host_list = [host.split('/')[0] for host in instance_list]
# DATADIR
datadir = self.redis_server.get('{}:current_sb_id'.format(subarray_name))

# Check that raw files are where we expect them to be
datadir = redis_util.sb_id(self.redis_server, subarray_name)
raw_files = redis_util.raw_files(self.redis_server)
hosts_with_no_files = []
inputdir = None
for host in host_list:
filenames = raw_files.get(host, [])
dirnames = sorted(set(os.path.dirname(filename) for filename in filenames))
if not dirnames:
hosts_with_no_files.append(host)
continue
if len(dirnames > 1):
self.pause("multiple raw files directories on {}: {}".format(host, dirnames))
return
dirname = dirnames[0]
if not dirname.endswith(datadir):
self.pause("sb id is {} but raw files are in {}:{}".format(datadir, host, dirname))
return
if not inputdir:
inputdir = datadir

if inputdir is None:
self.alert("there were no input files when we expected input files.")
subarray.processing = False
return

if len(hosts_with_no_files) > len(host_list) / 2:
self.pause("many hosts have no files: {}".format(hosts_with_no_files))
return


# seticore processing
self.alert("running seticore...")
proc = ProcSeticore()
result_seticore = proc.process('/home/lacker/bin/seticore', host_list, BFRDIR, subarray_name)
proc = ProcSeticore(self.redis_server)
result_seticore = proc.process('/home/lacker/bin/seticore', host_list, BFRDIR,
subarray_name, inputdir, datadir)
if result_seticore > 1:
if result_seticore > 128:
self.pause("seticore killed with signal {}".format(result_seticore - 128))
Expand Down Expand Up @@ -543,5 +575,5 @@ def alert(self, message, slack_channel=SLACK_CHANNEL,
"""
log.info(message)
# Format: <Slack channel>:<Slack message text>
alert_msg = '{}:{}'.format(slack_channel, message)
alert_msg = '{}:automator: {}'.format(slack_channel, message)
self.redis_server.publish(slack_proxy_channel, alert_msg)
2 changes: 0 additions & 2 deletions automator/proc_hpguppi.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,6 @@ def process(self, proc_domain, hosts, subarray, bfrdir):
datadir = self.redis_server.get('{}:current_sb_id'.format(subarray))

# Determine input directory:
# Check for set of files from each node in case one of them
# failed to record data.
rawfiles = set()
for host in hosts:
rawfiles = self.redis_server.smembers('bluse_raw_watch:{}'.format(host))
Expand Down
100 changes: 43 additions & 57 deletions automator/proc_seticore.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import os
import subprocess

from automator import redis_util
from .logger import log

class ProcSeticore(object):
Expand All @@ -15,11 +16,11 @@ class ProcSeticore(object):
[1] https://github.com/lacker/seticore
"""

def __init__(self):
self.redis_server = redis.StrictRedis(decode_responses=True)
def __init__(self, redis_server):
self.redis_server = redis_server
self.PROC_STATUS_KEY = 'PROCSTAT'

def process(self, seticore, hosts, bfrdir, arrayid):
def process(self, seticore, hosts, bfrdir, arrayid, inputdir, sb_id):
"""Processes the incoming data using seticore.
Args:
Expand All @@ -29,62 +30,47 @@ def process(self, seticore, hosts, bfrdir, arrayid):
bfrdir (str): Directory containing the beamformer recipe files
associated with the data in the NVMe modules.
arrayid (str): Name of the current subarray.
inputdir (str): Directory containing raw file input
sb_id (str): the schedule block id
Returns:
None
"""
# Determine input directory:
# Check for set of files from each node in case one of them
# failed to record data.
rawfiles = set()
for host in hosts:
rawfiles = self.redis_server.smembers('bluse_raw_watch:{}'.format(host))
log.info("First rawfile(s) in set: {}".format(rawfiles))
if(len(rawfiles) > 0):
break

if(len(rawfiles) > 0):
# Take one of the filepaths from which to determine the input directory
inputdirs = os.path.dirname(rawfiles.pop())
# Get mode and determine FFT size:
fenchan = self.redis_server.get('{}:n_channels'.format(arrayid))
# Change FFT size to achieve roughly 1Hz channel bandwidth
if(fenchan == '32768'):
fft_size = str(2**14)
elif(fenchan == '4096'):
fft_size = str(2**17)
elif(fenchan == '1024'):
fft_size = str(2**19)
else:
log.error('Unexpected FENCHAN: {}'.format(fenchan))
fft_size = str(2**14)
# SB ID:
datadir = self.redis_server.get('{}:current_sb_id'.format(arrayid))
# Create output directories:
outputdir = '/scratch/data/{}/seticore_search'.format(datadir)
h5dir = '/scratch/data/{}/seticore_beamformer'.format(datadir)
log.info('Creating output directories...')
log.info('\nsearch: {}\nbeamformer: {}'.format(outputdir, h5dir))
for host in hosts:
cmd = ['ssh', host, 'mkdir', '-p', '-m', '1777', outputdir]
subprocess.run(cmd)
cmd = ['ssh', host, 'mkdir', '-p', '-m', '1777', h5dir]
subprocess.run(cmd)
# Build slurm command:
seticore_args = ['--input', inputdirs,
'--output', outputdir,
'--snr', '6',
'--h5_dir', h5dir,
'--num_bands', '16',
'--fft_size', fft_size,
'--telescope_id', '64',
'--recipe_dir', bfrdir]
err = '/home/obs/seticore_slurm/seticore_%N.err'
out = '/home/obs/seticore_slurm/seticore_%N.out'
cmd = ['srun', '--open-mode=append', '-e', err, '-o', out, '-w'] + [' '.join(hosts)] + [seticore] + seticore_args
log.info('Running seticore: {}'.format(cmd))
result = subprocess.run(cmd).returncode
return result
# Get mode and determine FFT size:
fenchan = self.redis_server.get('{}:n_channels'.format(arrayid))
# Change FFT size to achieve roughly 1Hz channel bandwidth
if(fenchan == '32768'):
fft_size = str(2**14)
elif(fenchan == '4096'):
fft_size = str(2**17)
elif(fenchan == '1024'):
fft_size = str(2**19)
else:
log.info('No data to process')
return 1
log.error('Unexpected FENCHAN: {}'.format(fenchan))
fft_size = str(2**14)

# Create output directories:
outputdir = '/scratch/data/{}/seticore_search'.format(sb_id)
h5dir = '/scratch/data/{}/seticore_beamformer'.format(sb_id)
log.info('Creating output directories...')
log.info('\nsearch: {}\nbeamformer: {}'.format(outputdir, h5dir))
for host in hosts:
cmd = ['ssh', host, 'mkdir', '-p', '-m', '1777', outputdir]
subprocess.run(cmd)
cmd = ['ssh', host, 'mkdir', '-p', '-m', '1777', h5dir]
subprocess.run(cmd)
# Build slurm command:
seticore_args = ['--input', inputdir,
'--output', outputdir,
'--snr', '6',
'--h5_dir', h5dir,
'--num_bands', '16',
'--fft_size', fft_size,
'--telescope_id', '64',
'--recipe_dir', bfrdir]
err = '/home/obs/seticore_slurm/seticore_%N.err'
out = '/home/obs/seticore_slurm/seticore_%N.out'
cmd = ['srun', '--open-mode=append', '-e', err, '-o', out, '-w'] + [' '.join(hosts)] + [seticore] + seticore_args
log.info('Running seticore: {}'.format(cmd))
result = subprocess.run(cmd).returncode
return result
49 changes: 49 additions & 0 deletions automator/redis_util.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
#!/usr/bin/env python
# Helpers functions for looking up various redis data.
# The convention is that "r" is our redis client.

import redis
import sys

def raw_files(r):
"""Returns a dict mapping host name to a list of raw files on the host."""
hosts = r.keys("bluse_raw_watch:*")
pipe = r.pipeline()
for host in hosts:
pipe.set("smembers", "bluse_raw_watch:" + host)
results = pipe.execute()
answer = {}
for host, result in zip(hosts, results):
answer[host] = sorted(result)
return answer


def sb_id(r, subarray):
return r.get('{}:current_sb_id'.format(subarray))


def main():
if len(sys.argv) < 2:
print("no command specified")
return

command = sys.argv[1]
args = sys.argv[2:]
r = redis.StrictRedis(decode_responses=True)

if command == "raw_files":
hosts = ["blpn{}".format(i) for i in range(64)]
for host, result in raw_files(r):
for r in result:
print(host, r)
return

if command == "sb_id":
arr = args[0]
print(sb_id(r, arr))
return

print("unrecognized command:", command)

if __name__ == "__main__":
main()

0 comments on commit ca4bea4

Please sign in to comment.