tox-poetry-installer/tox_poetry_installer/installer.py

89 lines
3.2 KiB
Python

"""Funcationality for performing virtualenv installation"""
# Silence this one globally to support the internal function imports for the proxied poetry module.
# See the docstring in 'tox_poetry_installer._poetry' for more context.
# pylint: disable=import-outside-toplevel
import concurrent.futures
import contextlib
import typing
from datetime import datetime
from typing import Collection
from typing import Set
from tox.tox_env.api import ToxEnv as ToxVirtualEnv
from tox_poetry_installer import logger
from tox_poetry_installer import utilities
if typing.TYPE_CHECKING:
from tox_poetry_installer import _poetry
def install(
poetry: "_poetry.Poetry",
venv: ToxVirtualEnv,
packages: Collection["_poetry.PoetryPackage"],
parallels: int = 0,
):
"""Install a bunch of packages to a virtualenv
:param poetry: Poetry object the packages were sourced from
:param venv: Tox virtual environment to install the packages to
:param packages: List of packages to install to the virtual environment
:param parallels: Number of parallel processes to use for installing dependency packages, or
``None`` to disable parallelization.
"""
from tox_poetry_installer import _poetry
logger.info(f"Installing {len(packages)} packages to environment at {venv.env_dir}")
install_executor = _poetry.Executor(
env=utilities.convert_virtualenv(venv),
io=_poetry.NullIO(),
pool=poetry.pool,
config=_poetry.Config(),
)
installed: Set[_poetry.PoetryPackage] = set()
def logged_install(dependency: _poetry.PoetryPackage) -> None:
start = datetime.now()
logger.debug(f"Installing {dependency}")
install_executor.execute([_poetry.Install(package=dependency)])
end = datetime.now()
logger.debug(f"Finished installing {dependency} in {end - start}")
@contextlib.contextmanager
def _optional_parallelize():
"""A bit of cheat, really
A context manager that exposes a common interface for the caller that optionally
enables/disables the usage of the parallel thread pooler depending on the value of
the ``parallels`` parameter.
"""
if parallels > 0:
with concurrent.futures.ThreadPoolExecutor(
max_workers=parallels
) as executor:
yield executor.submit
else:
yield lambda func, arg: func(arg)
with _optional_parallelize() as executor:
futures = []
for dependency in packages:
if dependency not in installed:
installed.add(dependency)
logger.debug(f"Queuing {dependency}")
future = executor(logged_install, dependency)
if future is not None:
futures.append(future)
else:
logger.debug(f"Skipping {dependency}, already installed")
logger.debug("Waiting for installs to finish...")
for future in concurrent.futures.as_completed(futures):
# Don't actually care about the return value, just waiting on the
# future to ensure any exceptions that were raised in the called
# function are propagated.
future.result()