[QST] examples/tutorial/02-ETL-with-NVTabular.ipynb #776

Open
zwei2016 opened this issue Apr 25, 2024 · 0 comments

@zwei2016
❓ Questions & Help

Details

I executed notebook 01 successfully, but an issue appeared in this code block in section "5.4. Grouping interactions into sessions":

workflow = nvt.Workflow(filtered_sessions)
dataset = nvt.Dataset(df)
# Learn the feature statistics necessary for the preprocessing workflow
# The following will generate a schema.pbtxt file in the provided folder and export the parquet files.
workflow.fit_transform(dataset).to_parquet(os.path.join(INPUT_DATA_DIR, "processed_nvt"))

The error message shows that workflow.fit_transform(dataset) raised a KeyError: 'category_id_price_sum'. I didn't find this feature anywhere in the tutorial, so maybe I missed something? Could anyone help me out?
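Looking at the traceback below, 'category_id_price_sum' does not appear to be a feature defined in the tutorial at all; it looks like a column name that JoinGroupby derives internally by joining the groupby column, the continuous column, and the statistic with name_sep. A minimal sketch of the kind of op that would produce this name (the exact cont_cols/stats here are my guess, not taken from the tutorial):

import nvtabular as nvt
from nvtabular import ops

# Hypothetical groupby feature: the output column is named
# "<cat_col>_<cont_col>_<stat>", e.g. "category_id_price_sum".
grouped = ["category_id"] >> ops.JoinGroupby(
    cont_cols=["price"],
    stats=["sum", "mean"],  # the traceback shows "mean" is derived as sum / count
    name_sep="_",
)
workflow = nvt.Workflow(grouped)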

The detailed error message is as follows:

KeyError Traceback (most recent call last)
Cell In[48], line 5
2 dataset = nvt.Dataset(df)
3 # Learn the feature statistics necessary for the preprocessing workflow
4 # The following will generate a schema.pbtxt file in the provided folder and export the parquet files.
----> 5 workflow.fit_transform(dataset).to_parquet(os.path.join(INPUT_DATA_DIR, "processed_nvt"))

File ~/miniconda3/lib/python3.10/site-packages/nvtabular/workflow/workflow.py:236, in Workflow.fit_transform(self, dataset)
216 def fit_transform(self, dataset: Dataset) -> Dataset:
217 """Convenience method to both fit the workflow and transform the dataset in a single
218 call. Equivalent to calling workflow.fit(dataset) followed by
219 workflow.transform(dataset)
(...)
234 transform
235 """
--> 236 self.fit(dataset)
237 return self.transform(dataset)

File ~/miniconda3/lib/python3.10/site-packages/nvtabular/workflow/workflow.py:213, in Workflow.fit(self, dataset)
199 def fit(self, dataset: Dataset) -> "Workflow":
200 """Calculates statistics for this workflow on the input dataset
201
202 Parameters
(...)
211 This Workflow with statistics calculated on it
212 """
--> 213 self.executor.fit(dataset, self.graph)
214 return self

File ~/miniconda3/lib/python3.10/site-packages/merlin/dag/executors.py:466, in DaskExecutor.fit(self, dataset, graph, refit)
462 if not current_phase:
463 # this shouldn't happen, but lets not infinite loop just in case
464 raise RuntimeError("failed to find dependency-free StatOperator to fit")
--> 466 self.fit_phase(dataset, current_phase)
468 # Remove all the operators we processed in this phase, and remove
469 # from the dependencies of other ops too
470 for node in current_phase:

File ~/miniconda3/lib/python3.10/site-packages/merlin/dag/executors.py:532, in DaskExecutor.fit_phase(self, dataset, nodes, strict)
530 stats.append(node.op.fit(node.input_columns, Dataset(ddf)))
531 else:
--> 532 stats.append(node.op.fit(node.input_columns, transformed_ddf))
533 except Exception:
534 LOG.exception("Failed to fit operator %s", node.op)

File ~/miniconda3/lib/python3.10/site-packages/nvtabular/ops/join_groupby.py:154, in JoinGroupby.fit(self, col_selector, ddf)
151 # Cannot use "device" caching if the data is pandas-backed
152 self.cat_cache = "host" if self.cat_cache == "device" else self.cat_cache
--> 154 dsk, key = nvt_cat._category_stats(
155 ddf,
156 nvt_cat.FitOptions(
157 col_selector,
158 self.cont_names,
159 self.stats,
160 self.out_path,
161 0,
162 self.split_out,
163 self.on_host,
164 concat_groups=False,
165 name_sep=self.name_sep,
166 split_every=self.split_every,
167 ),
168 )
169 return Delayed(key, dsk)

File ~/miniconda3/lib/python3.10/site-packages/nvtabular/ops/categorify.py:1559, in _category_stats(ddf, options)
1556 if options.agg_list == []:
1557 options.agg_list = ["count"]
-> 1559 return _groupby_to_disk(ddf, None, options)

File ~/miniconda3/lib/python3.10/site-packages/nvtabular/ops/categorify.py:1485, in _groupby_to_disk(ddf, write_func, options)
1476 dsk_split[c][(reduce_2_name, s)] = (
1477 _bottom_level_groupby,
1478 [(split_name, 0, c, s)],
(...)
1481 False,
1482 )
1484 # Make DataFrame collection for column-group result
-> 1485 _meta = _bottom_level_groupby(
1486 [_grouped_meta_col[c]],
1487 col,
1488 options,
1489 spill=False,
1490 )
1491 _divisions = (None,) * (options.split_out[col_str] + 1)
1492 graph = HighLevelGraph.from_collections(reduce_2_name, dsk_split[c], dependencies=[grouped])

File ~/miniconda3/lib/python3.10/site-packages/nvtx/nvtx.py:116, in annotate.__call__.<locals>.inner(*args, **kwargs)
113 @wraps(func)
114 def inner(*args, **kwargs):
115 libnvtx_push_range(self.attributes, self.domain.handle)
--> 116 result = func(*args, **kwargs)
117 libnvtx_pop_range(self.domain.handle)
118 return result

File ~/miniconda3/lib/python3.10/site-packages/nvtabular/ops/categorify.py:1097, in _bottom_level_groupby(dfs, col_selector, options, spill)
1095 name_mean = _make_name((col_selector.names + [cont_col, "mean"]), sep=options.name_sep)
1096 required.append(name_mean)
-> 1097 gb[name_mean] = gb[name_sum] / gb[name_count]
1099 if "min" in options.agg_list:
1100 name_min = _make_name((col_selector.names + [cont_col, "min"]), sep=options.name_sep)

File ~/miniconda3/lib/python3.10/site-packages/nvtx/nvtx.py:116, in annotate.__call__.<locals>.inner(*args, **kwargs)
113 @wraps(func)
114 def inner(*args, **kwargs):
115 libnvtx_push_range(self.attributes, self.domain.handle)
--> 116 result = func(*args, **kwargs)
117 libnvtx_pop_range(self.domain.handle)
118 return result

File ~/miniconda3/lib/python3.10/site-packages/cudf/core/dataframe.py:1336, in DataFrame.__getitem__(self, arg)
1274 """
1275 If arg is a str or int type, return the column Series.
1276 If arg is a slice, return a new DataFrame with all columns
(...)
1333 8 8 8 8
1334 """
1335 if _is_scalar_or_zero_d_array(arg) or isinstance(arg, tuple):
-> 1336 return self._get_columns_by_label(arg, downcast=True)
1338 elif isinstance(arg, slice):
1339 return self._slice(arg)

File ~/miniconda3/lib/python3.10/site-packages/nvtx/nvtx.py:116, in annotate.__call__.<locals>.inner(*args, **kwargs)
113 @wraps(func)
114 def inner(*args, **kwargs):
115 libnvtx_push_range(self.attributes, self.domain.handle)
--> 116 result = func(*args, **kwargs)
117 libnvtx_pop_range(self.domain.handle)
118 return result

File ~/miniconda3/lib/python3.10/site-packages/cudf/core/dataframe.py:1995, in DataFrame._get_columns_by_label(self, labels, downcast)
1986 @_cudf_nvtx_annotate
1987 def _get_columns_by_label(
1988 self, labels, *, downcast=False
1989 ) -> Self | Series:
1990 """
1991 Return columns of dataframe by labels
1992
1993 If downcast is True, try and downcast from a DataFrame to a Series
1994 """
-> 1995 ca = self._data.select_by_label(labels)
1996 if downcast:
1997 if is_scalar(labels):

File ~/miniconda3/lib/python3.10/site-packages/cudf/core/column_accessor.py:351, in ColumnAccessor.select_by_label(self, key)
349 if any(isinstance(k, slice) for k in key):
350 return self._select_by_label_with_wildcard(key)
--> 351 return self._select_by_label_grouped(key)

File ~/miniconda3/lib/python3.10/site-packages/cudf/core/column_accessor.py:496, in ColumnAccessor._select_by_label_grouped(self, key)
495 def _select_by_label_grouped(self, key: Any) -> ColumnAccessor:
--> 496 result = self._grouped_data[key]
497 if isinstance(result, column.ColumnBase):
498 # self._grouped_data[key] = self._data[key] so skip validation
499 return self.__class__(
500 data={key: result},
501 multiindex=self.multiindex,
502 verify=False,
503 )

KeyError: 'category_id_price_sum'
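For anyone trying to reproduce this outside the workflow: the failing line computes the mean as gb[name_sum] / gb[name_count], so the op apparently expects a "category_id_price_sum" intermediate to already exist. The same aggregation can be checked directly on the dataframe with a plain groupby (an untested sketch; the column names are my reconstruction of what _make_name would produce with name_sep="_"):

# Rebuild the intermediate columns the failing step reads; works the same
# on pandas and cudf dataframes.
gb = df.groupby("category_id")["price"].agg(["sum", "count"]).reset_index()
gb.columns = ["category_id", "category_id_price_sum", "category_id_price_count"]
gb["category_id_price_mean"] = (
    gb["category_id_price_sum"] / gb["category_id_price_count"]
)
print(gb.head())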
