• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

OpShin / opshin / 865

pending completion
865

push

travis-ci-com

nielstron
Bump opshin version

1 of 1 new or added line in 1 file covered. (100.0%)

3728 of 4026 relevant lines covered (92.6%)

3.7 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

76.59
/opshin/__main__.py
1
import argparse
4✔
2
import cbor2
4✔
3
import enum
4✔
4
import importlib
4✔
5
import json
4✔
6
import pathlib
4✔
7
import sys
4✔
8
import typing
4✔
9
import ast
4✔
10

11
import pycardano
4✔
12
from pycardano import PlutusData
4✔
13

14
import uplc
4✔
15
import uplc.ast
4✔
16

17
from . import compiler, builder, prelude
4✔
18
from .util import CompilerError, data_from_json
4✔
19
from .prelude import ScriptContext
4✔
20

21

22
class Command(enum.Enum):
4✔
23
    compile_pluto = "compile_pluto"
4✔
24
    compile = "compile"
4✔
25
    eval = "eval"
4✔
26
    parse = "parse"
4✔
27
    eval_uplc = "eval_uplc"
4✔
28
    build = "build"
4✔
29
    lint = "lint"
4✔
30

31

32
class Purpose(enum.Enum):
4✔
33
    spending = "spending"
4✔
34
    minting = "minting"
4✔
35
    rewarding = "rewarding"
4✔
36
    certifying = "certifying"
4✔
37
    any = "any"
4✔
38

39

40
def plutus_data_from_json(annotation: typing.Type, x: dict):
4✔
41
    try:
4✔
42
        if annotation == int:
4✔
43
            return int(x["int"])
4✔
44
        if annotation == bytes:
4✔
45
            return bytes.fromhex(x["bytes"])
4✔
46
        if annotation is None:
×
47
            return None
×
48
        if isinstance(annotation, typing._GenericAlias):
×
49
            # Annotation is a List or Dict
50
            if annotation._name == "List":
×
51
                annotation_ann = annotation.__dict__["__args__"][0]
×
52
                return [plutus_data_from_json(annotation_ann, k) for k in x["list"]]
×
53
            if annotation._name == "Dict":
×
54
                annotation_key, annotation_val = annotation.__dict__["__args__"]
×
55
                return {
×
56
                    plutus_data_from_json(
57
                        annotation_key, d["k"]
58
                    ): plutus_data_from_json(annotation_val, d["v"])
59
                    for d in x["map"]
60
                }
61
        if issubclass(annotation, pycardano.PlutusData):
×
62
            return annotation.from_dict(x)
×
63
    except (KeyError, ValueError):
×
64
        raise ValueError(
×
65
            f"Annotation {annotation} does not match provided plutus datum {json.dumps(x)}"
66
        )
67

68

69
def plutus_data_from_cbor(annotation: typing.Type, x: bytes):
4✔
70
    try:
4✔
71
        if annotation in (int, bytes):
4✔
72
            return cbor2.loads(x)
×
73
        if annotation is None:
4✔
74
            return None
×
75
        if isinstance(annotation, typing._GenericAlias):
4✔
76
            # Annotation is a List or Dict
77
            if annotation._name == "List":
×
78
                annotation_ann = annotation.__dict__["__args__"][0]
×
79
                return [
×
80
                    plutus_data_from_cbor(annotation_ann, cbor2.dumps(k))
81
                    for k in cbor2.loads(x)
82
                ]
83
            if annotation._name == "Dict":
×
84
                annotation_key, annotation_val = annotation.__dict__["__args__"]
×
85
                return {
×
86
                    plutus_data_from_cbor(
87
                        annotation_key, cbor2.dumps(k)
88
                    ): plutus_data_from_cbor(annotation_val, v)
89
                    for k, v in cbor2.loads(x).items()
90
                }
91
        if issubclass(annotation, pycardano.PlutusData):
4✔
92
            return annotation.from_cbor(x)
4✔
93
    except (KeyError, ValueError):
