Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Improved TwingateConnector reconciliation #180

Merged
merged 15 commits into from Mar 5, 2024
219 changes: 45 additions & 174 deletions app/handlers/handlers_connectors.py
@@ -1,3 +1,5 @@
import os

import kopf
import kubernetes.client
import pendulum
Expand All @@ -10,9 +12,6 @@
ANNOTATION_LAST_VERSION_CHECK = "twingate.com/last-version-check"
ANNOTATION_NEXT_VERSION_CHECK = "twingate.com/next-version-check"

LABEL_CONNECTOR = "twingate.com/connector"
LABEL_CONNECTOR_POD_DELETED = "twingate.com/connector-pod-deleted"


def get_connector_pod(
crd: TwingateConnectorCRD, tenant_url: str, image: str
Expand Down Expand Up @@ -83,7 +82,7 @@ def get_connector_secret(
)


def get_existing_pod(
def k8s_read_namespaced_pod(
namespace: str, name: str, kapi: kubernetes.client.CoreV1Api | None = None
) -> kubernetes.client.V1Pod | None:
try:
Expand All @@ -95,12 +94,6 @@ def get_existing_pod(
raise


def check_pod_exists(
namespace: str, name: str, kapi: kubernetes.client.CoreV1Api | None = None
) -> bool:
return bool(get_existing_pod(namespace, name, kapi=kapi))


@kopf.on.create("twingateconnector")
def twingate_connector_create(body, memo, logger, namespace, patch, **_):
settings = memo.twingate_settings
Expand All @@ -120,44 +113,14 @@ def twingate_connector_create(body, memo, logger, namespace, patch, **_):

logger.info("connector: %s", connector)
tokens = client.connector_generate_tokens(connector.id)
image = crd.spec.get_image()

pod = get_connector_pod(crd, settings.full_url, image)
secret = get_connector_secret(tokens.access_token, tokens.refresh_token)
kopf.adopt([pod, secret], owner=body, strict=True, forced=True)
kopf.label([pod, secret], {LABEL_CONNECTOR: crd.metadata.name})
kopf.adopt([secret], owner=body, strict=True, forced=True)

kapi = kubernetes.client.CoreV1Api()
kapi.create_namespaced_secret(namespace=namespace, body=secret)
kapi.create_namespaced_pod(namespace=namespace, body=pod)

image_policy = crd.spec.image_policy
next_version_check = image_policy.get_next_date_iso8601() if image_policy else None
patch.meta["annotations"] = {ANNOTATION_NEXT_VERSION_CHECK: next_version_check}

return success(twingate_id=connector_id, image=image)


@kopf.on.resume("twingateconnector")
def twingate_connector_resume(body, patch, namespace, logger, **_):
crd = TwingateConnectorCRD(**body)
image_policy = crd.spec.image_policy
next_version_check = image_policy.get_next_date_iso8601() if image_policy else None
patch.meta["annotations"] = {ANNOTATION_NEXT_VERSION_CHECK: next_version_check}

# Check pod exists and if not, add LABEL_CONNECTOR_POD_DELETED label to trigger recreation
pod_exists = check_pod_exists(namespace, crd.metadata.name)
if not pod_exists:
logger.info(
"Pod is gone. Adding LABEL_CONNECTOR_POD_DELETED label to trigger recreation."
)
patch.meta["labels"] = {LABEL_CONNECTOR_POD_DELETED: "true"}

return success(
twingate_id=crd.spec.id,
pod_exists=pod_exists,
next_version_check=next_version_check,
)
return success(twingate_id=connector_id)


@kopf.on.update("twingateconnector", field=["spec"])
Expand All @@ -173,113 +136,16 @@ def twingate_connector_update(body, memo, logger, new, diff, status, **_):
client = TwingateAPIClient(settings)

crd = TwingateConnectorCRD(**body)
if len(diff) == 1 and diff[0][:3] == ("add", ("spec", "id"), None):
return success(twingate_id=crd.id, message="No update required")

if not crd.spec.id:
return fail(error="Update called before Connector has an ID")

updated_connector = client.connector_update(crd.spec)
return success(twingate_id=updated_connector.id)


@kopf.on.field("twingateconnector", field="spec.imagePolicy")
def twingate_connector_version_policy_update(body, patch, logger, **_):
logger.info("twingate_connector_version_policy_update: %s", body)
crd = TwingateConnectorCRD(**body)
image_policy = crd.spec.image_policy
next_version_check = image_policy.get_next_date_iso8601() if image_policy else None
patch.meta["annotations"] = {ANNOTATION_NEXT_VERSION_CHECK: next_version_check}


@kopf.on.field("twingateconnector", field="spec.image")
def twingate_connector_image_update(body, meta, namespace, memo, logger, **_):
logger.info("twingate_connector_image_update: %s", body)
settings = memo.twingate_settings
crd = TwingateConnectorCRD(**body)
image = crd.spec.get_image()
if crd.spec.image:
pod = get_connector_pod(crd, settings.full_url, image)
kapi = kubernetes.client.CoreV1Api()
result = kapi.patch_namespaced_pod(meta.name, namespace, body=pod)
logger.info("Patched pod: %s", result)

return success(twingate_id=crd.spec.id, image=image)


@kopf.on.timer(
"twingateconnector",
interval=60.0,
annotations={ANNOTATION_NEXT_VERSION_CHECK: kopf.PRESENT},
)
def timer_check_image_version(body, meta, namespace, memo, logger, patch, **_):
settings = memo.twingate_settings
crd = TwingateConnectorCRD(**body)
if not crd.spec.image_policy:
patch.meta["annotations"] = {ANNOTATION_NEXT_VERSION_CHECK: None}
return

now = pendulum.now("UTC").start_of("minute")
next_check = pendulum.parse(
body["metadata"]["annotations"][ANNOTATION_NEXT_VERSION_CHECK]
)
if now < next_check:
return

logger.info("Checking connector %s for new image version", crd.metadata.name)

try:
image = crd.spec.get_image()
pod = get_connector_pod(crd, settings.full_url, image)
kapi = kubernetes.client.CoreV1Api()
kapi.patch_namespaced_pod(meta.name, namespace, body=pod)
patch.meta["annotations"] = {
ANNOTATION_LAST_VERSION_CHECK: now.to_iso8601_string(),
ANNOTATION_NEXT_VERSION_CHECK: crd.spec.image_policy.get_next_date_iso8601(),
}
except kubernetes.client.exceptions.ApiException:
logger.exception("Failed to remove label from pod %s", meta.name)


# region Delete related


@kopf.on.update("twingateconnector", labels={LABEL_CONNECTOR_POD_DELETED: "true"})
def twingate_connector_recreate_pod(body, namespace, memo, patch, logger, **_):
"""Recreates the Connector's Pod.

When pod is deleted we can't recreate it right away because we want to
use the same name. So when it's deleted, `twingate_connector_pod_deleted` annotates
it's connector object so that we get to this handler and can recreate it.

NOTE: This handler will get called as soon as we add the label to the pod, which
is before the pod is actually deleted. So we need to wait for the pod to actually
get deleted before we can recreate it.
"""

def is_conflict_already_exists(apiex):
return (
apiex.status == 409
and apiex.reason == "Conflict"
and "AlreadyExists" in str(apiex)
)

logger.info("twingate_connector_recreate_pod: %s.", body)
settings = memo.twingate_settings
crd = TwingateConnectorCRD(**body)
image = crd.spec.get_image()

pod = get_connector_pod(crd, settings.full_url, image)
kopf.adopt(pod, owner=body, strict=True, forced=True)
kopf.label(pod, {"twingate.com/connector": crd.metadata.name})

kapi = kubernetes.client.CoreV1Api()
if check_pod_exists(namespace, crd.metadata.name, kapi=kapi):
raise kopf.TemporaryError(
f"Pod {crd.metadata.name} not deleted yet. Retrying (%s)...", delay=1
)

kapi.create_namespaced_pod(namespace=namespace, body=pod)
patch.meta["labels"] = {LABEL_CONNECTOR_POD_DELETED: None}


@kopf.on.delete("twingateconnector")
def twingate_connector_delete(spec, meta, status, namespace, memo, logger, **kwargs):
logger.info("Got a delete request: %s. Status: %s", spec, status)
Expand All @@ -292,44 +158,49 @@ def twingate_connector_delete(spec, meta, status, namespace, memo, logger, **kwa
logger.info("Deleting connector %s", connector_id)
client.connector_delete(connector_id)

try:
# Remove label from pod so its delete handler isn't triggered
kapi = kubernetes.client.CoreV1Api()
kapi.patch_namespaced_pod(
meta.name,
namespace,
body={"metadata": {"labels": {LABEL_CONNECTOR: None}}},
)
except kubernetes.client.exceptions.ApiException:
logger.exception("Failed to remove label from pod %s", meta.name)

CONNECTOR_RECONCILER_INTERVAL = int(os.environ.get("CONNECTOR_RECONCILER_INTERVAL", "5")) # fmt: skip
CONNECTOR_RECONCILER_INIT_DELAY = int(os.environ.get("CONNECTOR_RECONCILER_INIT_DELAY", "5")) # fmt: skip

@kopf.on.delete("", "v1", "pods", labels={LABEL_CONNECTOR: kopf.PRESENT})
def twingate_connector_pod_deleted(body, spec, meta, logger, namespace, memo, **_):
logger.info("twingate_connector_pod_deleted: %s", body)

# Annotate the parent connector so that it knows it needs to recreate the pod
owner_refs = meta.get("ownerReferences", [])
owner = next((o for o in owner_refs if o["kind"] == "TwingateConnector"), None)
if not owner:
return
@kopf.timer(
"twingateconnector",
interval=CONNECTOR_RECONCILER_INTERVAL,
initial_delay=CONNECTOR_RECONCILER_INIT_DELAY,
)
def twingate_connector_pod_reconciler(
body, meta, status, namespace, patch, memo, logger, **_
):
logger.info("twingate_connector_reconciler: %s", body)
if not (status and "twingate_connector_create" in status):
raise kopf.TemporaryError("TwingateConnector not ready.", delay=1)

owner_group, owner_version = owner["apiVersion"].split("/")
crd = TwingateConnectorCRD(**body)
kapi = kubernetes.client.CoreV1Api()
k8s_pod = k8s_read_namespaced_pod(namespace, crd.metadata.name, kapi=kapi)
image = k8s_pod.spec.containers[0].image if k8s_pod else None

try:
kapi = kubernetes.client.CustomObjectsApi()
kapi.patch_namespaced_custom_object(
owner_group,
owner_version,
namespace,
"twingateconnectors",
owner["name"],
{"metadata": {"labels": {LABEL_CONNECTOR_POD_DELETED: "true"}}},
if crd.spec.image or not k8s_pod:
image = crd.spec.get_image()
elif crd.spec.image_policy:
now = pendulum.now("UTC").start_of("minute")
next_check = pendulum.parse(
ekampf marked this conversation as resolved.
Show resolved Hide resolved
meta.annotations.get(ANNOTATION_NEXT_VERSION_CHECK, "0001-01-01 00:00:00")
)
except kubernetes.client.exceptions.ApiException:
logger.exception("Failed to annotate connector %s", owner["name"])

return success(msg="deleted")
if now >= next_check:
image = crd.spec.get_image()
patch.meta["annotations"] = {
ANNOTATION_LAST_VERSION_CHECK: now.to_iso8601_string(),
ANNOTATION_NEXT_VERSION_CHECK: crd.spec.image_policy.get_next_date_iso8601(),
}

pod = get_connector_pod(crd, memo.twingate_settings.full_url, image)
kopf.adopt(pod, owner=body, strict=True, forced=True)

if k8s_pod:
kapi.patch_namespaced_pod(meta.name, namespace, body=pod)
else:
kapi.create_namespaced_pod(namespace, body=pod)

# endregion
return success()