• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

spesmilo / electrum / 5304010765238272

17 Aug 2023 02:17PM UTC coverage: 59.027% (+0.02%) from 59.008%
5304010765238272

Pull #8493

CirrusCI

ecdsa
storage.append: fail if the file length is not what we expect
Pull Request #8493: partial-writes using jsonpatch

165 of 165 new or added lines in 9 files covered. (100.0%)

18653 of 31601 relevant lines covered (59.03%)

2.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

80.0
/electrum/crypto.py
1
# -*- coding: utf-8 -*-
2
#
3
# Electrum - lightweight Bitcoin client
4
# Copyright (C) 2018 The Electrum developers
5
#
6
# Permission is hereby granted, free of charge, to any person
7
# obtaining a copy of this software and associated documentation files
8
# (the "Software"), to deal in the Software without restriction,
9
# including without limitation the rights to use, copy, modify, merge,
10
# publish, distribute, sublicense, and/or sell copies of the Software,
11
# and to permit persons to whom the Software is furnished to do so,
12
# subject to the following conditions:
13
#
14
# The above copyright notice and this permission notice shall be
15
# included in all copies or substantial portions of the Software.
16
#
17
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
18
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
19
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
20
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
21
# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
22
# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
23
# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
24
# SOFTWARE.
25

26
import base64
5✔
27
import binascii
5✔
28
import os
5✔
29
import sys
5✔
30
import hashlib
5✔
31
import hmac
5✔
32
from typing import Union, Mapping, Optional
5✔
33

34
from .util import assert_bytes, InvalidPassword, to_bytes, to_string, WalletFileException, versiontuple
5✔
35
from .i18n import _
5✔
36
from .logging import get_logger
5✔
37

38

39
_logger = get_logger(__name__)
5✔
40

41

42
# Probe for the pure-python AES implementation; its absence is tolerated.
try:
    import pyaes
    HAS_PYAES = True
except Exception:
    HAS_PYAES = False

# Probe for Cryptodome; a too-old install is treated like a missing one.
MIN_CRYPTODOME_VERSION = "3.7"
try:
    import Cryptodome
    if versiontuple(Cryptodome.__version__) < versiontuple(MIN_CRYPTODOME_VERSION):
        _logger.warning(f"found module 'Cryptodome' but it is too old: {Cryptodome.__version__}<{MIN_CRYPTODOME_VERSION}")
        raise Exception()
    from Cryptodome.Cipher import ChaCha20_Poly1305 as CD_ChaCha20_Poly1305
    from Cryptodome.Cipher import ChaCha20 as CD_ChaCha20
    from Cryptodome.Cipher import AES as CD_AES
    HAS_CRYPTODOME = True
except Exception:
    HAS_CRYPTODOME = False

# Probe for cryptography; a too-old install is treated like a missing one.
MIN_CRYPTOGRAPHY_VERSION = "2.1"
try:
    import cryptography
    if versiontuple(cryptography.__version__) < versiontuple(MIN_CRYPTOGRAPHY_VERSION):
        _logger.warning(f"found module 'cryptography' but it is too old: {cryptography.__version__}<{MIN_CRYPTOGRAPHY_VERSION}")
        raise Exception()
    from cryptography import exceptions
    from cryptography.hazmat.primitives.ciphers import Cipher as CG_Cipher
    from cryptography.hazmat.primitives.ciphers import algorithms as CG_algorithms
    from cryptography.hazmat.primitives.ciphers import modes as CG_modes
    from cryptography.hazmat.backends import default_backend as CG_default_backend
    import cryptography.hazmat.primitives.ciphers.aead as CG_aead
    HAS_CRYPTOGRAPHY = True
except Exception:
    HAS_CRYPTOGRAPHY = False


# pyaes alone is not enough: one of the two full backends must be usable.
if not HAS_CRYPTODOME and not HAS_CRYPTOGRAPHY:
    sys.exit("Error: at least one of ('pycryptodomex', 'cryptography') needs to be installed.")
86

87

