fix: del unused var
xianml committed Oct 20, 2023
1 parent e7e9532 commit e9745a5
Showing 4 changed files with 48 additions and 20 deletions.
22 changes: 21 additions & 1 deletion src/bentoml/_internal/cloud/base.py
@@ -18,14 +18,34 @@
from rich.progress import TimeRemainingColumn
from rich.progress import TransferSpeedColumn

from ...exceptions import BentoMLException
from ..bento import Bento
from ..bento import BentoStore
from ..models import Model
from ..models import ModelStore
from ..tag import Tag

FILE_CHUNK_SIZE = 100 * 1024 * 1024 # 100Mb
SPOOLED_FILE_MAX_SIZE = 5 * 1024 * 1024 * 1024 # 5GB


def io_wrapper(
memory: int,
*,
read_cb: t.Callable[[int], None] | None = None,
write_cb: t.Callable[[int], None] | None = None,
) -> CallbackIOWrapper | CallbackSpooledTemporaryFileIO:
"""
io_wrapper is a wrapper for SpooledTemporaryFileIO and CallbackIOWrapper
"""
if memory == -1:
return CallbackIOWrapper(read_cb=read_cb, write_cb=write_cb)
elif memory > 0:
print("Using SpooledTemporaryFileIO, memory:", memory)
return CallbackSpooledTemporaryFileIO(
memory * 1024**3, read_cb=read_cb, write_cb=write_cb
)
else:
raise BentoMLException(f"Option maxmemory must be -1 or > 0, got {memory}")


class CallbackSpooledTemporaryFileIO(SpooledTemporaryFile):
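For orientation, a minimal usage sketch of the new helper (not part of the commit): it assumes io_wrapper is importable from bentoml._internal.cloud.base as defined above, and the progress callback and model path are made-up placeholders.

import tarfile

from bentoml._internal.cloud.base import io_wrapper


def on_read(nbytes: int) -> None:
    # Hypothetical progress hook; the real caller wires this into a rich progress bar.
    print(f"read {nbytes} bytes for upload")


# maxmemory=4 keeps at most 4 GB of the tar archive in memory before spooling to disk;
# passing -1 instead selects the previous fully in-memory CallbackIOWrapper path.
with io_wrapper(4, read_cb=on_read) as tar_io:
    with tarfile.open(fileobj=tar_io, mode="w:") as tar:
        tar.add("/path/to/model", arcname="./")  # placeholder path
    # tar_io would then be streamed to the upload endpoint in FILE_CHUNK_SIZE pieces.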
29 changes: 15 additions & 14 deletions src/bentoml/_internal/cloud/bentocloud.py
@@ -27,8 +27,8 @@
from ..utils import calc_dir_size
from .base import FILE_CHUNK_SIZE
from .base import CallbackIOWrapper
from .base import CallbackSpooledTemporaryFileIO
from .base import CloudClient
from .base import io_wrapper
from .config import get_rest_api_client
from .deployment import Deployment
from .schemas import BentoApiSchema
@@ -71,13 +71,19 @@ def push_bento(
force: bool = False,
threads: int = 10,
context: str | None = None,
maxmemory: int = -1,
):
with Live(self.progress_group):
upload_task_id = self.transmission_progress.add_task(
f'Pushing Bento "{bento.tag}"', start=False, visible=False
)
self._do_push_bento(
bento, upload_task_id, force=force, threads=threads, context=context
bento,
upload_task_id,
force=force,
threads=threads,
context=context,
maxmemory=maxmemory,
)

@inject
@@ -89,6 +95,7 @@ def _do_push_bento(
force: bool = False,
threads: int = 10,
context: str | None = None,
maxmemory: int = -1,
model_store: ModelStore = Provide[BentoMLContainer.model_store],
):
yatai_rest_client = get_rest_api_client(context)
@@ -114,6 +121,7 @@ def push_model(model: Model) -> None:
force=force,
threads=threads,
context=context,
maxmemory=maxmemory,
)

futures: t.Iterator[None] = executor.map(push_model, models)
@@ -571,7 +579,7 @@ def push_model(
force: bool = False,
threads: int = 10,
context: str | None = None,
memory: int = -1,
maxmemory: int = -1,
):
with Live(self.progress_group):
upload_task_id = self.transmission_progress.add_task(
@@ -583,7 +591,7 @@ def push_model(
force=force,
threads=threads,
context=context,
memory=memory,
maxmemory=maxmemory,
)

def _do_push_model(
@@ -594,7 +602,7 @@ def _do_push_model(
force: bool = False,
threads: int = 10,
context: str | None = None,
memory: int = -1,
maxmemory: int = -1,
):
yatai_rest_client = get_rest_api_client(context)
name = model.tag.name
@@ -675,15 +683,8 @@ def io_cb(x: int):
with io_mutex:
self.transmission_progress.update(upload_task_id, advance=x)

def IOWrapper(memory: int):
if memory == -1:
return CallbackIOWrapper(read_cb=io_cb)
elif memory > 0:
return CallbackSpooledTemporaryFileIO(memory, read_cb=io_cb)
else:
raise BentoMLException(f"Option memory must be -1 or > 0, got {memory}")

with IOWrapper(memory) as tar_io:
# limit the max memory usage when uploading model
with io_wrapper(maxmemory, read_cb=io_cb) as tar_io:
with self.spin(text=f'Creating tar archive for model "{model.tag}"..'):
with tarfile.open(fileobj=tar_io, mode="w:") as tar:
tar.add(model.path, arcname="./")
9 changes: 8 additions & 1 deletion src/bentoml_cli/bentos.py
@@ -281,8 +281,14 @@ def pull(shared_options: SharedOptions, bento_tag: str, force: bool) -> None: #
default=10,
help="Number of threads to use for upload",
)
@click.option(
"-m",
"--maxmemory",
default=-1,
help="max memory usage in GB when pushing, default -1 means no limit",
)
@click.pass_obj
def push(shared_options: SharedOptions, bento_tag: str, force: bool, threads: int) -> None: # type: ignore (not accessed)
def push(shared_options: SharedOptions, bento_tag: str, force: bool, threads: int, maxmemory: int) -> None: # type: ignore (not accessed)
"""Push Bento to a remote Bento store server."""
bento_obj = bento_store.get(bento_tag)
if not bento_obj:
@@ -292,6 +298,7 @@ def push(shared_options: SharedOptions, bento_tag: str, force: bool, threads: in
force=force,
threads=threads,
context=shared_options.cloud_context,
maxmemory=maxmemory,
)

@cli.command()
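For illustration, the updated bento push command could be invoked as below, assuming the usual top-level bentoml push entry point; the bento tag is a made-up placeholder, and --maxmemory 4 caps in-memory buffering at 4 GB before the archive spools to disk (the default of -1 keeps the old unbounded behaviour):

bentoml push iris_classifier:latest --maxmemory 4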
8 changes: 4 additions & 4 deletions src/bentoml_cli/models.py
@@ -310,12 +310,12 @@ def pull(ctx: click.Context, model_tag: str | None, force: bool, bentofile: str)
)
@click.option(
"-m",
"--memory",
"--maxmemory",
default=-1,
help="max memory usage in GB when pushing, -1 means no limit",
help="max memory usage in GB when pushing, default -1 means no limit",
)
@click.pass_obj
def push(shared_options: SharedOptions, model_tag: str, force: bool, threads: int, memory: int): # type: ignore (not accessed)
def push(shared_options: SharedOptions, model_tag: str, force: bool, threads: int, maxmemory: int): # type: ignore (not accessed)
"""Push Model to a remote model store."""
model_obj = model_store.get(model_tag)
if not model_obj:
@@ -325,5 +325,5 @@ def push(shared_options: SharedOptions, model_tag: str, force: bool, threads: in
force=force,
threads=threads,
context=shared_options.cloud_context,
memory=memory,
maxmemory=maxmemory,
)
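Likewise, assuming the usual bentoml models push entry point, the renamed model flag would be used as follows (the model tag is a placeholder; the -m short form is unchanged):

bentoml models push iris_clf:latest --maxmemory 8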
