• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6028015529885696

28 Oct 2025 10:43AM UTC coverage: 61.447% (-0.008%) from 61.455%
6028015529885696

push

CirrusCI

ecdsa
notary plugin

22938 of 37330 relevant lines covered (61.45%)

0.61 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.24
/electrum/crypto.py
1
# -*- coding: utf-8 -*-
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2018 The Electrum developers
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import base64
1✔
27
import binascii
1✔
28
import os
1✔
29
import sys
1✔
30
import hashlib
1✔
31
import hmac
1✔
32
from typing import Union, Mapping, Optional
1✔
33

34
import electrum_ecc as ecc
1✔
35

36
from .util import assert_bytes, InvalidPassword, to_bytes, to_string, WalletFileException, versiontuple
1✔
37
from .i18n import _
1✔
38
from .logging import get_logger
1✔
39

40
_logger = get_logger(__name__)
1✔
41

42

43
HAS_PYAES = False
1✔
44
try:
1✔
45
    import pyaes
1✔
46
except Exception:
×
47
    pass
×
48
else:
49
    HAS_PYAES = True
1✔
50

51
HAS_CRYPTODOME = False
1✔
52
MIN_CRYPTODOME_VERSION = "3.7"
1✔
53
try:
1✔
54
    import Cryptodome
1✔
55
    if versiontuple(Cryptodome.__version__) < versiontuple(MIN_CRYPTODOME_VERSION):
1✔
56
        _logger.warning(f"found module 'Cryptodome' but it is too old: {Cryptodome.__version__}<{MIN_CRYPTODOME_VERSION}")
×
57
        raise Exception()
×
58
    from Cryptodome.Cipher import ChaCha20_Poly1305 as CD_ChaCha20_Poly1305
1✔
59
    from Cryptodome.Cipher import ChaCha20 as CD_ChaCha20
1✔
60
    from Cryptodome.Cipher import AES as CD_AES
1✔
61
except Exception:
×
62
    pass
×
63
else:
64
    HAS_CRYPTODOME = True
1✔
65

66
HAS_CRYPTOGRAPHY = False
1✔
67
MIN_CRYPTOGRAPHY_VERSION = "2.1"
1✔
68
try:
1✔
69
    import cryptography
1✔
70
    if versiontuple(cryptography.__version__) < versiontuple(MIN_CRYPTOGRAPHY_VERSION):
1✔
71
        _logger.warning(f"found module 'cryptography' but it is too old: {cryptography.__version__}<{MIN_CRYPTOGRAPHY_VERSION}")
×
72
        raise Exception()
×
73
    from cryptography import exceptions
1✔
74
    from cryptography.hazmat.primitives.ciphers import Cipher as CG_Cipher
1✔
75
    from cryptography.hazmat.primitives.ciphers import algorithms as CG_algorithms
1✔
76
    from cryptography.hazmat.primitives.ciphers import modes as CG_modes
1✔
77
    from cryptography.hazmat.backends import default_backend as CG_default_backend
1✔
78
    import cryptography.hazmat.primitives.ciphers.aead as CG_aead
1✔
79
except Exception:
×
80
    pass
×
81
else:
82
    HAS_CRYPTOGRAPHY = True
1✔
83

84

85
if not (HAS_CRYPTODOME or HAS_CRYPTOGRAPHY):
1✔
86
    sys.exit(f"Error: at least one of ('pycryptodomex', 'cryptography') needs to be installed.")
×
87

88

89
def version_info() -> Mapping[str, Optional[str]]:
1✔
90
    ret = {}
×
91
    if HAS_PYAES:
×
92
        ret["pyaes.version"] = ".".join(map(str, pyaes.VERSION[:3]))
×
93
    else:
94
        ret["pyaes.version"] = None
×
95
    if HAS_CRYPTODOME:
×
96
        ret["cryptodome.version"] = Cryptodome.__version__
×
97
        if hasattr(Cryptodome, "__path__"):
×
98
            ret["cryptodome.path"] = ", ".join(Cryptodome.__path__ or [])
×
99
    else:
100
        ret["cryptodome.version"] = None
×
101
    if HAS_CRYPTOGRAPHY:
×
102
        ret["cryptography.version"] = cryptography.__version__
×
103
        if hasattr(cryptography, "__path__"):
×
104
            ret["cryptography.path"] = ", ".join(cryptography.__path__ or [])
×
105
    else:
106
        ret["cryptography.version"] = None
×
107
    return ret
×
108

109

110
class InvalidPadding(Exception):
1✔
111
    pass
1✔
112

113

114
class CiphertextFormatError(Exception):
1✔
115
    pass
1✔
116

117

118
def append_PKCS7_padding(data: bytes) -> bytes:
1✔
119
    assert_bytes(data)
1✔
120
    padlen = 16 - (len(data) % 16)
1✔
121
    return data + bytes([padlen]) * padlen
1✔
122

123

124
def strip_PKCS7_padding(data: bytes) -> bytes:
1✔
125
    assert_bytes(data)
1✔
126
    if len(data) % 16 != 0 or len(data) == 0:
1✔
127
        raise InvalidPadding("invalid length")
×
128
    padlen = data[-1]
1✔
129
    if not (0 < padlen <= 16):
1✔
130
        raise InvalidPadding("invalid padding byte (out of range)")
1✔
131
    for i in data[-padlen:]:
1✔
132
        if i != padlen:
1✔
133
            raise InvalidPadding("invalid padding byte (inconsistent)")
×
134
    return data[0:-padlen]
1✔
135

136

137
def aes_encrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
1✔
138
    assert_bytes(key, iv, data)
1✔
139
    data = append_PKCS7_padding(data)
1✔
140
    if HAS_CRYPTODOME:
1✔
141
        e = CD_AES.new(key, CD_AES.MODE_CBC, iv).encrypt(data)
1✔
142
    elif HAS_CRYPTOGRAPHY:
1✔
143
        cipher = CG_Cipher(CG_algorithms.AES(key), CG_modes.CBC(iv), backend=CG_default_backend())
1✔
144
        encryptor = cipher.encryptor()
1✔
145
        e = encryptor.update(data) + encryptor.finalize()
1✔
146
    elif HAS_PYAES:
1✔
147
        aes_cbc = pyaes.AESModeOfOperationCBC(key, iv=iv)
1✔
148
        aes = pyaes.Encrypter(aes_cbc, padding=pyaes.PADDING_NONE)
1✔
149
        e = aes.feed(data) + aes.feed()  # empty aes.feed() flushes buffer
1✔
150
    else:
151
        raise Exception("no AES backend found")
×
152
    return e
1✔
153

154

155
def aes_decrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
1✔
156
    assert_bytes(key, iv, data)
1✔
157
    if HAS_CRYPTODOME:
1✔
158
        cipher = CD_AES.new(key, CD_AES.MODE_CBC, iv)
1✔
159
        data = cipher.decrypt(data)
1✔
160
    elif HAS_CRYPTOGRAPHY:
1✔
161
        cipher = CG_Cipher(CG_algorithms.AES(key), CG_modes.CBC(iv), backend=CG_default_backend())
1✔
162
        decryptor = cipher.decryptor()
1✔
163
        data = decryptor.update(data) + decryptor.finalize()
1✔
164
    elif HAS_PYAES:
1✔
165
        aes_cbc = pyaes.AESModeOfOperationCBC(key, iv=iv)
1✔
166
        aes = pyaes.Decrypter(aes_cbc, padding=pyaes.PADDING_NONE)
1✔
167
        data = aes.feed(data) + aes.feed()  # empty aes.feed() flushes buffer
1✔
168
    else:
169
        raise Exception("no AES backend found")
×
170
    try:
1✔
171
        return strip_PKCS7_padding(data)
1✔
172
    except InvalidPadding:
1✔
173
        raise InvalidPassword()
1✔
174

175

176
def EncodeAES_bytes(secret: bytes, msg: bytes) -> bytes:
1✔
177
    assert_bytes(msg)
1✔
178
    iv = bytes(os.urandom(16))
1✔
179
    ct = aes_encrypt_with_iv(secret, iv, msg)
1✔
180
    return iv + ct
1✔
181

182

183
def DecodeAES_bytes(secret: bytes, ciphertext: bytes) -> bytes:
1✔
184
    assert_bytes(ciphertext)
1✔
185
    iv, e = ciphertext[:16], ciphertext[16:]
1✔
186
    s = aes_decrypt_with_iv(secret, iv, e)
1✔
187
    return s
1✔
188

189

190
PW_HASH_VERSION_LATEST = 1
1✔
191
KNOWN_PW_HASH_VERSIONS = (1, 2,)
1✔
192
SUPPORTED_PW_HASH_VERSIONS = (1,)
1✔
193
assert PW_HASH_VERSION_LATEST in KNOWN_PW_HASH_VERSIONS
1✔
194
assert PW_HASH_VERSION_LATEST in SUPPORTED_PW_HASH_VERSIONS
1✔
195

196

197
class UnexpectedPasswordHashVersion(InvalidPassword, WalletFileException):
1✔
198
    def __init__(self, version):
1✔
199
        InvalidPassword.__init__(self)
×
200
        WalletFileException.__init__(self)
×
201
        self.version = version
×
202

203
    def __str__(self):
1✔
204
        return "{unexpected}: {version}\n{instruction}".format(
×
205
            unexpected=_("Unexpected password hash version"),
206
            version=self.version,
207
            instruction=_('You are most likely using an outdated version of Electrum. Please update.'))
208

209

210
class UnsupportedPasswordHashVersion(InvalidPassword, WalletFileException):
1✔
211
    def __init__(self, version):
1✔
212
        InvalidPassword.__init__(self)
×
213
        WalletFileException.__init__(self)
×
214
        self.version = version
×
215

216
    def __str__(self):
1✔
217
        return "{unsupported}: {version}\n{instruction}".format(
×
218
            unsupported=_("Unsupported password hash version"),
219
            version=self.version,
220
            instruction=f"To open this wallet, try 'git checkout password_v{self.version}'.\n"
221
                        "Alternatively, restore from seed.")
222

223

224
def _hash_password(password: Union[bytes, str], *, version: int) -> bytes:
1✔
225
    pw = to_bytes(password, 'utf8')
1✔
226
    if version not in SUPPORTED_PW_HASH_VERSIONS:
1✔
227
        raise UnsupportedPasswordHashVersion(version)
×
228
    if version == 1:
1✔
229
        return sha256d(pw)
1✔
230
    else:
231
        assert version not in KNOWN_PW_HASH_VERSIONS
×
232
        raise UnexpectedPasswordHashVersion(version)
×
233

234

235
def _pw_encode_raw(data: bytes, password: Union[bytes, str], *, version: int) -> bytes:
1✔
236
    if version not in KNOWN_PW_HASH_VERSIONS:
1✔
237
        raise UnexpectedPasswordHashVersion(version)
×
238
    # derive key from password
239
    secret = _hash_password(password, version=version)
1✔
240
    # encrypt given data
241
    ciphertext = EncodeAES_bytes(secret, data)
1✔
242
    return ciphertext
1✔
243

244

245
def _pw_decode_raw(data_bytes: bytes, password: Union[bytes, str], *, version: int) -> bytes:
1✔
246
    if version not in KNOWN_PW_HASH_VERSIONS:
1✔
247
        raise UnexpectedPasswordHashVersion(version)
×
248
    # derive key from password
249
    secret = _hash_password(password, version=version)
1✔
250
    # decrypt given data
251
    try:
1✔
252
        d = DecodeAES_bytes(secret, data_bytes)
1✔
253
    except Exception as e:
1✔
254
        raise InvalidPassword() from e
1✔
255
    return d
1✔
256

257

258
def pw_encode_bytes(data: bytes, password: Union[bytes, str], *, version: int) -> str:
1✔
259
    """plaintext bytes -> base64 ciphertext"""
260
    ciphertext = _pw_encode_raw(data, password, version=version)
1✔
261
    ciphertext_b64 = base64.b64encode(ciphertext)
1✔
262
    return ciphertext_b64.decode('utf8')
1✔
263

264

265
def pw_decode_bytes(data: str, password: Union[bytes, str], *, version:int) -> bytes:
1✔
266
    """base64 ciphertext -> plaintext bytes"""
267
    if version not in KNOWN_PW_HASH_VERSIONS:
1✔
268
        raise UnexpectedPasswordHashVersion(version)
×
269
    try:
1✔
270
        data_bytes = bytes(base64.b64decode(data, validate=True))
1✔
271
    except binascii.Error as e:
×
272
        raise CiphertextFormatError("ciphertext not valid base64") from e
