• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OpShin / opshin / #610820567

01 Oct 2023 08:05PM UTC coverage: 92.873%. First build
#610820567

Pull #265

travis-ci

Pull Request #265: Enable code reuse through patternization

381 of 381 new or added lines in 11 files covered. (100.0%)

4743 of 5107 relevant lines covered (92.87%)

2.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

92.0
/opshin/builder.py
1
import copy
3✔
2
import dataclasses
3✔
3
import enum
3✔
4
import functools
3✔
5
import json
3✔
6
import typing
7
from ast import Module
3✔
8
from typing import Optional, Any, Union
9
from pathlib import Path
3✔
10

3✔
11
from pycardano import PlutusV2Script, IndefiniteList, PlutusData, Datum
3✔
12

3✔
13
from . import __version__, compiler
3✔
14

15
import uplc.ast
3✔
16
from uplc import flatten, ast as uplc_ast, eval as uplc_eval
17
import cbor2
18
import pycardano
3✔
19
from pluthon import compile as plt_compile
3✔
20

3✔
21
from .util import datum_to_cbor
3✔
22
from .compiler_config import DEFAULT_CONFIG
3✔
23

3✔
24

3✔
25
class Purpose(enum.Enum):
26
    spending = "spending"
27
    minting = "minting"
3✔
28
    rewarding = "rewarding"
29
    certifying = "certifying"
30
    any = "any"
31
    lib = "lib"
32

33

34
@dataclasses.dataclass
35
class PlutusContract:
36
    contract: PlutusV2Script
37
    datum_type: Optional[typing.Tuple[str, typing.Type[Datum]]] = None
3✔
38
    redeemer_type: Optional[typing.Tuple[str, typing.Type[Datum]]] = None
39
    parameter_types: typing.List[typing.Tuple[str, typing.Type[Datum]]] = (
40
        dataclasses.field(default_factory=list)
41
    )
42
    purpose: typing.Iterable[Purpose] = (Purpose.any,)
43
    version: Optional[str] = "1.0.0"
44
    title: str = "validator"
45
    description: Optional[str] = f"opshin {__version__} Smart Contract"
46
    license: Optional[str] = None
3✔
47

3✔
48
    @property
49
    def cbor(self) -> bytes:
50
        return self.contract
3✔
51

52
    @property
53
    def cbor_hex(self) -> str:
54
        return self.contract.hex()
55

56
    @property
57
    def script_hash(self):
58
        return pycardano.plutus_script_hash(self.contract)
59

60
    @property
61
    def policy_id(self):
62
        return self.script_hash.to_primitive().hex()
63

64
    @property
65
    def mainnet_addr(self):
3✔
66
        return pycardano.Address(self.script_hash, network=pycardano.Network.MAINNET)
3✔
67

68
    @property
69
    def testnet_addr(self):
70
        return pycardano.Address(self.script_hash, network=pycardano.Network.TESTNET)
71

72
    @property
73
    def plutus_json(self):
74
        return json.dumps(
75
            {
76
                "type": "PlutusScriptV2",
77
                "description": self.description,
78
                "cborHex": self.cbor_hex,
3✔
79
            },
80
            indent=2,
3✔
81
        )
3✔
82

83
    @property
84
    def blueprint(self):
85
        return {
86
            "$schema": "https://cips.cardano.org/cips/cip57/schemas/plutus-blueprint.json",
87
            "$id": "https://github.com/aiken-lang/aiken/blob/main/examples/hello_world/plutus.json",
3✔
88
            "$vocabulary": {
3✔
89
                "https://json-schema.org/draft/2020-12/vocab/core": True,
90
                "https://json-schema.org/draft/2020-12/vocab/applicator": True,
91
                "https://json-schema.org/draft/2020-12/vocab/validation": True,
3✔
92
                "https://cips.cardano.org/cips/cip57": True,
93
            },
94
            "preamble": {
95
                "version": self.version,
96
                "plutusVersion": "v2",
97
                "description": self.description,
98
                "title": self.title,
99
                **({"license": self.license} if self.license is not None else {}),
100
            },
101
            "validators": [
×
102
                {
×
103
                    "title": self.title,
×
104
                    **(
105
                        {
106
                            "datum": {
107
                                "title": self.datum_type[0],
108
                                "purpose": PURPOSE_MAP[Purpose.spending],
109
                                "schema": to_plutus_schema(self.datum_type[1]),
110
                            }
111
                        }
×
112
                        if self.datum_type is not None
113
                        else {}
114
                    ),
3✔
115
                    "redeemer": (
116
                        {
3✔
117
                            "title": self.redeemer_type[0],
3✔
118
                            "purpose": {
119
                                "oneOf": [PURPOSE_MAP[p] for p in self.purpose]
120
                            },
3✔
121
                            "schema": to_plutus_schema(self.redeemer_type[1]),
3✔
122
                        }
123
                        if self.redeemer_type is not None
3✔
124
                        else {}
3✔
125
                    ),
126
                    **(
3✔
127
                        {
128
                            "parameters": [
129
                                {
130
                                    "title": t[0],
131
                                    "purpose": PURPOSE_MAP[Purpose.spending],
3✔
132
                                    "schema": to_plutus_schema(t[1]),
3✔
133
                                }
3✔
134
                                for t in self.parameter_types
135
                            ]
3✔
136
                        }
137
                        if self.parameter_types
138
                        else {}
139
                    ),
3✔
140
                    "compiledCode": self.cbor_hex,
141
                    "hash": self.policy_id,
142
                },
3✔
143
            ],
144
        }
