Skip to content

Commit

Permalink
Merge pull request #264 from aidecoe/master
Browse files Browse the repository at this point in the history
Catch DKIM and DMARC filter exceptions and log them
  • Loading branch information
GuillaumeSeren committed Jun 1, 2020
2 parents 9bcecab + 5e68142 commit aae2dc3
Show file tree
Hide file tree
Showing 3 changed files with 248 additions and 27 deletions.
32 changes: 30 additions & 2 deletions afew/filters/DKIMValidityFilter.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,25 @@
# SPDX-License-Identifier: ISC
# Copyright (c) Amadeusz Zolnowski <aidecoe@aidecoe.name>

"""
DKIM validator filter.
Verifies DKIM signature of an e-mail which has DKIM header.
"""

import logging

import dkim
import dns.exception

from afew.filters.BaseFilter import Filter


class DKIMVerifyError(Exception):
"""Failed to verify DKIM signature.
"""


def verify_dkim(path):
"""
Verify DKIM signature of an e-mail file.
Expand All @@ -16,7 +30,10 @@ def verify_dkim(path):
with open(path, 'rb') as message_file:
message_bytes = message_file.read()

return dkim.verify(message_bytes)
try:
return dkim.verify(message_bytes)
except (dns.exception.DNSException, dkim.DKIMException) as exception:
raise DKIMVerifyError(str(exception)) from exception


class DKIMValidityFilter(Filter):
Expand All @@ -29,8 +46,19 @@ class DKIMValidityFilter(Filter):
def __init__(self, database, ok_tag='dkim-ok', fail_tag='dkim-fail'):
super().__init__(database)
self.dkim_tag = {True: ok_tag, False: fail_tag}
self.log = logging.getLogger('{}.{}'.format(
self.__module__, self.__class__.__name__))

def handle_message(self, message):
if message.get_header(self.header):
dkim_ok = all(map(verify_dkim, message.get_filenames()))
try:
dkim_ok = all(map(verify_dkim, message.get_filenames()))
except DKIMVerifyError as verify_error:
self.log.warning(
"Failed to verify DKIM of '%s': %s "
"(marked as 'dkim-fail')",
message.get_message_id(),
verify_error
)
dkim_ok = False
self.add_tags(message, self.dkim_tag[dkim_ok])
85 changes: 60 additions & 25 deletions afew/filters/DMARCReportInspectionFilter.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
# SPDX-License-Identifier: ISC
# Copyright (c) Amadeusz Zolnowski <aidecoe@aidecoe.name>

"""
DMARC report inspection filter.
Looks into DMARC report whether all results are successful or any is failing.
Add tags 2 of the tags below:
- dmarc/dkim-ok
- dmarc/dkim-fail
- dmarc/spf-ok
- dmarc/spf-fail
"""

import logging
import re
import tempfile
import xml.etree.ElementTree as ET
Expand All @@ -9,6 +22,11 @@
from .BaseFilter import Filter


class DMARCInspectionError(Exception):
"""Failed to inspect DMARC report.
"""


class ReportFilesIterator:
"""
Iterator over DMARC reports files attached to the e-mail either directly or
Expand All @@ -25,10 +43,14 @@ def __iter__(self):
if part.get_content_type() == 'application/zip':
with tempfile.TemporaryFile(suffix='.zip') as file:
file.write(part.get_payload(decode=True))
with zipfile.ZipFile(file) as zip_file:
for member_file in zip_file.infolist():
if member_file.filename.endswith('.xml'):
yield zip_file.read(member_file)
try:
with zipfile.ZipFile(file) as zip_file:
for member_file in zip_file.infolist():
if member_file.filename.endswith('.xml'):
yield zip_file.read(member_file)
except zipfile.BadZipFile as zip_error:
raise DMARCInspectionError(str(zip_error)) \
from zip_error
elif part.get_content_type() == 'application/xml':
yield part.get_payload(decode=True)

Expand Down Expand Up @@ -73,19 +95,22 @@ def read_auth_results(document):
:returns: Results as a dictionary where keys are: `dkim` and `spf` and
values are boolean values.
"""
results = {'dkim': True, 'spf': True}
root = ET.fromstring(document)
for record in root.findall('record'):
auth_results = record.find('auth_results')
if auth_results:
dkim = auth_results.find('dkim')
if dkim:
dkim = dkim.find('result')
results['dkim'] &= not has_failed(dkim)
spf = auth_results.find('spf')
if spf:
spf = spf.find('result')
results['spf'] &= not has_failed(spf)
try:
results = {'dkim': True, 'spf': True}
root = ET.fromstring(document)
for record in root.findall('record'):
auth_results = record.find('auth_results')
if auth_results:
dkim = auth_results.find('dkim')
if dkim:
dkim = dkim.find('result')
results['dkim'] &= not has_failed(dkim)
spf = auth_results.find('spf')
if spf:
spf = spf.find('result')
results['spf'] &= not has_failed(spf)
except ET.ParseError as parse_error:
raise DMARCInspectionError(str(parse_error)) from parse_error

return results

Expand All @@ -105,18 +130,28 @@ def __init__(self, # pylint: disable=too-many-arguments
self.spf_tag = {True: spf_ok_tag, False: spf_fail_tag}
self.dmarc_subject = re.compile(r'^report domain:',
flags=re.IGNORECASE)
self.log = logging.getLogger('{}.{}'.format(
self.__module__, self.__class__.__name__))

def handle_message(self, message):
if not self.dmarc_subject.match(message.get_header('Subject')):
return

auth_results = {'dkim': True, 'spf': True}

for file_content in ReportFilesIterator(message):
document = file_content.decode('UTF-8')
auth_results = and_dict(auth_results, read_auth_results(document))

self.add_tags(message,
'dmarc',
self.dkim_tag[auth_results['dkim']],
self.spf_tag[auth_results['spf']])
try:
for file_content in ReportFilesIterator(message):
document = file_content.decode('UTF-8')
auth_results = and_dict(auth_results,
read_auth_results(document))

self.add_tags(message,
'dmarc',
self.dkim_tag[auth_results['dkim']],
self.spf_tag[auth_results['spf']])
except DMARCInspectionError as inspection_error:
self.log.error(
"Failed to verify DMARC report of '%s': %s (not tagging)",
message.get_message_id(),
inspection_error
)
158 changes: 158 additions & 0 deletions afew/tests/test_dkimvalidityfilter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
"""Test suite for DKIMValidityFilter.
"""
import unittest
from email.utils import make_msgid
from unittest import mock

import dkim
import dns.exception

from afew.Database import Database
from afew.filters.DKIMValidityFilter import DKIMValidityFilter


class _AddTags: # pylint: disable=too-few-public-methods
"""Mock for `add_tags` method of base filter. We need to easily collect
tags added by filter for test assertion.
"""
def __init__(self, tags):
self._tags = tags

def __call__(self, message, *tags):
self._tags.update(tags)


def _make_dkim_validity_filter():
"""Make `DKIMValidityFilter` with mocked `DKIMValidityFilter.add_tags`
method, so in tests we can easily check what tags were added by filter
without fiddling with db.
"""
tags = set()
add_tags = _AddTags(tags)
dkim_filter = DKIMValidityFilter(Database())
dkim_filter.add_tags = add_tags
return dkim_filter, tags


def _make_message():
"""Make mock email Message.
Mocked methods:
- `get_header()` returns non-empty string. When testing with mocked
function for verifying DKIM signature, DKIM signature doesn't matter as
long as it's non-empty string.
- `get_filenames()` returns list of non-empty string. When testing with
mocked file open, it must just be non-empty string.
- `get_message_id()` returns some generated message ID.
"""
message = mock.Mock()
message.get_header.return_value = 'sig'
message.get_filenames.return_value = ['a']
message.get_message_id.return_value = make_msgid()
return message


class TestDKIMValidityFilter(unittest.TestCase):
"""Test suite for `DKIMValidityFilter`.
"""
@mock.patch('afew.filters.DKIMValidityFilter.open',
mock.mock_open(read_data=b''))
def test_no_dkim_header(self):
"""Test message without DKIM-Signature header doesn't get any tags.
"""
dkim_filter, tags = _make_dkim_validity_filter()
message = _make_message()
message.get_header.return_value = False

with mock.patch('afew.filters.DKIMValidityFilter.dkim.verify') \
as dkim_verify:
dkim_verify.return_value = True
dkim_filter.handle_message(message)

self.assertSetEqual(tags, set())

@mock.patch('afew.filters.DKIMValidityFilter.open',
mock.mock_open(read_data=b''))
def test_dkim_all_ok(self):
"""Test message, with multiple files all having good signature, gets
only 'dkim-ok' tag.
"""
dkim_filter, tags = _make_dkim_validity_filter()
message = _make_message()
message.get_filenames.return_value = ['a', 'b', 'c']

with mock.patch('afew.filters.DKIMValidityFilter.dkim.verify') \
as dkim_verify:
dkim_verify.return_value = True
dkim_filter.handle_message(message)

self.assertSetEqual(tags, {'dkim-ok'})

@mock.patch('afew.filters.DKIMValidityFilter.open',
mock.mock_open(read_data=b''))
def test_dkim_all_fail(self):
"""Test message, with multiple files all having bad signature, gets
only 'dkim-fail' tag.
"""
dkim_filter, tags = _make_dkim_validity_filter()
message = _make_message()
message.get_filenames.return_value = ['a', 'b', 'c']

with mock.patch('afew.filters.DKIMValidityFilter.dkim.verify') \
as dkim_verify:
dkim_verify.return_value = False
dkim_filter.handle_message(message)

self.assertSetEqual(tags, {'dkim-fail'})

@mock.patch('afew.filters.DKIMValidityFilter.open',
mock.mock_open(read_data=b''))
def test_dkim_some_fail(self):
"""Test message, with multiple files but only some having bad
signature, still gets only 'dkim-fail' tag.
"""
dkim_filter, tags = _make_dkim_validity_filter()
message = _make_message()
message.get_filenames.return_value = ['a', 'b', 'c']

with mock.patch('afew.filters.DKIMValidityFilter.dkim.verify') \
as dkim_verify:
dkim_verify.side_effect = [True, False, True]
dkim_filter.handle_message(message)

self.assertSetEqual(tags, {'dkim-fail'})

@mock.patch('afew.filters.DKIMValidityFilter.open',
mock.mock_open(read_data=b''))
def test_dkim_dns_resolve_failure(self):
"""Test message, on which DNS resolution failure happens when verifying
DKIM signature, gets only 'dkim-fail' tag.
"""
dkim_filter, tags = _make_dkim_validity_filter()
message = _make_message()

with mock.patch('afew.filters.DKIMValidityFilter.dkim.verify') \
as dkim_verify:
dkim_verify.side_effect = dns.resolver.NoNameservers()
dkim_filter.handle_message(message)

self.assertSetEqual(tags, {'dkim-fail'})

@mock.patch('afew.filters.DKIMValidityFilter.open',
mock.mock_open(read_data=b''))
def test_dkim_verify_failed(self):
"""Test message, on which DKIM key parsing failure occurs, gets only
'dkim-fail' tag.
"""
dkim_filter, tags = _make_dkim_validity_filter()
message = _make_message()

with mock.patch('afew.filters.DKIMValidityFilter.dkim.verify') \
as dkim_verify:
dkim_verify.side_effect = dkim.KeyFormatError()
dkim_filter.handle_message(message)

self.assertSetEqual(tags, {'dkim-fail'})

0 comments on commit aae2dc3

Please sign in to comment.