Skip to content

Commit

Permalink
Optimize DL autoinserts. (#17883)
Browse files Browse the repository at this point in the history
* Optimize DL autoinserts.

* Lint.

* Improve tests.

* update root history references

* Stress test benchmarks.

* Try with low limit first.

* Try without -k

* Gather more data.

* Run only benchmark.

* Remove exception.

* Increase limit

* Increase limit.

* Increase timeout

* Add new benchmark.

* Increase timeout.

* Timeouts.

* Increase timeout

* Revert stress test.

* Use the correct limit

* Update test.

* Add option to disable the optimization.

* Revert optimization in offers.

* Test all cases properly.

* Apply suggestions from code review

Co-authored-by: Kyle Altendorf <sda@fstab.net>

* Improve case handling.

---------

Co-authored-by: Kyle Altendorf <sda@fstab.net>
  • Loading branch information
fchirica and altendky committed May 9, 2024
1 parent 26ceeae commit 6c25d3a
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 38 deletions.
71 changes: 42 additions & 29 deletions chia/_tests/core/data_layer/test_data_rpc.py
Expand Up @@ -93,6 +93,7 @@ async def init_data_layer_service(
wallet_service: Optional[WalletService] = None,
manage_data_interval: int = 5,
maximum_full_file_count: Optional[int] = None,
enable_batch_autoinsert: bool = True,
) -> AsyncIterator[DataLayerService]:
config = bt.config
config["data_layer"]["wallet_peer"]["port"] = int(wallet_rpc_port)
Expand All @@ -101,6 +102,7 @@ async def init_data_layer_service(
config["data_layer"]["port"] = 0
config["data_layer"]["rpc_port"] = 0
config["data_layer"]["manage_data_interval"] = 5
config["data_layer"]["enable_batch_autoinsert"] = enable_batch_autoinsert
if maximum_full_file_count is not None:
config["data_layer"]["maximum_full_file_count"] = maximum_full_file_count
if db_path is not None:
Expand Down Expand Up @@ -806,8 +808,10 @@ async def offer_setup_fixture(
self_hostname: str,
two_wallet_nodes_services: SimulatorsAndWalletsServices,
tmp_path: Path,
request: pytest.FixtureRequest,
) -> AsyncIterator[OfferSetup]:
[full_node_service], wallet_services, bt = two_wallet_nodes_services
enable_batch_autoinsertion_settings = getattr(request, "param", (True, True))
full_node_api = full_node_service._api
wallets: List[Wallet] = []
for wallet_service in wallet_services:
Expand All @@ -822,12 +826,16 @@ async def offer_setup_fixture(

async with contextlib.AsyncExitStack() as exit_stack:
store_setups: List[StoreSetup] = []
for wallet_service in wallet_services:
for enable_batch_autoinsert, wallet_service in zip(enable_batch_autoinsertion_settings, wallet_services):
assert wallet_service.rpc_server is not None
port = wallet_service.rpc_server.listen_port
data_layer_service = await exit_stack.enter_async_context(
init_data_layer_service(
wallet_rpc_port=port, wallet_service=wallet_service, bt=bt, db_path=tmp_path.joinpath(str(port))
wallet_rpc_port=port,
wallet_service=wallet_service,
bt=bt,
db_path=tmp_path.joinpath(str(port)),
enable_batch_autoinsert=enable_batch_autoinsert,
)
)
data_layer = data_layer_service._api.data_layer
Expand Down Expand Up @@ -914,19 +922,20 @@ async def populate_offer_setup(offer_setup: OfferSetup, count: int) -> OfferSetu
(offer_setup.taker, b"\x02"),
)
for store_setup, value_prefix in setups:
await store_setup.api.batch_update(
{
"id": store_setup.id.hex(),
"changelist": [
{
"action": "insert",
"key": value.to_bytes(length=1, byteorder="big").hex(),
"value": (value_prefix + value.to_bytes(length=1, byteorder="big")).hex(),
}
for value in range(count)
],
}
await store_setup.data_layer.batch_insert(
tree_id=store_setup.id,
changelist=[
{
"action": "insert",
"key": value.to_bytes(length=1, byteorder="big"),
"value": (value_prefix + value.to_bytes(length=1, byteorder="big")),
}
for value in range(count)
],
status=Status.PENDING,
enable_batch_autoinsert=False,
)
await store_setup.data_layer.publish_update(store_setup.id, uint64(0))

await process_for_data_layer_keys(
expected_key=b"\x00",
Expand Down Expand Up @@ -1552,18 +1561,22 @@ class MakeAndTakeReference:


@pytest.mark.parametrize(
argnames="reference",
argvalues=[
pytest.param(make_one_take_one_reference, id="one for one"),
pytest.param(make_one_take_one_same_values_reference, id="one for one same values"),
pytest.param(make_two_take_one_reference, id="two for one"),
pytest.param(make_one_take_two_reference, id="one for two"),
pytest.param(make_one_existing_take_one_reference, id="one existing for one"),
pytest.param(make_one_take_one_existing_reference, id="one for one existing"),
pytest.param(make_one_upsert_take_one_reference, id="one upsert for one"),
pytest.param(make_one_take_one_upsert_reference, id="one for one upsert"),
pytest.param(make_one_take_one_unpopulated_reference, id="one for one unpopulated"),
"reference, offer_setup",
[
pytest.param(make_one_take_one_reference, (True, True), id="one for one new/new batch_update"),
pytest.param(make_one_take_one_reference, (True, False), id="one for one new/old batch_update"),
pytest.param(make_one_take_one_reference, (False, True), id="one for one old/new batch_update"),
pytest.param(make_one_take_one_reference, (False, False), id="one for one old/old batch_update"),
pytest.param(make_one_take_one_same_values_reference, (True, True), id="one for one same values"),
pytest.param(make_two_take_one_reference, (True, True), id="two for one"),
pytest.param(make_one_take_two_reference, (True, True), id="one for two"),
pytest.param(make_one_existing_take_one_reference, (True, True), id="one existing for one"),
pytest.param(make_one_take_one_existing_reference, (True, True), id="one for one existing"),
pytest.param(make_one_upsert_take_one_reference, (True, True), id="one upsert for one"),
pytest.param(make_one_take_one_upsert_reference, (True, True), id="one for one upsert"),
pytest.param(make_one_take_one_unpopulated_reference, (True, True), id="one for one unpopulated"),
],
indirect=["offer_setup"],
)
@pytest.mark.anyio
async def test_make_and_take_offer(offer_setup: OfferSetup, reference: MakeAndTakeReference) -> None:
Expand Down Expand Up @@ -3106,7 +3119,7 @@ async def test_pagination_cmds(
if max_page_size is None or max_page_size == 100:
assert keys == {
"keys": ["0x61616161", "0x6161"],
"root_hash": "0x3f4ae7b8e10ef48b3114843537d5def989ee0a3b6568af7e720a71730f260fa1",
"root_hash": "0x889a4a61b17be799ae9d36831246672ef857a24091f54481431a83309d4e890e",
"success": True,
"total_bytes": 6,
"total_pages": 1,
Expand All @@ -3126,7 +3139,7 @@ async def test_pagination_cmds(
"value": "0x6161",
},
],
"root_hash": "0x3f4ae7b8e10ef48b3114843537d5def989ee0a3b6568af7e720a71730f260fa1",
"root_hash": "0x889a4a61b17be799ae9d36831246672ef857a24091f54481431a83309d4e890e",
"success": True,
"total_bytes": 9,
"total_pages": 1,
Expand All @@ -3143,7 +3156,7 @@ async def test_pagination_cmds(
elif max_page_size == 5:
assert keys == {
"keys": ["0x61616161"],
"root_hash": "0x3f4ae7b8e10ef48b3114843537d5def989ee0a3b6568af7e720a71730f260fa1",
"root_hash": "0x889a4a61b17be799ae9d36831246672ef857a24091f54481431a83309d4e890e",
"success": True,
"total_bytes": 6,
"total_pages": 2,
Expand All @@ -3157,7 +3170,7 @@ async def test_pagination_cmds(
"value": "0x61",
}
],
"root_hash": "0x3f4ae7b8e10ef48b3114843537d5def989ee0a3b6568af7e720a71730f260fa1",
"root_hash": "0x889a4a61b17be799ae9d36831246672ef857a24091f54481431a83309d4e890e",
"success": True,
"total_bytes": 9,
"total_pages": 2,
Expand Down
113 changes: 105 additions & 8 deletions chia/_tests/core/data_layer/test_data_store.py
Expand Up @@ -370,12 +370,21 @@ async def test_get_ancestors_optimized(data_store: DataStore, tree_id: bytes32)
"use_optimized",
[True, False],
)
async def test_batch_update(data_store: DataStore, tree_id: bytes32, use_optimized: bool, tmp_path: Path) -> None:
num_batches = 10
num_ops_per_batch = 100 if use_optimized else 10
saved_roots: List[Root] = []
@pytest.mark.parametrize(
"num_batches",
[1, 5, 10, 25],
)
async def test_batch_update(
data_store: DataStore,
tree_id: bytes32,
use_optimized: bool,
tmp_path: Path,
num_batches: int,
) -> None:
total_operations = 1000 if use_optimized else 100
num_ops_per_batch = total_operations // num_batches
saved_batches: List[List[Dict[str, Any]]] = []

saved_kv: List[List[TerminalNode]] = []
db_uri = generate_in_memory_db_uri()
async with DataStore.managed(database=db_uri, uri=True) as single_op_data_store:
await single_op_data_store.create_tree(tree_id, status=Status.COMMITTED)
Expand Down Expand Up @@ -442,16 +451,21 @@ async def test_batch_update(data_store: DataStore, tree_id: bytes32, use_optimiz
if (operation + 1) % num_ops_per_batch == 0:
saved_batches.append(batch)
batch = []
root = await single_op_data_store.get_tree_root(tree_id=tree_id)
saved_roots.append(root)
current_kv = await single_op_data_store.get_keys_values(tree_id=tree_id)
assert {kv.key: kv.value for kv in current_kv} == keys_values
saved_kv.append(current_kv)

for batch_number, batch in enumerate(saved_batches):
assert len(batch) == num_ops_per_batch
await data_store.insert_batch(tree_id, batch, status=Status.COMMITTED)
root = await data_store.get_tree_root(tree_id)
assert root.generation == batch_number + 1
assert root.node_hash == saved_roots[batch_number].node_hash
assert root.node_hash is not None
current_kv = await data_store.get_keys_values(tree_id=tree_id)
# Get the same keys/values, but possibly stored in other order.
assert {node.key: node.value for node in current_kv} == {
node.key: node.value for node in saved_kv[batch_number]
}
queue: List[bytes32] = [root.node_hash]
ancestors: Dict[bytes32, bytes32] = {}
while len(queue) > 0:
Expand Down Expand Up @@ -1509,6 +1523,18 @@ def id(self) -> str:
return f"pre={self.pre},count={self.count}"


@dataclass
class BatchesInsertBenchmarkCase:
    """Parameters for a benchmark that performs several sequential batch inserts."""

    # Number of insert operations in each individual batch.
    count: int
    # How many batches are executed back-to-back.
    batch_count: int
    # Runtime budget in seconds asserted by the benchmark runner.
    limit: float
    # Extra pytest marks attached to the generated case.
    marks: Marks = ()

    @property
    def id(self) -> str:
        """Human-readable pytest id for this case."""
        return "count={},batch_count={}".format(self.count, self.batch_count)


@datacases(
BatchInsertBenchmarkCase(
pre=0,
Expand All @@ -1530,6 +1556,11 @@ def id(self) -> str:
count=1_000,
limit=36,
),
BatchInsertBenchmarkCase(
pre=10_000,
count=25_000,
limit=52,
),
)
@pytest.mark.anyio
async def test_benchmark_batch_insert_speed(
Expand Down Expand Up @@ -1567,6 +1598,40 @@ async def test_benchmark_batch_insert_speed(
)


@datacases(
    BatchesInsertBenchmarkCase(
        count=50,
        batch_count=200,
        limit=195,
    ),
)
@pytest.mark.anyio
async def test_benchmark_batch_insert_speed_multiple_batches(
    data_store: DataStore,
    tree_id: bytes32,
    benchmark_runner: BenchmarkRunner,
    case: BatchesInsertBenchmarkCase,
) -> None:
    """Benchmark many sequential ``insert_batch`` calls against a single tree.

    Each batch inserts ``case.count`` fresh 32-byte keys with 10 KB random
    values; the full sequence must complete within ``case.limit`` seconds.
    """
    # Seed deterministically so every run inserts identical values.
    rng = random.Random()
    rng.seed("shadowlands", version=2)

    with benchmark_runner.assert_runtime(seconds=case.limit):
        for batch_index in range(case.batch_count):
            # Each batch owns its own contiguous slice of the integer key
            # space, so keys never collide across batches.
            first_key = batch_index * case.count
            changelist = [
                {
                    "action": "insert",
                    "key": key_index.to_bytes(32, byteorder="big", signed=False),
                    "value": bytes(rng.getrandbits(8) for _ in range(10000)),
                }
                for key_index in range(first_key, first_key + case.count)
            ]
            await data_store.insert_batch(
                tree_id=tree_id,
                changelist=changelist,
                status=Status.COMMITTED,
            )


@pytest.mark.anyio
async def test_delete_store_data(raw_data_store: DataStore) -> None:
tree_id = bytes32(b"\0" * 32)
Expand Down Expand Up @@ -1918,6 +1983,38 @@ async def test_insert_key_already_present(data_store: DataStore, tree_id: bytes3
await data_store.insert(key=key, value=value, tree_id=tree_id, reference_node_hash=None, side=None)


@pytest.mark.anyio
async def test_update_keys(data_store: DataStore, tree_id: bytes32) -> None:
    """Repeatedly rewrite a growing key range via delete-then-insert batches.

    Every round deletes a wide key range (including keys that were never
    inserted), re-inserts the first ``num_keys`` keys with that round's value,
    then verifies present keys carry the new value while the remainder raise
    ``KeyNotFoundError``.  The live key count grows by ``new_keys`` per round.
    """
    num_keys = 10
    missing_keys = 50
    num_values = 10
    new_keys = 10
    for round_index in range(num_values):
        value_bytes = round_index.to_bytes(4, byteorder="big")
        # Delete the whole range first (missing keys included), then insert
        # the keys that should survive this round with the fresh value.
        changelist: List[Dict[str, Any]] = [
            {"action": "delete", "key": key_index.to_bytes(4, byteorder="big")}
            for key_index in range(num_keys + missing_keys)
        ]
        changelist.extend(
            {"action": "insert", "key": key_index.to_bytes(4, byteorder="big"), "value": value_bytes}
            for key_index in range(num_keys)
        )

        await data_store.insert_batch(
            tree_id=tree_id,
            changelist=changelist,
            status=Status.COMMITTED,
        )
        for key_index in range(num_keys):
            key_bytes = key_index.to_bytes(4, byteorder="big")
            node = await data_store.get_node_by_key(key_bytes, tree_id)
            assert node.value == value_bytes
        for key_index in range(num_keys, num_keys + missing_keys):
            key_bytes = key_index.to_bytes(4, byteorder="big")
            with pytest.raises(KeyNotFoundError, match=f"Key not found: {key_bytes.hex()}"):
                await data_store.get_node_by_key(key_bytes, tree_id)
        num_keys += new_keys


@pytest.mark.anyio
async def test_migration_unknown_version(data_store: DataStore) -> None:
async with data_store.db_wrapper.writer() as writer:
Expand Down
6 changes: 5 additions & 1 deletion chia/data_layer/data_layer.py
Expand Up @@ -338,6 +338,7 @@ async def batch_insert(
tree_id: bytes32,
changelist: List[Dict[str, Any]],
status: Status = Status.PENDING,
enable_batch_autoinsert: Optional[bool] = None,
) -> bytes32:
await self._update_confirmation_status(tree_id=tree_id)

Expand All @@ -352,7 +353,9 @@ async def batch_insert(
raise ValueError(f"Singleton with launcher ID {tree_id} is not owned by DL Wallet")

t1 = time.monotonic()
batch_hash = await self.data_store.insert_batch(tree_id, changelist, status)
if enable_batch_autoinsert is None:
enable_batch_autoinsert = self.config.get("enable_batch_autoinsert", True)
batch_hash = await self.data_store.insert_batch(tree_id, changelist, status, enable_batch_autoinsert)
t2 = time.monotonic()
self.log.info(f"Data store batch update process time: {t2 - t1}.")
# todo return empty node hash from get_tree_root
Expand Down Expand Up @@ -950,6 +953,7 @@ async def process_offered_stores(self, offer_stores: Tuple[OfferStore, ...]) ->
new_root_hash = await self.batch_insert(
tree_id=offer_store.store_id,
changelist=changelist,
enable_batch_autoinsert=False,
)
else:
existing_root = await self.get_root(store_id=offer_store.store_id)
Expand Down

0 comments on commit 6c25d3a

Please sign in to comment.