Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pretty_bad_protocol: Remove unused code #6960

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 0 additions & 3 deletions securedrop/pretty_bad_protocol/_meta.py
Expand Up @@ -125,10 +125,7 @@ class GPGBase:
"import": _parsers.ImportResult,
"export": _parsers.ExportResult,
"list": _parsers.ListKeys,
"sign": _parsers.Sign,
"verify": _parsers.Verify,
"expire": _parsers.KeyExpirationResult,
"signing": _parsers.KeySigningResult,
"packets": _parsers.ListPackets,
}

Expand Down
179 changes: 0 additions & 179 deletions securedrop/pretty_bad_protocol/_parsers.py
Expand Up @@ -876,105 +876,6 @@ def progress(status_code): # type: ignore[no-untyped-def]
return value


class KeyExpirationInterface:
"""Interface that guards against misuse of --edit-key combined with --command-fd"""

def __init__(self, expiration_time, passphrase=None): # type: ignore[no-untyped-def]
self._passphrase = passphrase
self._expiration_time = expiration_time
self._clean_key_expiration_option()

def _clean_key_expiration_option(self): # type: ignore[no-untyped-def]
"""validates the expiration option supplied"""
allowed_entry = re.findall(r"^(\d+)(|w|m|y)$", self._expiration_time)
if not allowed_entry:
raise UsageError("Key expiration option: %s is not valid" % self._expiration_time)

def _input_passphrase(self, _input): # type: ignore[no-untyped-def]
if self._passphrase:
return f"{_input}{self._passphrase}\n"
return _input

def _main_key_command(self): # type: ignore[no-untyped-def]
main_key_input = "expire\n%s\n" % self._expiration_time
return self._input_passphrase(main_key_input)

def _sub_key_command(self, sub_key_number): # type: ignore[no-untyped-def]
sub_key_input = "key %d\nexpire\n%s\n" % (sub_key_number, self._expiration_time)
return self._input_passphrase(sub_key_input)

def gpg_interactive_input(self, sub_keys_number): # type: ignore[no-untyped-def]
"""processes series of inputs normally supplied on --edit-key but passed through stdin
this ensures that no other --edit-key command is actually passing through.
"""
deselect_sub_key = "key 0\n"

_input = self._main_key_command()
for sub_key_number in range(1, sub_keys_number + 1):
_input += self._sub_key_command(sub_key_number) + deselect_sub_key
return "%ssave\n" % _input


class KeyExpirationResult:
"""Handle status messages for key expiry
It does not really have a job, but just to conform to the API
"""

def __init__(self, gpg): # type: ignore[no-untyped-def]
self._gpg = gpg
self.status = "ok"

def _handle_status(self, key, value): # type: ignore[no-untyped-def]
"""Parse a status code from the attached GnuPG process.
:raises: :exc:`~exceptions.ValueError` if the status message is unknown.
"""
if key in (
"USERID_HINT",
"NEED_PASSPHRASE",
"GET_HIDDEN",
"SIGEXPIRED",
"KEYEXPIRED",
"GOOD_PASSPHRASE",
"GOT_IT",
"GET_LINE",
):
pass
elif key in ("BAD_PASSPHRASE", "MISSING_PASSPHRASE"):
self.status = key.replace("_", " ").lower()
else:
self.status = "failed"
raise ValueError("Unknown status message: %r" % key)


class KeySigningResult:
"""Handle status messages for key singing"""

def __init__(self, gpg): # type: ignore[no-untyped-def]
self._gpg = gpg
self.status = "ok"

def _handle_status(self, key, value): # type: ignore[no-untyped-def]
"""Parse a status code from the attached GnuPG process.
:raises: :exc:`~exceptions.ValueError` if the status message is unknown.
"""
if key in (
"USERID_HINT",
"NEED_PASSPHRASE",
"ALREADY_SIGNED",
"GOOD_PASSPHRASE",
"GOT_IT",
"GET_BOOL",
):
pass
elif key in ("BAD_PASSPHRASE", "MISSING_PASSPHRASE"):
self.status = "{}: {}".format(key.replace("_", " ").lower(), value)
else:
self.status = "failed"
raise ValueError(f"Key signing, unknown status message: {key!r} ::{value}")


class GenKey:
"""Handle status messages for key generation.
Expand Down Expand Up @@ -1089,86 +990,6 @@ def _handle_status(self, key, value): # type: ignore[no-untyped-def]
raise ValueError("Unknown status message: %r" % key)


class Sign:
"""Parse GnuPG status messages for signing operations.
:param gpg: An instance of :class:`gnupg.GPG`.
"""

#: The type of signature created.
sig_type = None
#: The algorithm used to create the signature.
sig_algo = None
#: The hash algorithm used to create the signature.
sig_hash_also = None
#: The fingerprint of the signing keyid.
fingerprint = None
#: The timestamp on the signature.
timestamp = None
#: xxx fill me in
what = None
status = None

def __init__(self, gpg): # type: ignore[no-untyped-def]
self._gpg = gpg

def __nonzero__(self): # type: ignore[no-untyped-def]
"""Override the determination for truthfulness evaluation.
:rtype: bool
:returns: True if we have a valid signature, False otherwise.
"""
return self.fingerprint is not None

__bool__ = __nonzero__

def __str__(self): # type: ignore[no-untyped-def]
return self.data.decode(self._gpg._encoding, self._gpg._decode_errors)

def _handle_status(self, key, value): # type: ignore[no-untyped-def]
"""Parse a status code from the attached GnuPG process.
:raises: :exc:`~exceptions.ValueError` if the status message is unknown.
"""
if key in (
"USERID_HINT",
"NEED_PASSPHRASE",
"BAD_PASSPHRASE",
"GOOD_PASSPHRASE",
"MISSING_PASSPHRASE",
"PINENTRY_LAUNCHED",
"BEGIN_SIGNING",
"CARDCTRL",
"INV_SGNR",
"SIGEXPIRED",
"KEY_CONSIDERED",
):
self.status = key.replace("_", " ").lower()
elif key == "SIG_CREATED":
(
self.sig_type,
self.sig_algo,
self.sig_hash_algo,
self.what,
self.timestamp,
self.fingerprint,
) = value.split()
elif key == "KEYEXPIRED":
self.status = "skipped signing key, key expired"
if (value is not None) and (len(value) > 0):
self.status += f" on {str(value)}"
elif key == "KEYREVOKED":
self.status = "skipped signing key, key revoked"
if (value is not None) and (len(value) > 0):
self.status += f" on {str(value)}"
elif key == "NODATA":
self.status = nodata(value)
elif key == "PROGRESS":
self.status = progress(value.split(" ", 1)[0])
else:
raise ValueError("Unknown status message: %r" % key)


class ListKeys(list):
"""Handle status messages for --list-keys.
Expand Down
45 changes: 0 additions & 45 deletions securedrop/pretty_bad_protocol/_trust.py
Expand Up @@ -42,51 +42,6 @@ def _create_trustdb(cls): # type: ignore[no-untyped-def]
cls.fix_trustdb(trustdb)


def export_ownertrust(cls, trustdb=None): # type: ignore[no-untyped-def]
"""Export ownertrust to a trustdb file.
If there is already a file named :file:`trustdb.gpg` in the current GnuPG
homedir, it will be renamed to :file:`trustdb.gpg.bak`.
:param string trustdb: The path to the trustdb.gpg file. If not given,
defaults to ``'trustdb.gpg'`` in the current GnuPG
homedir.
"""
if trustdb is None:
trustdb = os.path.join(cls.homedir, "trustdb.gpg")

try:
os.rename(trustdb, trustdb + ".bak")
except OSError as err:
log.debug(str(err))

export_proc = cls._open_subprocess(["--export-ownertrust"])
tdb = open(trustdb, "wb")
_util._threaded_copy_data(export_proc.stdout, tdb)
export_proc.wait()


def import_ownertrust(cls, trustdb=None): # type: ignore[no-untyped-def]
"""Import ownertrust from a trustdb file.
:param str trustdb: The path to the trustdb.gpg file. If not given,
defaults to :file:`trustdb.gpg` in the current GnuPG
homedir.
"""
if trustdb is None:
trustdb = os.path.join(cls.homedir, "trustdb.gpg")

import_proc = cls._open_subprocess(["--import-ownertrust"])

try:
tdb = open(trustdb, "rb")
except OSError:
log.error("trustdb file %s does not exist!" % trustdb)

_util._threaded_copy_data(tdb, import_proc.stdin)
import_proc.wait()


def fix_trustdb(cls, trustdb=None): # type: ignore[no-untyped-def]
"""Attempt to repair a broken trustdb.gpg file.
Expand Down
29 changes: 0 additions & 29 deletions securedrop/pretty_bad_protocol/_util.py
Expand Up @@ -18,26 +18,15 @@

"""Extra utilities for python-gnupg."""

import io
import os
import re
import threading
from datetime import datetime
from io import BytesIO
from socket import gethostname

from . import _logger

# These are all the classes which are stream-like; they are used in
# :func:`_is_stream`.
_STREAMLIKE_TYPES = [io.IOBase]


# Directory shortcuts:
# we don't want to use this one because it writes to the install dir:
# _here = getabsfile(currentframe()).rsplit(os.path.sep, 1)[0]
_here = os.path.join(os.getcwd(), "pretty_bad_protocol") # current dir
_test = os.path.join(os.path.join(_here, "test"), "tmp") # ./tests/tmp
_user = os.environ.get("HOME") # $HOME

# Fix for Issue #74: we shouldn't expect that a $HOME directory is set in all
Expand All @@ -54,20 +43,12 @@
# that. Otherwise, we'll use the current directory + /gnupghome.
_user = os.path.sep.join([_user, "gnupghome"])

_ugpg = os.path.join(_user, ".gnupg") # $HOME/.gnupg
_conf = os.path.join(os.path.join(_user, ".config"), "python-gnupg")
# $HOME/.config/python-gnupg

# Logger is disabled by default
log = _logger.create_logger(0)

#: Compiled regex for determining a GnuPG binary's version:
_VERSION_STRING_REGEX = re.compile(r"(\d)(\.)(\d)(\.)(\d+)")


class GnuPGVersionError(ValueError):
"""Raised when we couldn't parse GnuPG's version info."""


def _copy_data(instream, outstream): # type: ignore[no-untyped-def]
"""Copy data from one stream to another.
Expand Down Expand Up @@ -315,16 +296,6 @@ def _is_file(filename): # type: ignore[no-untyped-def]
return False


def _is_stream(input): # type: ignore[no-untyped-def]
"""Check that the input is a byte stream.
:param input: An object provided for reading from or writing to.
:rtype: bool
:returns: True if :param:input is a stream, False if otherwise.
"""
return isinstance(input, tuple(_STREAMLIKE_TYPES))


def _is_list_or_tuple(instance): # type: ignore[no-untyped-def]
"""Check that ``instance`` is a list or tuple.
Expand Down