forked from Chia-Network/pool-reference
/
store.py
165 lines (146 loc) · 5.94 KB
/
store.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import asyncio
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Set, List, Tuple, Dict
import aiosqlite
from blspy import G1Element
from chia.types.blockchain_format.sized_bytes import bytes32
from chia.util.ints import uint32, uint64
from chia.util.lru_cache import LRUCache
from chia.util.streamable import streamable, Streamable
@dataclass(frozen=True)
@streamable
class FarmerRecord(Streamable):
singleton_genesis: bytes32
owner_public_key: G1Element
pool_puzzle_hash: bytes32
relative_lock_height: uint32
p2_singleton_puzzle_hash: bytes32
blockchain_height: uint32 # Height of the singleton (might not be the last one)
singleton_coin_id: bytes32 # Coin id of the singleton (might not be the last one)
points: uint64
difficulty: uint64
rewards_target: bytes32
is_pool_member: bool # If the farmer leaves the pool, this gets set to False
class PoolStore:
connection: aiosqlite.Connection
lock: asyncio.Lock
@classmethod
async def create(cls):
self = cls()
self.db_path = Path("pooldb.sqlite")
self.connection = await aiosqlite.connect(self.db_path)
self.lock = asyncio.Lock()
await self.connection.execute("pragma journal_mode=wal")
await self.connection.execute("pragma synchronous=2")
await self.connection.execute(
(
"CREATE TABLE IF NOT EXISTS farmer("
"singleton_genesis text PRIMARY KEY,"
" owner_public_key text,"
" pool_puzzle_hash text,"
" relative_lock_height bigint,"
" p2_singleton_puzzle_hash text,"
" blockchain_height bigint,"
" singleton_coin_id text,"
" points bigint,"
" difficulty bigint,"
" rewards_target text,"
" is_pool_member tinyint)"
)
)
# Useful for reorg lookups
await self.connection.execute("CREATE INDEX IF NOT EXISTS scan_ph on farmer(p2_singleton_puzzle_hash)")
await self.connection.commit()
self.coin_record_cache = LRUCache(1000)
return self
async def add_farmer_record(self, farmer_record: FarmerRecord):
cursor = await self.connection.execute(
f"INSERT OR REPLACE INTO farmer VALUES(?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)",
(
farmer_record.singleton_genesis.hex(),
bytes(farmer_record.owner_public_key).hex(),
farmer_record.pool_puzzle_hash.hex(),
farmer_record.relative_lock_height,
farmer_record.p2_singleton_puzzle_hash.hex(),
farmer_record.blockchain_height,
farmer_record.singleton_coin_id.hex(),
farmer_record.points,
farmer_record.difficulty,
farmer_record.rewards_target.hex(),
int(farmer_record.is_pool_member),
),
)
await cursor.close()
await self.connection.commit()
async def get_farmer_record(self, singleton_genesis: bytes32) -> Optional[FarmerRecord]:
# TODO: use cache
cursor = await self.connection.execute(
"SELECT * from farmer where singleton_genesis=?", (singleton_genesis.hex(),)
)
row = await cursor.fetchone()
if row is None:
return None
return FarmerRecord(
bytes.fromhex(row[0]),
G1Element.from_bytes(bytes.fromhex(row[1])),
bytes.fromhex(row[2]),
row[3],
bytes.fromhex(row[4]),
row[5],
bytes.fromhex(row[6]),
row[7],
row[8],
bytes.fromhex(row[9]),
True if row[10] == 1 else False,
)
async def get_pay_to_singleton_phs(self) -> Set[bytes32]:
cursor = await self.connection.execute("SELECT p2_singleton_puzzle_hash from farmer")
rows = await cursor.fetchall()
all_phs: Set[bytes32] = set()
for row in rows:
all_phs.add(bytes32(bytes.fromhex(row[0])))
return all_phs
async def get_farmer_records_for_p2_singleton_phs(self, puzzle_hashes: Set[bytes32]) -> List[FarmerRecord]:
if len(puzzle_hashes) == 0:
return []
puzzle_hashes_db = tuple([ph.hex() for ph in list(puzzle_hashes)])
cursor = await self.connection.execute(
f'SELECT * from farmer WHERE p2_singleton_puzzle_hash in ({"?," * (len(puzzle_hashes_db) - 1)}?) '
)
rows = await cursor.fetchall()
records: List[FarmerRecord] = []
for row in rows:
record = FarmerRecord(
bytes.fromhex(row[0]),
G1Element.from_bytes(bytes.fromhex(row[1])),
bytes.fromhex(row[2]),
row[3],
bytes.fromhex(row[4]),
row[5],
bytes.fromhex(row[6]),
row[7],
row[8],
bytes.fromhex(row[9]),
True if row[10] == 1 else False,
)
records.append(record)
return records
async def get_farmer_points_and_ph(self) -> List[Tuple[uint64, bytes32]]:
cursor = await self.connection.execute(f"SELECT points, rewards_target from farmer")
rows = await cursor.fetchall()
accumulated: Dict[bytes32, uint64] = {}
for row in rows:
points: uint64 = uint64(row[0])
ph: bytes32 = bytes32(bytes.fromhex(row[1]))
if ph in accumulated:
accumulated[ph] += points
else:
accumulated[ph] = points
ret: List[Tuple[uint64, bytes32]] = []
for ph, total_points in accumulated.items():
ret.append((total_points, ph))
return ret
async def clear_farmer_points(self) -> List[Tuple[uint64, bytes32]]:
cursor = await self.connection.execute(f"UPDATE farmer set points=0")
await cursor.fetchall()