Commit

implemented ParallelBatch in another attempt to fix issue #4
proycon committed Jul 6, 2016
1 parent 5838d5c commit 468750d
Showing 3 changed files with 61 additions and 20 deletions.
44 changes: 41 additions & 3 deletions luiginlp/engine.py
@@ -433,7 +433,7 @@ def inherit_parameters(Class, *ChildClasses):
if isinstance(attr,luigi.Parameter) and not hasattr(Class, key):
setattr(Class,key, attr)

def outputfrominput(self, inputformat, stripextension, addextension, outputdirparam='outputdir'):
def outputfrominput(self, inputformat, stripextension, addextension, replaceinputdirparam='replaceinputdir', outputdirparam='outputdir'):
"""Derives the output filename from the input filename, removing the input extension and adding the output extension. Supports outputdir parameter."""

if not hasattr(self,'in_' + inputformat):
@@ -448,15 +448,23 @@ def outputfrominput(self, inputformat, stripextension, addextension, outputdirpa
if hasattr(self,outputdirparam):
outputdir = getattr(self,outputdirparam)
if outputdir and outputdir != '.':
return TargetInfo(self, os.path.join(outputdir, os.path.basename(replaceextension(inputfilename, stripextension,addextension))))
return TargetInfo(self, replaceextension(inputfilename, stripextension,addextension))
if hasattr(self, replaceinputdirparam):
replaceinputdir = getattr(self,replaceinputdirparam)
if replaceinputdir:
if inputfilename.startswith(replaceinputdir):
return TargetInfo(self, os.path.join(outputdir, os.path.basename(replaceextension(inputfilename[len(replaceinputdir):], stripextension,addextension))))
else:
return TargetInfo(self, os.path.join(outputdir, os.path.basename(replaceextension(inputfilename, stripextension,addextension))))
else:
return TargetInfo(self, replaceextension(inputfilename, stripextension,addextension))
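
The intent of the new replaceinputdir parameter is to strip a shared input-directory prefix before the output name is built under outputdir. A minimal sketch of the assumed mapping (the helper derive_output and the example paths are illustrative, not part of the commit, and replaceextension is simplified to a plain suffix swap):

import os

def derive_output(inputfilename, stripextension, addextension, outputdir, replaceinputdir):
    # Strip the shared input prefix (if configured and matching), swap the
    # extension, keep only the basename, and place the result under outputdir.
    if replaceinputdir and inputfilename.startswith(replaceinputdir):
        inputfilename = inputfilename[len(replaceinputdir):]
    newname = os.path.basename(inputfilename[:-len(stripextension)] + addextension)
    return os.path.join(outputdir, newname)

# derive_output('/data/in/book1/page1.folia.xml', '.folia.xml', '.report',
#               '/data/out', '/data/in/')  ->  '/data/out/page1.report'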


class StandardWorkflowComponent(WorkflowComponent):
"""A workflow component that takes one inputfile"""

inputfile = luigi.Parameter()
outputdir = luigi.Parameter(default="")
replaceinputdir = luigi.Parameter(default="")

class TargetInfo(sciluigi.TargetInfo):
pass
@@ -479,6 +487,36 @@ def __init__(self, *args, **kwargs):
def __hash__(self):
return hash(tuple(sorted(self.items())))
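
PassParameters is used elsewhere in this diff as a dict-like bag of keyword arguments (it is unpacked with ** and iterated via .items()), with __hash__ added so Luigi can treat it as a parameter value. A small illustrative check under that assumption (not part of the commit):

from luiginlp.engine import PassParameters

# Assumption: the constructor stores its keyword arguments as dict entries.
p1 = PassParameters(outputdir='out/', folia_extension='folia.xml')
p2 = PassParameters(folia_extension='folia.xml', outputdir='out/')
assert hash(p1) == hash(p2)   # hash of sorted items is order-independent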

class ParallelBatch(luigi.Task):
"""Meta workflow"""
inputfiles = luigi.Parameter()
component = luigi.Parameter()
passparameters = luigi.Parameter(default=PassParameters())

def requires(self):
if isinstance(self.passparameters, str):
self.passparameters = PassParameters(json.loads(self.passparameters.replace("'",'"')))
elif isinstance(self.passparameters, dict):
self.passparameters = PassParameters(self.passparameters)
elif not isinstance(self.passparameters, PassParameters):
raise TypeError("Keywork argument passparameters must be instance of PassParameters, got " + repr(self.passparameters))
tasks = []
ComponentClass = getcomponentclass(self.component)
if isinstance(self.inputfiles, str):
self.inputfiles = self.inputfiles.split(',')
for inputfile in self.inputfiles:
tasks.append( ComponentClass(inputfile=inputfile,**self.passparameters))
return tasks

def run(self):
if isinstance(self.inputfiles, str):
self.inputfiles = self.inputfiles.split(',')
with self.output().open('w') as f:
f.write("\n".join(self.inputfiles))

def output(self):
return luigi.LocalTarget('.parallelbatch-' + self.component + '-' + str(hash(self)) + '.done')
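
For illustration only, a hedged sketch of invoking ParallelBatch directly; the 'Voweleater' component and the file names are borrowed from test/scaletest.py below, and scheduling through luigi.build is just one way to run a plain luigi.Task:

import luigi
from luiginlp.engine import ParallelBatch, PassParameters

# One batch of the 'Voweleater' component over two input files, with outputdir
# forwarded to each component instance through PassParameters.
batch = ParallelBatch(
    component='Voweleater',
    inputfiles='doc1.txt,doc2.txt',               # comma-separated string or list
    passparameters=PassParameters(outputdir='out/'),
)
luigi.build([batch], local_scheduler=True)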

class Parallel(sciluigi.WorkflowTask):
"""Meta workflow"""
inputfiles = luigi.Parameter()
23 changes: 11 additions & 12 deletions luiginlp/modules/folia.py
@@ -3,7 +3,7 @@
import natsort
import subprocess
import pickle
from luiginlp.engine import Task, TargetInfo, InputFormat, StandardWorkflowComponent, registercomponent, InputSlot, Parameter, BoolParameter, IntParameter
from luiginlp.engine import Task, TargetInfo, InputFormat, StandardWorkflowComponent, registercomponent, InputSlot, Parameter, BoolParameter, IntParameter, PassParameters, ParallelBatch
from luiginlp.util import getlog, recursive_glob, waitforslot, waitforcompletion, replaceextension, chunk
from luiginlp.modules.openconvert import OpenConvert_folia

@@ -167,28 +167,27 @@ def on_failure(self, exception):

def run(self):
#gather input files
batchsize = 1000
if self.outputdir and not os.path.exists(self.outputdir): os.makedirs(self.outputdir)

if os.path.exists(self.out_state().path):
log.info("Loading index...")
log.info("Collecting input files from saved state...")
with open(self.out_state().path,'rb') as f:
inputfiles = pickle.load(f)
else:
log.info("Collecting input files...")
inputfiles = recursive_glob(self.in_foliadir().path, '*.' + self.folia_extension)
log.info("Collected " + str(len(inputfiles)) + " input files")
with open(self.out_state().path,'wb') as f:
pickle.dump(inputfiles,f)

with open(self.out_state().path,'wb') as f:
pickle.dump(inputfiles[batchsize:],f)

log.info("Scheduling validators, " + str(len(inputfiles)) + " left...")
for taskbatch in chunk(inputfiles,batchsize): #schedule in batches of 1000 so we don't overload the scheduler
if self.outputdir:
yield [ FoliaValidator(inputfile=inputfile,folia_extension=self.folia_extension,outputdir=os.path.dirname(inputfile).replace(self.in_foliadir().path,self.outputdir)) for inputfile in taskbatch ]
else:
yield [ FoliaValidator(inputfile=inputfile,folia_extension=self.folia_extension) for inputfile in taskbatch ]
log.info("Scheduling validators")
if self.outputdir:
passparameters = PassParameters(folia_extension=self.folia_extension,replaceinputdir=self.in_foliadir().path, outputdir=self.outputdir)
else:
passparameters = PassParameters(folia_extension=self.folia_extension)

for inputfiles_batch in chunk(inputfiles,1000): #schedule in batches of 1000 so we don't overload the scheduler
yield ParallelBatch(component='FoliaValidator',inputfiles=inputfiles_batch,passparameters=passparameters)

log.info("Collecting output files...")
#Gather all output files
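
The batching above relies on luiginlp.util.chunk; a minimal sketch of its assumed behaviour (the actual implementation in util.py may differ), splitting a sequence into successive slices of at most size items:

def chunk(seq, size):
    # Yield consecutive slices of at most `size` items.
    for i in range(0, len(seq), size):
        yield seq[i:i + size]

# list(chunk(['a', 'b', 'c'], 2)) == [['a', 'b'], ['c']]
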
14 changes: 9 additions & 5 deletions test/scaletest.py
@@ -4,7 +4,9 @@
import glob
import shutil
import luiginlp
from luiginlp.engine import Task, StandardWorkflowComponent, PassParameters, InputFormat, InputComponent, InputSlot, Parameter, IntParameter, registercomponent, Parallel
import luigi
import json
from luiginlp.engine import Task, StandardWorkflowComponent, PassParameters, InputFormat, InputComponent, InputSlot, Parameter, IntParameter, registercomponent, ParallelBatch
from luiginlp.util import getlog, chunk

log = getlog()
@@ -49,6 +51,8 @@ def accepts(self):





class ScaleTestTask(Task):

in_txtdir = InputSlot()
@@ -67,11 +71,11 @@ def run(self):
log.info("Collected " + str(len(inputfiles)) + " input files")

#inception aka dynamic dependencies: we yield a list of tasks to perform which could not have been predicted statically
#in this case we run the Voweleater component for each input file in the directory
for inputfiles_chunk in chunk(inputfiles, 1000):
yield ParallelBatch(component='Voweleater',inputfiles=','.join(inputfiles_chunk),passparameters=PassParameters(outputdir=self.out_txtdir().path))

chunks = [ Parallel(component='Voweleater',inputfiles=','.join(inputfiles_chunk),passparameters=PassParameters(outputdir=self.out_txtdir().path)) for inputfiles_chunk in chunk(inputfiles, 1000) ]
log.info("Scheduling chunks: " + str(len(chunks)))
yield chunks
#log.info("Scheduling chunks: " + str(len(chunks)))
#yield chunks

#yield [ Voweleater(inputfile=inputfile,outputdir=self.out_txtdir().path) for inputfile in inputfiles ]
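
Dynamic dependencies are a generic Luigi feature rather than anything specific to this commit; a minimal self-contained sketch (hypothetical MakeFile/Combine tasks, not from this repository) of how yielding tasks inside run() works:

import luigi

class MakeFile(luigi.Task):
    name = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget(self.name + '.txt')

    def run(self):
        with self.output().open('w') as f:
            f.write(self.name)

class Combine(luigi.Task):
    def output(self):
        return luigi.LocalTarget('combined.txt')

    def run(self):
        # Requirements only known at runtime, e.g. from a directory listing.
        deps = [MakeFile(name=n) for n in ('a', 'b', 'c')]
        yield deps   # Luigi completes these, then resumes run()
        with self.output().open('w') as f:
            for d in deps:
                with d.output().open() as fin:
                    f.write(fin.read() + '\n')

# luigi.build([Combine()], local_scheduler=True)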

