Skip to content

Commit

Permalink
WIP: pretty_bad_protocol: Remove unused code
Browse files Browse the repository at this point in the history
This is a very basic first pass, removing code that is clearly
unused.
  • Loading branch information
legoktm committed Oct 4, 2023
1 parent 1964032 commit bfeef44
Show file tree
Hide file tree
Showing 6 changed files with 8 additions and 703 deletions.
3 changes: 0 additions & 3 deletions securedrop/pretty_bad_protocol/_meta.py
Expand Up @@ -125,10 +125,7 @@ class GPGBase:
"import": _parsers.ImportResult,
"export": _parsers.ExportResult,
"list": _parsers.ListKeys,
"sign": _parsers.Sign,
"verify": _parsers.Verify,
"expire": _parsers.KeyExpirationResult,
"signing": _parsers.KeySigningResult,
"packets": _parsers.ListPackets,
}

Expand Down
179 changes: 0 additions & 179 deletions securedrop/pretty_bad_protocol/_parsers.py
Expand Up @@ -876,105 +876,6 @@ def progress(status_code): # type: ignore[no-untyped-def]
return value


class KeyExpirationInterface:
"""Interface that guards against misuse of --edit-key combined with --command-fd"""

def __init__(self, expiration_time, passphrase=None): # type: ignore[no-untyped-def]
self._passphrase = passphrase
self._expiration_time = expiration_time
self._clean_key_expiration_option()

def _clean_key_expiration_option(self): # type: ignore[no-untyped-def]
"""validates the expiration option supplied"""
allowed_entry = re.findall(r"^(\d+)(|w|m|y)$", self._expiration_time)
if not allowed_entry:
raise UsageError("Key expiration option: %s is not valid" % self._expiration_time)

def _input_passphrase(self, _input): # type: ignore[no-untyped-def]
if self._passphrase:
return f"{_input}{self._passphrase}\n"
return _input

def _main_key_command(self): # type: ignore[no-untyped-def]
main_key_input = "expire\n%s\n" % self._expiration_time
return self._input_passphrase(main_key_input)

def _sub_key_command(self, sub_key_number): # type: ignore[no-untyped-def]
sub_key_input = "key %d\nexpire\n%s\n" % (sub_key_number, self._expiration_time)
return self._input_passphrase(sub_key_input)

def gpg_interactive_input(self, sub_keys_number): # type: ignore[no-untyped-def]
"""processes series of inputs normally supplied on --edit-key but passed through stdin
this ensures that no other --edit-key command is actually passing through.
"""
deselect_sub_key = "key 0\n"

_input = self._main_key_command()
for sub_key_number in range(1, sub_keys_number + 1):
_input += self._sub_key_command(sub_key_number) + deselect_sub_key
return "%ssave\n" % _input


class KeyExpirationResult:
"""Handle status messages for key expiry
It does not really have a job, but just to conform to the API
"""

def __init__(self, gpg): # type: ignore[no-untyped-def]
self._gpg = gpg
self.status = "ok"

def _handle_status(self, key, value): # type: ignore[no-untyped-def]
"""Parse a status code from the attached GnuPG process.
:raises: :exc:`~exceptions.ValueError` if the status message is unknown.
"""
if key in (
"USERID_HINT",
"NEED_PASSPHRASE",
"GET_HIDDEN",
"SIGEXPIRED",
"KEYEXPIRED",
"GOOD_PASSPHRASE",
"GOT_IT",
"GET_LINE",
):
pass
elif key in ("BAD_PASSPHRASE", "MISSING_PASSPHRASE"):
self.status = key.replace("_", " ").lower()
else:
self.status = "failed"
raise ValueError("Unknown status message: %r" % key)


class KeySigningResult:
"""Handle status messages for key singing"""

def __init__(self, gpg): # type: ignore[no-untyped-def]
self._gpg = gpg
self.status = "ok"

def _handle_status(self, key, value): # type: ignore[no-untyped-def]
"""Parse a status code from the attached GnuPG process.
:raises: :exc:`~exceptions.ValueError` if the status message is unknown.
"""
if key in (
"USERID_HINT",
"NEED_PASSPHRASE",
"ALREADY_SIGNED",
"GOOD_PASSPHRASE",
"GOT_IT",
"GET_BOOL",
):
pass
elif key in ("BAD_PASSPHRASE", "MISSING_PASSPHRASE"):
self.status = "{}: {}".format(key.replace("_", " ").lower(), value)
else:
self.status = "failed"
raise ValueError(f"Key signing, unknown status message: {key!r} ::{value}")


class GenKey:
"""Handle status messages for key generation.
Expand Down Expand Up @@ -1089,86 +990,6 @@ def _handle_status(self, key, value): # type: ignore[no-untyped-def]
raise ValueError("Unknown status message: %r" % key)


class Sign:
"""Parse GnuPG status messages for signing operations.
:param gpg: An instance of :class:`gnupg.GPG`.
"""

#: The type of signature created.
sig_type = None
#: The algorithm used to create the signature.
sig_algo = None
#: The hash algorithm used to create the signature.
sig_hash_also = None
#: The fingerprint of the signing keyid.
fingerprint = None
#: The timestamp on the signature.
timestamp = None
#: xxx fill me in
what = None
status = None

def __init__(self, gpg): # type: ignore[no-untyped-def]
self._gpg = gpg

def __nonzero__(self): # type: ignore[no-untyped-def]
"""Override the determination for truthfulness evaluation.
:rtype: bool
:returns: True if we have a valid signature, False otherwise.
"""
return self.fingerprint is not None

__bool__ = __nonzero__

def __str__(self): # type: ignore[no-untyped-def]
return self.data.decode(self._gpg._encoding, self._gpg._decode_errors)

def _handle_status(self, key, value): # type: ignore[no-untyped-def]
"""Parse a status code from the attached GnuPG process.
:raises: :exc:`~exceptions.ValueError` if the status message is unknown.
"""
if key in (
"USERID_HINT",
"NEED_PASSPHRASE",
"BAD_PASSPHRASE",
"GOOD_PASSPHRASE",
"MISSING_PASSPHRASE",
"PINENTRY_LAUNCHED",
"BEGIN_SIGNING",
"CARDCTRL",
"INV_SGNR",
"SIGEXPIRED",
"KEY_CONSIDERED",
):
self.status = key.replace("_", " ").lower()
elif key == "SIG_CREATED":
(
self.sig_type,
self.sig_algo,
self.sig_hash_algo,
self.what,
self.timestamp,
self.fingerprint,
) = value.split()
elif key == "KEYEXPIRED":
self.status = "skipped signing key, key expired"
if (value is not None) and (len(value) > 0):
self.status += f" on {str(value)}"
elif key == "KEYREVOKED":
self.status = "skipped signing key, key revoked"
if (value is not None) and (len(value) > 0):
self.status += f" on {str(value)}"
elif key == "NODATA":
self.status = nodata(value)
elif key == "PROGRESS":
self.status = progress(value.split(" ", 1)[0])
else:
raise ValueError("Unknown status message: %r" % key)


class ListKeys(list):
"""Handle status messages for --list-keys.
Expand Down
45 changes: 0 additions & 45 deletions securedrop/pretty_bad_protocol/_trust.py
Expand Up @@ -42,51 +42,6 @@ def _create_trustdb(cls): # type: ignore[no-untyped-def]
cls.fix_trustdb(trustdb)


def export_ownertrust(cls, trustdb=None): # type: ignore[no-untyped-def]
"""Export ownertrust to a trustdb file.
If there is already a file named :file:`trustdb.gpg` in the current GnuPG
homedir, it will be renamed to :file:`trustdb.gpg.bak`.
:param string trustdb: The path to the trustdb.gpg file. If not given,
defaults to ``'trustdb.gpg'`` in the current GnuPG
homedir.
"""
if trustdb is None:
trustdb = os.path.join(cls.homedir, "trustdb.gpg")

try:
os.rename(trustdb, trustdb + ".bak")
except OSError as err:
log.debug(str(err))

export_proc = cls._open_subprocess(["--export-ownertrust"])
tdb = open(trustdb, "wb")
_util._threaded_copy_data(export_proc.stdout, tdb)
export_proc.wait()


def import_ownertrust(cls, trustdb=None): # type: ignore[no-untyped-def]
"""Import ownertrust from a trustdb file.
:param str trustdb: The path to the trustdb.gpg file. If not given,
defaults to :file:`trustdb.gpg` in the current GnuPG
homedir.
"""
if trustdb is None:
trustdb = os.path.join(cls.homedir, "trustdb.gpg")

import_proc = cls._open_subprocess(["--import-ownertrust"])

try:
tdb = open(trustdb, "rb")
except OSError:
log.error("trustdb file %s does not exist!" % trustdb)

_util._threaded_copy_data(tdb, import_proc.stdin)
import_proc.wait()


def fix_trustdb(cls, trustdb=None): # type: ignore[no-untyped-def]
"""Attempt to repair a broken trustdb.gpg file.
Expand Down
29 changes: 0 additions & 29 deletions securedrop/pretty_bad_protocol/_util.py
Expand Up @@ -18,26 +18,15 @@

"""Extra utilities for python-gnupg."""

import io
import os
import re
import threading
from datetime import datetime
from io import BytesIO
from socket import gethostname

from . import _logger

# These are all the classes which are stream-like; they are used in
# :func:`_is_stream`.
_STREAMLIKE_TYPES = [io.IOBase]


# Directory shortcuts:
# we don't want to use this one because it writes to the install dir:
# _here = getabsfile(currentframe()).rsplit(os.path.sep, 1)[0]
_here = os.path.join(os.getcwd(), "pretty_bad_protocol") # current dir
_test = os.path.join(os.path.join(_here, "test"), "tmp") # ./tests/tmp
_user = os.environ.get("HOME") # $HOME

# Fix for Issue #74: we shouldn't expect that a $HOME directory is set in all
Expand All @@ -54,20 +43,12 @@
# that. Otherwise, we'll use the current directory + /gnupghome.
_user = os.path.sep.join([_user, "gnupghome"])

_ugpg = os.path.join(_user, ".gnupg") # $HOME/.gnupg
_conf = os.path.join(os.path.join(_user, ".config"), "python-gnupg")
# $HOME/.config/python-gnupg

# Logger is disabled by default
log = _logger.create_logger(0)

#: Compiled regex for determining a GnuPG binary's version:
_VERSION_STRING_REGEX = re.compile(r"(\d)(\.)(\d)(\.)(\d+)")


class GnuPGVersionError(ValueError):
"""Raised when we couldn't parse GnuPG's version info."""


def _copy_data(instream, outstream): # type: ignore[no-untyped-def]
"""Copy data from one stream to another.
Expand Down Expand Up @@ -315,16 +296,6 @@ def _is_file(filename): # type: ignore[no-untyped-def]
return False


def _is_stream(input): # type: ignore[no-untyped-def]
"""Check that the input is a byte stream.
:param input: An object provided for reading from or writing to.
:rtype: bool
:returns: True if :param:input is a stream, False if otherwise.
"""
return isinstance(input, tuple(_STREAMLIKE_TYPES))


def _is_list_or_tuple(instance): # type: ignore[no-untyped-def]
"""Check that ``instance`` is a list or tuple.
Expand Down

0 comments on commit bfeef44

Please sign in to comment.