/
server.py
313 lines (266 loc) · 12.6 KB
/
server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
from __future__ import annotations
from datetime import date, timedelta
from typing import Dict, Optional, Tuple
from kvac.commitment import BlindAttributeCommitment
from kvac.credential_presentation import CredentialPresentation
from kvac.issuance_request import IssuanceRequest
from kvac.issuance_response import IssuanceResponse
from kvac.issuer_key import IssuerKeyPair, IssuerPublicKey
from kvac.system_params import SystemParams
from kvac.verifiable_encryption import Ciphertext, KeyCommitment
from poksho.group.ristretto import RistrettoScalar
from zkgroup.auth_credential import AuthCredential
from zkgroup.constants import Role, MAX_ATTRIBUTES, REDEMPTION_DATE_RANGE
from zkgroup.exceptions import NotAMemberError, NotAnAdministratorError
from zkgroup.profile_key_credential import ProfileKeyCredential
class Server:
    """Represents the ZKGroup Server.

    It is a singleton, which means that subsequent calls to 'Server()' return the
    same instance. It manages groups and their registered users.
    """

    _instance: Optional[Server] = None
    _initialized: bool = False

    def __new__(cls):
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._initialized = False
        return cls._instance

    def __init__(self):
        """Generate system parameters and issuer keys; a no-op after the first call."""
        if self._initialized:
            # required check because __init__ is re-run on every 'Server()' call
            return

        self.system_params = SystemParams.generate(MAX_ATTRIBUTES, "zkgroup.server.server")
        self._auth_keypair = IssuerKeyPair.generate(
            self.system_params,
            AuthCredential.number_of_attribute_components())
        self._profile_key_keypair = IssuerKeyPair.generate(
            self.system_params,
            ProfileKeyCredential.number_of_attribute_components())

        # use tuple of encoded KeyCommitments as group key
        # use encoded UidCiphertext as user key
        # for each user we store an optional encoded ProfileKeyCiphertext and their Role
        self.groups: \
            Dict[Tuple[bytes, bytes],
                 Dict[bytes,
                      Tuple[Optional[bytes], Role]]] = {}

        # (uid, profile_key_version) --> profile_key_commitment
        self.profile_key_commitments: Dict[
            Tuple[bytes, bytes], BlindAttributeCommitment] = {}

        self._initialized = True

    # Internal helpers

    @staticmethod
    def _group_key(
            group_public_params: Tuple[KeyCommitment, KeyCommitment],
    ) -> Tuple[bytes, bytes]:
        """Return the key under which a group is stored in 'self.groups'."""
        return bytes(group_public_params[0]), bytes(group_public_params[1])

    def _redemption_date_point(self, redemption_date: date):
        """Return 'G_ms[2]' raised to the scalar encoding of the date's ordinal."""
        scalar = RistrettoScalar.from_bytes(
            redemption_date.toordinal().to_bytes(32, byteorder="little"))
        return self.system_params.G_ms[2] ** scalar

    # Operations for Credentials

    def get_auth_credential(
            self,
            request: IssuanceRequest,
            redemption_date: date,
    ) -> IssuanceResponse:
        """The server issues an AuthCredential.

        Raises:
            ValueError: if 'redemption_date' does not match 'request.clear_attributes[2]'
                or lies more than REDEMPTION_DATE_RANGE days after today.
        """
        # assumes an authenticated channel
        # check: authenticated uid == request.clear_attributes[1].to_bytes()
        if self._redemption_date_point(redemption_date) != request.clear_attributes[2]:
            raise ValueError("Redemption date does not match the one in the request.")
        if redemption_date > date.today() + timedelta(days=REDEMPTION_DATE_RANGE):
            raise ValueError("Redemption date is too far in the future.")

        return AuthCredential.issue(issuer_key=self._auth_keypair, request=request)

    def commit_to_profile_key(
            self,
            uid: bytes,
            profile_key_version: bytes,
            profile_key_commitment: BlindAttributeCommitment
    ):
        """The server stores the profile key commitment of a user.

        An existing entry for the same (uid, profile_key_version) is replaced.
        """
        # assumes an authenticated channel
        self.profile_key_commitments[(uid, profile_key_version)] = profile_key_commitment

    def get_profile_key_credential(
            self,
            request: IssuanceRequest,
            uid: bytes,
            profile_key_version: bytes,
            commitment: BlindAttributeCommitment
    ) -> IssuanceResponse:
        """The server issues a ProfileKeyCredential.

        Raises:
            ValueError: if no commitment is stored for (uid, profile_key_version) or
                the stored commitment differs from 'commitment'.
        """
        commitment_key = (uid, profile_key_version)
        if commitment_key not in self.profile_key_commitments:
            raise ValueError("Profile key commitment not found.")
        if self.profile_key_commitments[commitment_key] != commitment:
            raise ValueError("Profile key commitment does not match.")

        return ProfileKeyCredential.issue(
            issuer_key=self._profile_key_keypair,
            request=request,
            commitment=commitment)

    def _verify_auth_credential(
            self,
            presentation: CredentialPresentation,
            redemption_date: date,
    ) -> bool:
        """The server verifies an AuthCredential.

        Raises:
            ValueError: if 'redemption_date' does not match
                'presentation.attributes[1].value' or lies before today.
        """
        if self._redemption_date_point(redemption_date) != presentation.attributes[1].value:
            raise ValueError("Redemption date does not match the one in the presentation.")
        if redemption_date < date.today():
            raise ValueError("Redemption date is in the past.")

        return AuthCredential.verify_presentation(
            issuer_key=self._auth_keypair,
            presentation=presentation)

    def _verify_profile_key_credential(
            self,
            presentation: CredentialPresentation,
    ) -> bool:
        """The server verifies a ProfileKeyCredential."""
        return ProfileKeyCredential.verify_presentation(
            issuer_key=self._profile_key_keypair,
            presentation=presentation)

    # Operations for Group Management

    def auth_as_group_member(
            self,
            group_public_params: Tuple[KeyCommitment, KeyCommitment],
            auth_credential_presentation: CredentialPresentation,
            redemption_date: date,
    ) -> bool:
        """Verify the auth presentation and report whether its uid is in the group.

        Raises:
            ValueError: if the auth credential presentation is invalid.
            KeyError: if no group is stored under 'group_public_params'.
        """
        if not self._verify_auth_credential(auth_credential_presentation, redemption_date):
            raise ValueError("Invalid auth credential presentation.")

        uid_ciphertext = auth_credential_presentation.attributes[0].value
        return bytes(uid_ciphertext) in self.groups[self._group_key(group_public_params)]

    def add_group_member(
            self,
            group_public_params: Tuple[KeyCommitment, KeyCommitment],
            target_uid_ciphertext: Ciphertext,
            target_profile_key_ciphertext: Ciphertext,
            target_role: Role,
            auth_credential_presentation: CredentialPresentation,
            pk_credential_presentation: CredentialPresentation,
            redemption_date: date,
    ):
        """Store the target's profile key ciphertext and role in the group.

        The caller must be an administrator of the group. A target that is already
        listed without a profile key ciphertext (an invited member) is overwritten.

        Raises:
            NotAMemberError: if the caller is not in the group.
            NotAnAdministratorError: if the caller is not an administrator.
            ValueError: if a presentation is invalid or the target already has a
                profile key ciphertext stored in the group.
        """
        if not self.auth_as_group_member(
                group_public_params,
                auth_credential_presentation,
                redemption_date):
            raise NotAMemberError()
        if not self._verify_profile_key_credential(pk_credential_presentation):
            raise ValueError("Invalid profile key credential presentation")

        group = self.groups[self._group_key(group_public_params)]
        if not self.user_is_admin(
                group_public_params,
                auth_credential_presentation):
            raise NotAnAdministratorError()

        target_key = bytes(target_uid_ciphertext)
        if target_key in group and group[target_key][0] is not None:
            raise ValueError("User already exists.")

        group[target_key] = (bytes(target_profile_key_ciphertext), target_role)

    def create_group(
            self,
            group_public_params: Tuple[KeyCommitment, KeyCommitment],
            profile_key_ciphertext: Ciphertext,
            auth_credential_presentation: CredentialPresentation,
            pk_credential_presentation: CredentialPresentation,
            redemption_date: date,
    ):
        """Create a group whose only member is the caller, as administrator.

        Raises:
            ValueError: if a presentation is invalid or a group is already stored
                under 'group_public_params'.
        """
        if not self._verify_auth_credential(auth_credential_presentation, redemption_date):
            raise ValueError("Invalid auth credential presentation.")
        if not self._verify_profile_key_credential(pk_credential_presentation):
            raise ValueError("Invalid profile key credential presentation")

        group_key = self._group_key(group_public_params)
        # Refuse to overwrite: otherwise any credential holder could replace an
        # existing group's member table and become its sole administrator.
        if group_key in self.groups:
            raise ValueError("Group already exists.")

        uid_ciphertext = auth_credential_presentation.attributes[0].value
        self.groups[group_key] = {
            bytes(uid_ciphertext): (bytes(profile_key_ciphertext), Role.ADMINISTRATOR)}

    def user_is_admin(
            self,
            group_public_params: Tuple[KeyCommitment, KeyCommitment],
            auth_credential_presentation: CredentialPresentation,
    ) -> bool:
        """Report whether the presentation's uid is stored with Role.ADMINISTRATOR.

        The presentation is not verified here.

        Raises:
            KeyError: if the group or the uid is not stored.
        """
        uid_ciphertext = auth_credential_presentation.attributes[0].value
        group = self.groups[self._group_key(group_public_params)]
        return group[bytes(uid_ciphertext)][1] == Role.ADMINISTRATOR

    def delete_group(
            self,
            group_public_params: Tuple[KeyCommitment, KeyCommitment],
            auth_credential_presentation: CredentialPresentation,
            redemption_date: date,
    ):
        """Delete the group; the caller must be an administrator of it.

        Raises:
            NotAMemberError: if the caller is not in the group.
            NotAnAdministratorError: if the caller is not an administrator.
        """
        if not self.auth_as_group_member(
                group_public_params,
                auth_credential_presentation,
                redemption_date):
            raise NotAMemberError()
        if not self.user_is_admin(
                group_public_params,
                auth_credential_presentation):
            raise NotAnAdministratorError()

        del self.groups[self._group_key(group_public_params)]

    def remove_member(
            self,
            group_public_params: Tuple[KeyCommitment, KeyCommitment],
            target_uid_ciphertext: Ciphertext,
            auth_credential_presentation: CredentialPresentation,
            redemption_date: date,
    ):
        """Remove the target from the group.

        Administrators may remove anyone; other members may only remove themselves.

        Raises:
            NotAMemberError: if the caller is not in the group.
            NotAnAdministratorError: if a non-administrator targets another user.
            KeyError: if the target is not in the group.
        """
        if not self.auth_as_group_member(
                group_public_params,
                auth_credential_presentation,
                redemption_date):
            raise NotAMemberError()

        group = self.groups[self._group_key(group_public_params)]
        uid_ciphertext = auth_credential_presentation.attributes[0].value
        target_key = bytes(target_uid_ciphertext)
        # Compare encodings, exactly as the membership lookups do, so the
        # self-removal test does not depend on how the ciphertext type defines '!='.
        if not self.user_is_admin(
                group_public_params,
                auth_credential_presentation) \
                and bytes(uid_ciphertext) != target_key:
            raise NotAnAdministratorError()

        del group[target_key]

    def add_invited_group_member(
            self,
            group_public_params: Tuple[KeyCommitment, KeyCommitment],
            new_uid_ciphertext: Ciphertext,
            new_role: Role,
            auth_credential_presentation: CredentialPresentation,
            redemption_date: date,
    ):
        """List a new uid in the group with a role but no profile key ciphertext.

        Raises:
            NotAMemberError: if the caller is not in the group.
            NotAnAdministratorError: if the caller is not an administrator.
            ValueError: if the uid is already listed in the group.
        """
        if not self.auth_as_group_member(
                group_public_params,
                auth_credential_presentation,
                redemption_date):
            raise NotAMemberError()

        group = self.groups[self._group_key(group_public_params)]
        if not self.user_is_admin(
                group_public_params,
                auth_credential_presentation):
            raise NotAnAdministratorError()

        new_key = bytes(new_uid_ciphertext)
        if new_key in group:
            raise ValueError("User already exists.")

        group[new_key] = (None, new_role)

    def fetch_group_members(
            self,
            group_public_params: Tuple[KeyCommitment, KeyCommitment],
            auth_credential_presentation: CredentialPresentation,
            redemption_date: date,
    ) -> Dict[bytes, Tuple[Optional[bytes], Role]]:
        """Return a snapshot of the group's member table.

        Raises:
            NotAMemberError: if the caller is not in the group.
        """
        if not self.auth_as_group_member(
                group_public_params,
                auth_credential_presentation,
                redemption_date):
            raise NotAMemberError()

        # Hand out a copy so callers cannot mutate the server's own table.
        return dict(self.groups[self._group_key(group_public_params)])

    def update_profile_key(
            self,
            group_public_params: Tuple[KeyCommitment, KeyCommitment],
            profile_key_ciphertext: Ciphertext,
            auth_credential_presentation: CredentialPresentation,
            pk_credential_presentation: CredentialPresentation,
            redemption_date: date,
    ):
        """Replace the caller's stored profile key ciphertext, keeping their role.

        Raises:
            NotAMemberError: if the caller is not in the group.
            ValueError: if a presentation is invalid.
        """
        if not self.auth_as_group_member(
                group_public_params,
                auth_credential_presentation,
                redemption_date):
            raise NotAMemberError()
        if not self._verify_profile_key_credential(pk_credential_presentation):
            raise ValueError("Invalid profile key credential presentation")

        group = self.groups[self._group_key(group_public_params)]
        member_key = bytes(auth_credential_presentation.attributes[0].value)
        _, role = group[member_key]
        group[member_key] = (bytes(profile_key_ciphertext), role)

    @property
    def auth_public_key(self) -> IssuerPublicKey:
        """Public part of the key pair used for AuthCredentials."""
        return self._auth_keypair.public

    @property
    def profile_key_public_key(self) -> IssuerPublicKey:
        """Public part of the key pair used for ProfileKeyCredentials."""
        return self._profile_key_keypair.public