From c78367fc54f07a4f73e509fd464f3b7cf3ac3d70 Mon Sep 17 00:00:00 2001 From: Keenon Werling Date: Mon, 30 Sep 2024 16:52:44 -0700 Subject: [PATCH] Taking the index update offline to help mitigate the huge cost of an index update with such large buckets --- server/app/src/data_harvester.py | 4 +- server/app/src/mocap_server.py | 93 +++++---- .../app/src/reactive_s3/reactive_s3_index.py | 194 +++++++++--------- server/app/src/reprocess_standardized.py | 102 +++++++-- 4 files changed, 231 insertions(+), 162 deletions(-) diff --git a/server/app/src/data_harvester.py b/server/app/src/data_harvester.py index 8a97db79..d0ca8502 100644 --- a/server/app/src/data_harvester.py +++ b/server/app/src/data_harvester.py @@ -123,7 +123,7 @@ def copy_snapshots(self, datasets: List[StandardizedDataset]): return # Download the dataset locally - tmp_folder: str = self.index.downloadToTmp(self.path) + tmp_folder: str = self.index.download_to_tmp(self.path) # Identify trials that are too short. We don't want to process these trial_paths_to_remove: List[str] = self.id_short_trials(tmp_folder) @@ -305,7 +305,7 @@ def __init__(self, bucket: str, deployment: str, disable_pubsub: bool) -> None: self.index.addChangeListener(self.on_change) self.index.refreshIndex() if not disable_pubsub: - self.index.registerPubSub() + self.index.register_pub_sub() def on_change(self): print('S3 CHANGED!') diff --git a/server/app/src/mocap_server.py b/server/app/src/mocap_server.py index 2e779a5c..5f5313f6 100644 --- a/server/app/src/mocap_server.py +++ b/server/app/src/mocap_server.py @@ -11,7 +11,7 @@ import boto3 import threading import argparse -from typing import Tuple +from typing import Tuple, Any import traceback @@ -230,13 +230,13 @@ def getHref(self): else: return 'https://app.addbiomechanics.org/data/'+userId+'/'+filePath - def markAsQueuedOnSlurm(self): + def mark_as_queued_on_slurm(self): """ This marks a subject as having been queued for processing on a slurm cluster """ self.index.uploadText(self.queuedOnSlurmFlagFile, '') - def markAsNotQueuedOnSlurm(self): + def mark_as_not_queued_on_slurm(self): """ This un-marks a subject that was previously queued on the slurm cluster, because something went wrong """ @@ -587,24 +587,22 @@ def __init__(self, bucket: str, deployment: str, singularity_image_path: str) -> # Set up index self.index = ReactiveS3Index(bucket, deployment) - self.index.addChangeListener(self.onChange) self.index.refreshIndex() - self.index.registerPubSub() + self.index.register_pub_sub() self.pubSubIsAlive = True # Subscribe to PubSub status checks. self.pingId = str(self.serverId[:16]).replace('-', '') - self.index.pubSub.subscribe("/PING/"+self.pingId, self.onPubSubStatusReceived) + self.index.pubSub.subscribe("/PING/" + self.pingId, self.on_pub_sub_status_received) pubsub_status_thread = threading.Thread( - target=self.checkPubSubStatusForever, daemon=True) + target=self.check_pub_sub_status_forever, daemon=True) pubsub_status_thread.start() - def onChange(self): - print('S3 CHANGED!') + def recompute_queue(self): start_time = time.time() - shouldProcessSubjects: List[SubjectToProcess] = [] + should_process_subjects: List[SubjectToProcess] = [] # 1. Collect all Trials for folder in self.index.listAllFolders(): @@ -613,29 +611,26 @@ def onChange(self): folder += '/' subject = SubjectToProcess(self.index, folder) if subject.shouldProcess(): - shouldProcessSubjects.append(subject) + should_process_subjects.append(subject) # 2. Sort Trials. 
First we prioritize subjects that are not just copies in the "standardized" bucket, then # we sort oldest to newest. The sort method gets passed a Tuple, which goes left to right, True before False, # and low to high. - shouldProcessSubjects.sort(key=lambda x: ( + should_process_subjects.sort(key=lambda x: ( x.subjectPath.startswith("standardized"), x.latestInputTimestamp())) # 3. Update the queue. There's another thread that busy-waits on the queue changing, that can then grab a queue entry and continue - self.queue = shouldProcessSubjects + self.queue = should_process_subjects print('Queue updated in ' + str(time.time() - start_time) + ' seconds') for subject in self.queue: print('- should process: ' + str(subject.subjectPath)) - # 4. Update status file. Do this on another thread, to avoid deadlocking with the pubsub service when this is being called - # inside a callback from a message, and then wants to send its own message out about updating the status file. - t = threading.Thread( - target=self.updateStatusFile, daemon=True) - t.start() + # 4. Update status file + self.update_status_file() - def updateStatusFile(self): + def update_status_file(self): """ This writes an updated version of our status file to S3, if anything has changed since our last write """ @@ -657,12 +652,11 @@ def updateStatusFile(self): 'protected/server_status/'+self.serverId, statusStr) print('Uploaded updated status file: \n' + statusStr) - def onPubSubStatusReceived(self, topic: str, payload: bytes): + def on_pub_sub_status_received(self, topic: str, payload: bytes): print(f'Received PubSub status update on server {self.serverId}') self.index.pubSub.alive = True - def checkPubSubStatusForever(self): - + def check_pub_sub_status_forever(self): while True: # First, assume that PubSub is down. self.index.pubSub.alive = False @@ -681,7 +675,7 @@ def checkPubSubStatusForever(self): time.sleep(60) - def getSlurmJobQueueLen(self) -> Tuple[int, int]: + def get_slurm_job_queue_len(self) -> Tuple[int, int]: """ This uses the `squeue` command to check how many jobs we currently have pending in the queue, if we're on the SLURM cluster. """ @@ -703,7 +697,7 @@ def getSlurmJobQueueLen(self) -> Tuple[int, int]: print('Failed to get SLURM job queue length: '+str(e)) return 0, 0 - def processQueueForever(self): + def process_queue_forever(self): """ This busy-waits on the queue updating, and will process the head of the queue one at a time when it becomes available. @@ -711,29 +705,46 @@ def processQueueForever(self): """ while True: try: + # First, we process the queue of PubSub messages that the index has received since the last time we + # checked. + print('Processing incoming messages...') + start_time = time.time() + any_changed = self.index.process_incoming_messages() + print('[PERFORMANCE] Processed incoming messages in ' + str(time.time() - start_time) + ' seconds') + if any_changed: + print('Incoming messages changed the state of the index, recomputing queue') + start_time = time.time() + self.recompute_queue() + print('[PERFORMANCE] Recomputed queue in ' + str(time.time() - start_time) + ' seconds') + if len(self.queue) > 0 and self.pubSubIsAlive: + start_time = time.time() + self.currentlyProcessing = self.queue[0] - self.updateStatusFile() + self.update_status_file() - # This will update the state of S3, which will in turn update and remove this element from our queue automatically. - # If it doesn't, then perhaps something went wrong and it's actually fine to process again. 
So the key idea is DON'T - # MANUALLY MANAGE THE WORK QUEUE! That happens in self.onChange() + # This will update the state of S3, which will in turn update and remove this element from our + # queue automatically. If it doesn't, then perhaps something went wrong and it's actually fine to + # process again. So the key idea is DON'T MANUALLY MANAGE THE WORK QUEUE! That happens in + # self.onChange() if len(self.singularity_image_path) > 0: reprocessing_job: bool = self.currentlyProcessing.subjectPath.startswith('standardized') - # SLURM has resource limits, and will fail to queue our job with sbatch if we're too greedy. So we need to check - # the queue length before we queue up a new job, and not queue up more than 15 jobs at a time (though the precise limit - # isn't documented anywhere, I figure 15 concurrent jobs per deployment (so 30 total between dev and prod) is probably a reasonable limit). - slurm_new_jobs, slurm_reprocessing_jobs = self.getSlurmJobQueueLen() + # SLURM has resource limits, and will fail to queue our job with sbatch if we're too greedy. + # So we need to check the queue length before we queue up a new job, and not queue up more + # than 15 jobs at a time (though the precise limit isn't documented anywhere, I figure 15 + # concurrent jobs per deployment (so 30 total between dev and prod) is probably a reasonable + # limit). + slurm_new_jobs, slurm_reprocessing_jobs = self.get_slurm_job_queue_len() slurm_total_jobs = slurm_new_jobs + slurm_reprocessing_jobs print('Queueing subject for processing on SLURM: ' + self.currentlyProcessing.subjectPath) # We always leave a few slots open for new jobs, since they're more important than reprocessing - if (reprocessing_job and slurm_reprocessing_jobs < 10) or \ - (not reprocessing_job and slurm_total_jobs < 150): + if (reprocessing_job and slurm_reprocessing_jobs < 15) or \ + (not reprocessing_job and slurm_total_jobs < 30): # Mark the subject as having been queued in SLURM, so that we don't try to process it again - self.currentlyProcessing.markAsQueuedOnSlurm() + self.currentlyProcessing.mark_as_queued_on_slurm() print('Queueing subject for processing on SLURM: ' + self.currentlyProcessing.subjectPath) # Now launch a SLURM job to process this subject @@ -770,7 +781,7 @@ def processQueueForever(self): except Exception as e: # If we fail to queue, then we need to mark the subject as not queued, so that we can try again later print('Failed to queue SLURM job: '+str(e)) - self.currentlyProcessing.markAsNotQueuedOnSlurm() + self.currentlyProcessing.mark_as_not_queued_on_slurm() else: print( 'Not queueing subject for processing on SLURM, because the queue is too long. Waiting for some jobs to finish') @@ -780,14 +791,16 @@ def processQueueForever(self): # This helps our status thread to keep track of what we're doing self.currentlyProcessing = None - self.updateStatusFile() + self.update_status_file() + + print('[PERFORMANCE] Processed subject in ' + str(time.time() - start_time) + ' seconds') - # We need to avoid race conditions with S3 by not immediately processing the next item. Give S3 a chance to update. + # We need to avoid race conditions with S3 by not immediately processing the next item. Give S3 a + # chance to update. print( 'Sleeping for 10 seconds before attempting to process the next item...') time.sleep(10) print('Done sleeping') - self.onChange() else: time.sleep(1) except Exception as e: @@ -829,4 +842,4 @@ def processQueueForever(self): args.singularity_image_path) # 2. 
Run forever - server.processQueueForever() + server.process_queue_forever() diff --git a/server/app/src/reactive_s3/reactive_s3_index.py b/server/app/src/reactive_s3/reactive_s3_index.py index ca8622e0..d6d63d10 100644 --- a/server/app/src/reactive_s3/reactive_s3_index.py +++ b/server/app/src/reactive_s3/reactive_s3_index.py @@ -65,7 +65,6 @@ class ReactiveS3Index: pubSub: PubSub files: Dict[str, FileMetadata] children: Dict[str, List[str]] - changeListeners: List[Callable] bucketName: str deployment: str lock: threading.Lock @@ -87,7 +86,7 @@ def __init__(self, bucket: str, deployment: str, disable_pubsub = False) -> None self.disable_pubsub = True self.files = {} self.children = {} - self.changeListeners = [] + self.incomingMessages = [] # Add pickling support def __getstate__(self): @@ -97,7 +96,6 @@ def __getstate__(self): del state['lock'] if 'pubSub' in state: del state['pubSub'] - del state['changeListeners'] del state['bucket'] return state @@ -110,16 +108,22 @@ def __setstate__(self, state): self.bucket = self.s3.Bucket(self.bucketName) self.lock = threading.Lock() self.disable_pubsub = True - self.changeListeners = [] + self.incomingMessages = [] - def registerPubSub(self) -> None: + def queue_pub_sub_update_message(self, topic: str, payload: bytes) -> None: + self.incomingMessages.append(('UPDATE', topic, payload)) + + def queue_pub_sub_delete_message(self, topic: str, payload: bytes) -> None: + self.incomingMessages.append(('DELETE', topic, payload)) + + def register_pub_sub(self) -> None: """ This registers a PubSub listener """ if self.disable_pubsub: return - self.pubSub.subscribe("/UPDATE/#", self._onUpdate) - self.pubSub.subscribe("/DELETE/#", self._onDelete) + self.pubSub.subscribe("/UPDATE/#", self.queue_pub_sub_update_message) + self.pubSub.subscribe("/DELETE/#", self.queue_pub_sub_delete_message) # Make sure we refresh our index when our connection resumes, if our connection was interrupted # @@ -127,49 +131,52 @@ def registerPubSub(self) -> None: # # self.pubSub.addResumeListener(self.refreshIndex) + def process_incoming_messages(self) -> bool: + """ + This processes incoming PubSub messages + """ + any_changes = False + while len(self.incomingMessages) > 0: + message = self.incomingMessages.pop(0) + if message[0] == 'UPDATE': + any_changes |= self._onUpdate(message[1], message[2]) + elif message[0] == 'DELETE': + any_changes |= self._onDelete(message[1], message[2]) + return any_changes + def load_only_folder(self, folder: str) -> None: """ This updates the index """ - self.lock.acquire() - try: - print('Loading folder '+folder) - self.files.clear() - self.children.clear() - for object in self.bucket.objects.filter(Prefix=folder): - key: str = object.key - lastModified: int = int(object.last_modified.timestamp() * 1000) - eTag = object.e_tag[1:-1] # Remove the double quotes around the ETag value - size: int = object.size - file = FileMetadata(key, lastModified, size, eTag) - self.updateChildrenOnAddFile(key) - self.files[key] = file - print('Folder load finished!') - finally: - self.lock.release() - self._onRefresh() + print('Loading folder '+folder) + self.files.clear() + self.children.clear() + for object in self.bucket.objects.filter(Prefix=folder): + key: str = object.key + lastModified: int = int(object.last_modified.timestamp() * 1000) + eTag = object.e_tag[1:-1] # Remove the double quotes around the ETag value + size: int = object.size + file = FileMetadata(key, lastModified, size, eTag) + self.updateChildrenOnAddFile(key) + self.files[key] = file + 
print('Folder load finished!') def refreshIndex(self) -> None: """ This updates the index """ - self.lock.acquire() - try: - print('Doing full index refresh...') - self.files.clear() - self.children.clear() - for object in self.bucket.objects.all(): - key: str = object.key - lastModified: int = int(object.last_modified.timestamp() * 1000) - eTag = object.e_tag[1:-1] # Remove the double quotes around the ETag value - size: int = object.size - file = FileMetadata(key, lastModified, size, eTag) - self.updateChildrenOnAddFile(key) - self.files[key] = file - print('Full index refresh finished!') - finally: - self.lock.release() - self._onRefresh() + print('Doing full index refresh...') + self.files.clear() + self.children.clear() + for object in self.bucket.objects.all(): + key: str = object.key + lastModified: int = int(object.last_modified.timestamp() * 1000) + eTag = object.e_tag[1:-1] # Remove the double quotes around the ETag value + size: int = object.size + file = FileMetadata(key, lastModified, size, eTag) + self.updateChildrenOnAddFile(key) + self.files[key] = file + print('Full index refresh finished!') def updateChildrenOnAddFile(self, path: str): cursor = -1 @@ -293,9 +300,6 @@ def hasChildren(self, folder: str, subPaths: List[str]) -> bool: return False return True - def addChangeListener(self, listener: Callable) -> None: - self.changeListeners.append(listener) - def uploadFile(self, bucketPath: str, localPath: str): """ This uploads a local file to a given spot in the bucket @@ -304,8 +308,10 @@ def uploadFile(self, bucketPath: str, localPath: str): self.s3.Object(self.bucketName, bucketPath).put( Body=open(localPath, 'rb')) if 'pubSub' in self.__dict__ and self.pubSub is not None: - self.pubSub.publish( - makeTopicPubSubSafe("/UPDATE/"+bucketPath), {'key': bucketPath, 'lastModified': time.time() * 1000, 'size': os.path.getsize(localPath)}) + topic = makeTopicPubSubSafe("/UPDATE/"+bucketPath) + body = {'key': bucketPath, 'lastModified': time.time() * 1000, 'size': os.path.getsize(localPath)} + self.queue_pub_sub_update_message(topic, json.dumps(body).encode('utf-8')) + self.pubSub.publish(topic, body) def uploadText(self, bucketPath: str, text: str): """ @@ -313,8 +319,10 @@ def uploadText(self, bucketPath: str, text: str): """ self.s3.Object(self.bucketName, bucketPath).put(Body=text) if 'pubSub' in self.__dict__ and self.pubSub is not None: - self.pubSub.publish( - makeTopicPubSubSafe("/UPDATE/"+bucketPath), {'key': bucketPath, 'lastModified': time.time() * 1000, 'size': len(text.encode('utf-8'))}) + topic = makeTopicPubSubSafe("/UPDATE/"+bucketPath) + body = {'key': bucketPath, 'lastModified': time.time() * 1000, 'size': len(text.encode('utf-8'))} + self.queue_pub_sub_update_message(topic, json.dumps(body).encode('utf-8')) + self.pubSub.publish(topic, body) def uploadJSON(self, bucketPath: str, contents: Dict[str, Any]): """ @@ -331,14 +339,16 @@ def delete(self, bucketPath: str): return bytearray() self.s3.Object(self.bucketName, bucketPath).delete() if 'pubSub' in self.__dict__ and self.pubSub is not None: - self.pubSub.publish( - makeTopicPubSubSafe("/DELETE/"+bucketPath), {'key': bucketPath}) + topic = makeTopicPubSubSafe("/DELETE/"+bucketPath) + body = {'key': bucketPath} + self.queue_pub_sub_delete_message(topic, json.dumps(body).encode('utf-8')) + self.pubSub.publish(topic, body) def download(self, bucketPath: str, localPath: str) -> None: print('downloading file '+bucketPath+' into '+localPath) self.bucket.download_file(bucketPath, localPath) - def downloadToTmp(self, 
bucketPath: str) -> str: + def download_to_tmp(self, bucketPath: str) -> str: """ This downloads a folder, or a file, creating a temporary folder. This returns the temporary file path. @@ -382,61 +392,45 @@ def getText(self, bucketPath: str) -> bytes: def getJSON(self, bucketPath: str) -> Dict[str, Any]: return json.loads(self.getText(bucketPath)) - def _onUpdate(self, topic: str, payload: bytes) -> None: + def _onUpdate(self, topic: str, payload: bytes) -> bool: """ We received a PubSub message telling us a file was created """ - self.lock.acquire() + body = json.loads(payload) + key: str = body['key'] + last_modified_str: str = body['lastModified'] + last_modified: int try: - body = json.loads(payload) - key: str = body['key'] - last_modified_str: str = body['lastModified'] - last_modified: int + last_modified = int(last_modified_str) + except ValueError: try: - last_modified = int(last_modified_str) - except ValueError: - try: - # Removing 'Z' and manually handling it as UTC - last_modified_str_utc = last_modified_str.replace("Z", "+00:00") - dt = datetime.fromisoformat(last_modified_str_utc) - last_modified = int(dt.timestamp() * 1000) - except ValueError as e: - print("Error parsing lastModified: "+last_modified_str) - print(e) - print('Defaulting to current time') - last_modified = int(time.time() * 1000) - size: int = body['size'] - e_tag: str = body['eTag'] if 'eTag' in body else '' - file = FileMetadata(key, last_modified, size, e_tag) - print("onUpdate() file: "+str(file)) - self.files[key] = file - self.updateChildrenOnAddFile(key) - finally: - self.lock.release() - self._onRefresh() + # Removing 'Z' and manually handling it as UTC + last_modified_str_utc = last_modified_str.replace("Z", "+00:00") + dt = datetime.fromisoformat(last_modified_str_utc) + last_modified = int(dt.timestamp() * 1000) + except ValueError as e: + print("Error parsing lastModified: "+last_modified_str) + print(e) + print('Defaulting to current time') + last_modified = int(time.time() * 1000) + size: int = body['size'] + e_tag: str = body['eTag'] if 'eTag' in body else '' + file = FileMetadata(key, last_modified, size, e_tag) + print("onUpdate() file: "+str(file)) + self.files[key] = file + self.updateChildrenOnAddFile(key) + return True - def _onDelete(self, topic: str, payload: bytes) -> None: + def _onDelete(self, topic: str, payload: bytes) -> bool: """ We received a PubSub message telling us a file was deleted """ - self.lock.acquire() - try: - body = json.loads(payload) - key: str = body['key'] - print("onDelete() key: "+str(key)) - anyDeleted = False - if key in self.files: - self.updateChildrenOnRemoveFile(key) - del self.files[key] - anyDeleted = True - finally: - self.lock.release() - if anyDeleted: - self._onRefresh() - - def _onRefresh(self) -> None: - """ - Iterate through all the files in the bucket, looking for updates - """ - for listener in self.changeListeners: - listener() + body = json.loads(payload) + key: str = body['key'] + print("onDelete() key: "+str(key)) + anyDeleted = False + if key in self.files: + self.updateChildrenOnRemoveFile(key) + del self.files[key] + anyDeleted = True + return anyDeleted \ No newline at end of file diff --git a/server/app/src/reprocess_standardized.py b/server/app/src/reprocess_standardized.py index 19eb8ab0..5194e9ce 100644 --- a/server/app/src/reprocess_standardized.py +++ b/server/app/src/reprocess_standardized.py @@ -1,10 +1,13 @@ import argparse from reactive_s3 import ReactiveS3Index, FileMetadata import json +from typing import Dict, List, Tuple 
if __name__ == "__main__": parser = argparse.ArgumentParser( description='This is a skeleton of a script to patch the standardized data in various ways.') + # "biomechanics-uploads161949-dev" + # "biomechanics-uploads83039-prod" parser.add_argument('--bucket', type=str, default="biomechanics-uploads161949-dev", help='The S3 bucket to access user data in') parser.add_argument('--deployment', type=str, @@ -14,8 +17,21 @@ print('Connecting to S3...') index = ReactiveS3Index(args.bucket, args.deployment, disable_pubsub=True) - index.refreshIndex() + # for object in index.bucket.objects.all(): + # key: str = object.key + # lastModified: int = int(object.last_modified.timestamp() * 1000) + # eTag = object.e_tag[1:-1] # Remove the double quotes around the ETag value + # size: int = object.size + # print(key) + + subject_hashes: Dict[str, List[Tuple[str, int]]] = {} + + subjects_with_dynamics = 0 + subjects_without_dynamics = 0 + subjects_not_yet_updated = 0 + + index.refreshIndex() for folder in index.listAllFolders(): # We want to collect all the dataset targets that we're supposed to be copying data to, in case # those have been updated @@ -23,22 +39,68 @@ if folder.endswith('/'): folder = folder[:-1] children = index.getImmediateChildren(folder + '/') - subject_files = [x for x in children if x.endswith('_subject.json')] - if len(subject_files) == 1: - print('Checking subject file: '+subject_files[0]) - tmp_file: str = index.downloadToTmp(folder + '/_subject.json') - with open(tmp_file, 'r') as f: - subject = json.load(f) - if 'skeletonPreset' in subject: - skeleton_preset = subject['skeletonPreset'] - if skeleton_preset != 'custom': - print('Found a non-custom skeleton preset: '+skeleton_preset) - print('Patching to custom skeleton preset') - subject['skeletonPreset'] = 'custom' - index.uploadJSON(folder + '/_subject.json', subject) - index.delete(folder + '/SLURM') - index.delete(folder + '/PROCESSING') - index.delete(folder + '/_results.json') - bin_files = [x for x in children if x.endswith('.bin')] - for bin_file in bin_files: - index.delete(folder + '/' + bin_file) + is_subject = any([x.endswith('_subject.json') for x in children]) + + if not is_subject: + continue + + has_dynamics_b3d = any([x.endswith('_dynamics_trials_only.b3d') for x in children]) + has_no_dynamics_flag = any([x.endswith('NO_DYNAMICS_TRIALS') for x in children]) + if has_dynamics_b3d: + subjects_with_dynamics += 1 + elif has_no_dynamics_flag: + subjects_without_dynamics += 1 + else: + subjects_not_yet_updated += 1 + + path_parts = folder.split('/') + hash_id = path_parts[-1] + original_subject = '/'.join(path_parts[:-1]) + try: + subject_json_last_modified = index.getMetadata(folder + '/_subject.json').lastModified + + if original_subject not in subject_hashes: + subject_hashes[original_subject] = [] + subject_hashes[original_subject].append((hash_id, subject_json_last_modified)) + except: + print(f'Error processing {folder}') + + average_num_hashes = sum([len(x) for x in subject_hashes.values()]) / len(subject_hashes) + + # num_reprocessed = 0 + # for original_subject, hashes in subject_hashes.items(): + # # sort hashes by last modified date + # hashes.sort(key=lambda x: x[1]) + # assert(len(hashes) > 0) + # # Get the last hash + # hash_id = hashes[-1][0] + # folder = f'{original_subject}/{hash_id}' + # children = index.getImmediateChildren(folder + '/') + # dynamics_b3d = [x for x in children if x.endswith('_dynamics_trials_only.b3d')] + # no_dynamics_flag = [x for x in children if x.endswith('NO_DYNAMICS_TRIALS')] 
+ # missing_dynamics = ((len(dynamics_b3d) + len(no_dynamics_flag)) == 0) + # b3d_files = [x for x in children if x.endswith('.b3d')] + # results_files = [x for x in children if x.endswith('_results.json')] + # + # if len(dynamics_b3d) + len(no_dynamics_flag) == 0: + # if len(b3d_files) > 0 and len(results_files) > 0: + # print(f'Reprocessing {folder}...') + # print('Deleting ' + folder + '/SLURM') + # index.delete(folder + '/SLURM') + # index.delete(folder + '/PROCESSING') + # index.delete(folder + '/_results.json') + # b3d_files = [x for x in children if x.endswith('.b3d')] + # for b3d_file in b3d_files: + # index.delete(folder + '/' + b3d_file) + # num_reprocessed += 1 + # if num_reprocessed >= 150: + # break + + print('Done!') + print('Average number of hashes per subject: ' + str(average_num_hashes)) + print('Number of subjects: ' + str(len(subject_hashes))) + print('Number of hashes: ' + str(sum([len(x) for x in subject_hashes.values()]))) + print('Number of subjects with dynamics: ' + str(subjects_with_dynamics)) + print('Number of subjects without dynamics: ' + str(subjects_without_dynamics)) + print('Number of subjects not yet updated: ' + str(subjects_not_yet_updated)) +
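
For reference, here is a minimal sketch of how a worker is expected to drive the new offline update path: the callbacks registered by register_pub_sub() only append to index.incomingMessages, and the expensive index mutation plus any queue rebuild now happen on the worker's own thread when it drains that buffer. The bucket and deployment names below are placeholders, and rebuild_work_queue() is a hypothetical stand-in for MocapServer.recompute_queue(); only ReactiveS3Index, refreshIndex(), register_pub_sub(), and process_incoming_messages() come from the code above.

    import time
    from reactive_s3 import ReactiveS3Index

    index = ReactiveS3Index('my-bucket', 'dev')   # placeholder bucket/deployment
    index.refreshIndex()                          # one full S3 listing up front
    index.register_pub_sub()                      # callbacks now only enqueue messages

    while True:
        # Drain queued /UPDATE/ and /DELETE/ messages here, outside any PubSub
        # callback, so the heavy work never runs inside the MQTT client thread.
        any_changed = index.process_incoming_messages()
        if any_changed:
            # Only rebuild downstream state (e.g. the processing queue) when the
            # drained messages actually modified the in-memory index.
            rebuild_work_queue(index)             # hypothetical stand-in
        time.sleep(1)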
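The upload helpers also echo their own change into the same buffer before publishing, so the local index picks the write up on the next drain without waiting for the MQTT round trip. A small sketch of that entry format follows, using a made-up key and size; the topic string here skips makeTopicPubSubSafe() since _onUpdate() ignores the topic.

    import json
    import time
    from reactive_s3 import ReactiveS3Index

    index = ReactiveS3Index('my-bucket', 'dev', disable_pubsub=True)  # placeholder names
    topic = '/UPDATE/protected/some/file.json'                        # made-up topic
    body = {'key': 'protected/some/file.json',                        # made-up key
            'lastModified': time.time() * 1000,
            'size': 42}
    # Same ('UPDATE', topic, payload-bytes) entry that uploadText() enqueues.
    index.queue_pub_sub_update_message(topic, json.dumps(body).encode('utf-8'))
    # The next drain routes the entry to _onUpdate(), which records the FileMetadata.
    changed = index.process_incoming_messages()   # True: the key is now in index.files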