88
def version_info() -> Mapping[str, Optional[str]]:
    """Describe which crypto backends were detected at import time.

    The result has a ``<lib>.version`` entry for pyaes, cryptodome and
    cryptography (``None`` if that backend is not usable), and a
    ``<lib>.path`` entry for the latter two when the module has ``__path__``.
    """
    info = {}
    info["pyaes.version"] = ".".join(map(str, pyaes.VERSION[:3])) if HAS_PYAES else None

    info["cryptodome.version"] = Cryptodome.__version__ if HAS_CRYPTODOME else None
    if HAS_CRYPTODOME and hasattr(Cryptodome, "__path__"):
        info["cryptodome.path"] = ", ".join(Cryptodome.__path__ or [])

    info["cryptography.version"] = cryptography.__version__ if HAS_CRYPTOGRAPHY else None
    if HAS_CRYPTOGRAPHY and hasattr(cryptography, "__path__"):
        info["cryptography.path"] = ", ".join(cryptography.__path__ or [])
    return info
107

108

109
class InvalidPadding(Exception):
    """Signals that data does not end in well-formed PKCS#7 padding."""
111

112

113
class CiphertextFormatError(Exception):
    """Signals that a ciphertext string is not in the expected encoding."""
115

116

117
def append_PKCS7_padding(data: bytes) -> bytes:
    """Pad *data* with PKCS#7 to a multiple of 16 bytes.

    Between 1 and 16 bytes are always appended (a full 16 when the input is
    already aligned); every padding byte holds the padding length.
    """
    assert_bytes(data)
    pad_count = 16 - len(data) % 16
    padding = bytes([pad_count] * pad_count)
    return data + padding
121

122

123
def strip_PKCS7_padding(data: bytes) -> bytes:
    """Validate and remove PKCS#7 padding from *data*.

    Raises InvalidPadding if the input is empty, is not a multiple of
    16 bytes long, or its trailing bytes are not consistent padding.
    """
    assert_bytes(data)
    size = len(data)
    if size == 0 or size % 16 != 0:
        raise InvalidPadding("invalid length")
    pad_count = data[-1]  # the last byte announces how many bytes to drop
    if pad_count < 1 or pad_count > 16:
        raise InvalidPadding("invalid padding byte (out of range)")
    if any(byte != pad_count for byte in data[-pad_count:]):
        raise InvalidPadding("invalid padding byte (inconsistent)")
    return data[:-pad_count]
134

135

136
def aes_encrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
    """AES-CBC encrypt *data* (PKCS#7 padded first) under *key* and *iv*.

    Uses the first available backend, tried in the order Cryptodome,
    cryptography, pyaes. Raises Exception if none is available.
    """
    assert_bytes(key, iv, data)
    padded = append_PKCS7_padding(data)
    if HAS_CRYPTODOME:
        return CD_AES.new(key, CD_AES.MODE_CBC, iv).encrypt(padded)
    if HAS_CRYPTOGRAPHY:
        cbc = CG_Cipher(CG_algorithms.AES(key), CG_modes.CBC(iv), backend=CG_default_backend())
        enc = cbc.encryptor()
        return enc.update(padded) + enc.finalize()
    if HAS_PYAES:
        mode = pyaes.AESModeOfOperationCBC(key, iv=iv)
        # padding was already applied above, so pyaes must not pad again
        enc = pyaes.Encrypter(mode, padding=pyaes.PADDING_NONE)
        return enc.feed(padded) + enc.feed()  # empty feed() flushes buffer
    raise Exception("no AES backend found")
152

153

154
def aes_decrypt_with_iv(key: bytes, iv: bytes, data: bytes) -> bytes:
    """AES-CBC decrypt *data* under *key* and *iv*, then strip PKCS#7 padding.

    Uses the first available backend, tried in the order Cryptodome,
    cryptography, pyaes.

    Raises:
        InvalidPassword: the decrypted bytes do not end in valid padding.
        Exception: no AES backend is available.
    """
    assert_bytes(key, iv, data)
    if HAS_CRYPTODOME:
        cipher = CD_AES.new(key, CD_AES.MODE_CBC, iv)
        data = cipher.decrypt(data)
    elif HAS_CRYPTOGRAPHY:
        cipher = CG_Cipher(CG_algorithms.AES(key), CG_modes.CBC(iv), backend=CG_default_backend())
        decryptor = cipher.decryptor()
        data = decryptor.update(data) + decryptor.finalize()
    elif HAS_PYAES:
        aes_cbc = pyaes.AESModeOfOperationCBC(key, iv=iv)
        # padding is stripped explicitly below, so pyaes must not touch it
        aes = pyaes.Decrypter(aes_cbc, padding=pyaes.PADDING_NONE)
        data = aes.feed(data) + aes.feed()  # empty aes.feed() flushes buffer
    else:
        raise Exception("no AES backend found")
    try:
        return strip_PKCS7_padding(data)
    except InvalidPadding as e:
        # malformed padding is reported to callers as a wrong password;
        # chain explicitly, like the other InvalidPassword sites in this file
        raise InvalidPassword() from e
