From b9bb307738e8564abc6b6d7e2e24ca7bcf77d7ea Mon Sep 17 00:00:00 2001 From: Ethan Paul <24588726+enpaul@users.noreply.github.com> Date: Fri, 8 Apr 2022 01:35:34 -0400 Subject: [PATCH] Add documentation and fix runtime bug Fix issue where a newline was added after every character of newly vaulted yaml content Add docs, fix linting errors, confess to my sins --- vault2vault.py | 146 ++++++++++++++++++++++++++++++++----------------- 1 file changed, 96 insertions(+), 50 deletions(-) diff --git a/vault2vault.py b/vault2vault.py index 72b1a7c..5b528c4 100644 --- a/vault2vault.py +++ b/vault2vault.py @@ -7,8 +7,8 @@ import shutil import sys from pathlib import Path from typing import Any +from typing import Iterable from typing import List -from typing import Sequence from typing import Tuple from typing import Union @@ -41,6 +41,16 @@ def rekey( new: ansible.parsing.vault.VaultLib, content: bytes, ) -> bytes: + """Rekey vaulted content to use a new vault password + + :param old: ``VaultLib`` object populated with the vault password the content is + currently encrypted with + :param new: ``VaultLib`` object populated with the vault password the content will + be re-encrypted with + :param content: Content to decrypt using ``old`` and re-encrypt using ``new`` + :returns: The value of ``content`` decrypted using the existing vault password and + re-encrypted using the new vault password + """ return new.encrypt(old.decrypt(content)) @@ -114,7 +124,9 @@ def _confirm(prompt: str, default: bool = True) -> bool: print("Please input one of the specified options", file=sys.stderr) -def _process_file( +# This whole function needs to be rebuilt from the ground up so I don't +# feel bad about disabling this warning +def _process_file( # pylint: disable=too-many-statements path: Path, old: ansible.parsing.vault.VaultLib, new: ansible.parsing.vault.VaultLib, @@ -133,57 +145,90 @@ def _process_file( elif isinstance(data, list): for index, item in enumerate(data): content = _process_yaml_data(content, item, f"{name}.{index}") - elif isinstance(data, ruamel.yaml.comments.TaggedScalar): - if old.is_encrypted(data.value): - logger.debug(f"Identified vaulted content in {path} at '{name}'") - confirm = ( - _confirm(f"Rekey vault encrypted variable {name} in file {path}?") - if interactive - else True - ) + elif isinstance(data, ruamel.yaml.comments.TaggedScalar) and old.is_encrypted( + data.value + ): + logger.debug(f"Identified vaulted content in {path} at '{name}'") + confirm = ( + _confirm(f"Rekey vault encrypted variable {name} in file {path}?") + if interactive + else True + ) - if not confirm: + if not confirm: + logger.debug( + f"User skipped vault encrypted content in {path} at '{name}' via interactive mode" + ) + return content + + new_data = rekey(old, new, data.value.encode()) + content_decoded = content.decode("utf-8") + + # Ok so this next section is probably the worst possible way to do this, but I did + # it this way to solve a very specific problem that would absolutely prevent people + # from using this tool: round trip YAML format preservation. Namely, that it's impossible. + # Ruamel gets the closest to achieving this: it can do round trip format preservation + # when the starting state is in _some_ known state (this is better than competitors which + # require the starting state to be in a _specific_ known state). But given how many + # ways there are to write YAML- and by extension, how many opinions there are on the + # "correct" way to write YAML- it is not possible to configure ruamel to account for all of + # them, even if everyones YAML style was compatible with ruamel's roundtrip formatting (note: + # they aren't). So there's the problem: to be useful, this tool would need to reformat every + # YAML file it touched, which means nobody would use it. + # + # To avoid the YAML formatting problem, we need a way to replace the target content + # in the raw text of the file without dumping the parsed YAML. We want to preserve + # indendation, remove any extra newlines that would be left over, add any necessary + # newlines without clobbering the following lines, and ideally avoid reimplementing + # a YAML formatter. The answer to this problem- as the answer to so many stupid problems + # seems to be- is a regex. If this is too janky for you (I know it is for me) go support + # the estraven project I'm trying to get off the ground: https://github.com/enpaul/estraven + # + # Ok, thanks for sticking with me as I was poetic about this. The solution below... + # is awful, I can admit that. But it does work, so I'll leave it up to + # your judgement as to whether it's worthwhile or not. Here's how it works: + # + # 1. First we take the first line of the original (unmodified) vaulted content. This line + # of text has several important qualities: 1) it exists in the raw text of the file, 2) + # it is pseudo-guaranteed to be unique, and 3) it is guaranteed to exist (vaulted content + # will be at least one line long, but possibly no more) + search_data = data.value.split("\n")[1] + try: + # 2. Next we use a regex to grab the full line of text from the file that includes the above + # string. This is important because the full line of text will include the leading + # whitespace, which ruamel helpfully strips out from the parsed data. + # 3. Next we grab the number of leading spaces on the line using the capture group from the + # regex + padding = len( + re.search(rf"\n(\s*){search_data}\n", content_decoded).groups()[0] + ) + except (TypeError, AttributeError): + # This is to handle an edgecase where + if data.anchor.value: logger.debug( - f"User skipped vault encrypted content in {path} at '{name}' via interactive mode" + f"Content replacement for encrypted content in {path} at {name} was not found, so replacement will be skipped because target is a YAML anchor" ) return content + raise - new_data = rekey(old, new, data.value.encode()) - content_decoded = content.decode("utf-8") + # 4. Now with the leading whitespace padding, we add this same number of spaces to each line + # of *both* the old vaulted data and the new vaulted data. It's important to do both because + # we'll need to do a replacement in a moment so we need to know both what we're replacing + # and what we're replacing it with. + padded_old_data = "\n".join( + [f"{' ' * padding}{item}" for item in data.value.split("\n") if item] + ) + padded_new_data = "\n".join( + [ + f"{' ' * padding}{item}" + for item in new_data.decode("utf-8").split("\n") + if item + ] + ) - search_data = data.value.split("\n")[1] - try: - padding = len( - re.search(rf"\n(\s*){search_data}\n", content_decoded).groups()[ - 0 - ] - ) - except (TypeError, AttributeError): - if data.anchor.value: - logger.debug( - f"Content replacement for encrypted content in {path} at {name} was not found, so replacement will be skipped because target is a YAML anchor" - ) - return content - raise - - padded_old_data = "\n".join( - [ - f"{' ' * padding}{item}" - for item in data.value.split("\n") - if item - ] - ) - padded_new_data = "\n".join( - [ - f"{' ' * padding}{item}" - for item in new_data.decode("utf-8") - if item - ] - ) - - content = content_decoded.replace( - padded_old_data, padded_new_data - ).encode() + # 5. Finally, we actually replace the content. We also need to re-encode it back to bytes + # because all file operations with vault are done in bytes mode + content = content_decoded.replace(padded_old_data, padded_new_data).encode() return content with path.open("rb") as infile: @@ -215,7 +260,7 @@ def _process_file( if ignore: logger.warning(msg) return - raise RuntimeError(msg) + raise RuntimeError(msg) from None elif path.suffix.lower() in YAML_FILE_EXTENSIONS: logger.debug(f"Identified YAML file: {path}") @@ -247,7 +292,7 @@ def _process_file( outfile.write(updated) -def _expand_paths(paths: Sequence[Union[Path, str]]) -> List[Path]: +def _expand_paths(paths: Iterable[Path]) -> List[Path]: logger = logging.getLogger(__name__) results = [] @@ -267,7 +312,7 @@ def _expand_paths(paths: Sequence[Union[Path, str]]) -> List[Path]: def _read_vault_pass_file(path: Union[Path, str]) -> str: logger = logging.getLogger(__name__) try: - with Path(path).resolve().open() as infile: + with Path(path).resolve().open(encoding="utf-8") as infile: return infile.read() except (FileNotFoundError, PermissionError): logger.error( @@ -315,6 +360,7 @@ def _load_passwords( def main(): + """Main program entrypoint and CLI interface""" args = _get_args() logger = logging.getLogger(__name__)