diff --git a/vault2vault.py b/vault2vault.py index 7d914ff..788ac8c 100644 --- a/vault2vault.py +++ b/vault2vault.py @@ -9,6 +9,7 @@ from pathlib import Path from typing import Any from typing import Iterable from typing import List +from typing import Optional from typing import Tuple from typing import Union @@ -16,10 +17,13 @@ import ruamel.yaml try: import ansible.constants - import ansible.parsing.vault + from ansible.parsing.vault import VaultSecret + from ansible.parsing.vault import VaultLib + from ansible.parsing.vault import AnsibleVaultError except ImportError: print( - "FATAL: No supported version of Ansible could be imported under the current python interpreter" + "FATAL: No supported version of Ansible could be imported under the current python interpreter", + file=sys.stderr, ) sys.exit(1) @@ -44,8 +48,8 @@ ruamel.yaml.add_constructor( def rekey( - old: ansible.parsing.vault.VaultLib, - new: ansible.parsing.vault.VaultLib, + old: VaultLib, + new: VaultLib, content: bytes, ) -> bytes: """Rekey vaulted content to use a new vault password @@ -61,6 +65,189 @@ def rekey( return new.encrypt(old.decrypt(content)) +# This whole function needs to be rebuilt from the ground up so I don't +# feel bad about disabling this warning +def _process_file( # pylint: disable=too-many-statements + path: Path, + old: VaultLib, + new: VaultLib, + interactive: bool, + backup: bool, + ignore: bool, +) -> None: + logger = logging.getLogger(__name__) + + logger.debug(f"Processing file {path}") + + def _process_yaml_data(content: bytes, data: Any, ignore: bool, name: str = ""): + if isinstance(data, dict): + for key, value in data.items(): + content = _process_yaml_data( + content, value, ignore, name=f"{name}.{key}" + ) + elif isinstance(data, list): + for index, item in enumerate(data): + content = _process_yaml_data( + content, item, ignore, name=f"{name}.{index}" + ) + elif isinstance(data, ruamel.yaml.comments.TaggedScalar) and old.is_encrypted( + data.value + ): + logger.info(f"Identified vaulted content in {path} at {name}") + confirm = ( + _confirm(f"Rekey vault encrypted variable {name} in file {path}?") + if interactive + else True + ) + + if not confirm: + logger.debug( + f"User skipped vault encrypted content in {path} at {name} via interactive mode" + ) + return content + + try: + new_data = rekey(old, new, data.value.encode()) + except AnsibleVaultError as err: + msg = f"Failed to decrypt vault encrypted data in {path} at {name} with provided vault secret" + if ignore: + logger.warning(msg) + return content + raise RuntimeError(msg) from err + content_decoded = content.decode("utf-8") + + # Ok so this next section is probably the worst possible way to do this, but I did + # it this way to solve a very specific problem that would absolutely prevent people + # from using this tool: round trip YAML format preservation. Namely, that it's impossible. + # Ruamel gets the closest to achieving this: it can do round trip format preservation + # when the starting state is in _some_ known state (this is better than competitors which + # require the starting state to be in a _specific_ known state). But given how many + # ways there are to write YAML- and by extension, how many opinions there are on the + # "correct" way to write YAML- it is not possible to configure ruamel to account for all of + # them, even if everyones YAML style was compatible with ruamel's roundtrip formatting (note: + # they aren't). So there's the problem: to be useful, this tool would need to reformat every + # YAML file it touched, which means nobody would use it. + # + # To avoid the YAML formatting problem, we need a way to replace the target content + # in the raw text of the file without dumping the parsed YAML. We want to preserve + # indendation, remove any extra newlines that would be left over, add any necessary + # newlines without clobbering the following lines, and ideally avoid reimplementing + # a YAML formatter. The answer to this problem- as the answer to so many stupid problems + # seems to be- is a regex. If this is too janky for you (I know it is for me) go support + # the estraven project I'm trying to get off the ground: https://github.com/enpaul/estraven + # + # Ok, thanks for sticking with me as I was poetic about this. The solution below... + # is awful, I can admit that. But it does work, so I'll leave it up to + # your judgement as to whether it's worthwhile or not. Here's how it works: + # + # 1. First we take the first line of the original (unmodified) vaulted content. This line + # of text has several important qualities: 1) it exists in the raw text of the file, 2) + # it is pseudo-guaranteed to be unique, and 3) it is guaranteed to exist (vaulted content + # will be at least one line long, but possibly no more) + search_data = data.value.split("\n")[1] + try: + # 2. Next we use a regex to grab the full line of text from the file that includes the above + # string. This is important because the full line of text will include the leading + # whitespace, which ruamel helpfully strips out from the parsed data. + # 3. Next we grab the number of leading spaces on the line using the capture group from the + # regex + padding = len( + re.search(rf"\n(\s*){search_data}\n", content_decoded).groups()[0] + ) + except (TypeError, AttributeError): + # This is to handle an edgecase where the vaulted content is actually a yaml anchor. For + # example, if a single vaulted secret needs to be stored under multiple variable names. + # In that case, the vaulted content iself will only appear once in the file, but the data + # parsed by ruamel will include it twice. If we fail to get a match on the first line, then + # we check whether the data is a yaml anchor and, if it is, we skip it. + if data.anchor.value: + logger.debug( + f"Content replacement for encrypted content in {path} at {name} was not found, so replacement will be skipped because target is a YAML anchor" + ) + return content + raise + + # 4. Now with the leading whitespace padding, we add this same number of spaces to each line + # of *both* the old vaulted data and the new vaulted data. It's important to do both because + # we'll need to do a replacement in a moment so we need to know both what we're replacing + # and what we're replacing it with. + padded_old_data = "\n".join( + [f"{' ' * padding}{item}" for item in data.value.split("\n") if item] + ) + padded_new_data = "\n".join( + [ + f"{' ' * padding}{item}" + for item in new_data.decode("utf-8").split("\n") + if item + ] + ) + + # 5. Finally, we actually replace the content. We also need to re-encode it back to bytes + # because all file operations with vault are done in bytes mode + content = content_decoded.replace(padded_old_data, padded_new_data).encode() + return content + + with path.open("rb") as infile: + raw = infile.read() + + # The 'is_encrypted' check doesn't rely on the vault secret in the VaultLib matching the + # secret the data was encrypted with, it just checks that the data is encrypted with some + # vault secret. We could use either `old` or `new` for this check, it doesn't actually matter. + if old.is_encrypted(raw): + logger.info(f"Identified vault encrypted file: {path}") + + confirm = ( + _confirm(f"Rekey vault encrypted file {path}?") if interactive else True + ) + + if not confirm: + logger.debug( + f"User skipped vault encrypted file {path} via interactive mode" + ) + return + + if backup: + path.rename(f"{path}.bak") + + try: + updated = rekey(old, new, raw) + except AnsibleVaultError: + msg = f"Failed to decrypt vault encrypted file {path} with provided vault secret" + if ignore: + logger.warning(msg) + return + raise RuntimeError(msg) from None + elif path.suffix.lower() in YAML_FILE_EXTENSIONS: + logger.debug(f"Identified YAML file: {path}") + + confirm = ( + _confirm(f"Search YAML file {path} for vault encrypted variables?") + if interactive + else True + ) + + data = yaml.load(raw) + + if not confirm: + logger.debug( + f"User skipped processing YAML file {path} via interactive mode" + ) + return + + if backup: + shutil.copy(path, f"{path}.bak") + + updated = _process_yaml_data(raw, data, ignore=ignore) + else: + logger.debug(f"Skipping non-vault file {path}") + return + + logger.debug(f"Writing updated file contents to {path}") + + with path.open("wb") as outfile: + outfile.write(updated) + + def _get_args() -> argparse.Namespace: parser = argparse.ArgumentParser( prog=__title__, @@ -131,174 +318,6 @@ def _confirm(prompt: str, default: bool = True) -> bool: print("Please input one of the specified options", file=sys.stderr) -# This whole function needs to be rebuilt from the ground up so I don't -# feel bad about disabling this warning -def _process_file( # pylint: disable=too-many-statements - path: Path, - old: ansible.parsing.vault.VaultLib, - new: ansible.parsing.vault.VaultLib, - interactive: bool, - backup: bool, - ignore: bool, -) -> None: - logger = logging.getLogger(__name__) - - logger.debug(f"Processing file {path}") - - def _process_yaml_data(content: bytes, data: Any, name: str = ""): - if isinstance(data, dict): - for key, value in data.items(): - content = _process_yaml_data(content, value, f"{name}.{key}") - elif isinstance(data, list): - for index, item in enumerate(data): - content = _process_yaml_data(content, item, f"{name}.{index}") - elif isinstance(data, ruamel.yaml.comments.TaggedScalar) and old.is_encrypted( - data.value - ): - logger.debug(f"Identified vaulted content in {path} at '{name}'") - confirm = ( - _confirm(f"Rekey vault encrypted variable {name} in file {path}?") - if interactive - else True - ) - - if not confirm: - logger.debug( - f"User skipped vault encrypted content in {path} at '{name}' via interactive mode" - ) - return content - - new_data = rekey(old, new, data.value.encode()) - content_decoded = content.decode("utf-8") - - # Ok so this next section is probably the worst possible way to do this, but I did - # it this way to solve a very specific problem that would absolutely prevent people - # from using this tool: round trip YAML format preservation. Namely, that it's impossible. - # Ruamel gets the closest to achieving this: it can do round trip format preservation - # when the starting state is in _some_ known state (this is better than competitors which - # require the starting state to be in a _specific_ known state). But given how many - # ways there are to write YAML- and by extension, how many opinions there are on the - # "correct" way to write YAML- it is not possible to configure ruamel to account for all of - # them, even if everyones YAML style was compatible with ruamel's roundtrip formatting (note: - # they aren't). So there's the problem: to be useful, this tool would need to reformat every - # YAML file it touched, which means nobody would use it. - # - # To avoid the YAML formatting problem, we need a way to replace the target content - # in the raw text of the file without dumping the parsed YAML. We want to preserve - # indendation, remove any extra newlines that would be left over, add any necessary - # newlines without clobbering the following lines, and ideally avoid reimplementing - # a YAML formatter. The answer to this problem- as the answer to so many stupid problems - # seems to be- is a regex. If this is too janky for you (I know it is for me) go support - # the estraven project I'm trying to get off the ground: https://github.com/enpaul/estraven - # - # Ok, thanks for sticking with me as I was poetic about this. The solution below... - # is awful, I can admit that. But it does work, so I'll leave it up to - # your judgement as to whether it's worthwhile or not. Here's how it works: - # - # 1. First we take the first line of the original (unmodified) vaulted content. This line - # of text has several important qualities: 1) it exists in the raw text of the file, 2) - # it is pseudo-guaranteed to be unique, and 3) it is guaranteed to exist (vaulted content - # will be at least one line long, but possibly no more) - search_data = data.value.split("\n")[1] - try: - # 2. Next we use a regex to grab the full line of text from the file that includes the above - # string. This is important because the full line of text will include the leading - # whitespace, which ruamel helpfully strips out from the parsed data. - # 3. Next we grab the number of leading spaces on the line using the capture group from the - # regex - padding = len( - re.search(rf"\n(\s*){search_data}\n", content_decoded).groups()[0] - ) - except (TypeError, AttributeError): - # This is to handle an edgecase where - if data.anchor.value: - logger.debug( - f"Content replacement for encrypted content in {path} at {name} was not found, so replacement will be skipped because target is a YAML anchor" - ) - return content - raise - - # 4. Now with the leading whitespace padding, we add this same number of spaces to each line - # of *both* the old vaulted data and the new vaulted data. It's important to do both because - # we'll need to do a replacement in a moment so we need to know both what we're replacing - # and what we're replacing it with. - padded_old_data = "\n".join( - [f"{' ' * padding}{item}" for item in data.value.split("\n") if item] - ) - padded_new_data = "\n".join( - [ - f"{' ' * padding}{item}" - for item in new_data.decode("utf-8").split("\n") - if item - ] - ) - - # 5. Finally, we actually replace the content. We also need to re-encode it back to bytes - # because all file operations with vault are done in bytes mode - content = content_decoded.replace(padded_old_data, padded_new_data).encode() - return content - - with path.open("rb") as infile: - raw = infile.read() - - # The 'is_encrypted' check doesn't rely on the vault secret in the VaultLib matching the - # secret the data was encrypted with, it just checks that the data is encrypted with some - # vault secret. We could use either `old` or `new` for this check, it doesn't actually matter. - if old.is_encrypted(raw): - logger.debug(f"Identified vault encrypted file: {path}") - - confirm = ( - _confirm(f"Rekey vault encrypted file {path}?") if interactive else True - ) - - if not confirm: - logger.debug( - f"User skipped vault encrypted file {path} via interactive mode" - ) - return - - if backup: - path.rename(f"{path}.bak") - - try: - updated = rekey(old, new, raw) - except ansible.parsing.vault.AnsibleVaultError: - msg = f"Failed to decrypt vault encrypted file {path} with provided vault secret" - if ignore: - logger.warning(msg) - return - raise RuntimeError(msg) from None - elif path.suffix.lower() in YAML_FILE_EXTENSIONS: - logger.debug(f"Identified YAML file: {path}") - - confirm = ( - _confirm(f"Search YAML file {path} for vault encrypted variables?") - if interactive - else True - ) - - data = yaml.load(raw) - - if not confirm: - logger.debug( - f"User skipped processing YAML file {path} via interactive mode" - ) - return - - if backup: - shutil.copy(path, f"{path}.bak") - - updated = _process_yaml_data(raw, data) - else: - logger.debug(f"Skipping non-vault file {path}") - return - - logger.debug(f"Writing updated file contents to {path}") - - with path.open("wb") as outfile: - outfile.write(updated) - - def _expand_paths(paths: Iterable[Path]) -> List[Path]: logger = logging.getLogger(__name__) @@ -309,61 +328,52 @@ def _expand_paths(paths: Iterable[Path]) -> List[Path]: logger.debug(f"Including file {path}") results.append(path) elif path.is_dir(): - logger.debug(f"Descending into subdirectory {path}") + logger.debug(f"Identifying files under {path}") results += _expand_paths(path.iterdir()) else: logger.debug(f"Discarding path {path}") return results -def _read_vault_pass_file(path: Union[Path, str]) -> str: - logger = logging.getLogger(__name__) - try: - with Path(path).resolve().open(encoding="utf-8") as infile: - return infile.read() - except (FileNotFoundError, PermissionError): - logger.error( - f"Specified vault password file '{path}' does not exist or is unreadable" - ) - sys.exit(1) +def _load_password( + fpath: Optional[str], desc: str = "", confirm: bool = True +) -> VaultSecret: + """Load a password from a file or interactively + :param fpath: Optional path to the file containing the vault password. If not provided then + the password will be prompted for interactively. + :param desc: Description text to inject into the interactive password prompt. Useful when using + this function multiple times to identify different passwords to the user. + :returns: Populated vault secret object with the loaded password + """ -def _load_passwords( - old_file: str, new_file: str -) -> Tuple[ansible.parsing.vault.VaultSecret, ansible.parsing.vault.VaultSecret]: logger = logging.getLogger(__name__) - if old_file: - old_vault_pass = _read_vault_pass_file(old_file) - logger.info(f"Loaded old vault password from {Path(old_file).resolve()}") - else: - logger.debug( - "No old vault password file provided, prompting for old vault password input" - ) - old_vault_pass = getpass.getpass( - prompt="Old Ansible Vault password: ", stream=sys.stderr + if fpath: + try: + with Path(fpath).resolve().open("rb", encoding="utf-8") as infile: + return VaultSecret(infile.read()) + except (FileNotFoundError, PermissionError) as err: + raise RuntimeError( + f"Specified vault password file '{fpath}' does not exist or is unreadable" + ) from err + + logger.debug("No vault password file provided, prompting for interactive input") + + password_1 = getpass.getpass( + prompt=f"Enter {desc} Ansible Vault password: ", stream=sys.stderr + ) + + if confirm: + password_2 = getpass.getpass( + prompt=f"Confirm (re-enter) {desc} Ansible Vault password: ", + stream=sys.stderr, ) - if new_file: - new_vault_pass = _read_vault_pass_file(new_file) - logger.info(f"Loaded new vault password from {Path(new_file).resolve()}") - else: - logger.debug( - "No new vault password file provided, prompting for new vault password input" - ) - new_vault_pass = getpass.getpass( - prompt="New Ansible Vault password: ", stream=sys.stderr - ) - confirm = getpass.getpass( - prompt="Confirm new Ansible Vault password: ", stream=sys.stderr - ) - if new_vault_pass != confirm: - logger.error("New vault passwords do not match") - sys.exit(1) + if password_1 != password_2: + raise RuntimeError(f"Provided {desc} passwords do not match") - return ansible.parsing.vault.VaultSecret( - old_vault_pass.encode("utf-8") - ), ansible.parsing.vault.VaultSecret(new_vault_pass.encode("utf-8")) + return VaultSecret(password_1.encode("utf-8")) def main(): @@ -383,17 +393,26 @@ def main(): sys.exit(0) if not args.paths: - logger.warning("No path provided, nothing to do!") + logger.warning("No paths provided, nothing to do!") sys.exit(0) - old_pass, new_pass = _load_passwords(args.old_pass_file, args.new_pass_file) - in_vault = ansible.parsing.vault.VaultLib([(args.vault_id, old_pass)]) - out_vault = ansible.parsing.vault.VaultLib([(args.vault_id, new_pass)]) + try: + old_pass = _load_password(args.old_pass_file, desc="existing", confirm=False) + new_pass = _load_password(args.new_pass_file, desc="new", confirm=True) - logger.debug( + in_vault = VaultLib([(args.vault_id, old_pass)]) + out_vault = VaultLib([(args.vault_id, new_pass)]) + except RuntimeError as err: + logger.error(str(err)) + sys.exit(1) + except KeyboardInterrupt: + sys.exit(130) + + logger.info( f"Identifying all files under {len(args.paths)} input paths: {', '.join(args.paths)}" ) files = _expand_paths(args.paths) + logger.info(f"Identified {len(files)} files for processing") for filepath in files: _process_file(