Commit

Taking the index update offline to help mitigate the huge cost of an index update with such large buckets
keenon committed Sep 30, 2024
1 parent 91c8496 commit c78367f
Showing 4 changed files with 231 additions and 162 deletions.
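The change this commit makes is visible in `process_queue_forever` below: instead of recomputing the work queue inside every PubSub callback, the server now drains buffered messages at the top of each loop iteration and only then calls `recompute_queue`. The implementation of `ReactiveS3Index.process_incoming_messages` is not shown in this diff, so the following is only a minimal sketch of the buffering pattern it implies, under the assumption that callbacks merely record messages; the `_message_queue` field and `_apply_message` helper are hypothetical names.

```python
import threading
from typing import List, Tuple


class BufferedIndexSketch:
    """Minimal sketch: record PubSub messages cheaply, apply them in one batch on demand."""

    def __init__(self) -> None:
        self._message_queue: List[Tuple[str, bytes]] = []
        self._lock = threading.Lock()

    def on_message(self, topic: str, payload: bytes) -> None:
        # Runs on the PubSub thread: do no index work here, just remember the message.
        with self._lock:
            self._message_queue.append((topic, payload))

    def process_incoming_messages(self) -> bool:
        # Runs on the main loop: drain the buffer and apply all changes at once.
        with self._lock:
            messages, self._message_queue = self._message_queue, []
        any_changed = False
        for topic, payload in messages:
            any_changed = self._apply_message(topic, payload) or any_changed
        return any_changed

    def _apply_message(self, topic: str, payload: bytes) -> bool:
        # Placeholder: update the in-memory view of the bucket, report whether anything changed.
        return True
```

This keeps the expensive index update off the PubSub callback path, which is what the commit title refers to.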
4 changes: 2 additions & 2 deletions server/app/src/data_harvester.py
@@ -123,7 +123,7 @@ def copy_snapshots(self, datasets: List[StandardizedDataset]):
return

# Download the dataset locally
- tmp_folder: str = self.index.downloadToTmp(self.path)
+ tmp_folder: str = self.index.download_to_tmp(self.path)

# Identify trials that are too short. We don't want to process these
trial_paths_to_remove: List[str] = self.id_short_trials(tmp_folder)
@@ -305,7 +305,7 @@ def __init__(self, bucket: str, deployment: str, disable_pubsub: bool) -> None:
self.index.addChangeListener(self.on_change)
self.index.refreshIndex()
if not disable_pubsub:
- self.index.registerPubSub()
+ self.index.register_pub_sub()

def on_change(self):
print('S3 CHANGED!')
93 changes: 53 additions & 40 deletions server/app/src/mocap_server.py
@@ -11,7 +11,7 @@
import boto3
import threading
import argparse
- from typing import Tuple
+ from typing import Tuple, Any
import traceback


@@ -230,13 +230,13 @@ def getHref(self):
else:
return 'https://app.addbiomechanics.org/data/'+userId+'/'+filePath

- def markAsQueuedOnSlurm(self):
+ def mark_as_queued_on_slurm(self):
"""
This marks a subject as having been queued for processing on a slurm cluster
"""
self.index.uploadText(self.queuedOnSlurmFlagFile, '')

- def markAsNotQueuedOnSlurm(self):
+ def mark_as_not_queued_on_slurm(self):
"""
This un-marks a subject that was previously queued on the slurm cluster, because something went wrong
"""
@@ -587,24 +587,22 @@ def __init__(self, bucket: str, deployment: str, singularity_image_path: str) ->

# Set up index
self.index = ReactiveS3Index(bucket, deployment)
- self.index.addChangeListener(self.onChange)
self.index.refreshIndex()
- self.index.registerPubSub()
+ self.index.register_pub_sub()
self.pubSubIsAlive = True

# Subscribe to PubSub status checks.
self.pingId = str(self.serverId[:16]).replace('-', '')
self.index.pubSub.subscribe("/PING/"+self.pingId, self.onPubSubStatusReceived)
self.index.pubSub.subscribe("/PING/" + self.pingId, self.on_pub_sub_status_received)

pubsub_status_thread = threading.Thread(
- target=self.checkPubSubStatusForever, daemon=True)
+ target=self.check_pub_sub_status_forever, daemon=True)
pubsub_status_thread.start()

- def onChange(self):
-     print('S3 CHANGED!')
+ def recompute_queue(self):
start_time = time.time()

- shouldProcessSubjects: List[SubjectToProcess] = []
+ should_process_subjects: List[SubjectToProcess] = []

# 1. Collect all Trials
for folder in self.index.listAllFolders():
@@ -613,29 +611,26 @@ def onChange(self):
folder += '/'
subject = SubjectToProcess(self.index, folder)
if subject.shouldProcess():
- shouldProcessSubjects.append(subject)
+ should_process_subjects.append(subject)

# 2. Sort Trials. First we prioritize subjects that are not just copies in the "standardized" bucket, then
# we sort oldest to newest. The sort method gets passed a Tuple, which goes left to right, True before False,
# and low to high.
- shouldProcessSubjects.sort(key=lambda x: (
+ should_process_subjects.sort(key=lambda x: (
x.subjectPath.startswith("standardized"), x.latestInputTimestamp()))
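The tuple key works because Python compares tuples element by element and sorts `False` before `True`: subjects whose path does not start with `"standardized"` come first, and within each group older `latestInputTimestamp()` values come first. A small illustration with made-up values:

```python
# Each entry is (subjectPath, latestInputTimestamp) -- illustrative values only.
subjects = [
    ("standardized/subject_a", 100),
    ("protected/subject_b", 300),
    ("protected/subject_c", 200),
    ("standardized/subject_d", 50),
]

# Same shape of key as above: (starts with "standardized", timestamp).
subjects.sort(key=lambda s: (s[0].startswith("standardized"), s[1]))

# Non-standardized subjects first (False < True), oldest first within each group:
# protected/subject_c, protected/subject_b, standardized/subject_d, standardized/subject_a
print(subjects)
```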

# 3. Update the queue. There's another thread that busy-waits on the queue changing, that can then grab a queue entry and continue
- self.queue = shouldProcessSubjects
+ self.queue = should_process_subjects

print('Queue updated in ' + str(time.time() - start_time) + ' seconds')

for subject in self.queue:
print('- should process: ' + str(subject.subjectPath))

- # 4. Update status file. Do this on another thread, to avoid deadlocking with the pubsub service when this is being called
- # inside a callback from a message, and then wants to send its own message out about updating the status file.
- t = threading.Thread(
-     target=self.updateStatusFile, daemon=True)
- t.start()
+ # 4. Update status file
+ self.update_status_file()

- def updateStatusFile(self):
+ def update_status_file(self):
"""
This writes an updated version of our status file to S3, if anything has changed since our last write
"""
@@ -657,12 +652,11 @@ def updateStatusFile(self):
'protected/server_status/'+self.serverId, statusStr)
print('Uploaded updated status file: \n' + statusStr)
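`update_status_file` only uploads when the status text has changed since the last write. A minimal sketch of that write-if-changed pattern, assuming the status payload is a small JSON document; the field names and the injected `upload` callback (standing in for `index.uploadText`) are illustrative:

```python
import json
from typing import Callable, Optional


class StatusFileWriter:
    """Upload a status document only when its contents actually changed."""

    def __init__(self, upload: Callable[[str, str], None], key: str) -> None:
        self.upload = upload              # e.g. a wrapper around index.uploadText(key, body)
        self.key = key
        self._last_written: Optional[str] = None

    def update(self, currently_processing: Optional[str], queue_length: int) -> None:
        status_str = json.dumps({
            'currently_processing': currently_processing,
            'queue_length': queue_length,
        }, sort_keys=True)
        if status_str == self._last_written:
            return  # Nothing changed since the last write, so skip the S3 round trip.
        self.upload(self.key, status_str)
        self._last_written = status_str
```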

- def onPubSubStatusReceived(self, topic: str, payload: bytes):
+ def on_pub_sub_status_received(self, topic: str, payload: bytes):
print(f'Received PubSub status update on server {self.serverId}')
self.index.pubSub.alive = True

- def checkPubSubStatusForever(self):
+ def check_pub_sub_status_forever(self):
while True:
# First, assume that PubSub is down.
self.index.pubSub.alive = False
@@ -681,7 +675,7 @@ def checkPubSubStatusForever(self):

time.sleep(60)
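The liveness check is a ping/echo loop: each iteration pessimistically sets `alive = False`, publishes to the server's own `/PING/<id>` topic, and counts on `on_pub_sub_status_received` flipping the flag back before the next check. A self-contained sketch of that loop, where `publish`/`subscribe` stand in for whatever methods the real PubSub client exposes:

```python
import threading
import time


class PubSubLivenessChecker:
    """Ping our own topic periodically; if the echo never arrives, treat PubSub as down."""

    def __init__(self, pub_sub, ping_topic: str) -> None:
        self.pub_sub = pub_sub      # assumed to expose publish(topic, payload) and subscribe(topic, callback)
        self.ping_topic = ping_topic
        self.alive = False
        self.pub_sub.subscribe(self.ping_topic, self._on_ping)

    def _on_ping(self, topic: str, payload: bytes) -> None:
        # Our own ping came back, so the connection works end to end.
        self.alive = True

    def check_forever(self, interval_seconds: float = 60.0) -> None:
        while True:
            self.alive = False                          # assume the worst until the echo arrives
            self.pub_sub.publish(self.ping_topic, b'ping')
            time.sleep(interval_seconds)
            if not self.alive:
                print('PubSub appears to be down; holding off on new work until it recovers')

    def start(self) -> None:
        threading.Thread(target=self.check_forever, daemon=True).start()
```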

- def getSlurmJobQueueLen(self) -> Tuple[int, int]:
+ def get_slurm_job_queue_len(self) -> Tuple[int, int]:
"""
This uses the `squeue` command to check how many jobs we currently have pending in the queue, if we're on the SLURM cluster.
"""
@@ -703,37 +697,54 @@ def getSlurmJobQueueLen(self) -> Tuple[int, int]:
print('Failed to get SLURM job queue length: '+str(e))
return 0, 0
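The body of `get_slurm_job_queue_len` is elided here apart from its error path. A plausible sketch that shells out to `squeue` and counts this deployment's jobs by name prefix; the `addb_` prefixes and the exact columns parsed are assumptions, not the repository's real values:

```python
import subprocess
from typing import Tuple


def get_slurm_job_queue_len(user: str, new_prefix: str = 'addb_new_',
                            reprocess_prefix: str = 'addb_re_') -> Tuple[int, int]:
    """Return (new_jobs, reprocessing_jobs) currently in this user's SLURM queue."""
    try:
        # `squeue -u <user> -h -o %j` prints one job name per line, with no header row.
        output = subprocess.check_output(
            ['squeue', '-u', user, '-h', '-o', '%j'], text=True)
        names = [line.strip() for line in output.splitlines() if line.strip()]
        new_jobs = sum(1 for name in names if name.startswith(new_prefix))
        reprocessing_jobs = sum(1 for name in names if name.startswith(reprocess_prefix))
        return new_jobs, reprocessing_jobs
    except Exception as e:
        # Mirrors the diff: on any failure, report an empty queue rather than crashing.
        print('Failed to get SLURM job queue length: ' + str(e))
        return 0, 0
```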

- def processQueueForever(self):
+ def process_queue_forever(self):
"""
This busy-waits on the queue updating, and will process the head of the queue one at a time when it becomes available.
While processing, this blocks, so even though the queue is updating in the background, that shouldn't change the outcome of this process.
"""
while True:
try:
+ # First, we process the queue of PubSub messages that the index has received since the last time we
+ # checked.
+ print('Processing incoming messages...')
+ start_time = time.time()
+ any_changed = self.index.process_incoming_messages()
+ print('[PERFORMANCE] Processed incoming messages in ' + str(time.time() - start_time) + ' seconds')
+ if any_changed:
+     print('Incoming messages changed the state of the index, recomputing queue')
+     start_time = time.time()
+     self.recompute_queue()
+     print('[PERFORMANCE] Recomputed queue in ' + str(time.time() - start_time) + ' seconds')

if len(self.queue) > 0 and self.pubSubIsAlive:
start_time = time.time()

self.currentlyProcessing = self.queue[0]
- self.updateStatusFile()
+ self.update_status_file()

- # This will update the state of S3, which will in turn update and remove this element from our queue automatically.
- # If it doesn't, then perhaps something went wrong and it's actually fine to process again. So the key idea is DON'T
- # MANUALLY MANAGE THE WORK QUEUE! That happens in self.onChange()
+ # This will update the state of S3, which will in turn update and remove this element from our
+ # queue automatically. If it doesn't, then perhaps something went wrong and it's actually fine to
+ # process again. So the key idea is DON'T MANUALLY MANAGE THE WORK QUEUE! That happens in
+ # self.onChange()

if len(self.singularity_image_path) > 0:
reprocessing_job: bool = self.currentlyProcessing.subjectPath.startswith('standardized')

- # SLURM has resource limits, and will fail to queue our job with sbatch if we're too greedy. So we need to check
- # the queue length before we queue up a new job, and not queue up more than 15 jobs at a time (though the precise limit
- # isn't documented anywhere, I figure 15 concurrent jobs per deployment (so 30 total between dev and prod) is probably a reasonable limit).
- slurm_new_jobs, slurm_reprocessing_jobs = self.getSlurmJobQueueLen()
+ # SLURM has resource limits, and will fail to queue our job with sbatch if we're too greedy.
+ # So we need to check the queue length before we queue up a new job, and not queue up more
+ # than 15 jobs at a time (though the precise limit isn't documented anywhere, I figure 15
+ # concurrent jobs per deployment (so 30 total between dev and prod) is probably a reasonable
+ # limit).
+ slurm_new_jobs, slurm_reprocessing_jobs = self.get_slurm_job_queue_len()
slurm_total_jobs = slurm_new_jobs + slurm_reprocessing_jobs
print('Queueing subject for processing on SLURM: ' +
self.currentlyProcessing.subjectPath)
# We always leave a few slots open for new jobs, since they're more important than reprocessing
- if (reprocessing_job and slurm_reprocessing_jobs < 10) or \
-     (not reprocessing_job and slurm_total_jobs < 150):
+ if (reprocessing_job and slurm_reprocessing_jobs < 15) or \
+     (not reprocessing_job and slurm_total_jobs < 30):
# Mark the subject as having been queued in SLURM, so that we don't try to process it again
- self.currentlyProcessing.markAsQueuedOnSlurm()
+ self.currentlyProcessing.mark_as_queued_on_slurm()
print('Queueing subject for processing on SLURM: ' +
self.currentlyProcessing.subjectPath)
# Now launch a SLURM job to process this subject
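The actual `sbatch` invocation is elided below this comment. A hedged sketch of what submitting one subject might look like, assuming the job wraps a `singularity exec` of the processing engine; the script contents, engine path, and resource limits here are illustrative, not the repository's real values:

```python
import subprocess
import tempfile


def launch_slurm_job(singularity_image_path: str, subject_path: str) -> None:
    """Submit a single-subject processing job to SLURM via sbatch (illustrative sketch)."""
    script = f"""#!/bin/bash
#SBATCH --job-name=addb_{subject_path.replace('/', '_')}
#SBATCH --time=08:00:00
#SBATCH --mem=16G
#SBATCH --cpus-per-task=4
singularity exec {singularity_image_path} python3 /engine/process_subject.py "{subject_path}"
"""
    with tempfile.NamedTemporaryFile('w', suffix='.sbatch', delete=False) as f:
        f.write(script)
        script_path = f.name
    # sbatch returns as soon as the job is accepted; SLURM runs it when resources free up.
    subprocess.check_call(['sbatch', script_path])
```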
@@ -770,7 +781,7 @@ def processQueueForever(self):
except Exception as e:
# If we fail to queue, then we need to mark the subject as not queued, so that we can try again later
print('Failed to queue SLURM job: '+str(e))
- self.currentlyProcessing.markAsNotQueuedOnSlurm()
+ self.currentlyProcessing.mark_as_not_queued_on_slurm()
else:
print(
'Not queueing subject for processing on SLURM, because the queue is too long. Waiting for some jobs to finish')
@@ -780,14 +791,16 @@ def processQueueForever(self):

# This helps our status thread to keep track of what we're doing
self.currentlyProcessing = None
- self.updateStatusFile()
+ self.update_status_file()

print('[PERFORMANCE] Processed subject in ' + str(time.time() - start_time) + ' seconds')

- # We need to avoid race conditions with S3 by not immediately processing the next item. Give S3 a chance to update.
+ # We need to avoid race conditions with S3 by not immediately processing the next item. Give S3 a
+ # chance to update.
print(
'Sleeping for 10 seconds before attempting to process the next item...')
time.sleep(10)
print('Done sleeping')
- self.onChange()
else:
time.sleep(1)
except Exception as e:
@@ -829,4 +842,4 @@ def processQueueForever(self):
args.singularity_image_path)

# 2. Run forever
- server.processQueueForever()
+ server.process_queue_forever()