×
273
    return _pw_decode_raw(data_bytes, password, version=version)
1✔
274

275

276
def pw_encode_with_version_and_mac(data: bytes, password: Union[bytes, str]) -> str:
1✔
277
    """plaintext bytes -> base64 ciphertext"""
278
    # https://crypto.stackexchange.com/questions/202/should-we-mac-then-encrypt-or-encrypt-then-mac
279
    # Encrypt-and-MAC. The MAC will be used to detect invalid passwords
280
    version = PW_HASH_VERSION_LATEST
×
281
    mac = sha256(data)[0:4]
×
282
    ciphertext = _pw_encode_raw(data, password, version=version)
×
283
    ciphertext_b64 = base64.b64encode(bytes([version]) + ciphertext + mac)
×
284
    return ciphertext_b64.decode('utf8')
×
285

286

287
def pw_decode_with_version_and_mac(data: str, password: Union[bytes, str]) -> bytes:
1✔
288
    """base64 ciphertext -> plaintext bytes"""
289
    try:
1✔
290
        data_bytes = bytes(base64.b64decode(data, validate=True))
1✔
291
    except binascii.Error as e:
×
292
        raise CiphertextFormatError("ciphertext not valid base64") from e
×
293
    version = int(data_bytes[0])
1✔
294
    encrypted = data_bytes[1:-4]
1✔
295
    mac = data_bytes[-4:]
1✔
296
    if version not in KNOWN_PW_HASH_VERSIONS:
1✔
297
        raise UnexpectedPasswordHashVersion(version)
×
298
    decrypted = _pw_decode_raw(encrypted, password, version=version)
1✔
299
    if sha256(decrypted)[0:4] != mac:
1✔
300
        raise InvalidPassword()
×
301
    return decrypted
1✔
302

303

304
def pw_encode(data: str, password: Union[bytes, str, None], *, version: int) -> str:
1✔
305
    """plaintext str -> base64 ciphertext"""
306
    if not password:
1✔
307
        return data
1✔
308
    plaintext_bytes = to_bytes(data, "utf8")
1✔
309
    return pw_encode_bytes(plaintext_bytes, password, version=version)
1✔
310

311

312
def pw_decode(data: str, password: Union[bytes, str, None], *, version: int) -> str:
1✔
313
    """base64 ciphertext -> plaintext str"""
314
    if password is None:
1✔
315
        return data
1✔
316
    plaintext_bytes = pw_decode_bytes(data, password, version=version)
1✔
317
    try:
1✔
318
        plaintext_str = to_string(plaintext_bytes, "utf8")
1✔
319
    except UnicodeDecodeError as e:
1✔
320
        raise InvalidPassword() from e
1✔
321
    return plaintext_str
1✔
322

323

324
def sha256(x: Union[bytes, str]) -> bytes:
1✔
325
    x = to_bytes(x, 'utf8')
1✔
326
    return bytes(hashlib.sha256(x).digest())
1✔
327

328

329
def sha256d(x: Union[bytes, str]) -> bytes:
1✔
330
    x = to_bytes(x, 'utf8')
1✔
331
    out = bytes(sha256(sha256(x)))
1✔
332
    return out
1✔
333

334

335
def hash_160(x: bytes) -> bytes:
1✔
336
    return ripemd(sha256(x))
1✔
337

338
def ripemd(x: bytes) -> bytes:
1✔
339
    try:
1✔
340
        md = hashlib.new('ripemd160')
1✔
341
        md.update(x)
1✔
342
        return md.digest()
1✔
343
    except BaseException:
×
344
        # ripemd160 is not guaranteed to be available in hashlib on all platforms.
345
        # Historically, our Android builds had hashlib/openssl which did not have it.
346
        # see https://github.com/spesmilo/electrum/issues/7093
347
        # We bundle a pure python implementation as fallback that gets used now:
348
        from . import ripemd
×
349
        md = ripemd.new(x)
×
350
        return md.digest()
×
351

352

353
def hmac_oneshot(key: bytes, msg: bytes, digest) -> bytes:
1✔
354
    return hmac.digest(key, msg, digest)
1✔
355

356

357
def chacha20_poly1305_encrypt(
1✔
358
        *,
359
        key: bytes,
360
        nonce: bytes,
361
        associated_data: bytes = None,
362
        data: bytes
363
) -> bytes:
364
    assert isinstance(key, (bytes, bytearray))