173

174

175
def EncodeAES_bytes(secret: bytes, msg: bytes) -> bytes:
    """Encrypt *msg* with key *secret*; returns a fresh 16-byte IV + ciphertext."""
    assert_bytes(msg)
    fresh_iv = bytes(os.urandom(16))
    encrypted = aes_encrypt_with_iv(secret, fresh_iv, msg)
    return fresh_iv + encrypted
180

181

182
def DecodeAES_bytes(secret: bytes, ciphertext: bytes) -> bytes:
    """Decrypt *ciphertext*, whose first 16 bytes are the IV, with key *secret*."""
    assert_bytes(ciphertext)
    return aes_decrypt_with_iv(secret, ciphertext[:16], ciphertext[16:])
187

188

189
# Password-hash version used when writing new versioned ciphertexts.
PW_HASH_VERSION_LATEST = 1
# Versions this module recognises; anything else is "unexpected".
KNOWN_PW_HASH_VERSIONS = (1, 2)
# Versions _hash_password can actually compute.
SUPPORTED_PW_HASH_VERSIONS = (1,)
# The version we write must be one we both recognise and can compute.
assert all(
    PW_HASH_VERSION_LATEST in versions
    for versions in (KNOWN_PW_HASH_VERSIONS, SUPPORTED_PW_HASH_VERSIONS)
)
194

195

196
class UnexpectedPasswordHashVersion(InvalidPassword, WalletFileException):
    """Raised for a password hash version this module does not recognise.

    The offending version is kept on ``self.version``.
    """

    def __init__(self, version):
        self.version = version

    def __str__(self):
        headline = _("Unexpected password hash version")
        advice = _('You are most likely using an outdated version of Electrum. Please update.')
        return f"{headline}: {self.version}\n{advice}"
205

206

207
class UnsupportedPasswordHashVersion(InvalidPassword, WalletFileException):
    """Raised for a password hash version that cannot be computed here.

    The offending version is kept on ``self.version``.
    """

    def __init__(self, version):
        self.version = version

    def __str__(self):
        headline = _("Unsupported password hash version")
        advice = (f"To open this wallet, try 'git checkout password_v{self.version}'.\n"
                  "Alternatively, restore from seed.")
        return f"{headline}: {self.version}\n{advice}"
217

218

219
def _hash_password(password: Union[bytes, str], *, version: int) -> bytes:
    """Derive the encryption key from *password* for the given hash version.

    Version 1 is the double-SHA256 of the UTF-8 encoded password.
    Raises UnsupportedPasswordHashVersion for versions outside
    SUPPORTED_PW_HASH_VERSIONS.
    """
    pw_bytes = to_bytes(password, 'utf8')
    if version not in SUPPORTED_PW_HASH_VERSIONS:
        raise UnsupportedPasswordHashVersion(version)
    if version != 1:
        # a supported version without a derivation branch here
        assert version not in KNOWN_PW_HASH_VERSIONS
        raise UnexpectedPasswordHashVersion(version)
    return sha256d(pw_bytes)
228

229

230
def _pw_encode_raw(data: bytes, password: Union[bytes, str], *, version: int) -> bytes:
    """Encrypt *data* with a key derived from *password*; returns raw bytes.

    Raises UnexpectedPasswordHashVersion if *version* is not a known one.
    """
    if version not in KNOWN_PW_HASH_VERSIONS:
        raise UnexpectedPasswordHashVersion(version)
    key = _hash_password(password, version=version)
    return EncodeAES_bytes(key, data)
