Merge pull request #62 from loostrum/dev

Add new monitoring page

loostrum authored Jan 5, 2021
2 parents d439fb3 + 495a566 commit 5812ec7
Showing 11 changed files with 814 additions and 25 deletions.
4 changes: 4 additions & 0 deletions darc/config.yaml
@@ -138,6 +138,8 @@ sb_generator:
processor_master:
log_file: "{home}/darc/log/processor.{hostname}.log"
scavenger_interval: 60 # interval for thread scavenger to remove dead threads (seconds)
processing_status_generator_interval: 60 # interval for status generator to generate status file
processing_status_path: "{home}/public_html/darc/processing/" # output path for status file
check_interval: 60 # How often to check whether workers are done with processing (s)
max_wait_time: 600 # After how much processing time we should check if a node is still online
ntrig_email_max: 100 # Maximum number of triggers to show in email
@@ -152,6 +154,8 @@ processor_master:
processor:
log_file: "{home}/darc/log/processor.{hostname}.log"
scavenger_interval: 60 # interval for thread scavenger to remove dead threads (seconds)
processing_status_generator_interval: 60 # interval for status generator to generate status file
processing_status_path: "{home}/public_html/darc/processing/" # output path for status file
interval: 1.1 # how often to process candidates (seconds)
num_extractor: 2 # how many data extractors to run
output_subdir: "triggers_realtime" # output dir for triggers on each node
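For reference, the two new keys can be picked up like any other processor setting. A minimal sketch, assuming the {home} placeholder is expanded with str.format (DARC's own config loader may resolve placeholders differently):

    import os
    import socket
    import yaml  # assumes PyYAML is available

    with open('darc/config.yaml') as f:
        config = yaml.safe_load(f)['processor']

    # Interval (seconds) between status file updates and the directory they are written to
    interval = config['processing_status_generator_interval']
    status_dir = config['processing_status_path'].format(home=os.path.expanduser('~'))
    status_file = os.path.join(status_dir, f"{socket.gethostname()}.js")
    print(interval, status_file)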
24 changes: 12 additions & 12 deletions darc/offline_processing.py
@@ -194,16 +194,16 @@ def _start_observation_master(self, obs_config, reload=True):
:param dict obs_config: Observation config
:param bool reload: reload service settings (default: True)
"""
# reload config
if reload:
self.load_config()

if not self.full_processing_enabled:
self.logger.info("Full processing disabled - not running offline processing")
self.logger.info("Full processing disabled - nothing to run on master node")
return

self.logger.info("Starting observation on master node")

# reload config
if reload:
self.load_config()

# create result dir
try:
util.makedirs(obs_config['result_dir'])
@@ -265,13 +265,13 @@ def _start_observation_worker(self, obs_config, reload=True):
self.logger.error("Failed to create results directory")
raise OfflineProcessingException("Failed to create result directory: {}".format(e))

# TAB or IAB mode
if obs_config['ntabs'] == 1:
obs_config['mode'] = 'IAB'
trigger_output_file = "{output_dir}/triggers/data/data_00_full.hdf5".format(**obs_config)
else:
obs_config['mode'] = 'TAB'
trigger_output_file = "{output_dir}/triggers/data/data_full.hdf5".format(**obs_config)
# TAB or IAB mode
if obs_config['ntabs'] == 1:
obs_config['mode'] = 'IAB'
trigger_output_file = "{output_dir}/triggers/data/data_00_full.hdf5".format(**obs_config)
else:
obs_config['mode'] = 'TAB'
trigger_output_file = "{output_dir}/triggers/data/data_full.hdf5".format(**obs_config)

# wait until end time + delay
start_processing_time = Time(obs_config['parset']['task.stopTime']) + TimeDelta(self.delay, format='sec')
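The start_processing_time calculation above is plain astropy time arithmetic. A small illustration with made-up stop time and delay values:

    from astropy.time import Time, TimeDelta

    # Made-up values; only the Time + TimeDelta pattern matches the code above
    stop_time = Time('2021-01-05T12:00:00')
    delay = 600  # extra wait (seconds) before offline processing starts

    start_processing_time = stop_time + TimeDelta(delay, format='sec')
    print(start_processing_time.isot)  # 2021-01-05T12:10:00.000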
68 changes: 67 additions & 1 deletion darc/processor.py
@@ -4,6 +4,8 @@

import os
import ast
from textwrap import dedent
import socket
import threading
import multiprocessing as mp
from time import sleep
@@ -43,9 +45,11 @@ def __init__(self, *args, **kwargs):
self.logger = get_queue_logger(self.module_name, self.log_queue)

self.observations = {}
self.observation_end_times = {}
self.observation_queues = {}
self.current_observation_queue = None
self.scavenger = None
self.status_generator = None

self.logger.info("{} initialized".format(self.log_name))

@@ -56,29 +60,89 @@ def run(self):
# create a thread scavenger
self.scavenger = threading.Thread(target=self.thread_scavenger, name='scavenger')
self.scavenger.start()
# create a status generator for the processing website
self.status_generator = threading.Thread(target=self.processing_status_generator, name='status_generator')
self.status_generator.start()
super(ProcessorManager, self).run()

def thread_scavenger(self):
"""
Remove any finished threads at regular intervals
"""
self.logger.info("Starting thread scavenger")
while not self.stop_event.is_set():
for taskid, thread in self.observations.copy().items():
if not thread.is_alive():
# if the thread is dead, remove it from the list
self.logger.info(f"Scavenging thread of taskid {taskid}")
self.observations.pop(taskid)
self.observation_queues.pop(taskid)
self.observation_end_times.pop(taskid)
self.stop_event.wait(self.scavenger_interval)

def processing_status_generator(self):
"""
At regular interval, create status file for processing website
"""
self.logger.info("Starting processing status file generator")
# create the output directory if it does not exist
util.makedirs(self.processing_status_path)
hostname = socket.gethostname()
out_file = os.path.join(self.processing_status_path, f"{hostname}.js")
while not self.stop_event.is_set():
# get list of taskids that are being processed
taskids = sorted(self.observations.keys())
times = []
if not taskids:
# nothing is running
status = "idle"
else:
status = "running"
now = Time.now()
for taskid in taskids:
# check elapsed time
processing_time = now - self.observation_end_times[taskid]
# if negative, the observation is still running
if processing_time.sec < 0:
times.append('observing')
else:
# format as hh:mm:ss
full_min, seconds = divmod(processing_time.sec, 60)
hours, minutes = divmod(full_min, 60)
times.append(f"{hours:02.0f}h{minutes:02.0f}m{seconds:02.0f}s")

content = dedent(f"""
var {hostname} = {{
"node_name": "{hostname}",
"node_status": "{status}",
"node_process": "{','.join(taskids)}",
"time": "{','.join(times)}"
}};
""")
with open(out_file, 'w') as f:
f.write(content)
self.stop_event.wait(self.processing_status_generator_interval)

# upon exit, create file to indicate node is offline
content = dedent(f"""
var {hostname} = {{
"node_name": "{hostname}",
"node_status": "offline",
"node_process": "",
"time": ""
}};
""")
with open(out_file, 'w') as f:
f.write(content)

def stop(self, abort=False):
"""
Stop this service
:param bool abort: Ignored; a stop of the manager always equals an abort
"""
self.logger.info("Stopping {}".format(self.log_name))
# Abort any exisiting observations
# Abort any existing observations
# loop over dictionary items. Use copy to avoid changing dict in loop
for taskid, obs in self.observations.copy().items():
if obs.is_alive():
@@ -118,6 +182,8 @@ def start_observation(self, obs_config, reload=True):
queue.put({'command': 'start_observation', 'obs_config': obs_config, 'reload': reload})
self.observations[taskid] = proc
self.observation_queues[taskid] = queue
self.observation_end_times[taskid] = Time(obs_config['startpacket'] / TIME_UNIT, format='unix') + \
TimeDelta(obs_config['duration'], format='sec')
self.current_observation_queue = queue
return

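The elapsed-time formatting in processing_status_generator can be checked in isolation. A short sketch with a made-up number of processing seconds:

    # 3723 seconds of processing time should be reported as 01h02m03s
    processing_seconds = 3723.0

    full_min, seconds = divmod(processing_seconds, 60)
    hours, minutes = divmod(full_min, 60)
    print(f"{hours:02.0f}h{minutes:02.0f}m{seconds:02.0f}s")  # 01h02m03s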
67 changes: 65 additions & 2 deletions darc/processor_master.py
@@ -10,13 +10,13 @@
import multiprocessing as mp
import astropy.units as u
from astropy.coordinates import SkyCoord
from astropy.time import Time
from astropy.time import Time, TimeDelta
import numpy as np

from darc import DARCBase
from darc import util
from darc.control import send_command
from darc.definitions import WORKERS, TSAMP
from darc.definitions import WORKERS, TSAMP, TIME_UNIT
from darc.logger import get_queue_logger, get_queue_logger_listener


@@ -40,9 +40,11 @@ def __init__(self, *args, **kwargs):
self.logger = get_queue_logger(self.module_name, self.log_queue)

self.observations = {}
self.observation_end_times = {}
self.observation_queues = {}

self.scavenger = None
self.status_generator = None

# reduce logging from status check commands
logging.getLogger('darc.control').setLevel(logging.ERROR)
@@ -56,12 +58,16 @@ def run(self):
# create a thread scavenger
self.scavenger = threading.Thread(target=self.thread_scavenger, name='scavenger')
self.scavenger.start()
# create a status generator for the processing website
self.status_generator = threading.Thread(target=self.processing_status_generator, name='status_generator')
self.status_generator.start()
super(ProcessorMasterManager, self).run()

def thread_scavenger(self):
"""
Remove any finished threads at regular intervals
"""
self.logger.info("Starting thread scavenger")
while not self.stop_event.is_set():
for taskid, thread in self.observations.copy().items():
if not thread.is_alive():
@@ -72,6 +78,61 @@ def thread_scavenger(self):

self.stop_event.wait(self.scavenger_interval)

def processing_status_generator(self):
"""
At regular interval, create status file for processing website
"""
self.logger.info("Starting processing status file generator")
# create the output directory if it does not exist
util.makedirs(self.processing_status_path)
hostname = socket.gethostname()
out_file = os.path.join(self.processing_status_path, f"{hostname}.js")
while not self.stop_event.is_set():
# get list of taskids that are being processed
taskids = sorted(self.observations.keys())
times = []
if not taskids:
# nothing is running
status = "idle"
else:
status = "running"
now = Time.now()
for taskid in taskids:
# check elapsed time
processing_time = now - self.observation_end_times[taskid]
# if negative, the observation is still running
if processing_time.sec < 0:
times.append('observing')
else:
# format as hh:mm:ss
full_min, seconds = divmod(processing_time.sec, 60)
hours, minutes = divmod(full_min, 60)
times.append(f"{hours:02.0f}h{minutes:02.0f}m{seconds:02.0f}s")

content = dedent(f"""
var {hostname} = {{
"node_name": "{hostname}",
"node_status": "{status}",
"node_process": "{','.join(taskids)}",
"time": "{','.join(times)}"
}};
""")
with open(out_file, 'w') as f:
f.write(content)
self.stop_event.wait(self.processing_status_generator_interval)

# upon exit, create file to indicate node is offline
content = dedent(f"""
var {hostname} = {{
"node_name": "{hostname}",
"node_status": "offline",
"node_process": "",
"time": ""
}};
""")
with open(out_file, 'w') as f:
f.write(content)

def stop(self, abort=False):
"""
Stop this service
@@ -119,6 +180,8 @@ def start_observation(self, obs_config, reload=True):
queue.put({'command': 'start_observation', 'obs_config': obs_config, 'reload': reload})
self.observations[taskid] = proc
self.observation_queues[taskid] = queue
self.observation_end_times[taskid] = Time(obs_config['startpacket'] / TIME_UNIT, format='unix') + \
TimeDelta(obs_config['duration'], format='sec')
return

def stop_observation(self, obs_config):
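Both managers now record an expected end time per observation from the start packet and duration, and the status generator compares it against Time.now(). A sketch of that arithmetic with made-up values, assuming TIME_UNIT from darc.definitions converts packet counts to seconds:

    from astropy.time import Time, TimeDelta

    TIME_UNIT = 781250  # assumed value: packets per second
    obs_config = {'startpacket': 1262304000 * TIME_UNIT, 'duration': 300.0}  # made-up observation

    end_time = Time(obs_config['startpacket'] / TIME_UNIT, format='unix') \
        + TimeDelta(obs_config['duration'], format='sec')

    # Negative elapsed time means the observation is still running ("observing" on the page)
    print((Time.now() - end_time).sec < 0)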
8 changes: 0 additions & 8 deletions docs/index.rst
@@ -19,11 +19,3 @@ For more information about these different parts of DARC, consult one of the fol
_modules/offline
_modules/mac
_modules/api



Indices and tables
==================
* :ref:`genindex`
* :ref:`modindex`
* :ref:`search`
5 changes: 3 additions & 2 deletions test/test_processor.py
@@ -303,11 +303,12 @@ def test_processor_obs(self):
self.amber_queue.put({'command': 'stop_observation'})
self.processor.source_queue.put({'command': 'stop_observation'})

# wait for processor to exit
self.processor.join()

# stop services
self.amber_listener.source_queue.put('stop')
self.amber_listener.join()
self.processor.source_queue.put('stop')
self.processor.join()


@unittest.skipUnless(socket.gethostname() == 'zeus', "Test can only run on zeus")
Binary file added webpage/arts-logo.png
6 changes: 6 additions & 0 deletions webpage/arts001_example.js
@@ -0,0 +1,6 @@
var arts001 = {
"node_name": "arts001",
"node_status": "running", // options are "running", "idle", "offline"
"node_process": "190101001",
"time": "00h00m00s"
};
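The monitoring page presumably pulls in each node's <hostname>.js file and reads these variables. Independent of that page, the same files can be summarised from Python; a minimal sketch, assuming they live under the processing_status_path from config.yaml and follow the layout above:

    import glob
    import json
    import os
    import re

    status_dir = os.path.expanduser('~/public_html/darc/processing')

    for path in sorted(glob.glob(os.path.join(status_dir, '*.js'))):
        with open(path) as f:
            raw = f.read()
        raw = re.sub(r'//.*', '', raw)                    # drop the // comments
        body = raw.split('=', 1)[1].strip().rstrip(';')   # keep only the {...} object
        node = json.loads(body)
        print(f"{node['node_name']:10s} {node['node_status']:8s} "
              f"{node['node_process']} {node['time']}")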
