Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dns-account-01 challenge support #9887

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
103 changes: 94 additions & 9 deletions acme/acme/_internal/tests/challenges_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
from unittest import mock
import urllib.parse as urllib_parse

from base64 import b32encode
from hashlib import sha256
import josepy as jose
from josepy.jwk import JWKEC
import OpenSSL
Expand Down Expand Up @@ -127,7 +129,7 @@ def setUp(self):

def test_validation_domain_name(self):
assert '_acme-challenge.www.example.com' == \
self.msg.validation_domain_name('www.example.com')
self.msg.validation_domain_name('www.example.com')

def test_validation(self):
assert "rAa7iIg4K2y63fvUhCfy8dP1Xl7wEhmQq0oChTcE3Zk" == \
Expand All @@ -145,6 +147,84 @@ def test_from_json_hashable(self):
hash(DNS01.from_json(self.jmsg))


class DNSACCOUNT01ResponseTest(unittest.TestCase):

def setUp(self):
from acme.challenges import DNSACCOUNT01Response
self.msg = DNSACCOUNT01Response(key_authorization=u'foo')
self.jmsg = {
'resource': 'challenge',
'type': 'dns-account-01',
'keyAuthorization': u'foo',
}

from acme.challenges import DNSACCOUNT01
self.chall = DNSACCOUNT01(token=(b'x' * 16))
self.response = self.chall.response(KEY)

def test_to_partial_json(self):
assert {} == self.msg.to_partial_json()

def test_from_json(self):
from acme.challenges import DNSACCOUNT01Response
assert self.msg == DNSACCOUNT01Response.from_json(self.jmsg)

def test_from_json_hashable(self):
from acme.challenges import DNSACCOUNT01Response
hash(DNSACCOUNT01Response.from_json(self.jmsg))

def test_simple_verify_failure(self):
key2 = jose.JWKRSA.load(test_util.load_vector('rsa256_key.pem'))
public_key = key2.public_key()
verified = self.response.simple_verify(self.chall, "local", public_key)
assert not verified

def test_simple_verify_success(self):
public_key = KEY.public_key()
verified = self.response.simple_verify(self.chall, "local", public_key)
assert verified


class DNSACCOUNT01Test(unittest.TestCase):

def setUp(self):
from acme.challenges import DNSACCOUNT01
self.msg = DNSACCOUNT01(token=jose.decode_b64jose(
'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ+PCt92wr+oA'))
self.jmsg = {
'type': 'dns-account-01',
'token': 'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA',
}

from acme.challenges import DNSACCOUNT01
self.accountURI = "https://example.com/acme/acct/1234"
self.scope = "wildcard"
# "_" || base32(SHA-256(<ACCOUNT_RESOURCE_URL>)[0:10]) || "._acme-" || <SCOPE> || "-challenge"
self.accountLabel = '_' + b32encode(
sha256(self.accountURI.encode()).digest()[:10]
).decode().lower() + '._acme-' + self.scope + '-challenge'

def test_validation_domain_name(self):
assert self.accountLabel + '.www.example.com' == \
self.msg.validation_domain_name(
self.accountURI, self.scope, 'www.example.com')

def test_validation(self):
assert "rAa7iIg4K2y63fvUhCfy8dP1Xl7wEhmQq0oChTcE3Zk" == \
self.msg.validation(KEY)

def test_to_partial_json(self):
assert self.jmsg == self.msg.to_partial_json()

def test_from_json(self):
from acme.challenges import DNSACCOUNT01
assert self.msg == DNSACCOUNT01.from_json(self.jmsg)

def test_from_json_hashable(self):
from acme.challenges import DNSACCOUNT01
hash(DNSACCOUNT01.from_json(self.jmsg))


class HTTP01ResponseTest(unittest.TestCase):

def setUp(self):
Expand Down Expand Up @@ -221,7 +301,8 @@ def test_simple_verify_timeout(self, mock_get):
mock_get.assert_called_once_with(self.chall.uri("local"), verify=False,
timeout=30)
mock_get.reset_mock()
self.response.simple_verify(self.chall, "local", KEY.public_key(), timeout=1234)
self.response.simple_verify(
self.chall, "local", KEY.public_key(), timeout=1234)
mock_get.assert_called_once_with(self.chall.uri("local"), verify=False,
timeout=1234)

Expand All @@ -240,7 +321,7 @@ def setUp(self):

def test_path(self):
assert self.msg.path == '/.well-known/acme-challenge/' \
'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA'
'evaGxfADs6pSRb2LAv9IZf17Dt3juxGJ-PCt92wr-oA'

def test_uri(self):
assert 'http://example.com/.well-known/acme-challenge/' \
Expand Down Expand Up @@ -303,7 +384,7 @@ def test_gen_verify_cert_gen_key(self):

def test_verify_bad_cert(self):
assert not self.response.verify_cert(self.domain,
test_util.load_cert('cert.pem'))
test_util.load_cert('cert.pem'))

def test_verify_bad_domain(self):
key1 = test_util.load_pyopenssl_private_key('rsa512_key.pem')
Expand All @@ -319,8 +400,8 @@ def test_simple_verify_bad_key_authorization(self):
def test_simple_verify(self, mock_verify_cert):
mock_verify_cert.return_value = mock.sentinel.verification
assert mock.sentinel.verification == self.response.simple_verify(
self.chall, self.domain, KEY.public_key(),
cert=mock.sentinel.cert)
self.chall, self.domain, KEY.public_key(),
cert=mock.sentinel.cert)
mock_verify_cert.assert_called_once_with(
self.response, self.domain, mock.sentinel.cert)

Expand Down Expand Up @@ -406,7 +487,8 @@ def test_from_json_hashable(self):
hash(DNS.from_json(self.jmsg))

def test_gen_check_validation(self):
ec_key_secp384r1 = JWKEC(key=test_util.load_ecdsa_private_key('ec_secp384r1_key.pem'))
ec_key_secp384r1 = JWKEC(
key=test_util.load_ecdsa_private_key('ec_secp384r1_key.pem'))
for key, alg in [(KEY, jose.RS256), (ec_key_secp384r1, jose.ES384)]:
with self.subTest(key=key, alg=alg):
assert self.msg.check_validation(
Expand Down Expand Up @@ -442,10 +524,12 @@ def test_gen_response(self):
assert response.validation == mock.sentinel.validation

def test_validation_domain_name(self):
assert '_acme-challenge.le.wtf' == self.msg.validation_domain_name('le.wtf')
assert '_acme-challenge.le.wtf' == self.msg.validation_domain_name(
'le.wtf')

def test_validation_domain_name_ecdsa(self):
ec_key_secp384r1 = JWKEC(key=test_util.load_ecdsa_private_key('ec_secp384r1_key.pem'))
ec_key_secp384r1 = JWKEC(
key=test_util.load_ecdsa_private_key('ec_secp384r1_key.pem'))
assert self.msg.check_validation(
self.msg.gen_validation(ec_key_secp384r1, alg=jose.ES384),
ec_key_secp384r1.public_key()) is True
Expand Down Expand Up @@ -489,6 +573,7 @@ def test_check_validation(self):

class JWSPayloadRFC8555Compliant(unittest.TestCase):
"""Test for RFC8555 compliance of JWS generated from resources/challenges"""

def test_challenge_payload(self):
from acme.challenges import HTTP01Response

Expand Down
72 changes: 69 additions & 3 deletions acme/acme/challenges.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
"""ACME Identifier Validation Challenges."""
import abc
from base64 import b32encode
import codecs
import functools
import hashlib
Expand Down Expand Up @@ -251,7 +252,8 @@ def simple_verify(self, chall: 'DNS01', domain: str, account_public_key: jose.JW
"""
verified = self.verify(chall, account_public_key)
if not verified:
logger.debug("Verification of key authorization in response failed")
logger.debug(
"Verification of key authorization in response failed")
return verified


Expand Down Expand Up @@ -284,6 +286,68 @@ def validation_domain_name(self, name: str) -> str:
return f"{self.LABEL}.{name}"


@ChallengeResponse.register
class DNSACCOUNT01Response(KeyAuthorizationChallengeResponse):
"""ACME dns-account-01 challenge response."""
typ = "dns-account-01"

def simple_verify(self,
chall: 'DNSACCOUNT01',
domain: str, # pylint: disable=unused-argument
account_public_key: jose.JWK
) -> bool:
"""Simple verify.

This method no longer checks DNS records and is a simple wrapper
around `KeyAuthorizationChallengeResponse.verify`.

:param challenges.DNSACCOUNT01 chall: Corresponding challenge.
:param str domain: Domain name being verified.
:param JWK account_public_key: Public key for the key pair
being authorized.

:return: ``True`` iff verification of the key authorization was
successful.
:rtype: bool

"""
verified = self.verify(chall, account_public_key)
if not verified:
logger.debug(
"Verification of key authorization in response failed")
return verified


@Challenge.register
class DNSACCOUNT01(KeyAuthorizationChallenge):
"""ACME dns-account-01 challenge."""
response_cls = DNSACCOUNT01Response
typ = response_cls.typ

def validation(self, account_key: jose.JWK, **unused_kwargs: Any) -> str:
"""Generate validation.

:param JWK account_key:
:rtype: str

"""
return jose.b64encode(hashlib.sha256(self.key_authorization(
account_key).encode("utf-8")).digest()).decode()

def validation_domain_name(self, acctURI: str, scope: str, name: str) -> str:
"""Domain name for TXT validation record.

:param str acctURI: Account Resource URI.
:param str scope: Scope of the challenge.
:param str name: Domain name being validated.
:rtype: str

"""
acctLabel = b32encode(hashlib.sha256(
acctURI.encode()).digest()[:10]).decode().lower()
return f"_{acctLabel}._acme-{scope}-challenge.{name}"


@ChallengeResponse.register
class HTTP01Response(KeyAuthorizationChallengeResponse):
"""ACME http-01 challenge response."""
Expand Down Expand Up @@ -317,7 +381,8 @@ def simple_verify(self, chall: 'HTTP01', domain: str, account_public_key: jose.J

"""
if not self.verify(chall, account_public_key):
logger.debug("Verification of key authorization in response failed")
logger.debug(
"Verification of key authorization in response failed")
return False

# TODO: ACME specification defines URI template that doesn't
Expand Down Expand Up @@ -513,7 +578,8 @@ def simple_verify(self, chall: 'TLSALPN01', domain: str, account_public_key: jos

"""
if not self.verify(chall, account_public_key):
logger.debug("Verification of key authorization in response failed")
logger.debug(
"Verification of key authorization in response failed")
return False

if cert is None:
Expand Down