Crawler pod memory padding + auto scaling #1631

Merged
merged 4 commits on Mar 28, 2024
Changes from 2 commits
26 changes: 26 additions & 0 deletions backend/btrixcloud/k8sapi.py
@@ -288,3 +288,29 @@ async def has_custom_jobs_with_label(self, plural, label) -> bool:
# pylint: disable=broad-exception-caught
except Exception:
return False

async def send_signal_to_pod(self, pod_name, signame) -> bool:
"""send signal to all pods"""
command = ["bash", "-c", f"kill -s {signame} 1"]
signaled = False

try:
print(f"Sending {signame} to {pod_name}", flush=True)

res = await self.core_api_ws.connect_get_namespaced_pod_exec(
name=pod_name,
namespace=self.namespace,
command=command,
stdout=True,
)
            # any output from the exec is treated as failure; an empty
            # response means the signal was delivered
            if res:
print("Result", res, flush=True)

else:
signaled = True

# pylint: disable=broad-except
except Exception as exc:
print(f"Send Signal Error: {exc}", flush=True)

return signaled
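
For context, a minimal, self-contained sketch of how a caller might use this helper and interpret its boolean result; the FakeK8sAPI class and the pod name are hypothetical stand-ins for the real operator wiring, which execs into the pod over the Kubernetes API.

import asyncio

class FakeK8sAPI:
    """Stand-in for K8sAPI: simulates the pod exec instead of contacting a cluster."""

    async def send_signal_to_pod(self, pod_name: str, signame: str) -> bool:
        command = ["bash", "-c", f"kill -s {signame} 1"]
        print(f"Sending {signame} to {pod_name}: {command}", flush=True)
        # the real method treats any output from connect_get_namespaced_pod_exec
        # as a failure; an empty response means the signal was delivered
        return True

async def main():
    k8s = FakeK8sAPI()
    if await k8s.send_signal_to_pod("crawl-abc123-0", "SIGTERM"):
        print("soft OOM signal delivered")

asyncio.run(main())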
19 changes: 13 additions & 6 deletions backend/btrixcloud/operator/baseoperator.py
@@ -42,28 +42,35 @@ def compute_crawler_resources(self):
"""compute memory / cpu resources for crawlers"""
p = self.shared_params
num = max(int(p["crawler_browser_instances"]) - 1, 0)
crawler_cpu: float = 0
crawler_memory: int = 0
print("crawler resources")
if not p.get("crawler_cpu"):
base = parse_quantity(p["crawler_cpu_base"])
extra = parse_quantity(p["crawler_extra_cpu_per_browser"])

# cpu is a floating value of cpu cores
p["crawler_cpu"] = float(base + num * extra)
crawler_cpu = float(base + num * extra)

print(f"cpu = {base} + {num} * {extra} = {p['crawler_cpu']}")
print(f"cpu = {base} + {num} * {extra} = {crawler_cpu}")
else:
print(f"cpu = {p['crawler_cpu']}")
crawler_cpu = float(parse_quantity(p["crawler_cpu"]))
print(f"cpu = {crawler_cpu}")

if not p.get("crawler_memory"):
base = parse_quantity(p["crawler_memory_base"])
extra = parse_quantity(p["crawler_extra_memory_per_browser"])

# memory is always an int
p["crawler_memory"] = int(base + num * extra)
crawler_memory = int(base + num * extra)

print(f"memory = {base} + {num} * {extra} = {p['crawler_memory']}")
print(f"memory = {base} + {num} * {extra} = {crawler_memory}")
else:
print(f"memory = {p['crawler_memory']}")
crawler_memory = int(parse_quantity(p["crawler_memory"]))
print(f"memory = {crawler_memory}")

p["crawler_cpu"] = crawler_cpu
p["crawler_memory"] = crawler_memory

def compute_profile_resources(self):
"""compute memory /cpu resources for a single profile browser"""
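A worked example of the formula above; the base and per-browser values are illustrative, not the chart defaults.

# cpu    = crawler_cpu_base    + (browsers - 1) * crawler_extra_cpu_per_browser
# memory = crawler_memory_base + (browsers - 1) * crawler_extra_memory_per_browser
browsers = 4
cpu_base, cpu_extra = 0.9, 0.6                                # cores
mem_base, mem_extra = 1024 * 1024 * 1024, 768 * 1024 * 1024   # bytes

num = max(browsers - 1, 0)
crawler_cpu = float(cpu_base + num * cpu_extra)    # 0.9 + 3 * 0.6 = 2.7 cores
crawler_memory = int(mem_base + num * mem_extra)   # 1 GiB + 3 * 768 MiB = 3.25 GiB
print(crawler_cpu, crawler_memory)
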
48 changes: 35 additions & 13 deletions backend/btrixcloud/operator/crawls.py
@@ -61,6 +61,19 @@
EXEC_TIME_UPDATE_SECS = 60


# scale up if memory usage exceeds this threshold (e.g. 90%)
MEM_SCALE_UP_THRESHOLD = 0.90

# scale up by this much
MEM_SCALE_UP = 1.2

# soft OOM if memory usage exceeds this threshold (e.g. 100%)
MEM_SOFT_OOM_THRESHOLD = 1.0

# set memory limit to this much of request for extra padding
MEM_LIMIT_PADDING = 1.2


# pylint: disable=too-many-public-methods, too-many-locals, too-many-branches, too-many-statements
# pylint: disable=invalid-name, too-many-lines, too-many-return-statements
# ============================================================================
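
To see how these constants compose, a short worked example; the 2 GiB request is illustrative. The padded limit gives the pod headroom past its request, so the operator can react at 90% usage and attempt a soft OOM at 100% before the kernel hard-kills the container at the limit.

MEM_SCALE_UP_THRESHOLD = 0.90
MEM_SCALE_UP = 1.2
MEM_SOFT_OOM_THRESHOLD = 1.0
MEM_LIMIT_PADDING = 1.2

request = 2 * 1024**3                                  # 2 GiB memory request
limit = int(request * MEM_LIMIT_PADDING)               # ~2.4 GiB hard limit in the pod spec
scale_up_at = int(request * MEM_SCALE_UP_THRESHOLD)    # ~1.8 GiB used: bump the request
soft_oom_at = int(request * MEM_SOFT_OOM_THRESHOLD)    # 2 GiB used: SIGTERM the crawler
new_request = int(request * MEM_SCALE_UP)              # ~2.4 GiB request after a scale-up
print(limit, scale_up_at, soft_oom_at, new_request)
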
@@ -210,7 +223,7 @@ async def sync_crawls(self, data: MCSyncData):
)

# auto sizing handled here
self.handle_auto_size(crawl.id, status.podStatus)
await self.handle_auto_size(status.podStatus)

if status.finished:
return await self.finalize_response(
@@ -326,6 +339,7 @@ def _load_crawler(self, params, i, status, children):
params["name"] = name
params["cpu"] = pod_info.newCpu or params.get("crawler_cpu")
params["memory"] = pod_info.newMemory or params.get("crawler_memory")
params["memory_limit"] = float(params["memory"]) * MEM_LIMIT_PADDING
params["do_restart"] = (
pod_info.should_restart_pod() or params.get("force_restart")
) and has_pod
@@ -758,8 +772,6 @@ async def sync_crawl_state(
qa_run_id = crawl.id if crawl.is_qa else None

while page_crawled:
print("PAGE DATA", flush=True)
print(page_crawled, flush=True)
page_dict = json.loads(page_crawled)
await self.page_ops.add_page_to_db(
page_dict, crawl.db_crawl_id, qa_run_id, crawl.oid
@@ -1010,18 +1022,28 @@ async def add_used_stats(self, crawl_id, pod_status, redis, metrics):
pod_info.used.memory = int(parse_quantity(usage["memory"]))
pod_info.used.cpu = float(parse_quantity(usage["cpu"]))

def handle_auto_size(self, _, pod_status):
async def handle_auto_size(self, pod_status) -> None:
"""auto scale pods here, experimental"""
for name, pod in pod_status.items():
# if pod crashed due to OOM, increase mem
# if pod.isNewExit and pod.reason == "oom":
# pod.newMemory = int(float(pod.allocated.memory) * 1.2)
# print(f"Resizing pod {name} -> mem {pod.newMemory} - OOM Detected")

# if redis is using >0.90 of its memory, increase mem
if name.startswith("redis") and pod.get_percent_memory() > 0.90:
pod.newMemory = int(float(pod.allocated.memory) * 1.2)
print(f"Resizing pod {name} -> mem {pod.newMemory} - Redis Capacity")
mem_usage = pod.get_percent_memory()
# if pod is using >MEM_SCALE_UP_THRESHOLD of its memory, increase mem
if mem_usage > MEM_SCALE_UP_THRESHOLD:
pod.newMemory = int(float(pod.allocated.memory) * MEM_SCALE_UP)
print(
f"Mem {mem_usage}: Resizing pod {name} -> mem {pod.newMemory} - Scale Up"
)

                # if a crawler pod has hit the soft OOM threshold, attempt a
                # soft OOM via a second SIGTERM
if mem_usage >= MEM_SOFT_OOM_THRESHOLD and name.startswith("crawl"):
await self.k8s.send_signal_to_pod(name, "SIGTERM")

# if any pod crashed due to OOM, increase mem
elif pod.isNewExit and pod.reason == "oom":
pod.newMemory = int(float(pod.allocated.memory) * MEM_SCALE_UP)
print(
f"Mem {mem_usage}: Resizing pod {name} -> mem {pod.newMemory} - OOM Detected"
)

async def log_crashes(self, crawl_id, pod_status, redis):
"""report/log any pod crashes here"""
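A runnable sketch of the decision order in handle_auto_size as it reads above: scale up once usage passes 90% of the allocation, additionally attempt a soft OOM via SIGTERM for crawler pods at or past 100%, and resize after a hard OOM exit. The Pod dataclass and pod names are simplified stand-ins for the real PodInfo objects.

from dataclasses import dataclass

MEM_SCALE_UP_THRESHOLD = 0.90
MEM_SCALE_UP = 1.2
MEM_SOFT_OOM_THRESHOLD = 1.0

@dataclass
class Pod:
    allocated_memory: int
    used_memory: int
    is_new_exit: bool = False
    reason: str = ""

def decide(name: str, pod: Pod) -> str:
    mem_usage = pod.used_memory / pod.allocated_memory
    if mem_usage > MEM_SCALE_UP_THRESHOLD:
        action = f"resize to {int(pod.allocated_memory * MEM_SCALE_UP)}"
        if mem_usage >= MEM_SOFT_OOM_THRESHOLD and name.startswith("crawl"):
            action += " + SIGTERM (soft OOM)"
        return action
    if pod.is_new_exit and pod.reason == "oom":
        return f"resize to {int(pod.allocated_memory * MEM_SCALE_UP)} after OOM exit"
    return "no change"

print(decide("crawl-xyz-0", Pod(allocated_memory=2000, used_memory=1850)))   # 92.5%: resize
print(decide("crawl-xyz-0", Pod(allocated_memory=2000, used_memory=2100)))   # 105%: resize + SIGTERM
print(decide("redis-xyz", Pod(allocated_memory=500, used_memory=100, is_new_exit=True, reason="oom")))
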
2 changes: 1 addition & 1 deletion chart/app-templates/crawler.yaml
@@ -175,7 +175,7 @@ spec:

resources:
limits:
memory: "{{ memory }}"
memory: "{{ memory_limit }}"

requests:
cpu: "{{ cpu }}"
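
Finally, to tie the template change back to _load_crawler above, a small sketch (illustrative request size) of the request/limit pair the operator now passes in: the limit carries the 20% padding, while the request, presumably still rendered from {{ memory }}, stays at the computed value.

MEM_LIMIT_PADDING = 1.2

memory = 2_500_000_000                              # computed crawler memory request, in bytes
memory_limit = float(memory) * MEM_LIMIT_PADDING    # mirrors _load_crawler above

# roughly what the rendered resources stanza asks for
print(f"requests.memory: {memory}")
print(f"limits.memory:   {memory_limit:.0f}")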