×
94
        raise ValueError(
×
95
            f"Annotation {annotation} does not match provided plutus datum {x.hex()}"
96
        )
97

98

99
def check_params(
4✔
100
    command: Command,
101
    purpose: Purpose,
102
    validator_args,
103
    validator_params,
104
    force_three_params=False,
105
):
106
    if purpose == Purpose.any:
4✔
107
        # The any purpose does not do any checks. Use only if you know what you are doing
108
        return
4✔
109
    ret_type = validator_args[-1]
4✔
110
    # expect the validator to return None
111
    assert (
4✔
112
        ret_type is None or ret_type == prelude.Anything
113
    ), f"Expected contract to return None, but returns {ret_type}"
114

115
    num_onchain_params = 3 if purpose == Purpose.spending or force_three_params else 2
4✔
116
    onchain_params = validator_args[-1 - num_onchain_params : -1]
4✔
117
    param_types = validator_args[: -1 - num_onchain_params]
4✔
118

119
    if command in (Command.eval, Command.eval_uplc):
4✔
120
        assert len(validator_params) == len(param_types) + len(
4✔
121
            onchain_params
122
        ), f"{purpose.value.capitalize()} validator expects {len(param_types) + len(onchain_params)} parameters for evaluation, but only got {len(validator_params)}."
123
    else:
124
        assert len(validator_params) == len(
4✔
125
            param_types
126
        ), f"{purpose.value.capitalize()} validator expects {len(onchain_params)} parameters at evaluation (on-chain) and {len(param_types)} parameters at compilation time, but got {len(validator_params)} during compilation."
127
    assert (
4✔
128
        onchain_params[-1] == ScriptContext
129
    ), f"Last parameter of the validator is always ScriptContext, but is {onchain_params[-1].__name__} here."
130

131

132
def main():
4✔
133
    a = argparse.ArgumentParser(
4✔
134
        description="An evaluator and compiler from python into UPLC. Translate imperative programs into functional quasi-assembly."
135
    )
136
    a.add_argument(
4✔
137
        "command",
138
        type=str,
139
        choices=Command.__members__.keys(),
140
        help="The command to execute on the input file.",
141
    )
142
    a.add_argument(
4✔
143
        "purpose",
144
        type=str,
145
        choices=Purpose.__members__.keys(),
146
        help="The intended script purpose. Determines the number of on-chain parameters "
147
        "(spending = 3, minting, rewarding, certifying = 2, any = no checks). "
148
        "This allows the compiler to check whether the correct amount of parameters was passed during compilation.",
149
    )
150
    a.add_argument(
4✔
151
        "input_file", type=str, help="The input program to parse. Set to - for stdin."
152
    )
153
    a.add_argument(
4✔
154
        "-o",
155
        "--output-directory",
156
        default="",
157
        type=str,
158
        help="The output directory for artefacts of the build command. Defaults to the filename of the compiled contract. of the compiled contract.",
159
    )
160
    a.add_argument(
4✔
161
        "--force-three-params",
162
        action="store_true",
163
        help="Enforces that the contract is always called with three virtual parameters on-chain. Enable if the script should support spending and other purposes.",
164
    )
165
    a.add_argument(
4✔
166
        "args",
167
        nargs="*",
168
        default=[],
169
        help="Input parameters for the validator (parameterizes the contract for compile/build). Either json or CBOR notation.",
170
    )
171
    a.add_argument(
4✔
172
        "--output-format-json",
173
        action="store_true",
174
        help="Changes the output of the Linter to a json format.",
175
    )
176
    args = a.parse_args()
4✔
177
    command = Command(args.command)
4✔
178
    purpose = Purpose(args.purpose)
4✔
179
    input_file = args.input_file if args.input_file != "-" else sys.stdin
4✔
180
    force_three_params = args.force_three_params
4✔
181

182
    # read and import the contract
183
    with open(input_file, "r") as f:
4✔
184
        source_code = f.read()
4✔
185
    tmp_input_file = pathlib.Path("build").joinpath("__tmp_opshin.py")
4✔
186
    tmp_input_file.parent.mkdir(exist_ok=True)
4✔
187
    with tmp_input_file.open("w") as fp:
4✔
188
        fp.write(source_code)
4✔
189
    sys.path.append(str(pathlib.Path(tmp_input_file).parent.absolute()))
4✔
190
    sc = importlib.import_module(pathlib.Path(tmp_input_file).stem)
4✔
191
    sys.path.pop()
4✔
192
    # load the passed parameters
193
    annotations = list(sc.validator.__annotations__.values())
4✔
194
    if "return" not in sc.validator.__annotations__:
4✔
195
        annotations.append(prelude.Anything)
×
196
    parsed_params = []
4✔
197
    for i, (c, a) in enumerate(zip(annotations, args.args)):
4✔
198
        if a[0] == "{":
4✔
199
            try:
4✔
200
                param_json = json.loads(a)
4✔
201
            except Exception as e:
×
202
                raise ValueError(
×
203
                    f'Invalid parameter for contract passed at position {i}, expected json value, got "{a}". Did you correctly encode the value as json and wrap it in quotes?'
204
                ) from e
205
            try:
4✔
206
                param = plutus_data_from_json(c, param_json)
4✔
207
            except Exception as e:
×
208
                raise ValueError(
×
209
                    f"Invalid parameter for contract passed at position {i}, expected type {c.__name__}."
210
                ) from e
211
        else:
212
            try:
4✔
213
                param_bytes = bytes.fromhex(a)
4✔
214
            except Exception as e:
×
215
                raise ValueError(
×
216
                    "Expected hexadecimal CBOR representation of plutus datum but could not transform hex string to bytes."
217
                ) from e
218
            try:
4✔
219
                param = plutus_data_from_cbor(c, param_bytes)
4✔
220
            except Exception as e:
×
221
                raise ValueError(
×
222
                    f"Invalid parameter for contract passed at position {i}, expected type {c.__name__}."
223
                ) from e
224
        parsed_params.append(param)
4✔
225
    check_params(
4✔
226
        command,
227
        purpose,
228
        annotations,
229
        parsed_params,
230
        force_three_params,
231
    )
232

233
    if command == Command.eval:
4✔
234
        print("Starting execution")
4✔
235
        print("------------------")
4✔
236
        try:
4✔
237
            ret = sc.validator(*parsed_params)
4✔
238
        except Exception as e:
×
239
            print(f"Exception of type {type(e).__name__} caused")
×
240
            ret = e
×
241
        print("------------------")
4✔
242
        print(ret)
4✔
243

244
    source_ast = compiler.parse(source_code, filename=input_file)
4✔
245

246
    if command == Command.parse:
4✔
247
        print("Parsed successfully.")
×
248
        return
×
249

250
    try:
4✔
251
        code = compiler.compile(
4✔
252
            source_ast, filename=input_file, force_three_params=force_three_params
253
        )
254
    except CompilerError as c:
4✔
255
        # Generate nice error message from compiler error
256
        if not isinstance(c.node, ast.Module):
4✔
257
            source_seg = ast.get_source_segment(source_code, c.node)
4✔
258
            start_line = c.node.lineno - 1
4✔
259
            end_line = start_line + len(source_seg.splitlines())
4✔
260
            source_lines = "\n".join(source_code.splitlines()[start_line:end_line])
4✔
261
            pos_in_line = source_lines.find(source_seg)
4✔
262
        else:
263
            start_line = 0
×
264
            pos_in_line = 0
×
265
            source_lines = source_code.splitlines()[0]
×
266

267
        if command == Command.lint:
4✔
268
            if args.output_format_json:
4✔
269
                print(
4✔
270
                    convert_linter_to_json(
271
                        line=start_line,
272
                        column=pos_in_line,
273
                        error_class=c.orig_err.__class__.__name__,
274
                        message=str(c.orig_err),
275
                    )
276
                )
277
            else:
278
                print(
4✔
279
                    f"{args.input_file}:{start_line+1}:{pos_in_line}: {c.orig_err.__class__.__name__}: {c.orig_err}"
280
                )
281
            return
4✔
282

283
        overwrite_syntaxerror = len("SyntaxError: ") * "\b"
×
284
        raise SyntaxError(
×
285
            f"""\
286
{overwrite_syntaxerror}{c.orig_err.__class__.__name__}: {c.orig_err}
287
Note that opshin errors may be overly restrictive as they aim to prevent code with unintended consequences.
288
""",
289
            (
290
                args.input_file,
291
                start_line + 1,
292
                pos_in_line,
293
                source_lines,
294
            )
295
            # we remove chaining so that users to not see the internal trace back,
296
        ) from None
297

298
    if command == Command.compile_pluto:
4✔
299
        print(code.dumps())
4✔
300
        return
4✔
301
    code = code.compile()
4✔
302

303
    # apply parameters from the command line to the contract (instantiates parameterized contract!)
304
    code = code.term
4✔
305
    # UPLC lambdas may only take one argument at a time, so we evaluate by repeatedly applying
306
    for d in map(
4✔
307
        data_from_json, map(json.loads, (PlutusData.to_json(p) for p in parsed_params))
308
    ):
309
        code = uplc.ast.Apply(code, d)
4✔
310
    code = uplc.ast.Program((1, 0, 0), code)
4✔
311

312
    if command == Command.compile:
4✔
313
        print(code.dumps())
4✔
314
        return
4✔
315

316
    if command == Command.build:
4✔
317
        if args.output_directory == "":
4✔
318
            if args.input_file == "-":
4✔
319
                print(
×
320
                    "Please supply an output directory if no input file is specified."
321
                )
322
                exit(-1)
×
323
            target_dir = pathlib.Path("build") / pathlib.Path(input_file).stem
4✔
324
        else:
325
            target_dir = pathlib.Path(args.output_directory)
×
326
        target_dir.mkdir(exist_ok=True, parents=True)
4✔
327
        artifacts = builder.generate_artifacts(builder._build(code))
4✔
328
        with (target_dir / "script.cbor").open("w") as fp:
4✔
329
            fp.write(artifacts.cbor_hex)
4✔
330
        with (target_dir / "script.plutus").open("w") as fp:
4✔
331
            fp.write(artifacts.plutus_json)
4✔
332
        with (target_dir / "script.policy_id").open("w") as fp:
4✔
333
            fp.write(artifacts.policy_id)
4✔
334
        with (target_dir / "mainnet.addr").open("w") as fp:
4✔
335
            fp.write(artifacts.mainnet_addr)
4✔
336
        with (target_dir / "testnet.addr").open("w") as fp:
4✔
337
            fp.write(artifacts.testnet_addr)
4✔
338

339
        print(f"Wrote script artifacts to {target_dir}/")
4✔
340
        return
4✔
341
    if command == Command.eval_uplc:
4✔
342
        print("Starting execution")
4✔
343
        print("------------------")
4✔
344
        assert isinstance(code, uplc.ast.Program)
4✔
345
        try:
4✔
346
            ret = uplc.dumps(uplc.eval(code))
4✔
347
        except Exception as e:
×
348
            print("An exception was raised")
×
349
            ret = e
×
350
        print("------------------")
4✔
351
        print(ret)
4✔
352

353

354
def convert_linter_to_json(
4✔
355
    line: int,
356
    column: int,
357
    error_class: str,
358
    message: str,
359
):
360
    # output in lists
361
    # TODO: possible to detect more than one error at once?
362
    return json.dumps(
4✔
363
        [
364
            {
365
                "line": line,
366
                "column": column,
367
                "error_class": error_class,
368
                "message": message,
369
            }
370
        ]
371
    )
372

373

374
if __name__ == "__main__":
4✔
375
    main()
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc