Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change datalayer python and logging from tree id to store id #17864

Merged
merged 10 commits on May 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
18 changes: 9 additions & 9 deletions chia/_tests/core/data_layer/conftest.py
Expand Up @@ -52,9 +52,9 @@ def create_example_fixture(request: SubRequest) -> Callable[[DataStore, bytes32]
return request.param # type: ignore[no-any-return]


@pytest.fixture(name="tree_id", scope="function")
def tree_id_fixture() -> bytes32:
base = b"a tree id"
@pytest.fixture(name="store_id", scope="function")
def store_id_fixture() -> bytes32:
base = b"a store id"
pad = b"." * (32 - len(base))
return bytes32(pad + base)

Expand All @@ -66,8 +66,8 @@ async def raw_data_store_fixture(database_uri: str) -> AsyncIterable[DataStore]:


@pytest.fixture(name="data_store", scope="function")
async def data_store_fixture(raw_data_store: DataStore, tree_id: bytes32) -> AsyncIterable[DataStore]:
await raw_data_store.create_tree(tree_id=tree_id, status=Status.COMMITTED)
async def data_store_fixture(raw_data_store: DataStore, store_id: bytes32) -> AsyncIterable[DataStore]:
await raw_data_store.create_tree(store_id=store_id, status=Status.COMMITTED)

await raw_data_store.check()
yield raw_data_store
Expand All @@ -82,14 +82,14 @@ def node_type_fixture(request: SubRequest) -> NodeType:
@pytest.fixture(name="valid_node_values")
async def valid_node_values_fixture(
data_store: DataStore,
tree_id: bytes32,
store_id: bytes32,
node_type: NodeType,
) -> Dict[str, Any]:
await add_01234567_example(data_store=data_store, tree_id=tree_id)
await add_01234567_example(data_store=data_store, store_id=store_id)

if node_type == NodeType.INTERNAL:
node_a = await data_store.get_node_by_key(key=b"\x02", tree_id=tree_id)
node_b = await data_store.get_node_by_key(key=b"\x04", tree_id=tree_id)
node_a = await data_store.get_node_by_key(key=b"\x02", store_id=store_id)
node_b = await data_store.get_node_by_key(key=b"\x04", store_id=store_id)
return create_valid_node_values(node_type=node_type, left_hash=node_a.hash, right_hash=node_b.hash)
elif node_type == NodeType.TERMINAL:
return create_valid_node_values(node_type=node_type)
Expand Down
12 changes: 6 additions & 6 deletions chia/_tests/core/data_layer/test_data_cli.py
Expand Up @@ -29,29 +29,29 @@ def test_round_trip(chia_root: ChiaRoot, chia_daemon: None, chia_data: None) ->
print(f"create_data_store: {create}")
dic = json.loads(create.stdout)
assert dic["success"]
tree_id = dic["id"]
store_id = dic["id"]
key = "1a6f915513173902a7216e7d9e4a16bfd088e20683f45de3b432ce72e9cc7aa8"
value = "ffff8353594d8083616263"
changelist: List[Dict[str, str]] = [{"action": "insert", "key": key, "value": value}]
print(json.dumps(changelist))
update = chia_root.run(
args=["data", "update_data_store", "--id", tree_id, "--changelist", json.dumps(changelist)]
args=["data", "update_data_store", "--id", store_id, "--changelist", json.dumps(changelist)]
)
dic = json.loads(create.stdout)
assert dic["success"]
print(f"update_data_store: {update}")
completed_process = chia_root.run(args=["data", "get_value", "--id", tree_id, "--key", key])
completed_process = chia_root.run(args=["data", "get_value", "--id", store_id, "--key", key])
parsed = json.loads(completed_process.stdout)
expected = {"value": value, "success": True}
assert parsed == expected
get_keys_values = chia_root.run(args=["data", "get_keys_values", "--id", tree_id])
get_keys_values = chia_root.run(args=["data", "get_keys_values", "--id", store_id])
print(f"get_keys_values: {get_keys_values}")
changelist = [{"action": "delete", "key": key}]
update = chia_root.run(
args=["data", "update_data_store", "--id", tree_id, "--changelist", json.dumps(changelist)]
args=["data", "update_data_store", "--id", store_id, "--changelist", json.dumps(changelist)]
)
print(f"update_data_store: {update}")
completed_process = chia_root.run(args=["data", "get_value", "--id", tree_id, "--key", key])
completed_process = chia_root.run(args=["data", "get_value", "--id", store_id, "--key", key])
parsed = json.loads(completed_process.stdout)
expected = {"data": None, "success": True}
assert parsed == expected
4 changes: 2 additions & 2 deletions chia/_tests/core/data_layer/test_data_layer.py
Expand Up @@ -75,8 +75,8 @@ async def wallet_rpc_init() -> WalletRpcClient:
)

async with data_layer.manage():
await data_layer.get_downloader(tree_id=bytes32([0] * 32), url="")
await data_layer.get_uploaders(tree_id=bytes32([0] * 32))
await data_layer.get_downloader(store_id=bytes32([0] * 32), url="")
await data_layer.get_uploaders(store_id=bytes32([0] * 32))
await data_layer.check_plugins()

header_values = {request.headers.get(header_key) for request in recording_web_server.requests}
Expand Down
4 changes: 2 additions & 2 deletions chia/_tests/core/data_layer/test_data_layer_util.py
Expand Up @@ -100,7 +100,7 @@ class RoundTripCase:
RoundTripCase(
id="Root",
instance=Root(
tree_id=bytes32(b"\x00" * 32),
store_id=bytes32(b"\x00" * 32),
node_hash=bytes32(b"\x01" * 32),
generation=3,
status=Status.PENDING,
Expand All @@ -115,7 +115,7 @@ class RoundTripCase:
instance=ClearPendingRootsResponse(
success=True,
root=Root(
tree_id=bytes32(b"\x00" * 32),
store_id=bytes32(b"\x00" * 32),
node_hash=bytes32(b"\x01" * 32),
generation=3,
status=Status.PENDING,
Expand Down
50 changes: 25 additions & 25 deletions chia/_tests/core/data_layer/test_data_rpc.py
Expand Up @@ -199,8 +199,8 @@ async def check_coin_state(wallet_node: WalletNode, coin_id: bytes32) -> bool:
return False # pragma: no cover


async def check_singleton_confirmed(dl: DataLayer, tree_id: bytes32) -> bool:
return await dl.wallet_rpc.dl_latest_singleton(tree_id, True) is not None
async def check_singleton_confirmed(dl: DataLayer, store_id: bytes32) -> bool:
return await dl.wallet_rpc.dl_latest_singleton(store_id, True) is not None


async def process_block_and_check_offer_validity(offer: TradingOffer, offer_setup: OfferSetup) -> bool:
Expand Down Expand Up @@ -923,7 +923,7 @@ async def populate_offer_setup(offer_setup: OfferSetup, count: int) -> OfferSetu
)
for store_setup, value_prefix in setups:
await store_setup.data_layer.batch_insert(
tree_id=store_setup.id,
store_id=store_setup.id,
changelist=[
{
"action": "insert",
Expand Down Expand Up @@ -1752,7 +1752,7 @@ async def test_make_offer_failure_rolls_back_db(offer_setup: OfferSetup) -> None
with pytest.raises(Exception, match="store id not available"):
await offer_setup.maker.api.make_offer(request=maker_request)

pending_root = await offer_setup.maker.data_layer.data_store.get_pending_root(tree_id=offer_setup.maker.id)
pending_root = await offer_setup.maker.data_layer.data_store.get_pending_root(store_id=offer_setup.maker.id)
assert pending_root is None


Expand Down Expand Up @@ -2055,29 +2055,29 @@ async def test_clear_pending_roots(

data_store = data_layer.data_store

tree_id = bytes32(range(32))
await data_store.create_tree(tree_id=tree_id, status=Status.COMMITTED)
store_id = bytes32(range(32))
await data_store.create_tree(store_id=store_id, status=Status.COMMITTED)

key = b"\x01\x02"
value = b"abc"

await data_store.insert(
key=key,
value=value,
tree_id=tree_id,
store_id=store_id,
reference_node_hash=None,
side=None,
status=Status.PENDING,
)

pending_root = await data_store.get_pending_root(tree_id=tree_id)
pending_root = await data_store.get_pending_root(store_id=store_id)
assert pending_root is not None

if layer == InterfaceLayer.direct:
cleared_root = await data_rpc_api.clear_pending_roots({"store_id": tree_id.hex()})
cleared_root = await data_rpc_api.clear_pending_roots({"store_id": store_id.hex()})
elif layer == InterfaceLayer.funcs:
cleared_root = await clear_pending_roots(
store_id=tree_id,
store_id=store_id,
rpc_port=rpc_port,
root_path=bt.root_path,
)
Expand All @@ -2089,7 +2089,7 @@ async def test_clear_pending_roots(
"data",
"clear_pending_roots",
"--id",
tree_id.hex(),
store_id.hex(),
"--data-rpc-port",
str(rpc_port),
"--yes",
Expand Down Expand Up @@ -2120,7 +2120,7 @@ async def test_clear_pending_roots(
net_config=bt.config,
)
try:
cleared_root = await client.clear_pending_roots(store_id=tree_id)
cleared_root = await client.clear_pending_roots(store_id=store_id)
finally:
client.close()
await client.await_closed()
Expand Down Expand Up @@ -2153,23 +2153,23 @@ async def test_issue_15955_deadlock(
await full_node_api.wait_for_wallet_synced(wallet_node)

# create a store
transaction_records, tree_id = await data_layer.create_store(fee=uint64(0))
transaction_records, store_id = await data_layer.create_store(fee=uint64(0))
await full_node_api.process_transaction_records(records=transaction_records)
await full_node_api.wait_for_wallet_synced(wallet_node)
assert await check_singleton_confirmed(dl=data_layer, tree_id=tree_id)
assert await check_singleton_confirmed(dl=data_layer, store_id=store_id)

# insert a key and value
key = b"\x00"
value = b"\x01" * 10_000
transaction_record = await data_layer.batch_update(
tree_id=tree_id,
store_id=store_id,
changelist=[{"action": "insert", "key": key, "value": value}],
fee=uint64(0),
)
assert transaction_record is not None
await full_node_api.process_transaction_records(records=[transaction_record])
await full_node_api.wait_for_wallet_synced(wallet_node)
assert await check_singleton_confirmed(dl=data_layer, tree_id=tree_id)
assert await check_singleton_confirmed(dl=data_layer, store_id=store_id)

# get the value a bunch through several periodic data management cycles
concurrent_requests = 10
Expand All @@ -2183,7 +2183,7 @@ async def test_issue_15955_deadlock(
while time.monotonic() < end:
with anyio.fail_after(adjusted_timeout(timeout)):
await asyncio.gather(
*(asyncio.create_task(data_layer.get_value(store_id=tree_id, key=key)) for _ in range(10))
*(asyncio.create_task(data_layer.get_value(store_id=store_id, key=key)) for _ in range(10))
)


Expand Down Expand Up @@ -3294,7 +3294,7 @@ async def test_unsubmitted_batch_update(
)
keys_values = await data_rpc_api.get_keys_values({"id": store_id.hex()})
assert keys_values == {"keys_values": []}
pending_root = await data_layer.data_store.get_pending_root(tree_id=store_id)
pending_root = await data_layer.data_store.get_pending_root(store_id=store_id)
assert pending_root is not None
assert pending_root.status == Status.PENDING_BATCH

Expand All @@ -3313,7 +3313,7 @@ async def test_unsubmitted_batch_update(
for key, value in to_insert:
assert kv_dict["0x" + key.hex()] == "0x" + value.hex()
prev_keys_values = keys_values
old_root = await data_layer.data_store.get_tree_root(tree_id=store_id)
old_root = await data_layer.data_store.get_tree_root(store_id=store_id)

key = b"e"
value = b"\x00\x05"
Expand All @@ -3326,7 +3326,7 @@ async def test_unsubmitted_batch_update(
await full_node_api.farm_blocks_to_puzzlehash(
count=NUM_BLOCKS_WITHOUT_SUBMIT, guarantee_transaction_blocks=True
)
root = await data_layer.data_store.get_tree_root(tree_id=store_id)
root = await data_layer.data_store.get_tree_root(store_id=store_id)
assert root == old_root

key = b"f"
Expand All @@ -3342,9 +3342,9 @@ async def test_unsubmitted_batch_update(
)

await data_rpc_api.clear_pending_roots({"store_id": store_id.hex()})
pending_root = await data_layer.data_store.get_pending_root(tree_id=store_id)
pending_root = await data_layer.data_store.get_pending_root(store_id=store_id)
assert pending_root is None
root = await data_layer.data_store.get_tree_root(tree_id=store_id)
root = await data_layer.data_store.get_tree_root(store_id=store_id)
assert root == old_root

key = b"g"
Expand All @@ -3363,7 +3363,7 @@ async def test_unsubmitted_batch_update(
keys_values = await data_rpc_api.get_keys_values({"id": store_id.hex()})
assert keys_values == prev_keys_values

pending_root = await data_layer.data_store.get_pending_root(tree_id=store_id)
pending_root = await data_layer.data_store.get_pending_root(store_id=store_id)
assert pending_root is not None
assert pending_root.status == Status.PENDING_BATCH

Expand Down Expand Up @@ -3427,7 +3427,7 @@ async def test_unsubmitted_batch_update(
else: # pragma: no cover
assert False, "unhandled parametrization"

pending_root = await data_layer.data_store.get_pending_root(tree_id=store_id)
pending_root = await data_layer.data_store.get_pending_root(store_id=store_id)
assert pending_root is not None
assert pending_root.status == Status.PENDING

Expand Down Expand Up @@ -3606,7 +3606,7 @@ async def test_multistore_update(
with pytest.raises(Exception, match="No pending roots found to submit"):
await data_rpc_api.submit_all_pending_roots({})
for store_id in store_ids:
pending_root = await data_store.get_pending_root(tree_id=store_id)
pending_root = await data_store.get_pending_root(store_id=store_id)
assert pending_root is None

store_updates = []
Expand Down