1✔
365
    assert isinstance(nonce, (bytes, bytearray))
1✔
366
    assert isinstance(associated_data, (bytes, bytearray, type(None)))
1✔
367
    assert isinstance(data, (bytes, bytearray))
1✔
368
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
1✔
369
    assert len(nonce) == 12, f"unexpected nonce size: {len(nonce)} (expected: 12)"
1✔
370
    if HAS_CRYPTODOME:
1✔
371
        cipher = CD_ChaCha20_Poly1305.new(key=key, nonce=nonce)
1✔
372
        if associated_data is not None:
1✔
373
            cipher.update(associated_data)
1✔
374
        ciphertext, mac = cipher.encrypt_and_digest(plaintext=data)
1✔
375
        return ciphertext + mac
1✔
376
    if HAS_CRYPTOGRAPHY:
1✔
377
        a = CG_aead.ChaCha20Poly1305(key)
1✔
378
        return a.encrypt(nonce, data, associated_data)
1✔
379
    raise Exception("no chacha20 backend found")
×
380

381

382
def chacha20_poly1305_decrypt(
1✔
383
        *,
384
        key: bytes,
385
        nonce: bytes,
386
        associated_data: bytes = None,
387
        data: bytes
388
) -> bytes:
389
    assert isinstance(key, (bytes, bytearray))
1✔
390
    assert isinstance(nonce, (bytes, bytearray))
1✔
391
    assert isinstance(associated_data, (bytes, bytearray, type(None)))
1✔
392
    assert isinstance(data, (bytes, bytearray))
1✔
393
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
1✔
394
    assert len(nonce) == 12, f"unexpected nonce size: {len(nonce)} (expected: 12)"
1✔
395
    if HAS_CRYPTODOME:
1✔
396
        cipher = CD_ChaCha20_Poly1305.new(key=key, nonce=nonce)
1✔
397
        if associated_data is not None:
1✔
398
            cipher.update(associated_data)
1✔
399
        # raises ValueError if not valid (e.g. incorrect MAC)
400
        return cipher.decrypt_and_verify(ciphertext=data[:-16], received_mac_tag=data[-16:])
1✔
401
    if HAS_CRYPTOGRAPHY:
1✔
402
        a = CG_aead.ChaCha20Poly1305(key)
1✔
403
        try:
1✔
404
            return a.decrypt(nonce, data, associated_data)
1✔
405
        except cryptography.exceptions.InvalidTag as e:
1✔
406
            raise ValueError("invalid tag") from e
1✔
407
    raise Exception("no chacha20 backend found")
×
408

409

410
def chacha20_encrypt(*, key: bytes, nonce: bytes, data: bytes) -> bytes:
1✔
411
    """note: for any new protocol you design, please consider using chacha20_poly1305_encrypt instead
412
             (for its Authenticated Encryption property).
413
    """
414
    assert isinstance(key, (bytes, bytearray))
1✔
415
    assert isinstance(nonce, (bytes, bytearray))
1✔
416
    assert isinstance(data, (bytes, bytearray))
1✔
417
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
1✔
418
    assert len(nonce) in (8, 12), f"unexpected nonce size: {len(nonce)} (expected: 8 or 12)"
1✔
419
    if HAS_CRYPTODOME:
1✔
420
        cipher = CD_ChaCha20.new(key=key, nonce=nonce)
1✔
421
        return cipher.encrypt(data)
1✔
422
    if HAS_CRYPTOGRAPHY:
1✔
423
        nonce = bytes(16 - len(nonce)) + nonce  # cryptography wants 16 byte nonces
1✔
424
        algo = CG_algorithms.ChaCha20(key=key, nonce=nonce)
1✔
425
        cipher = CG_Cipher(algo, mode=None, backend=CG_default_backend())
1✔
426
        encryptor = cipher.encryptor()
1✔
427
        return encryptor.update(data)
1✔
428
    raise Exception("no chacha20 backend found")
×
429

430

431
def chacha20_decrypt(*, key: bytes, nonce: bytes, data: bytes) -> bytes:
1✔
432
    assert isinstance(key, (bytes, bytearray))
1✔
433
    assert isinstance(nonce, (bytes, bytearray))
