/
kvac.py
346 lines (275 loc) · 13.2 KB
/
kvac.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
from __future__ import annotations
from collections.abc import Iterable
from enum import Enum
from typing import List, Tuple, NamedTuple, Any, Optional, Type
from poksho.group.ristretto import RistrettoPoint
from kvac.credential_presentation import CredentialPresentation
from kvac.mac import MACTag
from kvac.issuer_key import IssuerPublicKey, IssuerKeyPair
from kvac.elgamal import ElGamalKeyPair
from kvac.commitment import BlindAttributeCommitment
from kvac.exceptions import VerificationFailure, CallNotAllowed
from kvac.issuance_request import IssuanceRequest
from kvac.issuance_response import IssuanceResponse
from kvac.verifiable_encryption import KeyPair as HidingKeyPair, \
AttributeRepresentationForEncryption as AttributeRepresentationForHiding, \
MessageToEncrypt as MessageToHide
def to_list(value_or_iterable):
    """Return the argument as a list.

    An argument that is an instance of ``collections.abc.Iterable`` is copied
    element by element into a new list (note that this includes strings, which
    are split into their characters). Any other argument is wrapped in a new
    single-element list.
    """
    is_iterable = isinstance(value_or_iterable, Iterable)
    return list(value_or_iterable) if is_iterable else [value_or_iterable]
class AttributeValue(NamedTuple):
    """AttributeValue is the object that stores the value of an attribute in an
    instance of KVAC.

    Instances are created by ``Attribute.__set__`` and stored on the credential
    under the attribute's private name (``_<attribute name>``). The three flags
    are copied from the declaring ``Attribute`` at the time the value is set.
    """
    # The stored value. For hidden attributes this is the result of
    # AttributeRepresentationForHiding.encode(...); for all other attributes it is
    # the value exactly as it was assigned by the caller.
    value: Tuple[RistrettoPoint] | AttributeRepresentationForHiding
    # True if this is a scalar attribute. Scalar attributes can not be hidden.
    scalar: bool
    # True if this is a blind attribute during issuance.
    blind: bool
    # True if this is a hidden attribute during presentation.
    hidden: bool
class Attribute:
    """Descriptor used to declare an attribute of a KVAC.

    An Attribute is a class variable on the KVAC that indicates the presence of
    a certain attribute on the KVAC. The value of the attribute for an instance of
    a KVAC is stored in the KVAC as a private instance variable (``_<name>``) of
    type AttributeValue.

    Each attribute value can be assigned exactly once per credential instance.
    Accessing the attribute through the class (rather than an instance) returns
    the descriptor itself, so its ``blind``/``hidden``/``scalar``/``index``
    metadata can be inspected.

    See the documentation of the KVAC class for usage information.
    """

    def __init__(self, *, blind: bool = False, hidden: bool = False, scalar: bool = False):
        """Declare an attribute.

        :param blind: Whether the attribute is blinded during issuance.
        :param hidden: Whether the attribute is hidden during presentation.
        :param scalar: Whether the attribute is a scalar.
        :raises ValueError: If both ``scalar`` and ``hidden`` are requested.
        """
        if scalar and hidden:
            raise ValueError("scalar attribute can not be hidden")
        self._blind: bool = blind
        self._hidden: bool = hidden
        self._scalar: bool = scalar
        # Name, private name and index are filled in by __set_name__ once the
        # descriptor is bound to its owning class.
        self._name: str = ""
        self._private_name: str = ""
        self._index: int

    def __set_name__(self, owner: Type[KVAC], name: str):
        """Record the attribute's name and its position within ``owner``."""
        self._name = name
        self._private_name = f"_{name}"
        self._index = self.index_in_kvac(owner)

    def __get__(self, obj: Optional[KVAC], objtype: Optional[Type] = None) -> Any:
        """Return the user-facing value of the attribute.

        For hidden attributes the stored representation is decoded again before
        it is returned; for all other attributes the stored value is returned
        as is.

        :raises AttributeError: If no value has been assigned on ``obj`` yet.
        """
        # Descriptor protocol: access via the class (obj is None) yields the
        # descriptor itself instead of failing with "None has no attribute ...".
        if obj is None:
            return self

        attribute = getattr(obj, self._private_name, None)
        if attribute is None:
            raise AttributeError(f"{obj} has no attribute '{self.name}'")

        if self.hidden:
            return attribute.value.decode()
        return attribute.value

    def get_internal_representation(self, obj: KVAC):
        """Return the value of the attribute in the form used by the protocol.

        Scalar attributes are returned as
        ``obj.issuer_public_key.system.G_ms[self.index] ** value``; every other
        attribute is returned in its stored form (for hidden attributes that is
        the encoded representation, not the decoded message).

        :raises AttributeError: If no value has been assigned on ``obj`` yet.
        """
        attribute = getattr(obj, self._private_name, None)
        if attribute is None:
            raise AttributeError(f"attribute '{self.name}' has no value assigned")

        if self.scalar:
            return obj.issuer_public_key.system.G_ms[self.index] ** attribute.value
        return attribute.value

    def __set__(self, obj: KVAC, value: Any):
        """Assign the attribute's value on ``obj``; allowed only once.

        Hidden attributes must be given as a MessageToHide, which is encoded
        before it is stored.

        :raises AttributeError: If the attribute already has a value on ``obj``.
        :raises ValueError: If the attribute is hidden and ``value`` is not a
            MessageToHide.
        """
        if getattr(obj, self._private_name, None) is not None:
            raise AttributeError(f"can't set attribute '{self.name}' more than once")

        if self.hidden:
            if not isinstance(value, MessageToHide):
                raise ValueError(f"attribute '{self.name}' is a hidden attribute, please specify a "
                                 f"MessageToHide object")
            value = AttributeRepresentationForHiding.encode(value)

        setattr(obj, self._private_name, AttributeValue(value=value,
                                                        blind=self.blind,
                                                        hidden=self.hidden,
                                                        scalar=self.scalar))

    def index_in_kvac(self, owner: Type[KVAC]):
        """Return the component index of this attribute within ``owner``.

        The index is the number of attribute components declared before this
        attribute in ``owner.attributes()``. If this attribute is not among
        them, the total number of components is returned.
        """
        # NOTE(review): the index follows declaration order, whereas
        # KVAC.present enumerates clear attributes before blind ones - confirm
        # that indexing G_ms by declaration order is what the protocol expects.
        index = 0
        for attribute in owner.attributes():
            if attribute is self:
                break
            if attribute.hidden:
                index += 2  # hidden attributes consist of 2 attributes internally
            else:
                index += 1
        return index

    @property
    def blind(self):
        """Returns whether the attribute is blinded during issuance."""
        return self._blind

    @property
    def hidden(self):
        """Returns whether the attribute is hidden during presentation."""
        return self._hidden

    @property
    def scalar(self):
        """Returns whether the attribute is a scalar."""
        return self._scalar

    @property
    def name(self):
        """Returns the name of the attribute in the KVAC."""
        return self._name

    @property
    def index(self):
        """Returns the position of this attribute in the list of all attributes of the credential."""
        return self._index
class KVAC:
    """Represents a Keyed-Verification Anonymous Credential (KVAC).

    A concrete credential is a subclass that declares its attributes as
    ``Attribute`` class variables. The life cycle of a credential is:

    1. The user creates the credential and calls ``request()``.
    2. The issuer calls ``issue()`` and returns the issuance response.
    3. The user calls ``activate()`` with that response.
    4. The user calls ``present()``; the issuer checks the result with
       ``verify_presentation()``.

    Refer to the README on how to use this class for implementing credentials with
    specific attributes.
    """

    class ProcessStage(Enum):
        """
        This class indicates the stage we have currently reached in obtaining a credential.
        It is used to ensure the required steps (request, activate, present) are done
        in the correct order.
        """
        CREDENTIAL_INACTIVE = 0
        ISSUANCE_REQUESTED = 1
        CREDENTIAL_ACTIVE = 2

    issuer_public_key: IssuerPublicKey
    process_stage: KVAC.ProcessStage
    # Set by request().
    issuance_request: IssuanceRequest
    user_key: ElGamalKeyPair
    # Set by activate().
    tag: MACTag

    def __init__(self, *, issuer_key: IssuerPublicKey, **kwargs: Any):
        """Create an inactive credential.

        :param issuer_key: Public key of the issuer of this credential.
        :param kwargs: One value per declared attribute, keyed by attribute name.
            Keyword arguments that do not name a declared attribute are ignored.
        :raises ValueError: If a declared attribute has no value in ``kwargs``.
        """
        self.issuer_public_key: IssuerPublicKey = issuer_key

        available_attributes = {attribute.name for attribute in self.attributes()}
        given_attributes = set(kwargs.keys())
        if not available_attributes.issubset(given_attributes):
            raise ValueError(
                f"missing value for attribute(s): {available_attributes - given_attributes}"
            )

        # Assignment goes through the Attribute descriptors, which validate and
        # encode the values.
        for attribute in self.attributes():
            setattr(self, attribute.name, kwargs[attribute.name])

        self.process_stage = self.ProcessStage.CREDENTIAL_INACTIVE

    @classmethod
    def number_of_attribute_components(cls) -> int:
        """
        Returns the total number of attribute components across all attributes of this credential.
        Hidden attributes count as two components, all others as one.
        Should be used to determine how many attributes an issuer key pair should have.
        """
        return len(cls.revealed_attributes()) + 2 * len(cls.hidden_attributes())

    @classmethod
    def attributes(cls) -> List[Attribute]:
        """Returns the attributes of this credential.

        Only ``Attribute`` members found in ``vars(cls)``, i.e. declared directly
        on ``cls``, are returned, in declaration order.
        """
        return [
            member for member in vars(cls).values() if isinstance(member, Attribute)
        ]

    @classmethod
    def clear_attributes(cls) -> List[Attribute]:
        """Returns the attributes that are not blinded during issuance."""
        # cls.attributes is iterable.
        # pylint: disable-next=not-an-iterable
        return [attribute for attribute in cls.attributes() if attribute.blind is False]

    def clear_attribute_components(self):
        """
        Returns the flattened version of all attribute components that are not blinded
        during issuance.

        Each attribute may consist of two components depending on whether it is going to be
        hidden during presentation. However, during the issuance, we can treat these
        components as separate attributes.
        """
        attributes = []
        for attribute in self.clear_attributes():
            attributes += to_list(attribute.get_internal_representation(self))
        return attributes

    @classmethod
    def blind_attributes(cls) -> List[Attribute]:
        """Returns the attributes that are blinded during issuance."""
        # cls.attributes is iterable.
        # pylint: disable-next=not-an-iterable
        return [attribute for attribute in cls.attributes() if attribute.blind is True]

    def blind_attribute_components(self):
        """
        Returns the flattened version of all attribute components that are blinded
        during issuance.

        Each attribute may consist of two components depending on whether it is going to be
        hidden during presentation. However, during the issuance, we can treat these
        components as separate attributes.
        """
        attributes = []
        for attribute in self.blind_attributes():
            attributes += to_list(attribute.get_internal_representation(self))
        return attributes

    def request(self) -> Tuple[IssuanceRequest, BlindAttributeCommitment]:
        """Request a new KVAC.

        Stores the created issuance request and user key on the credential and
        moves it to the ISSUANCE_REQUESTED stage.

        :return: The issuance request and a commitment on the blinded attributes.

        Called by the user."""
        issuance_request, user_key = IssuanceRequest.new(
            self.issuer_public_key,
            self.clear_attribute_components(),
            self.blind_attribute_components()
        )
        self.issuance_request = issuance_request
        self.user_key = user_key
        self.process_stage = self.ProcessStage.ISSUANCE_REQUESTED
        return issuance_request, self.commit_blinded_attributes()

    def commit_blinded_attributes(self):
        """Creates a commitment on the attributes blinded during issuance."""
        return BlindAttributeCommitment.new(
            self.issuer_public_key,
            self.blind_attribute_components()
        )

    @classmethod
    def issue(
        cls,
        *,
        issuer_key: IssuerKeyPair,
        request: IssuanceRequest,
        commitment: Optional[BlindAttributeCommitment] = None,
    ) -> IssuanceResponse:
        """Issues a new KVAC and generates the issuance response.

        Also checks that the blinded values committed to in the request match
        the given commitment. The commitment is only consulted if the request
        contains blinded attributes.

        :raises ValueError: If the request contains blinded attributes but no
            commitment is given.
        :raises VerificationFailure: If the commitment does not match the one in
            the request, or if the request does not verify.

        Called by the issuer.
        """
        if len(request.blinded_attributes) > 0:
            if commitment is None:
                raise ValueError(
                    "commitment is required if blind attributes are given."
                )
            if request.commitment != commitment:
                raise VerificationFailure(
                    "Commitment in request does not match given commitment."
                )

        if request.verify(issuer_key.public, request.blinding_key) is False:
            raise VerificationFailure(
                "Invalid issuance request. This could mean that the user is malicious."
            )

        return IssuanceResponse.new(issuer_key, request)

    def activate(self, response: IssuanceResponse):
        """Activates the credential with a tag from an issuance response so that it can be presented.

        :raises CallNotAllowed: If ``request()`` has not been called before.
        :raises VerificationFailure: If the response does not verify against the
            stored issuance request.
        """
        if self.process_stage != self.ProcessStage.ISSUANCE_REQUESTED:
            raise CallNotAllowed(
                "Cannot activate the credential before having created a issuance request."
            )

        if response.verify(self.issuer_public_key, self.issuance_request) is False:
            raise VerificationFailure(
                "Invalid issuance response. This could mean that the issuer is malicious."
            )

        self.tag = response.tag.decrypt(self.user_key)
        self.process_stage = self.ProcessStage.CREDENTIAL_ACTIVE

    @classmethod
    def revealed_attributes(cls) -> List[Attribute]:
        """Returns the attributes that are not hidden during presentation."""
        # cls.attributes is iterable.
        # pylint: disable-next=not-an-iterable
        return [attribute for attribute in cls.attributes() if attribute.hidden is False]

    @classmethod
    def hidden_attributes(cls) -> List[Attribute]:
        """Returns the attributes that are hidden during presentation."""
        # cls.attributes is iterable.
        # pylint: disable-next=not-an-iterable
        return [attribute for attribute in cls.attributes() if attribute.hidden is True]

    def present(
        self, hiding_keys: Optional[List[HidingKeyPair]] = None
    ) -> CredentialPresentation:
        """Creates a presentation for the credential.

        :param hiding_keys: One hiding key for each attribute to hide.
            Can be omitted iff there are no hidden attributes whatsoever.
        :raises CallNotAllowed: If the credential has not been activated yet.
        :raises ValueError: If there are hidden attributes and the number of
            hiding keys differs from the number of hidden attributes.

        Called by the user."""
        if self.process_stage != self.ProcessStage.CREDENTIAL_ACTIVE:
            raise CallNotAllowed(
                "Cannot present a credential without having received a tag."
            )

        hiding_pattern = []
        attributes = []
        # We need to use the attributes in this order to match the order of the issuance (proof)
        # during presentation.
        for attribute in self.clear_attributes() + self.blind_attributes():
            hiding_pattern.append(attribute.hidden)
            attributes.append(attribute.get_internal_representation(self))

        hiding_keys = hiding_keys or []
        # The key count is only enforced when there is something to hide.
        number_of_hidden_attributes = len(self.hidden_attributes())
        if number_of_hidden_attributes > 0 and number_of_hidden_attributes != len(hiding_keys):
            raise ValueError("There must be one hiding key given for each attribute to hide")

        return CredentialPresentation.new(
            self.tag,
            self.issuer_public_key,
            hiding_pattern,
            attributes,
            hiding_keys,
        )

    @classmethod
    def verify_presentation(
        cls, *, issuer_key: IssuerKeyPair, presentation: CredentialPresentation
    ) -> bool:
        """Verifies a credential presentation.

        :return: The result of ``presentation.verify(issuer_key)``.

        Called by the issuer."""
        return presentation.verify(issuer_key)