feat: implement batching strategies #3630

Open · wants to merge 22 commits into base: main

Changes from 4 commits
14 changes: 11 additions & 3 deletions docs/source/concepts/runner.rst
@@ -292,15 +292,17 @@ Runner Definition

# below are also configurable via config file:

# default configs:
max_batch_size=.. # default max batch size will be applied to all run methods, unless override in the runnable_method_configs
max_latency_ms=.. # default max latency will be applied to all run methods, unless override in the runnable_method_configs
# default configs, which will be applied to all run methods, unless overridden for a specific method:
max_batch_size=..,
max_latency_ms=..,
batching_strategy=..,

runnable_method_configs=[
{
method_name="predict",
max_batch_size=..,
max_latency_ms=..,
batching_strategy=..,
}
],
)
@@ -333,6 +335,9 @@ To explicitly disable or control adaptive batching behaviors at runtime, configure
enabled: true
max_batch_size: 100
max_latency_ms: 500
strategy: intelligent_wait
strategy_options:
decay: 0.95

.. tab-item:: Individual Runner
:sync: individual_runner
@@ -346,6 +351,9 @@ To explicitly disable or control adaptive batching behaviors at runtime, configure
enabled: true
max_batch_size: 100
max_latency_ms: 500
strategy: intelligent_wait
strategy_options:
decay: 0.95

Resource Allocation
^^^^^^^^^^^^^^^^^^^
62 changes: 58 additions & 4 deletions docs/source/guides/batching.rst
@@ -52,28 +52,82 @@ In addition to declaring a model as batchable, batch dimensions can also be configured
Configuring Batching
--------------------

If a model supports batching, adaptive batching is enabled by default. To explicitly disable or control adaptive batching behaviors at runtime, configuration can be specified under the ``batching`` key.
Additionally, there are two configurations for customizing batching behaviors, `max_batch_size` and `max_latency_ms`.
If a model supports batching, adaptive batching is enabled by default. To explicitly disable or
control adaptive batching behaviors at runtime, configuration can be specified under the
``batching`` key. Additionally, there are three configuration keys for customizing batching
behaviors, ``max_batch_size``, ``max_latency_ms``, and ``strategy``.

Max Batch Size
^^^^^^^^^^^^^^

Configured through the ``max_batch_size`` key, max batch size represents the maximum size a batch can reach before releasing for inferencing. Max batch size should be set based on the capacity of the available system resources, e.g. memory or GPU memory.
Configured through the ``max_batch_size`` key, max batch size represents the maximum size a batch
can reach before being released for inferencing. Max batch size should be set based on the capacity
of the available system resources, e.g. memory or GPU memory.

Max Latency
^^^^^^^^^^^

Configured through the ``max_latency_ms`` key, max latency represents the maximum latency in milliseconds that a batch should wait before releasing for inferencing. Max latency should be set based on the service level objective (SLO) of the inference requests.
Configured through the ``max_latency_ms`` key, max latency represents the maximum latency in
milliseconds that the scheduler will attempt to uphold by cancelling requests when it thinks the
runner server is incapable of servicing that request in time. Max latency should be set based on the
service level objective (SLO) of the inference requests.
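
To make these two settings concrete, here is a simplified sketch (not the actual BentoML
implementation) of how a dispatch loop could combine them, assuming the linear execution-time
estimate ``o_a * batch_size + o_b`` that the dispatcher's optimizer maintains in this PR:

.. code-block:: python

   import time

   def should_dispatch(queue, o_a, o_b, max_batch_size, max_latency_ms):
       """queue holds (enqueue_time_s, payload) pairs, oldest first; o_a and o_b come
       from the optimizer's linear model of execution time, in milliseconds."""
       n = len(queue)
       queued_ms = (time.time() - queue[0][0]) * 1000
       estimated_latency_ms = queued_ms + o_a * n + o_b
       if estimated_latency_ms >= max_latency_ms:
           return True  # the oldest request is at its latency budget: act now
       return n >= max_batch_size  # a full batch is always dispatched immediately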

Batching Strategy
^^^^^^^^^^^^^^^^^
Comment on lines +75 to +76 (Collaborator): Docs changes are outdated, correct?

Configured through the ``strategy`` and ``strategy_options`` keys, the batching strategy determines
how the scheduler chooses a batching window, i.e. how long it waits for additional requests to
combine into a batch before dispatching it for execution. There are three options:

- target_latency: this strategy waits until it expects that the first request received will take
  around the target latency to complete before beginning execution. Choose this method if you
  expect your service workload to be very bursty, so that the intelligent wait algorithm would do
  a poor job of identifying average wait times.

  It takes one option, ``latency_ms`` (default 1000), which is the latency target to use for
  dispatch.

- fixed_wait: this strategy will wait a fixed amount of time after the first request has been
  received. It differs from the target_latency strategy in that it does not consider the amount of
  time that it expects a batch will take to execute.

  It takes one option, ``wait_ms`` (default 1000), the amount of time to wait after receiving the
  first request.

- intelligent_wait: this strategy waits intelligently in an effort to optimize average latency
  across all requests. It takes the average time requests have spent in the queue, then estimates
  how long it would take to wait for the next request and execute the batch including it. If that
  time, multiplied by the number of requests in the queue, is less than the average wait time, it
  continues waiting for the next request to arrive (a simplified sketch of this rule follows this
  list). This is the default, and the other options should only be chosen if undesirable latency
  behavior is observed.

  It has one option, ``decay`` (default 0.95), which is the rate at which the dispatcher decays
  the wait time per dispatched job. Note that this does not decay the actual expected wait time;
  instead it shrinks the batching window, which indirectly reduces the average waiting time.
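
To make the default strategy concrete, here is a simplified sketch (not the exact implementation)
of the check the dispatcher evaluates on every scheduler tick to decide whether waiting for one
more request is worthwhile:

.. code-block:: python

   # Simplified sketch of the intelligent_wait decision rule. The optimizer models execution
   # time as o_a * batch_size + o_b and tracks the average time requests spend queued
   # (avg_wait); dt is the scheduler tick, and w0/wn are the ages of the oldest/newest
   # queued requests. All times are in seconds.
   def keep_waiting(n, w0, wn, dt, o_a, o_b, avg_wait, decay, max_batch_size, max_latency):
       latency_first = w0 + o_a * n + o_b   # expected total latency of the oldest request
       added_latency = n * (wn + dt + o_a)  # cost to every queued request of waiting for one more
       return (
           n < max_batch_size  # the batch still has room
           and latency_first + dt <= max_latency * 0.95  # the oldest request stays within budget
           and added_latency <= avg_wait * decay  # waiting should lower average latency
       )

For example, with ``n = 4`` queued requests, ``wn + dt + o_a`` of 16 ms, an observed average queue
wait of 80 ms, and ``decay = 0.95``, the cost of waiting is 4 × 16 ms = 64 ms, which is below
0.95 × 80 ms = 76 ms, so the dispatcher keeps waiting for another request.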


.. code-block:: yaml
:caption: ⚙️ `configuration.yml`

runners:
# batching options for all runners
batching:
enabled: true
max_batch_size: 100
max_latency_ms: 500
strategy: intelligent_wait
iris_clf:
# batching options specific to the iris_clf runner
# these options override the global options above
batching:
enabled: true
max_batch_size: 100
max_latency_ms: 500
strategy: target_latency
strategy_options:
latency_ms: 200

Monitoring
----------
11 changes: 10 additions & 1 deletion src/bentoml/_internal/configuration/containers.py
@@ -139,7 +139,16 @@ def __init__(
) from None

    def _finalize(self):
        RUNNER_CFG_KEYS = ["batching", "resources", "logging", "metrics", "timeout"]
        RUNNER_CFG_KEYS = [
            "batching",
            "resources",
            "logging",
            "metrics",
            "timeout",
            "strategy",
            "strategy_options",
        ]

        global_runner_cfg = {k: self.config["runners"][k] for k in RUNNER_CFG_KEYS}
        custom_runners_cfg = dict(
            filter(
2 changes: 2 additions & 0 deletions src/bentoml/_internal/configuration/v1/__init__.py
@@ -142,6 +142,8 @@
s.Optional("enabled"): bool,
s.Optional("max_batch_size"): s.And(int, ensure_larger_than_zero),
s.Optional("max_latency_ms"): s.And(int, ensure_larger_than_zero),
s.Optional("strategy"): str,
s.Optional("strategy_options"): dict,
},
# NOTE: there is a distinction between being unset and None here; if set to 'None'
# in configuration for a specific runner, it will override the global configuration.
@@ -84,6 +84,13 @@ runners:
batching:
enabled: true
max_batch_size: 100
# which strategy to use to batch requests
# there are currently three available options:
# - target_latency: dispatch once the first request is expected to take the target latency to complete
# - fixed_wait: always wait a fixed time after the first request arrives
# - intelligent_wait: wait intelligently, based on the average time requests spend in the queue
strategy: intelligent_wait
strategy_options:
decay: 0.95
max_latency_ms: 10000
logging:
access:
142 changes: 118 additions & 24 deletions src/bentoml/_internal/marshal/dispatcher.py
@@ -7,6 +7,8 @@
import functools
import traceback
import collections
from abc import ABC
from abc import abstractmethod

import numpy as np

@@ -32,10 +34,16 @@ def is_locked(self):
    def release(self):
        self.sema += 1


@attr.define
class Job:
    enqueue_time: float
    data: t.Any
    future: asyncio.Future[t.Any]
    dispatch_time: float = 0

class Optimizer:
"""
Analyse historical data to optimize CorkDispatcher.
Analyze historical data to predict execution time using a simple linear regression on batch size.
"""

N_KEPT_SAMPLE = 50 # amount of outbound info kept for inferring params
@@ -92,14 +100,97 @@ def trigger_refresh(self):
T_OUT = t.TypeVar("T_OUT")


class CorkDispatcher:
"""
A decorator that:
* wrap batch function
* implement CORK algorithm to cork & release calling of wrapped function
The wrapped function should be an async function.
"""
BATCHING_STRATEGY_REGISTRY = {}


class BatchingStrategy(ABC):
    strategy_id: str

    @abstractmethod
    def controller(queue: t.Sequence[Job], predict_execution_time: t.Callable[t.Sequence[Job]], dispatch: t.Callable[]):

Comment (Member), suggested change:
    def controller(queue: t.Sequence[Job], predict_execution_time: t.Callable[t.Sequence[Job]], dispatch: t.Callable[]):
    def controller(queue: t.Sequence[Job], predict_execution_time: t.Callable[[t.Sequence[Job]], t.Any], dispatch: t.Callable[..., t.Any]):

        pass

    def __init_subclass__(cls, strategy_id: str):
        BATCHING_STRATEGY_REGISTRY[strategy_id] = cls
        cls.strategy_id = strategy_id


class TargetLatencyStrategy(BatchingStrategy, strategy_id="target_latency"):
    latency: float = 1

    def __init__(self, options: dict[t.Any, t.Any]):

Comment (Contributor Author): TODO: typed dict for init.

        for key in options:
            if key == "latency":
                self.latency = options[key] / 1000.0
            else:
                logger.warning(f"Strategy 'target_latency' ignoring unknown configuration key '{key}'.")

    async def wait(self, queue: t.Sequence[Job], optimizer: Optimizer, max_latency: float, max_batch_size: int, tick_interval: float):
        n = len(queue)
        now = time.time()
        w0 = now - queue[0].enqueue_time
        latency_0 = w0 + optimizer.o_a * n + optimizer.o_b

        while latency_0 < self.latency:
            n = len(queue)
            now = time.time()
            w0 = now - queue[0].enqueue_time
            latency_0 = w0 + optimizer.o_a * n + optimizer.o_b

            await asyncio.sleep(tick_interval)
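
As an illustration of the TODO above (hypothetical, not part of this PR), typed option dicts for
the three strategies could look like:

    import typing as t

    class TargetLatencyOptions(t.TypedDict, total=False):
        latency: int  # target latency, in milliseconds

    class FixedWaitOptions(t.TypedDict, total=False):
        wait: int  # fixed wait after the first request, in milliseconds

    class IntelligentWaitOptions(t.TypedDict, total=False):
        decay: float  # per-dispatch decay applied to the batching window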


class FixedWaitStrategy(BatchingStrategy, strategy_id="fixed_wait"):
    wait: float = 1

    def __init__(self, options: dict[t.Any, t.Any]):
        for key in options:
            if key == "wait":
                self.wait = options[key] / 1000.0
            else:
                logger.warning(f"Strategy 'fixed_wait' ignoring unknown configuration key '{key}'")

    async def wait(self, queue: t.Sequence[Job], optimizer: Optimizer, max_latency: float, max_batch_size: int, tick_interval: float):
        now = time.time()
        w0 = now - queue[0].enqueue_time

        if w0 < self.wait:

Comment (Contributor Author): Add loop checking for max_batch_size here.

            await asyncio.sleep(self.wait - w0)
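
As an illustration of the note above (not code from this PR), a fixed-wait loop that also stops
early once the queue already holds a full batch might look like:

    import asyncio
    import time

    async def fixed_wait(queue, wait_s: float, max_batch_size: int, tick_interval: float) -> None:
        # queue items expose .enqueue_time, as in this PR's Job class.
        # Wait until either the fixed window after the first request has elapsed,
        # or the queue has already reached max_batch_size.
        deadline = queue[0].enqueue_time + wait_s
        while time.time() < deadline and len(queue) < max_batch_size:
            await asyncio.sleep(tick_interval)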


class IntelligentWaitStrategy(BatchingStrategy, strategy_id="intelligent_wait"):
    decay: float = 0.95

    def __init__(self, options: dict[t.Any, t.Any]):
        for key in options:
            if key == "decay":
                self.decay = options[key]
            else:
                logger.warning(f"Strategy 'intelligent_wait' ignoring unknown configuration key '{key}'")

    async def wait(self, queue: t.Sequence[Job], optimizer: Optimizer, max_latency: float, max_batch_size: int, tick_interval: float):
        n = len(queue)
        now = time.time()
        w0 = now - queue[0].enqueue_time
        wn = now - queue[-1].enqueue_time
        dt = tick_interval
        latency_0 = w0 + optimizer.o_a * n + optimizer.o_b
        while (
            # if we don't already have enough requests,
            n < max_batch_size
            # we are not about to cancel the first request,
            and latency_0 + dt <= max_latency * 0.95
            # and waiting will cause average latency to decrease
            and n * (wn + dt + optimizer.o_a) <= optimizer.wait * self.decay
Comment (Contributor Author):

    n: number of requests in queue
    multiplied by
    (
        wn: predictor of next request time +
        dt: tick time +
        o_a: optimizer slope
    )

    ^ The above is a measure of how much latency will be added to every request if we wait for a
    new request and add that to the batch

    less than

    optimizer.wait: the average amount of time a request sits in queue
    *
    decay: an arbitrary decay value so that the average wait should hopefully decay over time

        ):
            n = len(queue)
            now = time.time()
            w0 = now - queue[0].enqueue_time
            wn = now - queue[-1].enqueue_time
            latency_0 = w0 + optimizer.o_a * n + optimizer.o_b

            # wait for additional requests to arrive
            await asyncio.sleep(tick_interval)



class Dispatcher:
def __init__(
self,
max_latency_in_ms: int,
@@ -123,9 +214,7 @@ def __init__(
self.tick_interval = 0.001

self._controller = None
self._queue: collections.deque[
tuple[float, t.Any, asyncio.Future[t.Any]]
] = collections.deque() # TODO(bojiang): maxlen
self._queue: collections.deque[Job] = collections.deque() # TODO(bojiang): maxlen
self._sema = shared_sema if shared_sema else NonBlockSema(1)

def shutdown(self):
@@ -214,6 +303,9 @@ async def train_optimizer(
# call
self._sema.acquire()
inputs_info = tuple(self._queue.pop() for _ in range(n_call_out))
for info in inputs_info:
    # fake the wait time as 0 for training requests
    info.enqueue_time = now
self._loop.create_task(self.outbound_call(inputs_info))
except asyncio.CancelledError:
return
@@ -259,27 +351,28 @@ async def controller(self):
dt = self.tick_interval
decay = 0.95 # the decay rate of wait time
now = time.time()
w0 = now - self._queue[0][0]
wn = now - self._queue[-1][0]
w0 = now - self._queue[0].enqueue_time
wn = now - self._queue[-1].enqueue_time
a = self.optimizer.o_a
b = self.optimizer.o_b

if n > 1 and (w0 + a * n + b) >= self.max_latency_in_ms:
self._queue.popleft()[2].cancel()
# the estimated latency of the first request if we began processing now
latency_0 = w0 + a * n + b

if n > 1 and latency_0 >= self.max_latency_in_ms:
self._queue.popleft().future.cancel()
continue
if self._sema.is_locked():
if n == 1 and w0 >= self.max_latency_in_ms:
self._queue.popleft()[2].cancel()
self._queue.popleft().future.cancel()
continue
await asyncio.sleep(self.tick_interval)
continue
if (
n < self.max_batch_size
and n * (wn + dt + (a or 0)) <= self.optimizer.wait * decay
):
await asyncio.sleep(self.tick_interval)
continue

# we are now free to dispatch whenever we like
await self.strategy.wait(self._queue, self.optimizer, self.max_latency, self.max_batch_size, self.tick_interval)

n = len(self._queue)
n_call_out = min(self.max_batch_size, n)
Comment (Contributor Author): Move this (and above) logic into strategy.

# call
self._sema.acquire()
@@ -290,6 +383,7 @@ async def controller(self):
except Exception as e: # pylint: disable=broad-except
logger.error(traceback.format_exc(), exc_info=e)


async def inbound_call(self, data: t.Any):
now = time.time()
future = self._loop.create_future()
Expand All @@ -300,7 +394,7 @@ async def inbound_call(self, data: t.Any):
return await future

async def outbound_call(
self, inputs_info: tuple[tuple[float, t.Any, asyncio.Future[t.Any]]]
self, inputs_info: tuple[Job, ...]
):
_time_start = time.time()
_done = False
Expand All @@ -315,7 +409,7 @@ async def outbound_call(
_done = True
self.optimizer.log_outbound(
n=len(inputs_info),
wait=_time_start - inputs_info[-1][0],
wait=_time_start - inputs_info[-1].enqueue_time,
duration=time.time() - _time_start,
)
except asyncio.CancelledError: