Make ray process in batches and bump version
icaropires committed Jul 27, 2020
1 parent 8ebc9c8 commit 7f3e202
Showing 3 changed files with 41 additions and 33 deletions.
README.md — 1 change: 1 addition & 0 deletions

@@ -95,6 +95,7 @@ be understood as that the extraction won't run for hours or days and locally).
 
 The complete list of differences is:
 
+* Faster initialization (use multiprocessing instead of ray)
 * Don't save processing progress
 * Distributed processing not supported
 * Don't write dataframe to disk
pdf2dataset/extract.py — 68 changes: 37 additions & 31 deletions
@@ -6,6 +6,7 @@
 import threading
 from concurrent.futures import ThreadPoolExecutor
 from multiprocessing import Pool
+from more_itertools import ichunked
 
 import pandas as pd
 import fastparquet
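
For context on the new import, a minimal sketch of ichunked from more-itertools (the behavior this commit relies on): it lazily splits an iterable into sub-iterables of a fixed size, so a task stream can be grouped into chunks without materializing every task at once.

from more_itertools import ichunked

chunks = ichunked(range(7), 3)  # lazy sub-iterables ("chunks") of size 3
print([list(chunk) for chunk in chunks])  # [[0, 1, 2], [3, 4, 5], [6]]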
@@ -30,7 +31,7 @@ class TextExtraction:
     def __init__(
         self, input_dir, results_file='', *,
         tmp_dir='', lang='por', ocr=False, small=False,
-        chunk_df_size=10000, simultaneous_documents=100, **kwargs
+        chunk_df_size=10000, **kwargs
     ):
 
         self.input_dir = Path(input_dir).resolve()
@@ -61,7 +62,6 @@ def __init__(
 
         self._df_lock = threading.Lock()
         self.chunk_df_size = chunk_df_size
-        self.simultaneous_documents = simultaneous_documents
 
     @staticmethod
     def get_docs(input_dir):
@@ -179,19 +179,20 @@ def _gen_tasks(self, docs):
         chunksize = int(max(1, (len(docs)/self.num_cpus)//10))
 
         tasks = []
-        with tqdm(desc='Counting pages', unit='pages') as pbar:
-            with Pool(self.num_cpus) as pool:
-                results = pool.imap(
-                    self._get_pages_range, docs, chunksize=chunksize
-                )
+        with Pool(self.num_cpus) as pool, \
+                tqdm(desc='Counting pages', unit='pages') as pbar:
+
+            results = pool.imap(
+                self._get_pages_range, docs, chunksize=chunksize
+            )
 
-                for doc, range_pages in zip(docs, results):
-                    new_tasks = [
-                        ExtractionTask(doc, p, lang=self.lang, ocr=self.ocr)
-                        for p in range_pages
-                    ]
-                    tasks += new_tasks
-                    pbar.update(len(new_tasks))
+            for doc, range_pages in zip(docs, results):
+                new_tasks = [
+                    ExtractionTask(doc, p, lang=self.lang, ocr=self.ocr)
+                    for p in range_pages
+                ]
+                tasks += new_tasks
+                pbar.update(len(new_tasks))
 
         return tasks
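
A side note on why the zip(docs, results) above is safe: Pool.imap yields results lazily but in input order, and its chunksize only controls how many items are shipped to a worker at a time. A small standalone sketch under that assumption:

from multiprocessing import Pool

def square(x):
    return x * x

if __name__ == '__main__':
    with Pool(2) as pool:
        # imap preserves input order, even if a later item finishes first
        results = pool.imap(square, range(5), chunksize=2)
        print(list(zip(range(5), results)))  # [(0, 0), (1, 1), (2, 4), (3, 9), (4, 16)]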

Expand Down Expand Up @@ -240,29 +241,33 @@ def process_task(task):
return task, result, error

@ray.remote
def process_task_ray(task):
return TextExtraction.process_task(task)
def process_chunk_ray(chunk):
return [TextExtraction.process_task(t) for t in chunk]

def _ray_process(self, tasks):
def _ray_process(self, tasks, chunksize):
tasks = iter(tasks)
futures = []

for task in tasks:
task = self._load_task_bin(task)
futures.append(self.process_task_ray.remote(task))
chunks = ichunked(tasks, int(chunksize))

for chunk in chunks:
chunk = [self._load_task_bin(t) for t in chunk]
futures.append(self.process_chunk_ray.remote(chunk))

if len(futures) >= self.simultaneous_documents:
if len(futures) >= self.num_cpus + 4:
break

while futures:
finished, rest = ray.wait(futures, num_returns=1)
result = ray.get(finished[0])
results = ray.get(finished[0])

yield result
for result in results:
yield result

try:
task = self._load_task_bin(next(tasks))
rest.append(self.process_task_ray.remote(task))
chunk = next(chunks)
chunk = [self._load_task_bin(t) for t in chunk]
rest.append(self.process_chunk_ray.remote(chunk))
except StopIteration:
...
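
The generator above is a bounded in-flight pattern: prime roughly num_cpus + 4 chunk futures, then, each time one finishes, yield its results and submit the next chunk. A self-contained sketch of the same idea, assuming a local ray runtime (process_chunk and the sizes here are illustrative, not the project's API):

import ray
from more_itertools import ichunked

@ray.remote
def process_chunk(chunk):
    return [x * x for x in chunk]  # stand-in for per-page extraction

def process_in_batches(items, chunksize, max_in_flight):
    chunks = ichunked(items, chunksize)
    futures = []

    # Prime a bounded number of in-flight chunk futures
    for chunk in chunks:
        futures.append(process_chunk.remote(list(chunk)))
        if len(futures) >= max_in_flight:
            break

    while futures:
        # Take whichever chunk finishes first, then top the pool back up
        finished, futures = ray.wait(futures, num_returns=1)
        yield from ray.get(finished[0])

        try:
            futures.append(process_chunk.remote(list(next(chunks))))
        except StopIteration:
            pass  # no chunks left; drain the remaining futures

ray.init()
print(list(process_in_batches(range(10), chunksize=3, max_in_flight=2)))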

@@ -281,11 +286,11 @@ def _apply_big(self, tasks, num_tasks, chunksize):
 
         ray.init(**self.ray_params)
 
-        with ThreadPoolExecutor() as thread_exe:
+        with ThreadPoolExecutor(max_workers=4) as thread_exec:
             thread_fs, texts, errors = [], [], []
             processed, not_processed = tasks
 
-            not_processed = self._ray_process(not_processed)
+            not_processed = self._ray_process(not_processed, chunksize)
             results = it.chain(processed, not_processed)
 
             for result in self._get_apply_bar(results, num_tasks):
@@ -301,23 +306,23 @@
 
                 if self.tmp_dir:
                     thread_fs.append(
-                        thread_exe.submit(path.write_text, text)
+                        thread_exec.submit(path.write_text, text)
                     )
 
                 if len(texts) + len(errors) >= self.chunk_df_size:
                     # Persist to disk, aiming large amount of data
                     df = self._to_df(texts, errors)
 
                     thread_fs.append(
-                        thread_exe.submit(self._append_df, df)
+                        thread_exec.submit(self._append_df, df)
                     )
                     texts, errors = [], []
 
             if texts or errors:
                 df = self._to_df(texts, errors)
 
                 thread_fs.append(
-                    thread_exe.submit(self._append_df, df)
+                    thread_exec.submit(self._append_df, df)
                 )
 
             for f in thread_fs:  # Avoid fail silently
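
On the "# Avoid fail silently" loop: exceptions raised inside submit()-ed callables only surface when result() is called on the returned future, so collecting the futures in thread_fs and draining them is what turns a failed background write into a visible error. A minimal sketch with illustrative names:

from concurrent.futures import ThreadPoolExecutor

def risky_write(n):
    if n == 2:
        raise IOError('disk full')  # simulated failing write
    return n

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(risky_write, n) for n in range(4)]

    for future in futures:
        future.result()  # re-raises the IOError instead of swallowing it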
@@ -368,7 +373,8 @@ def apply(self):
             f" processed pages in directory '{self.tmp_dir}'"
         )
 
-        chunksize = int(max(1, (len(tasks)/self.num_cpus)//100))
+        chunk_by_cpu = (len(not_processed)/self.num_cpus) / 100
+        chunksize = int(max(1, chunk_by_cpu))
 
         if self.small:
             return self._apply_small(tasks, num_tasks, chunksize)
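
To make the new sizing concrete with hypothetical numbers: the formula targets roughly 100 chunks per CPU, floored at one task per chunk.

num_cpus = 8
not_processed = 40_000  # hypothetical number of pages left to extract

chunk_by_cpu = (not_processed / num_cpus) / 100  # 50.0
chunksize = int(max(1, chunk_by_cpu))  # 50 pages per ray chunk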
pyproject.toml — 5 changes: 3 additions & 2 deletions
@@ -1,6 +1,6 @@
 [tool.poetry]
 name = "pdf2dataset"
-version = "0.2.0"
+version = "0.2.1"
 readme = "README.md"
 description = "Easily convert a big folder with PDFs into a dataset, with extracted text using OCR"
 authors = ["Ícaro Pires <[email protected]>"]
@@ -12,13 +12,14 @@ classifiers = [
 repository = "https://github.com/icaropires/pdf2dataset"
 
 [tool.poetry.dependencies]
-python = "^3.6"
 fastparquet = "0.4.0"
+more-itertools = "8.4.0"
 opencv-python = "4.2.0.34"
 packaging = "20.4"
 pdf2image = "1.13.1"
 pdftotext = "2.1.4"
 pytesseract = "0.3.4"
+python = "^3.6"
 ray = "0.8.6"
 tqdm = "4.47.0"
