Skip to content
This repository has been archived by the owner on Dec 31, 2023. It is now read-only.

Commit

Permalink
feat(kms): add samples for new hmac and rng apis (#161)
Browse files Browse the repository at this point in the history
  • Loading branch information
sethvargo committed Aug 12, 2021
1 parent b8520cb commit 558b740
Show file tree
Hide file tree
Showing 8 changed files with 288 additions and 3 deletions.
8 changes: 7 additions & 1 deletion samples/snippets/create_key_asymmetric_decrypt.py
Expand Up @@ -30,6 +30,8 @@ def create_key_asymmetric_decrypt(project_id, location_id, key_ring_id, id):

# Import the client library.
from google.cloud import kms
from google.protobuf import duration_pb2
import datetime

# Create the client.
client = kms.KeyManagementServiceClient()
Expand All @@ -44,7 +46,11 @@ def create_key_asymmetric_decrypt(project_id, location_id, key_ring_id, id):
'purpose': purpose,
'version_template': {
'algorithm': algorithm,
}
},

# Optional: customize how long key versions should be kept before
# destroying.
'destroy_scheduled_duration': duration_pb2.Duration().FromTimedelta(datetime.timedelta(days=1))
}

# Call the API.
Expand Down
8 changes: 7 additions & 1 deletion samples/snippets/create_key_asymmetric_sign.py
Expand Up @@ -30,6 +30,8 @@ def create_key_asymmetric_sign(project_id, location_id, key_ring_id, id):

# Import the client library.
from google.cloud import kms
from google.protobuf import duration_pb2
import datetime

# Create the client.
client = kms.KeyManagementServiceClient()
Expand All @@ -44,7 +46,11 @@ def create_key_asymmetric_sign(project_id, location_id, key_ring_id, id):
'purpose': purpose,
'version_template': {
'algorithm': algorithm,
}
},

# Optional: customize how long key versions should be kept before
# destroying.
'destroy_scheduled_duration': duration_pb2.Duration().FromTimedelta(datetime.timedelta(days=1))
}

# Call the API.
Expand Down
8 changes: 7 additions & 1 deletion samples/snippets/create_key_hsm.py
Expand Up @@ -30,6 +30,8 @@ def create_key_hsm(project_id, location_id, key_ring_id, id):

# Import the client library.
from google.cloud import kms
from google.protobuf import duration_pb2
import datetime

# Create the client.
client = kms.KeyManagementServiceClient()
Expand All @@ -46,7 +48,11 @@ def create_key_hsm(project_id, location_id, key_ring_id, id):
'version_template': {
'algorithm': algorithm,
'protection_level': protection_level
}
},

# Optional: customize how long key versions should be kept before
# destroying.
'destroy_scheduled_duration': duration_pb2.Duration().FromTimedelta(datetime.timedelta(days=1))
}

# Call the API.
Expand Down
60 changes: 60 additions & 0 deletions samples/snippets/create_key_mac.py
@@ -0,0 +1,60 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and


# [START kms_create_key_mac]
def create_key_mac(project_id, location_id, key_ring_id, id):
"""
Creates a new key in Cloud KMS for HMAC operations.
Args:
project_id (string): Google Cloud project ID (e.g. 'my-project').
location_id (string): Cloud KMS location (e.g. 'us-east1').
key_ring_id (string): ID of the Cloud KMS key ring (e.g. 'my-key-ring').
id (string): ID of the key to create (e.g. 'my-mac-key').
Returns:
CryptoKey: Cloud KMS key.
"""

# Import the client library.
from google.cloud import kms
from google.protobuf import duration_pb2
import datetime

# Create the client.
client = kms.KeyManagementServiceClient()

# Build the parent key ring name.
key_ring_name = client.key_ring_path(project_id, location_id, key_ring_id)

# Build the key.
purpose = kms.CryptoKey.CryptoKeyPurpose.MAC
algorithm = kms.CryptoKeyVersion.CryptoKeyVersionAlgorithm.HMAC_SHA256
key = {
'purpose': purpose,
'version_template': {
'algorithm': algorithm,
},

# Optional: customize how long key versions should be kept before
# destroying.
'destroy_scheduled_duration': duration_pb2.Duration().FromTimedelta(datetime.timedelta(days=1))
}

# Call the API.
created_key = client.create_crypto_key(request={'parent': key_ring_name, 'crypto_key_id': id, 'crypto_key': key})
print('Created mac key: {}'.format(created_key.name))
return created_key
# [END kms_create_key_mac]
49 changes: 49 additions & 0 deletions samples/snippets/generate_random_bytes.py
@@ -0,0 +1,49 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and


# [START kms_generate_random_bytes]
def generate_random_bytes(project_id, location_id, num_bytes):
"""
Generate random bytes with entropy sourced from the given location.
Args:
project_id (string): Google Cloud project ID (e.g. 'my-project').
location_id (string): Cloud KMS location (e.g. 'us-east1').
num_bytes (integer): number of bytes of random data.
Returns:
bytes: Encrypted ciphertext.
"""

# Import the client library.
from google.cloud import kms

# Import base64 for encoding the bytes for printing.
import base64

# Create the client.
client = kms.KeyManagementServiceClient()

# Build the location name.
location_name = client.common_location_path(project_id, location_id)

# Call the API.
protection_level = kms.ProtectionLevel.HSM
random_bytes_response = client.generate_random_bytes(
request={'location': location_name, 'length_bytes': num_bytes, 'protection_level': protection_level})

print('Random bytes: {}'.format(base64.b64encode(random_bytes_response.data)))
return random_bytes_response
# [END kms_generate_random_bytes]
53 changes: 53 additions & 0 deletions samples/snippets/sign_mac.py
@@ -0,0 +1,53 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and


# [START kms_sign_mac]
def sign_mac(project_id, location_id, key_ring_id, key_id, version_id, data):
"""
Sign a message using the public key part of an asymmetric key.
Args:
project_id (string): Google Cloud project ID (e.g. 'my-project').
location_id (string): Cloud KMS location (e.g. 'us-east1').
key_ring_id (string): ID of the Cloud KMS key ring (e.g. 'my-key-ring').
key_id (string): ID of the key to use (e.g. 'my-key').
version_id (string): Version to use (e.g. '1').
data (string): Data to sign.
Returns:
MacSignResponse: Signature.
"""

# Import the client library.
from google.cloud import kms

# Import base64 for printing the ciphertext.
import base64

# Create the client.
client = kms.KeyManagementServiceClient()

# Build the key version name.
key_version_name = client.crypto_key_version_path(project_id, location_id, key_ring_id, key_id, version_id)

# Convert the message to bytes.
data_bytes = data.encode('utf-8')

# Call the API
sign_response = client.mac_sign(
request={'name': key_version_name, 'data': data_bytes})

print('Signature: {}'.format(base64.b64encode(sign_response.mac)))
return sign_response
# [END kms_sign_mac]
54 changes: 54 additions & 0 deletions samples/snippets/snippets_test.py
Expand Up @@ -32,6 +32,7 @@
from create_key_for_import import create_key_for_import
from create_key_hsm import create_key_hsm
from create_key_labels import create_key_labels
from create_key_mac import create_key_mac
from create_key_ring import create_key_ring
from create_key_rotation_schedule import create_key_rotation_schedule
from create_key_symmetric_encrypt_decrypt import create_key_symmetric_encrypt_decrypt
Expand All @@ -43,6 +44,7 @@
from enable_key_version import enable_key_version
from encrypt_asymmetric import encrypt_asymmetric
from encrypt_symmetric import encrypt_symmetric
from generate_random_bytes import generate_random_bytes
from get_key_labels import get_key_labels
from get_key_version_attestation import get_key_version_attestation
from get_public_key import get_public_key
Expand All @@ -53,13 +55,15 @@
from quickstart import quickstart
from restore_key_version import restore_key_version
from sign_asymmetric import sign_asymmetric
from sign_mac import sign_mac
from update_key_add_rotation import update_key_add_rotation
from update_key_remove_labels import update_key_remove_labels
from update_key_remove_rotation import update_key_remove_rotation
from update_key_set_primary import update_key_set_primary
from update_key_update_labels import update_key_update_labels
from verify_asymmetric_ec import verify_asymmetric_ec
from verify_asymmetric_rsa import verify_asymmetric_rsa
from verify_mac import verify_mac


@pytest.fixture(scope="module")
Expand Down Expand Up @@ -167,6 +171,22 @@ def hsm_key_id(client, project_id, location_id, key_ring_id):
return key_id


@pytest.fixture(scope="module")
def hmac_key_id(client, project_id, location_id, key_ring_id):
key_ring_name = client.key_ring_path(project_id, location_id, key_ring_id)
key_id = '{}'.format(uuid.uuid4())
key = client.create_crypto_key(request={'parent': key_ring_name, 'crypto_key_id': key_id, 'crypto_key': {
'purpose': kms.CryptoKey.CryptoKeyPurpose.MAC,
'version_template': {
'algorithm': kms.CryptoKeyVersion.CryptoKeyVersionAlgorithm.HMAC_SHA256,
'protection_level': kms.ProtectionLevel.HSM
},
'labels': {'foo': 'bar', 'zip': 'zap'}
}})
wait_for_ready(client, '{}/cryptoKeyVersions/1'.format(key.name))
return key_id


@pytest.fixture(scope="module")
def symmetric_key_id(client, project_id, location_id, key_ring_id):
key_ring_name = client.key_ring_path(project_id, location_id, key_ring_id)
Expand Down Expand Up @@ -245,6 +265,13 @@ def test_create_key_labels(project_id, location_id, key_ring_id):
assert key.labels == {'team': 'alpha', 'cost_center': 'cc1234'}


def test_create_key_mac(project_id, location_id, key_ring_id):
key_id = '{}'.format(uuid.uuid4())
key = create_key_mac(project_id, location_id, key_ring_id, key_id)
assert key.purpose == kms.CryptoKey.CryptoKeyPurpose.MAC
assert key.version_template.algorithm == kms.CryptoKeyVersion.CryptoKeyVersionAlgorithm.HMAC_SHA256


def test_create_key_ring(project_id, location_id):
key_ring_id = '{}'.format(uuid.uuid4())
key_ring = create_key_ring(project_id, location_id, key_ring_id)
Expand Down Expand Up @@ -345,6 +372,11 @@ def test_encrypt_symmetric(client, project_id, location_id, key_ring_id, symmetr
assert decrypt_response.plaintext == plaintext.encode('utf-8')


def test_generate_random_bytes(client, project_id, location_id):
generate_random_bytes_response = generate_random_bytes(project_id, location_id, 256)
assert len(generate_random_bytes_response.data) == 256


def test_get_key_labels(project_id, location_id, key_ring_id, symmetric_key_id):
key = get_key_labels(project_id, location_id, key_ring_id, symmetric_key_id)
assert key.labels == {'foo': 'bar', 'zip': 'zap'}
Expand Down Expand Up @@ -412,6 +444,18 @@ def test_sign_asymmetric(client, project_id, location_id, key_ring_id, asymmetri
pytest.fail('invalid signature')


def test_sign_mac(client, project_id, location_id, key_ring_id, hmac_key_id):
data = 'my data'

sign_response = sign_mac(project_id, location_id, key_ring_id, hmac_key_id, '1', data)
assert sign_response.mac

key_version_name = client.crypto_key_version_path(project_id, location_id, key_ring_id, hmac_key_id, '1')
verify_response = client.mac_verify(request={'name': key_version_name, 'data': data.encode('utf-8'), 'mac': sign_response.mac})

assert verify_response.success


def test_update_key_add_rotation(project_id, location_id, key_ring_id, symmetric_key_id):
key = update_key_add_rotation(project_id, location_id, key_ring_id, symmetric_key_id)
assert key.rotation_period == datetime.timedelta(seconds=60*60*24*30)
Expand Down Expand Up @@ -461,6 +505,16 @@ def test_verify_asymmetric_rsa(client, project_id, location_id, key_ring_id, asy
assert verified


def test_verify_mac(client, project_id, location_id, key_ring_id, hmac_key_id):
data = 'my data'

key_version_name = client.crypto_key_version_path(project_id, location_id, key_ring_id, hmac_key_id, '1')
sign_response = client.mac_sign(request={'name': key_version_name, 'data': data.encode('utf-8')})

verify_response = verify_mac(project_id, location_id, key_ring_id, hmac_key_id, '1', data, sign_response.mac)
assert verify_response.success


def test_quickstart(project_id, location_id):
key_rings = quickstart(project_id, location_id)
assert key_rings
51 changes: 51 additions & 0 deletions samples/snippets/verify_mac.py
@@ -0,0 +1,51 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and


# [START kms_verify_mac]
def verify_mac(project_id, location_id, key_ring_id, key_id, version_id, data, signature):
"""
Verify the signature of data from an HMAC key.
Args:
project_id (string): Google Cloud project ID (e.g. 'my-project').
location_id (string): Cloud KMS location (e.g. 'us-east1').
key_ring_id (string): ID of the Cloud KMS key ring (e.g. 'my-key-ring').
key_id (string): ID of the key to use (e.g. 'my-key').
version_id (string): Version to use (e.g. '1').
data (string): Data that was signed.
signature (bytes): Signature bytes.
Returns:
MacVerifyResponse: Success.
"""

# Import the client library.
from google.cloud import kms

# Create the client.
client = kms.KeyManagementServiceClient()

# Build the key version name.
key_version_name = client.crypto_key_version_path(project_id, location_id, key_ring_id, key_id, version_id)

# Convert the message to bytes.
data_bytes = data.encode('utf-8')

# Call the API
verify_response = client.mac_verify(
request={'name': key_version_name, 'data': data_bytes, 'mac': signature})

print('Verified: {}'.format(verify_response.success))
return verify_response
# [END kms_verify_mac]

0 comments on commit 558b740

Please sign in to comment.