145

146
    def apply_parameter(self, *args: pycardano.Datum):
147
        """
148
        Returns a new OpShin Contract with the applied parameters
149
        """
150
        # update the parameters in the blueprint (remove applied parameters)
151
        assert len(self.parameter_types) >= len(
152
            args
153
        ), f"Applying too many parameters to contract, allowed amount: {self.parameter_types}, but got {len(args)}"
154
        new_parameter_types = copy.copy(self.parameter_types)
155
        for _ in args:
156
            # TODO validate that the applied parameters are of the correct type
157
            new_parameter_types.pop(0)
158
        new_contract_contract = apply_parameters(self.contract, *args)
159
        new_contract = PlutusContract(
160
            new_contract_contract,
161
            self.datum_type,
162
            self.redeemer_type,
163
            new_parameter_types,
164
            self.purpose,
165
            self.version,
166
            self.title,
167
            self.description,
168
        )
169
        return new_contract
170

171
    def dump(self, target_dir: Union[str, Path]):
172
        target_dir = Path(target_dir)
173
        target_dir.mkdir(exist_ok=True, parents=True)
174
        with (target_dir / "script.cbor").open("w") as fp:
175
            fp.write(self.cbor_hex)
176
        with (target_dir / "script.plutus").open("w") as fp:
177
            fp.write(self.plutus_json)
178
        with (target_dir / "script.policy_id").open("w") as fp:
179
            fp.write(self.policy_id)
180
        with (target_dir / "mainnet.addr").open("w") as fp:
181
            fp.write(self.mainnet_addr.encode())
182
        with (target_dir / "testnet.addr").open("w") as fp:
183
            fp.write(self.testnet_addr.encode())
184
        with (target_dir / "blueprint.json").open("w") as fp:
185
            json.dump(self.blueprint, fp, indent=2)
186

187

188
def compile(
189
    program: Module,
190
    contract_filename: Optional[str] = None,
191
    validator_function_name="validator",
192
    config=DEFAULT_CONFIG,
193
) -> uplc_ast.Program:
194
    code = compiler.compile(
195
        program,
196
        filename=contract_filename,
197
        validator_function_name=validator_function_name,
198
        config=config,
199
    )
200
    plt_code = plt_compile(code, config)
201
    return plt_code
202

203

204
@functools.lru_cache(maxsize=32)
205
def _static_compile(
206
    source_code: str,
207
    contract_file: str = "<unknown>",
208
    validator_function_name="validator",
209
    config=DEFAULT_CONFIG,
210
):
211
    """
212
    Expects a python module and returns the build artifacts from compiling it
213
    """
214

215
    source_ast = compiler.parse(source_code, filename=contract_file)
216
    code = compile(
217
        source_ast,
218
        contract_filename=contract_file,
219
        validator_function_name=validator_function_name,
220
        config=config,
221
    )
222
    return code
223

224

225
def _compile(
226
    source_code: str,
227
    *args: typing.Union[pycardano.Datum, uplc_ast.Constant],
228
    contract_file: str = "<unknown>",
229
    validator_function_name="validator",
230
    config=DEFAULT_CONFIG,
231
):
232
    """
233
    Expects a python module and returns the build artifacts from compiling it
234
    """
235

236
    code = _static_compile(
237
        source_code,
238
        contract_file=contract_file,
239
        validator_function_name=validator_function_name,
240
        config=config,
241
    )
242
    code = _apply_parameters(code, *args)
243
    return code
244

245

246
def build(
247
    contract_file: str,
248
    *args: typing.Union[pycardano.Datum, uplc_ast.Constant],
249
    validator_function_name="validator",
250
    config=DEFAULT_CONFIG,
251
):
252
    """
253
    Expects a python module and returns the build artifacts from compiling it
254
    """
255
    with open(contract_file) as f:
256
        source_code = f.read()
257
    code = _compile(
258
        source_code,
259
        *args,
260
        contract_file=contract_file,
261
        validator_function_name=validator_function_name,
262
        config=config,
263
    )
