
[QST] out of memory error while trying out examples in jupyter notebook #993

bjpietrzak opened this issue May 24, 2023 · 8 comments
Labels: question (Further information is requested)

❓ Questions & Help

Details

Hi, I have experienced a CUDA out-of-memory error while trying out the examples in the Jupyter notebook files. Is there any neat way of solving this error?

os: Ubuntu 20.04
gpu: NVIDIA GeForce RTX 3060 Ti
cuda: 11.8
cudnn: 8
docker version: 24.0.1
docker image: nvcr.io/nvidia/merlin/merlin-pytorch-training:22.03
ipynb file: 02-ETL-with-NVTabular.ipynb

The code that throws the cudaErrorMemoryAllocation out of memory error:

train_dataset = nvt.Dataset([os.path.join(INPUT_DATA_DIR, "train.parquet")])
valid_dataset = nvt.Dataset([os.path.join(INPUT_DATA_DIR, "valid.parquet")])

%%time
workflow.fit(train_dataset)
bjpietrzak added the question (Further information is requested) label on May 24, 2023
rnyak (Contributor) commented May 26, 2023

@Bartoliinii hello. Can you clarify the following:

  • Which 02 notebook is that? The one in the getting-started-movielens folder?
  • Are you using your own dataset or the dataset used in the notebook example?
  • Are you running the notebook as shipped in the docker image, or are you running it from the Merlin repo main branch? These two can be different.

Besides, the docker image version you mentioned above is about a year old. Is it possible for you to use our latest stable docker image? Can you please test the merlin-pytorch-training:23.04 image?

bjpietrzak (Author) commented May 26, 2023

Hi @rnyak, thanks for responding.

Which 02 notebook is that? The one in the getting-started-movielens folder?

It was this notebook: https://nvidia-merlin.github.io/NVTabular/v0.7.1/examples/getting-started-movielens/02-ETL-with-NVTabular.html

Are you using your own dataset or the dataset used in the notebook example?

I was running the cells with no custom inputs on my part.

Are you running the notebook as shipped in the docker image, or are you running it from the Merlin repo main branch? These two can be different.

I was running it from the docker image.

Besides, the docker image version you mentioned above is about a year old. Is it possible for you to use our latest stable docker image? Can you please test the merlin-pytorch-training:23.04 image?

I know, but the last time I tried to use a newer docker image with TensorFlow I had to update the drivers for my NVIDIA graphics card, and that update wrecked my CUDA installation.

  • One additional question: is there a way to load a parquet file with NVT without filling the GPU? Something that resembles dask_cudf.read_parquet?

rnyak (Contributor) commented May 30, 2023

One additional question: is there a way to load a parquet file with NVT without filling the GPU?

You can directly read data from a path into NVT, like this:

dataset = nvt.Dataset(<path to your parquet files>)
workflow.fit_transform(dataset)
...
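
For a concrete picture, here is a minimal sketch of that pattern; the paths and column names below are hypothetical placeholders, not taken from the notebook:

import nvtabular as nvt

# nvt.Dataset is a lazy wrapper around the parquet files; it does not load
# everything into GPU memory up front.
dataset = nvt.Dataset("/data/train_parquet/", engine="parquet")  # hypothetical path

# A toy workflow that categorifies two hypothetical columns.
cat_features = ["userId", "movieId"] >> nvt.ops.Categorify()
workflow = nvt.Workflow(cat_features)

# fit_transform computes the category statistics and applies the encoding,
# processing the data partition by partition.
transformed = workflow.fit_transform(dataset)
transformed.to_parquet("/data/train_transformed/")  # hypothetical output path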

Does that answer your question?

By the way, do you have an 8 GB machine?

karlhigley changed the title from "[QST]" to "[QST] out of memory error while trying out examples in jupyter notebook" on May 30, 2023
bjpietrzak (Author) commented:

Does that answer your question?

Yes, thank you.

By the way, do you have an 8 GB machine?

Yes, I do.

rnyak (Contributor) commented Jun 5, 2023

@Bartoliinii I noticed that you are following the examples on the NVTabular website. Please use these notebooks instead:

https://github.com/NVIDIA-Merlin/Merlin/tree/main/examples/getting-started-movielens

thanks.

bjpietrzak (Author) commented Jun 12, 2023

Thank you for the tip. I'm following the instructions in this docker container for TensorFlow:

nvcr.io/nvidia/merlin/merlin-tensorflow:23.02

In /Merlin/examples/getting-started-movielens/02-ETL-with-NVTabular.ipynb I get the same error while running the 14th cell:

Cell 13:

train_dataset = nvt.Dataset([os.path.join(INPUT_DATA_DIR, "train.parquet")])
valid_dataset = nvt.Dataset([os.path.join(INPUT_DATA_DIR, "valid.parquet")])

Cell 14:

%%time
workflow.fit(train_dataset)

Output:
MemoryError: Parquet data was larger than the available GPU memory!

Is there a way to fit the workflow to a dataset without filling GPU memory?

rnyak (Contributor) commented Jun 12, 2023

Is there a way to fit the workflow to a dataset without filling GPU memory?

  • Please be sure no other kernels (notebooks) are open; shut them down.
  • Check your GPU consumption before you start notebook 02: run watch -n0.1 nvidia-smi in your terminal and make sure memory consumption is 0 or very low at the beginning.
  • In the 02-ETL-with-NVTabular.ipynb notebook, note that we read the parquet file with cudf upfront, so a bit of GPU memory (< 1 GB) is used before you reach the NVT pipeline.

So you can change every df_lib.read_parquet call to pandas.read_parquet(..). For example, this cell:

movies = df_lib.read_parquet(os.path.join(INPUT_DATA_DIR, "movies_converted.parquet"))
movies.head()
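
would become something like the following sketch (pandas keeps the frame in host memory instead of GPU memory; INPUT_DATA_DIR and the file name are taken from the notebook):

import os
import pandas as pd

# Read the external movies table with pandas so it stays in host (CPU)
# memory instead of GPU memory.
movies = pd.read_parquet(os.path.join(INPUT_DATA_DIR, "movies_converted.parquet"))
movies.head()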

Note that joined = userId + movieId >> JoinExternal(movies, on=["movieId"]) consumes some GPU memory, but it is also less than 1 GB.

train_dataset = nvt.Dataset([os.path.join(INPUT_DATA_DIR, "train.parquet")]) does not fill up or use GPU memory by itself. You can keep watch -n0.1 nvidia-smi running in a terminal and see which cell actually fills up or uses GPU memory.

The moment you call workflow.fit(), your GPU starts being used, but it uses less than 8 GB, so you should be good.

  • Please try to set the part_size arg as follows:
train_dataset = nvt.Dataset([os.path.join(INPUT_DATA_DIR, "train.parquet")], part_size='128MB')
valid_dataset = nvt.Dataset([os.path.join(INPUT_DATA_DIR, "valid.parquet")], part_size='128MB')

Please be sure you are not using anything like cudf or dask_cudf that consumes your GPU memory before NVT. You can check this with the command I shared in the terminal and find out which cell is consuming your memory.
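
Putting these suggestions together, the adjusted cells might look like the sketch below (assuming workflow and INPUT_DATA_DIR are defined as in the notebook; 128MB is only a starting point to tune against an 8 GB card, not a required value):

import os
import nvtabular as nvt

# Smaller partitions mean each chunk the GPU has to hold at once is smaller,
# at the cost of iterating over more partitions.
train_dataset = nvt.Dataset(
    [os.path.join(INPUT_DATA_DIR, "train.parquet")], part_size="128MB"
)
valid_dataset = nvt.Dataset(
    [os.path.join(INPUT_DATA_DIR, "valid.parquet")], part_size="128MB"
)

# fit computes the per-column statistics (e.g. the category mappings)
# one partition at a time rather than on the whole file at once.
workflow.fit(train_dataset)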

bjpietrzak (Author) commented:

  • I made sure no other notebooks are running:
root@c16d4c741902:/Merlin/examples/getting-started-movielens# jupyter notebook list
Currently running servers:
  • Before running 02-ETL-with-NVTabular.ipynb, GPU memory consumption was at 6.2%.
  • I changed df_lib.read_parquet to pandas.read_parquet in cell 3.
  • Just before running workflow.fit(train_dataset), GPU memory consumption was at 13.4% because of joined = userId + movieId >> JoinExternal(movies, on=["movieId"]).

Unfortunately, the error still remains. Here is the full output:

/usr/local/lib/python3.8/dist-packages/merlin/schema/tags.py:148: UserWarning: Compound tags like Tags.USER_ID have been deprecated and will be removed in a future version. Please use the atomic versions of these tags, like [<Tags.USER: 'user'>, <Tags.ID: 'id'>].
  warnings.warn(
/usr/local/lib/python3.8/dist-packages/merlin/schema/tags.py:148: UserWarning: Compound tags like Tags.ITEM_ID have been deprecated and will be removed in a future version. Please use the atomic versions of these tags, like [<Tags.ITEM: 'item'>, <Tags.ID: 'id'>].
  warnings.warn(
Failed to transform operator <nvtabular.ops.categorify.Categorify object at 0x7f0314138190>
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/nvtabular/ops/categorify.py", line 463, in transform
    encoded = _encode(
  File "/usr/local/lib/python3.8/dist-packages/nvtabular/ops/categorify.py", line 1427, in _encode
    labels = codes.merge(
  File "/usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py", line 101, in inner
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/cudf/core/dataframe.py", line 3759, in merge
    return merge_cls(
  File "/usr/local/lib/python3.8/dist-packages/cudf/core/join/join.py", line 200, in perform_merge
    self.rhs._gather(gather_map=right_rows, **gather_kwargs)
  File "/usr/local/lib/python3.8/dist-packages/cudf/core/indexed_frame.py", line 1506, in _gather
    libcudf.copying.gather(
  File "copying.pyx", line 185, in cudf._lib.copying.gather
MemoryError: std::bad_alloc: out_of_memory: CUDA error at: /usr/local/include/rmm/mr/device/cuda_memory_resource.hpp:70: cudaErrorMemoryAllocation out of memory

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py", line 183, in _transform_data
    output_data = node.op.transform(selection, input_data)
  File "/usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py", line 101, in inner
    result = func(*args, **kwargs)
  File "/usr/local/lib/python3.8/dist-packages/nvtabular/ops/categorify.py", line 483, in transform
    raise RuntimeError(f"Failed to categorical encode column {name}") from e
RuntimeError: Failed to categorical encode column genres

---------------------------------------------------------------------------
MemoryError                               Traceback (most recent call last)
File /usr/local/lib/python3.8/dist-packages/nvtabular/ops/categorify.py:463, in Categorify.transform(self, col_selector, df)
    461 path = self.categories[storage_name]
--> 463 encoded = _encode(
    464     use_name,
    465     storage_name,
    466     path,
    467     df,
    468     self.cat_cache,
    469     na_sentinel=self.na_sentinel,
    470     freq_threshold=self.freq_threshold[name]
    471     if isinstance(self.freq_threshold, dict)
    472     else self.freq_threshold,
    473     search_sorted=self.search_sorted,
    474     buckets=self.num_buckets,
    475     encode_type=self.encode_type,
    476     cat_names=column_names,
    477     max_size=self.max_size,
    478     dtype=self.output_dtype,
    479     start_index=self.start_index,
    480 )
    481 new_df[name] = encoded

File /usr/local/lib/python3.8/dist-packages/nvtabular/ops/categorify.py:1427, in _encode(name, storage_name, path, df, cat_cache, na_sentinel, freq_threshold, search_sorted, buckets, encode_type, cat_names, max_size, dtype, start_index)
   1426 na_sentinel = 0
-> 1427 labels = codes.merge(
   1428     value, left_on=selection_l.names, right_on=selection_r.names, how="left"
   1429 ).sort_values("order")["labels"]
   1430 labels.fillna(na_sentinel, inplace=True)

File /usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py:101, in annotate.__call__.<locals>.inner(*args, **kwargs)
    100 libnvtx_push_range(self.attributes, self.domain.handle)
--> 101 result = func(*args, **kwargs)
    102 libnvtx_pop_range(self.domain.handle)

File /usr/local/lib/python3.8/dist-packages/cudf/core/dataframe.py:3759, in DataFrame.merge(self, right, on, left_on, right_on, left_index, right_index, how, sort, lsuffix, rsuffix, indicator, suffixes)
   3757     merge_cls = MergeSemi
-> 3759 return merge_cls(
   3760     lhs,
   3761     rhs,
   3762     on=on,
   3763     left_on=left_on,
   3764     right_on=right_on,
   3765     left_index=left_index,
   3766     right_index=right_index,
   3767     how=how,
   3768     sort=sort,
   3769     indicator=indicator,
   3770     suffixes=suffixes,
   3771 ).perform_merge()

File /usr/local/lib/python3.8/dist-packages/cudf/core/join/join.py:200, in Merge.perform_merge(self)
    194 left_result = (
    195     self.lhs._gather(gather_map=left_rows, **gather_kwargs)
    196     if left_rows is not None
    197     else cudf.DataFrame._from_data({})
    198 )
    199 right_result = (
--> 200     self.rhs._gather(gather_map=right_rows, **gather_kwargs)
    201     if right_rows is not None
    202     else cudf.DataFrame._from_data({})
    203 )
    205 result = cudf.DataFrame._from_data(
    206     *self._merge_results(left_result, right_result)
    207 )

File /usr/local/lib/python3.8/dist-packages/cudf/core/indexed_frame.py:1506, in IndexedFrame._gather(self, gather_map, keep_index, nullify, check_bounds)
   1503     raise IndexError("Gather map index is out of bounds.")
   1505 return self._from_columns_like_self(
-> 1506     libcudf.copying.gather(
   1507         list(self._index._columns + self._columns)
   1508         if keep_index
   1509         else list(self._columns),
   1510         gather_map,
   1511         nullify=nullify,
   1512     ),
   1513     self._column_names,
   1514     self._index.names if keep_index else None,
   1515 )

File copying.pyx:185, in cudf._lib.copying.gather()

MemoryError: std::bad_alloc: out_of_memory: CUDA error at: /usr/local/include/rmm/mr/device/cuda_memory_resource.hpp:70: cudaErrorMemoryAllocation out of memory

The above exception was the direct cause of the following exception:

RuntimeError                              Traceback (most recent call last)
File <timed eval>:1

File /usr/local/lib/python3.8/dist-packages/nvtabular/workflow/workflow.py:213, in Workflow.fit(self, dataset)
    209         dependencies.difference_update(current_phase)
    211 # This captures the output dtypes of operators like LambdaOp where
    212 # the dtype can't be determined without running the transform
--> 213 self._transform_impl(dataset, capture_dtypes=True).sample_dtypes()
    214 self.graph.construct_schema(dataset.schema, preserve_dtypes=True)
    216 return self

File /usr/local/lib/python3.8/dist-packages/merlin/io/dataset.py:1155, in Dataset.sample_dtypes(self, n, annotate_lists)
   1148 """Return the real dtypes of the Dataset
   1149 
   1150 Use cached metadata if this operation was
   1151 already performed. Otherwise, call down to the
   1152 underlying engine for sampling logic.
   1153 """
   1154 if self._real_meta.get(n, None) is None:
-> 1155     _real_meta = self.engine.sample_data(n=n)
   1156     if self.dtypes:
   1157         _real_meta = _set_dtypes(_real_meta, self.dtypes)

File /usr/local/lib/python3.8/dist-packages/merlin/io/dataset_engine.py:71, in DatasetEngine.sample_data(self, n)
     69 _ddf = self.to_ddf()
     70 for partition_index in range(_ddf.npartitions):
---> 71     _head = _ddf.partitions[partition_index].head(n)
     72     if len(_head):
     73         return _head

File /usr/local/lib/python3.8/dist-packages/dask/dataframe/core.py:1256, in _Frame.head(self, n, npartitions, compute)
   1254 # No need to warn if we're already looking at all partitions
   1255 safe = npartitions != self.npartitions
-> 1256 return self._head(n=n, npartitions=npartitions, compute=compute, safe=safe)

File /usr/local/lib/python3.8/dist-packages/dask/dataframe/core.py:1290, in _Frame._head(self, n, npartitions, compute, safe)
   1285 result = new_dd_object(
   1286     graph, name, self._meta, [self.divisions[0], self.divisions[npartitions]]
   1287 )
   1289 if compute:
-> 1290     result = result.compute()
   1291 return result

File /usr/local/lib/python3.8/dist-packages/dask/base.py:315, in DaskMethodsMixin.compute(self, **kwargs)
    291 def compute(self, **kwargs):
    292     """Compute this dask collection
    293 
    294     This turns a lazy Dask collection into its in-memory equivalent.
   (...)
    313     dask.base.compute
    314     """
--> 315     (result,) = compute(self, traverse=False, **kwargs)
    316     return result

File /usr/local/lib/python3.8/dist-packages/dask/base.py:598, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    595     keys.append(x.__dask_keys__())
    596     postcomputes.append(x.__dask_postcompute__())
--> 598 results = schedule(dsk, keys, **kwargs)
    599 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File /usr/local/lib/python3.8/dist-packages/dask/local.py:557, in get_sync(dsk, keys, **kwargs)
    552 """A naive synchronous version of get_async
    553 
    554 Can be useful for debugging.
    555 """
    556 kwargs.pop("num_workers", None)  # if num_workers present, remove it
--> 557 return get_async(
    558     synchronous_executor.submit,
    559     synchronous_executor._max_workers,
    560     dsk,
    561     keys,
    562     **kwargs,
    563 )

File /usr/local/lib/python3.8/dist-packages/dask/local.py:500, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    498 while state["waiting"] or state["ready"] or state["running"]:
    499     fire_tasks(chunksize)
--> 500     for key, res_info, failed in queue_get(queue).result():
    501         if failed:
    502             exc, tb = loads(res_info)

File /usr/lib/python3.8/concurrent/futures/_base.py:437, in Future.result(self, timeout)
    435     raise CancelledError()
    436 elif self._state == FINISHED:
--> 437     return self.__get_result()
    439 self._condition.wait(timeout)
    441 if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:

File /usr/lib/python3.8/concurrent/futures/_base.py:389, in Future.__get_result(self)
    387 if self._exception:
    388     try:
--> 389         raise self._exception
    390     finally:
    391         # Break a reference cycle with the exception in self._exception
    392         self = None

File /usr/local/lib/python3.8/dist-packages/dask/local.py:542, in SynchronousExecutor.submit(self, fn, *args, **kwargs)
    540 fut = Future()
    541 try:
--> 542     fut.set_result(fn(*args, **kwargs))
    543 except BaseException as e:
    544     fut.set_exception(e)

File /usr/local/lib/python3.8/dist-packages/dask/local.py:238, in batch_execute_tasks(it)
    234 def batch_execute_tasks(it):
    235     """
    236     Batch computing of multiple tasks with `execute_task`
    237     """
--> 238     return [execute_task(*a) for a in it]

File /usr/local/lib/python3.8/dist-packages/dask/local.py:238, in <listcomp>(.0)
    234 def batch_execute_tasks(it):
    235     """
    236     Batch computing of multiple tasks with `execute_task`
    237     """
--> 238     return [execute_task(*a) for a in it]

File /usr/local/lib/python3.8/dist-packages/dask/local.py:229, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    227     failed = False
    228 except BaseException as e:
--> 229     result = pack_exception(e, dumps)
    230     failed = True
    231 return key, result, failed

File /usr/local/lib/python3.8/dist-packages/dask/local.py:224, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    222 try:
    223     task, data = loads(task_info)
--> 224     result = _execute_task(task, data)
    225     id = get_id()
    226     result = dumps((result, id))

File /usr/local/lib/python3.8/dist-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/local/lib/python3.8/dist-packages/dask/optimization.py:990, in SubgraphCallable.__call__(self, *args)
    988 if not len(args) == len(self.inkeys):
    989     raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 990 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))

File /usr/local/lib/python3.8/dist-packages/dask/core.py:149, in get(dsk, out, cache)
    147 for key in toposort(dsk):
    148     task = dsk[key]
--> 149     result = _execute_task(task, cache)
    150     cache[key] = result
    151 result = _execute_task(out, cache)

File /usr/local/lib/python3.8/dist-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File /usr/local/lib/python3.8/dist-packages/dask/utils.py:41, in apply(func, args, kwargs)
     39 def apply(func, args, kwargs=None):
     40     if kwargs:
---> 41         return func(*args, **kwargs)
     42     else:
     43         return func(*args)

File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:72, in LocalExecutor.transform(self, transformable, graph, output_dtypes, additional_columns, capture_dtypes, validate_dtypes)
     69 output_data = None
     71 for node in nodes:
---> 72     input_data = self._build_input_data(
     73         node, transformable, capture_dtypes=capture_dtypes, validate_dtypes=validate_dtypes
     74     )
     76     if node.op:
     77         transformed_data = self._transform_data(
     78             node, input_data, capture_dtypes=capture_dtypes, validate_dtypes=validate_dtypes
     79         )

File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:121, in LocalExecutor._build_input_data(self, node, transformable, capture_dtypes, validate_dtypes)
    119 for parent in node.parents_with_dependencies:
    120     parent_output_cols = _get_unique(parent.output_schema.column_names)
--> 121     parent_data = self.transform(
    122         transformable,
    123         [parent],
    124         capture_dtypes=capture_dtypes,
    125         validate_dtypes=validate_dtypes,
    126     )
    127     if input_data is None or not len(input_data):
    128         input_data = parent_data[parent_output_cols]

File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:77, in LocalExecutor.transform(self, transformable, graph, output_dtypes, additional_columns, capture_dtypes, validate_dtypes)
     72 input_data = self._build_input_data(
     73     node, transformable, capture_dtypes=capture_dtypes, validate_dtypes=validate_dtypes
     74 )
     76 if node.op:
---> 77     transformed_data = self._transform_data(
     78         node, input_data, capture_dtypes=capture_dtypes, validate_dtypes=validate_dtypes
     79     )
     80 else:
     81     transformed_data = input_data

File /usr/local/lib/python3.8/dist-packages/merlin/dag/executors.py:183, in LocalExecutor._transform_data(self, node, input_data, capture_dtypes, validate_dtypes)
    180 try:
    181     # use input_columns to ensure correct grouping (subgroups)
    182     selection = node.input_columns.resolve(node.input_schema)
--> 183     output_data = node.op.transform(selection, input_data)
    185     # Update or validate output_data dtypes
    186     if capture_dtypes or validate_dtypes:

File /usr/local/lib/python3.8/dist-packages/nvtx/nvtx.py:101, in annotate.__call__.<locals>.inner(*args, **kwargs)
     98 @wraps(func)
     99 def inner(*args, **kwargs):
    100     libnvtx_push_range(self.attributes, self.domain.handle)
--> 101     result = func(*args, **kwargs)
    102     libnvtx_pop_range(self.domain.handle)
    103     return result

File /usr/local/lib/python3.8/dist-packages/nvtabular/ops/categorify.py:483, in Categorify.transform(self, col_selector, df)
    481         new_df[name] = encoded
    482     except Exception as e:
--> 483         raise RuntimeError(f"Failed to categorical encode column {name}") from e
    485 return new_df

RuntimeError: Failed to categorical encode column genres