1✔
434
    assert isinstance(data, (bytes, bytearray))
1✔
435
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
1✔
436
    assert len(nonce) in (8, 12), f"unexpected nonce size: {len(nonce)} (expected: 8 or 12)"
1✔
437
    if HAS_CRYPTODOME:
1✔
438
        cipher = CD_ChaCha20.new(key=key, nonce=nonce)
1✔
439
        return cipher.decrypt(data)
1✔
440
    if HAS_CRYPTOGRAPHY:
1✔
441
        nonce = bytes(16 - len(nonce)) + nonce  # cryptography wants 16 byte nonces
1✔
442
        algo = CG_algorithms.ChaCha20(key=key, nonce=nonce)
1✔
443
        cipher = CG_Cipher(algo, mode=None, backend=CG_default_backend())
1✔
444
        decryptor = cipher.decryptor()
1✔
445
        return decryptor.update(data)
1✔
446
    raise Exception("no chacha20 backend found")
×
447

448

449
def ecies_encrypt_message(
1✔
450
    ec_pubkey: 'ecc.ECPubkey',
451
    message: bytes,
452
    *,
453
    magic: bytes = b'BIE1',
454
) -> bytes:
455
    """
456
        ECIES encryption/decryption methods; AES-128-CBC with PKCS7 is used as the cipher; hmac-sha256 is used as the mac
457
    """
458
    assert_bytes(message)
1✔
459
    ephemeral = ecc.ECPrivkey.generate_random_key()
1✔
460
    ecdh_key = (ec_pubkey * ephemeral.secret_scalar).get_public_key_bytes(compressed=True)
1✔
461
    key = hashlib.sha512(ecdh_key).digest()
1✔
462
    iv, key_e, key_m = key[0:16], key[16:32], key[32:]
1✔
463
    ciphertext = aes_encrypt_with_iv(key_e, iv, message)
1✔
464
    ephemeral_pubkey = ephemeral.get_public_key_bytes(compressed=True)
1✔
465
    encrypted = magic + ephemeral_pubkey + ciphertext
1✔
466
    mac = hmac_oneshot(key_m, encrypted, hashlib.sha256)
1✔
467
    return base64.b64encode(encrypted + mac)
1✔
468

469

470
def ecies_decrypt_message(
1✔
471
    ec_privkey: 'ecc.ECPrivkey',
472
    encrypted: Union[str, bytes],
473
    *,
474
    magic: bytes = b'BIE1',
475
) -> bytes:
476
    encrypted = base64.b64decode(encrypted, validate=True)  # type: bytes
1✔
477
    if len(encrypted) < 85:
1✔
478
        raise Exception('invalid ciphertext: length')
×
479
    magic_found = encrypted[:4]
1✔
480
    ephemeral_pubkey_bytes = encrypted[4:37]
1✔
481
    ciphertext = encrypted[37:-32]
1✔
482
    mac = encrypted[-32:]
1✔
483
    if magic_found != magic:
1✔
484
        raise Exception('invalid ciphertext: invalid magic bytes')
×
485
    try:
1✔
486
        ephemeral_pubkey = ecc.ECPubkey(ephemeral_pubkey_bytes)
1✔
487
    except ecc.InvalidECPointException as e:
×
488
        raise Exception('invalid ciphertext: invalid ephemeral pubkey') from e
×
489
    ecdh_key = (ephemeral_pubkey * ec_privkey.secret_scalar).get_public_key_bytes(compressed=True)
1✔
490
    key = hashlib.sha512(ecdh_key).digest()
1✔
491
    iv, key_e, key_m = key[0:16], key[16:32], key[32:]
1✔
492
    if mac != hmac_oneshot(key_m, encrypted[:-32], hashlib.sha256):
1✔
493
        raise InvalidPassword()
1✔
494
    return aes_decrypt_with_iv(key_e, iv, ciphertext)
1✔
495

496

497
def get_ecdh(priv: bytes, pub: bytes) -> bytes:
1✔
498
    pt = ecc.ECPubkey(pub) * ecc.string_to_number(priv)
1✔
499
    return sha256(pt.get_public_key_bytes())
1✔
500

501
def privkey_to_pubkey(priv: bytes) -> bytes:
1✔
502
    return ecc.ECPrivkey(priv[:32]).get_public_key_bytes()
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc