Skip to content

Commit

Permalink
change datalayer python and logging from tree id to store id (#17864)
Browse files Browse the repository at this point in the history
* change python and logging from tree id to store id

* always more

* oops

* again

* and...

* comment

* more
  • Loading branch information
altendky committed May 10, 2024
1 parent aa57676 commit 98df4ce
Show file tree
Hide file tree
Showing 16 changed files with 810 additions and 798 deletions.
18 changes: 9 additions & 9 deletions chia/_tests/core/data_layer/conftest.py
Expand Up @@ -52,9 +52,9 @@ def create_example_fixture(request: SubRequest) -> Callable[[DataStore, bytes32]
return request.param # type: ignore[no-any-return]


@pytest.fixture(name="tree_id", scope="function")
def tree_id_fixture() -> bytes32:
base = b"a tree id"
@pytest.fixture(name="store_id", scope="function")
def store_id_fixture() -> bytes32:
base = b"a store id"
pad = b"." * (32 - len(base))
return bytes32(pad + base)

Expand All @@ -66,8 +66,8 @@ async def raw_data_store_fixture(database_uri: str) -> AsyncIterable[DataStore]:


@pytest.fixture(name="data_store", scope="function")
async def data_store_fixture(raw_data_store: DataStore, tree_id: bytes32) -> AsyncIterable[DataStore]:
await raw_data_store.create_tree(tree_id=tree_id, status=Status.COMMITTED)
async def data_store_fixture(raw_data_store: DataStore, store_id: bytes32) -> AsyncIterable[DataStore]:
await raw_data_store.create_tree(store_id=store_id, status=Status.COMMITTED)

await raw_data_store.check()
yield raw_data_store
Expand All @@ -82,14 +82,14 @@ def node_type_fixture(request: SubRequest) -> NodeType:
@pytest.fixture(name="valid_node_values")
async def valid_node_values_fixture(
data_store: DataStore,
tree_id: bytes32,
store_id: bytes32,
node_type: NodeType,
) -> Dict[str, Any]:
await add_01234567_example(data_store=data_store, tree_id=tree_id)
await add_01234567_example(data_store=data_store, store_id=store_id)

if node_type == NodeType.INTERNAL:
node_a = await data_store.get_node_by_key(key=b"\x02", tree_id=tree_id)
node_b = await data_store.get_node_by_key(key=b"\x04", tree_id=tree_id)
node_a = await data_store.get_node_by_key(key=b"\x02", store_id=store_id)
node_b = await data_store.get_node_by_key(key=b"\x04", store_id=store_id)
return create_valid_node_values(node_type=node_type, left_hash=node_a.hash, right_hash=node_b.hash)
elif node_type == NodeType.TERMINAL:
return create_valid_node_values(node_type=node_type)
Expand Down
12 changes: 6 additions & 6 deletions chia/_tests/core/data_layer/test_data_cli.py
Expand Up @@ -29,29 +29,29 @@ def test_round_trip(chia_root: ChiaRoot, chia_daemon: None, chia_data: None) ->
print(f"create_data_store: {create}")
dic = json.loads(create.stdout)
assert dic["success"]
tree_id = dic["id"]
store_id = dic["id"]
key = "1a6f915513173902a7216e7d9e4a16bfd088e20683f45de3b432ce72e9cc7aa8"
value = "ffff8353594d8083616263"
changelist: List[Dict[str, str]] = [{"action": "insert", "key": key, "value": value}]
print(json.dumps(changelist))
update = chia_root.run(
args=["data", "update_data_store", "--id", tree_id, "--changelist", json.dumps(changelist)]
args=["data", "update_data_store", "--id", store_id, "--changelist", json.dumps(changelist)]
)
dic = json.loads(create.stdout)
assert dic["success"]
print(f"update_data_store: {update}")
completed_process = chia_root.run(args=["data", "get_value", "--id", tree_id, "--key", key])
completed_process = chia_root.run(args=["data", "get_value", "--id", store_id, "--key", key])
parsed = json.loads(completed_process.stdout)
expected = {"value": value, "success": True}
assert parsed == expected
get_keys_values = chia_root.run(args=["data", "get_keys_values", "--id", tree_id])
get_keys_values = chia_root.run(args=["data", "get_keys_values", "--id", store_id])
print(f"get_keys_values: {get_keys_values}")
changelist = [{"action": "delete", "key": key}]
update = chia_root.run(
args=["data", "update_data_store", "--id", tree_id, "--changelist", json.dumps(changelist)]
args=["data", "update_data_store", "--id", store_id, "--changelist", json.dumps(changelist)]
)
print(f"update_data_store: {update}")
completed_process = chia_root.run(args=["data", "get_value", "--id", tree_id, "--key", key])
completed_process = chia_root.run(args=["data", "get_value", "--id", store_id, "--key", key])
parsed = json.loads(completed_process.stdout)
expected = {"data": None, "success": True}
assert parsed == expected
4 changes: 2 additions & 2 deletions chia/_tests/core/data_layer/test_data_layer.py
Expand Up @@ -75,8 +75,8 @@ async def wallet_rpc_init() -> WalletRpcClient:
)

async with data_layer.manage():
await data_layer.get_downloader(tree_id=bytes32([0] * 32), url="")
await data_layer.get_uploaders(tree_id=bytes32([0] * 32))
await data_layer.get_downloader(store_id=bytes32([0] * 32), url="")
await data_layer.get_uploaders(store_id=bytes32([0] * 32))
await data_layer.check_plugins()

header_values = {request.headers.get(header_key) for request in recording_web_server.requests}
Expand Down
4 changes: 2 additions & 2 deletions chia/_tests/core/data_layer/test_data_layer_util.py
Expand Up @@ -100,7 +100,7 @@ class RoundTripCase:
RoundTripCase(
id="Root",
instance=Root(
tree_id=bytes32(b"\x00" * 32),
store_id=bytes32(b"\x00" * 32),
node_hash=bytes32(b"\x01" * 32),
generation=3,
status=Status.PENDING,
Expand All @@ -115,7 +115,7 @@ class RoundTripCase:
instance=ClearPendingRootsResponse(
success=True,
root=Root(
tree_id=bytes32(b"\x00" * 32),
store_id=bytes32(b"\x00" * 32),
node_hash=bytes32(b"\x01" * 32),
generation=3,
status=Status.PENDING,
Expand Down
50 changes: 25 additions & 25 deletions chia/_tests/core/data_layer/test_data_rpc.py
Expand Up @@ -199,8 +199,8 @@ async def check_coin_state(wallet_node: WalletNode, coin_id: bytes32) -> bool:
return False # pragma: no cover


async def check_singleton_confirmed(dl: DataLayer, tree_id: bytes32) -> bool:
return await dl.wallet_rpc.dl_latest_singleton(tree_id, True) is not None
async def check_singleton_confirmed(dl: DataLayer, store_id: bytes32) -> bool:
return await dl.wallet_rpc.dl_latest_singleton(store_id, True) is not None


async def process_block_and_check_offer_validity(offer: TradingOffer, offer_setup: OfferSetup) -> bool:
Expand Down Expand Up @@ -923,7 +923,7 @@ async def populate_offer_setup(offer_setup: OfferSetup, count: int) -> OfferSetu
)
for store_setup, value_prefix in setups:
await store_setup.data_layer.batch_insert(
tree_id=store_setup.id,
store_id=store_setup.id,
changelist=[
{
"action": "insert",
Expand Down Expand Up @@ -1752,7 +1752,7 @@ async def test_make_offer_failure_rolls_back_db(offer_setup: OfferSetup) -> None
with pytest.raises(Exception, match="store id not available"):
await offer_setup.maker.api.make_offer(request=maker_request)

pending_root = await offer_setup.maker.data_layer.data_store.get_pending_root(tree_id=offer_setup.maker.id)
pending_root = await offer_setup.maker.data_layer.data_store.get_pending_root(store_id=offer_setup.maker.id)
assert pending_root is None


Expand Down Expand Up @@ -2055,29 +2055,29 @@ async def test_clear_pending_roots(

data_store = data_layer.data_store

tree_id = bytes32(range(32))
await data_store.create_tree(tree_id=tree_id, status=Status.COMMITTED)
store_id = bytes32(range(32))
await data_store.create_tree(store_id=store_id, status=Status.COMMITTED)

key = b"\x01\x02"
value = b"abc"

await data_store.insert(
key=key,
value=value,
tree_id=tree_id,
store_id=store_id,
reference_node_hash=None,
side=None,
status=Status.PENDING,
)

pending_root = await data_store.get_pending_root(tree_id=tree_id)
pending_root = await data_store.get_pending_root(store_id=store_id)
assert pending_root is not None

if layer == InterfaceLayer.direct:
cleared_root = await data_rpc_api.clear_pending_roots({"store_id": tree_id.hex()})
cleared_root = await data_rpc_api.clear_pending_roots({"store_id": store_id.hex()})
elif layer == InterfaceLayer.funcs:
cleared_root = await clear_pending_roots(
store_id=tree_id,
store_id=store_id,
rpc_port=rpc_port,
root_path=bt.root_path,
)
Expand All @@ -2089,7 +2089,7 @@ async def test_clear_pending_roots(
"data",
"clear_pending_roots",
"--id",
tree_id.hex(),
store_id.hex(),
"--data-rpc-port",
str(rpc_port),
"--yes",
Expand Down Expand Up @@ -2120,7 +2120,7 @@ async def test_clear_pending_roots(
net_config=bt.config,
)
try:
cleared_root = await client.clear_pending_roots(store_id=tree_id)
cleared_root = await client.clear_pending_roots(store_id=store_id)
finally:
client.close()
await client.await_closed()
Expand Down Expand Up @@ -2153,23 +2153,23 @@ async def test_issue_15955_deadlock(
await full_node_api.wait_for_wallet_synced(wallet_node)

# create a store
transaction_records, tree_id = await data_layer.create_store(fee=uint64(0))
transaction_records, store_id = await data_layer.create_store(fee=uint64(0))
await full_node_api.process_transaction_records(records=transaction_records)
await full_node_api.wait_for_wallet_synced(wallet_node)
assert await check_singleton_confirmed(dl=data_layer, tree_id=tree_id)
assert await check_singleton_confirmed(dl=data_layer, store_id=store_id)

# insert a key and value
key = b"\x00"
value = b"\x01" * 10_000
transaction_record = await data_layer.batch_update(
tree_id=tree_id,
store_id=store_id,
changelist=[{"action": "insert", "key": key, "value": value}],
fee=uint64(0),
)
assert transaction_record is not None
await full_node_api.process_transaction_records(records=[transaction_record])
await full_node_api.wait_for_wallet_synced(wallet_node)
assert await check_singleton_confirmed(dl=data_layer, tree_id=tree_id)
assert await check_singleton_confirmed(dl=data_layer, store_id=store_id)

# get the value a bunch through several periodic data management cycles
concurrent_requests = 10
Expand All @@ -2183,7 +2183,7 @@ async def test_issue_15955_deadlock(
while time.monotonic() < end:
with anyio.fail_after(adjusted_timeout(timeout)):
await asyncio.gather(
*(asyncio.create_task(data_layer.get_value(store_id=tree_id, key=key)) for _ in range(10))
*(asyncio.create_task(data_layer.get_value(store_id=store_id, key=key)) for _ in range(10))
)


Expand Down Expand Up @@ -3294,7 +3294,7 @@ async def test_unsubmitted_batch_update(
)
keys_values = await data_rpc_api.get_keys_values({"id": store_id.hex()})
assert keys_values == {"keys_values": []}
pending_root = await data_layer.data_store.get_pending_root(tree_id=store_id)
pending_root = await data_layer.data_store.get_pending_root(store_id=store_id)
assert pending_root is not None
assert pending_root.status == Status.PENDING_BATCH

Expand All @@ -3313,7 +3313,7 @@ async def test_unsubmitted_batch_update(
for key, value in to_insert:
assert kv_dict["0x" + key.hex()] == "0x" + value.hex()
prev_keys_values = keys_values
old_root = await data_layer.data_store.get_tree_root(tree_id=store_id)
old_root = await data_layer.data_store.get_tree_root(store_id=store_id)

key = b"e"
value = b"\x00\x05"
Expand All @@ -3326,7 +3326,7 @@ async def test_unsubmitted_batch_update(
await full_node_api.farm_blocks_to_puzzlehash(
count=NUM_BLOCKS_WITHOUT_SUBMIT, guarantee_transaction_blocks=True
)
root = await data_layer.data_store.get_tree_root(tree_id=store_id)
root = await data_layer.data_store.get_tree_root(store_id=store_id)
assert root == old_root

key = b"f"
Expand All @@ -3342,9 +3342,9 @@ async def test_unsubmitted_batch_update(
)

await data_rpc_api.clear_pending_roots({"store_id": store_id.hex()})
pending_root = await data_layer.data_store.get_pending_root(tree_id=store_id)
pending_root = await data_layer.data_store.get_pending_root(store_id=store_id)
assert pending_root is None
root = await data_layer.data_store.get_tree_root(tree_id=store_id)
root = await data_layer.data_store.get_tree_root(store_id=store_id)
assert root == old_root

key = b"g"
Expand All @@ -3363,7 +3363,7 @@ async def test_unsubmitted_batch_update(
keys_values = await data_rpc_api.get_keys_values({"id": store_id.hex()})
assert keys_values == prev_keys_values

pending_root = await data_layer.data_store.get_pending_root(tree_id=store_id)
pending_root = await data_layer.data_store.get_pending_root(store_id=store_id)
assert pending_root is not None
assert pending_root.status == Status.PENDING_BATCH

Expand Down Expand Up @@ -3427,7 +3427,7 @@ async def test_unsubmitted_batch_update(
else: # pragma: no cover
assert False, "unhandled parametrization"

pending_root = await data_layer.data_store.get_pending_root(tree_id=store_id)
pending_root = await data_layer.data_store.get_pending_root(store_id=store_id)
assert pending_root is not None
assert pending_root.status == Status.PENDING

Expand Down Expand Up @@ -3606,7 +3606,7 @@ async def test_multistore_update(
with pytest.raises(Exception, match="No pending roots found to submit"):
await data_rpc_api.submit_all_pending_roots({})
for store_id in store_ids:
pending_root = await data_store.get_pending_root(tree_id=store_id)
pending_root = await data_store.get_pending_root(store_id=store_id)
assert pending_root is None

store_updates = []
Expand Down

0 comments on commit 98df4ce

Please sign in to comment.