-
Notifications
You must be signed in to change notification settings - Fork 143
/
conftest.py
413 lines (339 loc) · 12.1 KB
/
conftest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
#
# Copyright (c) 2021, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import contextlib
import glob
import os
import platform
import random
import signal
import socket
import subprocess
import time
from pathlib import Path
from unittest.mock import patch
import dask
import pandas as pd
from merlin.core.compat import cudf
from merlin.core.compat import numpy as np
# Resolve an `assert_eq` comparison helper that works with or without cudf.
# cudf moved its testing utilities between releases, so probe both locations;
# without cudf, fall back to pandas/numpy comparisons for CPU-only runs.
if cudf:
    try:
        import cudf.testing._utils

        assert_eq = cudf.testing._utils.assert_eq
    except ImportError:
        # Older cudf releases exposed the helper under cudf.tests.utils.
        import cudf.tests.utils

        assert_eq = cudf.tests.utils.assert_eq
else:

    def assert_eq(a, b, *args, **kwargs):
        # CPU fallback: dispatch on the type of `a`.
        # NOTE(review): extra *args/**kwargs are forwarded for pandas frames
        # and series but silently dropped on the numpy path — presumably
        # intentional (assert_allclose has a different signature); confirm.
        if isinstance(a, pd.DataFrame):
            return pd.testing.assert_frame_equal(a, b, *args, **kwargs)
        elif isinstance(a, pd.Series):
            return pd.testing.assert_series_equal(a, b, *args, **kwargs)
        else:
            return np.testing.assert_allclose(a, b)
import pytest
from asvdb import ASVDb, BenchmarkInfo, utils
from numba import cuda
import nvtabular
from merlin.core.utils import Distributed
from merlin.dag.node import iter_nodes
# Repository root: this conftest lives one directory below it.
REPO_ROOT = Path(__file__).parent.parent

# Column sets shared by the fixtures below:
# full, deterministic column order of the generated csv files
allcols_csv = ["timestamp", "id", "label", "name-string", "x", "y", "z"]
# subset of columns the csv-based tests operate on
mycols_csv = ["name-string", "id", "label", "x", "y"]
# subset used for parquet (parquet keeps the categorical "name-cat" column)
mycols_pq = ["name-cat", "name-string", "id", "label", "x", "y"]

# Pool of names used to populate the "name-string" column of the test data.
mynames = [
    "Alice",
    "Bob",
    "Charlie",
    "Dan",
    "Edith",
    "Frank",
    "Gary",
    "Hannah",
    "Ingrid",
    "Jerry",
    "Kevin",
    "Laura",
    "Michael",
    "Norbert",
    "Oliver",
    "Patricia",
    "Quinn",
    "Ray",
    "Sarah",
    "Tim",
    "Ursula",
    "Victor",
    "Wendy",
    "Xavier",
    "Yvonne",
    "Zelda",
]

# NOTE(review): appears unused in this file — presumably a leftover slot for
# caching a shared CUDA cluster; confirm before removing.
_CUDA_CLUSTER = None
@pytest.fixture(scope="module")
def client():
    """Module-scoped Dask distributed client built from merlin's Distributed helper.

    Yields the client, then closes the client and its cluster on teardown.
    """
    dist = Distributed()
    yield dist.client
    # teardown: close client first, then its backing cluster
    dist.client.close()
    dist.cluster.close()
@contextlib.contextmanager
def get_cuda_cluster():
    """Context manager yielding a LocalCUDACluster sized to CUDA_VISIBLE_DEVICES.

    Uses at most two workers.  The cluster is always closed on exit.

    Fix: the original called ``cluster.close()`` after a bare ``yield``; with
    ``@contextlib.contextmanager``, an exception raised inside the ``with``
    body propagates into the generator at the yield point, so the close call
    was skipped and the cluster leaked.  Wrapping the yield in try/finally
    guarantees cleanup.
    """
    from dask_cuda import LocalCUDACluster

    CUDA_VISIBLE_DEVICES = os.environ.get("CUDA_VISIBLE_DEVICES", "0")
    n_workers = min(2, len(CUDA_VISIBLE_DEVICES.split(",")))
    cluster = LocalCUDACluster(n_workers=n_workers)
    try:
        yield cluster
    finally:
        cluster.close()
