Skip to content

Commit

Permalink
security: ensure pack separator will not be conflicted with serialize…
Browse files Browse the repository at this point in the history
…d fields
  • Loading branch information
shirsa committed May 7, 2024
1 parent 9255236 commit 53fc46f
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 25 deletions.
31 changes: 7 additions & 24 deletions celery/security/serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,37 +55,20 @@ def deserialize(self, data):
def _pack(self, body, content_type, content_encoding, signer, signature,
sep=str_to_bytes('\x00\x01')):
fields = sep.join(
ensure_bytes(s) for s in [signer, signature, content_type,
ensure_bytes(s) for s in [b64encode(signer), b64encode(signature), content_type,
content_encoding, body]
)
return b64encode(fields)

def _unpack(self, payload, sep=str_to_bytes('\x00\x01')):
raw_payload = b64decode(ensure_bytes(payload))
first_sep = raw_payload.find(sep)

signer = raw_payload[:first_sep]
signer_cert = self._cert_store[signer]

# shift 3 bits right to get signature length
# 2048bit rsa key has a signature length of 256
# 4096bit rsa key has a signature length of 512
sig_len = signer_cert.get_pubkey().key_size >> 3
sep_len = len(sep)
signature_start_position = first_sep + sep_len
signature_end_position = signature_start_position + sig_len
signature = raw_payload[
signature_start_position:signature_end_position
]

v = raw_payload[signature_end_position + sep_len:].split(sep)

v = raw_payload.split(sep, maxsplit=4)
return {
'signer': signer,
'signature': signature,
'content_type': bytes_to_str(v[0]),
'content_encoding': bytes_to_str(v[1]),
'body': v[2],
"signer": b64decode(v[0]),
"signature": b64decode(v[1]),
"content_type": bytes_to_str(v[2]),
"content_encoding": bytes_to_str(v[3]),
"body": v[4],
}


Expand Down
2 changes: 1 addition & 1 deletion t/unit/security/test_serialization.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def _get_s(self, key, cert, certs, serializer="json"):
PrivateKey(key), Certificate(cert), store, serializer=serializer
)

@pytest.mark.parametrize("data", [1, "foo", b"foo", {"foo": 1}])
@pytest.mark.parametrize("data", [1, "foo", b"foo", {"foo": 1}, {"foo": "\x00\x01"}])
@pytest.mark.parametrize("serializer", ["json", "pickle"])
def test_serialize(self, data, serializer):
s = self._get_s(KEY1, CERT1, [CERT1], serializer=serializer)
Expand Down

0 comments on commit 53fc46f

Please sign in to comment.