264
    return _build(code)
265

266

267
def _build(contract: uplc.ast.Program):
268
    # create cbor file for use with pycardano/lucid
269
    cbor = flatten(contract)
270
    return pycardano.PlutusV2Script(cbor)
271

272

273
PURPOSE_MAP = {
274
    Purpose.any: {"oneOf": ["spend", "mint", "withdraw", "publish"]},
275
    Purpose.spending: "spend",
276
    Purpose.minting: "mint",
277
    Purpose.rewarding: "withdraw",
278
    Purpose.certifying: "publish",
279
}
280

281

282
def to_plutus_schema(cls: typing.Type[Datum]) -> dict:
283
    """
284
    Convert to a dictionary representing a json schema according to CIP 57 Plutus Blueprint
285
    Reference of the core structure:
286
    https://cips.cardano.org/cips/cip57/#corevocabulary
287

288
    Args:
289
        **kwargs: Extra key word arguments to be passed to `json.dumps()`
290

291
    Returns:
292
        dict: a dict representing the schema of this class.
293
    """
294
    if hasattr(cls, "__origin__") and cls.__origin__ is list:
295
        return {
296
            "dataType": "list",
297
            **(
298
                {"items": to_plutus_schema(cls.__args__[0])}
299
                if hasattr(cls, "__args__")
300
                else {}
301
            ),
302
        }
303
    elif hasattr(cls, "__origin__") and cls.__origin__ is dict:
304
        return {
305
            "dataType": "map",
306
            **(
307
                {
308
                    "keys": to_plutus_schema(cls.__args__[0]),
309
                    "values": to_plutus_schema(cls.__args__[1]),
310
                }
311
                if hasattr(cls, "__args__")
312
                else {}
313
            ),
314
        }
315
    elif hasattr(cls, "__origin__") and cls.__origin__ is Union:
316
        return {
317
            "anyOf": (
318
                [to_plutus_schema(t) for t in cls.__args__]
319
                if hasattr(cls, "__args__")
320
                else []
321
            )
322
        }
323
    elif issubclass(cls, PlutusData):
324
        fields = []
325
        for field_value in cls.__dataclass_fields__.values():
326
            if field_value.name == "CONSTR_ID":
327
                continue
328
            field_schema = to_plutus_schema(field_value.type)
329
            field_schema["title"] = field_value.name
330
            fields.append(field_schema)
331
        return {
332
            "dataType": "constructor",
333
            "index": cls.CONSTR_ID,
334
            "fields": fields,
335
            "title": cls.__name__,
336
        }
337
    elif issubclass(cls, bytes):
338
        return {"dataType": "bytes"}
339
    elif issubclass(cls, int):
340
        return {"dataType": "integer"}
341
    elif issubclass(cls, IndefiniteList) or issubclass(cls, list):
342
        return {"dataType": "list"}
343
    else:
344
        return {}
345

346

347
def from_plutus_schema(schema: dict) -> typing.Type[pycardano.Datum]:
348
    """
349
    Convert from a dictionary representing a json schema according to CIP 57 Plutus Blueprint
350
    """
351
    if schema == {}:
352
        return pycardano.Datum
353
    if "anyOf" in schema:
354
        if len(schema["anyOf"]) == 0:
355
            raise ValueError("Cannot convert empty anyOf schema")
356
        union_t = typing.Union[from_plutus_schema(schema["anyOf"][0])]
357
        for s in schema["anyOf"][1:]:
358
            union_t = typing.Union[union_t, from_plutus_schema(s)]
359
        return union_t
360
    if "dataType" in schema:
361
        typ = schema["dataType"]
362
        if typ == "bytes":
363
            return bytes
364
        elif typ == "integer":
365
            return int
366
        elif typ == "list":
367
            if "items" in schema:
368
                return typing.List[from_plutus_schema(schema["items"])]
369
            else:
370
                return typing.List[pycardano.Datum]
371
        elif typ == "map":
372
            key_t = (
373
                from_plutus_schema(schema["keys"])
374
                if "keys" in schema
375
                else pycardano.Datum
376
            )
377
            value_t = (
378
                from_plutus_schema(schema["values"])
379
                if "values" in schema
380
                else pycardano.Datum
381
            )
382
            return typing.Dict[key_t, value_t]
383
        elif typ == "constructor":
384
            fields = {}
385
            for field in schema["fields"]:
386
                fields[field["title"]] = from_plutus_schema(field)
387
            fields["CONSTR_ID"] = schema["index"]
388
            return dataclasses.dataclass(
389
                type(schema["title"], (pycardano.PlutusData,), fields)
390
            )
391
    raise ValueError(f"Cannot read schema (not supported yet) {schema}")
392

393

394
def apply_parameters(script: PlutusV2Script, *args: pycardano.Datum):
395
    """
396
    Expects a plutus script (compiled) and returns the compiled script from applying parameters to it
397
    """
398
    return _build(_apply_parameters(uplc.unflatten(script), *args))
399

400

401
def _apply_parameters(script: uplc.ast.Program, *args: pycardano.Datum):
402
    """
403
    Expects a UPLC program and returns the build artifacts from applying parameters to it
404
    """
405
    # apply parameters from the command line to the contract (instantiates parameterized contract!)
406
    code = script.term
407
    # UPLC lambdas may only take one argument at a time, so we evaluate by repeatedly applying
408
    for d in args:
409
        code = uplc.ast.Apply(
410
            code,
411
            (
412
                uplc.ast.data_from_cbor(datum_to_cbor(d))
413
                if not isinstance(d, uplc_ast.Constant)
414
                else d
415
            ),
416
        )
417
    code = uplc.ast.Program((1, 0, 0), code)
418
    return code
419

420

421
def load(contract_path: Union[Path, str]) -> PlutusContract:
422
    """
423
    Load a contract from a file or directory and generate the artifacts
424
    """
425
    if isinstance(contract_path, str):
426
        contract_path = Path(contract_path)
427
    if contract_path.is_dir():
428
        contract_candidates = list(contract_path.iterdir())
429
    elif contract_path.is_file():
430
        contract_candidates = [contract_path]
431
    else:
432
        raise ValueError(
433
            f"Invalid contract path, is neither file nor directory: {contract_path}"
434
        )
435
    for contract_file in contract_candidates:
436
        with contract_file.open("r") as f:
437
            contract_content = f.read()
438
        # could be a plutus blueprint
439
        try:
440
            contract = json.loads(contract_content)
441
            if "validators" in contract:
442
                assert len(contract["validators"]) == 1, "Only one validator supported"
443
                validator = contract["validators"][0]
444
                contract_cbor = PlutusV2Script(bytes.fromhex(validator["compiledCode"]))
445
                datum_type = (
446
                    validator["datum"]["title"],
447
                    from_plutus_schema(validator["datum"]["schema"]),
448
                )
449
                redeemer_type = (
450
                    validator["redeemer"]["title"],
451
                    from_plutus_schema(validator["redeemer"]["schema"]),
452
                )
453
                parameter_types = [
454
                    (p["title"], from_plutus_schema(p["schema"]))
455
                    for p in validator["parameters"]
456
                ]
457
                if "oneOf" in validator["redeemer"]["purpose"]:
458
                    purpose = [
459
                        k
460
                        for k, v in PURPOSE_MAP.items()
461
                        if v in validator["redeemer"]["purpose"]["oneOf"]
462
                    ]
463
                else:
464
                    purpose = [
465
                        k
466
                        for k, v in PURPOSE_MAP.items()
467
                        if v == validator["redeemer"]["purpose"]
468
                    ]
469
                version = contract["preamble"].get("version")
470
                title = contract["preamble"].get("title")
471
                description = contract["preamble"].get("description")
472
                license = contract["preamble"].get("license")
473
                assert (
474
                    contract["preamble"].get("plutusVersion") == "v2"
475
                ), "Only Plutus V2 supported"
476
                return PlutusContract(
477
                    contract_cbor,
478
                    datum_type,
479
                    redeemer_type,
480
                    parameter_types,
481
                    purpose,
482
                    version,
483
                    title,
484
                    description,
485
                    license,
486
                )
487
        except (ValueError, KeyError) as e:
488
            pass
489
    contract_cbor = None
490
    for contract_file in contract_candidates:
491
        with contract_file.open("r") as f:
492
            contract_content = f.read()
493
        # could be a singly wrapped cbor hex
494
        try:
495
            # try to unwrap to see if it is cbor
496
            contract_cbor_unwrapped = cbor2.loads(bytes.fromhex(contract_content))
497
            contract_cbor = bytes.fromhex(contract_content)
498
            # if we can unwrap again, its doubly wrapped
499
            try:
500
                cbor2.loads(contract_cbor_unwrapped)
501
                contract_cbor = contract_cbor_unwrapped
502
            except ValueError:
503
                pass
504
            break
505
        except ValueError:
506
            pass
507
        # could be a plutus json
508
        try:
509
            contract = json.loads(contract_content)
510
            contract_cbor = cbor2.loads(bytes.fromhex(contract["cborHex"]))
511
        except (ValueError, KeyError):
512
            pass
513
        # could be uplc
514
        try:
515
            contract_ast = uplc.parse(contract_content)
516
            contract_cbor = uplc.flatten(contract_ast)
517
        except:
518
            pass
519
    if contract_cbor is None:
520
        raise ValueError(f"Could not load contract from file {contract_path}")
521
    return PlutusContract(PlutusV2Script(contract_cbor))
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc