feat: implement batching strategies #3630

Open · wants to merge 22 commits into base: main

Changes from 1 commit
11 changes: 10 additions & 1 deletion src/bentoml/_internal/configuration/containers.py
@@ -139,7 +139,16 @@ def __init__(
) from None

def _finalize(self):
RUNNER_CFG_KEYS = ["batching", "resources", "logging", "metrics", "timeout"]
RUNNER_CFG_KEYS = [
"batching",
"resources",
"logging",
"metrics",
"timeout",
"strategy",
"strategy_options",
]

global_runner_cfg = {k: self.config["runners"][k] for k in RUNNER_CFG_KEYS}
custom_runners_cfg = dict(
filter(
2 changes: 2 additions & 0 deletions src/bentoml/_internal/configuration/v1/__init__.py
@@ -142,6 +142,8 @@
s.Optional("enabled"): bool,
s.Optional("max_batch_size"): s.And(int, ensure_larger_than_zero),
s.Optional("max_latency_ms"): s.And(int, ensure_larger_than_zero),
s.Optional("strategy"): str,
s.Optional("strategy_options"): dict,
},
# NOTE: there is a distinction between being unset and None here; if set to 'None'
# in configuration for a specific runner, it will override the global configuration.
@@ -84,6 +84,13 @@ runners:
batching:
enabled: true
max_batch_size: 100
# which strategy to use to batch requests
# there are currently three available options:
# - target_latency: wait until the predicted latency of the batch reaches a target
# - fixed_wait: always wait a fixed time after a request arrives
# - intelligent_wait: adaptively wait based on queue state and predicted execution time
strategy: intelligent_wait
strategy_options:
decay: 0.95
max_latency_ms: 10000
logging:
access:
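As a rough illustration (not code from this PR) of how the `strategy` and `strategy_options` keys are meant to be consumed: the configured name selects a strategy class from a registry, and the options mapping is handed to that class's constructor. The standalone registry and simplified class below are stand-ins for the ones defined in this diff.

# Simplified stand-in for the strategy registry and classes introduced in this PR.
registry = {}

class IntelligentWaitStrategy:
    def __init__(self, options):
        # Read only the keys this strategy understands; others would be warned about in the real code.
        self.decay = options.get("decay", 0.95)

registry["intelligent_wait"] = IntelligentWaitStrategy

config = {"strategy": "intelligent_wait", "strategy_options": {"decay": 0.95, "max_latency_ms": 10000}}
strategy = registry[config["strategy"]](config["strategy_options"])
print(strategy.decay)  # 0.95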
118 changes: 98 additions & 20 deletions src/bentoml/_internal/marshal/dispatcher.py
@@ -7,6 +7,8 @@
import functools
import traceback
import collections
from abc import ABC
from abc import abstractmethod

import numpy as np

@@ -41,7 +43,7 @@ class Job:

class Optimizer:
"""
Analyse historical data to optimize CorkDispatcher.
Analyze historical data to predict execution time using a simple linear regression on batch size.
"""

N_KEPT_SAMPLE = 50 # amount of outbound info kept for inferring params
@@ -98,14 +100,97 @@ def trigger_refresh(self):
T_OUT = t.TypeVar("T_OUT")


class CorkDispatcher:
"""
A decorator that:
* wrap batch function
* implement CORK algorithm to cork & release calling of wrapped function
The wrapped function should be an async function.
"""
BATCHING_STRATEGY_REGISTRY = {}


class BatchingStrategy(ABC):
strategy_id: str

@abstractmethod
def controller(queue: t.Sequence[Job], predict_execution_time: t.Callable[t.Sequence[Job]], dispatch: t.Callable[]):
A reviewer (Member) left a suggested change on this line:

Suggested change:
-    def controller(queue: t.Sequence[Job], predict_execution_time: t.Callable[t.Sequence[Job]], dispatch: t.Callable[]):
+    def controller(queue: t.Sequence[Job], predict_execution_time: t.Callable[[t.Sequence[Job]], t.Any], dispatch: t.Callable[..., t.Any]):

pass

def __init_subclass__(cls, strategy_id: str):
BATCHING_STRATEGY_REGISTRY[strategy_id] = cls
cls.strategy_id = strategy_id


class TargetLatencyStrategy(BatchingStrategy, strategy_id="target_latency"):
latency: float = 1

def __init__(self, options: dict[t.Any, t.Any]):
Author note: TODO: typed dict for init.

for key in options:
if key == "latency":
self.latency = options[key] / 1000.0
else:
logger.warning("Strategy 'target_latency' ignoring unknown configuration key '{key}'.")

async def wait(self, queue: t.Sequence[Job], optimizer: Optimizer, max_latency: float, max_batch_size: int, tick_interval: float):
n = len(queue)
now = time.time()
w0 = now - queue[0].enqueue_time
latency_0 = w0 + optimizer.o_a * n + optimizer.o_b

while latency_0 < self.latency:
n = len(queue)
now = time.time()
w0 = now - queue[0].enqueue_time
latency_0 = w0 + optimizer.o_a * n + optimizer.o_b

await asyncio.sleep(tick_interval)


class FixedWaitStrategy(BatchingStrategy, strategy_id="fixed_wait"):
wait: float = 1

def __init__(self, options: dict[t.Any, t.Any]):
for key in options:
if key == "wait":
self.wait = options[key] / 1000.0
else:
logger.warning("Strategy 'fixed_wait' ignoring unknown configuration key '{key}'")

async def wait(self, queue: t.Sequence[Job], optimizer: Optimizer, max_latency: float, max_batch_size: int, tick_interval: float):
now = time.time()
w0 = now - queue[0].enqueue_time

if w0 < self.wait:
Author note: Add loop checking for max_batch_size here.
await asyncio.sleep(self.wait - w0)


class IntelligentWaitStrategy(BatchingStrategy, strategy_id="intelligent_wait"):
decay: float = 0.95

def __init__(self, options: dict[t.Any, t.Any]):
for key in options:
if key == "decay":
self.decay = options[key]
else:
logger.warning("Strategy 'intelligent_wait' ignoring unknown configuration value")

async def wait(self, queue: t.Sequence[Job], optimizer: Optimizer, max_latency: float, max_batch_size: int, tick_interval: float):
n = len(queue)
now = time.time()
w0 = now - queue[0].enqueue_time
wn = now - queue[-1].enqueue_time
latency_0 = w0 + optimizer.o_a * n + optimizer.o_b
while (
# if we don't already have enough requests,
n < max_batch_size
# we are not about to cancel the first request,
and latency_0 + tick_interval <= max_latency * 0.95
# and waiting will cause average latency to decrease
and n * (wn + tick_interval + optimizer.o_a) <= optimizer.wait * self.decay
Author note on this condition:

n (the number of requests in the queue) multiplied by (wn: predictor of the next request's arrival gap + dt: tick time + o_a: optimizer slope) is a measure of how much latency will be added to every queued request if we wait for one more request and add it to the batch.

That is compared against optimizer.wait (the average amount of time a request sits in the queue) multiplied by decay (an arbitrary decay factor so that the average wait should decay over time): we keep waiting only while the added latency stays below that budget. (A worked numeric check of this condition appears after this method.)
):
n = len(queue)
now = time.time()
w0 = now - queue[0].enqueue_time
wn = now - queue[-1].enqueue_time
latency_0 = w0 + optimizer.o_a * n + optimizer.o_b

# wait for additional requests to arrive
await asyncio.sleep(tick_interval)
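For intuition, here is a small worked check of the intelligent_wait dispatch condition discussed in the author's note above. The numbers are made up for illustration, not measurements from BentoML.

# Illustrative numbers only: should we keep waiting for more requests?
n, wn, tick_interval, o_a = 4, 0.002, 0.001, 0.003  # 4 queued; 2 ms since last arrival; 3 ms/item slope
avg_wait, decay = 0.05, 0.95                        # 50 ms average time-in-queue; decay factor

added_latency = n * (wn + tick_interval + o_a)      # cost to every queued request of waiting one more tick: 0.024 s
budget = avg_wait * decay                           # 0.0475 s
print(added_latency <= budget)                      # True -> keep waiting

n = 9                                               # with a fuller queue the same wait costs more
print(n * (wn + tick_interval + o_a) <= budget)     # 0.054 > 0.0475 -> False, dispatch now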



class Dispatcher:
def __init__(
self,
max_latency_in_ms: int,
@@ -283,19 +368,11 @@ async def controller(self):
continue
await asyncio.sleep(self.tick_interval)
continue
if (
n < self.max_batch_size
and n * (wn + dt + (a or 0)) <= self.optimizer.wait * decay
):
n = len(self._queue)
now = time.time()
wn = now - self._queue[-1].enqueue_time
latency_0 += dt

# wait for additional requests to arrive
await asyncio.sleep(self.tick_interval)
continue

# we are now free to dispatch whenever we like
await self.strategy.wait(self._queue, self.optimizer, self.max_latency, self.max_batch_size, self.tick_interval)

n = len(self._queue)
n_call_out = min(self.max_batch_size, n)
Author note: Move this (and above) logic into strategy.

# call
self._sema.acquire()
@@ -306,6 +383,7 @@ async def controller(self):
except Exception as e: # pylint: disable=broad-except
logger.error(traceback.format_exc(), exc_info=e)


async def inbound_call(self, data: t.Any):
now = time.time()
future = self._loop.create_future()
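Aside (not part of the diff): a minimal, runnable sketch of the self-registration pattern the new BatchingStrategy base class relies on, where __init_subclass__ records every subclass in a module-level registry keyed by its strategy_id. The names below are illustrative stand-ins for the ones in dispatcher.py.

from abc import ABC, abstractmethod

REGISTRY: dict[str, type] = {}

class Strategy(ABC):
    strategy_id: str

    def __init_subclass__(cls, strategy_id: str, **kwargs):
        super().__init_subclass__(**kwargs)
        REGISTRY[strategy_id] = cls  # every subclass self-registers under its id
        cls.strategy_id = strategy_id

    @abstractmethod
    async def wait(self, queue, optimizer, max_latency, max_batch_size, tick_interval): ...

class FixedWait(Strategy, strategy_id="fixed_wait"):
    async def wait(self, queue, optimizer, max_latency, max_batch_size, tick_interval):
        pass

print(REGISTRY)  # {'fixed_wait': <class '__main__.FixedWait'>}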
2 changes: 2 additions & 0 deletions src/bentoml/_internal/models/model.py
@@ -326,6 +326,7 @@ def to_runner(
name: str = "",
max_batch_size: int | None = None,
max_latency_ms: int | None = None,
batching_strategy: BatchingStrategy | None = None,
method_configs: dict[str, dict[str, int]] | None = None,
) -> Runner:
"""
@@ -348,6 +349,7 @@
models=[self],
max_batch_size=max_batch_size,
max_latency_ms=max_latency_ms,
batching_strategy=batching_strategy,
method_configs=method_configs,
)

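If the new batching_strategy keyword lands as shown above, per-runner usage might look like the sketch below. The model tag is a placeholder, and constructing IntelligentWaitStrategy directly from its options dict is an assumption based on the constructors in this diff, not an established API.

import bentoml
from bentoml._internal.marshal.dispatcher import IntelligentWaitStrategy

# Hypothetical usage of the to_runner() parameter added in this PR.
runner = bentoml.models.get("iris_clf:latest").to_runner(
    max_batch_size=100,
    max_latency_ms=10_000,
    batching_strategy=IntelligentWaitStrategy({"decay": 0.95}),
)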
32 changes: 30 additions & 2 deletions src/bentoml/_internal/runner/runner.py
@@ -4,6 +4,7 @@
import logging
from abc import ABC
from abc import abstractmethod
from pprint import pformat

import attr
from simple_di import inject
@@ -19,6 +20,7 @@
from .runner_handle import RunnerHandle
from .runner_handle import DummyRunnerHandle
from ..configuration.containers import BentoMLContainer
from ..marshal.dispatcher import BATCHING_STRATEGY_REGISTRY

if t.TYPE_CHECKING:
from ...triton import Runner as TritonRunner
@@ -47,6 +49,7 @@ class RunnerMethod(t.Generic[T, P, R]):
config: RunnableMethodConfig
max_batch_size: int
max_latency_ms: int
batching_strategy: BatchingStrategy

def run(self, *args: P.args, **kwargs: P.kwargs) -> R:
return self.runner._runner_handle.run_method(self, *args, **kwargs)
@@ -159,6 +162,7 @@ def __init__(
models: list[Model] | None = None,
max_batch_size: int | None = None,
max_latency_ms: int | None = None,
batching_strategy: BatchingStrategy | None = None,
method_configs: dict[str, dict[str, int]] | None = None,
) -> None:
"""
@@ -177,8 +181,9 @@
models: An optional list composed of ``bentoml.Model`` instances.
max_batch_size: Max batch size config for dynamic batching. If not provided, use the default value from
configuration.
max_latency_ms: Max latency config for dynamic batching. If not provided, use the default value from
configuration.
max_latency_ms: Max latency config. If not provided, uses the default value from configuration.
batching_strategy: Batching strategy for dynamic batching. If not provided, uses the default value
from the configuration.
method_configs: A dictionary per method config for this given Runner signatures.

Returns:
@@ -215,13 +220,31 @@

method_max_batch_size = None
method_max_latency_ms = None
method_batching_strategy = None
if method_name in method_configs:
method_max_batch_size = method_configs[method_name].get(
"max_batch_size"
)
method_max_latency_ms = method_configs[method_name].get(
"max_latency_ms"
)
method_batching_strategy = method_configs[method_name].get(
"batching_strategy"
)

if config["batching"]["strategy"] not in BATCHING_STRATEGY_REGISTRY:
raise BentoMLConfigException(
f"Unknown batching strategy '{config['batching']['strategy']}'. Available strategies are: {','.join(BATCHING_STRATEGY_REGISTRY.keys())}.",
)

try:
default_batching_strategy = BATCHING_STRATEGY_REGISTRY[
config["batching"]["strategy"]
](config["batching"]["strategy_options"])
except Exception as e:
raise BentoMLConfigException(
f"Initializing strategy '{config['batching']['strategy']}' with configured options ({pformat(config['batching']['strategy_options'])}) failed."
) from e

runner_method_map[method_name] = RunnerMethod(
runner=self,
@@ -237,6 +260,11 @@
max_latency_ms,
default=config["batching"]["max_latency_ms"],
),
batching_strategy=first_not_none(
method_batching_strategy,
batching_strategy,
default=default_batching_strategy,
),
)

self.__attrs_init__(
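The precedence applied above via first_not_none is: the per-method config, then the runner-level argument, then the strategy built from global configuration. A small stand-in (not BentoML's actual helper) makes that concrete:

def first_not_none(*args, default):
    """Return the first argument that is not None, else the default."""
    return next((a for a in args if a is not None), default)

# Per-method setting wins over the runner-level argument and the configured default.
print(first_not_none("fixed_wait", "target_latency", default="intelligent_wait"))  # fixed_wait
# With nothing set explicitly, the globally configured default applies.
print(first_not_none(None, None, default="intelligent_wait"))                      # intelligent_wait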
1 change: 1 addition & 0 deletions src/bentoml/_internal/server/runner_app.py
@@ -58,6 +58,7 @@ def __init__(
max_batch_size = method.max_batch_size if method.config.batchable else 1
self.dispatchers[method.name] = CorkDispatcher(
max_latency_in_ms=method.max_latency_ms,
batching_strategy=method.batching_strategy,
max_batch_size=max_batch_size,
fallback=functools.partial(
ServiceUnavailable, message="process is overloaded"