Skip to content

Commit

Permalink
mapreduce| Fixes -jN for map/reduce Checkers (e.g. SimilarChecker)
Browse files Browse the repository at this point in the history
This integrate the map/reduce functionality into lint.check_process().

We previously had `map` being invoked, here we add `reduce` support.

We do this by collecting the map-data by worker and then passing it to a
reducer function on the Checker object, if available - determined by
whether they conform to the `mapreduce_checker.MapReduceMixin` mixin
interface or not.

This allows Checker objects to function across file-streams when using
multiprocessing/-j2+. For example SimilarChecker needs to be able to
compare data across all files.

The tests, which we also add here, check that a Checker instance returns
and reports expected data and errors, such as error-messages and stats -
at least in an exit-ok (0) situation.

On a personal note, as we are copying more data across process
boundaries, I suspect that the memory implications of this might cause
issues for large projects already running with -jN and duplicate code
detection on. That said, given that it takes a long time to perform
lints of large code bases that is an issue for the [near?] future and
likely to be part of the performance work. Either way, let's get it
working first and deal with memory and performance considerations later
- I say this as there are many quick wins we can make here, e.g.
file-batching, hashing lines, data compression and so on.
  • Loading branch information
doublethefish committed Apr 23, 2020
1 parent d1bbd4c commit bcb88df
Show file tree
Hide file tree
Showing 5 changed files with 79 additions and 11 deletions.
2 changes: 2 additions & 0 deletions pylint/checkers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
# Copyright (c) 2015 Ionel Cristian Maries <[email protected]>
# Copyright (c) 2016 Moises Lopez <[email protected]>
# Copyright (c) 2017-2018 Bryce Guinta <[email protected]>
# Copyright (c) 2020 Frank Harrison <[email protected]>

# Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
# For details: https://github.com/PyCQA/pylint/blob/master/COPYING
Expand Down Expand Up @@ -40,6 +41,7 @@
"""

from pylint.checkers.base_checker import BaseChecker, BaseTokenChecker
from pylint.checkers.mapreduce_checker import MapReduceMixin
from pylint.utils import register_plugins


Expand Down
18 changes: 18 additions & 0 deletions pylint/checkers/mapreduce_checker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
# Copyright (c) 2020 Frank Harrison <[email protected]>

# Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
# For details: https://github.com/PyCQA/pylint/blob/master/COPYING
import abc


class MapReduceMixin(metaclass=abc.ABCMeta):
    """A mixin designed to allow multiprocess/threaded runs of a Checker.

    Checkers inheriting this interface can participate in parallel (-jN)
    runs: each worker process produces per-file-stream data via
    get_map_data(), and the main process hands the combined results to
    reduce_map_data() so cross-file analysis (e.g. duplicate-code
    detection) still works under multiprocessing.
    """

    @abc.abstractmethod
    def get_map_data(self):
        """ Returns mergeable/reducible data that will be examined """
        # NOTE(review): the returned value must be picklable, as it is
        # sent back across the process boundary — confirm against callers.

    @classmethod
    @abc.abstractmethod
    def reduce_map_data(cls, linter, data):
        """ For a given Checker, receives data for all mapped runs """
4 changes: 2 additions & 2 deletions pylint/checkers/similar.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@

import astroid

from pylint.checkers import BaseChecker, table_lines_from_stats
from pylint.checkers import BaseChecker, MapReduceMixin, table_lines_from_stats
from pylint.interfaces import IRawChecker
from pylint.reporters.ureports.nodes import Table
from pylint.utils import decoding_stream
Expand Down Expand Up @@ -291,7 +291,7 @@ def report_similarities(sect, stats, old_stats):


# wrapper to get a pylint checker from the similar class
class SimilarChecker(BaseChecker, Similar):
class SimilarChecker(BaseChecker, Similar, MapReduceMixin):
"""checks for similarities and duplicated code. This computation may be
memory / CPU intensive, so you should disable it if you experiment some
problems.
Expand Down
61 changes: 54 additions & 7 deletions pylint/lint.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
# Copyright (c) 2018 Yuval Langer <[email protected]>
# Copyright (c) 2018 Nick Drozd <[email protected]>
# Copyright (c) 2018 kapsh <[email protected]>
# Copyright (c) 2020 Frank Harrison <[email protected]>

# Licensed under the GPL: https://www.gnu.org/licenses/old-licenses/gpl-2.0.html
# For details: https://github.com/PyCQA/pylint/blob/master/COPYING
Expand Down Expand Up @@ -1280,14 +1281,18 @@ def _report_evaluation(self):


