• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OpShin / opshin / 18500088700

14 Oct 2025 02:33PM UTC coverage: 90.329%. First build
18500088700

Pull #549

github

nielstron
Fix formatting
Pull Request #549: Plutus V3 support

1294 of 1584 branches covered (81.69%)

Branch coverage included in aggregate %.

273 of 354 new or added lines in 16 files covered. (77.12%)

4740 of 5096 relevant lines covered (93.01%)

4.64 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.7
/opshin/builder.py
1
import copy
5✔
2
import dataclasses
5✔
3
import enum
5✔
4
import functools
5✔
5
import json
5✔
6
import typing
5✔
7
from ast import Module
5✔
8
from typing import Optional, Any, Union
5✔
9
from pathlib import Path
5✔
10

11
from pycardano import IndefiniteList, PlutusData, Datum, PlutusV3Script
5✔
12

13
from . import __version__, compiler
5✔
14

15
import uplc.ast
5✔
16
from uplc import flatten, ast as uplc_ast, eval as uplc_eval
5✔
17
import cbor2
5✔
18
import pycardano
5✔
19
from pluthon import compile as plt_compile
5✔
20

21
from .util import datum_to_cbor
5✔
22
from .compiler_config import DEFAULT_CONFIG
5✔
23

24

25
class Purpose(enum.Enum):
5✔
26
    spending = "spending"
5✔
27
    minting = "minting"
5✔
28
    rewarding = "rewarding"
5✔
29
    certifying = "certifying"
5✔
30
    voting = "voting"
5✔
31
    proposing = "proposing"
5✔
32
    any = "any"
5✔
33

34

35
@dataclasses.dataclass
5✔
36
class PlutusContract:
5✔
37
    contract: PlutusV3Script
5✔
38
    datum_type: Optional[typing.Tuple[str, typing.Type[Datum]]] = None
5✔
39
    redeemer_type: Optional[typing.Tuple[str, typing.Type[Datum]]] = None
5✔
40
    parameter_types: typing.List[typing.Tuple[str, typing.Type[Datum]]] = (
5✔
41
        dataclasses.field(default_factory=list)
42
    )
43
    purpose: typing.Iterable[Purpose] = (Purpose.any,)
5✔
44
    version: Optional[str] = "1.0.0"
5✔
45
    title: str = "validator"
5✔
46
    description: Optional[str] = f"opshin {__version__} Smart Contract"
5✔
47
    license: Optional[str] = None
5✔
48

49
    @property
5✔
50
    def cbor(self) -> bytes:
5✔
51
        return self.contract
×
52

53
    @property
5✔
54
    def cbor_hex(self) -> str:
5✔
55
        return self.contract.hex()
5✔
56

57
    @property
5✔
58
    def script_hash(self):
5✔
59
        return pycardano.plutus_script_hash(self.contract)
5✔
60

61
    @property
5✔
62
    def policy_id(self):
5✔
63
        return self.script_hash.to_primitive().hex()
5✔
64

65
    @property
5✔
66
    def mainnet_addr(self):
5✔
67
        return pycardano.Address(self.script_hash, network=pycardano.Network.MAINNET)
5✔
68

69
    @property
5✔
70
    def testnet_addr(self):
5✔
71
        return pycardano.Address(self.script_hash, network=pycardano.Network.TESTNET)
5✔
72

73
    @property
5✔
74
    def plutus_json(self):
5✔
75
        return json.dumps(
5✔
76
            {
77
                "type": "PlutusScriptV3",
78
                "description": self.description,
79
                "cborHex": self.cbor_hex,
80
            },
81
            indent=2,
82
        )
83

84
    @property
5✔
85
    def blueprint(self):