238

239

240
def _pw_decode_raw(data_bytes: bytes, password: Union[bytes, str], *, version: int) -> bytes:
    """Decrypt raw ciphertext bytes with a key derived from *password*.

    Raises UnexpectedPasswordHashVersion for an unknown *version*, and
    InvalidPassword if decryption fails for any reason.
    """
    if version not in KNOWN_PW_HASH_VERSIONS:
        raise UnexpectedPasswordHashVersion(version)
    key = _hash_password(password, version=version)
    try:
        return DecodeAES_bytes(key, data_bytes)
    except Exception as e:
        # every decryption failure is surfaced as a wrong password
        raise InvalidPassword() from e
251

252

253
def pw_encode_bytes(data: bytes, password: Union[bytes, str], *, version: int) -> str:
    """plaintext bytes -> base64 ciphertext"""
    raw = _pw_encode_raw(data, password, version=version)
    return base64.b64encode(raw).decode('utf8')
258

259

260
def pw_decode_bytes(data: str, password: Union[bytes, str], *, version: int) -> bytes:
    """base64 ciphertext -> plaintext bytes

    Raises:
        UnexpectedPasswordHashVersion: *version* is not a known version.
        CiphertextFormatError: *data* is not valid base64.
        InvalidPassword: decryption failed.
    """
    if version not in KNOWN_PW_HASH_VERSIONS:
        raise UnexpectedPasswordHashVersion(version)
    try:
        data_bytes = bytes(base64.b64decode(data, validate=True))
    except ValueError as e:
        # binascii.Error (malformed base64) is a ValueError subclass; a str
        # with non-ASCII characters raises a plain ValueError, which the
        # narrower handler used to let escape.
        raise CiphertextFormatError("ciphertext not valid base64") from e
    return _pw_decode_raw(data_bytes, password, version=version)
269

270

271
def pw_encode_with_version_and_mac(data: bytes, password: Union[bytes, str]) -> str:
    """plaintext bytes -> base64 ciphertext

    The encoded blob is: version byte + ciphertext + first 4 bytes of
    sha256(plaintext).
    """
    # https://crypto.stackexchange.com/questions/202/should-we-mac-then-encrypt-or-encrypt-then-mac
    # Encrypt-and-MAC. The MAC will be used to detect invalid passwords
    version = PW_HASH_VERSION_LATEST
    checksum = sha256(data)[:4]
    encrypted = _pw_encode_raw(data, password, version=version)
    blob = bytes([version]) + encrypted + checksum
    return base64.b64encode(blob).decode('utf8')
280

281

282
def pw_decode_with_version_and_mac(data: str, password: Union[bytes, str]) -> bytes:
    """base64 ciphertext -> plaintext bytes

    Expects the layout: version byte + ciphertext + 4-byte MAC, where the
    MAC is the first 4 bytes of sha256(plaintext).

    Raises:
        CiphertextFormatError: *data* is not valid base64, or decodes to nothing.
        UnexpectedPasswordHashVersion: the version byte is not a known version.
        InvalidPassword: decryption failed or the MAC does not match.
    """
    try:
        data_bytes = bytes(base64.b64decode(data, validate=True))
    except ValueError as e:
        # binascii.Error (malformed base64) is a ValueError subclass; a str
        # with non-ASCII characters raises a plain ValueError, which the
        # narrower handler used to let escape.
        raise CiphertextFormatError("ciphertext not valid base64") from e
    if not data_bytes:
        # no version byte to read; avoid an IndexError below
        raise CiphertextFormatError("ciphertext is empty")
    version = int(data_bytes[0])
    encrypted = data_bytes[1:-4]
    mac = data_bytes[-4:]
    if version not in KNOWN_PW_HASH_VERSIONS:
        raise UnexpectedPasswordHashVersion(version)
    decrypted = _pw_decode_raw(encrypted, password, version=version)
    if sha256(decrypted)[0:4] != mac:
        raise InvalidPassword()
    return decrypted
297

298

299
def pw_encode(data: str, password: Union[bytes, str, None], *, version: int) -> str:
    """plaintext str -> base64 ciphertext

    With a falsy *password* (None or empty) the data is returned as-is.
    """
    if password:
        return pw_encode_bytes(to_bytes(data, "utf8"), password, version=version)
    return data
305

306

307
def pw_decode(data: str, password: Union[bytes, str, None], *, version: int) -> str:
    """base64 ciphertext -> plaintext str

    With ``password is None`` the data is returned as-is. Raises
    InvalidPassword if the decrypted bytes are not valid UTF-8.
    """
    if password is None:
        return data
    decrypted = pw_decode_bytes(data, password, version=version)
    try:
        return to_string(decrypted, "utf8")
    except UnicodeDecodeError as e:
        # undecodable plaintext is reported as a wrong password
        raise InvalidPassword() from e
317

318

319
def sha256(x: Union[bytes, str]) -> bytes:
    """Return the SHA-256 digest of *x* (str input is encoded as UTF-8)."""
    payload = to_bytes(x, 'utf8')
    digest = hashlib.sha256(payload).digest()
    return bytes(digest)
322

323

324
def sha256d(x: Union[bytes, str]) -> bytes:
    """Return SHA-256 applied twice to *x* (str input is encoded as UTF-8)."""
    first_pass = sha256(to_bytes(x, 'utf8'))
    return bytes(sha256(first_pass))
328

329

330
def hash_160(x: bytes) -> bytes:
    """Return ripemd(sha256(x))."""
    inner_digest = sha256(x)
    return ripemd(inner_digest)
332

333
def ripemd(x):
    """Return the RIPEMD-160 digest of *x*.

    Tries hashlib first and falls back to the bundled pure-python
    implementation if hashlib cannot provide the digest.
    """
    try:
        md = hashlib.new('ripemd160')
        md.update(x)
        return md.digest()
    except Exception:
        # Catch Exception rather than BaseException, so KeyboardInterrupt and
        # SystemExit are never swallowed by the fallback.
        # ripemd160 is not guaranteed to be available in hashlib on all platforms.
        # Historically, our Android builds had hashlib/openssl which did not have it.
        # see https://github.com/spesmilo/electrum/issues/7093
        # We bundle a pure python implementation as fallback that gets used now:
        from . import ripemd
        md = ripemd.new(x)
        return md.digest()
346

347
def hmac_oneshot(key: bytes, msg: bytes, digest) -> bytes:
    """Compute the HMAC of *msg* under *key* with the given *digest* in one call."""
    # hmac.digest requires python 3.7+; faster
    fast_digest = getattr(hmac, 'digest', None)
    if fast_digest is None:
        return hmac.new(key, msg, digest).digest()
    return fast_digest(key, msg, digest)
353

354

355
def chacha20_poly1305_encrypt(
        *,
        key: bytes,
        nonce: bytes,
        associated_data: bytes = None,
        data: bytes
) -> bytes:
    """Encrypt and authenticate *data* with ChaCha20-Poly1305.

    *key* must be 32 bytes and *nonce* 12 bytes; *associated_data* is
    optional. Cryptodome is preferred, cryptography is the fallback; on the
    Cryptodome path the result is the ciphertext followed by the MAC tag.
    Raises Exception if neither backend is available.
    """
    bytes_like = (bytes, bytearray)
    assert isinstance(key, bytes_like)
    assert isinstance(nonce, bytes_like)
    assert isinstance(associated_data, (bytes, bytearray, type(None)))
    assert isinstance(data, bytes_like)
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
    assert len(nonce) == 12, f"unexpected nonce size: {len(nonce)} (expected: 12)"
    if HAS_CRYPTODOME:
        aead = CD_ChaCha20_Poly1305.new(key=key, nonce=nonce)
        if associated_data is not None:
            aead.update(associated_data)
        encrypted, tag = aead.encrypt_and_digest(plaintext=data)
        return encrypted + tag
    if HAS_CRYPTOGRAPHY:
        return CG_aead.ChaCha20Poly1305(key).encrypt(nonce, data, associated_data)
    raise Exception("no chacha20 backend found")
378

379

