Batch: Process-parallel directory scan and initial file read #426

Draft · wants to merge 9 commits into base: main
1 change: 1 addition & 0 deletions loki/batch/__init__.py
@@ -15,6 +15,7 @@
"""

from loki.batch.configure import * # noqa
from loki.batch.executor import * # noqa
from loki.batch.item import * # noqa
from loki.batch.pipeline import * # noqa
from loki.batch.scheduler import * # noqa
44 changes: 44 additions & 0 deletions loki/batch/executor.py
@@ -0,0 +1,44 @@
# (C) Copyright 2018- ECMWF.
# This software is licensed under the terms of the Apache Licence Version 2.0
# which can be obtained at http://www.apache.org/licenses/LICENSE-2.0.
# In applying this licence, ECMWF does not waive the privileges and immunities
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.

"""
A dummy "executor" utility that allows parallel non-threaded execution
under the same API as thread or ProcessPoolExecutors.
"""

from concurrent.futures import Executor, Future

__all__ = ['SerialExecutor']


class SerialExecutor(Executor):
"""
A dummy "executor" utility that allows parallel non-threaded
execution with the same API as a ``ProcessPoolExecutors``.
"""

def submit(self, fn, *args, **kwargs): # pylint: disable=arguments-differ
"""
Executes the callable *fn* as ``fn(*args, **kwargs)``
and wraps the return value in a *Future* object.
"""
f = Future()
try:
# Execute function on given args
result = fn(*args, **kwargs)
except BaseException as e:
f.set_exception(e)
else:
f.set_result(result)

return f

def map(self, fn, *iterables, timeout=None, chunksize=1):
"""
Maps the callable *fn* over *iterables* via the builtin ``map``.
"""
# The timeout and chunksize arguments only apply to process pools
# and are accepted here purely for API compatibility
return map(fn, *iterables)
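
Reviewer note: the point of this class is that call sites can be written once against the concurrent.futures API and stay agnostic of whether work runs serially or in a process pool. A minimal, self-contained sketch of the intended drop-in pattern (the square helper is hypothetical, for illustration only):

    from concurrent.futures import ProcessPoolExecutor

    from loki.batch.executor import SerialExecutor

    def square(x):
        # Top-level function, so it stays picklable if a process pool is used
        return x * x

    if __name__ == '__main__':
        # Select the executor once; downstream code is agnostic to the choice
        num_workers = 0  # 0 selects the serial fallback, >0 a process pool
        if num_workers > 0:
            executor = ProcessPoolExecutor(max_workers=num_workers)
        else:
            executor = SerialExecutor()

        # submit/result and map behave the same under either executor
        assert executor.submit(square, 3).result() == 9
        assert list(executor.map(square, [1, 2, 3])) == [1, 4, 9]
        executor.shutdown()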
34 changes: 0 additions & 34 deletions loki/batch/item.py
@@ -19,7 +19,6 @@
)
from loki.logging import warning
from loki.module import Module
from loki.sourcefile import Sourcefile
from loki.subroutine import Subroutine
from loki.tools import as_tuple, flatten, CaseInsensitiveDict
from loki.types import DerivedType
@@ -1134,39 +1133,6 @@ def get_or_create_item(self, item_cls, item_name, scope_name, config=None):
self.item_cache[item_name] = item
return item

def get_or_create_file_item_from_path(self, path, config, frontend_args=None):
"""
Utility method to create a :any:`FileItem` for a given path

This is used to instantiate items for the first time during the scheduler's
discovery phase. It will use a cached item if it exists, or parse the source
file using the given :data:`frontend_args`.

Parameters
----------
path : str or pathlib.Path
The file path of the source file
config : :any:`SchedulerConfig`
The config object from which the item configuration will be derived
frontend_args : dict, optional
Frontend arguments that are given to :any:`Sourcefile.from_file` when
parsing the file
"""
item_name = str(path).lower()
if file_item := self.item_cache.get(item_name):
return file_item

if not frontend_args:
frontend_args = {}
if config:
frontend_args = config.create_frontend_args(path, frontend_args)

source = Sourcefile.from_file(path, **frontend_args)
item_conf = config.create_item_config(item_name) if config else None
file_item = FileItem(item_name, source=source, config=item_conf)
self.item_cache[item_name] = file_item
return file_item

def get_or_create_file_item_from_source(self, source, config):
"""
Utility method to create a :any:`FileItem` corresponding to a given source object
72 changes: 65 additions & 7 deletions loki/batch/scheduler.py
@@ -5,11 +5,14 @@
# granted to it by virtue of its status as an intergovernmental organisation
# nor does it submit to any jurisdiction.

from concurrent.futures import ProcessPoolExecutor
from os.path import commonpath
from pathlib import Path

from codetiming import Timer

from loki.batch.configure import SchedulerConfig
from loki.batch.executor import SerialExecutor
from loki.batch.item import (
FileItem, ModuleItem, ProcedureItem, ProcedureBindingItem,
InterfaceItem, TypeDefItem, ExternalItem, ItemFactory
@@ -20,6 +23,7 @@
from loki.batch.transformation import Transformation

from loki.frontend import FP, REGEX, RegexParserClass
from loki.sourcefile import Sourcefile
from loki.tools import as_tuple, CaseInsensitiveDict, flatten
from loki.logging import info, perf, warning, debug, error

@@ -118,6 +122,9 @@ class Scheduler:
full_parse: bool, optional
Flag indicating whether a full parse of all sourcefiles is required.
By default a full parse is executed, use this flag to suppress.
num_workers : int, default: 0
Number of worker processes to use for parallel processing. The
default value ``0`` bypasses the process pool and runs serially.
frontend : :any:`Frontend`, optional
Frontend to use for full parse of source files (default :any:`FP`).
"""
@@ -127,7 +134,7 @@

def __init__(self, paths, config=None, seed_routines=None, preprocess=False,
includes=None, defines=None, definitions=None, xmods=None,
omni_includes=None, full_parse=True, frontend=FP):
omni_includes=None, full_parse=True, num_workers=0, frontend=FP):
# Derive config from file or dict
if isinstance(config, SchedulerConfig):
self.config = config
@@ -138,6 +145,13 @@ def __init__(self, paths, config=None, seed_routines=None, preprocess=False,

self.full_parse = full_parse

if num_workers > 0:
# Create the parallel pool executor for parallel processing
self.executor = ProcessPoolExecutor(max_workers=num_workers)
else:
# Create a dummy executor that mimics the concurrent.futures Executor API
self.executor = SerialExecutor()

# Build-related arguments to pass to the sources
self.paths = [Path(p) for p in as_tuple(paths)]
self.seeds = tuple(
@@ -167,6 +181,27 @@
# Attach interprocedural call-tree information
self._enrich()

def __del__(self):
# Shut down the parallel process pool; guard against partially
# initialised objects where the executor was never created
if getattr(self, 'executor', None) is not None:
self.executor.shutdown()

@staticmethod
def _parse_source(source, frontend_args):
"""
Utility function that exposes the parsing step for a single
:any:`Sourcefile` as a pure function, suitable for the parallel
executor.

Parameters
----------
source : :any:`Sourcefile`
The sourcefile object on which to trigger the full parse
frontend_args : dict
Dict of arguments to pass to :meth:`make_complete`
"""
source.make_complete(**frontend_args)
return source

@Timer(logger=info, text='[Loki::Scheduler] Performed initial source scan in {:.2f}s')
def _discover(self):
"""
@@ -185,8 +220,18 @@ def _discover(self):
path_list = list(set(flatten(path_list))) # Filter duplicates and flatten

# Instantiate FileItem instances for all files in the search path
for path in path_list:
self.item_factory.get_or_create_file_item_from_path(path, self.config, frontend_args)
# TODO: This is essentially item-cache creation, and should live on ItemFactory
path_fargs = tuple(
(path, self.config.create_frontend_args(path, frontend_args)) for path in path_list
)
with Timer(logger=info, text='[Loki::Scheduler] Initial file parse in {:.2f}s'):
src_futures = tuple(
self.executor.submit(Sourcefile.from_file, path, **fargs)
for path, fargs in path_fargs
)
sources = tuple(future.result() for future in src_futures)
for source in sources:
self.item_factory.get_or_create_file_item_from_source(source, self.config)

# Instantiate the basic list of items for files and top-level program units
# in each file, i.e., modules and subroutines
@@ -276,14 +321,27 @@ def _parse_items(self):
# Force the parsing of the routines
default_frontend_args = self.build_args.copy()
default_frontend_args['definitions'] = as_tuple(default_frontend_args['definitions']) + self.definitions
for item in SFilter(self.file_graph, reverse=True):
frontend_args = self.config.create_frontend_args(item.name, default_frontend_args)
item.source.make_complete(**frontend_args)

# Materialise the iteration order from the SFilter, as the
# items are traversed multiple times below
items = tuple(SFilter(self.file_graph, reverse=True))

# Build the arguments for the parser function and call parallel map
with Timer(logger=perf, text='[Loki::Scheduler] Performed the actual parse loop in {:.2f}s'):
sources = tuple(item.source for item in items)
fargs = tuple(
self.config.create_frontend_args(item.name, default_frontend_args)
for item in items
)
f_sources = self.executor.map(self._parse_source, sources, fargs)

# Set the "completed" Sourcefile on the item
for item, source in zip(items, f_sources):
item.source = source

# Re-build the SGraph after parsing to pick up all new connections
self._sgraph = SGraph.from_seed(self.seeds, self.item_factory, self.config)

@Timer(logger=perf, text='[Loki::Scheduler] Enriched call tree in {:.2f}s')
@Timer(logger=info, text='[Loki::Scheduler] Enriched call tree in {:.2f}s')
def _enrich(self):
"""
For items that have a specific enrichment list provided as part of their
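Reviewer note: _parse_source is a staticmethod for a reason worth keeping in mind: a ProcessPoolExecutor pickles both the callable and its arguments into the worker processes, so a bound method would drag the entire Scheduler (including the executor itself) across the process boundary. The parsed sources also come back as copies, which is why _parse_items re-assigns item.source afterwards. A self-contained sketch of this scatter/gather pattern, using a hypothetical FakeSource stand-in for a picklable Sourcefile:

    from concurrent.futures import ProcessPoolExecutor
    from dataclasses import dataclass, field

    @dataclass
    class FakeSource:
        """Hypothetical stand-in for a picklable Sourcefile"""
        path: str
        complete: bool = False
        args: dict = field(default_factory=dict)

        def make_complete(self, **frontend_args):
            self.complete = True
            self.args = frontend_args

    def parse_one(source, frontend_args):
        # Pure, top-level function: picklable and free of shared state
        source.make_complete(**frontend_args)
        return source

    if __name__ == '__main__':
        sources = [FakeSource(f'file_{i}.F90') for i in range(4)]
        fargs = [{'frontend': 'FP'}] * len(sources)
        with ProcessPoolExecutor(max_workers=2) as executor:
            # Workers mutate and return their own copies; the originals in
            # the parent process are untouched, hence the re-assignment of
            # item.source in _parse_items
            parsed = list(executor.map(parse_one, sources, fargs))
        assert all(source.complete for source in parsed)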
11 changes: 9 additions & 2 deletions loki/expression/symbols.py
@@ -1424,10 +1424,17 @@ class Cast(pmbl.Call):
Internal representation of a data type cast.
"""

def __init__(self, name, expression, kind=None, **kwargs):
init_arg_names = ('function', 'parameters', 'kind')

def __getinitargs__(self):
return (self.function, self.parameters, self.kind)

def __init__(self, function, expression, kind=None, **kwargs):
if not isinstance(function, pmbl.Expression):
function = pmbl.make_variable(function)
assert kind is None or isinstance(kind, pmbl.Expression)
self.kind = kind
super().__init__(pmbl.make_variable(name), as_tuple(expression), **kwargs)
super().__init__(function, as_tuple(expression), **kwargs)

mapper_method = intern('map_cast')

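Reviewer note on the Cast change: pymbolic-style expression nodes expose their constructor state through init_arg_names and __getinitargs__, and the base class can use that tuple both for pickling and for structural comparison, so storing the function as a proper expression node keeps the tuple consistent with what __init__ accepts. A minimal sketch of the protocol with a hypothetical Node class (not Loki's or pymbolic's actual base class):

    import pickle

    class Node:
        """Hypothetical expression node mimicking the pymbolic-style protocol"""
        init_arg_names = ('function', 'parameters', 'kind')

        def __init__(self, function, parameters, kind=None):
            self.function = function
            self.parameters = parameters
            self.kind = kind

        def __getinitargs__(self):
            # One value per entry in init_arg_names, in the same order
            return (self.function, self.parameters, self.kind)

        # Plain pickle ignores __getinitargs__ on new-style classes, so
        # route the state through __reduce__ to rebuild from init args
        def __reduce__(self):
            return (type(self), self.__getinitargs__())

        def __eq__(self, other):
            return (type(other) is type(self)
                    and self.__getinitargs__() == other.__getinitargs__())

        def __hash__(self):
            return hash((type(self).__name__,) + self.__getinitargs__())

    node = Node('real', (42,), kind=4)
    assert pickle.loads(pickle.dumps(node)) == node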
3 changes: 2 additions & 1 deletion loki/module.py
@@ -275,7 +275,8 @@ def __getstate__(self):
s = self.__dict__.copy()
# TODO: We need to remove the AST, as certain AST types
# (eg. FParser) are not pickle-safe.
del s['_ast']
if '_ast' in s:
del s['_ast']
return s

def __setstate__(self, s):
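Reviewer note: the guard matters because _ast is not guaranteed to be present; an object that has already been through a pickle round-trip (its __getstate__ dropped the key) or one constructed without a frontend AST would make the unconditional del raise a KeyError on the next dump. A minimal sketch of the pattern, with a hypothetical stand-in class for Module:

    import pickle

    class Parsed:
        """Hypothetical stand-in for Module, holding an unpicklable AST handle"""

        def __init__(self, name, ast=None):
            self.name = name
            self._ast = ast  # e.g. a frontend AST node; not pickle-safe

        def __getstate__(self):
            s = self.__dict__.copy()
            # Drop the AST only if present; pop with a default never raises
            s.pop('_ast', None)
            return s

        def __setstate__(self, s):
            self.__dict__.update(s)
            self._ast = None  # Restore the attribute so later access is safe

    obj = Parsed('my_mod', ast=lambda: None)  # lambdas are not picklable
    clone = pickle.loads(pickle.dumps(obj))
    assert clone.name == 'my_mod' and clone._ast is None
    # Round-tripping the clone again must not fail either
    pickle.loads(pickle.dumps(clone))

An equivalent one-liner for the diff itself would be s.pop('_ast', None), which expresses the same intent as the guarded del.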
27 changes: 27 additions & 0 deletions loki/tests/test_pickle.py
@@ -306,3 +306,30 @@ def test_pickle_scheduler_item(here, frontend, tmp_path):
assert loads(dumps(item_a.source.ir)) == item_a.source.ir
assert loads(dumps(item_a.source)) == item_a.source
assert loads(dumps(item_a)) == item_a


@pytest.mark.parametrize('frontend', available_frontends())
def test_pickle_expressions(here, frontend):

fcode = """
subroutine my_routine(n, a, b, c)
integer, intent(in) :: n
real, intent(out) :: a(n)
real, intent(in) :: b(n), c(n)
real :: x, y, z
integer :: i, j

a(:) = b(:) + c(:) ! Test arrays
x = max(x, a(1)) ! Test scalar and intrinsic

i = real(x, 4) ! Test casts
end subroutine my_routine
"""
routine = Subroutine.from_source(fcode, frontend=frontend)

# Ensure equivalence after a pickle-cycle with scope-level replication
routine_new = loads(dumps(routine))
assert routine_new.spec == routine.spec
assert routine_new.body == routine.body
assert routine_new.contains == routine.contains
assert routine_new.symbol_attrs == routine.symbol_attrs
assert routine_new == routine
16 changes: 12 additions & 4 deletions scripts/loki_transform.py
@@ -116,12 +116,14 @@ def cli(debug):
@click.option('--log-level', '-l', default='info', envvar='LOKI_LOGGING',
type=click.Choice(['debug', 'detail', 'perf', 'info', 'warning', 'error']),
help='Log level to output during batch processing')
@click.option('--num-workers', type=int, default=0, envvar='LOKI_NUM_WORKERS',
help='Number of worker processes to use for parallel processing steps.')
def convert(
mode, config, build, source, header, cpp, directive, include, define, omni_include, xmod,
data_offload, remove_openmp, assume_deviceptr, frontend, trim_vector_sections,
global_var_offload, remove_derived_args, inline_members, inline_marked,
resolve_sequence_association, resolve_sequence_association_inlined_calls,
derive_argument_array_shape, eliminate_dead_code, log_level
derive_argument_array_shape, eliminate_dead_code, log_level, num_workers
):
"""
Batch-processing mode for Fortran-to-Fortran transformations that
@@ -169,7 +171,8 @@ def convert(
paths = [Path(p).resolve() for p in as_tuple(source)]
paths += [Path(h).resolve().parent for h in as_tuple(header)]
scheduler = Scheduler(
paths=paths, config=config, frontend=frontend, definitions=definitions, **build_args
paths=paths, config=config, frontend=frontend,
definitions=definitions, num_workers=num_workers, **build_args
)

# If requested, apply a custom pipeline from the scheduler config
@@ -437,9 +440,11 @@ def convert(
@click.option('--log-level', '-l', default='info', envvar='LOKI_LOGGING',
type=click.Choice(['debug', 'detail', 'perf', 'info', 'warning', 'error']),
help='Log level to output during batch processing')
@click.option('--num-workers', type=int, default=0, envvar='LOKI_NUM_WORKERS',
help='Number of worker processes to use for parallel processing steps.')
def plan(
mode, config, header, source, build, root, cpp, directive,
frontend, callgraph, plan_file, log_level
frontend, callgraph, plan_file, log_level, num_workers
):
"""
Create a "plan", a schedule of files to inject and transform for a
@@ -453,7 +458,10 @@

paths = [Path(s).resolve() for s in source]
paths += [Path(h).resolve().parent for h in header]
scheduler = Scheduler(paths=paths, config=config, frontend=frontend, full_parse=False, preprocess=cpp)
scheduler = Scheduler(
paths=paths, config=config, frontend=frontend,
full_parse=False, preprocess=cpp, num_workers=num_workers
)

mode = mode.replace('-', '_') # Sanitize mode string
