diff --git a/backend/btrixcloud/k8sapi.py b/backend/btrixcloud/k8sapi.py
index 578c22692..3ba7f965e 100644
--- a/backend/btrixcloud/k8sapi.py
+++ b/backend/btrixcloud/k8sapi.py
@@ -288,3 +288,29 @@ async def has_custom_jobs_with_label(self, plural, label) -> bool:
         # pylint: disable=broad-exception-caught
         except Exception:
             return False
+
+    async def send_signal_to_pod(self, pod_name, signame) -> bool:
+        """send signal to a single pod"""
+        command = ["bash", "-c", f"kill -s {signame} 1"]
+        signaled = False
+
+        try:
+            print(f"Sending {signame} to {pod_name}", flush=True)
+
+            res = await self.core_api_ws.connect_get_namespaced_pod_exec(
+                name=pod_name,
+                namespace=self.namespace,
+                command=command,
+                stdout=True,
+            )
+            if res:
+                print("Result", res, flush=True)
+
+            else:
+                signaled = True
+
+        # pylint: disable=broad-except
+        except Exception as exc:
+            print(f"Send Signal Error: {exc}", flush=True)
+
+        return signaled
diff --git a/backend/btrixcloud/operator/baseoperator.py b/backend/btrixcloud/operator/baseoperator.py
index b06d8bf05..9a3c67db7 100644
--- a/backend/btrixcloud/operator/baseoperator.py
+++ b/backend/btrixcloud/operator/baseoperator.py
@@ -42,28 +42,35 @@ def compute_crawler_resources(self):
         """compute memory / cpu resources for crawlers"""
         p = self.shared_params
         num = max(int(p["crawler_browser_instances"]) - 1, 0)
+        crawler_cpu: float = 0
+        crawler_memory: int = 0
         print("crawler resources")
         if not p.get("crawler_cpu"):
             base = parse_quantity(p["crawler_cpu_base"])
             extra = parse_quantity(p["crawler_extra_cpu_per_browser"])
 
             # cpu is a floating value of cpu cores
-            p["crawler_cpu"] = float(base + num * extra)
+            crawler_cpu = float(base + num * extra)
 
-            print(f"cpu = {base} + {num} * {extra} = {p['crawler_cpu']}")
+            print(f"cpu = {base} + {num} * {extra} = {crawler_cpu}")
         else:
-            print(f"cpu = {p['crawler_cpu']}")
+            crawler_cpu = float(parse_quantity(p["crawler_cpu"]))
+            print(f"cpu = {crawler_cpu}")
 
         if not p.get("crawler_memory"):
             base = parse_quantity(p["crawler_memory_base"])
             extra = parse_quantity(p["crawler_extra_memory_per_browser"])
 
             # memory is always an int
-            p["crawler_memory"] = int(base + num * extra)
+            crawler_memory = int(base + num * extra)
 
-            print(f"memory = {base} + {num} * {extra} = {p['crawler_memory']}")
+            print(f"memory = {base} + {num} * {extra} = {crawler_memory}")
         else:
-            print(f"memory = {p['crawler_memory']}")
+            crawler_memory = int(parse_quantity(p["crawler_memory"]))
+            print(f"memory = {crawler_memory}")
+
+        p["crawler_cpu"] = crawler_cpu
+        p["crawler_memory"] = crawler_memory
 
     def compute_profile_resources(self):
         """compute memory /cpu resources for a single profile browser"""
diff --git a/backend/btrixcloud/operator/crawls.py b/backend/btrixcloud/operator/crawls.py
index c33afe1d5..fe385d777 100644
--- a/backend/btrixcloud/operator/crawls.py
+++ b/backend/btrixcloud/operator/crawls.py
@@ -39,6 +39,7 @@
     CrawlStatus,
     MCBaseRequest,
     MCSyncData,
+    PodInfo,
     POD,
     CMAP,
     PVC,
@@ -61,6 +62,19 @@
 
 EXEC_TIME_UPDATE_SECS = 60
 
+# scale up if exceeded this threshold of mem usage (e.g. 90%)
+MEM_SCALE_UP_THRESHOLD = 0.90
+
+# scale up by this much
+MEM_SCALE_UP = 1.2
+
+# soft OOM if exceeded this threshold of mem usage (e.g. 100%)
+MEM_SOFT_OOM_THRESHOLD = 1.0
+
+# set memory limit to this much of request for extra padding
+MEM_LIMIT_PADDING = 1.2
+
+
 # pylint: disable=too-many-public-methods, too-many-locals, too-many-branches, too-many-statements
 # pylint: disable=invalid-name, too-many-lines, too-many-return-statements
 # ============================================================================
@@ -209,8 +223,10 @@ async def sync_crawls(self, data: MCSyncData):
             data.related.get(METRICS, {}),
         )
 
-        # auto sizing handled here
-        self.handle_auto_size(crawl.id, status.podStatus)
+        # auto-scaling not possible without pod metrics
+        if self.k8s.has_pod_metrics:
+            # auto sizing handled here
+            await self.handle_auto_size(status.podStatus)
 
         if status.finished:
             return await self.finalize_response(
@@ -326,6 +342,7 @@ def _load_crawler(self, params, i, status, children):
         params["name"] = name
         params["cpu"] = pod_info.newCpu or params.get("crawler_cpu")
         params["memory"] = pod_info.newMemory or params.get("crawler_memory")
+        params["memory_limit"] = float(params["memory"]) * MEM_LIMIT_PADDING
         params["do_restart"] = (
             pod_info.should_restart_pod() or params.get("force_restart")
         ) and has_pod
@@ -758,8 +775,6 @@ async def sync_crawl_state(
         qa_run_id = crawl.id if crawl.is_qa else None
 
         while page_crawled:
-            print("PAGE DATA", flush=True)
-            print(page_crawled, flush=True)
             page_dict = json.loads(page_crawled)
             await self.page_ops.add_page_to_db(
                 page_dict, crawl.db_crawl_id, qa_run_id, crawl.oid
@@ -836,7 +851,7 @@
 
         return crawler_running, redis_running, done
 
-    def handle_terminated_pod(self, name, role, status, terminated):
+    def handle_terminated_pod(self, name, role, status: CrawlStatus, terminated):
         """handle terminated pod state"""
         if not terminated:
             return
@@ -986,7 +1001,9 @@ def should_mark_waiting(self, state, started):
 
         return False
 
-    async def add_used_stats(self, crawl_id, pod_status, redis, metrics):
+    async def add_used_stats(
+        self, crawl_id, pod_status: dict[str, PodInfo], redis, metrics
+    ):
         """load current usage stats"""
         if redis:
             stats = await redis.info("persistence")
@@ -1010,20 +1027,42 @@
             pod_info.used.memory = int(parse_quantity(usage["memory"]))
             pod_info.used.cpu = float(parse_quantity(usage["cpu"]))
 
-    def handle_auto_size(self, _, pod_status):
+    async def handle_auto_size(self, pod_status: dict[str, PodInfo]) -> None:
         """auto scale pods here, experimental"""
         for name, pod in pod_status.items():
-            # if pod crashed due to OOM, increase mem
-            # if pod.isNewExit and pod.reason == "oom":
-            #     pod.newMemory = int(float(pod.allocated.memory) * 1.2)
-            #     print(f"Resizing pod {name} -> mem {pod.newMemory} - OOM Detected")
+            mem_usage = pod.get_percent_memory()
+            new_memory = int(float(pod.allocated.memory) * MEM_SCALE_UP)
+            send_sig = False
+
+            # if pod is using >MEM_SCALE_UP_THRESHOLD of its memory, increase mem
+            if mem_usage > MEM_SCALE_UP_THRESHOLD:
+                pod.newMemory = new_memory
+                print(
+                    f"Mem {mem_usage}: Resizing pod {name} -> mem {pod.newMemory} - Scale Up"
+                )
+
+                # if crawler pod is using its OOM threshold, attempt a soft OOM
+                # via a second SIGTERM
+                if (
+                    mem_usage >= MEM_SOFT_OOM_THRESHOLD
+                    and name.startswith("crawl")
+                    and pod.signalAtMem != pod.newMemory
+                ):
+                    send_sig = True
+
+            # if any pod crashed due to OOM, increase mem
+            elif pod.isNewExit and pod.reason == "oom":
+                pod.newMemory = new_memory
+                print(
+                    f"Mem {mem_usage}: Resizing pod {name} -> mem {pod.newMemory} - OOM Detected"
+                )
+                send_sig = True
 
-            # if redis is using >0.90 of its memory, increase mem
-            if name.startswith("redis") and pod.get_percent_memory() > 0.90:
-                pod.newMemory = int(float(pod.allocated.memory) * 1.2)
-                print(f"Resizing pod {name} -> mem {pod.newMemory} - Redis Capacity")
+            # avoid resending SIGTERM multiple times after it already succeeded
+            if send_sig and await self.k8s.send_signal_to_pod(name, "SIGTERM"):
+                pod.signalAtMem = pod.newMemory
 
-    async def log_crashes(self, crawl_id, pod_status, redis):
+    async def log_crashes(self, crawl_id, pod_status: dict[str, PodInfo], redis):
         """report/log any pod crashes here"""
         for name, pod in pod_status.items():
             # log only unexpected exits as crashes
diff --git a/backend/btrixcloud/operator/models.py b/backend/btrixcloud/operator/models.py
index f5a2f4147..bf8988e96 100644
--- a/backend/btrixcloud/operator/models.py
+++ b/backend/btrixcloud/operator/models.py
@@ -114,6 +114,7 @@ class PodInfo(BaseModel):
 
     newCpu: Optional[int] = None
     newMemory: Optional[int] = None
+    signalAtMem: Optional[int] = None
 
     def dict(self, *a, **kw):
         res = super().dict(*a, **kw)
@@ -180,7 +181,7 @@ class CrawlStatus(BaseModel):
     initRedis: bool = False
     crawlerImage: Optional[str] = None
    lastActiveTime: str = ""
-    podStatus: Optional[DefaultDict[str, PodInfo]] = defaultdict(
+    podStatus: DefaultDict[str, PodInfo] = defaultdict(
         lambda: PodInfo()  # pylint: disable=unnecessary-lambda
     )
     # placeholder for pydantic 2.0 -- will require this version
diff --git a/chart/app-templates/crawler.yaml b/chart/app-templates/crawler.yaml
index e9ea1834d..ea9f4e33b 100644
--- a/chart/app-templates/crawler.yaml
+++ b/chart/app-templates/crawler.yaml
@@ -175,7 +175,7 @@ spec:
 
     resources:
       limits:
-        memory: "{{ memory }}"
+        memory: "{{ memory_limit }}"
 
       requests:
         cpu: "{{ cpu }}"
diff --git a/chart/templates/role.yaml b/chart/templates/role.yaml
index bfce0c05f..16f860734 100644
--- a/chart/templates/role.yaml
+++ b/chart/templates/role.yaml
@@ -7,7 +7,7 @@ metadata:
 rules:
 - apiGroups: [""]
   resources: ["pods", "pods/exec", "pods/log", "services", "configmaps", "secrets", "events", "persistentvolumeclaims"]
-  verbs: ["get", "list", "watch", "create", "update", "patch", "delete", "deletecollection"]
+  verbs: ["get", "list", "watch", "create", "update", "patch", "delete", "deletecollection", "exec"]
 
- apiGroups: ["batch", "extensions", "apps"]
  resources: ["jobs", "cronjobs", "statefulsets"]
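
Taken together, the new constants implement a two-step policy: grow a pod's memory request by 20% once usage crosses 90% of the request, and once usage reaches the full request, also send the crawler a SIGTERM (presumably so it can exit gracefully before the kernel OOM killer fires), with the hard limit padded to 120% of the request to leave headroom for that exit. A minimal standalone sketch of the decision flow; the `next_memory` helper is illustrative and not part of the patch:

```python
# Illustrative restatement of the constants added in crawls.py.
MEM_SCALE_UP_THRESHOLD = 0.90  # scale up above 90% of the request
MEM_SCALE_UP = 1.2             # grow the memory request by 20%
MEM_SOFT_OOM_THRESHOLD = 1.0   # soft OOM at/above 100% of the request
MEM_LIMIT_PADDING = 1.2        # hard limit = 120% of the request


def next_memory(allocated: int, used: int) -> tuple[int, bool]:
    """Return (new request, soft_oom) for one reconcile pass."""
    usage = used / allocated
    new_request = allocated
    soft_oom = False
    if usage > MEM_SCALE_UP_THRESHOLD:
        new_request = int(allocated * MEM_SCALE_UP)
        # at or past the full request, also signal the crawler
        # (the "soft OOM" branch in handle_auto_size)
        soft_oom = usage >= MEM_SOFT_OOM_THRESHOLD
    return new_request, soft_oom


GIB = 1024**3
print(next_memory(GIB, int(0.95 * GIB)))  # 95%: grows to ~1.2 GiB, no signal
print(next_memory(GIB, GIB))              # 100%: grows and soft-OOMs
```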
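The new `send_signal_to_pod` relies on `self.core_api_ws` being the websocket-backed variant of `CoreV1Api`: with `kubernetes_asyncio`'s `WsApiClient`, `connect_get_namespaced_pod_exec` resolves to the command's collected output rather than an open stream, so an empty result means `kill` printed nothing and the signal was delivered, which is why a falsy `res` is treated as success. A sketch of how such a client pair is typically constructed (this setup code is assumed and not shown in the diff):

```python
from kubernetes_asyncio import client, config
from kubernetes_asyncio.client.api_client import ApiClient
from kubernetes_asyncio.stream import WsApiClient


async def make_core_apis():
    """Build the plain and websocket CoreV1Api clients (assumed setup)."""
    # in-cluster service-account config; outside the cluster use
    # `await config.load_kube_config()` instead
    config.load_incluster_config()
    core_api = client.CoreV1Api(ApiClient())
    # exec runs over a websocket, so it needs a WsApiClient-backed
    # CoreV1Api; this is what `self.core_api_ws` is assumed to be
    core_api_ws = client.CoreV1Api(WsApiClient())
    return core_api, core_api_ws
```

With a client built this way, `await core_api_ws.connect_get_namespaced_pod_exec(...)` returns the command output as a string, matching how `send_signal_to_pod` inspects `res`.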
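The `self.k8s.has_pod_metrics` gate means auto-sizing is skipped entirely on clusters without metrics-server, since `add_used_stats` would otherwise never populate `pod_info.used`. How that flag gets set is outside this diff; a hypothetical startup probe of the `metrics.k8s.io` group (the function name and namespace below are assumptions) might look like:

```python
from kubernetes_asyncio import client


async def detect_pod_metrics(api_client: client.ApiClient) -> bool:
    """Return True if metrics.k8s.io (metrics-server) is queryable."""
    custom = client.CustomObjectsApi(api_client)
    try:
        await custom.list_namespaced_custom_object(
            group="metrics.k8s.io",
            version="v1beta1",
            namespace="default",
            plural="pods",
        )
        return True
    # pylint: disable=broad-except
    except Exception:
        return False
```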