From c3a5c73867a92cd2f980bdb1d1e2a969764a9eca Mon Sep 17 00:00:00 2001 From: Neil Botelho Date: Sun, 29 Oct 2023 15:56:43 +0530 Subject: [PATCH 01/11] add --parallel-downloads option to install and download commands --- src/pip/_internal/cli/cmdoptions.py | 13 +++++++++++++ src/pip/_internal/commands/download.py | 1 + src/pip/_internal/commands/install.py | 1 + 3 files changed, 15 insertions(+) diff --git a/src/pip/_internal/cli/cmdoptions.py b/src/pip/_internal/cli/cmdoptions.py index 8fb16dc4a6a..04b86f0b4a7 100644 --- a/src/pip/_internal/cli/cmdoptions.py +++ b/src/pip/_internal/cli/cmdoptions.py @@ -764,6 +764,19 @@ def _handle_no_cache_dir( help="Check the build dependencies when PEP517 is used.", ) +parallel_downloads: Callable[..., Option] = partial( + Option, + "--parallel-downloads", + dest="parallel_downloads", + type="int", + metavar="n", + default=None, + help=( + "Use upto threads to download packages in parallel." + " must be greater than 0" + ), +) + def _handle_no_use_pep517( option: Option, opt: str, value: str, parser: OptionParser diff --git a/src/pip/_internal/commands/download.py b/src/pip/_internal/commands/download.py index 54247a78a65..fa96e30074a 100644 --- a/src/pip/_internal/commands/download.py +++ b/src/pip/_internal/commands/download.py @@ -52,6 +52,7 @@ def add_options(self) -> None: self.cmd_opts.add_option(cmdoptions.no_use_pep517()) self.cmd_opts.add_option(cmdoptions.check_build_deps()) self.cmd_opts.add_option(cmdoptions.ignore_requires_python()) + self.cmd_opts.add_option(cmdoptions.parallel_downloads()) self.cmd_opts.add_option( "-d", diff --git a/src/pip/_internal/commands/install.py b/src/pip/_internal/commands/install.py index 365764fc7cb..a9fbdd9581f 100644 --- a/src/pip/_internal/commands/install.py +++ b/src/pip/_internal/commands/install.py @@ -74,6 +74,7 @@ def add_options(self) -> None: self.cmd_opts.add_option(cmdoptions.constraints()) self.cmd_opts.add_option(cmdoptions.no_deps()) 
self.cmd_opts.add_option(cmdoptions.pre()) + self.cmd_opts.add_option(cmdoptions.parallel_downloads()) self.cmd_opts.add_option(cmdoptions.editable()) self.cmd_opts.add_option( From ed13cea1764bbb21c2cb5f242f764e42616e10f4 Mon Sep 17 00:00:00 2001 From: Neil Botelho Date: Sun, 29 Oct 2023 16:02:58 +0530 Subject: [PATCH 02/11] Add validation to parallel_downloads option --- src/pip/_internal/commands/download.py | 6 ++++++ src/pip/_internal/commands/install.py | 4 ++++ 2 files changed, 10 insertions(+) diff --git a/src/pip/_internal/commands/download.py b/src/pip/_internal/commands/download.py index fa96e30074a..1a22f6073e3 100644 --- a/src/pip/_internal/commands/download.py +++ b/src/pip/_internal/commands/download.py @@ -7,6 +7,7 @@ from pip._internal.cli.cmdoptions import make_target_python from pip._internal.cli.req_command import RequirementCommand, with_cleanup from pip._internal.cli.status_codes import SUCCESS +from pip._internal.exceptions import CommandError from pip._internal.operations.build.build_tracker import get_build_tracker from pip._internal.req.req_install import check_legacy_setup_py_options from pip._internal.utils.misc import ensure_dir, normalize_path, write_output @@ -77,6 +78,11 @@ def add_options(self) -> None: @with_cleanup def run(self, options: Values, args: List[str]) -> int: + if (options.parallel_downloads is not None) and ( + options.parallel_downloads < 1 + ): + raise CommandError("Value of '--parallel-downloads' must be greater than 0") + options.ignore_installed = True # editable doesn't really make sense for `pip download`, but the bowels # of the RequirementSet code require that property. 
diff --git a/src/pip/_internal/commands/install.py b/src/pip/_internal/commands/install.py index a9fbdd9581f..c02d361523d 100644 --- a/src/pip/_internal/commands/install.py +++ b/src/pip/_internal/commands/install.py @@ -268,6 +268,10 @@ def run(self, options: Values, args: List[str]) -> int: if options.use_user_site and options.target_dir is not None: raise CommandError("Can not combine '--user' and '--target'") + if (options.parallel_downloads is not None) and ( + options.parallel_downloads < 1 + ): + raise CommandError("Value of '--parallel-downloads' must be greater than 0") # Check whether the environment we're installing into is externally # managed, as specified in PEP 668. Specifying --root, --target, or # --prefix disables the check, since there's no reliable way to locate From 2334399566a926714d10f995038b55e1972a7ba3 Mon Sep 17 00:00:00 2001 From: Neil Botelho Date: Sun, 29 Oct 2023 17:29:34 +0530 Subject: [PATCH 03/11] Add parallel_downloads param to PipSession This parameter is used to set the number of parallel downloads in BatchDownloader as well as to set the pool_connections in the HTTPAdapter to prevent 'Connection pool full' warnings --- src/pip/_internal/cli/req_command.py | 5 +++++ src/pip/_internal/network/session.py | 22 ++++++++++++++++++++-- 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/src/pip/_internal/cli/req_command.py b/src/pip/_internal/cli/req_command.py index 6f2f79c6b3f..fd37ba5d337 100644 --- a/src/pip/_internal/cli/req_command.py +++ b/src/pip/_internal/cli/req_command.py @@ -118,6 +118,10 @@ def _build_session( ssl_context = None else: ssl_context = None + if "parallel_downloads" in options.__dict__: + parallel_downloads = options.parallel_downloads + else: + parallel_downloads = None session = PipSession( cache=os.path.join(cache_dir, "http-v2") if cache_dir else None, @@ -125,6 +129,7 @@ def _build_session( trusted_hosts=options.trusted_hosts, index_urls=self._get_index_urls(options), ssl_context=ssl_context, + 
parallel_downloads=parallel_downloads, ) # Handle custom ca-bundles from the user diff --git a/src/pip/_internal/network/session.py b/src/pip/_internal/network/session.py index 887dc14e796..433cf1f1541 100644 --- a/src/pip/_internal/network/session.py +++ b/src/pip/_internal/network/session.py @@ -326,6 +326,7 @@ def __init__( trusted_hosts: Sequence[str] = (), index_urls: Optional[List[str]] = None, ssl_context: Optional["SSLContext"] = None, + parallel_downloads: Optional[int] = None, **kwargs: Any, ) -> None: """ @@ -362,12 +363,24 @@ def __init__( backoff_factor=0.25, ) # type: ignore + # Used to set numbers of parallel downloads in + # pip._internal.network.BatchDownloader and to set pool_connection in + # the HTTPAdapter to prevent connection pool from hitting the default(10) + # limit and throwing 'Connection pool is full' warnings + self.parallel_downloads = ( + parallel_downloads if (parallel_downloads is not None) else 1 + ) + pool_maxsize = max(self.parallel_downloads, 10) # Our Insecure HTTPAdapter disables HTTPS validation. It does not # support caching so we'll use it for all http:// URLs. # If caching is disabled, we will also use it for # https:// hosts that we've marked as ignoring # TLS errors for (trusted-hosts). - insecure_adapter = InsecureHTTPAdapter(max_retries=retries) + insecure_adapter = InsecureHTTPAdapter( + max_retries=retries, + pool_connections=pool_maxsize, + pool_maxsize=pool_maxsize, + ) # We want to _only_ cache responses on securely fetched origins or when # the host is specified as trusted. 
We do this because @@ -385,7 +398,12 @@ def __init__( max_retries=retries, ) else: - secure_adapter = HTTPAdapter(max_retries=retries, ssl_context=ssl_context) + secure_adapter = HTTPAdapter( + max_retries=retries, + ssl_context=ssl_context, + pool_connections=pool_maxsize, + pool_maxsize=pool_maxsize, + ) self._trusted_host_adapter = insecure_adapter self.mount("https://", secure_adapter) From a39cbc584ddd7b95deb5de84bde6a956401ea6a7 Mon Sep 17 00:00:00 2001 From: Neil Botelho Date: Sat, 4 Nov 2023 22:59:21 +0530 Subject: [PATCH 04/11] Separate common download logic from Downloaders --- src/pip/_internal/network/download.py | 69 ++++++++++++--------------- 1 file changed, 31 insertions(+), 38 deletions(-) diff --git a/src/pip/_internal/network/download.py b/src/pip/_internal/network/download.py index 79b82a570e5..4673083832c 100644 --- a/src/pip/_internal/network/download.py +++ b/src/pip/_internal/network/download.py @@ -119,6 +119,27 @@ def _http_get_download(session: PipSession, link: Link) -> Response: return resp +def _download( + link: Link, location: str, session: PipSession, progress_bar: str +) -> Tuple[str, str]: + try: + resp = _http_get_download(session, link) + except NetworkConnectionError as e: + assert e.response is not None + logger.critical("HTTP error %s while getting %s", e.response.status_code, link) + raise + + filename = _get_http_response_filename(resp, link) + filepath = os.path.join(location, filename) + + chunks = _prepare_download(resp, link, progress_bar) + with open(filepath, "wb") as content_file: + for chunk in chunks: + content_file.write(chunk) + content_type = resp.headers.get("Content-Type", "") + return filepath, content_type + + class Downloader: def __init__( self, @@ -130,24 +151,7 @@ def __init__( def __call__(self, link: Link, location: str) -> Tuple[str, str]: """Download the file given by link into location.""" - try: - resp = _http_get_download(self._session, link) - except NetworkConnectionError as e: - assert 
e.response is not None - logger.critical( - "HTTP error %s while getting %s", e.response.status_code, link - ) - raise - - filename = _get_http_response_filename(resp, link) - filepath = os.path.join(location, filename) - - chunks = _prepare_download(resp, link, self._progress_bar) - with open(filepath, "wb") as content_file: - for chunk in chunks: - content_file.write(chunk) - content_type = resp.headers.get("Content-Type", "") - return filepath, content_type + return _download(link, location, self._session, self._progress_bar) class BatchDownloader: @@ -159,28 +163,17 @@ def __init__( self._session = session self._progress_bar = progress_bar + def _sequential_download( + self, link: Link, location: str, progress_bar: str + ) -> Tuple[Link, Tuple[str, str]]: + filepath, content_type = _download( + link, location, self._session, self._progress_bar + ) + return link, (filepath, content_type) + def __call__( self, links: Iterable[Link], location: str ) -> Iterable[Tuple[Link, Tuple[str, str]]]: """Download the files given by links into location.""" for link in links: - try: - resp = _http_get_download(self._session, link) - except NetworkConnectionError as e: - assert e.response is not None - logger.critical( - "HTTP error %s while getting %s", - e.response.status_code, - link, - ) - raise - - filename = _get_http_response_filename(resp, link) - filepath = os.path.join(location, filename) - - chunks = _prepare_download(resp, link, self._progress_bar) - with open(filepath, "wb") as content_file: - for chunk in chunks: - content_file.write(chunk) - content_type = resp.headers.get("Content-Type", "") - yield link, (filepath, content_type) + yield self._sequential_download(link, location, self._progress_bar) From 8b41c672c46afdafbacff7c92faae8f7e6db51df Mon Sep 17 00:00:00 2001 From: Neil Botelho Date: Sat, 4 Nov 2023 23:06:00 +0530 Subject: [PATCH 05/11] Add parallel download support to BatchDownloader --- src/pip/_internal/network/download.py | 28 
++++++++++++++++++++++----- 1 file changed, 23 insertions(+), 5 deletions(-) diff --git a/src/pip/_internal/network/download.py b/src/pip/_internal/network/download.py index 4673083832c..cfe095bece2 100644 --- a/src/pip/_internal/network/download.py +++ b/src/pip/_internal/network/download.py @@ -4,6 +4,8 @@ import logging import mimetypes import os +from concurrent.futures import ThreadPoolExecutor +from functools import partial from typing import Iterable, Optional, Tuple from pip._vendor.requests.models import CONTENT_CHUNK_SIZE, Response @@ -166,14 +168,30 @@ def __init__( def _sequential_download( self, link: Link, location: str, progress_bar: str ) -> Tuple[Link, Tuple[str, str]]: - filepath, content_type = _download( - link, location, self._session, self._progress_bar - ) + filepath, content_type = _download(link, location, self._session, progress_bar) return link, (filepath, content_type) + def _download_parallel( + self, links: Iterable[Link], location: str, max_workers: int + ) -> Iterable[Tuple[Link, Tuple[str, str]]]: + with ThreadPoolExecutor(max_workers=max_workers) as pool: + _download_parallel = partial( + self._sequential_download, location=location, progress_bar="off" + ) + results = list(pool.map(_download_parallel, links)) + return results + def __call__( self, links: Iterable[Link], location: str ) -> Iterable[Tuple[Link, Tuple[str, str]]]: """Download the files given by links into location.""" - for link in links: - yield self._sequential_download(link, location, self._progress_bar) + links = list(links) + max_workers = self._session.parallel_downloads + if max_workers == 1 or len(links) == 1: + # TODO: set minimum number of links to perform parallel download + for link in links: + yield self._sequential_download(link, location, self._progress_bar) + else: + results = self._download_parallel(links, location, max_workers) + for result in results: + yield result From ec4362534cf076cb914fa525227e0537ea424e55 Mon Sep 17 00:00:00 2001 From: Neil 
Botelho Date: Sun, 5 Nov 2023 00:19:08 +0530 Subject: [PATCH 06/11] Add news file --- news/12388.feature.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 news/12388.feature.rst diff --git a/news/12388.feature.rst b/news/12388.feature.rst new file mode 100644 index 00000000000..27f368416d5 --- /dev/null +++ b/news/12388.feature.rst @@ -0,0 +1 @@ +Add parallel download support to BatchDownloader From 3008a6180240eb3cab6bdf7c0a9f205599df1706 Mon Sep 17 00:00:00 2001 From: Neil Botelho Date: Sun, 25 Feb 2024 10:59:17 +0530 Subject: [PATCH 07/11] update docstrings --- src/pip/_internal/network/download.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/src/pip/_internal/network/download.py b/src/pip/_internal/network/download.py index cfe095bece2..88cf5dc70bf 100644 --- a/src/pip/_internal/network/download.py +++ b/src/pip/_internal/network/download.py @@ -124,6 +124,15 @@ def _http_get_download(session: PipSession, link: Link) -> Response: def _download( link: Link, location: str, session: PipSession, progress_bar: str ) -> Tuple[str, str]: + """ + Common download logic across Downloader and BatchDownloader classes + + :param link: The Link object to be downloaded + :param location: path to download to + :param session: PipSession object + :param progress_bar: creates a `rich` progress bar is set to "on" + :return: the path to the downloaded file and the content-type + """ try: resp = _http_get_download(session, link) except NetworkConnectionError as e: @@ -174,6 +183,12 @@ def _sequential_download( def _download_parallel( self, links: Iterable[Link], location: str, max_workers: int ) -> Iterable[Tuple[Link, Tuple[str, str]]]: + + """ + Wraps the _sequential_download method in a ThreadPoolExecutor. `rich` + progress bar doesn't support naive parallelism, hence the progress bar + is disabled for parallel downloads. 
For more info see PR #12388 + """ with ThreadPoolExecutor(max_workers=max_workers) as pool: _download_parallel = partial( self._sequential_download, location=location, progress_bar="off" @@ -188,7 +203,6 @@ def __call__( links = list(links) max_workers = self._session.parallel_downloads if max_workers == 1 or len(links) == 1: - # TODO: set minimum number of links to perform parallel download for link in links: yield self._sequential_download(link, location, self._progress_bar) else: From fa268ae4b7b6566e0cbfd80d2a63037c18b423a6 Mon Sep 17 00:00:00 2001 From: Neil Botelho Date: Sun, 25 Feb 2024 12:20:29 +0530 Subject: [PATCH 08/11] fix pre-commit warnings --- src/pip/_internal/network/download.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/pip/_internal/network/download.py b/src/pip/_internal/network/download.py index 88cf5dc70bf..ec985f1fe44 100644 --- a/src/pip/_internal/network/download.py +++ b/src/pip/_internal/network/download.py @@ -183,10 +183,9 @@ def _sequential_download( def _download_parallel( self, links: Iterable[Link], location: str, max_workers: int ) -> Iterable[Tuple[Link, Tuple[str, str]]]: - """ Wraps the _sequential_download method in a ThreadPoolExecutor. `rich` - progress bar doesn't support naive parallelism, hence the progress bar + progress bar doesn't support naive parallelism, hence the progress bar is disabled for parallel downloads. 
For more info see PR #12388 """ with ThreadPoolExecutor(max_workers=max_workers) as pool: From e754fd65c14f72f32fda821bc8075433b8cd42b8 Mon Sep 17 00:00:00 2001 From: Neil Botelho Date: Wed, 6 Mar 2024 23:22:57 +0530 Subject: [PATCH 09/11] make 1 the default value of the parallel-downloads argument --- src/pip/_internal/cli/cmdoptions.py | 2 +- src/pip/_internal/cli/req_command.py | 6 +----- src/pip/_internal/commands/download.py | 4 +--- src/pip/_internal/network/session.py | 6 ++---- 4 files changed, 5 insertions(+), 13 deletions(-) diff --git a/src/pip/_internal/cli/cmdoptions.py b/src/pip/_internal/cli/cmdoptions.py index 04b86f0b4a7..873e6798c8f 100644 --- a/src/pip/_internal/cli/cmdoptions.py +++ b/src/pip/_internal/cli/cmdoptions.py @@ -770,7 +770,7 @@ def _handle_no_cache_dir( dest="parallel_downloads", type="int", metavar="n", - default=None, + default=1, help=( "Use upto threads to download packages in parallel." " must be greater than 0" diff --git a/src/pip/_internal/cli/req_command.py b/src/pip/_internal/cli/req_command.py index fd37ba5d337..4962aab7898 100644 --- a/src/pip/_internal/cli/req_command.py +++ b/src/pip/_internal/cli/req_command.py @@ -118,10 +118,6 @@ def _build_session( ssl_context = None else: ssl_context = None - if "parallel_downloads" in options.__dict__: - parallel_downloads = options.parallel_downloads - else: - parallel_downloads = None session = PipSession( cache=os.path.join(cache_dir, "http-v2") if cache_dir else None, @@ -129,7 +125,7 @@ def _build_session( trusted_hosts=options.trusted_hosts, index_urls=self._get_index_urls(options), ssl_context=ssl_context, - parallel_downloads=parallel_downloads, + parallel_downloads=options.parallel_downloads, ) # Handle custom ca-bundles from the user diff --git a/src/pip/_internal/commands/download.py b/src/pip/_internal/commands/download.py index 1a22f6073e3..62961229b02 100644 --- a/src/pip/_internal/commands/download.py +++ b/src/pip/_internal/commands/download.py @@ -78,9 +78,7 @@ 
def add_options(self) -> None: @with_cleanup def run(self, options: Values, args: List[str]) -> int: - if (options.parallel_downloads is not None) and ( - options.parallel_downloads < 1 - ): + if options.parallel_downloads < 1: raise CommandError("Value of '--parallel-downloads' must be greater than 0") options.ignore_installed = True diff --git a/src/pip/_internal/network/session.py b/src/pip/_internal/network/session.py index 433cf1f1541..821bf2f5e73 100644 --- a/src/pip/_internal/network/session.py +++ b/src/pip/_internal/network/session.py @@ -326,7 +326,7 @@ def __init__( trusted_hosts: Sequence[str] = (), index_urls: Optional[List[str]] = None, ssl_context: Optional["SSLContext"] = None, - parallel_downloads: Optional[int] = None, + parallel_downloads: int = 1, **kwargs: Any, ) -> None: """ @@ -367,9 +367,7 @@ def __init__( # pip._internal.network.BatchDownloader and to set pool_connection in # the HTTPAdapter to prevent connection pool from hitting the default(10) # limit and throwing 'Connection pool is full' warnings - self.parallel_downloads = ( - parallel_downloads if (parallel_downloads is not None) else 1 - ) + self.parallel_downloads = parallel_downloads pool_maxsize = max(self.parallel_downloads, 10) # Our Insecure HTTPAdapter disables HTTPS validation. It does not # support caching so we'll use it for all http:// URLs. 
From 233ae103fa92f53b4bc2cd695f1aed1dac6e93f4 Mon Sep 17 00:00:00 2001 From: Neil Botelho Date: Sun, 10 Mar 2024 22:43:04 +0530 Subject: [PATCH 10/11] fix typing errors --- src/pip/_internal/cli/req_command.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/pip/_internal/cli/req_command.py b/src/pip/_internal/cli/req_command.py index 4962aab7898..1245e94d866 100644 --- a/src/pip/_internal/cli/req_command.py +++ b/src/pip/_internal/cli/req_command.py @@ -119,13 +119,17 @@ def _build_session( else: ssl_context = None + if "parallel_downloads" in options.__dict__: + parallel_downloads = options.parallel_downloads + else: + parallel_downloads = 1 session = PipSession( cache=os.path.join(cache_dir, "http-v2") if cache_dir else None, retries=retries if retries is not None else options.retries, trusted_hosts=options.trusted_hosts, index_urls=self._get_index_urls(options), ssl_context=ssl_context, - parallel_downloads=options.parallel_downloads, + parallel_downloads=parallel_downloads, ) # Handle custom ca-bundles from the user From e91234c206145aae3c747b1286b4516bd2721d0a Mon Sep 17 00:00:00 2001 From: Neil Botelho Date: Mon, 25 Mar 2024 17:50:20 +0530 Subject: [PATCH 11/11] Remove unused `None` check Co-authored-by: Tzu-ping Chung --- src/pip/_internal/commands/install.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/pip/_internal/commands/install.py b/src/pip/_internal/commands/install.py index c02d361523d..14d1ccc3fa6 100644 --- a/src/pip/_internal/commands/install.py +++ b/src/pip/_internal/commands/install.py @@ -268,9 +268,7 @@ def run(self, options: Values, args: List[str]) -> int: if options.use_user_site and options.target_dir is not None: raise CommandError("Can not combine '--user' and '--target'") - if (options.parallel_downloads is not None) and ( - options.parallel_downloads < 1 - ): + if options.parallel_downloads < 1: raise CommandError("Value of '--parallel-downloads' must be greater than 0") # Check 
whether the environment we're installing into is externally # managed, as specified in PEP 668. Specifying --root, --target, or