380
def chacha20_poly1305_decrypt(
        *,
        key: bytes,
        nonce: bytes,
        associated_data: bytes = None,
        data: bytes
) -> bytes:
    """Verify and decrypt ChaCha20-Poly1305 protected *data*.

    *key* must be 32 bytes and *nonce* 12 bytes; *associated_data* is
    optional. On the Cryptodome path the last 16 bytes of *data* are taken
    as the MAC tag. Raises ValueError if verification fails, and Exception
    if neither backend is available.
    """
    bytes_like = (bytes, bytearray)
    assert isinstance(key, bytes_like)
    assert isinstance(nonce, bytes_like)
    assert isinstance(associated_data, (bytes, bytearray, type(None)))
    assert isinstance(data, bytes_like)
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
    assert len(nonce) == 12, f"unexpected nonce size: {len(nonce)} (expected: 12)"
    if HAS_CRYPTODOME:
        aead = CD_ChaCha20_Poly1305.new(key=key, nonce=nonce)
        if associated_data is not None:
            aead.update(associated_data)
        body, tag = data[:-16], data[-16:]
        # raises ValueError if not valid (e.g. incorrect MAC)
        return aead.decrypt_and_verify(ciphertext=body, received_mac_tag=tag)
    if HAS_CRYPTOGRAPHY:
        aead = CG_aead.ChaCha20Poly1305(key)
        try:
            return aead.decrypt(nonce, data, associated_data)
        except cryptography.exceptions.InvalidTag as e:
            # report failure with the same exception type as the Cryptodome path
            raise ValueError("invalid tag") from e
    raise Exception("no chacha20 backend found")
406

407

408
def chacha20_encrypt(*, key: bytes, nonce: bytes, data: bytes) -> bytes:
    """Encrypt *data* with the unauthenticated ChaCha20 stream cipher.

    *key* must be 32 bytes and *nonce* 8 or 12 bytes. Cryptodome is
    preferred, cryptography is the fallback. Raises Exception if neither
    backend is available.
    """
    bytes_like = (bytes, bytearray)
    assert isinstance(key, bytes_like)
    assert isinstance(nonce, bytes_like)
    assert isinstance(data, bytes_like)
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
    assert len(nonce) in (8, 12), f"unexpected nonce size: {len(nonce)} (expected: 8 or 12)"
    if HAS_CRYPTODOME:
        return CD_ChaCha20.new(key=key, nonce=nonce).encrypt(data)
    if HAS_CRYPTOGRAPHY:
        # cryptography wants 16 byte nonces: left-pad with zero bytes
        padded_nonce = bytes(16 - len(nonce)) + nonce
        algo = CG_algorithms.ChaCha20(key=key, nonce=padded_nonce)
        stream = CG_Cipher(algo, mode=None, backend=CG_default_backend())
        return stream.encryptor().update(data)
    raise Exception("no chacha20 backend found")
424

425

426
def chacha20_decrypt(*, key: bytes, nonce: bytes, data: bytes) -> bytes:
    """Decrypt *data* with the unauthenticated ChaCha20 stream cipher.

    *key* must be 32 bytes and *nonce* 8 or 12 bytes. Cryptodome is
    preferred, cryptography is the fallback. Raises Exception if neither
    backend is available.
    """
    bytes_like = (bytes, bytearray)
    assert isinstance(key, bytes_like)
    assert isinstance(nonce, bytes_like)
    assert isinstance(data, bytes_like)
    assert len(key) == 32, f"unexpected key size: {len(key)} (expected: 32)"
    assert len(nonce) in (8, 12), f"unexpected nonce size: {len(nonce)} (expected: 8 or 12)"
    if HAS_CRYPTODOME:
        return CD_ChaCha20.new(key=key, nonce=nonce).decrypt(data)
    if HAS_CRYPTOGRAPHY:
        # cryptography wants 16 byte nonces: left-pad with zero bytes
        padded_nonce = bytes(16 - len(nonce)) + nonce
        algo = CG_algorithms.ChaCha20(key=key, nonce=padded_nonce)
        stream = CG_Cipher(algo, mode=None, backend=CG_default_backend())
        return stream.decryptor().update(data)
    raise Exception("no chacha20 backend found")
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc