Skip to content
This repository has been archived by the owner on Dec 31, 2023. It is now read-only.

feat(kms): add samples for new hmac and rng apis #161

Merged
merged 1 commit into from Aug 12, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
8 changes: 7 additions & 1 deletion samples/snippets/create_key_asymmetric_decrypt.py
Expand Up @@ -30,6 +30,8 @@ def create_key_asymmetric_decrypt(project_id, location_id, key_ring_id, id):

# Import the client library.
from google.cloud import kms
from google.protobuf import duration_pb2
import datetime

# Create the client.
client = kms.KeyManagementServiceClient()
Expand All @@ -44,7 +46,11 @@ def create_key_asymmetric_decrypt(project_id, location_id, key_ring_id, id):
'purpose': purpose,
'version_template': {
'algorithm': algorithm,
}
},

# Optional: customize how long key versions should be kept before
# destroying.
'destroy_scheduled_duration': duration_pb2.Duration().FromTimedelta(datetime.timedelta(days=1))
}

# Call the API.
Expand Down
8 changes: 7 additions & 1 deletion samples/snippets/create_key_asymmetric_sign.py
Expand Up @@ -30,6 +30,8 @@ def create_key_asymmetric_sign(project_id, location_id, key_ring_id, id):

# Import the client library.
from google.cloud import kms
from google.protobuf import duration_pb2
import datetime

# Create the client.
client = kms.KeyManagementServiceClient()
Expand All @@ -44,7 +46,11 @@ def create_key_asymmetric_sign(project_id, location_id, key_ring_id, id):
'purpose': purpose,
'version_template': {
'algorithm': algorithm,
}
},

# Optional: customize how long key versions should be kept before
# destroying.
'destroy_scheduled_duration': duration_pb2.Duration().FromTimedelta(datetime.timedelta(days=1))
}

# Call the API.
Expand Down
8 changes: 7 additions & 1 deletion samples/snippets/create_key_hsm.py
Expand Up @@ -30,6 +30,8 @@ def create_key_hsm(project_id, location_id, key_ring_id, id):

# Import the client library.
from google.cloud import kms
from google.protobuf import duration_pb2
import datetime

# Create the client.
client = kms.KeyManagementServiceClient()
Expand All @@ -46,7 +48,11 @@ def create_key_hsm(project_id, location_id, key_ring_id, id):
'version_template': {
'algorithm': algorithm,
'protection_level': protection_level
}
},

# Optional: customize how long key versions should be kept before
# destroying.
'destroy_scheduled_duration': duration_pb2.Duration().FromTimedelta(datetime.timedelta(days=1))
}

# Call the API.
Expand Down
60 changes: 60 additions & 0 deletions samples/snippets/create_key_mac.py
@@ -0,0 +1,60 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and


# [START kms_create_key_mac]
def create_key_mac(project_id, location_id, key_ring_id, id):
"""
Creates a new key in Cloud KMS for HMAC operations.

Args:
project_id (string): Google Cloud project ID (e.g. 'my-project').
location_id (string): Cloud KMS location (e.g. 'us-east1').
key_ring_id (string): ID of the Cloud KMS key ring (e.g. 'my-key-ring').
id (string): ID of the key to create (e.g. 'my-mac-key').

Returns:
CryptoKey: Cloud KMS key.

"""

# Import the client library.
from google.cloud import kms
from google.protobuf import duration_pb2
import datetime

# Create the client.
client = kms.KeyManagementServiceClient()

# Build the parent key ring name.
key_ring_name = client.key_ring_path(project_id, location_id, key_ring_id)

# Build the key.
purpose = kms.CryptoKey.CryptoKeyPurpose.MAC
algorithm = kms.CryptoKeyVersion.CryptoKeyVersionAlgorithm.HMAC_SHA256
key = {
'purpose': purpose,
'version_template': {
'algorithm': algorithm,
},

# Optional: customize how long key versions should be kept before
# destroying.
'destroy_scheduled_duration': duration_pb2.Duration().FromTimedelta(datetime.timedelta(days=1))
}

# Call the API.
created_key = client.create_crypto_key(request={'parent': key_ring_name, 'crypto_key_id': id, 'crypto_key': key})
print('Created mac key: {}'.format(created_key.name))
return created_key
# [END kms_create_key_mac]
49 changes: 49 additions & 0 deletions samples/snippets/generate_random_bytes.py
@@ -0,0 +1,49 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and


# [START kms_generate_random_bytes]
def generate_random_bytes(project_id, location_id, num_bytes):
"""
Generate random bytes with entropy sourced from the given location.

Args:
project_id (string): Google Cloud project ID (e.g. 'my-project').
location_id (string): Cloud KMS location (e.g. 'us-east1').
num_bytes (integer): number of bytes of random data.

Returns:
bytes: Encrypted ciphertext.

"""

# Import the client library.
from google.cloud import kms

# Import base64 for encoding the bytes for printing.
import base64

# Create the client.
client = kms.KeyManagementServiceClient()

# Build the location name.
location_name = client.common_location_path(project_id, location_id)

# Call the API.
protection_level = kms.ProtectionLevel.HSM
random_bytes_response = client.generate_random_bytes(
request={'location': location_name, 'length_bytes': num_bytes, 'protection_level': protection_level})

print('Random bytes: {}'.format(base64.b64encode(random_bytes_response.data)))
return random_bytes_response
# [END kms_generate_random_bytes]
53 changes: 53 additions & 0 deletions samples/snippets/sign_mac.py
@@ -0,0 +1,53 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and


# [START kms_sign_mac]
def sign_mac(project_id, location_id, key_ring_id, key_id, version_id, data):
"""
Sign a message using the public key part of an asymmetric key.

Args:
project_id (string): Google Cloud project ID (e.g. 'my-project').
location_id (string): Cloud KMS location (e.g. 'us-east1').
key_ring_id (string): ID of the Cloud KMS key ring (e.g. 'my-key-ring').
key_id (string): ID of the key to use (e.g. 'my-key').
version_id (string): Version to use (e.g. '1').
data (string): Data to sign.

Returns:
MacSignResponse: Signature.
"""

# Import the client library.
from google.cloud import kms

# Import base64 for printing the ciphertext.
import base64

# Create the client.
client = kms.KeyManagementServiceClient()

# Build the key version name.
key_version_name = client.crypto_key_version_path(project_id, location_id, key_ring_id, key_id, version_id)

# Convert the message to bytes.
data_bytes = data.encode('utf-8')

# Call the API
sign_response = client.mac_sign(
request={'name': key_version_name, 'data': data_bytes})

print('Signature: {}'.format(base64.b64encode(sign_response.mac)))
return sign_response
# [END kms_sign_mac]
54 changes: 54 additions & 0 deletions samples/snippets/snippets_test.py
Expand Up @@ -32,6 +32,7 @@
from create_key_for_import import create_key_for_import
from create_key_hsm import create_key_hsm
from create_key_labels import create_key_labels
from create_key_mac import create_key_mac
from create_key_ring import create_key_ring
from create_key_rotation_schedule import create_key_rotation_schedule
from create_key_symmetric_encrypt_decrypt import create_key_symmetric_encrypt_decrypt
Expand All @@ -43,6 +44,7 @@
from enable_key_version import enable_key_version
from encrypt_asymmetric import encrypt_asymmetric
from encrypt_symmetric import encrypt_symmetric
from generate_random_bytes import generate_random_bytes
from get_key_labels import get_key_labels
from get_key_version_attestation import get_key_version_attestation
from get_public_key import get_public_key
Expand All @@ -53,13 +55,15 @@
from quickstart import quickstart
from restore_key_version import restore_key_version
from sign_asymmetric import sign_asymmetric
from sign_mac import sign_mac
from update_key_add_rotation import update_key_add_rotation
from update_key_remove_labels import update_key_remove_labels
from update_key_remove_rotation import update_key_remove_rotation
from update_key_set_primary import update_key_set_primary
from update_key_update_labels import update_key_update_labels
from verify_asymmetric_ec import verify_asymmetric_ec
from verify_asymmetric_rsa import verify_asymmetric_rsa
from verify_mac import verify_mac


@pytest.fixture(scope="module")
Expand Down Expand Up @@ -167,6 +171,22 @@ def hsm_key_id(client, project_id, location_id, key_ring_id):
return key_id


@pytest.fixture(scope="module")
def hmac_key_id(client, project_id, location_id, key_ring_id):
key_ring_name = client.key_ring_path(project_id, location_id, key_ring_id)
key_id = '{}'.format(uuid.uuid4())
key = client.create_crypto_key(request={'parent': key_ring_name, 'crypto_key_id': key_id, 'crypto_key': {
'purpose': kms.CryptoKey.CryptoKeyPurpose.MAC,
'version_template': {
'algorithm': kms.CryptoKeyVersion.CryptoKeyVersionAlgorithm.HMAC_SHA256,
'protection_level': kms.ProtectionLevel.HSM
},
'labels': {'foo': 'bar', 'zip': 'zap'}
}})
wait_for_ready(client, '{}/cryptoKeyVersions/1'.format(key.name))
return key_id


@pytest.fixture(scope="module")
def symmetric_key_id(client, project_id, location_id, key_ring_id):
key_ring_name = client.key_ring_path(project_id, location_id, key_ring_id)
Expand Down Expand Up @@ -245,6 +265,13 @@ def test_create_key_labels(project_id, location_id, key_ring_id):
assert key.labels == {'team': 'alpha', 'cost_center': 'cc1234'}


def test_create_key_mac(project_id, location_id, key_ring_id):
key_id = '{}'.format(uuid.uuid4())
key = create_key_mac(project_id, location_id, key_ring_id, key_id)
assert key.purpose == kms.CryptoKey.CryptoKeyPurpose.MAC
assert key.version_template.algorithm == kms.CryptoKeyVersion.CryptoKeyVersionAlgorithm.HMAC_SHA256


def test_create_key_ring(project_id, location_id):
key_ring_id = '{}'.format(uuid.uuid4())
key_ring = create_key_ring(project_id, location_id, key_ring_id)
Expand Down Expand Up @@ -345,6 +372,11 @@ def test_encrypt_symmetric(client, project_id, location_id, key_ring_id, symmetr
assert decrypt_response.plaintext == plaintext.encode('utf-8')


def test_generate_random_bytes(client, project_id, location_id):
generate_random_bytes_response = generate_random_bytes(project_id, location_id, 256)
assert len(generate_random_bytes_response.data) == 256


def test_get_key_labels(project_id, location_id, key_ring_id, symmetric_key_id):
key = get_key_labels(project_id, location_id, key_ring_id, symmetric_key_id)
assert key.labels == {'foo': 'bar', 'zip': 'zap'}
Expand Down Expand Up @@ -412,6 +444,18 @@ def test_sign_asymmetric(client, project_id, location_id, key_ring_id, asymmetri
pytest.fail('invalid signature')


def test_sign_mac(client, project_id, location_id, key_ring_id, hmac_key_id):
data = 'my data'

sign_response = sign_mac(project_id, location_id, key_ring_id, hmac_key_id, '1', data)
assert sign_response.mac

key_version_name = client.crypto_key_version_path(project_id, location_id, key_ring_id, hmac_key_id, '1')
verify_response = client.mac_verify(request={'name': key_version_name, 'data': data.encode('utf-8'), 'mac': sign_response.mac})

assert verify_response.success


def test_update_key_add_rotation(project_id, location_id, key_ring_id, symmetric_key_id):
key = update_key_add_rotation(project_id, location_id, key_ring_id, symmetric_key_id)
assert key.rotation_period == datetime.timedelta(seconds=60*60*24*30)
Expand Down Expand Up @@ -461,6 +505,16 @@ def test_verify_asymmetric_rsa(client, project_id, location_id, key_ring_id, asy
assert verified


def test_verify_mac(client, project_id, location_id, key_ring_id, hmac_key_id):
data = 'my data'

key_version_name = client.crypto_key_version_path(project_id, location_id, key_ring_id, hmac_key_id, '1')
sign_response = client.mac_sign(request={'name': key_version_name, 'data': data.encode('utf-8')})

verify_response = verify_mac(project_id, location_id, key_ring_id, hmac_key_id, '1', data, sign_response.mac)
assert verify_response.success


def test_quickstart(project_id, location_id):
key_rings = quickstart(project_id, location_id)
assert key_rings
51 changes: 51 additions & 0 deletions samples/snippets/verify_mac.py
@@ -0,0 +1,51 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and


# [START kms_verify_mac]
def verify_mac(project_id, location_id, key_ring_id, key_id, version_id, data, signature):
"""
Verify the signature of data from an HMAC key.

Args:
project_id (string): Google Cloud project ID (e.g. 'my-project').
location_id (string): Cloud KMS location (e.g. 'us-east1').
key_ring_id (string): ID of the Cloud KMS key ring (e.g. 'my-key-ring').
key_id (string): ID of the key to use (e.g. 'my-key').
version_id (string): Version to use (e.g. '1').
data (string): Data that was signed.
signature (bytes): Signature bytes.

Returns:
MacVerifyResponse: Success.
"""

# Import the client library.
from google.cloud import kms

# Create the client.
client = kms.KeyManagementServiceClient()

# Build the key version name.
key_version_name = client.crypto_key_version_path(project_id, location_id, key_ring_id, key_id, version_id)

# Convert the message to bytes.
data_bytes = data.encode('utf-8')

# Call the API
verify_response = client.mac_verify(
request={'name': key_version_name, 'data': data_bytes, 'mac': signature})

print('Verified: {}'.format(verify_response.success))
return verify_response
# [END kms_verify_mac]