[CHIA-298] Optimize DL autoinserts. #17883

Merged: 26 commits, May 9, 2024
71 changes: 42 additions & 29 deletions chia/_tests/core/data_layer/test_data_rpc.py
@@ -93,6 +93,7 @@ async def init_data_layer_service(
wallet_service: Optional[WalletService] = None,
manage_data_interval: int = 5,
maximum_full_file_count: Optional[int] = None,
enable_batch_autoinsert: bool = True,
) -> AsyncIterator[DataLayerService]:
config = bt.config
config["data_layer"]["wallet_peer"]["port"] = int(wallet_rpc_port)
@@ -101,6 +102,7 @@ async def init_data_layer_service(
config["data_layer"]["port"] = 0
config["data_layer"]["rpc_port"] = 0
config["data_layer"]["manage_data_interval"] = 5
config["data_layer"]["enable_batch_autoinsert"] = enable_batch_autoinsert
if maximum_full_file_count is not None:
config["data_layer"]["maximum_full_file_count"] = maximum_full_file_count
if db_path is not None:
@@ -806,8 +808,10 @@ async def offer_setup_fixture(
self_hostname: str,
two_wallet_nodes_services: SimulatorsAndWalletsServices,
tmp_path: Path,
request: pytest.FixtureRequest,
) -> AsyncIterator[OfferSetup]:
[full_node_service], wallet_services, bt = two_wallet_nodes_services
enable_batch_autoinsertion_settings = getattr(request, "param", (True, True))
full_node_api = full_node_service._api
wallets: List[Wallet] = []
for wallet_service in wallet_services:
@@ -822,12 +826,16 @@ async def offer_setup_fixture(

async with contextlib.AsyncExitStack() as exit_stack:
store_setups: List[StoreSetup] = []
for wallet_service in wallet_services:
for enable_batch_autoinsert, wallet_service in zip(enable_batch_autoinsertion_settings, wallet_services):
assert wallet_service.rpc_server is not None
port = wallet_service.rpc_server.listen_port
data_layer_service = await exit_stack.enter_async_context(
init_data_layer_service(
wallet_rpc_port=port, wallet_service=wallet_service, bt=bt, db_path=tmp_path.joinpath(str(port))
wallet_rpc_port=port,
wallet_service=wallet_service,
bt=bt,
db_path=tmp_path.joinpath(str(port)),
enable_batch_autoinsert=enable_batch_autoinsert,
)
)
data_layer = data_layer_service._api.data_layer
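The fixture now reads a (maker, taker) pair of autoinsert flags from request.param, defaulting to (True, True) when a test does not parametrize it. Below is a minimal, self-contained sketch of this pytest indirect-parametrization pattern; the fixture and test names are hypothetical, and only the getattr fallback mirrors the fixture above.

import pytest


@pytest.fixture
def autoinsert_flags(request: pytest.FixtureRequest) -> tuple:
    # request.param is set only when a test parametrizes this fixture indirectly;
    # otherwise fall back to enabling batch autoinsert for both maker and taker.
    return getattr(request, "param", (True, True))


@pytest.mark.parametrize("autoinsert_flags", [(True, False)], indirect=["autoinsert_flags"])
def test_mixed_settings(autoinsert_flags: tuple) -> None:
    maker_enabled, taker_enabled = autoinsert_flags
    assert (maker_enabled, taker_enabled) == (True, False)

The test_make_and_take_offer parametrization further down uses the same mechanism via indirect=["offer_setup"].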
@@ -914,19 +922,20 @@ async def populate_offer_setup(offer_setup: OfferSetup, count: int) -> OfferSetup:
(offer_setup.taker, b"\x02"),
)
for store_setup, value_prefix in setups:
await store_setup.api.batch_update(
{
"id": store_setup.id.hex(),
"changelist": [
{
"action": "insert",
"key": value.to_bytes(length=1, byteorder="big").hex(),
"value": (value_prefix + value.to_bytes(length=1, byteorder="big")).hex(),
}
for value in range(count)
],
}
await store_setup.data_layer.batch_insert(
tree_id=store_setup.id,
changelist=[
{
"action": "insert",
"key": value.to_bytes(length=1, byteorder="big"),
"value": (value_prefix + value.to_bytes(length=1, byteorder="big")),
}
for value in range(count)
],
status=Status.PENDING,
enable_batch_autoinsert=False,
)
await store_setup.data_layer.publish_update(store_setup.id, uint64(0))

await process_for_data_layer_keys(
expected_key=b"\x00",
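The setup above now calls DataLayer.batch_insert directly with raw bytes, Status.PENDING, and batch autoinsert disabled, then publishes the update itself, instead of going through the hex-encoded batch_update RPC. A small illustration of the two changelist shapes, with the entry values taken from the test above:

key = (0).to_bytes(length=1, byteorder="big")
value = b"\x02" + key

# RPC-style entry (old path): keys and values are hex-encoded strings.
rpc_entry = {"action": "insert", "key": key.hex(), "value": value.hex()}

# Direct data-layer entry (new path): keys and values stay as raw bytes.
direct_entry = {"action": "insert", "key": key, "value": value}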
@@ -1552,18 +1561,22 @@ class MakeAndTakeReference:


@pytest.mark.parametrize(
argnames="reference",
argvalues=[
pytest.param(make_one_take_one_reference, id="one for one"),
pytest.param(make_one_take_one_same_values_reference, id="one for one same values"),
pytest.param(make_two_take_one_reference, id="two for one"),
pytest.param(make_one_take_two_reference, id="one for two"),
pytest.param(make_one_existing_take_one_reference, id="one existing for one"),
pytest.param(make_one_take_one_existing_reference, id="one for one existing"),
pytest.param(make_one_upsert_take_one_reference, id="one upsert for one"),
pytest.param(make_one_take_one_upsert_reference, id="one for one upsert"),
pytest.param(make_one_take_one_unpopulated_reference, id="one for one unpopulated"),
"reference, offer_setup",
[
pytest.param(make_one_take_one_reference, (True, True), id="one for one new/new batch_update"),
pytest.param(make_one_take_one_reference, (True, False), id="one for one new/old batch_update"),
pytest.param(make_one_take_one_reference, (False, True), id="one for one old/new batch_update"),
pytest.param(make_one_take_one_reference, (False, False), id="one for one old/old batch_update"),
pytest.param(make_one_take_one_same_values_reference, (True, True), id="one for one same values"),
pytest.param(make_two_take_one_reference, (True, True), id="two for one"),
pytest.param(make_one_take_two_reference, (True, True), id="one for two"),
pytest.param(make_one_existing_take_one_reference, (True, True), id="one existing for one"),
pytest.param(make_one_take_one_existing_reference, (True, True), id="one for one existing"),
pytest.param(make_one_upsert_take_one_reference, (True, True), id="one upsert for one"),
pytest.param(make_one_take_one_upsert_reference, (True, True), id="one for one upsert"),
pytest.param(make_one_take_one_unpopulated_reference, (True, True), id="one for one unpopulated"),
],
indirect=["offer_setup"],
)
@pytest.mark.anyio
async def test_make_and_take_offer(offer_setup: OfferSetup, reference: MakeAndTakeReference) -> None:
@@ -3106,7 +3119,7 @@ async def test_pagination_cmds(
if max_page_size is None or max_page_size == 100:
assert keys == {
"keys": ["0x61616161", "0x6161"],
"root_hash": "0x3f4ae7b8e10ef48b3114843537d5def989ee0a3b6568af7e720a71730f260fa1",
"root_hash": "0x889a4a61b17be799ae9d36831246672ef857a24091f54481431a83309d4e890e",
"success": True,
"total_bytes": 6,
"total_pages": 1,
@@ -3126,7 +3139,7 @@
"value": "0x6161",
},
],
"root_hash": "0x3f4ae7b8e10ef48b3114843537d5def989ee0a3b6568af7e720a71730f260fa1",
"root_hash": "0x889a4a61b17be799ae9d36831246672ef857a24091f54481431a83309d4e890e",
"success": True,
"total_bytes": 9,
"total_pages": 1,
@@ -3143,7 +3156,7 @@
elif max_page_size == 5:
assert keys == {
"keys": ["0x61616161"],
"root_hash": "0x3f4ae7b8e10ef48b3114843537d5def989ee0a3b6568af7e720a71730f260fa1",
"root_hash": "0x889a4a61b17be799ae9d36831246672ef857a24091f54481431a83309d4e890e",
"success": True,
"total_bytes": 6,
"total_pages": 2,
@@ -3157,7 +3170,7 @@
"value": "0x61",
}
],
"root_hash": "0x3f4ae7b8e10ef48b3114843537d5def989ee0a3b6568af7e720a71730f260fa1",
"root_hash": "0x889a4a61b17be799ae9d36831246672ef857a24091f54481431a83309d4e890e",
"success": True,
"total_bytes": 9,
"total_pages": 2,
113 changes: 105 additions & 8 deletions chia/_tests/core/data_layer/test_data_store.py
@@ -370,12 +370,21 @@ async def test_get_ancestors_optimized(data_store: DataStore, tree_id: bytes32)
"use_optimized",
[True, False],
)
async def test_batch_update(data_store: DataStore, tree_id: bytes32, use_optimized: bool, tmp_path: Path) -> None:
num_batches = 10
num_ops_per_batch = 100 if use_optimized else 10
saved_roots: List[Root] = []
@pytest.mark.parametrize(
"num_batches",
[1, 5, 10, 25],
)
async def test_batch_update(
data_store: DataStore,
tree_id: bytes32,
use_optimized: bool,
tmp_path: Path,
num_batches: int,
) -> None:
total_operations = 1000 if use_optimized else 100
num_ops_per_batch = total_operations // num_batches
saved_batches: List[List[Dict[str, Any]]] = []

saved_kv: List[List[TerminalNode]] = []
db_uri = generate_in_memory_db_uri()
async with DataStore.managed(database=db_uri, uri=True) as single_op_data_store:
await single_op_data_store.create_tree(tree_id, status=Status.COMMITTED)
@@ -442,16 +451,21 @@ async def test_batch_update(data_store: DataStore, tree_id: bytes32, use_optimized
if (operation + 1) % num_ops_per_batch == 0:
saved_batches.append(batch)
batch = []
root = await single_op_data_store.get_tree_root(tree_id=tree_id)
saved_roots.append(root)
current_kv = await single_op_data_store.get_keys_values(tree_id=tree_id)
assert {kv.key: kv.value for kv in current_kv} == keys_values
saved_kv.append(current_kv)

for batch_number, batch in enumerate(saved_batches):
assert len(batch) == num_ops_per_batch
await data_store.insert_batch(tree_id, batch, status=Status.COMMITTED)
root = await data_store.get_tree_root(tree_id)
assert root.generation == batch_number + 1
assert root.node_hash == saved_roots[batch_number].node_hash
assert root.node_hash is not None
current_kv = await data_store.get_keys_values(tree_id=tree_id)
        # Get the same keys/values, but possibly stored in a different order.
assert {node.key: node.value for node in current_kv} == {
node.key: node.value for node in saved_kv[batch_number]
}
queue: List[bytes32] = [root.node_hash]
ancestors: Dict[bytes32, bytes32] = {}
while len(queue) > 0:
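The reference store still applies every operation one at a time, but the comparison changed: batched autoinserts may arrange the tree differently than single-operation inserts, so the per-batch root hashes are no longer expected to match and the test compares the resulting key/value sets instead (presumably also the reason the expected root_hash values changed in test_data_rpc.py above). The batch sizing itself is straightforward; a quick check with the parametrized values:

total_operations = 1000  # use_optimized=True; 100 when use_optimized=False
for num_batches in (1, 5, 10, 25):
    num_ops_per_batch = total_operations // num_batches
    print(num_batches, num_ops_per_batch)  # 1 -> 1000, 5 -> 200, 10 -> 100, 25 -> 40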
@@ -1509,6 +1523,18 @@ def id(self) -> str:
return f"pre={self.pre},count={self.count}"


@dataclass
class BatchesInsertBenchmarkCase:
count: int
batch_count: int
limit: float
marks: Marks = ()

@property
def id(self) -> str:
return f"count={self.count},batch_count={self.batch_count}"


@datacases(
BatchInsertBenchmarkCase(
pre=0,
@@ -1530,6 +1556,11 @@ def id(self) -> str:
count=1_000,
limit=36,
),
BatchInsertBenchmarkCase(
pre=10_000,
count=25_000,
limit=52,
),
)
@pytest.mark.anyio
async def test_benchmark_batch_insert_speed(
@@ -1567,6 +1598,40 @@ async def test_benchmark_batch_insert_speed(
)


@datacases(
BatchesInsertBenchmarkCase(
count=50,
batch_count=200,
limit=195,
),
)
@pytest.mark.anyio
async def test_benchmark_batch_insert_speed_multiple_batches(
data_store: DataStore,
tree_id: bytes32,
benchmark_runner: BenchmarkRunner,
case: BatchesInsertBenchmarkCase,
) -> None:
r = random.Random()
r.seed("shadowlands", version=2)

with benchmark_runner.assert_runtime(seconds=case.limit):
for batch in range(case.batch_count):
changelist = [
{
"action": "insert",
"key": x.to_bytes(32, byteorder="big", signed=False),
"value": bytes(r.getrandbits(8) for _ in range(10000)),
}
for x in range(batch * case.count, (batch + 1) * case.count)
]
await data_store.insert_batch(
tree_id=tree_id,
changelist=changelist,
status=Status.COMMITTED,
)
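For a rough sense of scale, the multi-batch benchmark case above pushes on the order of 100 MB of values through insert_batch; a back-of-the-envelope check with the case values:

batch_count = 200
keys_per_batch = 50       # case.count
value_size = 10_000       # random bytes per value

total_keys = batch_count * keys_per_batch    # 10_000 keys
total_value_bytes = total_keys * value_size  # 100_000_000 bytes, about 100 MB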


@pytest.mark.anyio
async def test_delete_store_data(raw_data_store: DataStore) -> None:
tree_id = bytes32(b"\0" * 32)
@@ -1918,6 +1983,38 @@ async def test_insert_key_already_present(data_store: DataStore, tree_id: bytes32
await data_store.insert(key=key, value=value, tree_id=tree_id, reference_node_hash=None, side=None)


@pytest.mark.anyio
async def test_update_keys(data_store: DataStore, tree_id: bytes32) -> None:
num_keys = 10
missing_keys = 50
num_values = 10
new_keys = 10
for value in range(num_values):
changelist: List[Dict[str, Any]] = []
bytes_value = value.to_bytes(4, byteorder="big")
for key in range(num_keys + missing_keys):
bytes_key = key.to_bytes(4, byteorder="big")
changelist.append({"action": "delete", "key": bytes_key})
for key in range(num_keys):
bytes_key = key.to_bytes(4, byteorder="big")
changelist.append({"action": "insert", "key": bytes_key, "value": bytes_value})

await data_store.insert_batch(
tree_id=tree_id,
changelist=changelist,
status=Status.COMMITTED,
)
for key in range(num_keys):
bytes_key = key.to_bytes(4, byteorder="big")
node = await data_store.get_node_by_key(bytes_key, tree_id)
assert node.value == bytes_value
for key in range(num_keys, num_keys + missing_keys):
bytes_key = key.to_bytes(4, byteorder="big")
with pytest.raises(KeyNotFoundError, match=f"Key not found: {bytes_key.hex()}"):
await data_store.get_node_by_key(bytes_key, tree_id)
num_keys += new_keys
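Read plainly, each round of the test deletes a superset of the keys that currently exist and then re-inserts a growing range with the new value, so the assertions rely on batch deletes of absent keys being tolerated. A plain-dict sketch of the behaviour the test implies; this is an assumption about the semantics, not the DataStore implementation:

store: dict = {}
num_keys, missing_keys, new_keys = 2, 3, 2

for value in range(3):
    bytes_value = value.to_bytes(4, byteorder="big")
    # Delete a superset of the existing keys; absent keys are tolerated (assumed).
    for key in range(num_keys + missing_keys):
        store.pop(key.to_bytes(4, byteorder="big"), None)
    # Re-insert the current key range with the new value.
    for key in range(num_keys):
        store[key.to_bytes(4, byteorder="big")] = bytes_value
    assert all(v == bytes_value for v in store.values())
    num_keys += new_keys  # each round covers a wider key range, as in the test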


@pytest.mark.anyio
async def test_migration_unknown_version(data_store: DataStore) -> None:
async with data_store.db_wrapper.writer() as writer:
6 changes: 5 additions & 1 deletion chia/data_layer/data_layer.py
@@ -338,6 +338,7 @@ async def batch_insert(
tree_id: bytes32,
changelist: List[Dict[str, Any]],
status: Status = Status.PENDING,
enable_batch_autoinsert: Optional[bool] = None,
) -> bytes32:
await self._update_confirmation_status(tree_id=tree_id)

@@ -352,7 +353,9 @@
raise ValueError(f"Singleton with launcher ID {tree_id} is not owned by DL Wallet")

t1 = time.monotonic()
batch_hash = await self.data_store.insert_batch(tree_id, changelist, status)
if enable_batch_autoinsert is None:
enable_batch_autoinsert = self.config.get("enable_batch_autoinsert", True)
batch_hash = await self.data_store.insert_batch(tree_id, changelist, status, enable_batch_autoinsert)
t2 = time.monotonic()
self.log.info(f"Data store batch update process time: {t2 - t1}.")
# todo return empty node hash from get_tree_root
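batch_insert now accepts a per-call override while falling back to the node configuration, and process_offered_stores below uses that override to force the non-batched path for offers. A minimal standalone sketch of the precedence; the config dict and helper name are hypothetical, and only the lookup key and default mirror the change above:

from typing import Optional

config = {"enable_batch_autoinsert": True}  # stand-in for the data_layer config section


def resolve_autoinsert(enable_batch_autoinsert: Optional[bool] = None) -> bool:
    # An explicit per-call value wins; otherwise fall back to the config, defaulting to True.
    if enable_batch_autoinsert is None:
        enable_batch_autoinsert = config.get("enable_batch_autoinsert", True)
    return enable_batch_autoinsert


assert resolve_autoinsert() is True         # config default
assert resolve_autoinsert(False) is False   # per-call override, as used for offers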
@@ -950,6 +953,7 @@ async def process_offered_stores(self, offer_stores: Tuple[OfferStore, ...]) ->
new_root_hash = await self.batch_insert(
tree_id=offer_store.store_id,
changelist=changelist,
enable_batch_autoinsert=False,
)
else:
existing_root = await self.get_root(store_id=offer_store.store_id)