• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 6373379927703552

10 Apr 2025 01:10PM UTC coverage: 60.334%. Remained the same
6373379927703552

push

CirrusCI

SomberNight
tests: util.custom_task_factory: fix res warn "coro was never awaited"

follow-up https://github.com/spesmilo/electrum/commit/70d1e1170

```
=============================== warnings summary ===============================
tests/test_util.py::TestUtil::test_custom_task_factory
  /tmp/cirrus-ci-build/tests/test_util.py:504: RuntimeWarning: coroutine 'TestUtil.test_custom_task_factory.<locals>.foo' was never awaited
    self.assertEqual(foo().__qualname__, task.get_coro().__qualname__)
  Enable tracemalloc to get traceback where the object was allocated.
  See https://docs.pytest.org/en/stable/how-to/capture-warnings.html#resource-warnings for more info.

```

21537 of 35696 relevant lines covered (60.33%)

3.01 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.53
/electrum/crypto.py
1
# -*- coding: utf-8 -*-
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2018 The Electrum developers
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import base64
5✔
27
import binascii
5✔
28
import os
5✔
29
import sys
5✔
30
import hashlib
5✔
31
import hmac
5✔
32
from typing import Union, Mapping, Optional
5✔
33

34
import electrum_ecc as ecc
5✔
35

36
from .util import assert_bytes, InvalidPassword, to_bytes, to_string, WalletFileException, versiontuple
5✔
37
from .i18n import _
5✔
38
from .logging import get_logger
5✔
39

40
_logger = get_logger(__name__)
5✔
41

42

43
HAS_PYAES = False
5✔
44
try:
5✔
45
    import pyaes
5✔
46
except Exception:
×
47
    pass
×
48
else:
49
    HAS_PYAES = True
5✔
50

51
HAS_CRYPTODOME = False
5✔
52
MIN_CRYPTODOME_VERSION = "3.7"
5✔
53
try:
5✔
54
    import Cryptodome
5✔
55
    if versiontuple(Cryptodome.__version__) < versiontuple(MIN_CRYPTODOME_VERSION):
5✔
56
        _logger.warning(f"found module 'Cryptodome' but it is too old: {Cryptodome.__version__}<{MIN_CRYPTODOME_VERSION}")
×
57
        raise Exception()
×
58
    from Cryptodome.Cipher import ChaCha20_Poly1305 as CD_ChaCha20_Poly1305
5✔
59
    from Cryptodome.Cipher import ChaCha20 as CD_ChaCha20
5✔
60
    from Cryptodome.Cipher import AES as CD_AES
5✔
61
except Exception:
×
62
    pass
×
63
else:
64
    HAS_CRYPTODOME = True
5✔
65

66
HAS_CRYPTOGRAPHY = False
5✔
67
MIN_CRYPTOGRAPHY_VERSION = "2.1"
5✔
68
try:
5✔
69
    import cryptography
5✔
70
    if versiontuple(cryptography.__version__) < versiontuple(MIN_CRYPTOGRAPHY_VERSION):
5✔
71
        _logger.warning(f"found module 'cryptography' but it is too old: {cryptography.__version__}<{MIN_CRYPTOGRAPHY_VERSION}")
×
72
        raise Exception()
×
73
    from cryptography import exceptions
5✔
74
    from cryptography.hazmat.primitives.ciphers import Cipher as CG_Cipher
5✔
75
    from cryptography.hazmat.primitives.ciphers import algorithms as CG_algorithms
5✔
76
    from cryptography.hazmat.primitives.ciphers import modes as CG_modes
5✔
77
    from cryptography.hazmat.backends import default_backend as CG_default_backend
5✔
78
    import cryptography.hazmat.primitives.ciphers.aead as CG_aead
5✔
79
except Exception:
×
80
    pass
×
81
else:
82
    HAS_CRYPTOGRAPHY = True
5✔
83

84

85
if not (HAS_CRYPTODOME or HAS_CRYPTOGRAPHY):
5✔
86
    sys.exit(f"Error: at least one of ('pycryptodomex', 'cryptography') needs to be installed.")
×
87

88

89
def version_info() -> Mapping[str, Optional[str]]:
5✔
90
    ret = {}
×
91
    if HAS_PYAES:
×
92
        ret["pyaes.version"] = ".".join(map(str, pyaes.VERSION[:3]))
×
93
    else:
94
        ret["pyaes.version"] = None
×
95
    if HAS_CRYPTODOME:
×
96
        ret["cryptodome.version"] = Cryptodome.__version__
×
97
        if hasattr(Cryptodome, "__path__"):
×
98
            ret["cryptodome.path"] = ", ".join(Cryptodome.__path__ or [])
×
99
    else:
100
        ret["cryptodome.version"] = None
×
101
    if HAS_CRYPTOGRAPHY:
×
102
        ret["cryptography.version"] = cryptography.__version__
×
103
        if hasattr(cryptography, "__path__"):
×
104
            ret["cryptography.path"] = ", ".join(cryptography.__path__ or [])
×
105
    else:
106
        ret["cryptography.version"] = None
×
107
    return ret
×
108

109

110
class InvalidPadding(Exception):
5✔
111
    pass
5✔
112

113

114
class CiphertextFormatError(Exception):
5✔
115
    pass
5✔
116

117

118
def append_PKCS7_padding(data: bytes) -> bytes:
5✔
119
    assert_bytes(data)
5✔
120
    padlen = 16 - (len(data) % 16)
5✔
121
    return data + bytes([padlen]) * padlen
5✔
122

123

124
def strip_PKCS7_padding(data: bytes) -> bytes:
5✔
125
    assert_bytes(data)
5✔
126
    if len(data) % 16 != 0 or len(data) == 0:
5✔
127
        raise InvalidPadding("invalid length")
×
128
    padlen = data[-1]
5✔
129
    if not (0 < padlen <= 16):
5✔
130
        raise InvalidPadding("invalid padding byte (out of range)")
5✔
131
    for i in data[-padlen:]:
5✔
132
        if i != padlen:
5✔
133
            raise InvalidPadding("invalid padding byte (inconsistent)")
3✔
134
    return data[0:-padlen]
5✔
135

136

137
def aes_encrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
5✔
138
    assert_bytes(key, iv, data)
5✔
139
    data = append_PKCS7_padding(data)
5✔
140
    if HAS_CRYPTODOME:
5✔
141
        e = CD_AES.new(key, CD_AES.MODE_CBC, iv).encrypt(data)
5✔
142
    elif HAS_CRYPTOGRAPHY:
5✔
143
        cipher = CG_Cipher(CG_algorithms.AES(key), CG_modes.CBC(iv), backend=CG_default_backend())
5✔
144
        encryptor = cipher.encryptor()
5✔
145
        e = encryptor.update(data) + encryptor.finalize()
5✔
146
    elif HAS_PYAES:
5✔
147
        aes_cbc = pyaes.AESModeOfOperationCBC(key, iv=iv)
5✔
148
        aes = pyaes.Encrypter(aes_cbc, padding=pyaes.PADDING_NONE)
5✔
149
        e = aes.feed(data) + aes.feed()  # empty aes.feed() flushes buffer
5✔
150
    else:
151
        raise Exception("no AES backend found")
×
152
    return e
5✔
153

154

155
def aes_decrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
5✔
156
    assert_bytes(key, iv, data)
5✔
157
    if HAS_CRYPTODOME:
5✔
158
        cipher = CD_AES.new(key, CD_AES.MODE_CBC, iv)
5✔
159
        data = cipher.decrypt(data)
5✔
160
    elif HAS_CRYPTOGRAPHY:
5✔
161
        cipher = CG_Cipher(CG_algorithms.AES(key), CG_modes.CBC(iv), backend=CG_default_backend())
5✔
162
        decryptor = cipher.decryptor()
5✔
163
        data = decryptor.update(data) + decryptor.finalize()
5✔
164
    elif HAS_PYAES:
5✔
165
        aes_cbc = pyaes.AESModeOfOperationCBC(key, iv=iv)
5✔
166
        aes = pyaes.Decrypter(aes_cbc, padding=pyaes.PADDING_NONE)
5✔
167
        data = aes.feed(data) + aes.feed()  # empty aes.feed() flushes buffer
5✔
168
    else:
169
        raise Exception("no AES backend found")
×
170
    try:
5✔
171
        return strip_PKCS7_padding(data)
5✔
172
    except InvalidPadding:
5✔
173
        raise InvalidPassword()
5✔
174

175

176
def EncodeAES_bytes(secret: bytes, msg: bytes) -> bytes:
5✔
177
    assert_bytes(msg)
5✔
178
    iv = bytes(os.urandom(16))
5✔
179
    ct = aes_encrypt_with_iv(secret, iv, msg)
5✔
180
    return iv + ct
5✔
181

182

183
def DecodeAES_bytes(secret: bytes, ciphertext: bytes) -> bytes:
5✔
184
    assert_bytes(ciphertext)
5✔
185
    iv, e = ciphertext[:16], ciphertext[16:]
5✔
186
    s = aes_decrypt_with_iv(secret, iv, e)
5✔
187
    return s
5✔
188

189

190
PW_HASH_VERSION_LATEST = 1
5✔
191
KNOWN_PW_HASH_VERSIONS = (1, 2,)
5✔
192
SUPPORTED_PW_HASH_VERSIONS = (1,)
5✔
193
assert PW_HASH_VERSION_LATEST in KNOWN_PW_HASH_VERSIONS
5✔
194
assert PW_HASH_VERSION_LATEST in SUPPORTED_PW_HASH_VERSIONS
5✔
195

196

197
class UnexpectedPasswordHashVersion(InvalidPassword, WalletFileException):
5✔
198
    def __init__(self, version):
5✔
199
        InvalidPassword.__init__(self)
×
200
        WalletFileException.__init__(self)
×
201
        self.version = version
×
202

203
    def __str__(self):
5✔
204
        return "{unexpected}: {version}\n{instruction}".format(
×
205
            unexpected=_("Unexpected password hash version"),
206
            version=self.version,
207
            instruction=_('You are most likely using an outdated version of Electrum. Please update.'))
208

209

210
class UnsupportedPasswordHashVersion(InvalidPassword, WalletFileException):
5✔
211
    def __init__(self, version):
5✔
212
        InvalidPassword.__init__(self)
×
213
        WalletFileException.__init__(self)
×
214
        self.version = version
×
215

216
    def __str__(self):
5✔
217
        return "{unsupported}: {version}\n{instruction}".format(
×
218
            unsupported=_("Unsupported password hash version"),
219
            version=self.version,
220
            instruction=f"To open this wallet, try 'git checkout password_v{self.version}'.\n"
221
                        "Alternatively, restore from seed.")
222

223

224
def _hash_password(password: Union[bytes, str], *, version: int) -> bytes:
5✔
225
    pw = to_bytes(password, 'utf8')
5✔
226
    if version not in SUPPORTED_PW_HASH_VERSIONS:
5✔
227
        raise UnsupportedPasswordHashVersion(version)
×
228
    if version == 1:
5✔
229
        return sha256d(pw)
5✔
230
    else:
231
        assert version not in KNOWN_PW_HASH_VERSIONS
×
232
        raise UnexpectedPasswordHashVersion(version)
×
233

234

235
def _pw_encode_raw(data: bytes, password: Union[bytes, str], *, version: int) -> bytes:
5✔
236
    if version not in KNOWN_PW_HASH_VERSIONS:
5✔
237
        raise UnexpectedPasswordHashVersion(version)
×
238
    # derive key from password
239
    secret = _hash_password(password, version=version)
5✔
240
    # encrypt given data
241
    ciphertext = EncodeAES_bytes(secret, data)
5✔
242
    return ciphertext
5✔
243

244

245
def _pw_decode_raw(data_bytes: bytes, password: Union[bytes, str], *, version: int) -> bytes:
5✔
246
    if version not in KNOWN_PW_HASH_VERSIONS:
5✔
247
        raise UnexpectedPasswordHashVersion(version)
×
248
    # derive key from password
249
    secret = _hash_password(password, version=version)
5✔
250
    # decrypt given data
251
    try:
5✔
252
        d = DecodeAES_bytes(secret, data_bytes)
5✔
253
    except Exception as e:
5✔
254
        raise InvalidPassword() from e
5✔
255
    return d
5✔
256

257

258
def pw_encode_bytes(data: bytes, password: Union[bytes, str], *, version: int) -> str:
5✔
259
    """plaintext bytes -> base64 ciphertext"""
260
    ciphertext = _pw_encode_raw(data, password, version=version)
5✔
261
    ciphertext_b64 = base64.b64encode(ciphertext)
5✔
262
    return ciphertext_b64.decode('utf8')
5✔
263

264

265
def pw_decode_bytes(data: str, password: Union[bytes, str], *, version:int) -> bytes:
5✔
266
    """base64 ciphertext -> plaintext bytes"""
267
    if version not in KNOWN_PW_HASH_VERSIONS:
5✔
268
        raise UnexpectedPasswordHashVersion(version)
×
269
    try:
5✔
270
        data_bytes = bytes(base64.b64decode(data, validate=True))
5✔
271
    except binascii.Error as e:
×
272
        raise CiphertextFormatError("ciphertext not valid base64") from e
×
273
    return _pw_decode_raw(data_bytes, password, version=version)
5✔
274

275

276
def pw_encode_with_version_and_mac(data: bytes, password: Union[bytes, str]) -> str:
5✔
277
    """plaintext bytes -> base64 ciphertext"""
278
    # https://crypto.stackexchange.com/questions/202/should-we-mac-then-encrypt-or-encrypt-then-mac
279
    # Encrypt-and-MAC. The MAC will be used to detect invalid passwords
280
    version = PW_HASH_VERSION_LATEST
×
281
    mac = sha256(data)[0:4]
×
282
    ciphertext = _pw_encode_raw(data, password, version=version)
×
283
    ciphertext_b64 = base64.b64encode(bytes([version]) + ciphertext + mac)
×
284
    return ciphertext_b64.decode('utf8')
×
285

286

287
def pw_decode_with_version_and_mac(data: str, password: Union[bytes, str]) -> bytes:
5✔
288
    """base64 ciphertext -> plaintext bytes"""
289
    try:
5✔
290
        data_bytes = bytes(base64.b64decode(data, validate=True))
5✔
291
    except binascii.Error as e:
×
292
        raise CiphertextFormatError("ciphertext not valid base64") from e
×
293
    version = int(data_bytes[0])
5✔
294
    encrypted = data_bytes[1:-4]
5✔
295
    mac = data_bytes[-4:]
5✔
296
    if version not in KNOWN_PW_HASH_VERSIONS:
5✔
297
        raise UnexpectedPasswordHashVersion(version)
×
298
    decrypted = _pw_decode_raw(encrypted, password, version=version)
5✔
299
    if sha256(decrypted)[0:4] != mac:
5✔
300
        raise InvalidPassword()
×
301
    return decrypted
5✔
302

303

304
def pw_encode(data: str, password: Union[bytes, str, None], *, version: int) -> str:
5✔
305
    """plaintext str -> base64 ciphertext"""
306
    if not password:
5✔
307
        return data
5✔
308
    plaintext_bytes = to_bytes(data, "utf8")
5✔
309
    return pw_encode_bytes(plaintext_bytes, password, version=version)
5✔
310

311

312
def pw_decode(data: str, password: Union[bytes, str, None], *, version: int) -> str:
5✔
313
    """base64 ciphertext -> plaintext str"""
314
    if password is None:
5✔
315
        return data
5✔
316
    plaintext_bytes = pw_decode_bytes(data, password, version=version)
5✔
317
    try:
5✔
318
        plaintext_str = to_string(plaintext_bytes, "utf8")
5✔
319
    except UnicodeDecodeError as e:
5✔
320
        raise InvalidPassword() from e
5✔
321
    return plaintext_str
5✔
322

323

324
def sha256(x: Union[bytes, str]) -> bytes:
5✔
325
    x = to_bytes(x, 'utf8')
5✔
326
    return bytes(hashlib.sha256(x).digest())
5✔
327

328

329
def sha256d(x: Union[bytes, str]) -> bytes:
5✔
330
    x = to_bytes(x, 'utf8')
5✔
331
    out = bytes(sha256(sha256(x)))
5✔
332
    return out
5✔
333

334

335
def hash_160(x: bytes) -> bytes:
5✔
336
    return ripemd(sha256(x))
5✔
337

338
def ripemd(x: bytes) -> bytes:
5✔
339
    try:
5✔
340
        md = hashlib.new('ripemd160')
5✔
341
        md.update(x)
5✔
342
        return md.digest()
5✔
343
    except BaseException:
×
344
        # ripemd160 is not guaranteed to be available in hashlib on all platforms.
345
        # Historically, our Android builds had hashlib/openssl which did not have it.
346
        # see https://github.com/spesmilo/electrum/issues/7093
347
        # We bundle a pure python implementation as fallback that gets used now:
348
        from . import ripemd
×
349
        md = ripemd.new(x)
×
350
        return md.digest()
×
351

352

353
def hmac_oneshot(key: bytes, msg: bytes, digest) -> bytes:
5✔
354
    return hmac.digest(key, msg, digest)
5✔
355

356

357
def chacha20_poly1305_encrypt(
5✔
358
        *,
359
        key: bytes,
360
        nonce: bytes,
361
        associated_data: bytes = None,
362
        data: bytes
363
) -> bytes:
364
    assert isinstance(key, (bytes, bytearray))
5✔
365
    assert isinstance(nonce, (bytes, bytearray))
5✔
366
    assert isinstance(associated_data, (bytes, bytearray, type(None)))
5✔
367
    assert isinstance(data, (bytes, bytearray))
5✔
368
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
5✔
369
    assert len(nonce) == 12, f"unexpected nonce size: {len(nonce)} (expected: 12)"
5✔
370
    if HAS_CRYPTODOME:
5✔
371
        cipher = CD_ChaCha20_Poly1305.new(key=key, nonce=nonce)
5✔
372
        if associated_data is not None:
5✔
373
            cipher.update(associated_data)
5✔
374
        ciphertext, mac = cipher.encrypt_and_digest(plaintext=data)
5✔
375
        return ciphertext + mac
5✔
376
    if HAS_CRYPTOGRAPHY:
5✔
377
        a = CG_aead.ChaCha20Poly1305(key)
5✔
378
        return a.encrypt(nonce, data, associated_data)
5✔
379
    raise Exception("no chacha20 backend found")
×
380

381

382
def chacha20_poly1305_decrypt(
5✔
383
        *,
384
        key: bytes,
385
        nonce: bytes,
386
        associated_data: bytes = None,
387
        data: bytes
388
) -> bytes:
389
    assert isinstance(key, (bytes, bytearray))
5✔
390
    assert isinstance(nonce, (bytes, bytearray))
5✔
391
    assert isinstance(associated_data, (bytes, bytearray, type(None)))
5✔
392
    assert isinstance(data, (bytes, bytearray))
5✔
393
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
5✔
394
    assert len(nonce) == 12, f"unexpected nonce size: {len(nonce)} (expected: 12)"
5✔
395
    if HAS_CRYPTODOME:
5✔
396
        cipher = CD_ChaCha20_Poly1305.new(key=key, nonce=nonce)
5✔
397
        if associated_data is not None:
5✔
398
            cipher.update(associated_data)
5✔
399
        # raises ValueError if not valid (e.g. incorrect MAC)
400
        return cipher.decrypt_and_verify(ciphertext=data[:-16], received_mac_tag=data[-16:])
5✔
401
    if HAS_CRYPTOGRAPHY:
5✔
402
        a = CG_aead.ChaCha20Poly1305(key)
5✔
403
        try:
5✔
404
            return a.decrypt(nonce, data, associated_data)
5✔
405
        except cryptography.exceptions.InvalidTag as e:
5✔
406
            raise ValueError("invalid tag") from e
5✔
407
    raise Exception("no chacha20 backend found")
×
408

409

410
def chacha20_encrypt(*, key: bytes, nonce: bytes, data: bytes) -> bytes:
5✔
411
    """note: for any new protocol you design, please consider using chacha20_poly1305_encrypt instead
412
             (for its Authenticated Encryption property).
413
    """
414
    assert isinstance(key, (bytes, bytearray))
5✔
415
    assert isinstance(nonce, (bytes, bytearray))
5✔
416
    assert isinstance(data, (bytes, bytearray))
5✔
417
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
5✔
418
    assert len(nonce) in (8, 12), f"unexpected nonce size: {len(nonce)} (expected: 8 or 12)"
5✔
419
    if HAS_CRYPTODOME:
5✔
420
        cipher = CD_ChaCha20.new(key=key, nonce=nonce)
5✔
421
        return cipher.encrypt(data)
5✔
422
    if HAS_CRYPTOGRAPHY:
5✔
423
        nonce = bytes(16 - len(nonce)) + nonce  # cryptography wants 16 byte nonces
5✔
424
        algo = CG_algorithms.ChaCha20(key=key, nonce=nonce)
5✔
425
        cipher = CG_Cipher(algo, mode=None, backend=CG_default_backend())
5✔
426
        encryptor = cipher.encryptor()
5✔
427
        return encryptor.update(data)
5✔
428
    raise Exception("no chacha20 backend found")
×
429

430

431
def chacha20_decrypt(*, key: bytes, nonce: bytes, data: bytes) -> bytes:
5✔
432
    assert isinstance(key, (bytes, bytearray))
5✔
433
    assert isinstance(nonce, (bytes, bytearray))
5✔
434
    assert isinstance(data, (bytes, bytearray))
5✔
435
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
5✔
436
    assert len(nonce) in (8, 12), f"unexpected nonce size: {len(nonce)} (expected: 8 or 12)"
5✔
437
    if HAS_CRYPTODOME:
5✔
438
        cipher = CD_ChaCha20.new(key=key, nonce=nonce)
5✔
439
        return cipher.decrypt(data)
5✔
440
    if HAS_CRYPTOGRAPHY:
5✔
441
        nonce = bytes(16 - len(nonce)) + nonce  # cryptography wants 16 byte nonces
5✔
442
        algo = CG_algorithms.ChaCha20(key=key, nonce=nonce)
5✔
443
        cipher = CG_Cipher(algo, mode=None, backend=CG_default_backend())
5✔
444
        decryptor = cipher.decryptor()
5✔
445
        return decryptor.update(data)
5✔
446
    raise Exception("no chacha20 backend found")
×
447

448

449
def ecies_encrypt_message(
5✔
450
    ec_pubkey: 'ecc.ECPubkey',
451
    message: bytes,
452
    *,
453
    magic: bytes = b'BIE1',
454
) -> bytes:
455
    """
456
        ECIES encryption/decryption methods; AES-128-CBC with PKCS7 is used as the cipher; hmac-sha256 is used as the mac
457
    """
458
    assert_bytes(message)
5✔
459
    ephemeral = ecc.ECPrivkey.generate_random_key()
5✔
460
    ecdh_key = (ec_pubkey * ephemeral.secret_scalar).get_public_key_bytes(compressed=True)
5✔
461
    key = hashlib.sha512(ecdh_key).digest()
5✔
462
    iv, key_e, key_m = key[0:16], key[16:32], key[32:]
5✔
463
    ciphertext = aes_encrypt_with_iv(key_e, iv, message)
5✔
464
    ephemeral_pubkey = ephemeral.get_public_key_bytes(compressed=True)
5✔
465
    encrypted = magic + ephemeral_pubkey + ciphertext
5✔
466
    mac = hmac_oneshot(key_m, encrypted, hashlib.sha256)
5✔
467
    return base64.b64encode(encrypted + mac)
5✔
468

469

470
def ecies_decrypt_message(
5✔
471
    ec_privkey: 'ecc.ECPrivkey',
472
    encrypted: Union[str, bytes],
473
    *,
474
    magic: bytes = b'BIE1',
475
) -> bytes:
476
    encrypted = base64.b64decode(encrypted)  # type: bytes
5✔
477
    if len(encrypted) < 85:
5✔
478
        raise Exception('invalid ciphertext: length')
×
479
    magic_found = encrypted[:4]
5✔
480
    ephemeral_pubkey_bytes = encrypted[4:37]
5✔
481
    ciphertext = encrypted[37:-32]
5✔
482
    mac = encrypted[-32:]
5✔
483
    if magic_found != magic:
5✔
484
        raise Exception('invalid ciphertext: invalid magic bytes')
×
485
    try:
5✔
486
        ephemeral_pubkey = ecc.ECPubkey(ephemeral_pubkey_bytes)
5✔
487
    except ecc.InvalidECPointException as e:
×
488
        raise Exception('invalid ciphertext: invalid ephemeral pubkey') from e
×
489
    ecdh_key = (ephemeral_pubkey * ec_privkey.secret_scalar).get_public_key_bytes(compressed=True)
5✔
490
    key = hashlib.sha512(ecdh_key).digest()
5✔
491
    iv, key_e, key_m = key[0:16], key[16:32], key[32:]
5✔
492
    if mac != hmac_oneshot(key_m, encrypted[:-32], hashlib.sha256):
5✔
493
        raise InvalidPassword()
5✔
494
    return aes_decrypt_with_iv(key_e, iv, ciphertext)
5✔
495

496

497
def get_ecdh(priv: bytes, pub: bytes) -> bytes:
5✔
498
    pt = ecc.ECPubkey(pub) * ecc.string_to_number(priv)
5✔
499
    return sha256(pt.get_public_key_bytes())
5✔
500

501
def privkey_to_pubkey(priv: bytes) -> bytes:
5✔
502
    return ecc.ECPrivkey(priv[:32]).get_public_key_bytes()
5✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc