Use ThreadPool for cache fetching and rust tar for artifact extraction (#6748)

### Problem

The call at https://github.com/pantsbuild/pants/blob/11aa11f2a3abc865e32a9f513abed4a3877b9272/src/python/pants/cache/restful_artifact_cache.py#L75 would hang, so the multiprocessing pool never exited. The closest issue I could find is https://stackoverflow.com/questions/25943923/requests-get-hangs-when-called-in-a-multiprocessing-pool, which has no solution.

I was able to reproduce the issue almost consistently in PyCharm's debug mode with:
```
clean-all compile --cache-read-from="https://dummy_url/" examples/tests/java/org/pantsbuild/example/hello/greet/:
...
[zinc]
  [cache] (stuck here)
```
The issue seems to be resolved by using `ThreadPool` instead.
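
For illustration, here is a minimal, self-contained sketch of the pattern (not the actual Pants code; the URL is a placeholder and stdlib `urllib` stands in for `requests`). `ThreadPool` exposes the same API as `multiprocessing.Pool` but runs tasks on threads in the parent process rather than in forked children, which appears to avoid whatever fork-related state caused the hang:

```python
from multiprocessing.pool import ThreadPool  # drop-in replacement for multiprocessing.Pool
from urllib.request import urlopen


def fetch(url):
  # Runs on a thread in the parent process, not in a forked child.
  with urlopen(url, timeout=10) as resp:
    return resp.status


if __name__ == '__main__':
  urls = ['https://example.com/'] * 4  # placeholder URLs
  pool = ThreadPool(processes=4)
  try:
    print(pool.map(fetch, urls))  # same map() API as multiprocessing.Pool
  finally:
    pool.close()
    pool.join()
```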

### Solution

We switch the cache-fetching pools to `ThreadPool`. Additionally, since most of the untar work was done in Python and was CPU-bound, we chose to move it to Rust for performance.
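
The net effect on the Python side, condensed from the two Python diffs below, is that extraction becomes a single FFI call into the engine's native library:

```python
# src/python/pants/engine/native.py -- binding on the Native object:
def decompress_tarball(self, tarfile_path, dest_dir):
  result = self.lib.decompress_tarball(tarfile_path, dest_dir)
  return self.context.raise_or_return(result)

# src/python/pants/cache/artifact.py -- TarballArtifact.extract delegates to it,
# encoding to bytes because the cffi declaration is
# PyResult decompress_tarball(char*, char*):
self.NATIVE_BINARY.decompress_tarball(self._tarfile.encode('utf-8'),
                                      self._artifact_root.encode('utf-8'))
```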
wisechengyi authored Nov 19, 2018
1 parent fc53162 commit c83f6f1
Showing 14 changed files with 284 additions and 46 deletions.
5 changes: 4 additions & 1 deletion src/python/pants/backend/jvm/tasks/jvmdoc_gen.py
@@ -9,6 +9,7 @@
import multiprocessing
import os
import re
from multiprocessing.pool import ThreadPool

from pants.backend.jvm.tasks.jvm_task import JvmTask
from pants.base.exceptions import TaskError
@@ -157,8 +158,10 @@ def _generate_individual(self, targets, create_jvmdoc_command):
jobs[gendir] = (target, command)

if jobs:
# Use ThreadPool, as there may be dangling processes that cause an identical run id
# and then a buildstats error downstream. https://github.com/pantsbuild/pants/issues/6785
with contextlib.closing(
multiprocessing.Pool(processes=min(len(jobs), multiprocessing.cpu_count()))) as pool:
ThreadPool(processes=min(len(jobs), multiprocessing.cpu_count()))) as pool:
# map would be a preferable api here but fails after the 1st batch with an internal:
# ...
# File "...src/python/pants/backend/jvm/tasks/jar_create.py", line 170, in javadocjar
10 changes: 1 addition & 9 deletions src/python/pants/base/worker_pool.py
@@ -5,8 +5,6 @@
from __future__ import absolute_import, division, print_function, unicode_literals

import multiprocessing
import signal
import sys
import threading
from builtins import next, object
from multiprocessing.pool import ThreadPool
@@ -192,11 +190,6 @@ class SubprocPool(object):
_lock = threading.Lock()
_num_processes = multiprocessing.cpu_count()

@staticmethod
def worker_init():
# Exit quietly on sigint, otherwise we get {num_procs} keyboardinterrupt stacktraces spewn
signal.signal(signal.SIGINT, lambda *args: sys.exit())

@classmethod
def set_num_processes(cls, num_processes):
cls._num_processes = num_processes
@@ -205,8 +198,7 @@ def set_num_processes(cls, num_processes):
def foreground(cls):
with cls._lock:
if cls._pool is None:
cls._pool = multiprocessing.Pool(processes=cls._num_processes,
initializer=SubprocPool.worker_init)
cls._pool = ThreadPool(processes=cls._num_processes)
return cls._pool

@classmethod
1 change: 1 addition & 0 deletions src/python/pants/cache/BUILD
@@ -9,6 +9,7 @@ python_library(
'3rdparty/python:six',
'src/python/pants/base:deprecated',
'src/python/pants/base:validation',
'src/python/pants/engine:native',
'src/python/pants/option',
'src/python/pants/subsystem',
'src/python/pants/util:contextutil',
37 changes: 8 additions & 29 deletions src/python/pants/cache/artifact.py
@@ -4,11 +4,8 @@

from __future__ import absolute_import, division, print_function, unicode_literals

import errno
import os
import shutil
import tarfile
from builtins import object, str

from pants.util.contextutil import open_tar
from pants.util.dirutil import safe_mkdir, safe_mkdir_for, safe_walk
@@ -83,6 +80,8 @@ def extract(self):
class TarballArtifact(Artifact):
"""An artifact stored in a tarball."""

NATIVE_BINARY = None

# TODO: Expose `dereference` for tasks.
# https://github.com/pantsbuild/pants/issues/3961
def __init__(self, artifact_root, tarfile_, compression=9, dereference=True):
@@ -109,30 +108,10 @@ def collect(self, paths):
self._relpaths.add(relpath)

def extract(self):
# Note(yic): unlike the Python implementation before, we no longer update self._relpaths
# after the extraction.
try:
with open_tar(self._tarfile, 'r', errorlevel=2) as tarin:
# Note: We create all needed paths proactively, even though extractall() can do this for us.
# This is because we may be called concurrently on multiple artifacts that share directories,
# and there will be a race condition inside extractall(): task T1 A) sees that a directory
# doesn't exist and B) tries to create it. But in the gap between A) and B) task T2 creates
# the same directory, so T1 throws "File exists" in B).
# This actually happened, and was very hard to debug.
# Creating the paths here up front allows us to squelch that "File exists" error.
paths = []
dirs = set()
for tarinfo in tarin.getmembers():
paths.append(tarinfo.name)
if tarinfo.isdir():
dirs.add(tarinfo.name)
else:
dirs.add(os.path.dirname(tarinfo.name))
for d in dirs:
try:
os.makedirs(os.path.join(self._artifact_root, d))
except OSError as e:
if e.errno != errno.EEXIST:
raise
tarin.extractall(self._artifact_root)
self._relpaths.update(paths)
except tarfile.ReadError as e:
raise ArtifactError(str(e))
self.NATIVE_BINARY.decompress_tarball(self._tarfile.encode('utf-8'),
self._artifact_root.encode('utf-8'))
except Exception as e:
raise ArtifactError("Extracting artifact failed:\n{}".format(e))
3 changes: 3 additions & 0 deletions src/python/pants/cache/cache_setup.py
@@ -12,6 +12,7 @@
from future.moves.urllib.parse import urlparse

from pants.base.build_environment import get_buildroot
from pants.cache.artifact import TarballArtifact
from pants.cache.artifact_cache import ArtifactCacheError
from pants.cache.local_artifact_cache import LocalArtifactCache, TempLocalArtifactCache
from pants.cache.pinger import BestUrlSelector, Pinger
@@ -134,6 +135,8 @@ def __init__(self, options, log, task, pinger=None, resolver=None):
else:
self._resolver = NoopResolver()

TarballArtifact.NATIVE_BINARY = task.context._scheduler._scheduler._native

@staticmethod
def make_task_cache_dirname(task):
"""Use the task fingerprint as the name of the cache subdirectory to store
6 changes: 6 additions & 0 deletions src/python/pants/engine/native.py
@@ -252,6 +252,8 @@
void lease_files_in_graph(Scheduler*);
void garbage_collect_store(Scheduler*);
PyResult decompress_tarball(char*, char*);
'''

CFFI_EXTERNS = '''
@@ -787,6 +789,10 @@ def buffer(self, cdata):
def to_ids_buf(self, types):
return self.context.type_ids_buf([TypeId(self.context.to_id(t)) for t in types])

def decompress_tarball(self, tarfile_path, dest_dir):
result = self.lib.decompress_tarball(tarfile_path, dest_dir)
return self.context.raise_or_return(result)

def new_tasks(self):
return self.gc(self.lib.tasks_create(), self.lib.tasks_destroy)

85 changes: 85 additions & 0 deletions src/rust/engine/Cargo.lock

(Generated file; diff not rendered.)

5 changes: 4 additions & 1 deletion src/rust/engine/Cargo.toml
@@ -37,6 +37,7 @@ members = [
"process_execution/bazel_protos",
"process_executor",
"resettable",
"tar_api",
"testutil",
"testutil/mock",
"testutil/local_cas",
@@ -63,6 +64,7 @@ default-members = [
"process_execution/bazel_protos",
"process_executor",
"resettable",
"tar_api",
"testutil",
"testutil/mock",
"testutil/local_cas",
@@ -77,6 +79,7 @@ fs = { path = "fs" }
futures = "^0.1.16"
graph = { path = "graph" }
hashing = { path = "hashing" }
indexmap = "1.0.2"
itertools = "0.7.2"
lazy_static = "1"
log = "0.4"
@@ -90,4 +93,4 @@ tokio = "0.1"
tempfile = "3"
ui = { path = "ui" }
url = "1.7.1"
indexmap = "1.0.2"
tar_api = { path = "tar_api" }
28 changes: 28 additions & 0 deletions src/rust/engine/src/lib.rs
@@ -64,6 +64,7 @@ extern crate process_execution;
extern crate reqwest;
extern crate resettable;
extern crate smallvec;
extern crate tar_api;
extern crate tempfile;
extern crate tokio;
extern crate ui;
@@ -523,6 +524,33 @@ pub extern "C" fn graph_len(scheduler_ptr: *mut Scheduler) -> u64 {
with_scheduler(scheduler_ptr, |scheduler| scheduler.core.graph.len() as u64)
}

#[no_mangle]
pub extern "C" fn decompress_tarball(
tar_path: *const raw::c_char,
output_dir: *const raw::c_char,
) -> PyResult {
let tar_path_str = PathBuf::from(
unsafe { CStr::from_ptr(tar_path) }
.to_string_lossy()
.into_owned(),
);
let output_dir_str = PathBuf::from(
unsafe { CStr::from_ptr(output_dir) }
.to_string_lossy()
.into_owned(),
);

tar_api::decompress_tgz(tar_path_str.as_path(), output_dir_str.as_path())
.map_err(|e| {
format!(
"Failed to untar {:?} to {:?}:\n{:?}",
tar_path_str.as_path(),
output_dir_str.as_path(),
e
)
}).into()
}

#[no_mangle]
pub extern "C" fn graph_visualize(
scheduler_ptr: *mut Scheduler,
15 changes: 15 additions & 0 deletions src/rust/engine/tar_api/Cargo.toml
@@ -0,0 +1,15 @@
[package]
name = "tar_api"
version = "0.0.1"
authors = [ "Pants Build <[email protected]>" ]

[lib]
path = "src/tar_api.rs"

[dependencies]
flate2 = "1.0"
tar = "0.4.20"

[dev-dependencies]
tempfile = "3"
testutil = { path = "../testutil" }
(Diffs for the remaining files, including the new src/rust/engine/tar_api/src/tar_api.rs, did not load.)