def check_parallel(linter, jobs, files, arguments=None):
"""Use the given linter to lint the files with given amount of workers (jobs)"""
# The reporter does not need to be passed to worker processess, i.e. the reporter does
"""Use the given linter to lint the files with given amount of workers (jobs)
This splits the work filestream-by-filestream. If you need to do work across
multiple files, as in the similarity-checker, then inherit from MapReduceMixin and
implement the map/reduce mixin functionality """
# The reporter does not need to be passed to worker processes, i.e. the reporter does
# not need to be pickleable
original_reporter = linter.reporter
linter.reporter = None

# The linter is inherited by all the pool's workers, i.e. the linter
# is identical to the linter object here. This is requred so that
# is identical to the linter object here. This is required so that
# a custom PyLinter object can be used.
initializer = functools.partial(_worker_initialize, arguments=arguments)
with multiprocessing.Pool(jobs, initializer=initializer, initargs=[linter]) as pool:
Expand All @@ -1298,18 +1303,29 @@ def check_parallel(linter, jobs, files, arguments=None):
linter.open()

all_stats = []

for module, messages, stats, msg_status in pool.imap_unordered(
_worker_check_single_file, files
):
all_mapreduce_data = collections.defaultdict(list)

# Maps each file to be worked on by a single _worker_check_single_file() call,
# collecting any map/reduce data by checker module so that we can 'reduce' it
# later.
for (
worker_idx, # used to merge map/reduce data across workers
module,
messages,
stats,
msg_status,
mapreduce_data,
) in pool.imap_unordered(_worker_check_single_file, files):
linter.set_current_module(module)
for msg in messages:
msg = Message(*msg)
linter.reporter.handle_message(msg)

all_stats.append(stats)
all_mapreduce_data[worker_idx].append(mapreduce_data)
linter.msg_status |= msg_status

_merge_mapreduce_data(linter, all_mapreduce_data)
linter.stats = _merge_stats(all_stats)

# Insert stats data to local checkers.
Expand Down Expand Up @@ -1342,15 +1358,46 @@ def _worker_check_single_file(file_item):
_worker_linter.open()
_worker_linter.check_single_file(name, filepath, modname)

mapreduce_data = collections.defaultdict(list)
for checker in _worker_linter.get_checkers():
try:
data = checker.get_map_data()
except AttributeError:
continue
mapreduce_data[checker.name].append(data)

msgs = [_get_new_args(m) for m in _worker_linter.reporter.messages]
return (
id(multiprocessing.current_process()),
_worker_linter.current_name,
msgs,
_worker_linter.stats,
_worker_linter.msg_status,
mapreduce_data,
)


def _merge_mapreduce_data(linter, all_mapreduce_data):
""" Merges map/reduce data across workers, invoking relevant APIs on checkers """
# First collate the data, preparing it so we can send it to the checkers for
# validation. The intent here is to collect all the mapreduce data for all checker-
# runs across processes - that will then be passed to a static method on the
# checkers to be reduced and further processed.
collated_map_reduce_data = collections.defaultdict(list)
for linter_data in all_mapreduce_data.values():
for run_data in linter_data:
for checker_name, data in run_data.items():
collated_map_reduce_data[checker_name].extend(data)

# Send the data to checkers that support/require consolidated data
original_checkers = linter.get_checkers()
for checker in original_checkers:
if checker.name in collated_map_reduce_data:
# Assume that if the check has returned map/reduce data that it has the
# reducer function
checker.reduce_map_data(linter, collated_map_reduce_data[checker.name])


# some reporting functions ####################################################


Expand Down
5 changes: 3 additions & 2 deletions tests/test_self.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

import pytest

from pylint.constants import MAIN_CHECKER_NAME
from pylint.constants import MAIN_CHECKER_NAME, MSG_TYPES_STATUS
from pylint.lint import Run
from pylint.reporters import JSONReporter
from pylint.reporters.text import BaseReporter, ColorizedTextReporter, TextReporter
Expand Down Expand Up @@ -244,7 +244,8 @@ def test_parallel_execution(self):
join(HERE, "functional", "a", "arguments.py"),
join(HERE, "functional", "a", "arguments.py"),
],
code=2,
# We expect similarities to fail and an error
code=MSG_TYPES_STATUS["R"] | MSG_TYPES_STATUS["E"],
)

def test_parallel_execution_missing_arguments(self):
Expand Down

0 comments on commit bcb88df

Please sign in to comment.