Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Connector pod updates "Forbidden" error #184

Merged
merged 11 commits into from Mar 5, 2024
17 changes: 17 additions & 0 deletions app/api/tests/test_client_remote_networks.py
Expand Up @@ -45,3 +45,20 @@ def test_get_rn_with_non_existing_name_returns_none(
)
result = api_client.get_remote_network_by_name("My Cluster")
assert result is None

def test_get_rn_with_transport_error_returns_none(
    self, test_url, api_client, mock_resource_data, mocked_responses
):
    """An ``errors`` payload in an HTTP 200 reply makes the lookup return ``None``."""
    network_name = "My Cluster"
    error_payload = json.dumps({"errors": [{"message": "Transport error"}]})

    # Only answer requests whose variables ask for this network name.
    name_matcher = responses.matchers.json_params_matcher(
        {"variables": {"name": network_name}}, strict_match=False
    )
    mocked_responses.post(
        test_url, status=200, body=error_payload, match=[name_matcher]
    )

    assert api_client.get_remote_network_by_name(network_name) is None
53 changes: 45 additions & 8 deletions app/handlers/handlers_connectors.py
Expand Up @@ -3,6 +3,7 @@
import kopf
import kubernetes.client
import pendulum
from kubernetes.client.models import V1ObjectMeta

from app.api import TwingateAPIClient
from app.crds import TwingateConnectorCRD
Expand All @@ -12,6 +13,9 @@
ANNOTATION_LAST_VERSION_CHECK = "twingate.com/last-version-check"
ANNOTATION_NEXT_VERSION_CHECK = "twingate.com/next-version-check"

ANNOTATION_POD_SPEC_VERSION = "twingate.com/connector-podspec-version"
ANNOTATION_POD_SPEC_VERSION_VALUE = "v1"


def get_connector_pod(
crd: TwingateConnectorCRD, tenant_url: str, image: str
Expand Down Expand Up @@ -66,8 +70,11 @@ def get_connector_pod(
],
**spec.pod_extra,
}

pod_meta = V1ObjectMeta(annotations={ANNOTATION_POD_SPEC_VERSION: ANNOTATION_POD_SPEC_VERSION_VALUE})

# fmt: on
return kubernetes.client.V1Pod(spec=pod_spec)
return kubernetes.client.V1Pod(spec=pod_spec, metadata=pod_meta)


def get_connector_secret(
Expand All @@ -94,6 +101,16 @@ def k8s_read_namespaced_pod(
raise


def k8s_force_delete_pod(
    namespace: str, name: str, kapi: kubernetes.client.CoreV1Api | None = None
):
    """Clear the finalizers of pod ``name`` in ``namespace`` and delete it.

    The pod is first patched so that ``metadata.finalizers`` is an empty list,
    then deleted with ``grace_period_seconds=0``.

    :param namespace: Namespace the pod lives in.
    :param name: Name of the pod to delete.
    :param kapi: API client to use; a fresh ``CoreV1Api`` is created when
        none (or a falsy value) is supplied.
    """
    if not kapi:
        kapi = kubernetes.client.CoreV1Api()

    # Presumably done so that pending finalizers cannot hold up the delete.
    empty_finalizers = {"metadata": {"finalizers": []}}
    kapi.patch_namespaced_pod(name, namespace, body=empty_finalizers)

    no_grace = kubernetes.client.V1DeleteOptions(grace_period_seconds=0)
    kapi.delete_namespaced_pod(name, namespace, body=no_grace)


@kopf.on.create("twingateconnector")
def twingate_connector_create(body, memo, logger, namespace, patch, **_):
settings = memo.twingate_settings
Expand Down Expand Up @@ -124,7 +141,7 @@ def twingate_connector_create(body, memo, logger, namespace, patch, **_):


@kopf.on.update("twingateconnector", field=["spec"])
def twingate_connector_update(body, memo, logger, new, diff, status, **_):
def twingate_connector_update(body, memo, logger, new, diff, status, namespace, **_):
logger.info(
"Got TwingateConnector update request: %s. Diff: %s. Status: %s.",
new,
Expand All @@ -136,13 +153,17 @@ def twingate_connector_update(body, memo, logger, new, diff, status, **_):
client = TwingateAPIClient(settings)

crd = TwingateConnectorCRD(**body)
if len(diff) == 1 and diff[0][:3] == ("add", ("spec", "id"), None):
return success(twingate_id=crd.id, message="No update required")
if len(diff) == 1 and diff[0][:3] == ("add", ("id"), None):
return success(twingate_id=crd.spec.id, message="No update required")

if not crd.spec.id:
return fail(error="Update called before Connector has an ID")

updated_connector = client.connector_update(crd.spec)

kapi = kubernetes.client.CoreV1Api()
kapi.delete_namespaced_pod(crd.metadata.name, namespace)

return success(twingate_id=updated_connector.id)


Expand Down Expand Up @@ -178,6 +199,19 @@ def twingate_connector_pod_reconciler(
crd = TwingateConnectorCRD(**body)
kapi = kubernetes.client.CoreV1Api()
k8s_pod = k8s_read_namespaced_pod(namespace, crd.metadata.name, kapi=kapi)
if k8s_pod and k8s_pod.status.phase != "Running":
raise kopf.TemporaryError("Pod not running.", delay=1)

# Migrate old pods
pod_spec_version = (k8s_pod and k8s_pod.metadata.annotations or {}).get(
ANNOTATION_POD_SPEC_VERSION
)
if k8s_pod and pod_spec_version != ANNOTATION_POD_SPEC_VERSION_VALUE:
k8s_force_delete_pod(namespace, crd.metadata.name, kapi)
return success(
message=f"Pod spec version mismatch. Deleting pod {crd.metadata.name}."
)

image = k8s_pod.spec.containers[0].image if k8s_pod else None

if crd.spec.image or not k8s_pod:
Expand All @@ -195,12 +229,15 @@ def twingate_connector_pod_reconciler(
ANNOTATION_NEXT_VERSION_CHECK: crd.spec.image_policy.get_next_date_iso8601(),
}

pod = get_connector_pod(crd, memo.twingate_settings.full_url, image)
kopf.adopt(pod, owner=body, strict=True, forced=True)

if k8s_pod:
kapi.patch_namespaced_pod(meta.name, namespace, body=pod)
# When the pod already exists we can only update its image; otherwise the API rejects the patch with "Forbidden: pod updates may not change fields other than `spec.containers[*].image`".
current_image = k8s_pod.spec.containers[0].image
if current_image != image:
k8s_pod.spec.containers[0].image = image
kapi.patch_namespaced_pod(meta.name, namespace, body=k8s_pod)
else:
pod = get_connector_pod(crd, memo.twingate_settings.full_url, image)
kopf.adopt(pod, owner=body, strict=True, forced=True)
kapi.create_namespaced_pod(namespace, body=pod)

return success()
119 changes: 118 additions & 1 deletion app/handlers/tests/test_handlers_connector.py
@@ -1,6 +1,7 @@
from unittest.mock import ANY, MagicMock, patch

import kopf
import kubernetes
import pendulum
import pytest

Expand All @@ -9,6 +10,9 @@
from app.handlers.handlers_connectors import (
ANNOTATION_LAST_VERSION_CHECK,
ANNOTATION_NEXT_VERSION_CHECK,
ANNOTATION_POD_SPEC_VERSION,
ANNOTATION_POD_SPEC_VERSION_VALUE,
k8s_read_namespaced_pod,
twingate_connector_create,
twingate_connector_delete,
twingate_connector_pod_reconciler,
Expand Down Expand Up @@ -59,6 +63,21 @@ def get(*, spec_overrides=None, status=None, with_id=False, annotations=None):
return get


def test_k8s_read_namespaced_pod_handles_404_returns_none(k8s_client_mock):
    """A 404 ``ApiException`` from the read call is reported as ``None``."""
    not_found = kubernetes.client.exceptions.ApiException(status=404)
    k8s_client_mock.read_namespaced_pod.side_effect = not_found

    pod = k8s_read_namespaced_pod("default", "test")

    assert pod is None


def test_k8s_read_namespaced_reraises_non_404_exceptions(k8s_client_mock):
    """An ``ApiException`` with status 500 propagates out of the helper."""
    api_exception = kubernetes.client.exceptions.ApiException
    k8s_client_mock.read_namespaced_pod.side_effect = api_exception(status=500)

    with pytest.raises(api_exception):
        k8s_read_namespaced_pod("default", "test")


def test_twingate_connector_create(
get_connector_and_crd, kopf_handler_runner, mock_api_client
):
Expand Down Expand Up @@ -133,9 +152,34 @@ def test_twingate_connector_update(
run = kopf_handler_runner(
twingate_connector_update, crd, MagicMock(), new={}, diff={}
)
run.k8s_client_mock.delete_namespaced_pod.assert_called_with(
crd.metadata.name, "default"
)
assert run.result == {"success": True, "twingate_id": connector.id, "ts": ANY}


def test_twingate_connector_update_only_id_does_nothing(
    get_connector_and_crd, kopf_handler_runner, mock_api_client
):
    """A diff that only adds the id yields a "No update required" result."""
    connector, crd = get_connector_and_crd(with_id=False)
    mock_api_client.connector_update.return_value = connector

    # NOTE(review): ("id") is the plain string "id", not a one-element tuple.
    # The update handler compares against the same spelling, so this passes;
    # confirm which path shape kopf really delivers before changing either side.
    id_added = ("add", ("id"), None, "123")

    run = kopf_handler_runner(
        twingate_connector_update, crd, MagicMock(), new={}, diff=(id_added,)
    )

    expected_result = {
        "success": True,
        "message": "No update required",
        "ts": ANY,
        "twingate_id": crd.spec.id,
    }
    assert run.result == expected_result


def test_twingate_connector_update_without_id_does_nothing(
get_connector_and_crd, kopf_handler_runner, mock_api_client
):
Expand Down Expand Up @@ -200,10 +244,64 @@ def test_pod_update(
status={"twingate_connector_create": {"success": True}}, with_id=True
)

mock_pod = MagicMock()
mock_pod.status.phase = "Running"
mock_pod.metadata.annotations = {ANNOTATION_POD_SPEC_VERSION: ANNOTATION_POD_SPEC_VERSION_VALUE} # fmt: skip
k8s_client_mock.read_namespaced_pod.return_value = mock_pod

run = kopf_handler_runner(twingate_connector_pod_reconciler, crd, MagicMock())
assert run.result == {"success": True, "ts": ANY}
run.k8s_client_mock.patch_namespaced_pod.assert_called_once()

def test_pod_not_running(
    self, get_connector_and_crd, kopf_handler_runner, k8s_client_mock
):
    """Reconciling while the pod phase is "Pending" raises ``kopf.TemporaryError``."""
    created_status = {"twingate_connector_create": {"success": True}}
    connector, crd = get_connector_and_crd(status=created_status, with_id=True)

    pending_pod = MagicMock()
    pending_pod.status.phase = "Pending"
    k8s_client_mock.read_namespaced_pod.return_value = pending_pod

    with pytest.raises(kopf.TemporaryError):
        kopf_handler_runner(twingate_connector_pod_reconciler, crd, MagicMock())

def test_no_old_pod_spec_is_deleted(
    self, get_connector_and_crd, kopf_handler_runner, k8s_client_mock
):
    """A running pod with no explicit spec-version annotation gets force-deleted.

    The mock pod's annotations are left as the default ``MagicMock`` attribute,
    so no real version value is present on it.
    """
    created_status = {"twingate_connector_create": {"success": True}}
    connector, crd = get_connector_and_crd(status=created_status, with_id=True)

    unversioned_pod = MagicMock()
    unversioned_pod.status.phase = "Running"
    k8s_client_mock.read_namespaced_pod.return_value = unversioned_pod

    run = kopf_handler_runner(twingate_connector_pod_reconciler, crd, MagicMock())

    assert run.result == {"success": True, "ts": ANY, "message": ANY}
    assert run.result["message"].startswith("Pod spec version mismatch.")

    # One patch and one delete call are expected on the k8s client mock.
    k8s_calls = run.k8s_client_mock
    k8s_calls.patch_namespaced_pod.assert_called_once()
    k8s_calls.delete_namespaced_pod.assert_called_once()

def test_old_pod_spec_is_deleted(
    self, get_connector_and_crd, kopf_handler_runner, k8s_client_mock
):
    """A running pod annotated with a different spec version gets force-deleted."""
    created_status = {"twingate_connector_create": {"success": True}}
    connector, crd = get_connector_and_crd(status=created_status, with_id=True)

    stale_pod = MagicMock()
    stale_pod.status.phase = "Running"
    # Any value other than the current spec-version constant will do here.
    stale_pod.metadata.annotations = {
        ANNOTATION_POD_SPEC_VERSION: "not ANNOTATION_POD_SPEC_VERSION_VALUE",
    }
    k8s_client_mock.read_namespaced_pod.return_value = stale_pod

    run = kopf_handler_runner(twingate_connector_pod_reconciler, crd, MagicMock())

    assert run.result == {"success": True, "ts": ANY, "message": ANY}
    assert run.result["message"].startswith("Pod spec version mismatch.")

    # One patch and one delete call are expected on the k8s client mock.
    k8s_calls = run.k8s_client_mock
    k8s_calls.patch_namespaced_pod.assert_called_once()
    k8s_calls.delete_namespaced_pod.assert_called_once()


class TestTwingateConnectorPodReconciler_ImagePolicy:
@pytest.fixture()
Expand All @@ -225,6 +323,11 @@ def test_no_annotation(
with_id=True,
)

mock_pod = MagicMock()
mock_pod.status.phase = "Running"
mock_pod.metadata.annotations = {ANNOTATION_POD_SPEC_VERSION: ANNOTATION_POD_SPEC_VERSION_VALUE} # fmt: skip
k8s_client_mock.read_namespaced_pod.return_value = mock_pod

run = kopf_handler_runner(twingate_connector_pod_reconciler, crd, MagicMock())
assert run.result == {"success": True, "ts": ANY}
run.k8s_client_mock.patch_namespaced_pod.assert_called_once()
Expand All @@ -248,6 +351,11 @@ def test_past_due_annotation(
with_id=True,
)

mock_pod = MagicMock()
mock_pod.status.phase = "Running"
mock_pod.metadata.annotations = {ANNOTATION_POD_SPEC_VERSION: ANNOTATION_POD_SPEC_VERSION_VALUE} # fmt: skip
k8s_client_mock.read_namespaced_pod.return_value = mock_pod

run = kopf_handler_runner(twingate_connector_pod_reconciler, crd, MagicMock())
assert run.result == {"success": True, "ts": ANY}
run.k8s_client_mock.patch_namespaced_pod.assert_called_once()
Expand All @@ -273,8 +381,17 @@ def test_not_past_due_annotation(
with_id=True,
)

mock_container = MagicMock()
mock_container.image = "twingate/connector:latest"

mock_pod = MagicMock()
mock_pod.status.phase = "Running"
mock_pod.metadata.annotations = {ANNOTATION_POD_SPEC_VERSION: ANNOTATION_POD_SPEC_VERSION_VALUE} # fmt: skip
mock_pod.spec.containers = [mock_container]
k8s_client_mock.read_namespaced_pod.return_value = mock_pod

run = kopf_handler_runner(twingate_connector_pod_reconciler, crd, MagicMock())
assert run.result == {"success": True, "ts": ANY}
run.k8s_client_mock.patch_namespaced_pod.assert_called_once()
assert run.patch_mock.meta == {}
run.k8s_client_mock.patch_namespaced_pod.assert_not_called()
mock_get_image.assert_not_called()