diff --git a/docs/requirements.txt b/docs/requirements.txt deleted file mode 100644 index 7d01417f..00000000 --- a/docs/requirements.txt +++ /dev/null @@ -1 +0,0 @@ -google-cloud-pubsub==1.7.0 diff --git a/docs/snippets_findings.py b/docs/snippets_findings.py deleted file mode 100644 index dbd9406c..00000000 --- a/docs/snippets_findings.py +++ /dev/null @@ -1,602 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -"""Examples of working with source and findings in Cloud Security Command Center.""" - -from itertools import chain -import os -import pytest - - -@pytest.fixture(scope="module") -def organization_id(): - """Get Organization ID from the environment variable """ - return os.environ["GCLOUD_ORGANIZATION"] - - -@pytest.fixture(scope="module") -def source_name(organization_id): - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - org_name = "organizations/{org_id}".format(org_id=organization_id) - - source = client.create_source( - org_name, - { - "display_name": "Unit test source", - "description": "A new custom source that does X", - }, - ) - return source.name - - -def test_create_source(organization_id): - """Create a new findings source. """ - # [START create_source] - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - # organization_id is the numeric ID of the organization. e.g.: - # organization_id = "111122222444" - org_name = "organizations/{org_id}".format(org_id=organization_id) - - created = client.create_source( - org_name, - { - "display_name": "Customized Display Name", - "description": "A new custom source that does X", - }, - ) - print("Created Source: {}".format(created.name)) - # [END create_source] - - -def test_get_source(source_name): - """Gets an existing source.""" - # [START get_source] - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - source = client.get_source(source_name) - - print("Source: {}".format(source)) - # [END get_source] - - -def test_update_source(source_name): - """Updates a source's display name.""" - # [START update_source] - from google.cloud import securitycenter - from google.protobuf import field_mask_pb2 - - client = securitycenter.SecurityCenterClient() - - # Field mask to only update the display name. - field_mask = field_mask_pb2.FieldMask(paths=["display_name"]) - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - updated = client.update_source( - {"name": source_name, "display_name": "Updated Display Name"}, - update_mask=field_mask, - ) - print("Updated Source: {}".format(updated)) - # [END update_source] - assert updated.display_name == "Updated Display Name" - - -def test_add_user_to_source(source_name): - """Gives a user findingsEditor permission to the source.""" - user_email = "csccclienttest@gmail.com" - # [START update_source_iam] - from google.cloud import securitycenter - from google.iam.v1 import policy_pb2 - - client = securitycenter.SecurityCenterClient() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - # Get the old policy so we can do an incremental update. - old_policy = client.get_iam_policy(source_name) - print("Old Policy: {}".format(old_policy)) - - # Setup a new IAM binding. - binding = policy_pb2.Binding() - binding.role = "roles/securitycenter.findingsEditor" - # user_email is an e-mail address known to Cloud IAM (e.g. a gmail address). - # user_mail = user@somedomain.com - binding.members.append("user:{}".format(user_email)) - - # Setting the e-tag avoids over-write existing policy - updated = client.set_iam_policy( - source_name, {"etag": old_policy.etag, "bindings": [binding]} - ) - - print("Updated Policy: {}".format(updated)) - - # [END update_source_iam] - assert any( - member == "user:csccclienttest@gmail.com" - for member in chain.from_iterable( - binding.members for binding in updated.bindings - ) - ) - - -def test_list_source(organization_id): - """Lists finding sources.""" - i = -1 - # [START list_sources] - from google.cloud import securitycenter - - # Create a new client. - client = securitycenter.SecurityCenterClient() - # organization_id is the numeric ID of the organization. e.g.: - # organization_id = "111122222444" - org_name = "organizations/{org_id}".format(org_id=organization_id) - - # Call the API and print out each existing source. - for i, source in enumerate(client.list_sources(org_name)): - print(i, source) - # [END list_sources] - assert i >= 0 - - -def test_create_finding(source_name): - """Creates a new finding.""" - # [START create_finding] - from google.cloud import securitycenter - from google.cloud.securitycenter_v1.proto.finding_pb2 import Finding - from google.protobuf.timestamp_pb2 import Timestamp - - # Create a new client. - client = securitycenter.SecurityCenterClient() - - # Use the current time as the finding "event time". - now_proto = Timestamp() - now_proto.GetCurrentTime() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - - # Controlled by caller. - finding_id = "samplefindingid" - - # The resource this finding applies to. The CSCC UI can link - # the findings for a resource to the corresponding Asset of a resource - # if there are matches. - resource_name = "//cloudresourcemanager.googleapis.com/organizations/11232" - - # Call The API. - created_finding = client.create_finding( - source_name, - finding_id, - { - "state": Finding.ACTIVE, - "resource_name": resource_name, - "category": "MEDIUM_RISK_ONE", - "event_time": now_proto, - }, - ) - print(created_finding) - # [END create_finding] - assert len(created_finding.name) > 0 - - -def test_create_finding_with_source_properties(source_name): - """Demonstrate creating a new finding with source properties. """ - # [START create_finding_with_properties] - from google.cloud import securitycenter - from google.cloud.securitycenter_v1.proto.finding_pb2 import Finding - from google.protobuf.timestamp_pb2 import Timestamp - from google.protobuf.struct_pb2 import Value - - # Create a new client. - client = securitycenter.SecurityCenterClient() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - - # Controlled by caller. - finding_id = "samplefindingid2" - - # The resource this finding applies to. The CSCC UI can link - # the findings for a resource to the corresponding Asset of a resource - # if there are matches. - resource_name = "//cloudresourcemanager.googleapis.com/organizations/11232" - - # Define source properties values as protobuf "Value" objects. - str_value = Value() - str_value.string_value = "string_example" - num_value = Value() - num_value.number_value = 1234 - - # Use the current time as the finding "event time". - now_proto = Timestamp() - now_proto.GetCurrentTime() - - created_finding = client.create_finding( - source_name, - finding_id, - { - "state": Finding.ACTIVE, - "resource_name": resource_name, - "category": "MEDIUM_RISK_ONE", - "source_properties": {"s_value": str_value, "n_value": num_value}, - "event_time": now_proto, - }, - ) - print(created_finding) - # [END create_finding_with_properties] - - -def test_update_finding(source_name): - # [START update_finding] - from google.cloud import securitycenter - from google.protobuf.struct_pb2 import Value - from google.protobuf import field_mask_pb2 - from google.protobuf.timestamp_pb2 import Timestamp - - client = securitycenter.SecurityCenterClient() - # Only update the specific source property and event_time. event_time - # is required for updates. - field_mask = field_mask_pb2.FieldMask( - paths=["source_properties.s_value", "event_time"] - ) - value = Value() - value.string_value = "new_string" - - # Set the update time to Now. This must be some time greater then the - # event_time on the original finding. - now_proto = Timestamp() - now_proto.GetCurrentTime() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - finding_name = "{}/findings/samplefindingid2".format(source_name) - updated_finding = client.update_finding( - { - "name": finding_name, - "source_properties": {"s_value": value}, - "event_time": now_proto, - }, - update_mask=field_mask, - ) - - print( - "New Source properties: {}, Event Time {}".format( - updated_finding.source_properties, updated_finding.event_time.ToDatetime() - ) - ) - # [END update_finding] - - -def test_update_finding_state(source_name): - """Demonstrate updating only a finding state.""" - # [START update_finding_state] - from google.cloud import securitycenter - from google.cloud.securitycenter_v1.proto.finding_pb2 import Finding - from google.protobuf.timestamp_pb2 import Timestamp - from datetime import datetime - - # Create a client. - client = securitycenter.SecurityCenterClient() - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - finding_name = "{}/findings/samplefindingid2".format(source_name) - - now_proto = Timestamp() - now_proto.GetCurrentTime() - - # Call the API to change the finding state to inactive as of now. - new_finding = client.set_finding_state( - finding_name, Finding.INACTIVE, start_time=now_proto - ) - print("New state: {}".format(Finding.State.Name(new_finding.state))) - # [END update_finding_state] - - -def test_trouble_shoot(source_name): - """Demonstrate calling test_iam_permissions to determine if the - service account has the correct permisions.""" - # [START test_iam_permissions] - from google.cloud import securitycenter - - # Create a client. - client = securitycenter.SecurityCenterClient() - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - - # Check for permssions to call create_finding or update_finding. - permission_response = client.test_iam_permissions( - source_name, ["securitycenter.findings.update"] - ) - - print( - "Permision to create or update findings? {}".format( - len(permission_response.permissions) > 0 - ) - ) - # [END test_iam_permissions] - assert len(permission_response.permissions) > 0 - # [START test_iam_permissions] - # Check for permissions necessary to call set_finding_state. - permission_response = client.test_iam_permissions( - source_name, ["securitycenter.findings.setState"] - ) - print( - "Permision to update state? {}".format(len(permission_response.permissions) > 0) - ) - # [END test_iam_permissions] - assert len(permission_response.permissions) > 0 - - -def test_list_all_findings(organization_id): - # [START list_all_findings] - from google.cloud import securitycenter - - # Create a client. - client = securitycenter.SecurityCenterClient() - - # organization_id is the numeric ID of the organization. e.g.: - # organization_id = "111122222444" - org_name = "organizations/{org_id}".format(org_id=organization_id) - # The "sources/-" suffix lists findings across all sources. You - # also use a specific source_name instead. - all_sources = "{org_name}/sources/-".format(org_name=org_name) - finding_result_iterator = client.list_findings(all_sources) - for i, finding_result in enumerate(finding_result_iterator): - print( - "{}: name: {} resource: {}".format( - i, finding_result.finding.name, finding_result.finding.resource_name - ) - ) - # [END list_all_findings] - assert i > 0 - - -def test_list_filtered_findings(source_name): - # [START list_filtered_findings] - from google.cloud import securitycenter - - # Create a new client. - client = securitycenter.SecurityCenterClient() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - # You an also use a wild-card "-" for all sources: - # source_name = "organizations/111122222444/sources/-" - finding_result_iterator = client.list_findings( - source_name, filter_='category="MEDIUM_RISK_ONE"' - ) - # Iterate an print all finding names and the resource they are - # in reference to. - for i, finding_result in enumerate(finding_result_iterator): - print( - "{}: name: {} resource: {}".format( - i, finding_result.finding.name, finding_result.finding.resource_name - ) - ) - # [END list_filtered_findings] - assert i > 0 - - -def test_list_findings_at_time(source_name): - # [START list_findings_at_a_time] - from google.cloud import securitycenter - from google.protobuf.timestamp_pb2 import Timestamp - from datetime import timedelta, datetime - - # Create a new client. - client = securitycenter.SecurityCenterClient() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - # You an also use a wild-card "-" for all sources: - # source_name = "organizations/111122222444/sources/-" - five_days_ago = Timestamp() - five_days_ago.FromDatetime(datetime.now() - timedelta(days=5)) - # [END list_findings_at_a_time] - i = -1 - five_days_ago.FromDatetime(datetime(2019, 3, 5, 0, 0, 0)) - # [START list_findings_at_a_time] - - finding_result_iterator = client.list_findings(source_name, read_time=five_days_ago) - for i, finding_result in enumerate(finding_result_iterator): - print( - "{}: name: {} resource: {}".format( - i, finding_result.finding.name, finding_result.finding.resource_name - ) - ) - # [END list_findings_at_a_time] - assert i == -1 - - -def test_get_iam_policy(source_name): - """Gives a user findingsEditor permission to the source.""" - user_email = "csccclienttest@gmail.com" - # [START get_source_iam] - from google.cloud import securitycenter - from google.iam.v1 import policy_pb2 - - client = securitycenter.SecurityCenterClient() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - # Get the old policy so we can do an incremental update. - policy = client.get_iam_policy(source_name) - print("Policy: {}".format(policy)) - # [END get_source_iam] - - -def test_group_all_findings(organization_id): - """Demonstrates grouping all findings across an organization.""" - # [START group_all_findings] - from google.cloud import securitycenter - - # Create a client. - client = securitycenter.SecurityCenterClient() - - # organization_id is the numeric ID of the organization. e.g.: - # organization_id = "111122222444" - org_name = "organizations/{org_id}".format(org_id=organization_id) - # The "sources/-" suffix lists findings across all sources. You - # also use a specific source_name instead. - all_sources = "{org_name}/sources/-".format(org_name=org_name) - group_result_iterator = client.group_findings(all_sources, group_by="category") - for i, group_result in enumerate(group_result_iterator): - print((i + 1), group_result) - # [END group_all_findings] - assert i > 0 - - -def test_group_filtered_findings(source_name): - """Demonstrates grouping all findings across an organization.""" - # [START group_filtered_findings] - from google.cloud import securitycenter - - # Create a client. - client = securitycenter.SecurityCenterClient() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - - group_result_iterator = client.group_findings( - source_name, group_by="category", filter_='state="ACTIVE"' - ) - for i, group_result in enumerate(group_result_iterator): - print((i + 1), group_result) - # [END group_filtered_findings] - assert i == 0 - - -def test_group_findings_at_time(source_name): - """Demonstrates grouping all findings across an organization as of - a specific time.""" - i = -1 - # [START group_findings_at_time] - from datetime import datetime, timedelta - from google.cloud import securitycenter - from google.protobuf.timestamp_pb2 import Timestamp - - # Create a client. - client = securitycenter.SecurityCenterClient() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - - # Group findings as of yesterday. - read_time = datetime.utcnow() - timedelta(days=1) - timestamp_proto = Timestamp() - timestamp_proto.FromDatetime(read_time) - - group_result_iterator = client.group_findings( - source_name, group_by="category", read_time=timestamp_proto - ) - for i, group_result in enumerate(group_result_iterator): - print((i + 1), group_result) - # [END group_filtered_findings_at_time] - assert i == -1 - - -def test_group_findings_and_changes(source_name): - """Demonstrates grouping all findings across an organization and - associated changes.""" - # [START group_filtered_findings_with_changes] - from datetime import timedelta - - from google.cloud import securitycenter - from google.protobuf.duration_pb2 import Duration - - # Create a client. - client = securitycenter.SecurityCenterClient() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - - # List assets and their state change the last 30 days - compare_delta = timedelta(days=30) - # Convert the timedelta to a Duration - duration_proto = Duration() - duration_proto.FromTimedelta(compare_delta) - - group_result_iterator = client.group_findings( - source_name, group_by="state_change", compare_duration=duration_proto - ) - for i, group_result in enumerate(group_result_iterator): - print((i + 1), group_result) - # [END group_findings_with_changes] - assert i == 0 diff --git a/docs/snippets_list_assets.py b/docs/snippets_list_assets.py deleted file mode 100644 index ae50ff91..00000000 --- a/docs/snippets_list_assets.py +++ /dev/null @@ -1,209 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" Examples of listing assets in Cloud Security Command Center.""" -import os -from datetime import datetime, timedelta -import pytest - - -@pytest.fixture(scope="module") -def organization_id(): - """Get Organization ID from the environment variable """ - return os.environ["GCLOUD_ORGANIZATION"] - - -def test_list_all_assets(organization_id): - """Demonstrate listing and printing all assets.""" - # [START demo_list_all_assets] - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - # organization_id is the numeric ID of the organization. - # organization_id = "1234567777" - org_name = "organizations/{org_id}".format(org_id=organization_id) - - # Call the API and print results. - asset_iterator = client.list_assets(org_name) - for i, asset_result in enumerate(asset_iterator): - print(i, asset_result) - # [END demo_list_all_assets] - assert i > 0 - - -def test_list_assets_with_filters(organization_id): - """Demonstrate listing assets with a filter.""" - # [START demo_list_assets_with_filter] - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - # organization_id is the numeric ID of the organization. - # organization_id = "1234567777" - org_name = "organizations/{org_id}".format(org_id=organization_id) - - project_filter = ( - "security_center_properties.resource_type=" - + '"google.cloud.resourcemanager.Project"' - ) - # Call the API and print results. - asset_iterator = client.list_assets(org_name, filter_=project_filter) - for i, asset_result in enumerate(asset_iterator): - print(i, asset_result) - # [END demo_list_assets_with_filter] - assert i > 0 - - -def test_list_assets_with_filters_and_read_time(organization_id): - """Demonstrate listing assets with a filter.""" - # [START demo_list_assets_with_filter_and_time] - from datetime import datetime, timedelta - - from google.protobuf.timestamp_pb2 import Timestamp - - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - # organization_id is the numeric ID of the organization. - # organization_id = "1234567777" - org_name = "organizations/{org_id}".format(org_id=organization_id) - - project_filter = ( - "security_center_properties.resource_type=" - + '"google.cloud.resourcemanager.Project"' - ) - - # Lists assets as of yesterday. - read_time = datetime.utcnow() - timedelta(days=1) - timestamp_proto = Timestamp() - timestamp_proto.FromDatetime(read_time) - - # Call the API and print results. - asset_iterator = client.list_assets( - org_name, filter_=project_filter, read_time=timestamp_proto - ) - for i, asset_result in enumerate(asset_iterator): - print(i, asset_result) - # [END demo_list_assets_with_filter_and_time] - assert i > 0 - - -def test_list_point_in_time_changes(organization_id): - """Demonstrate listing assets along with their state changes.""" - # [START demo_list_assets_changes] - from datetime import timedelta - - from google.protobuf.duration_pb2 import Duration - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - # organization_id is the numeric ID of the organization. - # organization_id = "1234567777" - org_name = "organizations/{org_id}".format(org_id=organization_id) - project_filter = ( - "security_center_properties.resource_type=" - + '"google.cloud.resourcemanager.Project"' - ) - - # List assets and their state change the last 30 days - compare_delta = timedelta(days=30) - # Convert the timedelta to a Duration - duration_proto = Duration() - duration_proto.FromTimedelta(compare_delta) - # Call the API and print results. - asset_iterator = client.list_assets( - org_name, filter_=project_filter, compare_duration=duration_proto - ) - for i, asset in enumerate(asset_iterator): - print(i, asset) - - # [END demo_list_assets_changes] - assert i > 0 - - -def test_group_assets(organization_id): - """Demonstrates grouping all assets by type. """ - # [START group_all_assets] - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - # organization_id is the numeric ID of the organization. - # organization_id = "1234567777" - org_name = "organizations/{org_id}".format(org_id=organization_id) - - group_by_type = "security_center_properties.resource_type" - - result_iterator = client.group_assets(org_name, group_by=group_by_type) - for i, result in enumerate(result_iterator): - print((i + 1), result) - # [END group_all_assets] - # 8 different asset types. - assert i >= 8 - - -def test_group_filtered_assets(organization_id): - """Demonstrates grouping assets by type with a filter. """ - # [START group_all_assets] - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - # organization_id is the numeric ID of the organization. - # organization_id = "1234567777" - org_name = "organizations/{org_id}".format(org_id=organization_id) - - group_by_type = "security_center_properties.resource_type" - only_projects = ( - "security_center_properties.resource_type=" - + '"google.cloud.resourcemanager.Project"' - ) - result_iterator = client.group_assets( - org_name, group_by=group_by_type, filter_=only_projects - ) - for i, result in enumerate(result_iterator): - print((i + 1), result) - # [END group_all_assets] - # only one asset type is a project - assert i == 0 - - -def test_group_assets_by_changes(organization_id): - """Demonstrates grouping assets by there changes over a period of time.""" - # [START group_all_assets_by_change] - from datetime import timedelta - - from google.cloud import securitycenter - from google.protobuf.duration_pb2 import Duration - - client = securitycenter.SecurityCenterClient() - - duration_proto = Duration() - duration_proto.FromTimedelta(timedelta(days=5)) - - # organization_id is the numeric ID of the organization. - # organization_id = "1234567777" - org_name = "organizations/{org_id}".format(org_id=organization_id) - result_iterator = client.group_assets( - org_name, group_by="state_change", compare_duration=duration_proto - ) - for i, result in enumerate(result_iterator): - print((i + 1), result) - # [END group_all_assets_by_change] - # only one asset type is a project - assert i >= 0 diff --git a/docs/snippets_notification_configs.py b/docs/snippets_notification_configs.py deleted file mode 100644 index 42d1f70f..00000000 --- a/docs/snippets_notification_configs.py +++ /dev/null @@ -1,138 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Demos for working with notification configs.""" - - -def create_notification_config(organization_id, notification_config_id, pubsub_topic): - - # [START scc_create_notification_config] - from google.cloud import securitycenter as securitycenter - - client = securitycenter.SecurityCenterClient() - - # TODO: organization_id = "your-org-id" - # TODO: notification_config_id = "your-config-id" - # TODO: pubsub_topic = "projects/{your-project-id}/topics/{your-topic-ic}" - # Ensure this ServiceAccount has the "pubsub.topics.setIamPolicy" permission on the new topic. - - org_name = "organizations/{org_id}".format(org_id=organization_id) - - created_notification_config = client.create_notification_config( - org_name, - notification_config_id, - { - "description": "Notification for active findings", - "pubsub_topic": pubsub_topic, - "streaming_config": {"filter": 'state = "ACTIVE"',}, - }, - ) - - print(created_notification_config) - # [END scc_create_notification_config] - return created_notification_config - - -def delete_notification_config(organization_id, notification_config_id): - - # [START scc_delete_notification_config] - from google.cloud import securitycenter as securitycenter - - client = securitycenter.SecurityCenterClient() - - # TODO: organization_id = "your-org-id" - # TODO: notification_config_id = "your-config-id" - - notification_config_name = "organizations/{org_id}/notificationConfigs/{config_id}".format( - org_id=organization_id, config_id=notification_config_id - ) - - client.delete_notification_config(notification_config_name) - print("Deleted notification config: {}".format(notification_config_name)) - # [END scc_delete_notification_config] - return True - - -def get_notification_config(organization_id, notification_config_id): - - # [START scc_get_notification_config] - from google.cloud import securitycenter as securitycenter - - client = securitycenter.SecurityCenterClient() - - # TODO: organization_id = "your-org-id" - # TODO: notification_config_id = "your-config-id" - - notification_config_name = "organizations/{org_id}/notificationConfigs/{config_id}".format( - org_id=organization_id, config_id=notification_config_id - ) - - notification_config = client.get_notification_config(notification_config_name) - print("Got notification config: {}".format(notification_config)) - # [END scc_get_notification_config] - return notification_config - - -def list_notification_configs(organization_id): - - # [START scc_list_notification_configs] - from google.cloud import securitycenter as securitycenter - - client = securitycenter.SecurityCenterClient() - - # TODO: organization_id = "your-org-id" - org_name = "organizations/{org_id}".format(org_id=organization_id) - - notification_configs_iterator = client.list_notification_configs(org_name) - for i, config in enumerate(notification_configs_iterator): - print("{}: notification_config: {}".format(i, config)) - # [END scc_list_notification_configs] - return notification_configs_iterator - - -def update_notification_config(organization_id, notification_config_id, pubsub_topic): - # [START scc_update_notification_config] - from google.cloud import securitycenter as securitycenter - from google.protobuf import field_mask_pb2 - - client = securitycenter.SecurityCenterClient() - - # TODO organization_id = "your-org-id" - # TODO notification_config_id = "config-id-to-update" - # TODO pubsub_topic = "projects/{new-project}/topics/{new-topic}" - # If updating a pubsub_topic, ensure this ServiceAccount has the - # "pubsub.topics.setIamPolicy" permission on the new topic. - - notification_config_name = "organizations/{org_id}/notificationConfigs/{config_id}".format( - org_id=organization_id, config_id=notification_config_id - ) - - updated_description = "New updated description" - - # Only description and pubsub_topic can be updated. - field_mask = field_mask_pb2.FieldMask(paths=["description", "pubsub_topic"]) - - updated_notification_config = client.update_notification_config( - { - "name": notification_config_name, - "description": updated_description, - "pubsub_topic": pubsub_topic, - }, - update_mask=field_mask, - ) - - print(updated_notification_config) - # [END scc_update_notification_config] - return updated_notification_config diff --git a/docs/snippets_notification_receiver.py b/docs/snippets_notification_receiver.py deleted file mode 100644 index fd2fdcae..00000000 --- a/docs/snippets_notification_receiver.py +++ /dev/null @@ -1,58 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Demo for receiving notifications.""" - - -def receive_notifications(project_id, subscription_name): - # [START scc_receive_notifications] - # Requires https://cloud.google.com/pubsub/docs/quickstart-client-libraries#pubsub-client-libraries-python - from google.cloud import pubsub_v1 - from google.cloud.securitycenter_v1.proto.notification_message_pb2 import ( - NotificationMessage, - ) - from google.protobuf import json_format - - # TODO: project_id = "your-project-id" - # TODO: subscription_name = "your-subscription-name" - - def callback(message): - print("Received message") - - notification_msg = NotificationMessage() - json_format.Parse(message.data, notification_msg) - - print( - "Notification config name: {}".format( - notification_msg.notification_config_name - ) - ) - print("Finding: {}".format(notification_msg.finding)) - - # Ack the message to prevent it from being pulled again - message.ack() - - subscriber = pubsub_v1.SubscriberClient() - subscription_path = subscriber.subscription_path(project_id, subscription_name) - - streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback) - - print("Listening for messages on {}...\n".format(subscription_path)) - try: - streaming_pull_future.result(timeout=1) # Block for 1 second - except: - streaming_pull_future.cancel() - # [END scc_receive_notifications] - return True diff --git a/docs/snippets_notification_test.py b/docs/snippets_notification_test.py deleted file mode 100644 index d24d0906..00000000 --- a/docs/snippets_notification_test.py +++ /dev/null @@ -1,146 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2020 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Tests for snippets.""" - -import os -import uuid - -from google.cloud import securitycenter as securitycenter -import pytest - -import snippets_notification_configs -import snippets_notification_receiver - -ORG_ID = os.environ["GCLOUD_ORGANIZATION"] -PROJECT_ID = os.environ["GCLOUD_PROJECT"] -PUBSUB_TOPIC = os.environ["GCLOUD_PUBSUB_TOPIC"] -PUBSUB_SUBSCRIPTION = os.environ["GCLOUD_PUBSUB_SUBSCRIPTION"] - -CREATE_CONFIG_ID = "new-notification-pytest" + str(uuid.uuid1()) -DELETE_CONFIG_ID = "new-notification-pytest" + str(uuid.uuid1()) -GET_CONFIG_ID = "new-notification-pytest" + str(uuid.uuid1()) -UPDATE_CONFIG_ID = "new-notification-pytest" + str(uuid.uuid1()) - - -def cleanup_notification_config(notification_config_id): - client = securitycenter.SecurityCenterClient() - - notification_config_name = "organizations/{org_id}/notificationConfigs/{config_id}".format( - org_id=ORG_ID, config_id=notification_config_id - ) - client.delete_notification_config(notification_config_name) - - -@pytest.fixture -def new_notification_config_for_update(): - client = securitycenter.SecurityCenterClient() - - org_name = "organizations/{org_id}".format(org_id=ORG_ID) - - created_notification_config = client.create_notification_config( - org_name, - UPDATE_CONFIG_ID, - { - "description": "Notification for active findings", - "pubsub_topic": PUBSUB_TOPIC, - "streaming_config": {"filter": "",}, - }, - ) - yield created_notification_config - cleanup_notification_config(UPDATE_CONFIG_ID) - - -@pytest.fixture -def new_notification_config_for_get(): - client = securitycenter.SecurityCenterClient() - - org_name = "organizations/{org_id}".format(org_id=ORG_ID) - - created_notification_config = client.create_notification_config( - org_name, - GET_CONFIG_ID, - { - "description": "Notification for active findings", - "pubsub_topic": PUBSUB_TOPIC, - "streaming_config": {"filter": "",}, - }, - ) - yield created_notification_config - cleanup_notification_config(GET_CONFIG_ID) - - -@pytest.fixture -def deleted_notification_config(): - client = securitycenter.SecurityCenterClient() - - org_name = "organizations/{org_id}".format(org_id=ORG_ID) - - created_notification_config = client.create_notification_config( - org_name, - DELETE_CONFIG_ID, - { - "description": "Notification for active findings", - "pubsub_topic": PUBSUB_TOPIC, - "streaming_config": {"filter": "",}, - }, - ) - return created_notification_config - - -def test_create_notification_config(): - created_notification_config = snippets_notification_configs.create_notification_config( - ORG_ID, CREATE_CONFIG_ID, PUBSUB_TOPIC - ) - assert created_notification_config is not None - - cleanup_notification_config(CREATE_CONFIG_ID) - - -def test_delete_notification_config(deleted_notification_config): - assert ( - snippets_notification_configs.delete_notification_config( - ORG_ID, DELETE_CONFIG_ID - ) - == True - ) - - -def test_get_notification_config(new_notification_config_for_get): - retrieved_config = snippets_notification_configs.get_notification_config( - ORG_ID, GET_CONFIG_ID - ) - assert retrieved_config is not None - - -def test_list_notification_configs(): - iterator = snippets_notification_configs.list_notification_configs(ORG_ID) - assert iterator is not None - - -def test_update_notification_config(new_notification_config_for_update): - updated_config = snippets_notification_configs.update_notification_config( - ORG_ID, UPDATE_CONFIG_ID, PUBSUB_TOPIC - ) - assert updated_config is not None - - -def test_receive_notifications(): - assert ( - snippets_notification_receiver.receive_notifications( - PROJECT_ID, PUBSUB_SUBSCRIPTION - ) - == True - ) diff --git a/docs/snippets_orgs.py b/docs/snippets_orgs.py deleted file mode 100644 index 7781da11..00000000 --- a/docs/snippets_orgs.py +++ /dev/null @@ -1,66 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Examples for working with organization settings. """ -import os -import pytest - - -@pytest.fixture(scope="module") -def organization_id(): - """Get Organization ID from the environment variable """ - return os.environ["GCLOUD_ORGANIZATION"] - - -def test_get_settings(organization_id): - """Example showing how to retreive current organization settings.""" - # [START get_org_settings] - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - # organization_id is numeric ID for the organization. e.g. - # organization_id = "111112223333" - - org_settings_name = client.organization_settings_path(organization_id) - - org_settings = client.get_organization_settings(org_settings_name) - print(org_settings) - # [END get_org_settings] - - -def test_update_asset_discovery_org_settings(organization_id): - """Example showing how to update the asset discovery configuration - for an organization.""" - # [START update_org_settings] - from google.cloud import securitycenter - from google.protobuf import field_mask_pb2 - - # Create the client - client = securitycenter.SecurityCenterClient() - # organization_id is numeric ID for the organization. e.g. - # organization_id = "111112223333" - org_settings_name = "organizations/{org_id}/organizationSettings".format( - org_id=organization_id - ) - # Only update the enable_asset_discovery_value (leave others untouched). - field_mask = field_mask_pb2.FieldMask(paths=["enable_asset_discovery"]) - # Call the service. - updated = client.update_organization_settings( - {"name": org_settings_name, "enable_asset_discovery": True}, - update_mask=field_mask, - ) - print("Asset Discovery Enabled? {}".format(updated.enable_asset_discovery)) - # [END update_org_settings] - assert updated.enable_asset_discovery == True diff --git a/docs/snippets_security_marks.py b/docs/snippets_security_marks.py deleted file mode 100644 index ef2f5d73..00000000 --- a/docs/snippets_security_marks.py +++ /dev/null @@ -1,272 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2019 Google LLC -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# https://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -"""Demos for working with security marks.""" -import os -import random - -import pytest - - -@pytest.fixture(scope="module") -def organization_id(): - """Gets Organization ID from the environment variable """ - return os.environ["GCLOUD_ORGANIZATION"] - - -@pytest.fixture(scope="module") -def asset_name(organization_id): - """Returns a random asset name from existing assets.""" - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - # organization_id is the numeric ID of the organization. - # organization_id=1234567777 - org_name = "organizations/{org_id}".format(org_id=organization_id) - assets = list(client.list_assets(org_name)) - # Select a random asset to avoid collision between integration tests. - asset = (random.sample(assets, 1)[0]).asset.name - - # Set fresh marks. - update = client.update_security_marks( - {"name": "{}/securityMarks".format(asset), "marks": {"other": "other_val"}} - ) - assert update.marks == {"other": "other_val"} - return asset - - -@pytest.fixture(scope="module") -def source_name(organization_id): - """Creates a new source in the organization.""" - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - org_name = "organizations/{org_id}".format(org_id=organization_id) - source = client.create_source( - org_name, - { - "display_name": "Security marks Unit test source", - "description": "A new custom source that does X", - }, - ) - return source.name - - -@pytest.fixture(scope="module") -def finding_name(source_name): - """Creates a new finding and returns it name.""" - from google.cloud import securitycenter - from google.cloud.securitycenter_v1.proto.finding_pb2 import Finding - from google.protobuf.timestamp_pb2 import Timestamp - - client = securitycenter.SecurityCenterClient() - - now_proto = Timestamp() - now_proto.GetCurrentTime() - - finding = client.create_finding( - source_name, - "scfinding", - { - "state": Finding.ACTIVE, - "category": "C1", - "event_time": now_proto, - "resource_name": "//cloudresourcemanager.googleapis.com/organizations/1234", - }, - ) - client.create_finding( - source_name, - "untouched", - { - "state": Finding.ACTIVE, - "category": "MEDIUM_RISK_ONE", - "event_time": now_proto, - "resource_name": "//cloudresourcemanager.googleapis.com/organizations/1234", - }, - ) - - return finding.name - - -def test_add_to_asset(asset_name): - """Add new security marks to an asset.""" - # [START add_marks_to_asset] - from google.cloud import securitycenter - from google.protobuf import field_mask_pb2 - - # Create a new client. - client = securitycenter.SecurityCenterClient() - - # asset_name is the resource path for an asset that exists in CSCC. - # Its format is "organization/{organization_id}/assets/{asset_id} - # e.g.: - # asset_name = organizations/123123342/assets/12312321 - marks_name = "{}/securityMarks".format(asset_name) - - # Notice the suffix after "marks." in the field mask matches the keys - # in marks. - field_mask = field_mask_pb2.FieldMask(paths=["marks.key_a", "marks.key_b"]) - marks = {"key_a": "value_a", "key_b": "value_b"} - - updated_marks = client.update_security_marks( - {"name": marks_name, "marks": marks}, - # If this field was left empty, all marks would be cleared before adding - # the new values. - update_mask=field_mask, - ) - print(updated_marks) - # [END add_marks_to_asset] - assert updated_marks.marks.keys() >= marks.keys() - - -def test_clear_from_asset(asset_name): - """Removes security marks from an asset.""" - # Make sure they are there first - test_add_to_asset(asset_name) - # [START clear_marks_asset] - from google.cloud import securitycenter - from google.protobuf import field_mask_pb2 - - # Create a new client. - client = securitycenter.SecurityCenterClient() - - # asset_name is the resource path for an asset that exists in CSCC. - # Its format is "organization/{organization_id}/assets/{asset_id} - # e.g.: - # asset_name = organizations/123123342/assets/12312321 - marks_name = "{}/securityMarks".format(asset_name) - - field_mask = field_mask_pb2.FieldMask(paths=["marks.key_a", "marks.key_b"]) - - updated_marks = client.update_security_marks( - { - "name": marks_name - # Note, no marks specified, so the specified values in - # the fields masks will be deleted. - }, - # If this field was left empty, all marks would be cleared. - update_mask=field_mask, - ) - print(updated_marks) - # [END clear_marks_asset] - assert "other" in updated_marks.marks - assert len(updated_marks.marks) == 1 - - -def test_delete_and_update_marks(asset_name): - """Updates and deletes security marks from an asset in the same call.""" - # Make sure they are there first - test_add_to_asset(asset_name) - # [START delete_and_update_marks] - from google.cloud import securitycenter - from google.protobuf import field_mask_pb2 - - client = securitycenter.SecurityCenterClient() - # asset_name is the resource path for an asset that exists in CSCC. - # Its format is "organization/{organization_id}/assets/{asset_id} - # e.g.: - # asset_name = organizations/123123342/assets/12312321 - marks_name = "{}/securityMarks".format(asset_name) - - field_mask = field_mask_pb2.FieldMask(paths=["marks.key_a", "marks.key_b"]) - marks = {"key_a": "new_value_for_a"} - - updated_marks = client.update_security_marks( - {"name": marks_name, "marks": marks}, update_mask=field_mask - ) - print(updated_marks) - # [END delete_and_update_marks] - assert updated_marks.marks == {"key_a": "new_value_for_a", "other": "other_val"} - - -def test_add_to_finding(finding_name): - """Adds security marks to a finding. """ - # [START add_marks_to_finding] - from google.cloud import securitycenter - from google.protobuf import field_mask_pb2 - - client = securitycenter.SecurityCenterClient() - # finding_name is the resource path for a finding that exists in CSCC. - # Its format is - # "organizations/{org_id}/sources/{source_id}/findings/{finding_id}" - # e.g.: - # finding_name = "organizations/1112/sources/1234/findings/findingid" - finding_marks_name = "{}/securityMarks".format(finding_name) - - # Notice the suffix after "marks." in the field mask matches the keys - # in marks. - field_mask = field_mask_pb2.FieldMask( - paths=["marks.finding_key_a", "marks.finding_key_b"] - ) - marks = {"finding_key_a": "value_a", "finding_key_b": "value_b"} - - updated_marks = client.update_security_marks( - {"name": finding_marks_name, "marks": marks}, update_mask=field_mask - ) - # [END add_marks_to_finding] - - assert updated_marks.marks == marks - - -def test_list_assets_with_query_marks(organization_id, asset_name): - """Lists assets with a filter on security marks. """ - test_add_to_asset(asset_name) - # [START demo_list_assets_with_security_marks] - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - # organization_id is the numeric ID of the organization. - # organization_id=1234567777 - org_name = "organizations/{org_id}".format(org_id=organization_id) - - marks_filter = 'security_marks.marks.key_a = "value_a"' - # Call the API and print results. - asset_iterator = client.list_assets(org_name, filter_=marks_filter) - - # Call the API and print results. - asset_iterator = client.list_assets(org_name, filter_=marks_filter) - for i, asset_result in enumerate(asset_iterator): - print(i, asset_result) - # [END demo_list_assets_with_security_marks] - assert i >= 0 - - -def test_list_findings_with_query_marks(source_name, finding_name): - """Lists findings with a filter on security marks.""" - # ensure marks are set on finding. - test_add_to_finding(finding_name) - i = -1 - # [START demo_list_findings_with_security_marks] - from google.cloud import securitycenter - - client = securitycenter.SecurityCenterClient() - - # source_name is the resource path for a source that has been - # created previously (you can use list_sources to find a specific one). - # Its format is: - # source_name = "organizations/{organization_id}/sources/{source_id}" - # e.g.: - # source_name = "organizations/111122222444/sources/1234" - marks_filter = 'NOT security_marks.marks.finding_key_a="value_a"' - - # Call the API and print results. - finding_iterator = client.list_findings(source_name, filter_=marks_filter) - for i, finding_result in enumerate(finding_iterator): - print(i, finding_result) - # [END demo_list_findings_with_security_marks] - # one finding should have been updated with keys, and one should be - # untouched. - assert i == 0