Add documentation and fix runtime bug

Fix issue where a newline was added after every character of newly vaulted yaml content
Add docs, fix linting errors, confess to my sins
This commit is contained in:
Ethan Paul 2022-04-08 01:35:34 -04:00
parent 42b34468a8
commit b9bb307738
No known key found for this signature in database
GPG Key ID: D0E2CBF1245E92BF

View File

@ -7,8 +7,8 @@ import shutil
import sys import sys
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
from typing import Iterable
from typing import List from typing import List
from typing import Sequence
from typing import Tuple from typing import Tuple
from typing import Union from typing import Union
@ -41,6 +41,16 @@ def rekey(
new: ansible.parsing.vault.VaultLib, new: ansible.parsing.vault.VaultLib,
content: bytes, content: bytes,
) -> bytes: ) -> bytes:
"""Rekey vaulted content to use a new vault password
:param old: ``VaultLib`` object populated with the vault password the content is
currently encrypted with
:param new: ``VaultLib`` object populated with the vault password the content will
be re-encrypted with
:param content: Content to decrypt using ``old`` and re-encrypt using ``new``
:returns: The value of ``content`` decrypted using the existing vault password and
re-encrypted using the new vault password
"""
return new.encrypt(old.decrypt(content)) return new.encrypt(old.decrypt(content))
@ -114,7 +124,9 @@ def _confirm(prompt: str, default: bool = True) -> bool:
print("Please input one of the specified options", file=sys.stderr) print("Please input one of the specified options", file=sys.stderr)
def _process_file( # This whole function needs to be rebuilt from the ground up so I don't
# feel bad about disabling this warning
def _process_file( # pylint: disable=too-many-statements
path: Path, path: Path,
old: ansible.parsing.vault.VaultLib, old: ansible.parsing.vault.VaultLib,
new: ansible.parsing.vault.VaultLib, new: ansible.parsing.vault.VaultLib,
@ -133,8 +145,9 @@ def _process_file(
elif isinstance(data, list): elif isinstance(data, list):
for index, item in enumerate(data): for index, item in enumerate(data):
content = _process_yaml_data(content, item, f"{name}.{index}") content = _process_yaml_data(content, item, f"{name}.{index}")
elif isinstance(data, ruamel.yaml.comments.TaggedScalar): elif isinstance(data, ruamel.yaml.comments.TaggedScalar) and old.is_encrypted(
if old.is_encrypted(data.value): data.value
):
logger.debug(f"Identified vaulted content in {path} at '{name}'") logger.debug(f"Identified vaulted content in {path} at '{name}'")
confirm = ( confirm = (
_confirm(f"Rekey vault encrypted variable {name} in file {path}?") _confirm(f"Rekey vault encrypted variable {name} in file {path}?")
@ -151,14 +164,46 @@ def _process_file(
new_data = rekey(old, new, data.value.encode()) new_data = rekey(old, new, data.value.encode())
content_decoded = content.decode("utf-8") content_decoded = content.decode("utf-8")
# Ok so this next section is probably the worst possible way to do this, but I did
# it this way to solve a very specific problem that would absolutely prevent people
# from using this tool: round trip YAML format preservation. Namely, that it's impossible.
# Ruamel gets the closest to achieving this: it can do round trip format preservation
# when the starting state is in _some_ known state (this is better than competitors which
# require the starting state to be in a _specific_ known state). But given how many
# ways there are to write YAML- and by extension, how many opinions there are on the
# "correct" way to write YAML- it is not possible to configure ruamel to account for all of
# them, even if everyone's YAML style was compatible with ruamel's roundtrip formatting (note:
# they aren't). So there's the problem: to be useful, this tool would need to reformat every
# YAML file it touched, which means nobody would use it.
#
# To avoid the YAML formatting problem, we need a way to replace the target content
# in the raw text of the file without dumping the parsed YAML. We want to preserve
# indentation, remove any extra newlines that would be left over, add any necessary
# newlines without clobbering the following lines, and ideally avoid reimplementing
# a YAML formatter. The answer to this problem- as the answer to so many stupid problems
# seems to be- is a regex. If this is too janky for you (I know it is for me) go support
# the estraven project I'm trying to get off the ground: https://github.com/enpaul/estraven
#
# Ok, thanks for sticking with me as I waxed poetic about this. The solution below...
# is awful, I can admit that. But it does work, so I'll leave it up to
# your judgement as to whether it's worthwhile or not. Here's how it works:
#
# 1. First we take the first line of the original (unmodified) vaulted content. This line
# of text has several important qualities: 1) it exists in the raw text of the file, 2)
# it is pseudo-guaranteed to be unique, and 3) it is guaranteed to exist (vaulted content
# will be at least one line long, but possibly no more)
search_data = data.value.split("\n")[1] search_data = data.value.split("\n")[1]
try: try:
# 2. Next we use a regex to grab the full line of text from the file that includes the above
# string. This is important because the full line of text will include the leading
# whitespace, which ruamel helpfully strips out from the parsed data.
# 3. Next we grab the number of leading spaces on the line using the capture group from the
# regex
padding = len( padding = len(
re.search(rf"\n(\s*){search_data}\n", content_decoded).groups()[ re.search(rf"\n(\s*){search_data}\n", content_decoded).groups()[0]
0
]
) )
except (TypeError, AttributeError): except (TypeError, AttributeError):
# This is to handle an edgecase where
if data.anchor.value: if data.anchor.value:
logger.debug( logger.debug(
f"Content replacement for encrypted content in {path} at {name} was not found, so replacement will be skipped because target is a YAML anchor" f"Content replacement for encrypted content in {path} at {name} was not found, so replacement will be skipped because target is a YAML anchor"
@ -166,24 +211,24 @@ def _process_file(
return content return content
raise raise
# 4. Now with the leading whitespace padding, we add this same number of spaces to each line
# of *both* the old vaulted data and the new vaulted data. It's important to do both because
# we'll need to do a replacement in a moment so we need to know both what we're replacing
# and what we're replacing it with.
padded_old_data = "\n".join( padded_old_data = "\n".join(
[ [f"{' ' * padding}{item}" for item in data.value.split("\n") if item]
f"{' ' * padding}{item}"
for item in data.value.split("\n")
if item
]
) )
padded_new_data = "\n".join( padded_new_data = "\n".join(
[ [
f"{' ' * padding}{item}" f"{' ' * padding}{item}"
for item in new_data.decode("utf-8") for item in new_data.decode("utf-8").split("\n")
if item if item
] ]
) )
content = content_decoded.replace( # 5. Finally, we actually replace the content. We also need to re-encode it back to bytes
padded_old_data, padded_new_data # because all file operations with vault are done in bytes mode
).encode() content = content_decoded.replace(padded_old_data, padded_new_data).encode()
return content return content
with path.open("rb") as infile: with path.open("rb") as infile:
@ -215,7 +260,7 @@ def _process_file(
if ignore: if ignore:
logger.warning(msg) logger.warning(msg)
return return
raise RuntimeError(msg) raise RuntimeError(msg) from None
elif path.suffix.lower() in YAML_FILE_EXTENSIONS: elif path.suffix.lower() in YAML_FILE_EXTENSIONS:
logger.debug(f"Identified YAML file: {path}") logger.debug(f"Identified YAML file: {path}")
@ -247,7 +292,7 @@ def _process_file(
outfile.write(updated) outfile.write(updated)
def _expand_paths(paths: Sequence[Union[Path, str]]) -> List[Path]: def _expand_paths(paths: Iterable[Path]) -> List[Path]:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
results = [] results = []
@ -267,7 +312,7 @@ def _expand_paths(paths: Sequence[Union[Path, str]]) -> List[Path]:
def _read_vault_pass_file(path: Union[Path, str]) -> str: def _read_vault_pass_file(path: Union[Path, str]) -> str:
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
try: try:
with Path(path).resolve().open() as infile: with Path(path).resolve().open(encoding="utf-8") as infile:
return infile.read() return infile.read()
except (FileNotFoundError, PermissionError): except (FileNotFoundError, PermissionError):
logger.error( logger.error(
@ -315,6 +360,7 @@ def _load_passwords(
def main(): def main():
"""Main program entrypoint and CLI interface"""
args = _get_args() args = _get_args()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)