5✔
86
        return {
5✔
87
            "$schema": "https://cips.cardano.org/cips/cip57/schemas/plutus-blueprint.json",
88
            "$id": "<insert-url-to-contract-here>",
89
            "$vocabulary": {
90
                "https://json-schema.org/draft/2020-12/vocab/core": True,
91
                "https://json-schema.org/draft/2020-12/vocab/applicator": True,
92
                "https://json-schema.org/draft/2020-12/vocab/validation": True,
93
                "https://cips.cardano.org/cips/cip57": True,
94
            },
95
            "preamble": {
96
                "version": self.version,
97
                "plutusVersion": "v3",
98
                "description": self.description,
99
                "title": self.title,
100
                **({"license": self.license} if self.license is not None else {}),
101
            },
102
            "validators": [
103
                {
104
                    "title": self.title,
105
                    **(
106
                        {
107
                            "datum": {
108
                                "title": self.datum_type[0],
109
                                "purpose": PURPOSE_MAP[Purpose.spending],
110
                                "schema": to_plutus_schema(self.datum_type[1]),
111
                            }
112
                        }
113
                        if self.datum_type is not None
114
                        else {}
115
                    ),
116
                    "redeemer": (
117
                        {
118
                            "title": self.redeemer_type[0],
119
                            "purpose": {
120
                                "oneOf": [PURPOSE_MAP[p] for p in self.purpose]
121
                            },
122
                            "schema": to_plutus_schema(self.redeemer_type[1]),
123
                        }
124
                        if self.redeemer_type is not None
125
                        else {}
126
                    ),
127
                    **(
128
                        {
129
                            "parameters": [
130
                                {
131
                                    "title": t[0],
132
                                    "purpose": PURPOSE_MAP[Purpose.spending],
133
                                    "schema": to_plutus_schema(t[1]),
134
                                }
135
                                for t in self.parameter_types
136
                            ]
137
                        }
138
                        if self.parameter_types
139
                        else {}
140
                    ),
141
                    "compiledCode": self.cbor_hex,
142
                    "hash": self.policy_id,
143
                },
144
            ],
145
        }
146

147
    def apply_parameter(self, *args: pycardano.Datum):
5✔
148
        """
149
        Returns a new OpShin Contract with the applied parameters
150
        """
151
        # update the parameters in the blueprint (remove applied parameters)
152
        assert len(self.parameter_types) >= len(
5✔
153
            args
154
        ), f"Applying too many parameters to contract, allowed amount: {self.parameter_types}, but got {len(args)}"
155
        new_parameter_types = copy.copy(self.parameter_types)
5✔
156
        for _ in args:
5✔
157
            # TODO validate that the applied parameters are of the correct type
158
            new_parameter_types.pop(0)
5✔
159
        new_contract_contract = apply_parameters(self.contract, *args)
5✔
160
        new_contract = PlutusContract(
5✔
161
            new_contract_contract,
162
            self.datum_type,
163
            self.redeemer_type,
164
            new_parameter_types,
165
            self.purpose,
166
            self.version,
167
            self.title,
168
            self.description,
169
        )
170
        return new_contract
5✔
171

172
    def dump(self, target_dir: Union[str, Path]):
5✔
173
        target_dir = Path(target_dir)
5✔
174
        target_dir.mkdir(exist_ok=True, parents=True)
5✔
175
        with (target_dir / "script.cbor").open("w") as fp:
5✔
176
            fp.write(self.cbor_hex)
5✔
177
        with (target_dir / "script.plutus").open("w") as fp:
5✔
178
            fp.write(self.plutus_json)
5✔
179
        with (target_dir / "script.policy_id").open("w") as fp:
5✔
180
            fp.write(self.policy_id)
5✔
181
        with (target_dir / "mainnet.addr").open("w") as fp:
5✔
182
            fp.write(self.mainnet_addr.encode())
5✔
183
        with (target_dir / "testnet.addr").open("w") as fp:
5✔
184
            fp.write(self.testnet_addr.encode())
5✔
185
        with (target_dir / "blueprint.json").open("w") as fp:
5✔
186
            json.dump(self.blueprint, fp, indent=2)
5✔
187

188

189
def compile(
5✔
190
    program: Module,
191
    contract_filename: Optional[str] = None,
192
    validator_function_name="validator",
193
    config=DEFAULT_CONFIG,
194
) -> uplc_ast.Program:
195
    code = compiler.compile(
5✔
196
        program,
197
        filename=contract_filename,
198
        validator_function_name=validator_function_name,
199
        config=config,
200
    )
201
    plt_code = plt_compile(code, config)
5✔
202
    return plt_code
5✔
203

204

205
@functools.lru_cache(maxsize=32)
5✔
206
def _static_compile(
5✔
207
    source_code: str,
208
    contract_file: str = "<unknown>",
209
    validator_function_name="validator",
210
    config=DEFAULT_CONFIG,
211
):
212
    """
213
    Expects a python module and returns the build artifacts from compiling it
214
    """
215

216
    source_ast = compiler.parse(source_code, filename=contract_file)
5✔
217
    code = compile(
5✔
218
        source_ast,
219
        contract_filename=contract_file,
220
        validator_function_name=validator_function_name,
221
        config=config,
222
    )
223
    return code
5✔
224

225

226
def _compile(
5✔
227
    source_code: str,
228
    *args: typing.Union[pycardano.Datum, uplc_ast.Constant],
229
    contract_file: str = "<unknown>",
230
    validator_function_name="validator",
231
    config=DEFAULT_CONFIG,
232
):
233
    """
234
    Expects a python module and returns the build artifacts from compiling it
235
    """
236

237
    code = _static_compile(
5✔
238
        source_code,
239
        contract_file=contract_file,
240
        validator_function_name=validator_function_name,
241
        config=config,
242
    )
243
    code = _apply_parameters(code, *args)
5✔
244
    return code
5✔
245

246

247
def build(
5✔
248
    contract_file: str,
249
    *args: typing.Union[pycardano.Datum, uplc_ast.Constant],
250
    validator_function_name="validator",
251
    config=DEFAULT_CONFIG,
252
):
253
    """
254
    Expects a python module and returns the build artifacts from compiling it
255
    """
256
    with open(contract_file) as f:
5✔
257
        source_code = f.read()
5✔
258
    code = _compile(
5✔
259
        source_code,
260
        *args,
261
        contract_file=contract_file,
262
        validator_function_name=validator_function_name,
263
        config=config,
264
    )
265
    return _build(code)
5✔
266

267

268
def _build(contract: uplc.ast.Program):
5✔
269
    # create cbor file for use with pycardano/lucid
270
    cbor = flatten(contract)
5✔
271
    return pycardano.PlutusV3Script(cbor)
5✔
272

273

274
PURPOSE_MAP = {
5✔
275
    Purpose.any: {"oneOf": ["spend", "mint", "withdraw", "publish", "vote", "propose"]},
276
    Purpose.spending: "spend",
277
    Purpose.minting: "mint",
278
    Purpose.rewarding: "withdraw",
279
    Purpose.certifying: "publish",
280
    Purpose.voting: "vote",
281
    Purpose.proposing: "propose",
282
}
283

284

285
def to_plutus_schema(cls: typing.Type[Datum]) -> dict:
5✔
286
    """
287
    Convert to a dictionary representing a json schema according to CIP 57 Plutus Blueprint
288
    Reference of the core structure:
289
    https://cips.cardano.org/cips/cip57/#corevocabulary
290

291
    Args:
292
        **kwargs: Extra key word arguments to be passed to `json.dumps()`
293

294
    Returns:
295
        dict: a dict representing the schema of this class.
296
    """
297
    if hasattr(cls, "__origin__") and cls.__origin__ is list:
5✔
298
        return {
5✔
299
            "dataType": "list",
300
            **(
301
                {"items": to_plutus_schema(cls.__args__[0])}
302
                if hasattr(cls, "__args__")
303
                else {}
304
            ),
305
        }
306
    elif hasattr(cls, "__origin__") and cls.__origin__ is dict:
5✔
307
        return {
5✔
308
            "dataType": "map",
309
            **(
310
                {
311
                    "keys": to_plutus_schema(cls.__args__[0]),
312
                    "values": to_plutus_schema(cls.__args__[1]),
313
                }
314
                if hasattr(cls, "__args__")
315
                else {}
316
            ),
317
        }
318
    elif hasattr(cls, "__origin__") and cls.__origin__ is Union:
5✔
319
        return {
5✔
320
            "anyOf": (
321
                [to_plutus_schema(t) for t in cls.__args__]
322
                if hasattr(cls, "__args__")
323
                else []
324
            )
325
        }
326
    elif issubclass(cls, PlutusData):
5✔
327
        fields = []
5✔
328
        for field_value in cls.__dataclass_fields__.values():
5✔
329
            if field_value.name == "CONSTR_ID":
5!
330
                continue
×
331
            field_schema = to_plutus_schema(field_value.type)
5✔
332
            field_schema["title"] = field_value.name
5✔
333
            fields.append(field_schema)
5✔
334
        return {
5✔
335
            "dataType": "constructor",
336
            "index": cls.CONSTR_ID,
337
            "fields": fields,
338
            "title": cls.__name__,
339
        }
340
    elif issubclass(cls, bytes):
5✔
341
        return {"dataType": "bytes"}
5✔
342
    elif issubclass(cls, int):
5✔
343
        return {"dataType": "integer"}
5✔
344
    elif issubclass(cls, IndefiniteList) or issubclass(cls, list):
5✔
345
        return {"dataType": "list"}
5✔
346
    else:
347
        return {}
5✔
348

349

350
def from_plutus_schema(schema: dict) -> typing.Type[pycardano.Datum]:
5✔
351
    """
352
    Convert from a dictionary representing a json schema according to CIP 57 Plutus Blueprint
353
    """
354
    if schema == {}:
5✔
355
        return pycardano.Datum
5✔
356
    if "anyOf" in schema:
5✔
357
        if len(schema["anyOf"]) == 0:
5!
358
            raise ValueError("Cannot convert empty anyOf schema")
×
359
        union_t = typing.Union[from_plutus_schema(schema["anyOf"][0])]
5✔
360
        for s in schema["anyOf"][1:]:
5✔
361
            union_t = typing.Union[union_t, from_plutus_schema(s)]
5✔
362
        return union_t
5✔
363
    if "dataType" in schema:
5!
364
        typ = schema["dataType"]
5✔
365
        if typ == "bytes":
5✔
366
            return bytes
5✔
367
        elif typ == "integer":
5✔
368
            return int
5✔
369
        elif typ == "list":
5✔
370
            if "items" in schema:
5✔
371
                return typing.List[from_plutus_schema(schema["items"])]
5✔
372
            else:
373
                return typing.List[pycardano.Datum]
5✔
374
        elif typ == "map":
5✔
375
            key_t = (
5✔
376
                from_plutus_schema(schema["keys"])
377
                if "keys" in schema
378
                else pycardano.Datum
379
            )
380
            value_t = (
5✔
381
                from_plutus_schema(schema["values"])
382
                if "values" in schema
383
                else pycardano.Datum
384
            )
385
            return typing.Dict[key_t, value_t]
5✔
386
        elif typ == "constructor":
5!
387
            fields = {}
5✔
388
            for field in schema["fields"]:
5✔
389
                fields[field["title"]] = from_plutus_schema(field)
5✔
390
            fields["CONSTR_ID"] = schema["index"]
5✔
391
            return dataclasses.dataclass(
5✔
392
                type(schema["title"], (pycardano.PlutusData,), fields)
393
            )
394
    raise ValueError(f"Cannot read schema (not supported yet) {schema}")
×
395

396

397
def apply_parameters(script: PlutusV3Script, *args: pycardano.Datum):
5✔
398
    """
399
    Expects a plutus script (compiled) and returns the compiled script from applying parameters to it
400
    """
401
    return _build(_apply_parameters(uplc.unflatten(script), *args))
5✔
402

403

404
def _apply_parameters(script: uplc.ast.Program, *args: pycardano.Datum):
5✔
405
    """
406
    Expects a UPLC program and returns the build artifacts from applying parameters to it
407
    """
408
    # apply parameters from the command line to the contract (instantiates parameterized contract!)
409
    code = script.term
5✔
410
    # UPLC lambdas may only take one argument at a time, so we evaluate by repeatedly applying
411
    for d in args:
5✔
412
        code = uplc.ast.Apply(
5✔
413
            code,
414
            (
415
                uplc.ast.data_from_cbor(datum_to_cbor(d))
416
                if not isinstance(d, uplc_ast.Constant)
417
                else d
418
            ),
419
        )
420
    code = uplc.ast.Program((1, 0, 0), code)
5✔
421
    return code
5✔
422

423

424
def load(contract_path: Union[Path, str]) -> PlutusContract:
5✔
425
    """
426
    Load a contract from a file or directory and generate the artifacts
427
    """
428
    if isinstance(contract_path, str):
5!
429
        contract_path = Path(contract_path)
5✔
430
    if contract_path.is_dir():
5!
431
        contract_candidates = list(contract_path.iterdir())
5✔
432
    elif contract_path.is_file():
×
433
        contract_candidates = [contract_path]
×
434
    else:
435
        raise ValueError(
×
436
            f"Invalid contract path, is neither file nor directory: {contract_path}"
437
        )
438
    for contract_file in contract_candidates:
5!
439
        with contract_file.open("r") as f:
5✔
440
            contract_content = f.read()
5✔
441
        # could be a plutus blueprint
442
        try:
5✔
443
            contract = json.loads(contract_content)
5✔
444
            if "validators" in contract:
5✔
445
                assert len(contract["validators"]) == 1, "Only one validator supported"
5✔
446
                validator = contract["validators"][0]
5✔
447
                # TODO this should be controlled by the version in the preamble
448
                contract_cbor = PlutusV3Script(bytes.fromhex(validator["compiledCode"]))
5✔
449
                datum_type = (
5✔
450
                    validator["datum"]["title"],
451
                    from_plutus_schema(validator["datum"]["schema"]),
452
                )
453
                redeemer_type = (
5✔
454
                    validator["redeemer"]["title"],
455
                    from_plutus_schema(validator["redeemer"]["schema"]),
456
                )
457
                parameter_types = [
5✔
458
                    (p["title"], from_plutus_schema(p["schema"]))
459
                    for p in validator["parameters"]
460
                ]
461
                if "oneOf" in validator["redeemer"]["purpose"]:
5!
462
                    purpose = [
5✔
463
                        k
464
                        for k, v in PURPOSE_MAP.items()
465
                        if v in validator["redeemer"]["purpose"]["oneOf"]
466
                    ]
467
                else:
468
                    purpose = [
×
469
                        k
470
                        for k, v in PURPOSE_MAP.items()
471
                        if v == validator["redeemer"]["purpose"]
472
                    ]
473
                version = contract["preamble"].get("version")
5✔
474
                title = contract["preamble"].get("title")
5✔
475
                description = contract["preamble"].get("description")
5✔
476
                license = contract["preamble"].get("license")
5✔
477
                assert (
5✔
478
                    contract["preamble"].get("plutusVersion") == "v3"
479
                ), "Only Plutus V3 supported"
480
                return PlutusContract(
5✔
481
                    contract_cbor,
482
                    datum_type,
483
                    redeemer_type,
484
                    parameter_types,
485
                    purpose,
486
                    version,
487
                    title,
488
                    description,
489
                    license,
490
                )
491
        except (ValueError, KeyError) as e:
5✔
492
            pass
5✔
493
    contract_cbor = None
×
494
    for contract_file in contract_candidates:
×
495
        with contract_file.open("r") as f:
×
496
            contract_content = f.read()
×
497
        # could be a singly wrapped cbor hex
498
        try:
×
499
            # try to unwrap to see if it is cbor
500
            contract_cbor_unwrapped = cbor2.loads(bytes.fromhex(contract_content))
×
501
            contract_cbor = bytes.fromhex(contract_content)
×
502
            # if we can unwrap again, its doubly wrapped
503
            try:
×
504
                cbor2.loads(contract_cbor_unwrapped)
×
505
                contract_cbor = contract_cbor_unwrapped
×
506
            except ValueError:
×
507
                pass
×
508
            break
×
509
        except ValueError:
×
510
            pass
×
511
        # could be a plutus json
512
        try:
×
513
            contract = json.loads(contract_content)
×
514
            contract_cbor = cbor2.loads(bytes.fromhex(contract["cborHex"]))
×
515
        except (ValueError, KeyError):
×
516
            pass
×
517
        # could be uplc
518
        try:
×
519
            contract_ast = uplc.parse(contract_content)
×
520
            contract_cbor = uplc.flatten(contract_ast)
×
521
        except:
×
522
            pass
×
523
    if contract_cbor is None:
×
524
        raise ValueError(f"Could not load contract from file {contract_path}")
×
NEW
525
    return PlutusContract(PlutusV3Script(contract_cbor))
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc