• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OpShin / uplc / 192

pending completion
192

push

travis-ci-com

web-flow
Merge pull request #14 from OpShin/feat/flat_decoder

Add a native Flat decoder

298 of 298 new or added lines in 6 files covered. (100.0%)

1704 of 1778 relevant lines covered (95.84%)

3.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.97
/uplc/flat_decoder.py
1
from ast import NodeVisitor
4✔
2
from typing import Callable
4✔
3

4
from .ast import *
4✔
5

6
UPLC_TAG_WIDTHS = {
4✔
7
    "term": 4,
8
    "type": 3,
9
    "constType": 4,
10
    "builtin": 7,
11
    "constant": 4,
12
    "kind": 1,
13
}
14

15

16
def parse_raw_byte(b):
4✔
17
    """
18
    Parses a single byte in the Plutus-core byte-list representation of an int
19
    :param b: int
20
    :return: int
21
    """
22
    return b & 0b01111111
4✔
23

24

25
def raw_byte_is_last(b):
4✔
26
    """
27
    Returns true if 'b' is the last byte in the Plutus-core byte-list representation of an int.
28
    :param b: int
29
    :return: bool
30
    """
31
    return (b & 0b10000000) == 0
4✔
32

33

34
def bytes_to_int(bytes):
4✔
35
    """
36
    Combines a list of Plutus-core bytes into a int (leading bit of each byte is ignored).
37
    Differs from int.from_bytes because only 7 bits are used from each byte.
38
    :param bytes: list[int]
39
    :return: int
40
    """
41
    value = 0
4✔
42

43
    n = len(bytes)
4✔
44

45
    for i in range(n):
4✔
46
        b = bytes[i]
4✔
47

48
        # 7 (not 8), because leading bit isn't used here
49
        value += b * (2 ** (i * 7))
4✔
50

51
    return value
4✔
52

53

54
def unzigzag(value: int, signed: bool):
4✔
55
    """
56
    Unapplies zigzag encoding
57
    :return: int
58
    """
59
    if not signed:
4✔
60
        return value
4✔
61
    else:
62
        if value % 2 == 0:
4✔
63
            return value // 2
4✔
64
        else:
65
            return -((value + 1) // 2)
4✔
66

67

68
class UplcDeserializer:
4✔
69
    def __init__(self, bits: str):
4✔
70
        self._bits = bits
4✔
71
        self._pos = 0
4✔
72

73
    def tag_width(self, category: str) -> int:
4✔
74
        assert category in UPLC_TAG_WIDTHS, f"unknown tag category {category}"
4✔
75

76
        return UPLC_TAG_WIDTHS[category]
4✔
77

78
    def built_in_fun(self, id: int) -> BuiltInFun:
4✔
79
        return BuiltInFun(id)
4✔
80

81
    def read_linked_fixed_width_integer_list(self, elem_size: int) -> List[int]:
4✔
82
        nil_or_cons = self.read_bit()
4✔
83

84
        if nil_or_cons == 0:
4✔
85
            return []
4✔
86
        else:
87
            elem = self.read_fixed_width_integer(elem_size)
4✔
88
            return [elem] + self.read_linked_fixed_width_integer_list(elem_size)
4✔
89

90
    def read_term(self) -> AST:
4✔
91
        tag = self.read_tag("term")
4✔
92

93
        if tag == 0:
4✔
94
            return self.read_variable()
4✔
95
        elif tag == 1:
4✔
96
            return self.read_delay()
4✔
97
        elif tag == 2:
4✔
98
            return self.read_lambda()
4✔
99
        elif tag == 3:
4✔
100
            return self.read_apply()
4✔
101
        elif tag == 4:
4✔
102
            return self.read_constant()
4✔
103
        elif tag == 5:
4✔
104
            return self.read_force()
4✔
105
        elif tag == 6:
4✔
106
            return Error()
4✔
107
        elif tag == 7:
4✔
108
            return self.read_builtin()
4✔
109
        else:
110
            raise ValueError(f"term tag {tag} unhandled")
×
111

112
    def read_integer(self, signed: bool = False) -> int:
4✔
113
        byts = []
4✔
114

115
        b = self.read_byte()
4✔
116
        byts.append(b)
4✔
117

118
        while not raw_byte_is_last(b):
4✔
119
            b = self.read_byte()
4✔
120
            byts.append(b)
4✔
121

122
        res = bytes_to_int([parse_raw_byte(b) for b in byts])
4✔
123

124
        res = unzigzag(res, signed)
4✔
125

126
        return res
4✔
127

128
    def move_to_byte_boundary(self, force=False):
4✔
129
        """
130
        Moves position to the next byte boundary.
131

132
        Args:
133
            force (bool): If True, move to the next byte boundary even if already at one.
134

135
        Returns:
136
            None
137
        """
138
        if self._pos % 8 != 0:
4✔
139
            n = 8 - self._pos % 8
4✔
140
            self.read_bits(n)
4✔
141
        elif force:
4✔
142
            self.read_bits(8)
4✔
143

144
    def read_bytes(self) -> bytes:
4✔
145
        self.move_to_byte_boundary(True)
4✔
146

147
        byts = []
4✔
148

149
        n_chunk = self.read_byte()
4✔
150

151
        while n_chunk > 0:
4✔
152
            for _ in range(n_chunk):
4✔
153
                byts.append(self.read_byte())
4✔
154

155
            n_chunk = self.read_byte()
4✔
156

157
        return bytes(byts)
4✔
158

159
    def read_builtin_byte_string(self) -> BuiltinByteString:
4✔
160
        byts = self.read_bytes()
4✔
161

162
        return BuiltinByteString(byts)
4✔
163

164
    def read_builtin_string(self) -> BuiltinString:
4✔
165
        byts = self.read_bytes()
4✔
166

167
        s = byts.decode("utf8")
4✔
168

169
        return BuiltinString(s)
4✔
170

171
    def read_list(self, typed_reader: Callable[[], Constant]) -> List[Constant]:
4✔
172
        items = []
4✔
173

174
        while self.read_bit() == 1:
4✔
175
            items.append(typed_reader())
4✔
176

177
        return items
4✔
178

179
    def read_data(self) -> PlutusData:
4✔
180
        byts = self.read_bytes()
4✔
181

182
        return data_from_cbor(byts)
4✔
183

184
    def read_variable(self) -> Variable:
4✔
185
        index = self.read_integer(signed=False)
4✔
186

187
        return Variable(str(index))
4✔
188

189
    def read_lambda(self) -> Lambda:
4✔
190
        rhs = self.read_term()
4✔
191

192
        return Lambda("_", rhs)
4✔
193

194
    def read_apply(self) -> Apply:
4✔
195
        a = self.read_term()
4✔
196
        b = self.read_term()
4✔
197

198
        return Apply(a, b)
4✔
199

200
    def read_constant(self) -> Constant:
4✔
201
        type_list = self.read_linked_fixed_width_integer_list(
4✔
202
            self.tag_width("constType")
203
        )
204

205
        res = self.read_typed_value(type_list)
4✔
206

207
        return res
4✔
208

209
    def read_typed_value(self, type_list: List[int]) -> Constant:
4✔
210
        typed_reader = self.construct_typed_reader(type_list)
4✔
211

212
        assert len(type_list) == 0, "Did not consume all type parameters"
4✔
213

214
        return typed_reader()
4✔
215

216
    def sample_value(self, type_list: List[int]):
4✔
217
        typ = type_list.pop(0)
4✔
218
        if typ == 0:
4✔
219
            return BuiltinInteger(0)
4✔
220
        elif typ == 1:
4✔
221
            return BuiltinByteString(b"")
4✔
222
        elif typ == 2:
4✔
223
            return BuiltinString("")
4✔
224
        elif typ == 3:
4✔
225
            return BuiltinUnit()
4✔
226
        elif typ == 4:
4✔
227
            return BuiltinBool(False)
4✔
228
        elif typ in (5, 6):
4✔
229
            raise ValueError("unexpected type tag without type application")
×
230
        elif typ == 7:
4✔
231
            container_type = type_list.pop(0)
4✔
232
            if container_type == 5:
4✔
233
                list_type = self.sample_value(type_list)
4✔
234
                return BuiltinList([], list_type)
4✔
235
            else:
236
                assert container_type == 7, "Unexpected type tag"
4✔
237
                container_type = type_list.pop(0)
4✔
238
                if container_type == 6:
4✔
239
                    return BuiltinPair(
4✔
240
                        self.sample_value(type_list), self.sample_value(type_list)
241
                    )
242
        elif typ == 8:
4✔
243
            return PlutusInteger(0)
4✔
244
        else:
245
            raise ValueError(f"unhandled constant type {typ}")
×
246

247
    def construct_typed_reader(self, type_list: List[int]) -> Callable[[], Constant]:
4✔
248
        typ = type_list.pop(0)
4✔
249

250
        if typ == 0:
4✔
251
            return lambda: BuiltinInteger(self.read_integer(signed=True))
4✔
252
        elif typ == 1:
4✔
253
            return lambda: self.read_builtin_byte_string()
4✔
254
        elif typ == 2:
4✔
255
            return lambda: self.read_builtin_string()
4✔
256
        elif typ == 3:
4✔
257
            return lambda: BuiltinUnit()
4✔
258
        elif typ == 4:
4✔
259
            return lambda: BuiltinBool(self.read_bit() == 1)
4✔
260
        elif typ in (5, 6):
4✔
261
            raise ValueError("unexpected type tag without type application")
×
262
        elif typ == 7:
4✔
263
            container_type = type_list.pop(0)
4✔
264
            if container_type == 5:
4✔
265
                list_type = self.sample_value(type_list.copy())
4✔
266
                type_reader = self.construct_typed_reader(type_list)
4✔
267

268
                return lambda: BuiltinList(self.read_list(type_reader), list_type)
4✔
269
            else:
270
                assert container_type == 7, "Unexpected type tag"
4✔
271
                container_type = type_list.pop(0)
4✔
272
                if container_type == 6:
4✔
273
                    left_reader = self.construct_typed_reader(type_list)
4✔
274
                    right_reader = self.construct_typed_reader(type_list)
4✔
275
                    return lambda: BuiltinPair(left_reader(), right_reader())
4✔
276
                else:
277
                    raise ValueError(f"unhandled container type {container_type}")
×
278
        elif typ == 8:
4✔
279
            return lambda: self.read_data()
4✔
280
        else:
281
            raise ValueError(f"unhandled constant type {typ}")
×
282

283
    def read_delay(self) -> Delay:
4✔
284
        expr = self.read_term()
4✔
285

286
        return Delay(expr)
4✔
287

288
    def read_force(self) -> Force:
4✔
289
        expr = self.read_term()
4✔
290

291
        return Force(expr)
4✔
292

293
    def read_builtin(self) -> BuiltIn:
4✔
294
        id = self.read_tag("builtin")
4✔
295

296
        builtin = self.built_in_fun(id)
4✔
297

298
        return BuiltIn(builtin)
4✔
299

300
    def finalize(self):
4✔
301
        self.move_to_byte_boundary(True)
4✔
302

303
    def read_bits(self, num: int) -> str:
4✔
304
        bits = self._bits[self._pos : self._pos + num]
4✔
305
        self._pos += num
4✔
306
        return bits
4✔
307

308
    def read_fixed_width_integer(self, width: int) -> int:
4✔
309
        return int(self.read_bits(width), 2)
4✔
310

311
    def read_tag(self, name: str) -> int:
4✔
312
        return self.read_fixed_width_integer(self.tag_width(name))
4✔
313

314
    def read_bit(self) -> int:
4✔
315
        return self.read_fixed_width_integer(1)
4✔
316

317
    def read_byte(self) -> int:
4✔
318
        return self.read_fixed_width_integer(8)
4✔
319

320
    def read_program(self) -> Program:
4✔
321
        version = (
4✔
322
            self.read_integer(signed=False),
323
            self.read_integer(signed=False),
324
            self.read_integer(signed=False),
325
        )
326

327
        expr = self.read_term()
4✔
328

329
        self.finalize()
4✔
330

331
        return Program(version, expr)
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc