"""Funcationality for performing virtualenv installation""" # Silence this one globally to support the internal function imports for the proxied poetry module. # See the docstring in 'tox_poetry_installer._poetry' for more context. # pylint: disable=import-outside-toplevel import concurrent.futures import contextlib import typing from datetime import datetime from typing import Collection from typing import Set from tox.tox_env.api import ToxEnv as ToxVirtualEnv from tox_poetry_installer import logger from tox_poetry_installer import utilities if typing.TYPE_CHECKING: from tox_poetry_installer import _poetry def install( poetry: "_poetry.Poetry", venv: ToxVirtualEnv, packages: Collection["_poetry.PoetryPackage"], parallels: int = 0, ): """Install a bunch of packages to a virtualenv :param poetry: Poetry object the packages were sourced from :param venv: Tox virtual environment to install the packages to :param packages: List of packages to install to the virtual environment :param parallels: Number of parallel processes to use for installing dependency packages, or ``None`` to disable parallelization. """ from tox_poetry_installer import _poetry logger.info(f"Installing {len(packages)} packages to environment at {venv.env_dir}") install_executor = _poetry.Executor( env=utilities.convert_virtualenv(venv), io=_poetry.NullIO(), pool=poetry.pool, config=_poetry.Config(), ) installed: Set[_poetry.PoetryPackage] = set() def logged_install(dependency: _poetry.PoetryPackage) -> None: start = datetime.now() logger.debug(f"Installing {dependency}") install_executor.execute([_poetry.Install(package=dependency)]) end = datetime.now() logger.debug(f"Finished installing {dependency} in {end - start}") @contextlib.contextmanager def _optional_parallelize(): """A bit of cheat, really A context manager that exposes a common interface for the caller that optionally enables/disables the usage of the parallel thread pooler depending on the value of the ``parallels`` parameter. """ if parallels > 0: with concurrent.futures.ThreadPoolExecutor( max_workers=parallels ) as executor: yield executor.submit else: yield lambda func, arg: func(arg) with _optional_parallelize() as executor: futures = [] for dependency in packages: if dependency not in installed: installed.add(dependency) logger.debug(f"Queuing {dependency}") future = executor(logged_install, dependency) if future is not None: futures.append(future) else: logger.debug(f"Skipping {dependency}, already installed") logger.debug("Waiting for installs to finish...") for future in concurrent.futures.as_completed(futures): # Don't actually care about the return value, just waiting on the # future to ensure any exceptions that were raised in the called # function are propagated. future.result()