• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OpShin / opshin / #609210982

05 Sep 2023 10:05AM UTC coverage: 92.999%. First build
#609210982

Pull #253

travis-ci

Pull Request #253: Add code to apply UPLC parameters to a contract

10 of 10 new or added lines in 1 file covered. (100.0%)

5486 of 5899 relevant lines covered (93.0%)

2.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.33
/opshin/builder.py
1
import copy
3✔
2
import dataclasses
3✔
3
import enum
4
import functools
3✔
5
import inspect
6
import json
3✔
7
import typing
8
from ast import Module
3✔
9
from typing import Optional, Any, Union
3✔
10
from pathlib import Path
3✔
11

3✔
12
from pycardano import IndefiniteList, PlutusData, Datum, PlutusV3Script
13

3✔
14
from . import __version__, compiler
15

16
import uplc.ast
3✔
17
from uplc import flatten, ast as uplc_ast, eval as uplc_eval
3✔
18
import cbor2
3✔
19
import pycardano
3✔
20
from pluthon import compile as plt_compile
3✔
21

3✔
22
from .util import datum_to_cbor, get_class_annotations, set_class_annotations
3✔
23
from .compiler_config import DEFAULT_CONFIG
24

25

3✔
26
class Purpose(enum.Enum):
27
    spending = "spending"
28
    minting = "minting"
29
    rewarding = "rewarding"
30
    certifying = "certifying"
31
    voting = "voting"
32
    proposing = "proposing"
33
    any = "any"
34

×
35

×
36
@dataclasses.dataclass
37
class PlutusContract:
×
38
    contract: PlutusV3Script
×
39
    datum_type: Optional[typing.Tuple[str, typing.Type[Datum]]] = None
40
    redeemer_type: Optional[typing.Tuple[str, typing.Type[Datum]]] = None
41
    parameter_types: typing.List[typing.Tuple[str, typing.Type[Datum]]] = (
×
42
        dataclasses.field(default_factory=list)
43
    )
×
44
    purpose: typing.Iterable[Purpose] = (Purpose.any,)
45
    version: Optional[str] = "1.0.0"
46
    title: str = "validator"
3✔
47
    description: Optional[str] = f"opshin {__version__} Smart Contract"
48
    license: Optional[str] = None
3✔
49

3✔
50
    @property
51
    def cbor(self) -> bytes:
52
        return self.contract
3✔
53

3✔
54
    @property
55
    def cbor_hex(self) -> str:
3✔
56
        return self.contract.hex()
3✔
57

58
    @property
3✔
59
    def script_hash(self):
60
        return pycardano.plutus_script_hash(self.contract)
61

62
    @property
63
    def policy_id(self):
3✔
64
        return self.script_hash.to_primitive().hex()
3✔
65

3✔
66
    @property
67
    def mainnet_addr(self):
3✔
68
        return pycardano.Address(self.script_hash, network=pycardano.Network.MAINNET)
69

70
    @property
71
    def testnet_addr(self):
3✔
72
        return pycardano.Address(self.script_hash, network=pycardano.Network.TESTNET)
73

74
    @property
3✔
75
    def plutus_json(self):
76
        return json.dumps(
77
            {
78
                "type": "PlutusScriptV3",
79
                "description": self.description,
80
                "cborHex": cbor2.dumps(bytes.fromhex(self.cbor_hex)).hex(),
81
            },
82
            indent=2,
83
        )
3✔
84

85
    @property
86
    def blueprint(self):
87
        return {
×
88
            "$schema": "https://cips.cardano.org/cips/cip57/schemas/plutus-blueprint.json",
89
            "$id": "<insert-url-to-contract-here>",
90
            "$vocabulary": {
3✔
91
                "https://json-schema.org/draft/2020-12/vocab/core": True,
92
                "https://json-schema.org/draft/2020-12/vocab/applicator": True,
93
                "https://json-schema.org/draft/2020-12/vocab/validation": True,
94
                "https://cips.cardano.org/cips/cip57": True,
95
            },
×
96
            "preamble": {
97
                "version": self.version,
×
98
                "plutusVersion": "v3",
×
99
                "description": self.description,
×
100
                "title": self.title,
×
101
                **({"license": self.license} if self.license is not None else {}),
102
            },
103
            "validators": [
104
                {
105
                    "title": self.title,
106
                    **(
107
                        {
108
                            "datum": {
109
                                "title": self.datum_type[0],
110
                                "purpose": PURPOSE_MAP[Purpose.spending],
111
                                "schema": to_plutus_schema(self.datum_type[1]),
112
                            }
113
                        }
114
                        if self.datum_type is not None
115
                        else {}
116
                    ),
117
                    "redeemer": (
118
                        {
119
                            "title": self.redeemer_type[0],
120
                            "purpose": {
121
                                "oneOf": [PURPOSE_MAP[p] for p in self.purpose]
122
                            },
123
                            "schema": to_plutus_schema(self.redeemer_type[1]),
124
                        }
125
                        if self.redeemer_type is not None
126
                        else {}
127
                    ),
128
                    **(
129
                        {
130
                            "parameters": [
131
                                {
132
                                    "title": t[0],
133
                                    "purpose": PURPOSE_MAP[Purpose.spending],
134
                                    "schema": to_plutus_schema(t[1]),
135
                                }
136
                                for t in self.parameter_types
137
                            ]
138
                        }
139
                        if self.parameter_types
140
                        else {}
141
                    ),
142
                    "compiledCode": self.cbor_hex,
143
                    "hash": self.policy_id,
144
                },
145
            ],
146
        }
147

148
    def apply_parameter(self, *args: pycardano.Datum):
149
        """
150
        Returns a new OpShin Contract with the applied parameters
151
        """
152
        # update the parameters in the blueprint (remove applied parameters)
153
        assert len(self.parameter_types) >= len(
154
            args
155
        ), f"Applying too many parameters to contract, allowed amount: {self.parameter_types}, but got {len(args)}"
156
        new_parameter_types = copy.copy(self.parameter_types)
157
        for _ in args:
158
            # TODO validate that the applied parameters are of the correct type
159
            new_parameter_types.pop(0)
160
        new_contract_contract = apply_parameters(self.contract, *args)
161
        new_contract = PlutusContract(
162
            new_contract_contract,
163
            self.datum_type,
164
            self.redeemer_type,
165
            new_parameter_types,
166
            self.purpose,
167
            self.version,
168
            self.title,
169
            self.description,
170
        )
171
        return new_contract
172

173
    def dump(self, target_dir: Union[str, Path]):
174
        target_dir = Path(target_dir)
175
        target_dir.mkdir(exist_ok=True, parents=True)
176
        with (target_dir / "script.cbor").open("w") as fp:
177
            fp.write(self.cbor_hex)
178
        with (target_dir / "script.plutus").open("w") as fp:
179
            fp.write(self.plutus_json)
180
        with (target_dir / "script.policy_id").open("w") as fp:
181
            fp.write(self.policy_id)
182
        with (target_dir / "mainnet.addr").open("w") as fp:
183
            fp.write(self.mainnet_addr.encode())
184
        with (target_dir / "testnet.addr").open("w") as fp:
185
            fp.write(self.testnet_addr.encode())
186
        with (target_dir / "blueprint.json").open("w") as fp:
187
            json.dump(self.blueprint, fp, indent=2)
188

189

190
def compile(
191
    program: Module,
192
    contract_filename: Optional[str] = None,
193
    validator_function_name="validator",
194
    config=DEFAULT_CONFIG,
195
) -> uplc_ast.Program:
196
    code = compiler.compile(
197
        program,
198
        filename=contract_filename,
199
        validator_function_name=validator_function_name,
200
        config=config,
201
    )
202
    plt_code = plt_compile(code, config)
203
    return plt_code
204

205

206
@functools.lru_cache(maxsize=32)
207
def _static_compile(
208
    source_code: str,
209
    contract_file: str = "<unknown>",
210
    validator_function_name="validator",
211
    config=DEFAULT_CONFIG,
212
):
213
    """
214
    Expects a python module and returns the build artifacts from compiling it
215
    """
216

217
    source_ast = compiler.parse(source_code, filename=contract_file)
218
    code = compile(
219
        source_ast,
220
        contract_filename=contract_file,
221
        validator_function_name=validator_function_name,
222
        config=config,
223
    )
224
    return code
225

226

227
def _compile(
228
    source_code: str,
229
    *args: typing.Union[pycardano.Datum, uplc_ast.Constant],
230
    contract_file: str = "<unknown>",
231
    validator_function_name="validator",
232
    config=DEFAULT_CONFIG,
233
):
234
    """
235
    Expects a python module and returns the build artifacts from compiling it
236
    """
237

238
    code = _static_compile(
239
        source_code,
240
        contract_file=contract_file,
241
        validator_function_name=validator_function_name,
242
        config=config,
243
    )
244
    code = _apply_parameters(code, *args)
245
    return code
246

247

248
def build(
249
    contract_file: str,
250
    *args: typing.Union[pycardano.Datum, uplc_ast.Constant],
251
    validator_function_name="validator",
252
    config=DEFAULT_CONFIG,
253
):
254
    """
255
    Expects a python module and returns the build artifacts from compiling it
256
    """
257
    with open(contract_file) as f:
258
        source_code = f.read()
259
    code = _compile(
260
        source_code,
261
        *args,
262
        contract_file=contract_file,
263
        validator_function_name=validator_function_name,
264
        config=config,
265
    )
266
    return _build(code)
267

268

269
def _build(contract: uplc.ast.Program):
270
    # create cbor file for use with pycardano/lucid
271
    cbor = flatten(contract)
272
    return pycardano.PlutusV3Script(cbor)
273

274

275
PURPOSE_MAP = {
276
    Purpose.any: {"oneOf": ["spend", "mint", "withdraw", "publish", "vote", "propose"]},
277
    Purpose.spending: "spend",
278
    Purpose.minting: "mint",
279
    Purpose.rewarding: "withdraw",
280
    Purpose.certifying: "publish",
281
    Purpose.voting: "vote",
282
    Purpose.proposing: "propose",
283
}
284

285

286
def to_plutus_schema(cls: typing.Type[Datum]) -> dict:
287
    """
288
    Convert to a dictionary representing a json schema according to CIP 57 Plutus Blueprint
289
    Reference of the core structure:
290
    https://cips.cardano.org/cips/cip57/#corevocabulary
291

292
    Args:
293
        **kwargs: Extra key word arguments to be passed to `json.dumps()`
294

295
    Returns:
296
        dict: a dict representing the schema of this class.
297
    """
298
    if cls == Datum:
299
        return {}
300
    if hasattr(cls, "__origin__") and cls.__origin__ is list:
301
        return {
302
            "dataType": "list",
303
            **(
304
                {"items": to_plutus_schema(cls.__args__[0])}
305
                if hasattr(cls, "__args__")
306
                else {}
307
            ),
308
        }
309
    elif hasattr(cls, "__origin__") and cls.__origin__ is dict:
310
        return {
311
            "dataType": "map",
312
            **(
313
                {
314
                    "keys": to_plutus_schema(cls.__args__[0]),
315
                    "values": to_plutus_schema(cls.__args__[1]),
316
                }
317
                if hasattr(cls, "__args__")
318
                else {}
319
            ),
320
        }
321
    elif hasattr(cls, "__origin__") and cls.__origin__ is Union:
322
        return {
323
            "anyOf": (
324
                [to_plutus_schema(t) for t in cls.__args__]
325
                if hasattr(cls, "__args__")
326
                else []
327
            )
328
        }
329
    elif issubclass(cls, PlutusData):
330
        fields = []
331
        for field in get_class_annotations(cls).items():
332
            field_name, field_value = field
333
            field_schema = to_plutus_schema(field_value)
334
            field_schema["title"] = field_name
335
            fields.append(field_schema)
336
        return {
337
            "dataType": "constructor",
338
            "index": cls.CONSTR_ID,
339
            "fields": fields,
340
            "title": cls.__name__,
341
        }
342
    elif issubclass(cls, bytes):
343
        return {"dataType": "bytes"}
344
    elif issubclass(cls, int):
345
        return {"dataType": "integer"}
346
    elif issubclass(cls, IndefiniteList) or issubclass(cls, list):
347
        return {"dataType": "list"}
348
    else:
349
        return {}
350

351

352
def from_plutus_schema(schema: dict) -> typing.Type[pycardano.Datum]:
353
    """
354
    Convert from a dictionary representing a json schema according to CIP 57 Plutus Blueprint
355
    """
356
    if schema == {} or set(schema.keys()) == {"title"}:
357
        # CIP57 allows eliding type metadata for unconstrained Datum fields
358
        return pycardano.Datum
359
    if "anyOf" in schema:
360
        if len(schema["anyOf"]) == 0:
361
            raise ValueError("Cannot convert empty anyOf schema")
362
        union_t = typing.Union[from_plutus_schema(schema["anyOf"][0])]
363
        for s in schema["anyOf"][1:]:
364
            union_t = typing.Union[union_t, from_plutus_schema(s)]
365
        return union_t
366
    if "dataType" in schema:
367
        typ = schema["dataType"]
368
        if typ == "bytes":
369
            return bytes
370
        elif typ == "integer":
371
            return int
372
        elif typ == "list":
373
            if "items" in schema:
374
                return typing.List[from_plutus_schema(schema["items"])]
375
            else:
376
                return typing.List[pycardano.Datum]
377
        elif typ == "map":
378
            key_t = (
379
                from_plutus_schema(schema["keys"])
380
                if "keys" in schema
381
                else pycardano.Datum
382
            )
383
            value_t = (
384
                from_plutus_schema(schema["values"])
385
                if "values" in schema
386
                else pycardano.Datum
387
            )
388
            return typing.Dict[key_t, value_t]
389
        elif typ == "constructor":
390
            fields = {}
391
            fields["CONSTR_ID"] = schema["index"]
392
            cls = type(schema["title"], (pycardano.PlutusData,), fields)
393
            for field in schema["fields"]:
394
                set_class_annotations(cls, field["title"], from_plutus_schema(field))
395
            cls = dataclasses.dataclass(cls)
396
            return cls
397
    raise ValueError(f"Cannot read schema (not supported yet) {schema}")
398

399

400
def apply_parameters(script: PlutusV3Script, *args: pycardano.Datum):
401
    """
402
    Expects a plutus script (compiled) and returns the compiled script from applying parameters to it
403
    """
404
    return _build(_apply_parameters(uplc.unflatten(script), *args))
405

406

407
def _apply_parameters(script: uplc.ast.Program, *args: pycardano.Datum):
408
    """
409
    Expects a UPLC program and returns the build artifacts from applying parameters to it
410
    """
411
    # apply parameters from the command line to the contract (instantiates parameterized contract!)
412
    code = script.term
413
    # UPLC lambdas may only take one argument at a time, so we evaluate by repeatedly applying
414
    for d in args:
415
        code = uplc.ast.Apply(
416
            code,
417
            (
418
                uplc.ast.data_from_cbor(datum_to_cbor(d))
419
                if not isinstance(d, uplc_ast.Constant)
420
                else d
421
            ),
422
        )
423
    code = uplc.ast.Program((1, 0, 0), code)
424
    return code
425

426

427
def load(contract_path: Union[Path, str]) -> PlutusContract:
428
    """
429
    Load a contract from a file or directory and generate the artifacts
430
    """
431
    if isinstance(contract_path, str):
432
        contract_path = Path(contract_path)
433
    if contract_path.is_dir():
434
        contract_candidates = list(contract_path.iterdir())
435
    elif contract_path.is_file():
436
        contract_candidates = [contract_path]
437
    else:
438
        raise ValueError(
439
            f"Invalid contract path, is neither file nor directory: {contract_path}"
440
        )
441
    for contract_file in contract_candidates:
442
        with contract_file.open("r") as f:
443
            contract_content = f.read()
444
        # could be a plutus blueprint
445
        try:
446
            contract = json.loads(contract_content)
447
            if "validators" in contract:
448
                assert len(contract["validators"]) == 1, "Only one validator supported"
449
                validator = contract["validators"][0]
450
                # TODO this should be controlled by the version in the preamble
451
                contract_cbor = PlutusV3Script(bytes.fromhex(validator["compiledCode"]))
452
                datum_type = (
453
                    validator["datum"]["title"],
454
                    from_plutus_schema(validator["datum"]["schema"]),
455
                )
456
                redeemer_type = (
457
                    validator["redeemer"]["title"],
458
                    from_plutus_schema(validator["redeemer"]["schema"]),
459
                )
460
                parameter_types = [
461
                    (p["title"], from_plutus_schema(p["schema"]))
462
                    for p in validator["parameters"]
463
                ]
464
                if "oneOf" in validator["redeemer"]["purpose"]:
465
                    purpose = [
466
                        k
467
                        for k, v in PURPOSE_MAP.items()
468
                        if v in validator["redeemer"]["purpose"]["oneOf"]
469
                    ]
470
                else:
471
                    purpose = [
472
                        k
473
                        for k, v in PURPOSE_MAP.items()
474
                        if v == validator["redeemer"]["purpose"]
475
                    ]
476
                version = contract["preamble"].get("version")
477
                title = contract["preamble"].get("title")
478
                description = contract["preamble"].get("description")
479
                license = contract["preamble"].get("license")
480
                assert (
481
                    contract["preamble"].get("plutusVersion") == "v3"
482
                ), "Only Plutus V3 supported"
483
                return PlutusContract(
484
                    contract_cbor,
485
                    datum_type,
486
                    redeemer_type,
487
                    parameter_types,
488
                    purpose,
489
                    version,
490
                    title,
491
                    description,
492
                    license,
493
                )
494
        except (ValueError, KeyError) as e:
495
            pass
496
    contract_cbor = None
497
    for contract_file in contract_candidates:
498
        with contract_file.open("r") as f:
499
            contract_content = f.read()
500
        # could be a singly wrapped cbor hex
501
        try:
502
            # try to unwrap to see if it is cbor
503
            contract_cbor_unwrapped = cbor2.loads(bytes.fromhex(contract_content))
504
            contract_cbor = bytes.fromhex(contract_content)
505
            # if we can unwrap again, its doubly wrapped
506
            try:
507
                cbor2.loads(contract_cbor_unwrapped)
508
                contract_cbor = contract_cbor_unwrapped
509
            except ValueError:
510
                pass
511
            break
512
        except ValueError:
513
            pass
514
        # could be a plutus json
515
        try:
516
            contract = json.loads(contract_content)
517
            contract_cbor = cbor2.loads(bytes.fromhex(contract["cborHex"]))
518
        except (ValueError, KeyError):
519
            pass
520
        # could be uplc
521
        try:
522
            contract_ast = uplc.parse(contract_content)
523
            contract_cbor = uplc.flatten(contract_ast)
524
        except:
525
            pass
526
    if contract_cbor is None:
527
        raise ValueError(f"Could not load contract from file {contract_path}")
528
    return PlutusContract(PlutusV3Script(contract_cbor))
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc