Skip to content

Commit

Permalink
Merge pull request #22 from icaropires/small-fixes
Browse files Browse the repository at this point in the history
Fixes #20 and #19, and adds a "don't use a pool when num_cpus=1" optimization for the small case
  • Loading branch information
icaropires authored Sep 8, 2020
2 parents c3f3472 + 45f9012 commit bfde2d1
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 6 deletions.
9 changes: 6 additions & 3 deletions pdf2dataset/extract_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,19 @@
import pyarrow as pa


def feature(pyarrow_type, is_helper=False, exceptions=None):
def feature(pyarrow_type=None, is_helper=False, exceptions=None, **type_args):
exceptions = exceptions or tuple()
exceptions = tuple(exceptions)

if not (pyarrow_type or is_helper):
raise ValueError("If feature is not helper, must set 'pyarrow_type'")

def decorator(feature_method):
feature_method.pyarrow_type = None
feature_method.is_feature = True
feature_method.is_helper = is_helper
feature_method.pyarrow_type = None

type_ = getattr(pa, pyarrow_type)()
type_ = getattr(pa, pyarrow_type)(**type_args)

if isinstance(type_, pa.DataType):
feature_method.pyarrow_type = type_
Expand Down
39 changes: 37 additions & 2 deletions pdf2dataset/extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,9 @@ def gen_tasks(self):
Returns tasks to be processed.
For faulty files, only the page -1 will be available
'''
if self.num_cpus == 1:
return self._gen_tasks_sequential()

# 10 because this is a fast operation
chunksize = int(max(1, (len(self.files)/self.num_cpus)//10))

Expand All @@ -156,6 +159,22 @@ def gen_tasks(self):

return tasks

def _gen_tasks_sequential(self):
    '''
    Sequential counterpart of gen_tasks, used when num_cpus == 1:
    counts the pages of every file in-process (no pool) and builds
    one task per page, updating a progress bar as files complete.
    '''
    all_tasks = []

    with tqdm(desc='Counting pages', unit='pages') as progress:
        for file_path in self.files:
            # get_pages_range yields the page numbers for this file;
            # build one task object per page.
            pages = self.get_pages_range(file_path)
            all_tasks.extend(
                self.task_class(file_path, page, **self.task_params)
                for page in pages
            )
            progress.update(len(pages))

    return all_tasks

@staticmethod
def get_pages_range(file_path, file_bin=None):
# Using pdftotext to get num_pages because it's the best way I know
Expand All @@ -179,7 +198,7 @@ def _get_processing_bar(self, num_tasks, iterable=None):
num_skipped = self.num_skipped or 0

return tqdm(
iterable, total=num_tasks, initial=num_skipped,
iterable, total=num_tasks+num_skipped, initial=num_skipped,
desc='Processing pages', unit='pages', dynamic_ncols=True
)

Expand Down Expand Up @@ -279,17 +298,33 @@ def _apply_to_small(self, tasks):
- Returns the resultant dataframe
'''

if self.num_cpus == 1:
return self._apply_sequential(tasks)

num_tasks = len(tasks)
tasks = (self.copy_and_load_task(t) for t in tasks)

with Pool(self.num_cpus) as pool:
processing_tasks = pool.imap_unordered(self._process_task, tasks)
processing_tasks = pool.imap_unordered(
self._process_task, tasks, self.chunksize
)
results = self._get_processing_bar(num_tasks, processing_tasks)

self.results.append(results)

return self.results.get()

def _apply_sequential(self, tasks):
    '''
    Sequential counterpart of _apply_to_small, used when num_cpus == 1:
    loads and processes every task in the current process (no pool),
    appends the results and returns the resultant dataframe.
    '''
    total = len(tasks)

    # Lazily load each task, then process it; nothing is materialized
    # until the progress bar iterates the pipeline.
    loaded = (self.copy_and_load_task(task) for task in tasks)
    processed = map(self._process_task, loaded)

    self.results.append(self._get_processing_bar(total, processed))
    return self.results.get()

def filter_processed_tasks(self, tasks):
is_processed = self.results.is_tasks_processed(tasks)
tasks = [t for t, is_ in zip(tasks, is_processed) if not is_]
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[tool.poetry]
name = "pdf2dataset"
version = "0.5.1"
version = "0.5.2"
readme = "README.md"
description = "Easily convert a subdirectory with big volume of PDF documents into a dataset, supports extracting text and images"
authors = ["Ícaro Pires <[email protected]>"]
Expand Down

0 comments on commit bfde2d1

Please sign in to comment.