diff --git a/tests/system.py b/tests/system.py index 37a39766a..2eccad8ed 100644 --- a/tests/system.py +++ b/tests/system.py @@ -564,7 +564,7 @@ def test_streaming_pull_max_messages( @pytest.mark.skipif( "KOKORO_GFILE_DIR" not in os.environ, - reason="Requires Kokoro environment with a limited subscriber service account.", + reason="Requires Kokoro environment with a service account with limited role.", ) def test_streaming_pull_subscriber_permissions_sufficient( self, publisher, topic_path, subscriber, subscription_path, cleanup @@ -601,6 +601,136 @@ def test_streaming_pull_subscriber_permissions_sufficient( finally: future.cancel() + @pytest.mark.skipif( + "KOKORO_GFILE_DIR" not in os.environ, + reason="Requires Kokoro environment with a service account with limited role.", + ) + def test_publisher_role_can_publish_messages( + self, publisher, topic_path, subscriber, subscription_path, cleanup + ): + + # Make sure the topic and subscription get deleted. + cleanup.append((publisher.delete_topic, topic_path)) + cleanup.append((subscriber.delete_subscription, subscription_path)) + + # Create a topic and subscribe to it. + publisher.create_topic(topic_path) + subscriber.create_subscription(subscription_path, topic_path) + + # Create a publisher client with only the publisher role only. + filename = os.path.join( + os.environ["KOKORO_GFILE_DIR"], "pubsub-publisher-service-account.json" + ) + publisher_only_client = type(publisher).from_service_account_file(filename) + + self._publish_messages(publisher_only_client, topic_path, batch_sizes=[2]) + + response = subscriber.pull(subscription_path, max_messages=2) + assert len(response.received_messages) == 2 + + @pytest.mark.skip( + "Snapshot creation is not instant on the backend, causing test falkiness." + ) + @pytest.mark.skipif( + "KOKORO_GFILE_DIR" not in os.environ, + reason="Requires Kokoro environment with a service account with limited role.", + ) + def test_snapshot_seek_subscriber_permissions_sufficient( + self, project, publisher, topic_path, subscriber, subscription_path, cleanup + ): + snapshot_name = "snap" + unique_resource_id("-") + snapshot_path = "projects/{}/snapshots/{}".format(project, snapshot_name) + + # Make sure the topic and subscription get deleted. + cleanup.append((publisher.delete_topic, topic_path)) + cleanup.append((subscriber.delete_subscription, subscription_path)) + cleanup.append((subscriber.delete_snapshot, snapshot_path)) + + # Create a topic and subscribe to it. + publisher.create_topic(topic_path) + subscriber.create_subscription( + subscription_path, topic_path, retain_acked_messages=True + ) + + # A service account granting only the pubsub.subscriber role must be used. + filename = os.path.join( + os.environ["KOKORO_GFILE_DIR"], "pubsub-subscriber-service-account.json" + ) + subscriber_only_client = type(subscriber).from_service_account_file(filename) + + # Publish two messages and create a snapshot inbetween. + self._publish_messages(publisher, topic_path, batch_sizes=[1]) + response = subscriber.pull(subscription_path, max_messages=10) + assert len(response.received_messages) == 1 + + subscriber.create_snapshot(snapshot_path, subscription_path) + + self._publish_messages(publisher, topic_path, batch_sizes=[1]) + response = subscriber.pull(subscription_path, max_messages=10) + assert len(response.received_messages) == 1 + + # A subscriber-only client should be allowed to seek to a snapshot. + subscriber_only_client.seek(subscription_path, snapshot=snapshot_path) + + # We should receive one message again, since we sought back to a snapshot. + response = subscriber.pull(subscription_path, max_messages=10) + assert len(response.received_messages) == 1 + + @pytest.mark.skipif( + "KOKORO_GFILE_DIR" not in os.environ, + reason="Requires Kokoro environment with a service account with limited role.", + ) + def test_viewer_role_can_list_resources( + self, project, publisher, topic_path, subscriber, cleanup + ): + project_path = "projects/" + project + + # Make sure the created topic gets deleted. + cleanup.append((publisher.delete_topic, topic_path)) + + publisher.create_topic(topic_path) + + # A service account granting only the pubsub.viewer role must be used. + filename = os.path.join( + os.environ["KOKORO_GFILE_DIR"], "pubsub-viewer-service-account.json" + ) + viewer_only_subscriber = type(subscriber).from_service_account_file(filename) + viewer_only_publisher = type(publisher).from_service_account_file(filename) + + # The following operations should not raise permission denied errors. + # NOTE: At least one topic exists. + topic = next(iter(viewer_only_publisher.list_topics(project_path))) + next(iter(viewer_only_publisher.list_topic_subscriptions(topic.name)), None) + next(iter(viewer_only_subscriber.list_subscriptions(project_path)), None) + next(iter(viewer_only_subscriber.list_snapshots(project_path)), None) + + @pytest.mark.skipif( + "KOKORO_GFILE_DIR" not in os.environ, + reason="Requires Kokoro environment with a service account with limited role.", + ) + def test_editor_role_can_create_resources( + self, project, publisher, topic_path, subscriber, subscription_path, cleanup + ): + snapshot_name = "snap" + unique_resource_id("-") + snapshot_path = "projects/{}/snapshots/{}".format(project, snapshot_name) + + # Make sure the created resources get deleted. + cleanup.append((subscriber.delete_snapshot, snapshot_path)) + cleanup.append((subscriber.delete_subscription, subscription_path)) + cleanup.append((publisher.delete_topic, topic_path)) + + # A service account granting only the pubsub.editor role must be used. + filename = os.path.join( + os.environ["KOKORO_GFILE_DIR"], "pubsub-editor-service-account.json" + ) + editor_subscriber = type(subscriber).from_service_account_file(filename) + editor_publisher = type(publisher).from_service_account_file(filename) + + # The following operations should not raise permission denied errors. + editor_publisher.create_topic(topic_path) + editor_subscriber.create_subscription(subscription_path, topic_path) + editor_subscriber.create_snapshot(snapshot_path, subscription_path) + def _publish_messages(self, publisher, topic_path, batch_sizes): """Publish ``count`` messages in batches and wait until completion.""" publish_futures = []