Skip to content

Commit

Permalink
fix: Connector pod updates "Forbidden" error (#184)
Browse files Browse the repository at this point in the history
  • Loading branch information
ekampf committed Mar 5, 2024
1 parent 22b7f84 commit 8096e62
Show file tree
Hide file tree
Showing 3 changed files with 180 additions and 9 deletions.
17 changes: 17 additions & 0 deletions app/api/tests/test_client_remote_networks.py
Expand Up @@ -45,3 +45,20 @@ def test_get_rn_with_non_existing_name_returns_none(
)
result = api_client.get_remote_network_by_name("My Cluster")
assert result is None

def test_get_rn_with_transport_error_returns_none(
self, test_url, api_client, mock_resource_data, mocked_responses
):
errors_response = json.dumps({"errors": [{"message": "Transport error"}]})
mocked_responses.post(
test_url,
status=200,
body=errors_response,
match=[
responses.matchers.json_params_matcher(
{"variables": {"name": "My Cluster"}}, strict_match=False
)
],
)
result = api_client.get_remote_network_by_name("My Cluster")
assert result is None
53 changes: 45 additions & 8 deletions app/handlers/handlers_connectors.py
Expand Up @@ -3,6 +3,7 @@
import kopf
import kubernetes.client
import pendulum
from kubernetes.client.models import V1ObjectMeta

from app.api import TwingateAPIClient
from app.crds import TwingateConnectorCRD
Expand All @@ -12,6 +13,9 @@
ANNOTATION_LAST_VERSION_CHECK = "twingate.com/last-version-check"
ANNOTATION_NEXT_VERSION_CHECK = "twingate.com/next-version-check"

ANNOTATION_POD_SPEC_VERSION = "twingate.com/connector-podspec-version"
ANNOTATION_POD_SPEC_VERSION_VALUE = "v1"


def get_connector_pod(
crd: TwingateConnectorCRD, tenant_url: str, image: str
Expand Down Expand Up @@ -66,8 +70,11 @@ def get_connector_pod(
],
**spec.pod_extra,
}

pod_meta = V1ObjectMeta(annotations={ANNOTATION_POD_SPEC_VERSION: ANNOTATION_POD_SPEC_VERSION_VALUE})

# fmt: on
return kubernetes.client.V1Pod(spec=pod_spec)
return kubernetes.client.V1Pod(spec=pod_spec, metadata=pod_meta)


def get_connector_secret(
Expand All @@ -94,6 +101,16 @@ def k8s_read_namespaced_pod(
raise


def k8s_force_delete_pod(
namespace: str, name: str, kapi: kubernetes.client.CoreV1Api | None = None
):
kapi = kapi or kubernetes.client.CoreV1Api()
kapi.patch_namespaced_pod(name, namespace, body={"metadata": {"finalizers": []}})
kapi.delete_namespaced_pod(
name, namespace, body=kubernetes.client.V1DeleteOptions(grace_period_seconds=0)
)


@kopf.on.create("twingateconnector")
def twingate_connector_create(body, memo, logger, namespace, patch, **_):
settings = memo.twingate_settings
Expand Down Expand Up @@ -124,7 +141,7 @@ def twingate_connector_create(body, memo, logger, namespace, patch, **_):


@kopf.on.update("twingateconnector", field=["spec"])
def twingate_connector_update(body, memo, logger, new, diff, status, **_):
def twingate_connector_update(body, memo, logger, new, diff, status, namespace, **_):
logger.info(
"Got TwingateConnector update request: %s. Diff: %s. Status: %s.",
new,
Expand All @@ -136,13 +153,17 @@ def twingate_connector_update(body, memo, logger, new, diff, status, **_):
client = TwingateAPIClient(settings)

crd = TwingateConnectorCRD(**body)
if len(diff) == 1 and diff[0][:3] == ("add", ("spec", "id"), None):
return success(twingate_id=crd.id, message="No update required")
if len(diff) == 1 and diff[0][:3] == ("add", ("id"), None):
return success(twingate_id=crd.spec.id, message="No update required")

if not crd.spec.id:
return fail(error="Update called before Connector has an ID")

updated_connector = client.connector_update(crd.spec)

kapi = kubernetes.client.CoreV1Api()
kapi.delete_namespaced_pod(crd.metadata.name, namespace)

return success(twingate_id=updated_connector.id)


Expand Down Expand Up @@ -178,6 +199,19 @@ def twingate_connector_pod_reconciler(
crd = TwingateConnectorCRD(**body)
kapi = kubernetes.client.CoreV1Api()
k8s_pod = k8s_read_namespaced_pod(namespace, crd.metadata.name, kapi=kapi)
if k8s_pod and k8s_pod.status.phase != "Running":
raise kopf.TemporaryError("Pod not running.", delay=1)

# Migrate old pods
pod_spec_version = (k8s_pod and k8s_pod.metadata.annotations or {}).get(
ANNOTATION_POD_SPEC_VERSION
)
if k8s_pod and pod_spec_version != ANNOTATION_POD_SPEC_VERSION_VALUE:
k8s_force_delete_pod(namespace, crd.metadata.name, kapi)
return success(
message=f"Pod spec version mismatch. Deleting pod {crd.metadata.name}."
)

image = k8s_pod.spec.containers[0].image if k8s_pod else None

if crd.spec.image or not k8s_pod:
Expand All @@ -195,12 +229,15 @@ def twingate_connector_pod_reconciler(
ANNOTATION_NEXT_VERSION_CHECK: crd.spec.image_policy.get_next_date_iso8601(),
}

pod = get_connector_pod(crd, memo.twingate_settings.full_url, image)
kopf.adopt(pod, owner=body, strict=True, forced=True)

if k8s_pod:
kapi.patch_namespaced_pod(meta.name, namespace, body=pod)
# When pod exists, can only update the image or we get `Forbidden: pod updates may not change fields other than `spec.containers[*].image`
current_image = k8s_pod.spec.containers[0].image
if current_image != image:
k8s_pod.spec.containers[0].image = image
kapi.patch_namespaced_pod(meta.name, namespace, body=k8s_pod)
else:
pod = get_connector_pod(crd, memo.twingate_settings.full_url, image)
kopf.adopt(pod, owner=body, strict=True, forced=True)
kapi.create_namespaced_pod(namespace, body=pod)

return success()
119 changes: 118 additions & 1 deletion app/handlers/tests/test_handlers_connector.py
@@ -1,6 +1,7 @@
from unittest.mock import ANY, MagicMock, patch

import kopf
import kubernetes
import pendulum
import pytest

Expand All @@ -9,6 +10,9 @@
from app.handlers.handlers_connectors import (
ANNOTATION_LAST_VERSION_CHECK,
ANNOTATION_NEXT_VERSION_CHECK,
ANNOTATION_POD_SPEC_VERSION,
ANNOTATION_POD_SPEC_VERSION_VALUE,
k8s_read_namespaced_pod,
twingate_connector_create,
twingate_connector_delete,
twingate_connector_pod_reconciler,
Expand Down Expand Up @@ -59,6 +63,21 @@ def get(*, spec_overrides=None, status=None, with_id=False, annotations=None):
return get


def test_k8s_read_namespaced_pod_handles_404_returns_none(k8s_client_mock):
k8s_client_mock.read_namespaced_pod.side_effect = (
kubernetes.client.exceptions.ApiException(status=404)
)
assert k8s_read_namespaced_pod("default", "test") is None


def test_k8s_read_namespaced_reraises_non_404_exceptions(k8s_client_mock):
k8s_client_mock.read_namespaced_pod.side_effect = (
kubernetes.client.exceptions.ApiException(status=500)
)
with pytest.raises(kubernetes.client.exceptions.ApiException):
k8s_read_namespaced_pod("default", "test")


def test_twingate_connector_create(
get_connector_and_crd, kopf_handler_runner, mock_api_client
):
Expand Down Expand Up @@ -133,9 +152,34 @@ def test_twingate_connector_update(
run = kopf_handler_runner(
twingate_connector_update, crd, MagicMock(), new={}, diff={}
)
run.k8s_client_mock.delete_namespaced_pod.assert_called_with(
crd.metadata.name, "default"
)
assert run.result == {"success": True, "twingate_id": connector.id, "ts": ANY}


def test_twingate_connector_update_only_id_does_nothing(
get_connector_and_crd, kopf_handler_runner, mock_api_client
):
connector, crd = get_connector_and_crd(with_id=False)

mock_api_client.connector_update.return_value = connector

run = kopf_handler_runner(
twingate_connector_update,
crd,
MagicMock(),
new={},
diff=(("add", ("id"), None, "123"),),
)
assert run.result == {
"success": True,
"message": "No update required",
"ts": ANY,
"twingate_id": crd.spec.id,
}


def test_twingate_connector_update_without_id_does_nothing(
get_connector_and_crd, kopf_handler_runner, mock_api_client
):
Expand Down Expand Up @@ -200,10 +244,64 @@ def test_pod_update(
status={"twingate_connector_create": {"success": True}}, with_id=True
)

mock_pod = MagicMock()
mock_pod.status.phase = "Running"
mock_pod.metadata.annotations = {ANNOTATION_POD_SPEC_VERSION: ANNOTATION_POD_SPEC_VERSION_VALUE} # fmt: skip
k8s_client_mock.read_namespaced_pod.return_value = mock_pod

run = kopf_handler_runner(twingate_connector_pod_reconciler, crd, MagicMock())
assert run.result == {"success": True, "ts": ANY}
run.k8s_client_mock.patch_namespaced_pod.assert_called_once()

def test_pod_not_running(
self, get_connector_and_crd, kopf_handler_runner, k8s_client_mock
):
connector, crd = get_connector_and_crd(
status={"twingate_connector_create": {"success": True}}, with_id=True
)

mock_pod = MagicMock()
mock_pod.status.phase = "Pending"
k8s_client_mock.read_namespaced_pod.return_value = mock_pod

with pytest.raises(kopf.TemporaryError):
kopf_handler_runner(twingate_connector_pod_reconciler, crd, MagicMock())

def test_no_old_pod_spec_is_deleted(
self, get_connector_and_crd, kopf_handler_runner, k8s_client_mock
):
connector, crd = get_connector_and_crd(
status={"twingate_connector_create": {"success": True}}, with_id=True
)

mock_pod = MagicMock()
mock_pod.status.phase = "Running"
k8s_client_mock.read_namespaced_pod.return_value = mock_pod

run = kopf_handler_runner(twingate_connector_pod_reconciler, crd, MagicMock())
assert run.result == {"success": True, "ts": ANY, "message": ANY}
assert run.result["message"].startswith("Pod spec version mismatch.")
run.k8s_client_mock.patch_namespaced_pod.assert_called_once()
run.k8s_client_mock.delete_namespaced_pod.assert_called_once()

def test_old_pod_spec_is_deleted(
self, get_connector_and_crd, kopf_handler_runner, k8s_client_mock
):
connector, crd = get_connector_and_crd(
status={"twingate_connector_create": {"success": True}}, with_id=True
)

mock_pod = MagicMock()
mock_pod.status.phase = "Running"
mock_pod.metadata.annotations = {ANNOTATION_POD_SPEC_VERSION: "not ANNOTATION_POD_SPEC_VERSION_VALUE"} # fmt: skip
k8s_client_mock.read_namespaced_pod.return_value = mock_pod

run = kopf_handler_runner(twingate_connector_pod_reconciler, crd, MagicMock())
assert run.result == {"success": True, "ts": ANY, "message": ANY}
assert run.result["message"].startswith("Pod spec version mismatch.")
run.k8s_client_mock.patch_namespaced_pod.assert_called_once()
run.k8s_client_mock.delete_namespaced_pod.assert_called_once()


class TestTwingateConnectorPodReconciler_ImagePolicy:
@pytest.fixture()
Expand All @@ -225,6 +323,11 @@ def test_no_annotation(
with_id=True,
)

mock_pod = MagicMock()
mock_pod.status.phase = "Running"
mock_pod.metadata.annotations = {ANNOTATION_POD_SPEC_VERSION: ANNOTATION_POD_SPEC_VERSION_VALUE} # fmt: skip
k8s_client_mock.read_namespaced_pod.return_value = mock_pod

run = kopf_handler_runner(twingate_connector_pod_reconciler, crd, MagicMock())
assert run.result == {"success": True, "ts": ANY}
run.k8s_client_mock.patch_namespaced_pod.assert_called_once()
Expand All @@ -248,6 +351,11 @@ def test_past_due_annotation(
with_id=True,
)

mock_pod = MagicMock()
mock_pod.status.phase = "Running"
mock_pod.metadata.annotations = {ANNOTATION_POD_SPEC_VERSION: ANNOTATION_POD_SPEC_VERSION_VALUE} # fmt: skip
k8s_client_mock.read_namespaced_pod.return_value = mock_pod

run = kopf_handler_runner(twingate_connector_pod_reconciler, crd, MagicMock())
assert run.result == {"success": True, "ts": ANY}
run.k8s_client_mock.patch_namespaced_pod.assert_called_once()
Expand All @@ -273,8 +381,17 @@ def test_not_past_due_annotation(
with_id=True,
)

mock_container = MagicMock()
mock_container.image = "twingate/connector:latest"

mock_pod = MagicMock()
mock_pod.status.phase = "Running"
mock_pod.metadata.annotations = {ANNOTATION_POD_SPEC_VERSION: ANNOTATION_POD_SPEC_VERSION_VALUE} # fmt: skip
mock_pod.spec.containers = [mock_container]
k8s_client_mock.read_namespaced_pod.return_value = mock_pod

run = kopf_handler_runner(twingate_connector_pod_reconciler, crd, MagicMock())
assert run.result == {"success": True, "ts": ANY}
run.k8s_client_mock.patch_namespaced_pod.assert_called_once()
assert run.patch_mock.meta == {}
run.k8s_client_mock.patch_namespaced_pod.assert_not_called()
mock_get_image.assert_not_called()

0 comments on commit 8096e62

Please sign in to comment.