-
Notifications
You must be signed in to change notification settings - Fork 278
/
Copy pathsignature_builtin_runner.py
148 lines (130 loc) · 5.57 KB
/
signature_builtin_runner.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
from typing import Any, Dict, Optional
from starkware.cairo.lang.builtins.signature.instance_def import (
CELLS_PER_SIGNATURE,
INPUT_CELLS_PER_SIGNATURE,
EcdsaInstanceDef,
)
from starkware.cairo.lang.vm.builtin_runner import BuiltinVerifier, SimpleBuiltinRunner
from starkware.cairo.lang.vm.relocatable import RelocatableValue
from starkware.python.math_utils import safe_div
class SignatureBuiltinRunner(SimpleBuiltinRunner):
    """
    Runner for the signature builtin.

    Keeps the signature hints supplied for the builtin segment (keyed by the address of the
    public key cell) and installs a memory validation rule that checks every
    (public key, message) pair written to the segment against its hint.
    """

    def __init__(
        self,
        name: str,
        included: bool,
        ratio,
        process_signature,
        verify_signature,
        instance_def: Optional[EcdsaInstanceDef] = None,
    ):
        """
        'verify_signature' is called by the validation rule as
        verify_signature(pubkey, msg, signature); a falsy result raises an assertion error.

        'process_signature' is called as process_signature(pubkey, msg, signature), where
        'signature' is a value as stored in 'signatures'. It returns the representation of the
        signature in the format expected by the component used by the runner, and it may also
        assert that the signature is valid.
        """
        super().__init__(
            name=name,
            included=included,
            ratio=ratio,
            cells_per_instance=CELLS_PER_SIGNATURE,
            n_input_cells=INPUT_CELLS_PER_SIGNATURE,
        )
        self.instance_def = instance_def
        self.verify_signature = verify_signature
        self.process_signature = process_signature
        # Maps the address of a public key cell to the signature hinted for that instance.
        self.signatures: Dict = {}

    def get_instance_def(self):
        """
        Returns the instance definition given at construction (may be None).
        """
        return self.instance_def

    def add_validation_rules(self, runner):
        """
        Registers, on the builtin's memory segment, a validation rule that verifies the hinted
        signature once both cells of an instance are present in memory.
        """

        def rule(memory, addr):
            # A signature builtin instance consists of a pair of public key and message.
            # Determine which cell of the pair 'addr' is, and which cell is its partner.
            cell = addr.offset % CELLS_PER_SIGNATURE
            if cell == 0:
                pubkey_addr, msg_addr = addr, addr + 1
                partner_addr = msg_addr
            elif cell == 1:
                pubkey_addr, msg_addr = addr - 1, addr
                partner_addr = pubkey_addr
            else:
                return set()

            # Nothing can be validated until the partner cell has been written as well.
            if partner_addr not in memory:
                return set()

            pubkey, msg = memory[pubkey_addr], memory[msg_addr]
            assert isinstance(pubkey, int), (
                f"ECDSA builtin: Expected public key at address {pubkey_addr} to be an integer. "
                f"Got: {pubkey}."
            )
            assert isinstance(msg, int), (
                f"ECDSA builtin: Expected message hash at address {msg_addr} to be an integer. "
                f"Got: {msg}."
            )
            assert pubkey_addr in self.signatures, (
                f"Signature hint is missing for ECDSA builtin at address {pubkey_addr}. "
                "Add it using 'ecdsa_builtin.add_signature'."
            )

            signature = self.signatures[pubkey_addr]
            assert self.verify_signature(pubkey, msg, signature), (
                f"Signature {signature}, is invalid, with respect to the public key {pubkey}, "
                f"and the message hash {msg}."
            )
            # Both cells of the instance are reported back as validated.
            return {pubkey_addr, msg_addr}

        runner.vm.add_validation_rule(self.base.segment_index, rule)

    def air_private_input(self, runner) -> Dict[str, Any]:
        """
        Returns a dict mapping the builtin name to a list with one entry per stored signature,
        ordered by instance index. Each entry holds the index, the public key and message (as
        hex strings, read from the VM memory) and the output of 'process_signature'.
        """
        entries_by_index: Dict[int, Any] = {}
        for addr, signature in self.signatures.items():
            # Instance index = offset of the public key cell from the segment base, in
            # units of CELLS_PER_SIGNATURE.
            instance_index = safe_div(addr - self.base, CELLS_PER_SIGNATURE)
            pubkey = runner.vm_memory[addr]
            msg = runner.vm_memory[addr + 1]
            entry = {
                "index": instance_index,
                "pubkey": hex(pubkey),
                "msg": hex(msg),
                "signature_input": self.process_signature(pubkey, msg, signature),
            }
            entries_by_index[instance_index] = entry

        ordered_entries = sorted(entries_by_index.values(), key=lambda entry: entry["index"])
        return {self.name: ordered_entries}

    def add_signature(self, addr, signature):
        """
        Stores 'signature' as the hint for the instance whose public key cell is at 'addr'.
        'addr' must be a relocatable address in the builtin's segment, aligned to the start of
        an instance.
        This function should be used in Cairo hints.
        """
        assert isinstance(
            addr, RelocatableValue
        ), f"Expected memory address to be relocatable value. Found: {addr}."

        in_builtin_segment = addr.segment_index == self.base.segment_index
        assert (
            in_builtin_segment
        ), f"Signature hint must point to the signature builtin segment, not {addr}."

        is_pubkey_cell = addr.offset % CELLS_PER_SIGNATURE == 0
        assert is_pubkey_cell, f"Signature hint must point to the public key cell, not {addr}."

        self.signatures[addr] = signature

    def get_additional_data(self):
        """
        Returns the stored signatures as a list of [[address tuple as a list], signature]
        items, in the sorted order of the (address, signature) pairs.
        """
        additional_data = []
        for addr, signature in sorted(self.signatures.items()):
            additional_data.append([list(RelocatableValue.to_tuple(addr)), signature])
        return additional_data

    def extend_additional_data(self, data, relocate_callback, data_is_trusted=True):
        """
        Loads signatures in the format produced by get_additional_data(). Each address is
        passed through 'relocate_callback' and must land in the builtin's segment.
        'data_is_trusted' is accepted but not used here.
        """
        for addr, signature in data:
            relocated_addr = relocate_callback(RelocatableValue.from_tuple(addr))
            assert relocated_addr.segment_index == self.base.segment_index, (
                f"Error while loading {self.name} builtin additional data: "
                "Signature hint must point to the signature builtin segment. "
                f"Found: {addr} (after relocation: {relocated_addr})."
            )
            self.signatures[relocated_addr] = signature
class SignatureBuiltinVerifier(BuiltinVerifier):
    """
    Verifier-side counterpart of the signature builtin: checks the bounds of the builtin's
    memory segment as reported in the public input.
    """

    def __init__(self, included: bool, ratio):
        self.ratio = ratio
        self.included = included

    def expected_stack(self, public_input):
        """
        Returns a pair of lists ([begin_addr], [stop_ptr]) for the "signature" memory segment,
        or two empty lists when the builtin is not included.
        Asserts that the segment is well ordered, does not exceed the size allowed by
        n_steps / ratio, and stays below 2**64.
        """
        if not self.included:
            return [], []

        segment = public_input.memory_segments["signature"]
        # Largest number of cells the segment may occupy for this number of steps.
        max_size = safe_div(public_input.n_steps, self.ratio) * CELLS_PER_SIGNATURE

        begin, stop = segment.begin_addr, segment.stop_ptr
        assert 0 <= begin <= stop <= begin + max_size < 2**64
        return [begin], [stop]