• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OpShin / uplc / 18317963532

07 Oct 2025 03:34PM UTC coverage: 94.573% (+0.3%) from 94.287%
18317963532

push

github

web-flow
Merge pull request #40 from OpShin/feat/plutus-v3-builtins

Add Plutus V3 support

477 of 497 new or added lines in 9 files covered. (95.98%)

17 existing lines in 4 files now uncovered.

2666 of 2819 relevant lines covered (94.57%)

4.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

96.33
/uplc/flat_decoder.py
1
from ast import NodeVisitor
5✔
2
from typing import Callable
5✔
3

4
from .ast import *
5✔
5
from .parser import PLUTUS_V3
5✔
6
from .transformer.plutus_version_enforcer import UnsupportedTerm
5✔
7

8
UPLC_TAG_WIDTHS = {
5✔
9
    "term": 4,
10
    "type": 3,
11
    "constType": 4,
12
    "builtin": 7,
13
    "constant": 4,
14
    "kind": 1,
15
}
16

17

18
def parse_raw_byte(b):
5✔
19
    """
20
    Parses a single byte in the Plutus-core byte-list representation of an int
21
    :param b: int
22
    :return: int
23
    """
24
    return b & 0b01111111
5✔
25

26

27
def raw_byte_is_last(b):
5✔
28
    """
29
    Returns true if 'b' is the last byte in the Plutus-core byte-list representation of an int.
30
    :param b: int
31
    :return: bool
32
    """
33
    return (b & 0b10000000) == 0
5✔
34

35

36
def bytes_to_int(bytes):
5✔
37
    """
38
    Combines a list of Plutus-core bytes into a int (leading bit of each byte is ignored).
39
    Differs from int.from_bytes because only 7 bits are used from each byte.
40
    :param bytes: list[int]
41
    :return: int
42
    """
43
    value = 0
5✔
44

45
    n = len(bytes)
5✔
46

47
    for i in range(n):
5✔
48
        b = bytes[i]
5✔
49

50
        # 7 (not 8), because leading bit isn't used here
51
        value += b * (2 ** (i * 7))
5✔
52

53
    return value
5✔
54

55

56
def unzigzag(value: int, signed: bool):
5✔
57
    """
58
    Unapplies zigzag encoding
59
    :return: int
60
    """
61
    if not signed:
5✔
62
        return value
5✔
63
    else:
64
        if value % 2 == 0:
5✔
65
            return value // 2
5✔
66
        else:
67
            return -((value + 1) // 2)
5✔
68

69

70
class UplcDeserializer:
5✔
71
    def __init__(self, bits: str):
5✔
72
        self._bits = bits
5✔
73
        self._pos = 0
5✔
74
        self._version = None
5✔
75

76
    def tag_width(self, category: str) -> int:
5✔
77
        assert category in UPLC_TAG_WIDTHS, f"unknown tag category {category}"
5✔
78

79
        return UPLC_TAG_WIDTHS[category]
5✔
80

81
    def built_in_fun(self, id: int) -> BuiltInFun:
5✔
82
        return BuiltInFun(id)
5✔
83

84
    def read_linked_fixed_width_integer_list(self, elem_size: int) -> List[int]:
5✔
85
        nil_or_cons = self.read_bit()
5✔
86

87
        if nil_or_cons == 0:
5✔
88
            return []
5✔
89
        else:
90
            elem = self.read_fixed_width_integer(elem_size)
5✔
91
            return [elem] + self.read_linked_fixed_width_integer_list(elem_size)
5✔
92

93
    def read_term(self) -> AST:
5✔
94
        tag = self.read_tag("term")
5✔
95

96
        if tag == 0:
5✔
97
            return self.read_variable()
5✔
98
        elif tag == 1:
5✔
99
            return self.read_delay()
5✔
100
        elif tag == 2:
5✔
101
            return self.read_lambda()
5✔
102
        elif tag == 3:
5✔
103
            return self.read_apply()
5✔
104
        elif tag == 4:
5✔
105
            return self.read_constant()
5✔
106
        elif tag == 5:
5✔
107
            return self.read_force()
5✔
108
        elif tag == 6:
5✔
109
            return Error()
5✔
110
        elif tag == 7:
5✔
111
            return self.read_builtin()
5✔
112
        elif tag == 8:
5✔
113
            return self.read_constr()
5✔
114
        elif tag == 9:
5✔
115
            return self.read_case()
5✔
116
        else:
117
            raise ValueError(f"term tag {tag} unhandled")
×
118

119
    def read_integer(self, signed: bool = False) -> int:
5✔
120
        byts = []
5✔
121

122
        b = self.read_byte()
5✔
123
        byts.append(b)
5✔
124

125
        while not raw_byte_is_last(b):
5✔
126
            b = self.read_byte()
5✔
127
            byts.append(b)
5✔
128

129
        res = bytes_to_int([parse_raw_byte(b) for b in byts])
5✔
130

131
        res = unzigzag(res, signed)
5✔
132

133
        return res
5✔
134

135
    def move_to_byte_boundary(self, force=False):
5✔
136
        """
137
        Moves position to the next byte boundary.
138

139
        Args:
140
            force (bool): If True, move to the next byte boundary even if already at one.
141

142
        Returns:
143
            None
144
        """
145
        if self._pos % 8 != 0:
5✔
146
            n = 8 - self._pos % 8
5✔
147
            self.read_bits(n)
5✔
148
        elif force:
5✔
149
            self.read_bits(8)
5✔
150

151
    def read_bytes(self) -> bytes:
5✔
152
        self.move_to_byte_boundary(True)
5✔
153

154
        byts = []
5✔
155

156
        n_chunk = self.read_byte()
5✔
157

158
        while n_chunk > 0:
5✔
159
            for _ in range(n_chunk):
5✔
160
                byts.append(self.read_byte())
5✔
161

162
            n_chunk = self.read_byte()
5✔
163

164
        return bytes(byts)
5✔
165

166
    def read_builtin_byte_string(self) -> BuiltinByteString:
5✔
167
        byts = self.read_bytes()
5✔
168

169
        return BuiltinByteString(byts)
5✔
170

171
    def read_builtin_string(self) -> BuiltinString:
5✔
172
        byts = self.read_bytes()
5✔
173

174
        s = byts.decode("utf8")
5✔
175

176
        return BuiltinString(s)
5✔
177

178
    def read_list(self, typed_reader: Callable[[], Constant]) -> List[Constant]:
5✔
179
        items = []
5✔
180

181
        while self.read_bit() == 1:
5✔
182
            items.append(typed_reader())
5✔
183

184
        return items
5✔
185

186
    def read_data(self) -> PlutusData:
5✔
187
        byts = self.read_bytes()
5✔
188

189
        return data_from_cbor(byts)
5✔
190

191
    def read_variable(self) -> Variable:
5✔
192
        index = self.read_integer(signed=False)
5✔
193

194
        return Variable(str(index))
5✔
195

196
    def read_lambda(self) -> Lambda:
5✔
197
        rhs = self.read_term()
5✔
198

199
        return Lambda("_", rhs)
5✔
200

201
    def read_apply(self) -> Apply:
5✔
202
        a = self.read_term()
5✔
203
        b = self.read_term()
5✔
204

205
        return Apply(a, b)
5✔
206

207
    def read_constant(self) -> Constant:
5✔
208
        type_list = self.read_linked_fixed_width_integer_list(
5✔
209
            self.tag_width("constType")
210
        )
211

212
        res = self.read_typed_value(type_list)
5✔
213

214
        return res
5✔
215

216
    def read_typed_value(self, type_list: List[int]) -> Constant:
5✔
217
        typed_reader = self.construct_typed_reader(type_list)
5✔
218

219
        assert len(type_list) == 0, "Did not consume all type parameters"
5✔
220

221
        return typed_reader()
5✔
222

223
    def sample_value(self, type_list: List[int]):
5✔
224
        typ = type_list.pop(0)
5✔
225
        if typ == 0:
5✔
226
            return BuiltinInteger(0)
5✔
227
        elif typ == 1:
5✔
228
            return BuiltinByteString(b"")
5✔
229
        elif typ == 2:
5✔
230
            return BuiltinString("")
5✔
231
        elif typ == 3:
5✔
232
            return BuiltinUnit()
5✔
233
        elif typ == 4:
5✔
234
            return BuiltinBool(False)
5✔
235
        elif typ in (5, 6):
5✔
236
            raise ValueError("unexpected type tag without type application")
×
237
        elif typ == 7:
5✔
238
            container_type = type_list.pop(0)
5✔
239
            if container_type == 5:
5✔
240
                list_type = self.sample_value(type_list)
5✔
241
                return BuiltinList([], list_type)
5✔
242
            else:
243
                assert container_type == 7, "Unexpected type tag"
5✔
244
                container_type = type_list.pop(0)
5✔
245
                if container_type == 6:
5✔
246
                    return BuiltinPair(
5✔
247
                        self.sample_value(type_list), self.sample_value(type_list)
248
                    )
249
        elif typ == 8:
5✔
250
            return PlutusInteger(0)
5✔
251
        else:
252
            raise ValueError(f"unhandled constant type {typ}")
×
253

254
    def construct_typed_reader(self, type_list: List[int]) -> Callable[[], Constant]:
5✔
255
        typ = type_list.pop(0)
5✔
256

257
        if typ == 0:
5✔
258
            return lambda: BuiltinInteger(self.read_integer(signed=True))
5✔
259
        elif typ == 1:
5✔
260
            return lambda: self.read_builtin_byte_string()
5✔
261
        elif typ == 2:
5✔
262
            return lambda: self.read_builtin_string()
5✔
263
        elif typ == 3:
5✔
264
            return lambda: BuiltinUnit()
5✔
265
        elif typ == 4:
5✔
266
            return lambda: BuiltinBool(self.read_bit() == 1)
5✔
267
        elif typ in (5, 6):
5✔
268
            raise ValueError("unexpected type tag without type application")
×
269
        elif typ == 7:
5✔
270
            container_type = type_list.pop(0)
5✔
271
            if container_type == 5:
5✔
272
                list_type = self.sample_value(type_list.copy())
5✔
273
                type_reader = self.construct_typed_reader(type_list)
5✔
274

275
                return lambda: BuiltinList(self.read_list(type_reader), list_type)
5✔
276
            else:
277
                assert container_type == 7, "Unexpected type tag"
5✔
278
                container_type = type_list.pop(0)
5✔
279
                if container_type == 6:
5✔
280
                    left_reader = self.construct_typed_reader(type_list)
5✔
281
                    right_reader = self.construct_typed_reader(type_list)
5✔
282
                    return lambda: BuiltinPair(left_reader(), right_reader())
5✔
283
                else:
284
                    raise ValueError(f"unhandled container type {container_type}")
×
285
        elif typ == 8:
5✔
286
            return lambda: self.read_data()
5✔
287
        else:
288
            raise ValueError(f"unhandled constant type {typ}")
×
289

290
    def read_delay(self) -> Delay:
5✔
291
        expr = self.read_term()
5✔
292

293
        return Delay(expr)
5✔
294

295
    def read_force(self) -> Force:
5✔
296
        expr = self.read_term()
5✔
297

298
        return Force(expr)
5✔
299

300
    def read_builtin(self) -> BuiltIn:
5✔
301
        id = self.read_tag("builtin")
5✔
302

303
        builtin = self.built_in_fun(id)
5✔
304

305
        return BuiltIn(builtin)
5✔
306

307
    def read_constr(self) -> Constr:
5✔
308
        if self._version < PLUTUS_V3:
5✔
NEW
309
            raise UnsupportedTerm("Invalid term encoded (Constr in pre-PlutusV3)")
×
310
        # in theory limited to 64 bits
311
        id = self.read_integer(signed=False)
5✔
312

313
        fields = self.read_list(self.read_term)
5✔
314

315
        return Constr(id, fields)
5✔
316

317
    def read_case(self) -> Case:
5✔
318
        if self._version < PLUTUS_V3:
5✔
NEW
319
            raise UnsupportedTerm("Invalid term encoded (Case in pre-PlutusV3)")
×
320
        scrutinee = self.read_term()
5✔
321

322
        branches = self.read_list(self.read_term)
5✔
323

324
        return Case(scrutinee, branches)
5✔
325

326
    def finalize(self):
5✔
327
        self.move_to_byte_boundary(True)
5✔
328

329
    def read_bits(self, num: int) -> str:
5✔
330
        bits = self._bits[self._pos : self._pos + num]
5✔
331
        self._pos += num
5✔
332
        return bits
5✔
333

334
    def read_fixed_width_integer(self, width: int) -> int:
5✔
335
        return int(self.read_bits(width), 2)
5✔
336

337
    def read_tag(self, name: str) -> int:
5✔
338
        return self.read_fixed_width_integer(self.tag_width(name))
5✔
339

340
    def read_bit(self) -> int:
5✔
341
        return self.read_fixed_width_integer(1)
5✔
342

343
    def read_byte(self) -> int:
5✔
344
        return self.read_fixed_width_integer(8)
5✔
345

346
    def read_program(self) -> Program:
5✔
347
        version = (
5✔
348
            self.read_integer(signed=False),
349
            self.read_integer(signed=False),
350
            self.read_integer(signed=False),
351
        )
352
        self._version = version
5✔
353

354
        expr = self.read_term()
5✔
355

356
        self.finalize()
5✔
357

358
        return Program(version, expr)
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc