diff --git a/samples/snippets/deid.py b/samples/snippets/deid.py
index 89b8168f..c74947e3 100644
--- a/samples/snippets/deid.py
+++ b/samples/snippets/deid.py
@@ -288,6 +288,95 @@ def deidentify_with_fpe(
 
 # [END dlp_deidentify_fpe]
 
+# [START dlp_deidentify_deterministic]
+def deidentify_with_deterministic(
+    project,
+    input_str,
+    info_types,
+    surrogate_type=None,
+    key_name=None,
+    wrapped_key=None,
+):
+    """Deidentifies sensitive data in a string using deterministic encryption.
+    Args:
+        project: The Google Cloud project id to use as a parent resource.
+        input_str: The string to deidentify (will be treated as text).
+        info_types: A list of strings naming the info types to look for and
+            transform, e.g. ['US_SOCIAL_SECURITY_NUMBER'].
+        surrogate_type: The name of the surrogate custom info type to use. Only
+            necessary if you want to reverse the deidentification process. Can
+            be essentially any arbitrary string, as long as it doesn't appear
+            in your dataset otherwise.
+        key_name: The name of the Cloud KMS key used to encrypt ('wrap') the
+            AES-256 key. Example:
+            key_name = 'projects/YOUR_GCLOUD_PROJECT/locations/YOUR_LOCATION/
+            keyRings/YOUR_KEYRING_NAME/cryptoKeys/YOUR_KEY_NAME'
+        wrapped_key: The base64-encoded, encrypted ('wrapped') AES-256 key to
+            use. This key should be encrypted using the Cloud KMS key
+            specified by key_name.
+    Returns:
+        None; the response from the API is printed to the terminal.
+    """
+    import base64
+
+    # Import the client library
+    import google.cloud.dlp
+
+    # Instantiate a client
+    dlp = google.cloud.dlp_v2.DlpServiceClient()
+
+    # Convert the project id into a full resource id.
+    parent = f"projects/{project}"
+
+    # The wrapped key is base64-encoded, but the library expects a binary
+    # string, so decode it here.
+    wrapped_key = base64.b64decode(wrapped_key)
+
+    # Construct Deterministic encryption configuration dictionary
+    crypto_replace_deterministic_config = {
+        "crypto_key": {
+            "kms_wrapped": {"wrapped_key": wrapped_key, "crypto_key_name": key_name}
+        },
+    }
+
+    # Add surrogate type
+    if surrogate_type:
+        crypto_replace_deterministic_config["surrogate_info_type"] = {"name": surrogate_type}
+
+    # Construct inspect configuration dictionary
+    inspect_config = {"info_types": [{"name": info_type} for info_type in info_types]}
+
+    # Construct deidentify configuration dictionary
+    deidentify_config = {
+        "info_type_transformations": {
+            "transformations": [
+                {
+                    "primitive_transformation": {
+                        "crypto_deterministic_config": crypto_replace_deterministic_config
+                    }
+                }
+            ]
+        }
+    }
+
+    # Convert string to item
+    item = {"value": input_str}
+
+    # Call the API
+    response = dlp.deidentify_content(
+        request={
+            "parent": parent,
+            "deidentify_config": deidentify_config,
+            "inspect_config": inspect_config,
+            "item": item,
+        }
+    )
+
+    # Print results
+    print(response.item.value)
+
+# [END dlp_deidentify_deterministic]
+
 
 # [START dlp_reidentify_fpe]
 def reidentify_with_fpe(
@@ -380,6 +469,92 @@ def reidentify_with_fpe(
 # [END dlp_reidentify_fpe]
 
 
+# [START dlp_reidentify_deterministic]
+def reidentify_with_deterministic(
+    project,
+    input_str,
+    surrogate_type=None,
+    key_name=None,
+    wrapped_key=None,
+):
+    """Reidentifies sensitive data in a string using deterministic encryption.
+    Args:
+        project: The Google Cloud project id to use as a parent resource.
+        input_str: The string to reidentify (will be treated as text).
+        surrogate_type: The name of the surrogate custom info type used
+            during the encryption process.
+        key_name: The name of the Cloud KMS key used to encrypt ('wrap') the
+            AES-256 key. Example:
+            key_name = 'projects/YOUR_GCLOUD_PROJECT/locations/YOUR_LOCATION/
+            keyRings/YOUR_KEYRING_NAME/cryptoKeys/YOUR_KEY_NAME'
+        wrapped_key: The base64-encoded, encrypted ('wrapped') AES-256 key to
+            use. This key should be encrypted using the Cloud KMS key
+            specified by key_name.
+    Returns:
+        None; the response from the API is printed to the terminal.
+    """
+    import base64
+
+    # Import the client library
+    import google.cloud.dlp
+
+    # Instantiate a client
+    dlp = google.cloud.dlp_v2.DlpServiceClient()
+
+    # Convert the project id into a full resource id.
+    parent = f"projects/{project}"
+
+    # The wrapped key is base64-encoded, but the library expects a binary
+    # string, so decode it here.
+    wrapped_key = base64.b64decode(wrapped_key)
+
+    # Construct reidentify Configuration
+    reidentify_config = {
+        "info_type_transformations": {
+            "transformations": [
+                {
+                    "primitive_transformation": {
+                        "crypto_deterministic_config": {
+                            "crypto_key": {
+                                "kms_wrapped": {
+                                    "wrapped_key": wrapped_key,
+                                    "crypto_key_name": key_name,
+                                }
+                            },
+                            "surrogate_info_type": {"name": surrogate_type},
+                        }
+                    }
+                }
+            ]
+        }
+    }
+
+    # Construct inspect configuration: match the surrogate as a custom info type
+    inspect_config = {
+        "custom_info_types": [
+            {"info_type": {"name": surrogate_type}, "surrogate_type": {}}
+        ]
+    }
+
+    # Convert string to item
+    item = {"value": input_str}
+
+    # Call the API
+    response = dlp.reidentify_content(
+        request={
+            "parent": parent,
+            "reidentify_config": reidentify_config,
+            "inspect_config": inspect_config,
+            "item": item,
+        }
+    )
+
+    # Print results
+    print(response.item.value)
+
+# [END dlp_reidentify_deterministic]
+
+
 # [START dlp_deidentify_free_text_with_fpe_using_surrogate]
 def deidentify_free_text_with_fpe_using_surrogate(
     project,
diff --git a/samples/snippets/deid_test.py b/samples/snippets/deid_test.py
index 1863f754..a2b66efd 100644
--- a/samples/snippets/deid_test.py
+++ b/samples/snippets/deid_test.py
@@ -123,6 +123,21 @@ def test_deidentify_with_fpe(capsys):
     assert "372819127" not in out
 
 
+def test_deidentify_with_deterministic(capsys):
+    deid.deidentify_with_deterministic(
+        GCLOUD_PROJECT,
+        HARMFUL_STRING,
+        ["US_SOCIAL_SECURITY_NUMBER"],
+        surrogate_type=SURROGATE_TYPE,
+        key_name=KEY_NAME,
+        wrapped_key=WRAPPED_KEY,
+    )
+
+    out, _ = capsys.readouterr()
+    assert "My SSN is" in out
+    assert "372819127" not in out
+
+
 def test_deidentify_with_fpe_uses_surrogate_info_types(capsys):
     deid.deidentify_with_fpe(
         GCLOUD_PROJECT,
@@ -207,5 +222,21 @@ def test_reidentify_with_fpe(capsys):
     assert "731997681" not in out
 
 
+def test_reidentify_with_deterministic(capsys):
+    labeled_string = "My SSN is SSN_TOKEN(36):ATeRUd3WWnAHHFtjtl1bv+CT09FZ7hyqNas="
+
+    deid.reidentify_with_deterministic(
+        GCLOUD_PROJECT,
+        labeled_string,
+        surrogate_type=SURROGATE_TYPE,
+        key_name=KEY_NAME,
+        wrapped_key=WRAPPED_KEY,
+    )
+
+    out, _ = capsys.readouterr()
+
+    assert "SSN_TOKEN(" not in out
+
+
 def test_deidentify_free_text_with_fpe_using_surrogate(capsys):
     labeled_fpe_string = "My phone number is 4359916732"