From b41505d7fbb0e6f830686db295399d8f94b92520 Mon Sep 17 00:00:00 2001 From: "Jose M. de la Rosa" Date: Fri, 19 Jul 2024 16:37:00 -0500 Subject: [PATCH 1/8] separating 2d from preprocessing --- emtools/scripts/emt-scipion-otf.py | 55 +++++++++++++++--------------- 1 file changed, 28 insertions(+), 27 deletions(-) diff --git a/emtools/scripts/emt-scipion-otf.py b/emtools/scripts/emt-scipion-otf.py index 26cb784..e42719f 100755 --- a/emtools/scripts/emt-scipion-otf.py +++ b/emtools/scripts/emt-scipion-otf.py @@ -15,17 +15,14 @@ # * # ************************************************************************** -import sys import json import os.path import argparse -import ast import time -from collections import OrderedDict import datetime as dt -import re -from emtools.utils import Process, Color, System, Pipeline +from emtools.utils import Process, Color, System +from emtools.jobs import Pipeline from emtools.metadata import EPU, SqliteFile, StarFile, Table import pyworkflow as pw @@ -58,9 +55,9 @@ def loadGpus(): ngpu = len(gpuProc) half = int(ngpu/2) gpus = list(range(ngpu)) - # Take half of gpus for Motioncor and the other half for 2D + # Take half of gpus for Motioncor and the other half for cryolo params['mcGpus'] = gpus[:half] - params['cls2dGpus'] = gpus[half:] + params['cryoloGpus'] = gpus[half:] def calculateBoxSize(protCryolo): @@ -101,7 +98,16 @@ def save_otf(otf): json.dump(otf, f, indent=4) -def run2DPipeline(wf, protExtract): +def run2DPipeline(wf, protExtract, particleSizeA, gpus): + """ + Run the 2D pipeline for a given workflow, after particles extraction. + + Args: + wf: workflow instance. + protExtract: particles extraction protocol. + particleSizeA: Size of the particles in Angstroms + gpus: list with GPUs that will be used for 2D classification + """ print(f"\n>>> Running 2D pipeline: input extract " f"{Color.bold(protExtract.getRunName())}") @@ -131,12 +137,12 @@ def _createBatch2D(gridsquare, subsetParts): protRelion2D = wf.createProtocol( 'relion.protocols.ProtRelionClassify2D', objLabel=f'relion2d: {gridsquare}', - maskDiameterA=round(params['partSizeA'] * 1.5), + maskDiameterA=round(particleSizeA * 1.5), numberOfClasses=200, extraParams='--maxsig 50', pooledParticles=50, doGpu=True, - gpusToUse=','.join(str(g) for g in params['cls2dGpus']), + gpusToUse=','.join(str(g) for g in gpus), numberOfThreads=32, numberOfMpi=1, allParticlesRam=True, @@ -347,7 +353,9 @@ def _path(*p): wf.launchProtocol(protCTF, wait={OUT_CTFS: 16}) protCryoloImport = None - cryoloInputModelFrom = 0 # General model (low pass filtered) + #cryoloInputModelFrom = 0 # General model (low pass filtered) + cryoloInputModelFrom = 1 # Denoised with Janni + if 'cryolo_model' in picking: protCryoloImport = wf.createProtocol( 'sphire.protocols.SphireProtCryoloImport', @@ -358,13 +366,13 @@ def _path(*p): cryoloInputModelFrom = 2 # Other protCryolo = wf.createProtocol( - 'sphire.protocols.SphireProtCRYOLOPicking', + 'sphire.protocols.SphireProtCRYOLOPickingTasks', objLabel='cryolo picking', boxSize=0, # let cryolo estimate the box size conservPickVar=0.05, # less conservative than default 0.3 - useGpu=False, # use cpu for picking, fast enough + useGpu=True, # use gpu for speed up janni denoising numCpus=8, - gpuList='', + gpuList=' '.join(str(g) for g in params['cryoloGpus']), streamingBatchSize=16, streamingSleepOnWait=60, numberOfThreads=1, @@ -377,11 +385,6 @@ def _path(*p): wf.launchProtocol(protCryolo, wait={OUT_COORD: 100}) - skip_2d = not opts.get('2d', True) - - if skip_2d: - return - 
calculateBoxSize(protCryolo) protRelionExtract = wf.createProtocol( @@ -403,16 +406,13 @@ def _path(*p): _setPointer(protRelionExtract.inputCoordinates, protCryolo, OUT_COORD) # Ensure there are at least some particles wf.launchProtocol(protRelionExtract, wait={OUT_PART: 100}) - run2DPipeline(wf, protRelionExtract) -def continue_project(workingDir): +def continue_2d(workingDir, gpus): print(f"Loading project from {workingDir}") project = Project(pw.Config.getDomain(), workingDir) project.load() wf = Workflow(project) - loadGpus() - protExtract = protCryolo = None for run in project.getRuns(): @@ -429,7 +429,7 @@ def continue_project(workingDir): calculateBoxSize(protCryolo) - run2DPipeline(wf, protExtract) + run2DPipeline(wf, protExtract, params['partSizeA'], gpus) def restart(workingDir, args): @@ -718,7 +718,8 @@ def main(): help="Some test code") g.add_argument('--clean', action="store_true", help="Clean Scipion project files/folders.") - g.add_argument('--continue_2d', action="store_true") + g.add_argument('--continue_2d', nargs='+', type=int, + metavar='GPUs') g.add_argument('--write_stars', default=argparse.SUPPRESS, nargs='*', help="Generate STAR micrographs and particles STAR files." @@ -751,8 +752,8 @@ def main(): clean_project(cwd) elif 'write_stars' in args: write_stars(cwd, ids=args.write_stars) - elif args.continue_2d: - continue_project(cwd) + elif gpus := args.continue_2d: + continue_2d(cwd, gpus) elif args.clone_project: src, dst = args.clone_project clone_project(src, dst) From 42f8371afd77648178e88467ad352f4b3a346461 Mon Sep 17 00:00:00 2001 From: "Jose M. de la Rosa" Date: Wed, 18 Sep 2024 16:21:26 -0500 Subject: [PATCH 2/8] Working on StarMonitor and more general BatchManager --- emtools/metadata/__init__.py | 10 +- emtools/metadata/misc.py | 64 +++++- emtools/metadata/starfile.py | 66 ++++++ emtools/pwx/__init__.py | 5 +- emtools/pwx/monitors.py | 48 ----- emtools/scripts/emt-scipion-otf.py | 312 ++++++++++++++++++----------- emtools/tests/test_metadata.py | 85 +++++++- 7 files changed, 418 insertions(+), 172 deletions(-) diff --git a/emtools/metadata/__init__.py b/emtools/metadata/__init__.py index e0f934f..3c80c4d 100644 --- a/emtools/metadata/__init__.py +++ b/emtools/metadata/__init__.py @@ -15,10 +15,12 @@ # ************************************************************************** from .table import Column, ColumnList, Table -from .starfile import StarFile +from .starfile import StarFile, StarMonitor from .epu import EPU -from .misc import Bins, TsBins, DataFiles, MovieFiles +from .misc import Bins, TsBins, DataFiles, MovieFiles, BatchManager from .sqlite import SqliteFile -__all__ = ["Column", "ColumnList", "Table", "StarFile", "EPU", - "Bins", "TsBins", "SqliteFile", "DataFiles", "MovieFiles"] + +__all__ = ["Column", "ColumnList", "Table", "StarFile", "StarMonitor", "EPU", + "Bins", "TsBins", "SqliteFile", "DataFiles", "MovieFiles", + "BatchManager"] diff --git a/emtools/metadata/misc.py b/emtools/metadata/misc.py index 69d857f..238cfac 100644 --- a/emtools/metadata/misc.py +++ b/emtools/metadata/misc.py @@ -15,9 +15,10 @@ # ************************************************************************** import os +from uuid import uuid4 from datetime import datetime, timedelta -from emtools.utils import Path, Pretty +from emtools.utils import Path, Pretty, Process class Bins: @@ -247,3 +248,64 @@ def total_movies(self): def print(self, sort=None): DataFiles.print(self, sort=sort) self.counters[1].print('movie') + + +class BatchManager: + """ Class used to generate 
and handle the creation of batches + from an input stream of items. + + This is used for streaming/parallel processing. Batches will have a folder + and a filename is extracted from each item and linked into the batch + folder. + """ + def __init__(self, batchSize, inputItemsIterator, workingPath, + itemFileNameFunc=lambda item: item.getFileName()): + """ + Args: + batchSize: Number of items that will be grouped into one batch + inputItemsIterator: input items iterator + workingPath: path where the batches folder will be created + itemFileNameFunc: function to extract a filename from each item + (by default: lambda item: item.getFileName()) + """ + self._items = inputItemsIterator + self._batchSize = batchSize + self._batchCount = 0 + self._workingPath = workingPath + self._itemFileNameFunc = itemFileNameFunc + + def generate(self): + """ Generate batches based on the input items. """ + def _createBatch(items): + batch_id = str(uuid4()) + batch_path = os.path.join(self._workingPath, batch_id) + ts = Pretty.now() + + print(f"{ts}: Creating batch: {batch_path}") + Process.system(f"rm -rf '{batch_path}'") + Process.system(f"mkdir '{batch_path}'") + + for item in items: + fn = item.getFileName() + baseName = os.path.basename(fn) + os.symlink(os.path.abspath(fn), + os.path.join(batch_path, baseName)) + self._batchCount += 1 + return { + 'items': items, + 'id': batch_id, + 'path': batch_path, + 'index': self._batchCount + } + + items = [] + + for item in self._items: + items.append(item) + + if len(items) == self._batchSize: + yield _createBatch(items) + items = [] + + if items: + yield _createBatch(items) diff --git a/emtools/metadata/starfile.py b/emtools/metadata/starfile.py index b5dd2af..6fee548 100644 --- a/emtools/metadata/starfile.py +++ b/emtools/metadata/starfile.py @@ -21,9 +21,13 @@ __author__ = 'Jose Miguel de la Rosa Trevin, Grigory Sharov' +import os import sys +import time import re from contextlib import AbstractContextManager +from collections import OrderedDict +from datetime import datetime, timedelta from .table import ColumnList, Table @@ -391,6 +395,68 @@ def writeTable(self, tableName, table, singleRow=False): self._writeTableName(tableName) +class StarMonitor(OrderedDict): + """ + Monitor a STAR file for changes and return new items in a given table. + + This class will subclass OrderedDict to hold a clone of each new element. + It will also keep internally the last access timestamp to prevent loading + the STAR file if it has not been modified since the last check. 
+ """ + def __init__(self, fileName, tableName, rowKeyFunc, **kwargs): + self._seenItems = set() + self._fileName = fileName + self._tableName = tableName + self._rowKeyFunc = rowKeyFunc + self._sleepOnWait = kwargs.get('sleepOnWait', 10) + self._inputTimeout = timedelta(seconds=kwargs.get('inputTimeout', 300)) + self.lastCheck = None # Last timestamp when input was checked + self.lastUpdate = None # Last timestamp when new items were found + self.inputCount = 0 # Count all input elements + + # Black list some items to not be monitored again + # We are not interested in the items but just skip them from + # the processing + blacklist = kwargs.get('blacklist', None) + if blacklist: + for row in blacklist: + self._seenItems.add(self._rowKeyFunc(row)) + + def update(self): + newRows = [] + now = datetime.now() + mTime = datetime.fromtimestamp(os.path.getmtime(self._fileName)) + + if self.lastCheck is None or mTime > self.lastCheck: + with StarFile(self._fileName) as sf: + for row in sf.iterTable(self._tableName): + rowKey = self._rowKeyFunc(row) + if rowKey not in self._seenItems: + self.inputCount += 1 + self._seenItems.add(rowKey) + newRows.append(row) + + self.lastCheck = now + if newRows: + self.lastUpdate = now + return newRows + + def timedOut(self): + """ Return True when there has been inputTimeout seconds + since last new items were found. """ + if self.lastCheck is None or self.lastUpdate is None: + return False + else: + return self.lastCheck - self.lastUpdate > self._inputTimeout + + def newItems(self, sleep=10): + """ Yield new items since last update until the stream is closed. """ + while not self.timedOut(): + for row in self.update(): + yield row + time.sleep(self._sleepOnWait) + + # --------- Helper functions ------------------------ def _formatValue(v): return '%0.6f' % v if isinstance(v, float) else str(v) diff --git a/emtools/pwx/__init__.py b/emtools/pwx/__init__.py index 68105c9..f020505 100644 --- a/emtools/pwx/__init__.py +++ b/emtools/pwx/__init__.py @@ -16,8 +16,11 @@ # This emtools submodule need Scipion environment -from .monitors import ProtocolMonitor, SetMonitor, BatchManager +from .monitors import ProtocolMonitor, SetMonitor from .workflow import Workflow +# This is imported here for backward compatibility +from emtools.metadata import BatchManager + __all__ = ["ProtocolMonitor", "SetMonitor", "Workflow", "BatchManager"] diff --git a/emtools/pwx/monitors.py b/emtools/pwx/monitors.py index 28f1b43..4c6f97b 100644 --- a/emtools/pwx/monitors.py +++ b/emtools/pwx/monitors.py @@ -18,9 +18,7 @@ import time from datetime import datetime from collections import OrderedDict -from uuid import uuid4 -from emtools.utils import Pretty, Process import pyworkflow.protocol as pwprot @@ -177,49 +175,3 @@ def iterProtocolInput(self, prot, label, waitSecs=60): prot.info(f"No more {label}, stream closed. Total: {len(self)}") - -class BatchManager: - """ Class used to generate and handle creation of item batch - for streaming/parallel processing. - """ - def __init__(self, batchSize, inputItemsIterator, workingPath): - self._items = inputItemsIterator - self._batchSize = batchSize - self._batchCount = 0 - self._workingPath = workingPath - - def generate(self): - """ Generate batches based on the input items. 
""" - def _createBatch(items): - batch_id = str(uuid4()) - batch_path = os.path.join(self._workingPath, batch_id) - ts = Pretty.now() - - print(f"{ts}: Creating batch: {batch_path}") - Process.system(f"rm -rf '{batch_path}'") - Process.system(f"mkdir '{batch_path}'") - - for item in items: - fn = item.getFileName() - baseName = os.path.basename(fn) - os.symlink(os.path.abspath(fn), - os.path.join(batch_path, baseName)) - self._batchCount += 1 - return { - 'items': items, - 'id': batch_id, - 'path': batch_path, - 'index': self._batchCount - } - - items = [] - - for item in self._items: - items.append(item) - - if len(items) == self._batchSize: - yield _createBatch(items) - items = [] - - if items: - yield _createBatch(items) diff --git a/emtools/scripts/emt-scipion-otf.py b/emtools/scripts/emt-scipion-otf.py index e42719f..07188d6 100755 --- a/emtools/scripts/emt-scipion-otf.py +++ b/emtools/scripts/emt-scipion-otf.py @@ -18,10 +18,12 @@ import json import os.path import argparse +import threading import time import datetime as dt +from pprint import pprint -from emtools.utils import Process, Color, System +from emtools.utils import Process, Color, System, Pretty from emtools.jobs import Pipeline from emtools.metadata import EPU, SqliteFile, StarFile, Table @@ -40,7 +42,6 @@ # Some global workflow parameters params = {} -OTF_FILE = 'scipion-otf.json' cwd = os.getcwd() @@ -74,8 +75,9 @@ def calculateBoxSize(protCryolo): # Calculate the boxsize based on double of cryolo particle estimation # and recommended EMAN's boxsizes for performance + ps = protCryolo.inputMicrographs.get().getSamplingRate() params['partSizePx'] = protCryolo.boxsize.get() - params['partSizeA'] = params['partSizePx'] * protCryolo.inputMicrographs.get().getSamplingRate() + params['partSizeA'] = params['partSizePx'] * ps boxSize = max(params['partSizePx'] * 2.5, 100) for bs in EMAN_BOXSIZES: if bs > boxSize: @@ -84,83 +86,120 @@ def calculateBoxSize(protCryolo): params['boxSize'] = boxSize -def load_otf(): - otf = {'2d': {}} - if os.path.exists(OTF_FILE): - with open(OTF_FILE) as f: - otf = json.load(f) - - return otf - - -def save_otf(otf): - with open(OTF_FILE, 'w') as f: - json.dump(otf, f, indent=4) - - -def run2DPipeline(wf, protExtract, particleSizeA, gpus): - """ - Run the 2D pipeline for a given workflow, after particles extraction. - - Args: - wf: workflow instance. - protExtract: particles extraction protocol. 
- particleSizeA: Size of the particles in Angstroms - gpus: list with GPUs that will be used for 2D classification - """ - print(f"\n>>> Running 2D pipeline: input extract " - f"{Color.bold(protExtract.getRunName())}") - - otf = load_otf() - - def _createBatch2D(gridsquare, subsetParts): - rangeStr = f"{subsetParts[0].getObjId()} - {subsetParts[-1].getObjId()}" - print(f"Creating subset with range: {rangeStr}") - protSubset = wf.createProtocol( - 'pwem.protocols.ProtUserSubSet', - objLabel=f'subset: {gridsquare} : {rangeStr}', - ) - _setPointer(protSubset.inputObject, protExtract, OUT_PART) - wf.saveProtocol(protSubset) - protSubset.makePathsAndClean() - # Create subset particles as output for the protocol - inputParticles = protExtract.outputParticles - outputParticles = protSubset._createSetOfParticles() - outputParticles.copyInfo(inputParticles) - for particle in subsetParts: - outputParticles.append(particle) - protSubset._defineOutputs(outputParticles=outputParticles) - protSubset._defineTransformRelation(inputParticles, outputParticles) - protSubset.setStatus(STATUS_FINISHED) - wf.project._storeProtocol(protSubset) - - protRelion2D = wf.createProtocol( - 'relion.protocols.ProtRelionClassify2D', - objLabel=f'relion2d: {gridsquare}', - maskDiameterA=round(particleSizeA * 1.5), - numberOfClasses=200, - extraParams='--maxsig 50', - pooledParticles=50, - doGpu=True, - gpusToUse=','.join(str(g) for g in gpus), - numberOfThreads=32, - numberOfMpi=1, - allParticlesRam=True, - useGradientAlg=True, - ) - - _setPointer(protRelion2D.inputParticles, protSubset, OUT_PART) - wf.saveProtocol(protRelion2D) - otf['2d'][gridsquare] = { - 'runId': protRelion2D.getObjId(), - 'runName': protRelion2D.getRunName(), - 'runDir': protRelion2D.getWorkingDir() +class Rln2DPipeline(Pipeline): + def __init__(self, wf, protExtract, particleSizeA, gpus, dry=False): + """ + 2D pipeline for a given workflow, after particles extraction. + + Args: + wf: workflow instance. + protExtract: particles extraction protocol. 
+ particleSizeA: Size of the particles in Angstroms + gpus: list with GPUs that will be used for 2D classification + """ + Pipeline.__init__(self) + print(f"\n>>> Running 2D pipeline: input extract " + f"{Color.bold(protExtract.getRunName())}") + + self.lock = threading.Lock() + self._jsonFile = 'scipion-otf-2d.json' + self._json = self._load_json() + pprint(self._json) + + g = self.addGenerator(self._generate2D) + self.addProcessor(g.outputQueue, self._run2D) + + + def _load_json(self): + self._json = {'groups': {}, 'gridsquares': {}} + + if os.path.exists(self._jsonFile): + with open(self._jsonFile) as f: + self._json = json.load(f) + + def _save_json(self): + with open(self._jsonFile, 'w') as f: + json.dump(self._json, f, indent=4) + + @property + def batches(self): + return self._json['batches'] + + def _updateBatch(self, batch): + self.lock.acquire() + self.batches[batch['id']] = batch + _save_json(self._json) + self.lock.release() + def _createBatch2D(self, gridsquare, subsetParts): + firstId, lastId = subsetParts[0].getObjId(), subsetParts[-1].getObjId() + suffix = f"{gridsquare}: {firstId} - {lastId}" + batch = { + 'id': str(len(self.batches) + 1), + 'gridsquares': [gridsquare], + 'suffix': suffix, + 'firstId': firstId, + 'lastId': lastId } - save_otf(otf) - - return {'gs': gridsquare, 'prot': protRelion2D} - def _generate2D(): + print(f"Creating subset for {suffix}") + if dry: + print("...running in DRY MODE.") + else: + protSubset = wf.createProtocol( + 'pwem.protocols.ProtUserSubSet', + objLabel=f'subset: {suffix}', + ) + _setPointer(protSubset.inputObject, protExtract, OUT_PART) + wf.saveProtocol(protSubset) + protSubset.makePathsAndClean() + # Create subset particles as output for the protocol + inputParticles = protExtract.outputParticles + outputParticles = protSubset._createSetOfParticles() + outputParticles.copyInfo(inputParticles) + for particle in subsetParts: + outputParticles.append(particle) + protSubset._defineOutputs(outputParticles=outputParticles) + protSubset._defineTransformRelation(inputParticles, outputParticles) + protSubset.setStatus(STATUS_FINISHED) + wf.project._storeProtocol(protSubset) + + protRelion2D = wf.createProtocol( + 'relion.protocols.ProtRelionClassify2D', + objLabel=f'relion2d: {suffix}', + maskDiameterA=round(particleSizeA * 1.5), + numberOfClasses=200, + extraParams='--maxsig 50', + pooledParticles=50, + doGpu=True, + gpusToUse=','.join(str(g) for g in gpus), + numberOfThreads=32, + numberOfMpi=1, + allParticlesRam=True, + useGradientAlg=True, + ) + + _setPointer(protRelion2D.inputParticles, protSubset, OUT_PART) + wf.saveProtocol(protRelion2D) + + lock.acquire() + + def _run(prot): + return { + 'runId': prot.getObjId(), + 'runName': prot.getRunName(), + 'runDir': prot.getWorkingDir() + } + + batch.update( + {'runs': [_run(p) for p in [protSubset, protRelion2D]]}) + otf_2d['gridsquares'][gridsquare] = batch['id'] + self._updateBatch(batch) + + batch['prot'] = protRelion2D + + return batch + + def _generate2D(self): """ Generated subset of 2D from the outputParticles from extract protocol. Subsets will be created based on the GridSquare of the micrographs. 
""" lastParticleIndex = 0 @@ -168,14 +207,11 @@ def _generate2D(): # Classify in batches lastMt = 0 extractSqliteFn = protExtract.outputParticles.getFileName() - tmpSqliteFn = '/tmp/particles.sqlite' + tmpSqlite = os.path.abspath(extractSqliteFn).replace('/', '__') + tmpSqliteFn = os.path.join('/tmp', tmpSqlite) while True: - if lastGs: # Not the first time, let's wait - print("Sleeping...") - time.sleep(30) # wait for 5 minutes before checking for new jobs - - print("Wake up!!!") + print("Checking new particles!!!") mt = os.path.getmtime(extractSqliteFn) if mt > lastMt: # Let's iterate over the particles to check if there is a @@ -186,6 +222,8 @@ def _generate2D(): Process.system(f'rm -rf {tmpSqliteFn}') SqliteFile.copyDb(extractSqliteFn, tmpSqliteFn, tries=10, wait=30) print("Copy done!") + else: + print("Particles db has not changed since: ", Pretty.timestamp(lastMt)) parts = SetOfParticles(filename=tmpSqliteFn) subsetParts = [] @@ -236,25 +274,25 @@ def _generate2D(): break lastMt = mt + print("Sleeping...") + time.sleep(30) # wait for 5 minutes before checking for new jobs - def _run2D(batch): - protRelion2D = batch['prot'] - wf.launchProtocol(protRelion2D, wait=True) + def _run2D(self, batch): + # In Dry Mode the is not prot in the batch dict + if protRelion2D := batch.get('prot', None): + #wf.launchProtocol(protRelion2D, wait=True) + wf.saveProtocol(protRelion2D) - protRelion2DSelect = wf.createProtocol( - 'relion.protocols.ProtRelionSelectClasses2D', - objLabel=f"select 2d - {batch['gs']}", - minThreshold=0.05, - minResolution=30.0, - ) + protRelion2DSelect = wf.createProtocol( + 'relion.protocols.ProtRelionSelectClasses2D', + objLabel=f"select 2d - {batch['gs']}", + minThreshold=0.05, + minResolution=30.0, + ) - protRelion2DSelect.inputProtocol.set(protRelion2D) - wf.launchProtocol(protRelion2DSelect, wait=True) - - ppl = Pipeline() - g = ppl.addGenerator(_generate2D) - ppl.addProcessor(g.outputQueue, _run2D) - ppl.run() + protRelion2DSelect.inputProtocol.set(protRelion2D) + #wf.launchProtocol(protRelion2DSelect, wait=True) + wf.saveProtocol(protRelion2DSelect) def clean_project(workingDir): @@ -408,28 +446,31 @@ def _path(*p): wf.launchProtocol(protRelionExtract, wait={OUT_PART: 100}) -def continue_2d(workingDir, gpus): +def continue_2d(workingDir, gpus, dry): print(f"Loading project from {workingDir}") project = Project(pw.Config.getDomain(), workingDir) project.load() wf = Workflow(project) - protExtract = protCryolo = None - - for run in project.getRuns(): - clsName = run.getClassName() - print(f"Run {run.getObjId()}: {clsName}") - if clsName == 'ProtRelionExtractParticles': - protExtract = run - elif clsName.startswith('SphireProtCRYOLOPicking'): - protCryolo = run + protocols = {} - if not protExtract.isActive(): - print("Re-running extract protocol...") - wf.launchProtocol(protExtract, wait={OUT_PART: 100}) + def _loadProtocols(): + for run in project.getRuns(): + if clsName.startswith('SphireProtCRYOLOPicking'): + protocols['picking'] = run + elif run.getClassName() == 'ProtRelionExtractParticles': + protocols['extract'] = run + return protocols - calculateBoxSize(protCryolo) + _loadProtocols() + protExtract = protocols.get('extract', None) + while protExtract is None: + time.sleep(60) # wait for 5 mins + _loadProtocols() + protExtract = protocols.get('extract', None) - run2DPipeline(wf, protExtract, params['partSizeA'], gpus) + wf.wait(protExtract, wait={OUT_PART: 100}) + calculateBoxSize(protocols['picking']) + Rln2DPipeline(wf, protExtract, params['partSizeA'], gpus, 
dry=dry).run() def restart(workingDir, args): @@ -588,6 +629,39 @@ def write_coordinates(micStarFn, prot): def print_prot(prot, label='Protocol'): print(f">>> {label} {prot.getObjId():>6} {prot.getClassName():<30} {prot.getRunName()}") +def match_epu_xmls(workingDir, micStarFn='micrographs_ctf.star'): + total = 0 + missing = 0 + xmlFolder = os.path.join('EPU', 'MicsXML') + Process.system(f'rm -rf {xmlFolder} && mkdir {xmlFolder}') + with StarFile(micStarFn) as sf: + for row in sf.iterTable('micrographs'): + _, name = row.rlnMicrographMovieName.split('Images-Disc1_') + xmlName = os.path.join('Images-Disc1', name.replace('_EER.eer', '.xml')) + xmlFile = os.path.join('EPU', xmlName) + total += 1 + micBase = os.path.basename(row.rlnMicrographName) + if os.path.exists(xmlFile): + linkName = os.path.join(xmlFolder, micBase.replace('.mrc', '.xml')) + cmd = f"ln -s {xmlFile.replace('EPU/', '../')} {linkName}" + Process.system(cmd) + else: + print(micBase, '->', Color.red(xmlFile)) + missing += 1 + + print(f"Missing {Color.red(missing)} XMLs out of {Color.bold(total)} micrographs") + + # mics = Table(['rlnMicrographName', + # 'rlnOpticsGroup', + # 'rlnCtfImage', + # 'rlnDefocusU', + # 'rlnDefocusV', + # 'rlnCtfAstigmatism', + # 'rlnDefocusAngle', + # 'rlnCtfFigureOfMerit', + # 'rlnCtfMaxResolution', + # 'rlnMicrographMovieName']) + # ps = firstMic.getSamplingRate() def write_stars(workingDir, ids=None): """ Write star files for Relion. Generates micrographs_ctf.star, @@ -727,12 +801,16 @@ def main(): "and the Cryolo picking for picking. One can pass a string" "with the protocol ids for ctfs and/or picking. For example:" "--write_starts 'ctfs=1524 picking=1711'") + g.add_argument('--match_epu_xmls', action='store_true') + g.add_argument('--clone_project', nargs=2, metavar=('SRC', 'DST'), help="Clone an existing Scipion project") g.add_argument('--fix_run_links', metavar='RUNS_SRC', help="Fix links of Runs of this project from another one.") g.add_argument('--print_protocol', '-p', help="Print the values of a given protocol.") + p.add_argument('--dry', action='store_true', + help="Do not excute operations, just show the steps.") args = p.parse_args() @@ -753,7 +831,7 @@ def main(): elif 'write_stars' in args: write_stars(cwd, ids=args.write_stars) elif gpus := args.continue_2d: - continue_2d(cwd, gpus) + continue_2d(cwd, gpus, args.dry) elif args.clone_project: src, dst = args.clone_project clone_project(src, dst) @@ -761,6 +839,8 @@ def main(): fix_run_links(cwd, args.fix_run_links) elif protId := args.print_protocol: print_protocol(cwd, protId) + elif args.match_epu_xmls: + match_epu_xmls(cwd) else: # by default open the GUI from pyworkflow.gui.project import ProjectWindow ProjectWindow(cwd).show() diff --git a/emtools/tests/test_metadata.py b/emtools/tests/test_metadata.py index 84b41e4..05c6bc3 100644 --- a/emtools/tests/test_metadata.py +++ b/emtools/tests/test_metadata.py @@ -16,10 +16,14 @@ import os import unittest import tempfile +import random +import time +import threading from pprint import pprint +from datetime import datetime -from emtools.utils import Timer, Color -from emtools.metadata import StarFile, SqliteFile, EPU +from emtools.utils import Timer, Color, Pretty +from emtools.metadata import StarFile, SqliteFile, EPU, StarMonitor from emtools.tests import testpath # Try to load starfile library to launch some comparisons @@ -240,6 +244,82 @@ def _checkValues(t): os.unlink(ftmp.name) + def test_star_monitor(self): + partStar = testpath('metadata', 'particles_1k.star') + if partStar 
is None: + return + + N = 1000 + + with StarFile(partStar) as sf: + ptable = sf.getTable('particles') + self.assertEqual(len(ptable), N) + otable = sf.getTable('optics') + self.assertEqual(len(otable), 1) + + ftmp = tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.star') + print(f">>>> Using temporary file: {ftmp.name}") + + def _write_star_parts(): + with StarFile(ftmp) as sfOut: + sfOut.writeTable('optics', otable) + sfOut.writeHeader('particles', ptable) + u = int(random.uniform(5, 10)) + s = u * 10 + w = 0 + for i, row in enumerate(ptable): + if i == s: + print(f"{w} rows written.") + ftmp.flush() + time.sleep(3) + u = int(random.uniform(5, 10)) + s = i + u * 10 + w = 0 + sfOut.writeRow(row) + w += 1 + + print(f"{w} rows written.") + + th = threading.Thread(target=_write_star_parts) + print(">>> Starting thread...") + th.start() + + monitor = StarMonitor(ftmp.name, 'particles', + lambda row: row.rlnImageId, + inputTimeout=30) + + totalRows = 0 + while not monitor.timedOut(): + newRows = monitor.update() + n = len(newRows) + totalRows += n + print(f"New rows: {n}") + print(f"Last update: {Pretty.datetime(monitor.lastUpdate)} " + f"Last check: {Pretty.datetime(monitor.lastCheck)} " + f"No activity: {Pretty.delta(monitor.lastCheck - monitor.lastUpdate)}") + time.sleep(5) + + self.assertEqual(totalRows, N) + # + # for i in range(10): + # time.sleep(5) + # mTime = datetime.fromtimestamp(os.path.getmtime(ftmp.name)) + # print("Last modified: ", Pretty.datetime(mTime)) + + print("<<< Waiting for thread") + th.join() + + ftmp.close() + + # Check output is what we expect + with StarFile(ftmp.name) as sf: + ptable = sf.getTable('particles') + self.assertEqual(len(ptable), N) + otable = sf.getTable('optics') + self.assertEqual(len(otable), 1) + + os.unlink(ftmp.name) + class TestEPU(unittest.TestCase): """ Tests for EPU class. """ @@ -266,6 +346,7 @@ def test_read_session_info(self): pprint(session) + class TestSqliteFile(unittest.TestCase): """ Tests for StarFile class. From a8cc5d8d96960055e73a6f05f26e10ecd6d559f5 Mon Sep 17 00:00:00 2001 From: "J.M de la Rosa Trevin" Date: Thu, 19 Sep 2024 21:13:44 -0500 Subject: [PATCH 3/8] More on Star monitor tools --- emtools/jobs/__init__.py | 3 +- emtools/jobs/batch_manager.py | 80 +++++++++++++++++++++++ emtools/metadata/__init__.py | 5 +- emtools/metadata/misc.py | 63 +------------------ emtools/metadata/starfile.py | 18 +++--- emtools/pwx/__init__.py | 2 +- emtools/tests/star_pipeline_tester.py | 91 +++++++++++++++++++++++++++ emtools/tests/test_metadata.py | 88 ++++++++++++++++++++------ 8 files changed, 254 insertions(+), 96 deletions(-) create mode 100644 emtools/jobs/batch_manager.py create mode 100644 emtools/tests/star_pipeline_tester.py diff --git a/emtools/jobs/__init__.py b/emtools/jobs/__init__.py index fff9a6d..d09e3e8 100644 --- a/emtools/jobs/__init__.py +++ b/emtools/jobs/__init__.py @@ -15,5 +15,6 @@ # ************************************************************************** from .pipeline import Pipeline +from .batch_manager import BatchManager -__all__ = ["Pipeline"] \ No newline at end of file +__all__ = ["Pipeline", "BatchManager"] \ No newline at end of file diff --git a/emtools/jobs/batch_manager.py b/emtools/jobs/batch_manager.py new file mode 100644 index 0000000..8dc4e5f --- /dev/null +++ b/emtools/jobs/batch_manager.py @@ -0,0 +1,80 @@ +# ************************************************************************** +# * +# * Authors: J.M. 
de la Rosa Trevin (delarosatrevin@gmail.com) +# * +# * This program is free software; you can redistribute it and/or modify +# * it under the terms of the GNU General Public License as published by +# * the Free Software Foundation; either version 3 of the License, or +# * (at your option) any later version. +# * +# * This program is distributed in the hope that it will be useful, +# * but WITHOUT ANY WARRANTY; without even the implied warranty of +# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# * GNU General Public License for more details. +# * +# ************************************************************************** + +import os +from uuid import uuid4 +from emtools.utils import Path, Pretty, Process + + +class BatchManager: + """ Class used to generate and handle the creation of batches + from an input stream of items. + + This is used for streaming/parallel processing. Batches will have a folder + and a filename is extracted from each item and linked into the batch + folder. + """ + def __init__(self, batchSize, inputItemsIterator, workingPath, + itemFileNameFunc=lambda item: item.getFileName()): + """ + Args: + batchSize: Number of items that will be grouped into one batch + inputItemsIterator: input items iterator + workingPath: path where the batches folder will be created + itemFileNameFunc: function to extract a filename from each item + (by default: lambda item: item.getFileName()) + """ + self._items = inputItemsIterator + self._batchSize = batchSize + self._batchCount = 0 + self._workingPath = workingPath + self._itemFileNameFunc = itemFileNameFunc + + def generate(self): + """ Generate batches based on the input items. """ + def _createBatch(items): + batch_id = str(uuid4()) + batch_path = os.path.join(self._workingPath, batch_id) + ts = Pretty.now() + + print(f"{ts}: Creating batch: {batch_path}") + Process.system(f"rm -rf '{batch_path}'") + Process.system(f"mkdir '{batch_path}'") + + for item in items: + fn = self._itemFileNameFunc(item) + baseName = os.path.basename(fn) + os.symlink(os.path.abspath(fn), + os.path.join(batch_path, baseName)) + self._batchCount += 1 + return { + 'items': items, + 'id': batch_id, + 'path': batch_path, + 'index': self._batchCount + } + + items = [] + + for item in self._items: + items.append(item) + + if len(items) == self._batchSize: + yield _createBatch(items) + items = [] + + if items: + yield _createBatch(items) diff --git a/emtools/metadata/__init__.py b/emtools/metadata/__init__.py index 3c80c4d..0445682 100644 --- a/emtools/metadata/__init__.py +++ b/emtools/metadata/__init__.py @@ -17,10 +17,9 @@ from .table import Column, ColumnList, Table from .starfile import StarFile, StarMonitor from .epu import EPU -from .misc import Bins, TsBins, DataFiles, MovieFiles, BatchManager +from .misc import Bins, TsBins, DataFiles, MovieFiles from .sqlite import SqliteFile __all__ = ["Column", "ColumnList", "Table", "StarFile", "StarMonitor", "EPU", - "Bins", "TsBins", "SqliteFile", "DataFiles", "MovieFiles", - "BatchManager"] + "Bins", "TsBins", "SqliteFile", "DataFiles", "MovieFiles"] diff --git a/emtools/metadata/misc.py b/emtools/metadata/misc.py index 238cfac..e570fc1 100644 --- a/emtools/metadata/misc.py +++ b/emtools/metadata/misc.py @@ -15,7 +15,7 @@ # ************************************************************************** import os -from uuid import uuid4 + from datetime import datetime, timedelta from emtools.utils import Path, Pretty, Process @@ -248,64 +248,3 @@ def total_movies(self): def print(self, sort=None): 
DataFiles.print(self, sort=sort) self.counters[1].print('movie') - - -class BatchManager: - """ Class used to generate and handle the creation of batches - from an input stream of items. - - This is used for streaming/parallel processing. Batches will have a folder - and a filename is extracted from each item and linked into the batch - folder. - """ - def __init__(self, batchSize, inputItemsIterator, workingPath, - itemFileNameFunc=lambda item: item.getFileName()): - """ - Args: - batchSize: Number of items that will be grouped into one batch - inputItemsIterator: input items iterator - workingPath: path where the batches folder will be created - itemFileNameFunc: function to extract a filename from each item - (by default: lambda item: item.getFileName()) - """ - self._items = inputItemsIterator - self._batchSize = batchSize - self._batchCount = 0 - self._workingPath = workingPath - self._itemFileNameFunc = itemFileNameFunc - - def generate(self): - """ Generate batches based on the input items. """ - def _createBatch(items): - batch_id = str(uuid4()) - batch_path = os.path.join(self._workingPath, batch_id) - ts = Pretty.now() - - print(f"{ts}: Creating batch: {batch_path}") - Process.system(f"rm -rf '{batch_path}'") - Process.system(f"mkdir '{batch_path}'") - - for item in items: - fn = item.getFileName() - baseName = os.path.basename(fn) - os.symlink(os.path.abspath(fn), - os.path.join(batch_path, baseName)) - self._batchCount += 1 - return { - 'items': items, - 'id': batch_id, - 'path': batch_path, - 'index': self._batchCount - } - - items = [] - - for item in self._items: - items.append(item) - - if len(items) == self._batchSize: - yield _createBatch(items) - items = [] - - if items: - yield _createBatch(items) diff --git a/emtools/metadata/starfile.py b/emtools/metadata/starfile.py index 6fee548..131beb4 100644 --- a/emtools/metadata/starfile.py +++ b/emtools/metadata/starfile.py @@ -395,7 +395,7 @@ def writeTable(self, tableName, table, singleRow=False): self._writeTableName(tableName) -class StarMonitor(OrderedDict): +class StarMonitor: """ Monitor a STAR file for changes and return new items in a given table. @@ -405,11 +405,11 @@ class StarMonitor(OrderedDict): """ def __init__(self, fileName, tableName, rowKeyFunc, **kwargs): self._seenItems = set() - self._fileName = fileName + self.fileName = fileName self._tableName = tableName self._rowKeyFunc = rowKeyFunc - self._sleepOnWait = kwargs.get('sleepOnWait', 10) - self._inputTimeout = timedelta(seconds=kwargs.get('inputTimeout', 300)) + self._wait = kwargs.get('wait', 10) + self._timeout = timedelta(seconds=kwargs.get('timeout', 300)) self.lastCheck = None # Last timestamp when input was checked self.lastUpdate = None # Last timestamp when new items were found self.inputCount = 0 # Count all input elements @@ -425,10 +425,10 @@ def __init__(self, fileName, tableName, rowKeyFunc, **kwargs): def update(self): newRows = [] now = datetime.now() - mTime = datetime.fromtimestamp(os.path.getmtime(self._fileName)) + mTime = datetime.fromtimestamp(os.path.getmtime(self.fileName)) if self.lastCheck is None or mTime > self.lastCheck: - with StarFile(self._fileName) as sf: + with StarFile(self.fileName) as sf: for row in sf.iterTable(self._tableName): rowKey = self._rowKeyFunc(row) if rowKey not in self._seenItems: @@ -442,19 +442,19 @@ def update(self): return newRows def timedOut(self): - """ Return True when there has been inputTimeout seconds + """ Return True when there has been timeout seconds since last new items were found. 
""" if self.lastCheck is None or self.lastUpdate is None: return False else: - return self.lastCheck - self.lastUpdate > self._inputTimeout + return self.lastCheck - self.lastUpdate > self._timeout def newItems(self, sleep=10): """ Yield new items since last update until the stream is closed. """ while not self.timedOut(): for row in self.update(): yield row - time.sleep(self._sleepOnWait) + time.sleep(self._wait) # --------- Helper functions ------------------------ diff --git a/emtools/pwx/__init__.py b/emtools/pwx/__init__.py index f020505..a49fe6b 100644 --- a/emtools/pwx/__init__.py +++ b/emtools/pwx/__init__.py @@ -20,7 +20,7 @@ from .workflow import Workflow # This is imported here for backward compatibility -from emtools.metadata import BatchManager +from emtools.jobs import BatchManager __all__ = ["ProtocolMonitor", "SetMonitor", "Workflow", "BatchManager"] diff --git a/emtools/tests/star_pipeline_tester.py b/emtools/tests/star_pipeline_tester.py new file mode 100644 index 0000000..4f46474 --- /dev/null +++ b/emtools/tests/star_pipeline_tester.py @@ -0,0 +1,91 @@ +# ************************************************************************** +# * +# * Authors: J.M. de la Rosa Trevin (delarosatrevin@gmail.com) +# * +# * This program is free software; you can redistribute it and/or modify +# * it under the terms of the GNU General Public License as published by +# * the Free Software Foundation; either version 3 of the License, or +# * (at your option) any later version. +# * +# * This program is distributed in the hope that it will be useful, +# * but WITHOUT ANY WARRANTY; without even the implied warranty of +# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# * GNU General Public License for more details. +# * +# ************************************************************************** + +import os +import time + +from emtools.jobs import Pipeline, BatchManager +from emtools.metadata import StarFile, StarMonitor +from emtools.utils import Pretty + + +class StarPipelineTester(Pipeline): + """ Helper class to test Pipeline behaviour base on an input + STAR file generated in streaming. + """ + def __init__(self, inputStar, workingDir, **kwargs): + Pipeline.__init__(self) + + self.threads = kwargs.get('threads', 4) + self.batchSize = kwargs.get('batchSize', 128) + self.workingDir = workingDir + self.inputStar = inputStar + self.outputStar = os.path.join(workingDir, 'output.star') + self._sf = None + self._file = None + self.totalItems = 0 + + print(f">>> {Pretty.now()}: ----------------- " + f"Starting STAR Pipeline Tester ----------- ") + + monitor = StarMonitor(inputStar, 'particles', + lambda row: row.rlnImageId, + timeout=30) + + batchMgr = BatchManager(self.batchSize, monitor.newItems(), workingDir, + itemFileNameFunc=self._filename) + + g = self.addGenerator(batchMgr.generate) + outputQueue = None + print(f"Creating {self.threads} processing threads.") + for _ in range(self.threads): + p = self.addProcessor(g.outputQueue, self._process, + outputQueue=outputQueue) + outputQueue = p.outputQueue + + self.addProcessor(outputQueue, self._output) + + def _filename(self, row): + """ Helper to get unique name from a particle row. """ + pts, stack = row.rlnImageName.split('@') + return stack.replace('.mrcs', f'_p{pts}.mrcs') + + def _process(self, batch): + """ Dummy function to process an input batch. """ + time.sleep(5) + return batch + + def _output(self, batch): + """ Compile a batch that has been 'processed'. 
""" + if self._sf is None: + self._file = open(self.outputStar, 'w') + self._sf = StarFile(self._file) + with StarFile(self.inputStar) as sf: + self._sf.writeTable('optics', sf.getTable('optics')) + self._sf.writeHeader('particles', sf.getTableInfo('particles')) + + for row in batch['items']: + self._sf.writeRow(row) + self._file.flush() + + self.totalItems += len(batch['items']) + + def run(self): + Pipeline.run(self) + if self._sf is not None: + self._sf.close() + + diff --git a/emtools/tests/test_metadata.py b/emtools/tests/test_metadata.py index 05c6bc3..6499685 100644 --- a/emtools/tests/test_metadata.py +++ b/emtools/tests/test_metadata.py @@ -19,13 +19,17 @@ import random import time import threading +import tempfile from pprint import pprint from datetime import datetime from emtools.utils import Timer, Color, Pretty from emtools.metadata import StarFile, SqliteFile, EPU, StarMonitor +from emtools.jobs import BatchManager from emtools.tests import testpath +from .star_pipeline_tester import StarPipelineTester + # Try to load starfile library to launch some comparisons try: import starfile @@ -244,7 +248,7 @@ def _checkValues(t): os.unlink(ftmp.name) - def test_star_monitor(self): + def __test_star_streaming(self, monitorFunc): partStar = testpath('metadata', 'particles_1k.star') if partStar is None: return @@ -286,25 +290,10 @@ def _write_star_parts(): monitor = StarMonitor(ftmp.name, 'particles', lambda row: row.rlnImageId, - inputTimeout=30) - - totalRows = 0 - while not monitor.timedOut(): - newRows = monitor.update() - n = len(newRows) - totalRows += n - print(f"New rows: {n}") - print(f"Last update: {Pretty.datetime(monitor.lastUpdate)} " - f"Last check: {Pretty.datetime(monitor.lastCheck)} " - f"No activity: {Pretty.delta(monitor.lastCheck - monitor.lastUpdate)}") - time.sleep(5) - - self.assertEqual(totalRows, N) - # - # for i in range(10): - # time.sleep(5) - # mTime = datetime.fromtimestamp(os.path.getmtime(ftmp.name)) - # print("Last modified: ", Pretty.datetime(mTime)) + timeout=30) + + totalCount = monitorFunc(monitor) + self.assertEqual(totalCount, N) print("<<< Waiting for thread") th.join() @@ -320,6 +309,65 @@ def _write_star_parts(): os.unlink(ftmp.name) + def test_star_monitor(self): + """ Basic test checking that we are able to monitor a streaming + generated star file. The final count of rows should be the + same as the input one. + """ + def _monitor(monitor): + totalRows = 0 + while not monitor.timedOut(): + newRows = monitor.update() + n = len(newRows) + totalRows += n + print(f"New rows: {n}") + print(f"Last update: {Pretty.datetime(monitor.lastUpdate)} " + f"Last check: {Pretty.datetime(monitor.lastCheck)} " + f"No activity: {Pretty.delta(monitor.lastCheck - monitor.lastUpdate)}") + time.sleep(5) + return totalRows + + self.__test_star_streaming(_monitor) + + def test_star_batchmanager(self): + """ Testing the creating of batches from an input star monitor + using different batch sizes. + """ + + def _filename(row): + """ Helper to get unique name from a particle row. 
""" + pts, stack = row.rlnImageName.split('@') + return stack.replace('.mrcs', f'_p{pts}.mrcs') + + def _batchmanager(monitor, batchSize): + totalFiles = 0 + + with tempfile.TemporaryDirectory() as tmp: + print(f"Using dir: {tmp}") + + batchMgr = BatchManager(batchSize, monitor.newItems(), tmp, + itemFileNameFunc=_filename) + + for batch in batchMgr.generate(): + files = len(os.listdir(batch['path'])) + print(f"Batch {batch['id']} -> {batch['path']}, files: {files}") + totalFiles += files + + return totalFiles + + self.__test_star_streaming(lambda m: _batchmanager(m, 128)) + self.__test_star_streaming(lambda m: _batchmanager(m, 200)) + + def test_star_pipeline(self): + def _pipeline(monitor): + with tempfile.TemporaryDirectory() as tmp: + print(f"Using dir: {tmp}") + p = StarPipelineTester(monitor.fileName, tmp) + p.run() + return p.totalItems + + self.__test_star_streaming(_pipeline) + class TestEPU(unittest.TestCase): """ Tests for EPU class. """ From 82074219e6e5ee77a4bfa918309b2dcd1ae5cd80 Mon Sep 17 00:00:00 2001 From: "Jose M. de la Rosa" Date: Fri, 20 Sep 2024 09:29:05 -0500 Subject: [PATCH 4/8] Allow to test with/without input STAR in streaming --- emtools/tests/test_metadata.py | 37 +++++++++++++++++++--------------- 1 file changed, 21 insertions(+), 16 deletions(-) diff --git a/emtools/tests/test_metadata.py b/emtools/tests/test_metadata.py index 6499685..0c9dbc8 100644 --- a/emtools/tests/test_metadata.py +++ b/emtools/tests/test_metadata.py @@ -248,7 +248,7 @@ def _checkValues(t): os.unlink(ftmp.name) - def __test_star_streaming(self, monitorFunc): + def __test_star_streaming(self, monitorFunc, inputStreaming=True): partStar = testpath('metadata', 'particles_1k.star') if partStar is None: return @@ -267,20 +267,24 @@ def __test_star_streaming(self, monitorFunc): def _write_star_parts(): with StarFile(ftmp) as sfOut: sfOut.writeTable('optics', otable) - sfOut.writeHeader('particles', ptable) - u = int(random.uniform(5, 10)) - s = u * 10 - w = 0 - for i, row in enumerate(ptable): - if i == s: - print(f"{w} rows written.") - ftmp.flush() - time.sleep(3) - u = int(random.uniform(5, 10)) - s = i + u * 10 - w = 0 - sfOut.writeRow(row) - w += 1 + if inputStreaming: + sfOut.writeHeader('particles', ptable) + u = int(random.uniform(5, 10)) + s = u * 10 + w = 0 + for i, row in enumerate(ptable): + if i == s: + print(f"{w} rows written.") + ftmp.flush() + time.sleep(3) + u = int(random.uniform(5, 10)) + s = i + u * 10 + w = 0 + sfOut.writeRow(row) + w += 1 + else: + sfOut.writeTable('particles', ptable) + w = len(ptable) print(f"{w} rows written.") @@ -366,7 +370,8 @@ def _pipeline(monitor): p.run() return p.totalItems - self.__test_star_streaming(_pipeline) + self.__test_star_streaming(_pipeline, inputStreaming=True) + self.__test_star_streaming(_pipeline, inputStreaming=False) class TestEPU(unittest.TestCase): From f8c5ffc1d6f0d3a78ddad62953ebe9e41efd28db Mon Sep 17 00:00:00 2001 From: "Jose M. 
de la Rosa" Date: Tue, 24 Sep 2024 09:16:19 -0500 Subject: [PATCH 5/8] Updated batch creation and some fixes --- emtools/jobs/__main__.py | 37 ----- emtools/jobs/batch_manager.py | 55 ++++---- emtools/jobs/motioncor.py | 191 -------------------------- emtools/metadata/starfile.py | 12 +- emtools/tests/star_pipeline_tester.py | 8 +- emtools/tests/test_metadata.py | 2 +- emtools/utils/path.py | 9 +- 7 files changed, 50 insertions(+), 264 deletions(-) delete mode 100755 emtools/jobs/__main__.py delete mode 100755 emtools/jobs/motioncor.py diff --git a/emtools/jobs/__main__.py b/emtools/jobs/__main__.py deleted file mode 100755 index e944a47..0000000 --- a/emtools/jobs/__main__.py +++ /dev/null @@ -1,37 +0,0 @@ -#!/usr/bin/env python -# ************************************************************************** -# * -# * Authors: J.M. de la Rosa Trevin (delarosatrevin@gmail.com) -# * -# * This program is free software; you can redistribute it and/or modify -# * it under the terms of the GNU General Public License as published by -# * the Free Software Foundation; either version 3 of the License, or -# * (at your option) any later version. -# * -# * This program is distributed in the hope that it will be useful, -# * but WITHOUT ANY WARRANTY; without even the implied warranty of -# * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# * GNU General Public License for more details. -# * -# ************************************************************************** - -from .motioncor import Main as mc_Main -import argparse - - -parser = argparse.ArgumentParser(prog='emtools.processing') -subparsers = parser.add_subparsers( - help="Utils' command (motioncor)", - dest='command') - -mc_Main.add_arguments(subparsers.add_parser("motioncor")) -parser.add_argument('--verbose', '-v', action='count') -args = parser.parse_args() -cmd = args.command - -if cmd == "motioncor": - mc_Main.run(args) -elif cmd: - raise Exception(f"Unknown option '{cmd}'") -else: - parser.print_help() diff --git a/emtools/jobs/batch_manager.py b/emtools/jobs/batch_manager.py index 8dc4e5f..706777d 100644 --- a/emtools/jobs/batch_manager.py +++ b/emtools/jobs/batch_manager.py @@ -16,7 +16,9 @@ import os from uuid import uuid4 -from emtools.utils import Path, Pretty, Process +from datetime import datetime + +from emtools.utils import Process class BatchManager: @@ -43,38 +45,43 @@ def __init__(self, batchSize, inputItemsIterator, workingPath, self._workingPath = workingPath self._itemFileNameFunc = itemFileNameFunc - def generate(self): - """ Generate batches based on the input items. 
""" - def _createBatch(items): - batch_id = str(uuid4()) - batch_path = os.path.join(self._workingPath, batch_id) - ts = Pretty.now() + def _createBatchId(self): + # We will use batchCount, before the batch is created + nowPrefix = datetime.now().strftime('%y%m%d-%H%M%S') + countStr = '%02d' % (self._batchCount + 1) + uuidSuffix = str(uuid4()).split('-')[0] + return f"{nowPrefix}_{countStr}_{uuidSuffix}" - print(f"{ts}: Creating batch: {batch_path}") - Process.system(f"rm -rf '{batch_path}'") - Process.system(f"mkdir '{batch_path}'") + def _createBatch(self, items): + batch_id = self._createBatchId() + batch_path = os.path.join(self._workingPath, batch_id) + print(f"Creating batch: {batch_path}") + Process.system(f"rm -rf '{batch_path}'") + Process.system(f"mkdir '{batch_path}'") - for item in items: - fn = self._itemFileNameFunc(item) - baseName = os.path.basename(fn) - os.symlink(os.path.abspath(fn), - os.path.join(batch_path, baseName)) - self._batchCount += 1 - return { - 'items': items, - 'id': batch_id, - 'path': batch_path, - 'index': self._batchCount - } + for item in items: + fn = self._itemFileNameFunc(item) + baseName = os.path.basename(fn) + os.symlink(os.path.abspath(fn), + os.path.join(batch_path, baseName)) + self._batchCount += 1 + return { + 'items': items, + 'id': batch_id, + 'path': batch_path, + 'index': self._batchCount + } + def generate(self): + """ Generate batches based on the input items. """ items = [] for item in self._items: items.append(item) if len(items) == self._batchSize: - yield _createBatch(items) + yield self._createBatch(items) items = [] if items: - yield _createBatch(items) + yield self._createBatch(items) diff --git a/emtools/jobs/motioncor.py b/emtools/jobs/motioncor.py deleted file mode 100755 index 938cc53..0000000 --- a/emtools/jobs/motioncor.py +++ /dev/null @@ -1,191 +0,0 @@ -#!/usr/bin/env python - -import os -import argparse -import threading -from glob import glob -from uuid import uuid4 - -from emtools.utils import Timer, Pipeline -from emtools.metadata import StarFile - - -class McPipeline(Pipeline): - """ Pipeline specific to Motioncor processing. 
""" - def __init__(self, generateInputFunc, gpuList, outputDir, threads=1, **kwargs): - Pipeline.__init__(self, **kwargs) - self.mc = { - 'program': os.environ['MC_PROGRAM'], - 'args': os.environ['MC_ARGS'], - 'gain': os.environ['MC_GAIN'] - } - self.run_id = f'R-{uuid4()}' - self.outputDir = outputDir - self.scratchDir = kwargs.get('scratchDir', self.outputDir) - self.workingDir = os.path.join(self.scratchDir, self.run_id) - - g = self.addGenerator(generateInputFunc) - f1 = self.addProcessor(g.outputQueue, self.convert) - if threads > 1: - for i in range(1, threads): - self.addProcessor(g.outputQueue, self.convert, - outputQueue=f1.outputQueue) - moveQueue = f1.outputQueue - if gpuList: - mc1 = self.addProcessor(f1.outputQueue, - self.get_motioncor_proc(gpuList[0])) - for gpu in gpuList[1:]: - self.addProcessor(f1.outputQueue, self.get_motioncor_proc(gpu), - outputQueue=mc1.outputQueue) - moveQueue = mc1.outputQueue - - self.addProcessor(moveQueue, self.moveout) - - def convert(self, batch): - images = batch['images'] - thread_id = threading.get_ident() - batch_id = f'{str(uuid4()).split("-")[0]}' - batch_dir = os.path.join(self.workingDir, batch_id) - - print(f'T-{thread_id}: converting {len(images)} images...batch dir: {batch_dir}') - - batch = { - 'images': images, - 'batch_id': batch_id, - 'batch_dir': batch_dir - } - - script = f"rm -rf {batch_dir} && mkdir -p {batch_dir}\n" - - # FIXME: Just create links if input is already in .mrc format - cmd = 'tif2mrc' - - for fn in images: - base = os.path.basename(fn) - if cmd == 'tif2mrc': - base = base.replace('.tiff', '.mrc') - outFn = os.path.join(batch_dir, base) - script += f'{cmd} {fn} {outFn}\n' - - prefix = f'{thread_id}_{batch_id}_tif2mrc' - scriptFn = f'{prefix}_script.sh' - logFn = f'{thread_id}_tif2mrc_log.txt' - - with open(scriptFn, 'w') as f: - f.write(script) - - os.system(f'bash -x {scriptFn} &>> {logFn} && rm {scriptFn}') - - return batch - - def motioncor(self, gpu, batch): - mc = self.mc - batch_id = batch['batch_id'] - batch_dir = batch['batch_dir'] - script = f""" - cd {batch_dir} - {mc['program']} -InMrc ./FoilHole_ -InSuffix fractions.mrc -OutMrc aligned_ -Serial 1 -Gpu {gpu} -Gain {mc['gain']} -LogDir ./ {mc['args']} - """ - prefix = f'motioncor-gpu{gpu}' - print(f'{prefix}: running Motioncor for batch {batch_id} on GPU {gpu}') - - scriptFn = f'{prefix}_{batch_id}_script.sh' - logFn = f'{prefix}_motioncor_log.txt' - with open(scriptFn, 'w') as f: - f.write(script) - os.system(f'bash -x {scriptFn} &>> {logFn} && rm {scriptFn}') - return batch - - def get_motioncor_proc(self, gpu): - def _motioncor(batch): - return self.motioncor(gpu, batch) - - return _motioncor - - def moveout(self, batch): - batch_dir = batch['batch_dir'] - thread_id = threading.get_ident() - print(f'T-{thread_id}: moving output from batch dir: {batch_dir}') - # FIXME: Check what we want to move to output - os.system(f'mv {batch_dir}/* {self.outputDir}/ && rm -rf {batch_dir}') - return batch - - -class Main: - @staticmethod - def add_arguments(parser): - parser.add_argument('input_images', - help='Input images, can be a star file, a txt file or ' - 'a pattern.') - - parser.add_argument('--convert', choices=['default', 'tif2mrc', 'cp']) - - parser.add_argument('--nimages', '-n', nargs='?', type=int, - default=argparse.SUPPRESS) - parser.add_argument('--output', '-o', default='output') - parser.add_argument('-j', type=int, default=1, - help='Number of parallel threads') - parser.add_argument('--batch', '-b', type=int, default=0, - help='Batch size') - 
parser.add_argument('--gpu', default='', nargs='?', - help='Gpu list, separated by comma.' - 'E.g --gpu 0,1') - parser.add_argument('--scratch', default='', - help='Scratch directory to do intermediate I/O') - - @staticmethod - def run(args): - n = args.nimages - output = args.output - - if args.input_images.endswith('.star'): - input_star = args.input_images - - with StarFile(input_star) as sf: - # Read table in a different order as they appear in file - # also before the getTableNames() call that create the offsets - tableMovies = sf.getTable('movies') - - all_images = [row.rlnMicrographMovieName for row in tableMovies] - elif '*' in args.input_images: - all_images = glob(args.input_images) - else: - raise Exception('Please provide input as star file or files pattern (with * in it).') - - input_images = all_images[:n] - - run_id = f'R-{uuid4()}' - - print(f' run_id: {run_id}') - print(f' images: {len(input_images)}') - print(f' output: {output}') - print(f'threads: {args.j}') - print(f' gpus: {args.gpu}') - print(f' batch: {args.batch or len(input_images)}') - - wd = args.scratch if args.scratch else output - intermediate = os.path.join(wd, run_id) - - def generate(): - b = args.batch - if b: - n = len(input_images) // b - for i in range(n): - yield {'images': input_images[i*b:(i+1)*b]} - else: - yield {'images': input_images} - - os.system(f'rm -rf {output} && mkdir {output}') - os.system(f'rm -rf {intermediate} && mkdir {intermediate}') - - t = Timer() - - gpuList = args.gpu.split(',') - mc = McPipeline(generate, gpuList, output, threads=args.j, debug=False) - mc.run() - - os.system(f'rm -rf {intermediate}') - - t.toc() - - diff --git a/emtools/metadata/starfile.py b/emtools/metadata/starfile.py index 131beb4..baeb930 100644 --- a/emtools/metadata/starfile.py +++ b/emtools/metadata/starfile.py @@ -47,10 +47,10 @@ class StarFile(AbstractContextManager): @staticmethod def printTable(table, tableName=''): - w = StarFile(sys.stdout) + w = StarFile(sys.stdout, closeFile=False) w.writeTable(tableName, table, singleRow=len(table) <= 1) - def __init__(self, inputFile, mode='r'): + def __init__(self, inputFile, mode='r', **kwargs): """ Args: inputFile: can be a str with the file path or a file object. @@ -58,6 +58,7 @@ def __init__(self, inputFile, mode='r'): the mode will be ignored. """ self._file = self.__loadFile(inputFile, mode) + self._closeFile = kwargs.get('closeFile', True) # While parsing the file, store the offsets for data_ blocks # for quick access when need to load data rows @@ -305,7 +306,8 @@ def _iterRowLines(self): def close(self): if getattr(self, '_file', None): - self._file.close() + if self._closeFile: + self._file.close() self._file = None # ---------------------- Writer functions -------------------------------- @@ -336,7 +338,7 @@ def writeHeader(self, tableName, table): for col in self._columns: self._file.write("_%s \n" % col.getName()) - def _writeRowValues(self, values): + def writeRowValues(self, values): """ Write to file a line for these row values. Order should be ensured that is the same of the expected columns. """ @@ -350,7 +352,7 @@ def writeRow(self, row): """ Write to file the line for this row. Row should be an instance of the expected Row class. 
""" - self._writeRowValues(row._asdict().values()) + self.writeRowValues(row._asdict().values()) def _writeNewline(self): self._file.write('\n') diff --git a/emtools/tests/star_pipeline_tester.py b/emtools/tests/star_pipeline_tester.py index 4f46474..3331396 100644 --- a/emtools/tests/star_pipeline_tester.py +++ b/emtools/tests/star_pipeline_tester.py @@ -26,14 +26,14 @@ class StarPipelineTester(Pipeline): """ Helper class to test Pipeline behaviour base on an input STAR file generated in streaming. """ - def __init__(self, inputStar, workingDir, **kwargs): + def __init__(self, inputStar, outputDir, **kwargs): Pipeline.__init__(self) self.threads = kwargs.get('threads', 4) self.batchSize = kwargs.get('batchSize', 128) - self.workingDir = workingDir + self.outputDir = outputDir self.inputStar = inputStar - self.outputStar = os.path.join(workingDir, 'output.star') + self.outputStar = os.path.join(outputDir, 'output.star') self._sf = None self._file = None self.totalItems = 0 @@ -45,7 +45,7 @@ def __init__(self, inputStar, workingDir, **kwargs): lambda row: row.rlnImageId, timeout=30) - batchMgr = BatchManager(self.batchSize, monitor.newItems(), workingDir, + batchMgr = BatchManager(self.batchSize, monitor.newItems(), outputDir, itemFileNameFunc=self._filename) g = self.addGenerator(batchMgr.generate) diff --git a/emtools/tests/test_metadata.py b/emtools/tests/test_metadata.py index 0c9dbc8..cdf9ab4 100644 --- a/emtools/tests/test_metadata.py +++ b/emtools/tests/test_metadata.py @@ -370,7 +370,7 @@ def _pipeline(monitor): p.run() return p.totalItems - self.__test_star_streaming(_pipeline, inputStreaming=True) + #self.__test_star_streaming(_pipeline, inputStreaming=True) self.__test_star_streaming(_pipeline, inputStreaming=False) diff --git a/emtools/utils/path.py b/emtools/utils/path.py index b1049be..e256170 100644 --- a/emtools/utils/path.py +++ b/emtools/utils/path.py @@ -166,8 +166,8 @@ def _mkdir(d): @staticmethod def replaceExt(filename, newExt): """ Replace the current path extension(from last .) - with a new one. The new one should not contain the .""" - return Path.removeExt(filename) + '.' + newExt + with a new one. The new one should contain the .""" + return Path.removeExt(filename) + newExt @staticmethod def replaceBaseExt(filename, newExt): @@ -186,6 +186,11 @@ def removeExt(filename): """ Remove extension from basename """ return os.path.splitext(filename)[0] + @staticmethod + def getExt(filename): + """ Get filename extension """ + return os.path.splitext(filename)[1] + @staticmethod def exists(path): """ Just avoid empty or None path to raise exception From 3bc400583a0c251dfedad150c94f3e3455b52050 Mon Sep 17 00:00:00 2001 From: "Jose M. de la Rosa" Date: Fri, 4 Oct 2024 09:49:23 -0500 Subject: [PATCH 6/8] Bumped version to 0.1.1 --- emtools/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/emtools/__init__.py b/emtools/__init__.py index aed35e0..d4692f3 100644 --- a/emtools/__init__.py +++ b/emtools/__init__.py @@ -24,5 +24,5 @@ # * # ************************************************************************** -__version__ = '0.1.0' +__version__ = '0.1.1' From 17836518704ad41e2590f2ef6fa98e3804b3c653 Mon Sep 17 00:00:00 2001 From: "Jose M. 
de la Rosa" Date: Fri, 4 Oct 2024 10:23:43 -0500 Subject: [PATCH 7/8] Fixed error --- emtools/scripts/emt-scipion-otf.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/emtools/scripts/emt-scipion-otf.py b/emtools/scripts/emt-scipion-otf.py index 07188d6..41781f5 100755 --- a/emtools/scripts/emt-scipion-otf.py +++ b/emtools/scripts/emt-scipion-otf.py @@ -128,8 +128,9 @@ def batches(self): def _updateBatch(self, batch): self.lock.acquire() self.batches[batch['id']] = batch - _save_json(self._json) + self._save_json(self._json) self.lock.release() + def _createBatch2D(self, gridsquare, subsetParts): firstId, lastId = subsetParts[0].getObjId(), subsetParts[-1].getObjId() suffix = f"{gridsquare}: {firstId} - {lastId}" From 3afe1fcf13c5a5035333d752d8031777a1243189 Mon Sep 17 00:00:00 2001 From: "Jose M. de la Rosa" Date: Fri, 4 Oct 2024 10:59:03 -0500 Subject: [PATCH 8/8] Reverted changes related to Scipion-OTF --- emtools/scripts/emt-scipion-otf.py | 369 +++++++++++------------------ 1 file changed, 144 insertions(+), 225 deletions(-) diff --git a/emtools/scripts/emt-scipion-otf.py b/emtools/scripts/emt-scipion-otf.py index 41781f5..6155cdf 100755 --- a/emtools/scripts/emt-scipion-otf.py +++ b/emtools/scripts/emt-scipion-otf.py @@ -15,16 +15,17 @@ # * # ************************************************************************** +import sys import json import os.path import argparse -import threading +import ast import time +from collections import OrderedDict import datetime as dt -from pprint import pprint +import re -from emtools.utils import Process, Color, System, Pretty -from emtools.jobs import Pipeline +from emtools.utils import Process, Color, System, Pipeline from emtools.metadata import EPU, SqliteFile, StarFile, Table import pyworkflow as pw @@ -42,6 +43,7 @@ # Some global workflow parameters params = {} +OTF_FILE = 'scipion-otf.json' cwd = os.getcwd() @@ -56,9 +58,9 @@ def loadGpus(): ngpu = len(gpuProc) half = int(ngpu/2) gpus = list(range(ngpu)) - # Take half of gpus for Motioncor and the other half for cryolo + # Take half of gpus for Motioncor and the other half for 2D params['mcGpus'] = gpus[:half] - params['cryoloGpus'] = gpus[half:] + params['cls2dGpus'] = gpus[half:] def calculateBoxSize(protCryolo): @@ -75,9 +77,8 @@ def calculateBoxSize(protCryolo): # Calculate the boxsize based on double of cryolo particle estimation # and recommended EMAN's boxsizes for performance - ps = protCryolo.inputMicrographs.get().getSamplingRate() params['partSizePx'] = protCryolo.boxsize.get() - params['partSizeA'] = params['partSizePx'] * ps + params['partSizeA'] = params['partSizePx'] * protCryolo.inputMicrographs.get().getSamplingRate() boxSize = max(params['partSizePx'] * 2.5, 100) for bs in EMAN_BOXSIZES: if bs > boxSize: @@ -86,121 +87,74 @@ def calculateBoxSize(protCryolo): params['boxSize'] = boxSize -class Rln2DPipeline(Pipeline): - def __init__(self, wf, protExtract, particleSizeA, gpus, dry=False): - """ - 2D pipeline for a given workflow, after particles extraction. - - Args: - wf: workflow instance. - protExtract: particles extraction protocol. 
- particleSizeA: Size of the particles in Angstroms - gpus: list with GPUs that will be used for 2D classification - """ - Pipeline.__init__(self) - print(f"\n>>> Running 2D pipeline: input extract " - f"{Color.bold(protExtract.getRunName())}") - - self.lock = threading.Lock() - self._jsonFile = 'scipion-otf-2d.json' - self._json = self._load_json() - pprint(self._json) - - g = self.addGenerator(self._generate2D) - self.addProcessor(g.outputQueue, self._run2D) - - - def _load_json(self): - self._json = {'groups': {}, 'gridsquares': {}} - - if os.path.exists(self._jsonFile): - with open(self._jsonFile) as f: - self._json = json.load(f) - - def _save_json(self): - with open(self._jsonFile, 'w') as f: - json.dump(self._json, f, indent=4) - - @property - def batches(self): - return self._json['batches'] - - def _updateBatch(self, batch): - self.lock.acquire() - self.batches[batch['id']] = batch - self._save_json(self._json) - self.lock.release() - - def _createBatch2D(self, gridsquare, subsetParts): - firstId, lastId = subsetParts[0].getObjId(), subsetParts[-1].getObjId() - suffix = f"{gridsquare}: {firstId} - {lastId}" - batch = { - 'id': str(len(self.batches) + 1), - 'gridsquares': [gridsquare], - 'suffix': suffix, - 'firstId': firstId, - 'lastId': lastId +def load_otf(): + otf = {'2d': {}} + if os.path.exists(OTF_FILE): + with open(OTF_FILE) as f: + otf = json.load(f) + + return otf + + +def save_otf(otf): + with open(OTF_FILE, 'w') as f: + json.dump(otf, f, indent=4) + + +def run2DPipeline(wf, protExtract): + print(f"\n>>> Running 2D pipeline: input extract " + f"{Color.bold(protExtract.getRunName())}") + + otf = load_otf() + + def _createBatch2D(gridsquare, subsetParts): + rangeStr = f"{subsetParts[0].getObjId()} - {subsetParts[-1].getObjId()}" + print(f"Creating subset with range: {rangeStr}") + protSubset = wf.createProtocol( + 'pwem.protocols.ProtUserSubSet', + objLabel=f'subset: {gridsquare} : {rangeStr}', + ) + _setPointer(protSubset.inputObject, protExtract, OUT_PART) + wf.saveProtocol(protSubset) + protSubset.makePathsAndClean() + # Create subset particles as output for the protocol + inputParticles = protExtract.outputParticles + outputParticles = protSubset._createSetOfParticles() + outputParticles.copyInfo(inputParticles) + for particle in subsetParts: + outputParticles.append(particle) + protSubset._defineOutputs(outputParticles=outputParticles) + protSubset._defineTransformRelation(inputParticles, outputParticles) + protSubset.setStatus(STATUS_FINISHED) + wf.project._storeProtocol(protSubset) + + protRelion2D = wf.createProtocol( + 'relion.protocols.ProtRelionClassify2D', + objLabel=f'relion2d: {gridsquare}', + maskDiameterA=round(params['partSizeA'] * 1.5), + numberOfClasses=200, + extraParams='--maxsig 50', + pooledParticles=50, + doGpu=True, + gpusToUse=','.join(str(g) for g in params['cls2dGpus']), + numberOfThreads=32, + numberOfMpi=1, + allParticlesRam=True, + useGradientAlg=True, + ) + + _setPointer(protRelion2D.inputParticles, protSubset, OUT_PART) + wf.saveProtocol(protRelion2D) + otf['2d'][gridsquare] = { + 'runId': protRelion2D.getObjId(), + 'runName': protRelion2D.getRunName(), + 'runDir': protRelion2D.getWorkingDir() } + save_otf(otf) - print(f"Creating subset for {suffix}") - if dry: - print("...running in DRY MODE.") - else: - protSubset = wf.createProtocol( - 'pwem.protocols.ProtUserSubSet', - objLabel=f'subset: {suffix}', - ) - _setPointer(protSubset.inputObject, protExtract, OUT_PART) - wf.saveProtocol(protSubset) - protSubset.makePathsAndClean() - # Create 
subset particles as output for the protocol - inputParticles = protExtract.outputParticles - outputParticles = protSubset._createSetOfParticles() - outputParticles.copyInfo(inputParticles) - for particle in subsetParts: - outputParticles.append(particle) - protSubset._defineOutputs(outputParticles=outputParticles) - protSubset._defineTransformRelation(inputParticles, outputParticles) - protSubset.setStatus(STATUS_FINISHED) - wf.project._storeProtocol(protSubset) - - protRelion2D = wf.createProtocol( - 'relion.protocols.ProtRelionClassify2D', - objLabel=f'relion2d: {suffix}', - maskDiameterA=round(particleSizeA * 1.5), - numberOfClasses=200, - extraParams='--maxsig 50', - pooledParticles=50, - doGpu=True, - gpusToUse=','.join(str(g) for g in gpus), - numberOfThreads=32, - numberOfMpi=1, - allParticlesRam=True, - useGradientAlg=True, - ) - - _setPointer(protRelion2D.inputParticles, protSubset, OUT_PART) - wf.saveProtocol(protRelion2D) - - lock.acquire() - - def _run(prot): - return { - 'runId': prot.getObjId(), - 'runName': prot.getRunName(), - 'runDir': prot.getWorkingDir() - } - - batch.update( - {'runs': [_run(p) for p in [protSubset, protRelion2D]]}) - otf_2d['gridsquares'][gridsquare] = batch['id'] - self._updateBatch(batch) - - batch['prot'] = protRelion2D - - return batch - - def _generate2D(self): + return {'gs': gridsquare, 'prot': protRelion2D} + + def _generate2D(): """ Generated subset of 2D from the outputParticles from extract protocol. Subsets will be created based on the GridSquare of the micrographs. """ lastParticleIndex = 0 @@ -208,11 +162,14 @@ def _generate2D(self): # Classify in batches lastMt = 0 extractSqliteFn = protExtract.outputParticles.getFileName() - tmpSqlite = os.path.abspath(extractSqliteFn).replace('/', '__') - tmpSqliteFn = os.path.join('/tmp', tmpSqlite) + tmpSqliteFn = '/tmp/particles.sqlite' while True: - print("Checking new particles!!!") + if lastGs: # Not the first time, let's wait + print("Sleeping...") + time.sleep(30) # wait for 5 minutes before checking for new jobs + + print("Wake up!!!") mt = os.path.getmtime(extractSqliteFn) if mt > lastMt: # Let's iterate over the particles to check if there is a @@ -223,8 +180,6 @@ def _generate2D(self): Process.system(f'rm -rf {tmpSqliteFn}') SqliteFile.copyDb(extractSqliteFn, tmpSqliteFn, tries=10, wait=30) print("Copy done!") - else: - print("Particles db has not changed since: ", Pretty.timestamp(lastMt)) parts = SetOfParticles(filename=tmpSqliteFn) subsetParts = [] @@ -275,25 +230,25 @@ def _generate2D(self): break lastMt = mt - print("Sleeping...") - time.sleep(30) # wait for 5 minutes before checking for new jobs - def _run2D(self, batch): - # In Dry Mode the is not prot in the batch dict - if protRelion2D := batch.get('prot', None): - #wf.launchProtocol(protRelion2D, wait=True) - wf.saveProtocol(protRelion2D) + def _run2D(batch): + protRelion2D = batch['prot'] + wf.launchProtocol(protRelion2D, wait=True) + + protRelion2DSelect = wf.createProtocol( + 'relion.protocols.ProtRelionSelectClasses2D', + objLabel=f"select 2d - {batch['gs']}", + minThreshold=0.05, + minResolution=30.0, + ) - protRelion2DSelect = wf.createProtocol( - 'relion.protocols.ProtRelionSelectClasses2D', - objLabel=f"select 2d - {batch['gs']}", - minThreshold=0.05, - minResolution=30.0, - ) + protRelion2DSelect.inputProtocol.set(protRelion2D) + wf.launchProtocol(protRelion2DSelect, wait=True) - protRelion2DSelect.inputProtocol.set(protRelion2D) - #wf.launchProtocol(protRelion2DSelect, wait=True) - wf.saveProtocol(protRelion2DSelect) + 
ppl = Pipeline() + g = ppl.addGenerator(_generate2D) + ppl.addProcessor(g.outputQueue, _run2D) + ppl.run() def clean_project(workingDir): @@ -312,11 +267,27 @@ def create_project(workingDir): def _path(*p): return os.path.join(workingDir, *p) + """ + {"acquisition": {"voltage": 200, "magnification": 79000, "pixel_size": 1.044, "dose": 1.063, "cs": 2.7}} + """ + scipionOptsFn = _path('scipion_otf_options.json') relionOptsFn = _path('relion_it_options.py') - with open(scipionOptsFn) as f: - opts = json.load(f) + if os.path.exists(scipionOptsFn): + with open(scipionOptsFn) as f: + opts = json.load(f) + + elif os.path.exists(relionOptsFn): + with open(_path('relion_it_options.py')) as f: + relionOpts = OrderedDict(ast.literal_eval(f.read())) + opts = {'acquisition': { + 'voltage': relionOpts['prep__importmovies__kV'], + 'pixel_size': relionOpts['prep__importmovies__angpix'], + 'cs': relionOpts['prep__importmovies__Cs'], + 'magnification': 130000, + 'dose': relionOpts['prep__motioncorr__dose_per_frame'] + }} acq = opts['acquisition'] picking = opts.get('picking', {}) @@ -392,9 +363,7 @@ def _path(*p): wf.launchProtocol(protCTF, wait={OUT_CTFS: 16}) protCryoloImport = None - #cryoloInputModelFrom = 0 # General model (low pass filtered) - cryoloInputModelFrom = 1 # Denoised with Janni - + cryoloInputModelFrom = 0 # General model (low pass filtered) if 'cryolo_model' in picking: protCryoloImport = wf.createProtocol( 'sphire.protocols.SphireProtCryoloImport', @@ -405,13 +374,13 @@ def _path(*p): cryoloInputModelFrom = 2 # Other protCryolo = wf.createProtocol( - 'sphire.protocols.SphireProtCRYOLOPickingTasks', + 'sphire.protocols.SphireProtCRYOLOPicking', objLabel='cryolo picking', boxSize=0, # let cryolo estimate the box size conservPickVar=0.05, # less conservative than default 0.3 - useGpu=True, # use gpu for speed up janni denoising + useGpu=False, # use cpu for picking, fast enough numCpus=8, - gpuList=' '.join(str(g) for g in params['cryoloGpus']), + gpuList='', streamingBatchSize=16, streamingSleepOnWait=60, numberOfThreads=1, @@ -445,33 +414,33 @@ def _path(*p): _setPointer(protRelionExtract.inputCoordinates, protCryolo, OUT_COORD) # Ensure there are at least some particles wf.launchProtocol(protRelionExtract, wait={OUT_PART: 100}) + run2DPipeline(wf, protRelionExtract) -def continue_2d(workingDir, gpus, dry): +def continue_project(workingDir): print(f"Loading project from {workingDir}") project = Project(pw.Config.getDomain(), workingDir) project.load() wf = Workflow(project) - protocols = {} + loadGpus() + + protExtract = protCryolo = None + + for run in project.getRuns(): + clsName = run.getClassName() + print(f"Run {run.getObjId()}: {clsName}") + if clsName == 'ProtRelionExtractParticles': + protExtract = run + elif clsName.startswith('SphireProtCRYOLOPicking'): + protCryolo = run - def _loadProtocols(): - for run in project.getRuns(): - if clsName.startswith('SphireProtCRYOLOPicking'): - protocols['picking'] = run - elif run.getClassName() == 'ProtRelionExtractParticles': - protocols['extract'] = run - return protocols + if not protExtract.isActive(): + print("Re-running extract protocol...") + wf.launchProtocol(protExtract, wait={OUT_PART: 100}) - _loadProtocols() - protExtract = protocols.get('extract', None) - while protExtract is None: - time.sleep(60) # wait for 5 mins - _loadProtocols() - protExtract = protocols.get('extract', None) + calculateBoxSize(protCryolo) - wf.wait(protExtract, wait={OUT_PART: 100}) - calculateBoxSize(protocols['picking']) - Rln2DPipeline(wf, protExtract, 
params['partSizeA'], gpus, dry=dry).run() + run2DPipeline(wf, protExtract) def restart(workingDir, args): @@ -507,7 +476,7 @@ def print_protocol(workingDir, protId): if protId == 'all': for prot in project.getRuns(iterate=True): clsName = prot.getClassName() - print(f"- {prot.getObjId():>6} {prot.getStatus():<10} {clsName:<30} - {prot.getRunName()}") + print(f"- {prot.getObjId():>8} {prot.getStatus():<10} {clsName}") else: prot = project.getProtocol(int(protId)) if prot is None: @@ -602,9 +571,6 @@ def write_coordinates(micStarFn, prot): for coord in coords.iterItems(orderBy='_micId', direction='ASC'): micId = coord.getMicId() - if micId not in micDict: - continue - if micId not in micIds: micIds.add(micId) micFn = micDict[micId] @@ -630,44 +596,9 @@ def write_coordinates(micStarFn, prot): def print_prot(prot, label='Protocol'): print(f">>> {label} {prot.getObjId():>6} {prot.getClassName():<30} {prot.getRunName()}") -def match_epu_xmls(workingDir, micStarFn='micrographs_ctf.star'): - total = 0 - missing = 0 - xmlFolder = os.path.join('EPU', 'MicsXML') - Process.system(f'rm -rf {xmlFolder} && mkdir {xmlFolder}') - with StarFile(micStarFn) as sf: - for row in sf.iterTable('micrographs'): - _, name = row.rlnMicrographMovieName.split('Images-Disc1_') - xmlName = os.path.join('Images-Disc1', name.replace('_EER.eer', '.xml')) - xmlFile = os.path.join('EPU', xmlName) - total += 1 - micBase = os.path.basename(row.rlnMicrographName) - if os.path.exists(xmlFile): - linkName = os.path.join(xmlFolder, micBase.replace('.mrc', '.xml')) - cmd = f"ln -s {xmlFile.replace('EPU/', '../')} {linkName}" - Process.system(cmd) - else: - print(micBase, '->', Color.red(xmlFile)) - missing += 1 - - print(f"Missing {Color.red(missing)} XMLs out of {Color.bold(total)} micrographs") - - # mics = Table(['rlnMicrographName', - # 'rlnOpticsGroup', - # 'rlnCtfImage', - # 'rlnDefocusU', - # 'rlnDefocusV', - # 'rlnCtfAstigmatism', - # 'rlnDefocusAngle', - # 'rlnCtfFigureOfMerit', - # 'rlnCtfMaxResolution', - # 'rlnMicrographMovieName']) - # ps = firstMic.getSamplingRate() def write_stars(workingDir, ids=None): - """ Write star files for Relion. Generates micrographs_ctf.star, - coordinates.star and Coordinates folder. - """ + """ Restart one or more protocols. """ print("ids", ids) def _get_keys(tokens): @@ -685,12 +616,8 @@ def _get_keys(tokens): idsDict = {k: v for k, v in _get_keys(ids)} if 'ctfs' in idsDict: protCtf = project.getProtocol(idsDict['ctfs']) - if protCtf is None: - raise Exception(f"There is no CTF protocol with id {idsDict['ctfs']}") if 'picking' in idsDict: protPicking = project.getProtocol(idsDict['picking']) - if protPicking is None: - raise Exception(f"There is no CTF protocol with id {idsDict['picking']}") else: # Default option when running OTF that we export STAR files # from CTFFind and Cryolo runs @@ -777,7 +704,7 @@ def main(): p = argparse.ArgumentParser(prog='scipion-otf') g = p.add_mutually_exclusive_group() - g.add_argument('--create', metavar='Scipion project path', + g.add_argument('--create', action='store_true', help="Create a new Scipion project in the working " "directory. 
This will overwrite any existing " "'scipion' folder there.") @@ -793,30 +720,24 @@ def main(): help="Some test code") g.add_argument('--clean', action="store_true", help="Clean Scipion project files/folders.") - g.add_argument('--continue_2d', nargs='+', type=int, - metavar='GPUs') - + g.add_argument('--continue_2d', action="store_true") g.add_argument('--write_stars', default=argparse.SUPPRESS, nargs='*', help="Generate STAR micrographs and particles STAR files." "By default, it will get the first CTFfind protocol for ctfs" "and the Cryolo picking for picking. One can pass a string" "with the protocol ids for ctfs and/or picking. For example:" "--write_starts 'ctfs=1524 picking=1711'") - g.add_argument('--match_epu_xmls', action='store_true') - g.add_argument('--clone_project', nargs=2, metavar=('SRC', 'DST'), help="Clone an existing Scipion project") g.add_argument('--fix_run_links', metavar='RUNS_SRC', help="Fix links of Runs of this project from another one.") g.add_argument('--print_protocol', '-p', help="Print the values of a given protocol.") - p.add_argument('--dry', action='store_true', - help="Do not excute operations, just show the steps.") args = p.parse_args() if args.create: - create_project(args.create) + create_project(cwd) elif args.restart: restart(cwd, args.restart) elif args.restart_rankers: @@ -831,8 +752,8 @@ def main(): clean_project(cwd) elif 'write_stars' in args: write_stars(cwd, ids=args.write_stars) - elif gpus := args.continue_2d: - continue_2d(cwd, gpus, args.dry) + elif args.continue_2d: + continue_project(cwd) elif args.clone_project: src, dst = args.clone_project clone_project(src, dst) @@ -840,8 +761,6 @@ def main(): fix_run_links(cwd, args.fix_run_links) elif protId := args.print_protocol: print_protocol(cwd, protId) - elif args.match_epu_xmls: - match_epu_xmls(cwd) else: # by default open the GUI from pyworkflow.gui.project import ProjectWindow ProjectWindow(cwd).show()
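
Usage sketch for the streaming pieces touched in the patches above (StarMonitor, BatchManager and the emtools.jobs.Pipeline generator/processor pattern). The constructor calls follow what the star_pipeline_tester.py and McPipeline diffs show; the STAR table name, the key function and the layout of the batch dicts produced by BatchManager.generate are assumptions here, so the processor only logs each batch and passes it along:

from emtools.jobs import Pipeline
from emtools.metadata import StarMonitor, BatchManager


def process_star_stream(inputStar, outputDir, batchSize=128, threads=4):
    """ Consume new rows from a STAR file written in streaming and
    process them in batches (a sketch, not a drop-in tool). """
    p = Pipeline()

    # Watch the 'micrographs' table (assumed name) and yield rows not seen
    # before, keyed by rlnMicrographName; give up after 5 min with no news.
    monitor = StarMonitor(inputStar, 'micrographs',
                          lambda row: row.rlnMicrographName,
                          timeout=300)

    # Group monitored rows into batches, each with its own working folder.
    batchMgr = BatchManager(batchSize, monitor.newItems(), outputDir,
                            itemFileNameFunc=lambda row: row.rlnMicrographName)

    def _process(batch):
        # The exact batch dict layout comes from BatchManager.generate;
        # here it is only logged and returned unchanged.
        print(f"Got batch: {batch}")
        return batch

    g = p.addGenerator(batchMgr.generate)
    f1 = p.addProcessor(g.outputQueue, _process)
    for _ in range(1, threads):
        # Extra workers read the same input queue and feed the same output
        # queue, as McPipeline does above for its convert step.
        p.addProcessor(g.outputQueue, _process, outputQueue=f1.outputQueue)

    p.run()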
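
A minimal writer-side sketch of the StarFile changes above: writeRowValues is now public and the closeFile keyword lets StarFile wrap an already-open stream such as sys.stdout without closing it. Opening with mode 'w' and building the Table from plain column names are assumptions taken from the surrounding code:

import sys

from emtools.metadata import StarFile, Table

table = Table(['rlnMicrographName', 'rlnDefocusU'])

# Write a small table to disk, one row of plain values at a time.
with StarFile('micrographs.star', 'w') as sf:
    sf.writeHeader('micrographs', table)
    sf.writeRowValues(['mic_0001.mrc', 12345.6])
    sf.writeRowValues(['mic_0002.mrc', 13456.7])

# Wrap sys.stdout: with closeFile=False, close() drops the file reference
# but leaves the underlying stream open.
out = StarFile(sys.stdout, closeFile=False)
out.writeHeader('micrographs', table)
out.writeRowValues(['mic_0003.mrc', 14567.8])
out.close()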
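
The Path.replaceExt contract also changed in the path.py hunk: the new extension must now include the leading dot, with the new Path.getExt as its counterpart. A quick illustration with a hypothetical EPU-style filename:

from emtools.utils import Path

fn = 'FoilHole_12345_fractions.tiff'
print(Path.getExt(fn))              # '.tiff'
print(Path.replaceExt(fn, '.mrc'))  # 'FoilHole_12345_fractions.mrc'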