@pytest.fixture(scope="session")
def datasets(tmpdir_factory):
    """Generate a small timeseries dataset once per session and write it to disk.

    Returns a dict mapping format name to the tmpdir containing the files:
    "parquet" (two parquet files), "csv" (two csv files with headers),
    "csv-no-header" (same csv files without headers), and "cats" (an empty
    scratch dir for categorical outputs).
    """
    _lib = cudf if cudf else pd
    _datalib = cudf if cudf else dask
    df = _datalib.datasets.timeseries(
        start="2000-01-01",
        end="2000-01-04",
        freq="60s",
        dtypes={
            "name-cat": str,
            "name-string": str,
            "id": int,
            "label": int,
            "x": float,
            "y": float,
            "z": float,
        },
    ).reset_index()
    if _datalib is dask:
        # dask.datasets produces a lazy dask DataFrame; materialize it.
        df = df.compute()
    df["name-string"] = _lib.Series(np.random.choice(mynames, df.shape[0])).astype("O")

    # Add two random null values to each column, except the key/label columns
    # which must stay fully populated.
    imax = len(df) - 1
    for col in df.columns:
        if col in ["name-cat", "label", "id"]:
            # Fix: was `break`, which stopped null injection for every
            # remaining column instead of just skipping this one.
            continue
        for _ in range(2):
            rand_idx = random.randint(1, imax - 1)
            if rand_idx == df[col].shape[0] // 2:
                # dont want null in median
                rand_idx += 1
            df[col].iloc[rand_idx] = None

    # (Removed a dead `tmpdir_factory.mktemp("data_test")` assignment that was
    # immediately overwritten by the dict below.)
    datadir = {
        "parquet": tmpdir_factory.mktemp("parquet"),
        "csv": tmpdir_factory.mktemp("csv"),
        "csv-no-header": tmpdir_factory.mktemp("csv-no-header"),
        "cats": tmpdir_factory.mktemp("cats"),
    }
    half = int(len(df) // 2)

    # Write Parquet Dataset.  Newer cudf (> 22.10) renamed the to_parquet
    # row-group kwarg (chunk_size -> row_group_size_rows).  "XX.YY" encodes
    # as float XX.YY, which orders correctly while minor versions stay < 100.
    cudf_version = 0
    if cudf:
        cudf_version = cudf.__version__.split(".")[:2]
        cudf_version = float(".".join(cudf_version))
    if cudf_version > 22.10:
        df.iloc[:half].to_parquet(
            str(datadir["parquet"].join("dataset-0.parquet")), row_group_size_rows=5000
        )
        df.iloc[half:].to_parquet(
            str(datadir["parquet"].join("dataset-1.parquet")), row_group_size_rows=5000
        )
    else:
        df.iloc[:half].to_parquet(
            str(datadir["parquet"].join("dataset-0.parquet")), chunk_size=1000
        )
        df.iloc[half:].to_parquet(
            str(datadir["parquet"].join("dataset-1.parquet")), chunk_size=1000
        )

    # Write CSV Dataset (Leave out categorical column)
    df = df[allcols_csv]  # Set deterministic column order before write
    df.iloc[:half].to_csv(str(datadir["csv"].join("dataset-0.csv")), index=False)
    df.iloc[half:].to_csv(str(datadir["csv"].join("dataset-1.csv")), index=False)
    df.iloc[:half].to_csv(
        str(datadir["csv-no-header"].join("dataset-0.csv")), header=False, index=False
    )
    df.iloc[half:].to_csv(
        str(datadir["csv-no-header"].join("dataset-1.csv")), header=False, index=False
    )
    return datadir
@pytest.fixture(scope="function")
def paths(engine, datasets):
    """Sorted list of the on-disk data files for the requested engine."""
    # "csv-no-header" files still carry the ".csv" suffix, hence the split.
    suffix = engine.split("-")[0]
    pattern = str(datasets[engine]) + "/*." + suffix
    return sorted(glob.glob(pattern))
@pytest.fixture(scope="function")
def df(engine, paths):
    """Read the two dataset files for `engine` and concatenate them into one frame."""
    _lib = cudf if cudf else pd
    if engine == "parquet":
        parts = [_lib.read_parquet(p)[mycols_pq] for p in (paths[0], paths[1])]
    elif engine == "csv-no-header":
        parts = [
            _lib.read_csv(p, header=None, names=allcols_csv)[mycols_csv]
            for p in (paths[0], paths[1])
        ]
    elif engine == "csv":
        parts = [_lib.read_csv(p, header=0)[mycols_csv] for p in (paths[0], paths[1])]
    else:
        raise ValueError("unknown engine:" + engine)
    gdf = _lib.concat(parts, axis=0)
    # normalize the id dtype across readers/backends
    gdf["id"] = gdf["id"].astype("int64")
    return gdf
@pytest.fixture(scope="function")
def dataset(request, paths, engine):
    """Build an nvtabular.Dataset over `paths`.

    Honors the optional `gpu_memory_frac` and `cpu` fixtures when the
    requesting test defines them, otherwise uses defaults.
    """

    def _fixture_or(name, default):
        # getfixturevalue raises when the fixture isn't defined for this test
        try:
            return request.getfixturevalue(name)
        except Exception:  # pylint: disable=broad-except
            return default

    gpu_memory_frac = _fixture_or("gpu_memory_frac", 0.01)
    cpu = _fixture_or("cpu", None)
    # headerless csv files need explicit column names
    kwargs = {"names": allcols_csv} if engine == "csv-no-header" else {}
    return nvtabular.Dataset(paths, part_mem_fraction=gpu_memory_frac, cpu=cpu, **kwargs)
@pytest.fixture(scope="session")
def asv_db():
    """Interface to an ASV "database" for recording benchmark results."""
    repo, branch = utils.getRepoInfo()  # gets repo info from CWD by default
    # ASVDB_DIR allows control of where results land
    results_dir = os.environ.get("ASVDB_DIR", "./benchmarks")
    return ASVDb(dbDir=results_dir, repo=repo, branches=[branch])
@pytest.fixture(scope="session")
def bench_info():
    """BenchmarkInfo describing the current benchmarking environment.

    Can/should be reused when adding multiple results from the same environment.
    Requires the CUDA_VERSION environment variable to be set.
    """
    commit_hash, commit_time = utils.getCommitInfo()  # commit info from CWD by default
    system_name = platform.uname()
    return BenchmarkInfo(
        machineName=socket.gethostname(),
        cudaVer=os.environ["CUDA_VERSION"],
        osType=f"{system_name.system}",
        pythonVer=platform.python_version(),
        commitHash=commit_hash,
        commitTime=commit_time,
        # GPU name queried through numba's CUDA bindings
        gpuType=cuda.get_current_device().name.decode("utf-8"),
    )
def get_cats(workflow, col, stat_name="categories", cpu=False):
    """Return the unique category values that Categorify wrote for column `col`.

    Walks the workflow graph to find the single Categorify op, then reads the
    parquet file it produced for `col`.  `stat_name` is unused but kept for
    interface compatibility with existing callers.
    """
    frame_lib = cudf if cudf and not cpu else pd
    # locate the categorify node in the workflow graph
    categorify_ops = [
        node.op
        for node in iter_nodes([workflow.output_node])
        if isinstance(node.op, nvtabular.ops.Categorify)
    ]
    if len(categorify_ops) != 1:
        raise RuntimeError(f"Found {len(categorify_ops)} categorical ops, expected 1")
    cats_df = frame_lib.read_parquet(categorify_ops[0].categories[col])
    cats_df.reset_index(drop=True, inplace=True)
    if cudf and not cpu:
        return cats_df[col].values_host
    return cats_df[col]
@contextlib.contextmanager
def run_triton_server(
    modelpath, model_name, triton_server_path, device_id="0", backend="tensorflow", ps_path=None
):
    """Launch a tritonserver subprocess serving `model_name` and yield a gRPC
    client connected to it; signals the server to shut down on exit.

    Parameters
    ----------
    modelpath : model repository directory (--model-repository)
    model_name : model to load explicitly (--load-model)
    triton_server_path : path to the tritonserver executable
    device_id : value set as CUDA_VISIBLE_DEVICES for the server process
    backend : "tensorflow" or "hugectr"
    ps_path : parameter-server config path; required when backend=="hugectr"
              (concatenated into the backend config, so must not be None then)

    Raises
    ------
    ValueError : unknown backend
    RuntimeError : server process exited, or not ready within ~120 seconds
    """
    import tritonclient
    import tritonclient.grpc as grpcclient

    if backend == "tensorflow":
        backend_config = "tensorflow,version=2"
    elif backend == "hugectr":
        backend_config = "hugectr,ps=" + ps_path
    else:
        raise ValueError("unknown backend:" + backend)
    cmdline = [
        triton_server_path,
        "--model-repository",
        modelpath,
        "--backend-config",
        backend_config,
        "--model-control-mode=explicit",
        "--load-model",
        model_name,
    ]
    env = os.environ.copy()
    env["CUDA_VISIBLE_DEVICES"] = device_id
    with subprocess.Popen(cmdline, env=env) as process:
        try:
            # NOTE: assumes the server listens on the default gRPC port 8001
            with grpcclient.InferenceServerClient("localhost:8001") as client:
                # wait until server is ready, polling once per second
                for _ in range(120):
                    if process.poll() is not None:
                        # server died during startup; surface its exit code
                        retcode = process.returncode
                        raise RuntimeError(f"Tritonserver failed to start (ret={retcode})")
                    try:
                        ready = client.is_server_ready()
                    except tritonclient.utils.InferenceServerException:
                        # server socket not up yet; keep polling
                        ready = False
                    if ready:
                        # hand the live client to the caller; `return` after the
                        # yield resumes here and falls through to `finally`
                        yield client
                        return
                    time.sleep(1)
                raise RuntimeError("Timed out waiting for tritonserver to become ready")
        finally:
            # signal triton to shutdown
            process.send_signal(signal.SIGINT)
def run_in_context(func, *args, context=None, **kwargs):
    """Execute ``func(*args, **kwargs)`` inside the context manager `context`.

    Convenience utility for tests: e.g. verify that a function raises a
    ``UserWarning`` by passing ``context=pytest.warns(UserWarning)``.  With no
    context, the call runs under a do-nothing context manager.

    Returns whatever ``func`` returns.
    """
    if context is None:
        # nullcontext() is the idiomatic no-op manager; the original used
        # contextlib.suppress() with no exception types, which behaves the
        # same but misleadingly reads as if something were being suppressed.
        context = contextlib.nullcontext()
    with context:
        result = func(*args, **kwargs)
    return result
# Allow passing devices as command-line parameters
def pytest_addoption(parser):
    """Register the --devices and --report command-line options."""
    options = (
        ("--devices", "0", "0,1,..,n-1"),
        ("--report", "0", "0 | 1"),
    )
    for name, default, helptext in options:
        parser.addoption(name, action="store", default=default, help=helptext)
@pytest.fixture
def devices(request):
    """Value of the --devices command-line option."""
    option_value = request.config.getoption("--devices")
    return option_value
@pytest.fixture
def report(request):
    """Value of the --report command-line option."""
    option_value = request.config.getoption("--report")
    return option_value
def pytest_collection_modifyitems(items):
    """Auto-apply markers to collected tests based on their file path.

    Directory components select suite markers (unit/loader/examples/ops) and
    filename prefixes select framework markers (tensorflow/torch).  A test may
    match several rules and receives every matching marker.

    Idiom fix: the original wrote ``getattr(pytest.mark, "unit")`` — needless
    indirection for ``pytest.mark.unit`` — repeated across six `if` blocks;
    replaced with a data-driven substring -> marker table.
    """
    rules = (
        ("/unit/", pytest.mark.unit),
        ("/loader/", pytest.mark.loader),
        ("/examples/", pytest.mark.examples),
        ("/ops/", pytest.mark.ops),
        ("test_tf_", pytest.mark.tensorflow),
        ("test_torch_", pytest.mark.torch),
    )
    for item in items:
        path = item.location[0]
        for fragment, marker in rules:
            if fragment in path:
                item.add_marker(marker)
@pytest.fixture(scope="function", autouse=True)
def cleanup_dataloader():
    """After each test runs, call .stop() on any dataloaders iterated during the test.

    This avoids issues with background threads hanging around and interfering
    with subsequent tests, which happens when a dataloader is partially
    consumed (not all batches are iterated through).
    """
    from merlin.dataloader.loader_base import LoaderBase

    # Wrap — not replace — LoaderBase.__iter__: side_effect delegates to the
    # real __iter__ so behavior is unchanged, while the mock records every
    # call.  autospec=True makes the mock capture `self` as the first
    # positional argument, which is how we recover the loader instances below.
    with patch.object(
        LoaderBase, "__iter__", side_effect=LoaderBase.__iter__, autospec=True
    ) as patched:
        yield
        # teardown: call.args[0] is the loader instance (thanks to autospec)
        for call in patched.call_args_list:
            call.args[0].stop()