From 24c9c9a98af4147c77aa632ec4a9c3fb6464f30c Mon Sep 17 00:00:00 2001 From: Gabor Cseh <77115915+gcseh@users.noreply.github.com> Date: Fri, 5 Feb 2021 13:52:09 +0100 Subject: [PATCH] fix: add fieldMask for getDevice and listDevices (#64) * Add fieldMask for getDevice and listDevices --- samples/api-client/manager/manager.py | 58 ++- samples/api-client/manager/manager_test.py | 550 ++++++++++++--------- samples/api-client/manager/noxfile.py | 26 +- 3 files changed, 388 insertions(+), 246 deletions(-) diff --git a/samples/api-client/manager/manager.py b/samples/api-client/manager/manager.py index 5d2e4ef5..260d7134 100644 --- a/samples/api-client/manager/manager.py +++ b/samples/api-client/manager/manager.py @@ -279,7 +279,31 @@ def get_device(service_account_json, project_id, cloud_region, registry_id, devi client = iot_v1.DeviceManagerClient() device_path = client.device_path(project_id, cloud_region, registry_id, device_id) - device = client.get_device(request={"name": device_path}) + # See full list of device fields: https://cloud.google.com/iot/docs/reference/cloudiot/rest/v1/projects.locations.registries.devices + # Warning! Use snake_case field names. + field_mask = gp_field_mask.FieldMask( + paths=[ + "id", + "name", + "num_id", + "credentials", + "last_heartbeat_time", + "last_event_time", + "last_state_time", + "last_config_ack_time", + "last_config_send_time", + "blocked", + "last_error_time", + "last_error_status", + "config", + "state", + "log_level", + "metadata", + "gateway_config", + ] + ) + + device = client.get_device(request={"name": device_path, "field_mask": field_mask}) print("Id : {}".format(device.id)) print("Name : {}".format(device.name)) @@ -345,9 +369,35 @@ def list_devices(service_account_json, project_id, cloud_region, registry_id): client = iot_v1.DeviceManagerClient() registry_path = client.registry_path(project_id, cloud_region, registry_id) - devices = list(client.list_devices(request={"parent": registry_path})) + # See full list of device fields: https://cloud.google.com/iot/docs/reference/cloudiot/rest/v1/projects.locations.registries.devices + # Warning! Use snake_case field names. + field_mask = gp_field_mask.FieldMask( + paths=[ + "id", + "name", + "num_id", + "credentials", + "last_heartbeat_time", + "last_event_time", + "last_state_time", + "last_config_ack_time", + "last_config_send_time", + "blocked", + "last_error_time", + "last_error_status", + "config", + "state", + "log_level", + "metadata", + "gateway_config", + ] + ) + + devices = list( + client.list_devices(request={"parent": registry_path, "field_mask": field_mask}) + ) for device in devices: - print("Device: {} : {}".format(device.num_id, device.id)) + print(device) return devices # [END iot_list_devices] @@ -373,7 +423,7 @@ def list_registries(service_account_json, project_id, cloud_region): def create_registry( service_account_json, project_id, cloud_region, pubsub_topic, registry_id ): - """ Creates a registry and returns the result. Returns an empty result if + """Creates a registry and returns the result. Returns an empty result if the registry already exists.""" # [START iot_create_registry] # project_id = 'YOUR_PROJECT_ID' diff --git a/samples/api-client/manager/manager_test.py b/samples/api-client/manager/manager_test.py index 9802677e..e8300fc4 100644 --- a/samples/api-client/manager/manager_test.py +++ b/samples/api-client/manager/manager_test.py @@ -22,38 +22,38 @@ import pytest # Add command receiver for bootstrapping device registry / device for testing -sys.path.append(os.path.join(os.path.dirname(__file__), '..', 'mqtt_example')) # noqa +sys.path.append(os.path.join(os.path.dirname(__file__), "..", "mqtt_example")) # noqa import cloudiot_mqtt_example # noqa import manager # noqa -cloud_region = 'us-central1' -device_id_template = 'test-device-{}' -ca_cert_path = '../mqtt_example/resources/roots.pem' -es_cert_path = 'resources/ec_public.pem' -rsa_cert_path = 'resources/rsa_cert.pem' -rsa_private_path = 'resources/rsa_private.pem' # Must match rsa_cert -topic_id = 'test-device-events-{}'.format(uuid.uuid4()) +cloud_region = "us-central1" +device_id_template = "test-device-{}" +ca_cert_path = "../mqtt_example/resources/roots.pem" +es_cert_path = "resources/ec_public.pem" +rsa_cert_path = "resources/rsa_cert.pem" +rsa_private_path = "resources/rsa_private.pem" # Must match rsa_cert +topic_id = "test-device-events-{}".format(uuid.uuid4()) -project_id = os.environ['GOOGLE_CLOUD_PROJECT'] -service_account_json = os.environ['GOOGLE_APPLICATION_CREDENTIALS'] +project_id = os.environ["GOOGLE_CLOUD_PROJECT"] +service_account_json = os.environ["GOOGLE_APPLICATION_CREDENTIALS"] -pubsub_topic = 'projects/{}/topics/{}'.format(project_id, topic_id) +pubsub_topic = "projects/{}/topics/{}".format(project_id, topic_id) # This format is used in the `clean_up_registries()` below. -registry_id = 'test-registry-{}-{}'.format(uuid.uuid4().hex, int(time.time())) +registry_id = "test-registry-{}-{}".format(uuid.uuid4().hex, int(time.time())) @pytest.fixture(scope="session", autouse=True) def clean_up_registries(): - all_registries = list(manager.list_registries( - service_account_json, project_id, cloud_region)) + all_registries = list( + manager.list_registries(service_account_json, project_id, cloud_region) + ) for registry in all_registries: registry_id = registry.id - if registry_id.find('test-registry-') == 0: - time_str = registry_id[ - registry_id.rfind('-') + 1 : len(registry_id)] + if registry_id.find("test-registry-") == 0: + time_str = registry_id[registry_id.rfind("-") + 1 : len(registry_id)] try: test_date = datetime.datetime.utcfromtimestamp(int(time_str)) except ValueError: @@ -64,64 +64,83 @@ def clean_up_registries(): # *NOTE* Restrict to registries used in the tests older than 30 # days to prevent thrashing in the case of async tests - if (difftime.days > 30): + if difftime.days > 30: client = manager.get_client(service_account_json) - gateways = client.projects().locations().registries().devices( - ).list( - parent=registry.name, - fieldMask='config,gatewayConfig' - ).execute().get('devices', []) - devices = client.projects().locations().registries().devices( - ).list(parent=registry.name).execute().get( - 'devices', []) + gateways = ( + client.projects() + .locations() + .registries() + .devices() + .list(parent=registry.name, fieldMask="config,gatewayConfig") + .execute() + .get("devices", []) + ) + devices = ( + client.projects() + .locations() + .registries() + .devices() + .list(parent=registry.name) + .execute() + .get("devices", []) + ) # Unbind devices from each gateway and delete for gateway in gateways: - gateway_id = gateway.get('id') - bound = client.projects().locations().registries().devices( - ).list( + gateway_id = gateway.get("id") + bound = ( + client.projects() + .locations() + .registries() + .devices() + .list( parent=registry.name, - gatewayListOptions_associationsGatewayId=gateway_id - ).execute() - if 'devices' in bound: - for device in bound['devices']: + gatewayListOptions_associationsGatewayId=gateway_id, + ) + .execute() + ) + if "devices" in bound: + for device in bound["devices"]: bind_request = { - 'deviceId': device.get('id'), - 'gatewayId': gateway_id + "deviceId": device.get("id"), + "gatewayId": gateway_id, } - client.projects().locations().registries( - ).unbindDeviceFromGateway( - parent=registry.get('name'), - body=bind_request).execute() - gateway_name = '{}/devices/{}'.format( - registry.name, gateway_id) - client.projects().locations().registries().devices( - ).delete(name=gateway_name).execute() + client.projects().locations().registries().unbindDeviceFromGateway( + parent=registry.get("name"), body=bind_request + ).execute() + gateway_name = "{}/devices/{}".format(registry.name, gateway_id) + client.projects().locations().registries().devices().delete( + name=gateway_name + ).execute() # Delete the devices # Assumption is that the devices are not bound to gateways for device in devices: - device_name = '{}/devices/{}'.format( - registry.name, device.get('id')) + device_name = "{}/devices/{}".format( + registry.name, device.get("id") + ) print(device_name) remove_device = True try: - client.projects().locations().registries().devices( - ).get(name=device_name).execute() + client.projects().locations().registries().devices().get( + name=device_name + ).execute() except Exception: remove_device = False if remove_device: - print('removing {}'.format(device_name)) - client.projects().locations().registries().devices( - ).delete(name=device_name).execute() + print("removing {}".format(device_name)) + client.projects().locations().registries().devices().delete( + name=device_name + ).execute() # Delete the old test registry client.projects().locations().registries().delete( - name=registry.name).execute() + name=registry.name + ).execute() -@pytest.fixture(scope='module') +@pytest.fixture(scope="module") def test_topic(): topic = manager.create_iot_topic(project_id, topic_id) @@ -134,266 +153,310 @@ def test_topic(): def test_create_delete_registry(test_topic, capsys): manager.open_registry( - service_account_json, project_id, cloud_region, pubsub_topic, - registry_id) + service_account_json, project_id, cloud_region, pubsub_topic, registry_id + ) - manager.list_devices( - service_account_json, project_id, cloud_region, registry_id) + manager.list_devices(service_account_json, project_id, cloud_region, registry_id) out, _ = capsys.readouterr() # Check that create / list worked - assert 'Created registry' in out - assert 'event_notification_config' in out + assert "Created registry" in out + assert "event_notification_config" in out # Clean up - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) + manager.delete_registry(service_account_json, project_id, cloud_region, registry_id) def test_get_iam_permissions(test_topic, capsys): manager.open_registry( - service_account_json, project_id, cloud_region, pubsub_topic, - registry_id) + service_account_json, project_id, cloud_region, pubsub_topic, registry_id + ) - manager.list_devices( - service_account_json, project_id, cloud_region, registry_id) + manager.list_devices(service_account_json, project_id, cloud_region, registry_id) # Test getting IAM permissions - print(manager.get_iam_permissions( - service_account_json, project_id, cloud_region, registry_id)) + print( + manager.get_iam_permissions( + service_account_json, project_id, cloud_region, registry_id + ) + ) # Test setting IAM permissions MEMBER = "group:dpebot@google.com" ROLE = "roles/viewer" - print(manager.set_iam_permissions( - service_account_json, project_id, cloud_region, registry_id, - ROLE, MEMBER)) + print( + manager.set_iam_permissions( + service_account_json, project_id, cloud_region, registry_id, ROLE, MEMBER + ) + ) out, _ = capsys.readouterr() # Check that create / list worked - assert 'Created registry' in out - assert 'event_notification_config' in out - assert 'dpebot' in out - assert 'etag' in out + assert "Created registry" in out + assert "event_notification_config" in out + assert "dpebot" in out + assert "etag" in out # Clean up - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) + manager.delete_registry(service_account_json, project_id, cloud_region, registry_id) def test_add_delete_unauth_device(test_topic, capsys): - device_id = device_id_template.format('UNAUTH') + device_id = device_id_template.format("UNAUTH") manager.open_registry( - service_account_json, project_id, cloud_region, pubsub_topic, - registry_id) + service_account_json, project_id, cloud_region, pubsub_topic, registry_id + ) manager.create_unauth_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) manager.get_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) + manager.delete_registry(service_account_json, project_id, cloud_region, registry_id) out, _ = capsys.readouterr() - assert 'UNAUTH' in out + assert "UNAUTH" in out @pytest.mark.flaky(max_runs=5, min_passes=1) def test_add_config_unauth_device(test_topic, capsys): - device_id = device_id_template.format('UNAUTH') + device_id = device_id_template.format("UNAUTH") manager.open_registry( - service_account_json, project_id, cloud_region, pubsub_topic, - registry_id) + service_account_json, project_id, cloud_region, pubsub_topic, registry_id + ) manager.create_unauth_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) manager.set_config( - service_account_json, project_id, cloud_region, registry_id, - device_id, 0, 'test') + service_account_json, + project_id, + cloud_region, + registry_id, + device_id, + 0, + "test", + ) manager.get_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) manager.get_config_versions( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) + manager.delete_registry(service_account_json, project_id, cloud_region, registry_id) out, _ = capsys.readouterr() - assert 'Set device configuration' in out - assert 'UNAUTH' in out - assert 'version: 2' in out + assert "Set device configuration" in out + assert "UNAUTH" in out + assert "version: 2" in out def test_add_delete_rs256_device(test_topic, capsys): - device_id = device_id_template.format('RSA256') + device_id = device_id_template.format("RSA256") manager.open_registry( - service_account_json, project_id, cloud_region, pubsub_topic, - registry_id) + service_account_json, project_id, cloud_region, pubsub_topic, registry_id + ) manager.create_rs256_device( - service_account_json, project_id, cloud_region, registry_id, - device_id, rsa_cert_path) + service_account_json, + project_id, + cloud_region, + registry_id, + device_id, + rsa_cert_path, + ) manager.get_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) manager.get_state( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) + manager.delete_registry(service_account_json, project_id, cloud_region, registry_id) out, _ = capsys.readouterr() - assert 'format : RSA_X509_PEM' in out + assert "format : RSA_X509_PEM" in out def test_add_delete_es256_device(test_topic, capsys): - device_id = device_id_template.format('ES256') + device_id = device_id_template.format("ES256") manager.open_registry( - service_account_json, project_id, cloud_region, pubsub_topic, - registry_id) + service_account_json, project_id, cloud_region, pubsub_topic, registry_id + ) manager.create_es256_device( - service_account_json, project_id, cloud_region, registry_id, - device_id, es_cert_path) + service_account_json, + project_id, + cloud_region, + registry_id, + device_id, + es_cert_path, + ) manager.get_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) manager.get_state( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) + manager.delete_registry(service_account_json, project_id, cloud_region, registry_id) out, _ = capsys.readouterr() - assert 'format : ES256_PEM' in out + assert "format : ES256_PEM" in out def test_add_patch_delete_rs256(test_topic, capsys): - device_id = device_id_template.format('PATCHME') + device_id = device_id_template.format("PATCHME") manager.open_registry( - service_account_json, project_id, cloud_region, pubsub_topic, - registry_id) + service_account_json, project_id, cloud_region, pubsub_topic, registry_id + ) manager.create_rs256_device( - service_account_json, project_id, cloud_region, registry_id, - device_id, rsa_cert_path) + service_account_json, + project_id, + cloud_region, + registry_id, + device_id, + rsa_cert_path, + ) manager.get_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) out, _ = capsys.readouterr() - assert 'format : RSA_X509_PEM' in out + assert "format : RSA_X509_PEM" in out manager.patch_es256_auth( - service_account_json, project_id, cloud_region, registry_id, - device_id, es_cert_path) + service_account_json, + project_id, + cloud_region, + registry_id, + device_id, + es_cert_path, + ) manager.get_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) out, _ = capsys.readouterr() - assert 'format : ES256_PEM' in out + assert "format : ES256_PEM" in out manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) + manager.delete_registry(service_account_json, project_id, cloud_region, registry_id) def test_add_patch_delete_es256(test_topic, capsys): - device_id = device_id_template.format('PATCHME') + device_id = device_id_template.format("PATCHME") manager.open_registry( - service_account_json, project_id, cloud_region, pubsub_topic, - registry_id) + service_account_json, project_id, cloud_region, pubsub_topic, registry_id + ) manager.create_es256_device( - service_account_json, project_id, cloud_region, registry_id, - device_id, es_cert_path) + service_account_json, + project_id, + cloud_region, + registry_id, + device_id, + es_cert_path, + ) manager.get_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) out, _ = capsys.readouterr() - assert 'format : ES256_PEM' in out + assert "format : ES256_PEM" in out manager.patch_rsa256_auth( - service_account_json, project_id, cloud_region, registry_id, - device_id, rsa_cert_path) + service_account_json, + project_id, + cloud_region, + registry_id, + device_id, + rsa_cert_path, + ) manager.get_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) out, _ = capsys.readouterr() - assert 'format : RSA_X509_PEM' in out + assert "format : RSA_X509_PEM" in out manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) + manager.delete_registry(service_account_json, project_id, cloud_region, registry_id) @pytest.mark.flaky(max_runs=5, min_passes=1) def test_send_command(test_topic, capsys): - device_id = device_id_template.format('RSA256') + device_id = device_id_template.format("RSA256") manager.create_registry( - service_account_json, project_id, cloud_region, pubsub_topic, - registry_id) + service_account_json, project_id, cloud_region, pubsub_topic, registry_id + ) exists = False devices = manager.list_devices( - service_account_json, project_id, cloud_region, registry_id) + service_account_json, project_id, cloud_region, registry_id + ) for device in devices: if device.id == device_id: exists = True if not exists: manager.create_rs256_device( - service_account_json, project_id, cloud_region, registry_id, - device_id, rsa_cert_path) + service_account_json, + project_id, + cloud_region, + registry_id, + device_id, + rsa_cert_path, + ) # Exercize the functionality client = cloudiot_mqtt_example.get_client( - project_id, cloud_region, registry_id, device_id, - rsa_private_path, 'RS256', ca_cert_path, - 'mqtt.googleapis.com', 443) + project_id, + cloud_region, + registry_id, + device_id, + rsa_private_path, + "RS256", + ca_cert_path, + "mqtt.googleapis.com", + 443, + ) client.loop_start() out, _ = capsys.readouterr() @@ -402,8 +465,13 @@ def test_send_command(test_topic, capsys): time.sleep(1) manager.send_command( - service_account_json, project_id, cloud_region, registry_id, - device_id, 'me want cookies') + service_account_json, + project_id, + cloud_region, + registry_id, + device_id, + "me want cookies", + ) out, _ = capsys.readouterr() # Process commands @@ -414,97 +482,123 @@ def test_send_command(test_topic, capsys): client.loop_stop() client.disconnect() manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) + manager.delete_registry(service_account_json, project_id, cloud_region, registry_id) - assert 'Sending command to device' in out - assert '400' not in out + assert "Sending command to device" in out + assert "400" not in out def test_create_gateway(test_topic, capsys): - gateway_id = device_id_template.format('RS256') + gateway_id = device_id_template.format("RS256") manager.create_registry( - service_account_json, project_id, cloud_region, pubsub_topic, - registry_id) + service_account_json, project_id, cloud_region, pubsub_topic, registry_id + ) # TODO: consider adding test for ES256 manager.create_gateway( - service_account_json, project_id, cloud_region, registry_id, - None, gateway_id, rsa_cert_path, 'RS256') + service_account_json, + project_id, + cloud_region, + registry_id, + None, + gateway_id, + rsa_cert_path, + "RS256", + ) # Clean up manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - gateway_id) - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) + service_account_json, project_id, cloud_region, registry_id, gateway_id + ) + manager.delete_registry(service_account_json, project_id, cloud_region, registry_id) out, _ = capsys.readouterr() - assert 'Created Gateway' in out + assert "Created Gateway" in out def test_list_gateways(test_topic, capsys): - gateway_id = device_id_template.format('RS256') + gateway_id = device_id_template.format("RS256") manager.create_registry( - service_account_json, project_id, cloud_region, pubsub_topic, - registry_id) + service_account_json, project_id, cloud_region, pubsub_topic, registry_id + ) # TODO: consider adding test for ES256 manager.create_gateway( - service_account_json, project_id, cloud_region, registry_id, - None, gateway_id, rsa_cert_path, 'RS256') - - manager.list_gateways( - service_account_json, project_id, cloud_region, registry_id) + service_account_json, + project_id, + cloud_region, + registry_id, + None, + gateway_id, + rsa_cert_path, + "RS256", + ) + + manager.list_gateways(service_account_json, project_id, cloud_region, registry_id) # Clean up manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - gateway_id) - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) + service_account_json, project_id, cloud_region, registry_id, gateway_id + ) + manager.delete_registry(service_account_json, project_id, cloud_region, registry_id) out, _ = capsys.readouterr() - assert 'Gateway ID: {}'.format(gateway_id) in out + assert "Gateway ID: {}".format(gateway_id) in out def test_bind_device_to_gateway_and_unbind(test_topic, capsys): - gateway_id = device_id_template.format('RS256') - device_id = device_id_template.format('noauthbind') + gateway_id = device_id_template.format("RS256") + device_id = device_id_template.format("noauthbind") manager.create_registry( - service_account_json, project_id, cloud_region, pubsub_topic, - registry_id) + service_account_json, project_id, cloud_region, pubsub_topic, registry_id + ) manager.create_gateway( - service_account_json, project_id, cloud_region, registry_id, - None, gateway_id, rsa_cert_path, 'RS256') + service_account_json, + project_id, + cloud_region, + registry_id, + None, + gateway_id, + rsa_cert_path, + "RS256", + ) manager.create_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) manager.bind_device_to_gateway( - service_account_json, project_id, cloud_region, registry_id, - device_id, gateway_id) + service_account_json, + project_id, + cloud_region, + registry_id, + device_id, + gateway_id, + ) manager.unbind_device_from_gateway( - service_account_json, project_id, cloud_region, registry_id, - device_id, gateway_id) + service_account_json, + project_id, + cloud_region, + registry_id, + device_id, + gateway_id, + ) # Clean up manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - device_id) + service_account_json, project_id, cloud_region, registry_id, device_id + ) manager.delete_device( - service_account_json, project_id, cloud_region, registry_id, - gateway_id) - manager.delete_registry( - service_account_json, project_id, cloud_region, registry_id) + service_account_json, project_id, cloud_region, registry_id, gateway_id + ) + manager.delete_registry(service_account_json, project_id, cloud_region, registry_id) out, _ = capsys.readouterr() - assert 'Device Bound' in out - assert 'Device unbound' in out - assert 'HttpError 404' not in out + assert "Device Bound" in out + assert "Device unbound" in out + assert "HttpError 404" not in out diff --git a/samples/api-client/manager/noxfile.py b/samples/api-client/manager/noxfile.py index ba55d7ce..5660f08b 100644 --- a/samples/api-client/manager/noxfile.py +++ b/samples/api-client/manager/noxfile.py @@ -37,24 +37,22 @@ TEST_CONFIG = { # You can opt out from the test for specific Python versions. - 'ignored_versions': ["2.7"], - + "ignored_versions": ["2.7"], # An envvar key for determining the project id to use. Change it # to 'BUILD_SPECIFIC_GCLOUD_PROJECT' if you want to opt in using a # build specific Cloud project. You can also use your own string # to use your own Cloud project. - 'gcloud_project_env': 'GOOGLE_CLOUD_PROJECT', + "gcloud_project_env": "GOOGLE_CLOUD_PROJECT", # 'gcloud_project_env': 'BUILD_SPECIFIC_GCLOUD_PROJECT', - # A dictionary you want to inject into your test. Don't put any # secrets here. These values will override predefined values. - 'envs': {}, + "envs": {}, } try: # Ensure we can import noxfile_config in the project's directory. - sys.path.append('.') + sys.path.append(".") from noxfile_config import TEST_CONFIG_OVERRIDE except ImportError as e: print("No user noxfile_config found: detail: {}".format(e)) @@ -69,12 +67,12 @@ def get_pytest_env_vars(): ret = {} # Override the GCLOUD_PROJECT and the alias. - env_key = TEST_CONFIG['gcloud_project_env'] + env_key = TEST_CONFIG["gcloud_project_env"] # This should error out if not set. - ret['GOOGLE_CLOUD_PROJECT'] = os.environ[env_key] + ret["GOOGLE_CLOUD_PROJECT"] = os.environ[env_key] # Apply user supplied envs. - ret.update(TEST_CONFIG['envs']) + ret.update(TEST_CONFIG["envs"]) return ret @@ -83,7 +81,7 @@ def get_pytest_env_vars(): ALL_VERSIONS = ["2.7", "3.6", "3.7", "3.8"] # Any default versions that should be ignored. -IGNORED_VERSIONS = TEST_CONFIG['ignored_versions'] +IGNORED_VERSIONS = TEST_CONFIG["ignored_versions"] TESTED_VERSIONS = sorted([v for v in ALL_VERSIONS if v not in IGNORED_VERSIONS]) @@ -138,7 +136,7 @@ def lint(session): args = FLAKE8_COMMON_ARGS + [ "--application-import-names", ",".join(local_names), - "." + ".", ] session.run("flake8", *args) @@ -182,9 +180,9 @@ def py(session): if session.python in TESTED_VERSIONS: _session_tests(session) else: - session.skip("SKIPPED: {} tests are disabled for this sample.".format( - session.python - )) + session.skip( + "SKIPPED: {} tests are disabled for this sample.".format(session.python) + ) #