How-to for embedding op with categorify from scratch (#1827)
* embedding op test from start

* setup asserts to verify logic

* use make_series to handle cpu/gpu envs (see the dispatch sketch below)

---------

Co-authored-by: Karl Higley <kmhigley@gmail.com>
Co-authored-by: rnyak <16246900+rnyak@users.noreply.github.com>
3 people committed Jun 8, 2023
1 parent 4b7957a commit 25151f7
Showing 1 changed file with 57 additions and 1 deletion.
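The commit-message note about make_series refers to the merlin.core.dispatch helpers, which create cudf objects when a GPU stack is available and pandas objects otherwise, letting the test below run unchanged in both environments. A minimal sketch of that dispatch behavior, assuming only that merlin-core is installed (the printed types illustrate the expected behavior; nothing here is asserted by this commit):

import numpy as np

from merlin.core.dispatch import HAS_GPU, make_df, make_series

# cudf.DataFrame / cudf.Series on a GPU build, pandas equivalents on CPU
df = make_df({"string_id": ["alpha", "bravo"]})
values = make_series(np.random.rand(4))
print(HAS_GPU, type(df), type(values))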
tests/unit/workflow/test_workflow.py
@@ -27,9 +27,11 @@
 import nvtabular as nvt
 from merlin.core import dispatch
 from merlin.core.compat import cudf, dask_cudf
-from merlin.core.dispatch import HAS_GPU, make_df
+from merlin.core.dispatch import HAS_GPU, create_multihot_col, make_df, make_series
 from merlin.core.utils import set_dask_client
 from merlin.dag import ColumnSelector, postorder_iter_nodes
+from merlin.dataloader.loader_base import LoaderBase as Loader
+from merlin.dataloader.ops.embeddings import EmbeddingOperator
 from merlin.schema import Tags
 from nvtabular import Dataset, Workflow, ops
 from tests.conftest import assert_eq, get_cats, mycols_csv
@@ -774,3 +776,57 @@ def test_workflow_auto_infer_modules_byvalue(tmp_path):
os.unlink(str(tmp_path / "not_a_real_module.py"))

Workflow.load(str(tmp_path / "identity-workflow"))


@pytest.mark.parametrize("cpu", [None, "cpu"] if HAS_GPU else ["cpu"])
def test_embedding_cat_export_import(tmpdir, cpu):
    string_ids = ["alpha", "bravo", "charlie", "delta", "foxtrot"]
    training_data = make_df(
        {
            "string_id": string_ids,
        }
    )
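    # offsets [0, 5, 10, 15, 20, 25] slice the 25 random values into one
    # 5-element embedding list per string id (a ragged list column)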
training_data["embeddings"] = create_multihot_col(
[0, 5, 10, 15, 20, 25], make_series(np.random.rand(25))
)

    cat_op = nvt.ops.Categorify()

    # first workflow that categorifies all data
    graph1 = ["string_id"] >> cat_op
    emb_res = Workflow(graph1 + ["embeddings"]).fit_transform(
        Dataset(training_data, cpu=(cpu is not None))
    )
    npy_path = str(tmpdir / "embeddings.npy")
    emb_res.to_npy(npy_path)
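    # the exported table holds the encoded id in column 0 and the
    # embedding values in the remaining columns, as consumed below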

    embeddings = np.load(npy_path)
    # second workflow encodes a fresh sample of string ids with the
    # already-fitted cat_op, so transform() runs without a new fit
    df = make_df({"string_id": np.random.choice(string_ids, 30)})
    graph2 = ["string_id"] >> cat_op
    train_res = Workflow(graph2).transform(Dataset(df, cpu=(cpu is not None)))

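    # EmbeddingOperator looks up each categorified string_id against the
    # saved id column and attaches the matching embedding row to the batch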
    data_loader = Loader(
        train_res,
        batch_size=1,
        transforms=[
            EmbeddingOperator(
                embeddings[:, 1:],
                id_lookup_table=embeddings[:, 0].astype(int),
                lookup_key="string_id",
            )
        ],
        shuffle=False,
        device=cpu,
    )
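    # expected output: join each string_id with the embedding it was assigned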
    origin_df = train_res.to_ddf().merge(emb_res.to_ddf(), on="string_id", how="left").compute()
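    # with batch_size=1 and shuffle=False, batch index idx matches row idx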
    for idx, batch in enumerate(data_loader):
        b_df = batch[0].to_df()
        org_df = origin_df.iloc[idx]
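        # cudf and pandas expose list columns differently, hence two assert paths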
        if not cpu:
            assert (b_df["string_id"].to_numpy() == org_df["string_id"].to_numpy()).all()
            assert (b_df["embeddings"].list.leaves == org_df["embeddings"].list.leaves).all()
        else:
            assert (b_df["string_id"].values == org_df["string_id"]).all()
            assert b_df["embeddings"].values[0] == org_df["embeddings"].tolist()
