• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

tlsfuzzer / tlslite-ng / 13498621421

24 Feb 2025 12:59PM UTC coverage: 84.058% (+0.4%) from 83.706%
13498621421

push

github

web-flow
Merge pull request #545 from tlsfuzzer/ci-updates

add py3.13 to CI

5070 of 6600 branches covered (76.82%)

Branch coverage included in aggregate %.

8 of 12 new or added lines in 1 file covered. (66.67%)

1 existing line in 1 file now uncovered.

12325 of 14094 relevant lines covered (87.45%)

47.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

95.23
/tlslite/keyexchange.py
1
# Authors:
2
#   Hubert Kario (2015)
3
#
4
# See the LICENSE file for legal information regarding use of this file.
5
"""Handling of cryptographic operations for key exchange"""
27✔
6

7
import ecdsa
59✔
8
from .mathtls import goodGroupParameters, makeK, makeU, makeX, \
59✔
9
        paramStrength, RFC7919_GROUPS, calc_key
10
from .errors import TLSInsufficientSecurity, TLSUnknownPSKIdentity, \
59✔
11
        TLSIllegalParameterException, TLSDecryptionFailed, TLSInternalError, \
12
        TLSDecodeError
13
from .messages import ServerKeyExchange, ClientKeyExchange, CertificateVerify
59✔
14
from .constants import SignatureAlgorithm, HashAlgorithm, CipherSuite, \
59✔
15
        ExtensionType, GroupName, ECCurveType, SignatureScheme, ECPointFormat
16
from .utils.ecc import getCurveByName, getPointByteSize
59✔
17
from .utils.rsakey import RSAKey
59✔
18
from .utils.cryptomath import bytesToNumber, getRandomBytes, powMod, \
59✔
19
        numBits, numberToByteArray, divceil, numBytes, secureHash
20
from .utils.lists import getFirstMatching
59✔
21
from .utils import tlshashlib as hashlib
59✔
22
from .utils.x25519 import x25519, x448, X25519_G, X448_G, X25519_ORDER_SIZE, \
59✔
23
        X448_ORDER_SIZE
24
from .utils.compat import int_types, ML_KEM_AVAILABLE
59✔
25
from .utils.codec import DecodeError
59✔
26

27
if ML_KEM_AVAILABLE:
59✔
28
    from kyber_py.ml_kem import ML_KEM_768, ML_KEM_1024
10✔
29

30

31
class KeyExchange(object):
59✔
32
    """
33
    Common API for calculating Premaster secret
34

35
    NOT stable, will get moved from this file
36
    """
37

38
    def __init__(self, cipherSuite, clientHello, serverHello, privateKey=None):
59✔
39
        """Initialize KeyExchange. privateKey is the signing private key"""
40
        self.cipherSuite = cipherSuite
59✔
41
        self.clientHello = clientHello
59✔
42
        self.serverHello = serverHello
59✔
43
        self.privateKey = privateKey
59✔
44

45
    def makeServerKeyExchange(self, sigHash=None):
59✔
46
        """
47
        Create a ServerKeyExchange object
48

49
        Returns a ServerKeyExchange object for the server's initial leg in the
50
        handshake. If the key exchange method does not send ServerKeyExchange
51
        (e.g. RSA), it returns None.
52
        """
53
        raise NotImplementedError()
59✔
54

55
    def makeClientKeyExchange(self):
59✔
56
        """
57
        Create a ClientKeyExchange object
58

59
        Returns a ClientKeyExchange for the second flight from client in the
60
        handshake.
61
        """
62
        return ClientKeyExchange(self.cipherSuite,
59✔
63
                                 self.serverHello.server_version)
64

65
    def processClientKeyExchange(self, clientKeyExchange):
59✔
66
        """
67
        Process ClientKeyExchange and return premaster secret
68

69
        Processes the client's ClientKeyExchange message and returns the
70
        premaster secret. Raises TLSLocalAlert on error.
71
        """
72
        raise NotImplementedError()
59✔
73

74
    def processServerKeyExchange(self, srvPublicKey,
59✔
75
                                 serverKeyExchange):
76
        """Process the server KEX and return premaster secret"""
77
        raise NotImplementedError()
59✔
78

79
    def _tls12_sign_ecdsa_SKE(self, serverKeyExchange, sigHash=None):
59✔
80
        try:
59✔
81
            serverKeyExchange.hashAlg, serverKeyExchange.signAlg = \
59✔
82
                    getattr(SignatureScheme, sigHash)
83
            hashName = SignatureScheme.getHash(sigHash)
59✔
84
        except AttributeError:
59✔
85
            serverKeyExchange.hashAlg = getattr(HashAlgorithm, sigHash)
59✔
86
            serverKeyExchange.signAlg = SignatureAlgorithm.ecdsa
59✔
87
            hashName = sigHash
59✔
88

89
        hash_bytes = serverKeyExchange.hash(self.clientHello.random,
59✔
90
                                            self.serverHello.random)
91

92
        hash_bytes = hash_bytes[:self.privateKey.private_key.curve.baselen]
59✔
93

94
        serverKeyExchange.signature = \
59✔
95
            self.privateKey.sign(hash_bytes, hashAlg=hashName)
96

97
        if not serverKeyExchange.signature:
59!
98
            raise TLSInternalError("Empty signature")
×
99

100
        if not self.privateKey.verify(serverKeyExchange.signature,
59!
101
                                             hash_bytes,
102
                                             ecdsa.util.sigdecode_der):
103
            raise TLSInternalError("signature validation failure")
×
104

105
    def _tls12_sign_dsa_SKE(self, serverKeyExchange, sigHash=None):
59✔
106
        """Sign a TLSv1.2 SKE message."""
107
        try:
59✔
108
            serverKeyExchange.hashAlg, serverKeyExchange.signAlg = \
59✔
109
                getattr(SignatureScheme, sigHash)
110

111
        except AttributeError:
59✔
112
            serverKeyExchange.signAlg = SignatureAlgorithm.dsa
59✔
113
            serverKeyExchange.hashAlg = getattr(HashAlgorithm, sigHash)
59✔
114

115
        hashBytes = serverKeyExchange.hash(self.clientHello.random,
59✔
116
                                           self.serverHello.random)
117

118
        serverKeyExchange.signature = \
59✔
119
            self.privateKey.sign(hashBytes)
120

121
        if not serverKeyExchange.signature:
59!
122
            raise TLSInternalError("Empty signature")
×
123

124
        if not self.privateKey.verify(serverKeyExchange.signature,
59!
125
                                      hashBytes):
126
            raise TLSInternalError("Server Key Exchange signature invalid")
×
127

128
    def _tls12_sign_eddsa_ske(self, server_key_exchange, sig_hash):
59✔
129
        """Sign a TLSv1.2 SKE message."""
130
        server_key_exchange.hashAlg, server_key_exchange.signAlg = \
59✔
131
                getattr(SignatureScheme, sig_hash)
132
        pad_type = None
59✔
133
        hash_name = None
59✔
134
        salt_len = None
59✔
135

136
        hash_bytes = server_key_exchange.hash(self.clientHello.random,
59✔
137
                                              self.serverHello.random)
138

139
        server_key_exchange.signature = \
59✔
140
            self.privateKey.hashAndSign(hash_bytes,
141
                                        pad_type,
142
                                        hash_name,
143
                                        salt_len)
144

145
        if not server_key_exchange.signature:
59!
146
            raise TLSInternalError("Empty signature")
×
147

148
        if not self.privateKey.hashAndVerify(
59!
149
                server_key_exchange.signature,
150
                hash_bytes,
151
                pad_type,
152
                hash_name,
153
                salt_len):
154
            raise TLSInternalError("Server Key Exchange signature invalid")
×
155

156
    def _tls12_signSKE(self, serverKeyExchange, sigHash=None):
59✔
157
        """Sign a TLSv1.2 SKE message."""
158
        try:
59✔
159
            serverKeyExchange.hashAlg, serverKeyExchange.signAlg = \
59✔
160
                    getattr(SignatureScheme, sigHash)
161
            keyType = SignatureScheme.getKeyType(sigHash)
59✔
162
            padType = SignatureScheme.getPadding(sigHash)
59✔
163
            hashName = SignatureScheme.getHash(sigHash)
59✔
164
            saltLen = getattr(hashlib, hashName)().digest_size
59✔
165
        except AttributeError:
59✔
166
            serverKeyExchange.signAlg = SignatureAlgorithm.rsa
59✔
167
            serverKeyExchange.hashAlg = getattr(HashAlgorithm, sigHash)
59✔
168
            keyType = 'rsa'
59✔
169
            padType = 'pkcs1'
59✔
170
            hashName = sigHash
59✔
171
            saltLen = 0
59✔
172

173
        assert keyType == 'rsa'
59✔
174

175
        hashBytes = serverKeyExchange.hash(self.clientHello.random,
59✔
176
                                           self.serverHello.random)
177

178
        serverKeyExchange.signature = \
59✔
179
            self.privateKey.sign(hashBytes,
180
                                 padding=padType,
181
                                 hashAlg=hashName,
182
                                 saltLen=saltLen)
183

184
        if not serverKeyExchange.signature:
59✔
185
            raise TLSInternalError("Empty signature")
59✔
186

187
        if not self.privateKey.verify(serverKeyExchange.signature,
59✔
188
                                      hashBytes,
189
                                      padding=padType,
190
                                      hashAlg=hashName,
191
                                      saltLen=saltLen):
192
            raise TLSInternalError("Server Key Exchange signature invalid")
59✔
193

194
    def signServerKeyExchange(self, serverKeyExchange, sigHash=None):
59✔
195
        """
196
        Sign a server key exchange using default or specified algorithm
197

198
        :type sigHash: str
199
        :param sigHash: name of the signature hash to be used for signing
200
        """
201
        if self.serverHello.server_version < (3, 3):
59✔
202
            if self.privateKey.key_type == "ecdsa":
59✔
203
                serverKeyExchange.signAlg = SignatureAlgorithm.ecdsa
59✔
204
            if self.privateKey.key_type == "dsa":
59✔
205
                serverKeyExchange.signAlg = SignatureAlgorithm.dsa
59✔
206
            hashBytes = serverKeyExchange.hash(self.clientHello.random,
59✔
207
                                               self.serverHello.random)
208

209
            serverKeyExchange.signature = self.privateKey.sign(hashBytes)
59✔
210

211
            if not serverKeyExchange.signature:
59✔
212
                raise TLSInternalError("Empty signature")
59✔
213

214
            if not self.privateKey.verify(serverKeyExchange.signature,
59✔
215
                                          hashBytes):
216
                raise TLSInternalError("Server Key Exchange signature invalid")
59✔
217
        else:
218
            if self.privateKey.key_type == "ecdsa":
59✔
219
                self._tls12_sign_ecdsa_SKE(serverKeyExchange, sigHash)
59✔
220
            elif self.privateKey.key_type == "dsa":
59✔
221
                self._tls12_sign_dsa_SKE(serverKeyExchange, sigHash)
59✔
222
            elif self.privateKey.key_type in ("Ed25519", "Ed448"):
59✔
223
                self._tls12_sign_eddsa_ske(serverKeyExchange, sigHash)
59✔
224
            else:
225
                self._tls12_signSKE(serverKeyExchange, sigHash)
59✔
226

227
    @staticmethod
59✔
228
    def _tls12_verify_ecdsa_SKE(serverKeyExchange, publicKey, clientRandom,
37✔
229
                                serverRandom, validSigAlgs):
230
        hashName = HashAlgorithm.toRepr(serverKeyExchange.hashAlg)
59✔
231
        if not hashName:
59✔
232
            raise TLSIllegalParameterException("Unknown hash algorithm")
59✔
233

234
        hashBytes = serverKeyExchange.hash(clientRandom, serverRandom)
59✔
235

236
        hashBytes = hashBytes[:publicKey.public_key.curve.baselen]
59✔
237

238
        if not publicKey.verify(serverKeyExchange.signature, hashBytes,
59✔
239
                                padding=None,
240
                                hashAlg=hashName,
241
                                saltLen=None):
242
            raise TLSDecryptionFailed("Server Key Exchange signature "
59✔
243
                                      "invalid")
244

245
    @staticmethod
59✔
246
    def _tls12_verify_eddsa_ske(server_key_exchange, public_key, client_random,
37✔
247
                                server_random, valid_sig_algs):
248
        """Verify SeverKeyExchange messages with EdDSA signatures."""
249
        del valid_sig_algs
59✔
250
        sig_bytes = server_key_exchange.signature
59✔
251
        if not sig_bytes:
59✔
252
            raise TLSIllegalParameterException("Empty signature")
59✔
253

254
        hash_bytes = server_key_exchange.hash(client_random, server_random)
59✔
255

256
        if not public_key.hashAndVerify(sig_bytes,
59✔
257
                                        hash_bytes):
258
            raise TLSDecryptionFailed("Server Key Exchange signature invalid")
59✔
259

260
    @staticmethod
59✔
261
    def _tls12_verify_dsa_SKE(serverKeyExchange, publicKey, clientRandom,
37✔
262
                              serverRandom, validSigAlgs):
263

264
        hashBytes = serverKeyExchange.hash(clientRandom, serverRandom)
59✔
265

266
        if not publicKey.verify(serverKeyExchange.signature, hashBytes):
59✔
267
            raise TLSDecryptionFailed("Server Key Exchange signature "
59✔
268
                                      "invalid")
269

270
    @staticmethod
59✔
271
    def _tls12_verify_SKE(serverKeyExchange, publicKey, clientRandom,
37✔
272
                          serverRandom, validSigAlgs):
273
        """Verify TLSv1.2 version of SKE."""
274
        if (serverKeyExchange.hashAlg, serverKeyExchange.signAlg) not in \
59✔
275
                validSigAlgs:
276
            raise TLSIllegalParameterException("Server selected "
59✔
277
                                               "invalid signature "
278
                                               "algorithm")
279
        if (serverKeyExchange.hashAlg, serverKeyExchange.signAlg) in (
59✔
280
                SignatureScheme.ed25519, SignatureScheme.ed448):
281
            return KeyExchange._tls12_verify_eddsa_ske(serverKeyExchange,
59✔
282
                                                       publicKey,
283
                                                       clientRandom,
284
                                                       serverRandom,
285
                                                       validSigAlgs)
286
        if serverKeyExchange.signAlg == SignatureAlgorithm.ecdsa:
59✔
287
            return KeyExchange._tls12_verify_ecdsa_SKE(serverKeyExchange,
59✔
288
                                                       publicKey,
289
                                                       clientRandom,
290
                                                       serverRandom,
291
                                                       validSigAlgs)
292

293
        elif serverKeyExchange.signAlg == SignatureAlgorithm.dsa:
59✔
294
            return KeyExchange._tls12_verify_dsa_SKE(serverKeyExchange,
59✔
295
                                                     publicKey,
296
                                                     clientRandom,
297
                                                     serverRandom,
298
                                                     validSigAlgs)
299

300
        schemeID = (serverKeyExchange.hashAlg,
59✔
301
                    serverKeyExchange.signAlg)
302
        scheme = SignatureScheme.toRepr(schemeID)
59✔
303
        if scheme is not None:
59✔
304
            keyType = SignatureScheme.getKeyType(scheme)
59✔
305
            padType = SignatureScheme.getPadding(scheme)
59✔
306
            hashName = SignatureScheme.getHash(scheme)
59✔
307
            saltLen = getattr(hashlib, hashName)().digest_size
59✔
308
        else:
309
            if serverKeyExchange.signAlg != SignatureAlgorithm.rsa:
59✔
310
                raise TLSInternalError("non-RSA sigs are not supported")
59✔
311
            keyType = 'rsa'
59✔
312
            padType = 'pkcs1'
59✔
313
            saltLen = 0
59✔
314
            hashName = HashAlgorithm.toRepr(serverKeyExchange.hashAlg)
59✔
315
            if hashName is None:
59!
316
                msg = "Unknown hash ID: {0}"\
59✔
317
                        .format(serverKeyExchange.hashAlg)
318
                raise TLSIllegalParameterException(msg)
59✔
319
        assert keyType == 'rsa'
59✔
320

321
        hashBytes = serverKeyExchange.hash(clientRandom, serverRandom)
59✔
322

323
        sigBytes = serverKeyExchange.signature
59✔
324
        if not sigBytes:
59✔
325
            raise TLSIllegalParameterException("Empty signature")
59✔
326

327
        if not publicKey.verify(sigBytes, hashBytes,
59✔
328
                                padding=padType,
329
                                hashAlg=hashName,
330
                                saltLen=saltLen):
331
            raise TLSDecryptionFailed("Server Key Exchange signature "
59✔
332
                                      "invalid")
333

334
    @staticmethod
59✔
335
    def verifyServerKeyExchange(serverKeyExchange, publicKey, clientRandom,
37✔
336
                                serverRandom, validSigAlgs):
337
        """Verify signature on the Server Key Exchange message
338

339
        the only acceptable signature algorithms are specified by validSigAlgs
340
        """
341
        if serverKeyExchange.version < (3, 3):
59✔
342
            hashBytes = serverKeyExchange.hash(clientRandom, serverRandom)
59✔
343
            sigBytes = serverKeyExchange.signature
59✔
344

345
            if not sigBytes:
59!
346
                raise TLSIllegalParameterException("Empty signature")
×
347

348
            if not publicKey.verify(sigBytes, hashBytes):
59✔
349
                raise TLSDecryptionFailed("Server Key Exchange signature "
59✔
350
                                          "invalid")
351
        else:
352
            KeyExchange._tls12_verify_SKE(serverKeyExchange, publicKey,
59✔
353
                                          clientRandom, serverRandom,
354
                                          validSigAlgs)
355

356
    @staticmethod
59✔
357
    def calcVerifyBytes(version, handshakeHashes, signatureAlg,
59✔
358
                        premasterSecret, clientRandom, serverRandom,
359
                        prf_name = None, peer_tag=b'client', key_type="rsa"):
360
        """Calculate signed bytes for Certificate Verify"""
361
        if version == (3, 0):
59✔
362
            masterSecret = calc_key(version, premasterSecret,
59✔
363
                                    0, b"master secret",
364
                                    client_random=clientRandom,
365
                                    server_random=serverRandom,
366
                                    output_length=48)
367
            verifyBytes = handshakeHashes.digestSSL(masterSecret, b"")
59✔
368
        elif version in ((3, 1), (3, 2)):
59✔
369
            if key_type != "ecdsa":
59✔
370
                verifyBytes = handshakeHashes.digest()
59✔
371
            else:
372
                verifyBytes = handshakeHashes.digest("sha1")
59✔
373
        elif version == (3, 3):
59✔
374
            if signatureAlg in (SignatureScheme.ed25519,
59✔
375
                    SignatureScheme.ed448):
376
                hashName = "intrinsic"
59✔
377
                padding = None
59✔
378
            elif signatureAlg[1] == SignatureAlgorithm.dsa:
59✔
379
                hashName = HashAlgorithm.toRepr(signatureAlg[0])
59✔
380
                padding = None
59✔
381
            elif signatureAlg[1] != SignatureAlgorithm.ecdsa:
59!
382
                scheme = SignatureScheme.toRepr(signatureAlg)
59✔
383
                if scheme is None:
59✔
384
                    hashName = HashAlgorithm.toRepr(signatureAlg[0])
59✔
385
                    padding = 'pkcs1'
59✔
386
                else:
387
                    hashName = SignatureScheme.getHash(scheme)
59✔
388
                    padding = SignatureScheme.getPadding(scheme)
59✔
389
            else:
390
                padding = None
×
391
                hashName = HashAlgorithm.toRepr(signatureAlg[0])
×
392
            verifyBytes = handshakeHashes.digest(hashName)
59✔
393
            if padding == 'pkcs1':
59✔
394
                verifyBytes = RSAKey.addPKCS1Prefix(verifyBytes, hashName)
59✔
395
        elif version == (3, 4):
59!
396
            scheme = SignatureScheme.toRepr(signatureAlg)
59✔
397
            if scheme:
59✔
398
                hash_name = SignatureScheme.getHash(scheme)
59✔
399
            else:
400
                # handles negative test cases when we try to pass in
401
                # schemes that are not supported in TLS1.3
402
                hash_name = HashAlgorithm.toRepr(signatureAlg[0])
59✔
403
            verifyBytes = bytearray(b'\x20' * 64 +
59✔
404
                                    b'TLS 1.3, ' + peer_tag +
405
                                    b' CertificateVerify' +
406
                                    b'\x00') + \
407
                          handshakeHashes.digest(prf_name)
408
            if hash_name != "intrinsic":
59✔
409
                verifyBytes = secureHash(verifyBytes, hash_name)
59✔
410
        else:
411
            raise ValueError("Unsupported TLS version {0}".format(version))
×
412
        return verifyBytes
59✔
413

414
    @staticmethod
59✔
415
    def makeCertificateVerify(version, handshakeHashes, validSigAlgs,
37✔
416
                              privateKey, certificateRequest, premasterSecret,
417
                              clientRandom, serverRandom):
418
        """Create a Certificate Verify message
419

420
        :param version: protocol version in use
421
        :param handshakeHashes: the running hash of all handshake messages
422
        :param validSigAlgs: acceptable signature algorithms for client side,
423
            applicable only to TLSv1.2 (or later)
424
        :param certificateRequest: the server provided Certificate Request
425
            message
426
        :param premasterSecret: the premaster secret, needed only for SSLv3
427
        :param clientRandom: client provided random value, needed only for
428
            SSLv3
429
        :param serverRandom: server provided random value, needed only for
430
            SSLv3
431
        """
432
        signatureAlgorithm = None
59✔
433
        if privateKey.key_type == "ecdsa" and version < (3, 3):
59✔
434
            signatureAlgorithm = (HashAlgorithm.sha1, SignatureAlgorithm.ecdsa)
59✔
435
        # in TLS 1.2 we must decide which algorithm to use for signing
436
        if version == (3, 3):
59✔
437
            serverSigAlgs = certificateRequest.supported_signature_algs
59✔
438
            signatureAlgorithm = getFirstMatching(validSigAlgs, serverSigAlgs)
59✔
439
            # if none acceptable, do a last resort:
440
            if signatureAlgorithm is None:
59✔
441
                signatureAlgorithm = validSigAlgs[0]
59✔
442
        verifyBytes = KeyExchange.calcVerifyBytes(version, handshakeHashes,
59✔
443
                                                  signatureAlgorithm,
444
                                                  premasterSecret,
445
                                                  clientRandom,
446
                                                  serverRandom,
447
                                                  key_type=privateKey.key_type)
448
        if signatureAlgorithm and signatureAlgorithm in (
59✔
449
                SignatureScheme.ed25519, SignatureScheme.ed448):
450
            padding = None
59✔
451
            hashName = "intrinsic"
59✔
452
            saltLen = None
59✔
453
            sig_func = privateKey.hashAndSign
59✔
454
            ver_func = privateKey.hashAndVerify
59✔
455
        elif signatureAlgorithm and \
59✔
456
                signatureAlgorithm[1] == SignatureAlgorithm.ecdsa:
457
            padding = None
59✔
458
            hashName = HashAlgorithm.toRepr(signatureAlgorithm[0])
59✔
459
            saltLen = None
59✔
460
            verifyBytes = verifyBytes[:privateKey.private_key.curve.baselen]
59✔
461
            sig_func = privateKey.sign
59✔
462
            ver_func = privateKey.verify
59✔
463
        elif signatureAlgorithm and \
59✔
464
                signatureAlgorithm[1] == SignatureAlgorithm.dsa:
465
            padding = None
59✔
466
            hashName = HashAlgorithm.toRepr(signatureAlgorithm[0])
59✔
467
            saltLen = None
59✔
468
            sig_func = privateKey.sign
59✔
469
            ver_func = privateKey.verify
59✔
470
        else:
471
            scheme = SignatureScheme.toRepr(signatureAlgorithm)
59✔
472
            # for pkcs1 signatures hash is used to add PKCS#1 prefix, but
473
            # that was already done by calcVerifyBytes
474
            hashName = None
59✔
475
            saltLen = 0
59✔
476
            if scheme is None:
59✔
477
                padding = 'pkcs1'
59✔
478
            else:
479
                padding = SignatureScheme.getPadding(scheme)
59✔
480
                if padding == 'pss':
59✔
481
                    hashName = SignatureScheme.getHash(scheme)
59✔
482
                    saltLen = getattr(hashlib, hashName)().digest_size
59✔
483
            sig_func = privateKey.sign
59✔
484
            ver_func = privateKey.verify
59✔
485

486
        signedBytes = sig_func(verifyBytes,
59✔
487
                               padding,
488
                               hashName,
489
                               saltLen)
490
        if not ver_func(signedBytes, verifyBytes, padding, hashName,
59✔
491
                        saltLen):
492
            raise TLSInternalError("Certificate Verify signature invalid")
59✔
493
        certificateVerify = CertificateVerify(version)
59✔
494
        certificateVerify.create(signedBytes, signatureAlgorithm)
59✔
495

496
        return certificateVerify
59✔
497

498

499
class AuthenticatedKeyExchange(KeyExchange):
59✔
500
    """
501
    Common methods for key exchanges that authenticate Server Key Exchange
502

503
    Methods for signing Server Key Exchange message
504
    """
505

506
    def makeServerKeyExchange(self, sigHash=None):
59✔
507
        """Prepare server side of key exchange with selected parameters"""
508
        ske = super(AuthenticatedKeyExchange, self).makeServerKeyExchange()
59✔
509
        self.signServerKeyExchange(ske, sigHash)
59✔
510
        return ske
59✔
511

512

513
class RSAKeyExchange(KeyExchange):
59✔
514
    """
515
    Handling of RSA key exchange
516

517
    NOT stable API, do NOT use
518
    """
519

520
    def __init__(self, cipherSuite, clientHello, serverHello, privateKey):
59✔
521
        super(RSAKeyExchange, self).__init__(cipherSuite, clientHello,
59✔
522
                                             serverHello, privateKey)
523
        self.encPremasterSecret = None
59✔
524

525
    def makeServerKeyExchange(self, sigHash=None):
59✔
526
        """Don't create a server key exchange for RSA key exchange"""
527
        return None
59✔
528

529
    def processClientKeyExchange(self, clientKeyExchange):
59✔
530
        """Decrypt client key exchange, return premaster secret"""
531
        premasterSecret = self.privateKey.decrypt(\
59✔
532
            clientKeyExchange.encryptedPreMasterSecret)
533

534
        # On decryption failure randomize premaster secret to avoid
535
        # Bleichenbacher's "million message" attack
536
        randomPreMasterSecret = getRandomBytes(48)
59✔
537
        if not premasterSecret:
59!
UNCOV
538
            premasterSecret = randomPreMasterSecret
×
539
        elif len(premasterSecret) != 48:
59✔
540
            premasterSecret = randomPreMasterSecret
59✔
541
        else:
542
            versionCheck = (premasterSecret[0], premasterSecret[1])
59✔
543
            if versionCheck != self.clientHello.client_version:
59✔
544
                #Tolerate buggy IE clients
545
                if versionCheck != self.serverHello.server_version:
59✔
546
                    premasterSecret = randomPreMasterSecret
59✔
547
        return premasterSecret
59✔
548

549
    def processServerKeyExchange(self, srvPublicKey,
59✔
550
                                 serverKeyExchange):
551
        """Generate premaster secret for server"""
552
        del serverKeyExchange # not present in RSA key exchange
59✔
553
        premasterSecret = getRandomBytes(48)
59✔
554
        premasterSecret[0] = self.clientHello.client_version[0]
59✔
555
        premasterSecret[1] = self.clientHello.client_version[1]
59✔
556

557
        self.encPremasterSecret = srvPublicKey.encrypt(premasterSecret)
59✔
558
        return premasterSecret
59✔
559

560
    def makeClientKeyExchange(self):
59✔
561
        """Return a client key exchange with clients key share"""
562
        clientKeyExchange = super(RSAKeyExchange, self).makeClientKeyExchange()
59✔
563
        clientKeyExchange.createRSA(self.encPremasterSecret)
59✔
564
        return clientKeyExchange
59✔
565

566

567
class ADHKeyExchange(KeyExchange):
59✔
568
    """
569
    Handling of anonymous Diffie-Hellman Key exchange
570

571
    FFDHE without signing serverKeyExchange useful for anonymous DH
572
    """
573

574
    def __init__(self, cipherSuite, clientHello, serverHello,
59✔
575
                 dhParams=None, dhGroups=None):
576
        super(ADHKeyExchange, self).__init__(cipherSuite, clientHello,
59✔
577
                                             serverHello)
578
#pylint: enable = invalid-name
579
        self.dh_Xs = None
59✔
580
        self.dh_Yc = None
59✔
581
        if dhParams:
59✔
582
            self.dh_g, self.dh_p = dhParams
59✔
583
        else:
584
            # 2048-bit MODP Group (RFC 5054, group 3)
585
            self.dh_g, self.dh_p = goodGroupParameters[2]
59✔
586
        self.dhGroups = dhGroups
59✔
587

588
    def makeServerKeyExchange(self):
59✔
589
        """
590
        Prepare server side of anonymous key exchange with selected parameters
591
        """
592
        # Check for RFC 7919 support
593
        ext = self.clientHello.getExtension(ExtensionType.supported_groups)
59✔
594
        if ext and self.dhGroups:
59✔
595
            commonGroup = getFirstMatching(ext.groups, self.dhGroups)
59✔
596
            if commonGroup:
59✔
597
                self.dh_g, self.dh_p = RFC7919_GROUPS[commonGroup - 256]
59✔
598
            elif getFirstMatching(ext.groups, range(256, 512)):
59✔
599
                raise TLSInternalError("DHE key exchange attempted despite no "
59✔
600
                                       "overlap between supported groups")
601

602
        # for TLS < 1.3 we need special algorithm to select params (see above)
603
        # so do not pass in the group, if we selected one
604
        kex = FFDHKeyExchange(None, self.serverHello.server_version,
59✔
605
                              self.dh_g, self.dh_p)
606
        self.dh_Xs = kex.get_random_private_key()
59✔
607
        dh_Ys = kex.calc_public_value(self.dh_Xs)
59✔
608

609
        version = self.serverHello.server_version
59✔
610
        serverKeyExchange = ServerKeyExchange(self.cipherSuite, version)
59✔
611
        serverKeyExchange.createDH(self.dh_p, self.dh_g, dh_Ys)
59✔
612
        # No sign for anonymous ServerKeyExchange.
613
        return serverKeyExchange
59✔
614

615
    def processClientKeyExchange(self, clientKeyExchange):
59✔
616
        """Use client provided parameters to establish premaster secret"""
617
        dh_Yc = clientKeyExchange.dh_Yc
59✔
618

619
        kex = FFDHKeyExchange(None, self.serverHello.server_version,
59✔
620
                              self.dh_g, self.dh_p)
621
        return kex.calc_shared_key(self.dh_Xs, dh_Yc)
59✔
622

623
    def processServerKeyExchange(self, srvPublicKey, serverKeyExchange):
59✔
624
        """Process the server key exchange, return premaster secret."""
625
        del srvPublicKey
59✔
626
        dh_p = serverKeyExchange.dh_p
59✔
627
        # TODO make the minimum changeable
628
        if dh_p < 2**1023:
59✔
629
            raise TLSInsufficientSecurity("DH prime too small")
59✔
630
        dh_g = serverKeyExchange.dh_g
59✔
631
        dh_Ys = serverKeyExchange.dh_Ys
59✔
632

633
        kex = FFDHKeyExchange(None, self.serverHello.server_version,
59✔
634
                              dh_g, dh_p)
635

636
        dh_Xc = kex.get_random_private_key()
59✔
637
        self.dh_Yc = kex.calc_public_value(dh_Xc)
59✔
638
        return kex.calc_shared_key(dh_Xc, dh_Ys)
59✔
639

640
    def makeClientKeyExchange(self):
59✔
641
        """Create client key share for the key exchange"""
642
        cke = super(ADHKeyExchange, self).makeClientKeyExchange()
59✔
643
        cke.createDH(self.dh_Yc)
59✔
644
        return cke
59✔
645

646

647
# the DHE_RSA part comes from IETF ciphersuite names, we want to keep it
648
#pylint: disable = invalid-name
649
class DHE_RSAKeyExchange(AuthenticatedKeyExchange, ADHKeyExchange):
59✔
650
    """
651
    Handling of authenticated ephemeral Diffe-Hellman Key exchange.
652
    """
653

654
    def __init__(self, cipherSuite, clientHello, serverHello, privateKey,
59✔
655
                 dhParams=None, dhGroups=None):
656
        """
657
        Create helper object for Diffie-Hellamn key exchange.
658

659
        :param dhParams: Diffie-Hellman parameters that will be used by
660
            server. First element of the tuple is the generator, the second
661
            is the prime. If not specified it will use a secure set (currently
662
            a 2048-bit safe prime).
663
        :type dhParams: 2-element tuple of int
664
        """
665
        super(DHE_RSAKeyExchange, self).__init__(cipherSuite, clientHello,
59✔
666
                                                 serverHello, dhParams,
667
                                                 dhGroups)
668
#pylint: enable = invalid-name
669
        self.privateKey = privateKey
59✔
670

671

672
class AECDHKeyExchange(KeyExchange):
59✔
673
    """
674
    Handling of anonymous Eliptic curve Diffie-Hellman Key exchange
675

676
    ECDHE without signing serverKeyExchange useful for anonymous ECDH
677
    """
678

679
    def __init__(self, cipherSuite, clientHello, serverHello, acceptedCurves,
59✔
680
                 defaultCurve=GroupName.secp256r1):
681
        super(AECDHKeyExchange, self).__init__(cipherSuite, clientHello,
59✔
682
                                               serverHello)
683
        self.ecdhXs = None
59✔
684
        self.acceptedCurves = acceptedCurves
59✔
685
        self.group_id = None
59✔
686
        self.ecdhYc = None
59✔
687
        self.defaultCurve = defaultCurve
59✔
688

689
    def makeServerKeyExchange(self, sigHash=None):
59✔
690
        """Create AECDHE version of Server Key Exchange"""
691
        #Get client supported groups
692
        client_curves = self.clientHello.getExtension(
59✔
693
                ExtensionType.supported_groups)
694
        if client_curves is None:
59✔
695
            # in case there is no extension, we can pick any curve,
696
            # use the configured one
697
            client_curves = [self.defaultCurve]
59✔
698
        elif not client_curves.groups:
59✔
699
            # extension should have been validated before
700
            raise TLSInternalError("Can't do ECDHE with no client curves")
59✔
701
        else:
702
            client_curves = client_curves.groups
59✔
703

704
        #Pick first client preferred group we support
705
        self.group_id = getFirstMatching(client_curves, self.acceptedCurves)
59✔
706
        if self.group_id is None:
59✔
707
            raise TLSInsufficientSecurity("No mutual groups")
59✔
708

709
        kex = ECDHKeyExchange(self.group_id, self.serverHello.server_version)
59✔
710
        self.ecdhXs = kex.get_random_private_key()
59✔
711

712
        ext_negotiated = ECPointFormat.uncompressed
59✔
713
        ext_c = self.clientHello.getExtension(ExtensionType.ec_point_formats)
59✔
714
        ext_s = self.serverHello.getExtension(ExtensionType.ec_point_formats)
59✔
715
        if ext_c and ext_s:
59✔
716
            try:
59✔
717
                ext_negotiated = next((i for i in ext_c.formats \
59✔
718
                                       if i in ext_s.formats))
719
            except StopIteration:
×
720
                raise TLSIllegalParameterException("No common EC point format")
×
721

722
        ext_negotiated = 'uncompressed' if \
59✔
723
            ext_negotiated == ECPointFormat.uncompressed else 'compressed'
724

725
        ecdhYs = kex.calc_public_value(self.ecdhXs, ext_negotiated)
59✔
726

727
        version = self.serverHello.server_version
59✔
728
        serverKeyExchange = ServerKeyExchange(self.cipherSuite, version)
59✔
729
        serverKeyExchange.createECDH(ECCurveType.named_curve,
59✔
730
                                     named_curve=self.group_id,
731
                                     point=ecdhYs)
732
        # No sign for anonymous ServerKeyExchange
733
        return serverKeyExchange
59✔
734

735
    def processClientKeyExchange(self, clientKeyExchange):
59✔
736
        """Calculate premaster secret from previously generated SKE and CKE"""
737
        ecdhYc = clientKeyExchange.ecdh_Yc
59✔
738

739
        if not ecdhYc:
59✔
740
            raise TLSDecodeError("No key share")
59✔
741

742
        kex = ECDHKeyExchange(self.group_id, self.serverHello.server_version)
59✔
743
        ext_supported = [ECPointFormat.uncompressed]
59✔
744
        ext_c = self.clientHello.getExtension(ExtensionType.ec_point_formats)
59✔
745
        ext_s = self.serverHello.getExtension(ExtensionType.ec_point_formats)
59✔
746
        if ext_c and ext_s:
59✔
747
            ext_supported = [
59✔
748
                ext for ext in ext_c.formats if ext in ext_s.formats
749
                ]
750
            if not ext_supported:
59!
751
                raise TLSIllegalParameterException("No common EC point format")
×
752
        ext_supported = map(
59✔
753
            lambda x: 'uncompressed' if
754
            x == ECPointFormat.uncompressed else
755
            'compressed', ext_supported
756
            )
757
        return kex.calc_shared_key(self.ecdhXs, ecdhYc, set(ext_supported))
59✔
758

759
    def processServerKeyExchange(self, srvPublicKey, serverKeyExchange):
59✔
760
        """Process the server key exchange, return premaster secret"""
761
        del srvPublicKey
59✔
762

763
        if serverKeyExchange.curve_type != ECCurveType.named_curve \
59✔
764
            or serverKeyExchange.named_curve not in self.acceptedCurves:
765
            raise TLSIllegalParameterException("Server picked curve we "
59✔
766
                                               "didn't advertise")
767

768
        ecdh_Ys = serverKeyExchange.ecdh_Ys
59✔
769
        if not ecdh_Ys:
59✔
770
            raise TLSDecodeError("Empty server key share")
59✔
771

772
        kex = ECDHKeyExchange(serverKeyExchange.named_curve,
59✔
773
                              self.serverHello.server_version)
774
        ecdhXc = kex.get_random_private_key()
59✔
775
        ext_negotiated = ECPointFormat.uncompressed
59✔
776
        ext_supported = [ECPointFormat.uncompressed]
59✔
777

778
        if self.clientHello:
59!
779
            ext_c = self.clientHello.getExtension(
59✔
780
                ExtensionType.ec_point_formats)
781
            ext_s = self.serverHello.getExtension(
59✔
782
                ExtensionType.ec_point_formats)
783
            if ext_c and ext_s:
59✔
784
                try:
59✔
785
                    ext_supported = [
59✔
786
                        i for i in ext_c.formats if i in ext_s.formats
787
                        ]
788
                    ext_negotiated = ext_supported[0]
59✔
789
                except IndexError:
×
790
                    raise TLSIllegalParameterException(
×
791
                        "No common EC point format")
792

793
        ext_negotiated = 'uncompressed' if \
59✔
794
            ext_negotiated == ECPointFormat.uncompressed else 'compressed'
795
        ext_supported = map(
59✔
796
            lambda x: 'uncompressed' if
797
            x == ECPointFormat.uncompressed else
798
            'compressed', ext_supported
799
            )
800
        self.ecdhYc = kex.calc_public_value(ecdhXc, ext_negotiated)
59✔
801
        return kex.calc_shared_key(ecdhXc, ecdh_Ys, set(ext_supported))
59✔
802

803
    def makeClientKeyExchange(self):
59✔
804
        """Make client key exchange for ECDHE"""
805
        cke = super(AECDHKeyExchange, self).makeClientKeyExchange()
59✔
806
        cke.createECDH(self.ecdhYc)
59✔
807
        return cke
59✔
808

809

810
# The ECDHE_RSA part comes from the IETF names of ciphersuites, so we want to
811
# keep it
812
#pylint: disable = invalid-name
813
class ECDHE_RSAKeyExchange(AuthenticatedKeyExchange, AECDHKeyExchange):
59✔
814
    """Helper class for conducting ECDHE key exchange"""
815

816
    def __init__(self, cipherSuite, clientHello, serverHello, privateKey,
59✔
817
                 acceptedCurves, defaultCurve=GroupName.secp256r1):
818
        super(ECDHE_RSAKeyExchange, self).__init__(cipherSuite, clientHello,
59✔
819
                                                   serverHello,
820
                                                   acceptedCurves,
821
                                                   defaultCurve)
822
#pylint: enable = invalid-name
823
        self.privateKey = privateKey
59✔
824

825

826
class SRPKeyExchange(KeyExchange):
59✔
827
    """Helper class for conducting SRP key exchange"""
828

829
    def __init__(self, cipherSuite, clientHello, serverHello, privateKey,
59✔
830
                 verifierDB, srpUsername=None, password=None, settings=None):
831
        """Link Key Exchange options with verifierDB for SRP"""
832
        super(SRPKeyExchange, self).__init__(cipherSuite, clientHello,
59✔
833
                                             serverHello, privateKey)
834
        self.N = None
59✔
835
        self.v = None
59✔
836
        self.b = None
59✔
837
        self.B = None
59✔
838
        self.verifierDB = verifierDB
59✔
839
        self.A = None
59✔
840
        self.srpUsername = srpUsername
59✔
841
        self.password = password
59✔
842
        self.settings = settings
59✔
843
        if srpUsername is not None and not isinstance(srpUsername, bytearray):
59✔
844
            raise TypeError("srpUsername must be a bytearray object")
59✔
845
        if password is not None and not isinstance(password, bytearray):
59✔
846
            raise TypeError("password must be a bytearray object")
59✔
847

848
    def makeServerKeyExchange(self, sigHash=None):
59✔
849
        """Create SRP version of Server Key Exchange"""
850
        srpUsername = bytes(self.clientHello.srp_username)
59✔
851
        #Get parameters from username
852
        try:
59✔
853
            entry = self.verifierDB[srpUsername]
59✔
854
        except KeyError:
59✔
855
            raise TLSUnknownPSKIdentity("Unknown identity")
59✔
856
        (self.N, g, s, self.v) = entry
59✔
857

858
        #Calculate server's ephemeral DH values (b, B)
859
        self.b = bytesToNumber(getRandomBytes(32))
59✔
860
        k = makeK(self.N, g)
59✔
861
        self.B = (powMod(g, self.b, self.N) + (k * self.v)) % self.N
59✔
862

863
        #Create ServerKeyExchange, signing it if necessary
864
        serverKeyExchange = ServerKeyExchange(self.cipherSuite,
59✔
865
                                              self.serverHello.server_version)
866
        serverKeyExchange.createSRP(self.N, g, s, self.B)
59✔
867
        if self.cipherSuite in CipherSuite.srpCertSuites:
59✔
868
            self.signServerKeyExchange(serverKeyExchange, sigHash)
59✔
869
        return serverKeyExchange
59✔
870

871
    def processClientKeyExchange(self, clientKeyExchange):
59✔
872
        """Calculate premaster secret from Client Key Exchange and sent SKE"""
873
        A = clientKeyExchange.srp_A
59✔
874
        if A % self.N == 0:
59✔
875
            raise TLSIllegalParameterException("Invalid SRP A value")
59✔
876

877
        #Calculate u
878
        u = makeU(self.N, A, self.B)
59✔
879

880
        #Calculate premaster secret
881
        S = powMod((A * powMod(self.v, u, self.N)) % self.N, self.b, self.N)
59✔
882
        return numberToByteArray(S)
59✔
883

884
    def processServerKeyExchange(self, srvPublicKey, serverKeyExchange):
59✔
885
        """Calculate premaster secret from ServerKeyExchange"""
886
        del srvPublicKey # irrelevant for SRP
59✔
887
        N = serverKeyExchange.srp_N
59✔
888
        g = serverKeyExchange.srp_g
59✔
889
        s = serverKeyExchange.srp_s
59✔
890
        B = serverKeyExchange.srp_B
59✔
891

892
        if (g, N) not in goodGroupParameters:
59✔
893
            raise TLSInsufficientSecurity("Unknown group parameters")
59✔
894
        if numBits(N) < self.settings.minKeySize:
59✔
895
            raise TLSInsufficientSecurity("N value is too small: {0}".\
59✔
896
                                          format(numBits(N)))
897
        if numBits(N) > self.settings.maxKeySize:
59✔
898
            raise TLSInsufficientSecurity("N value is too large: {0}".\
59✔
899
                                          format(numBits(N)))
900
        if B % N == 0:
59✔
901
            raise TLSIllegalParameterException("Suspicious B value")
59✔
902

903
        #Client ephemeral value
904
        a = bytesToNumber(getRandomBytes(32))
59✔
905
        self.A = powMod(g, a, N)
59✔
906

907
        #Calculate client's static DH values (x, v)
908
        x = makeX(s, self.srpUsername, self.password)
59✔
909
        v = powMod(g, x, N)
59✔
910

911
        #Calculate u
912
        u = makeU(N, self.A, B)
59✔
913

914
        #Calculate premaster secret
915
        k = makeK(N, g)
59✔
916
        S = powMod((B - (k*v)) % N, a+(u*x), N)
59✔
917
        return numberToByteArray(S)
59✔
918

919
    def makeClientKeyExchange(self):
59✔
920
        """Create ClientKeyExchange"""
921
        cke = super(SRPKeyExchange, self).makeClientKeyExchange()
59✔
922
        cke.createSRP(self.A)
59✔
923
        return cke
59✔
924

925

926
class RawDHKeyExchange(object):
59✔
927
    """
928
    Abstract class for performing Diffe-Hellman key exchange.
929

930
    Provides a shared API for X25519, ECDHE and FFDHE key exchange.
931
    """
932

933
    def __init__(self, group, version):
59✔
934
        """
935
        Set the parameters of the key exchange
936

937
        Sets group on which the KEX will take part and protocol version used.
938
        """
939
        self.group = group
59✔
940
        self.version = version
59✔
941

942
    def get_random_private_key(self):
59✔
943
        """
944
        Generate a random value suitable for use as the private value of KEX.
945
        """
946
        raise NotImplementedError("Abstract class")
59✔
947

948
    def calc_public_value(self, private, point_format=None):
59✔
949
        """Calculate the public value from the provided private value."""
950
        raise NotImplementedError("Abstract class")
59✔
951

952
    def calc_shared_key(self, private, peer_share, valid_point_formats=None):
59✔
953
        """Calcualte the shared key given our private and remote share value"""
954
        raise NotImplementedError("Abstract class")
59✔
955

956

957
class FFDHKeyExchange(RawDHKeyExchange):
59✔
958
    """Implemenation of the Finite Field Diffie-Hellman key exchange."""
959

960
    def __init__(self, group, version, generator=None, prime=None):
59✔
961
        super(FFDHKeyExchange, self).__init__(group, version)
59✔
962
        if prime and group:
59✔
963
            raise ValueError("Can't set the RFC7919 group and custom params"
59✔
964
                             " at the same time")
965
        if group:
59✔
966
            self.generator, self.prime = RFC7919_GROUPS[group-256]
59✔
967
        else:
968
            self.prime = prime
59✔
969
            self.generator = generator
59✔
970

971
        if not 1 < self.generator < self.prime:
59✔
972
            raise TLSIllegalParameterException("Invalid DH generator")
59✔
973

974
    def get_random_private_key(self):
59✔
975
        """
976
        Return a random private value for the prime used.
977

978
        :rtype: int
979
        """
980
        # Per RFC 3526, Section 1, the exponent should have double the entropy
981
        # of the strength of the group.
982
        needed_bytes = divceil(paramStrength(self.prime) * 2, 8)
59✔
983
        return bytesToNumber(getRandomBytes(needed_bytes))
59✔
984

985
    def calc_public_value(self, private, point_format=None):
59✔
986
        """
987
        Calculate the public value for given private value.
988

989
        :param point_format: ignored, used for compatibility with ECDH groups
990

991
        :rtype: int
992
        """
993
        dh_Y = powMod(self.generator, private, self.prime)
59✔
994
        if dh_Y in (1, self.prime - 1):
59✔
995
            raise TLSIllegalParameterException("Small subgroup capture")
59✔
996
        if self.version < (3, 4):
59✔
997
            return dh_Y
59✔
998
        else:
999
            return numberToByteArray(dh_Y, numBytes(self.prime))
59✔
1000

1001
    def _normalise_peer_share(self, peer_share):
59✔
1002
        """Convert the peer_share to number if necessary."""
1003
        if isinstance(peer_share, (int_types)):
59✔
1004
            return peer_share
59✔
1005

1006
        if numBytes(self.prime) != len(peer_share):
59✔
1007
            raise TLSIllegalParameterException(
59✔
1008
                "Key share does not match FFDH prime")
1009
        return bytesToNumber(peer_share)
59✔
1010

1011
    def calc_shared_key(self, private, peer_share, valid_point_formats=None):
59✔
1012
        """Calculate the shared key.
1013

1014
        :param valid_point_formats: ignored, used for compatibility with ECDH groups
1015

1016
        :rtype: bytearray"""
1017
        peer_share = self._normalise_peer_share(peer_share)
59✔
1018
        # First half of RFC 2631, Section 2.1.5. Validate the client's public
1019
        # key.
1020
        # use of safe primes also means that the p-1 is invalid
1021
        if not 2 <= peer_share < self.prime - 1:
59✔
1022
            raise TLSIllegalParameterException("Invalid peer key share")
59✔
1023

1024
        S = powMod(peer_share, private, self.prime)
59✔
1025
        if S in (1, self.prime - 1):
59✔
1026
            raise TLSIllegalParameterException("Small subgroup capture")
59✔
1027
        if self.version < (3, 4):
59✔
1028
            return numberToByteArray(S)
59✔
1029
        else:
1030
            return numberToByteArray(S, numBytes(self.prime))
59✔
1031

1032

1033
class ECDHKeyExchange(RawDHKeyExchange):
59✔
1034
    """Implementation of the Elliptic Curve Diffie-Hellman key exchange."""
1035
    _x_groups = set((GroupName.x25519, GroupName.x448))
59✔
1036

1037
    @staticmethod
59✔
1038
    def _non_zero_check(value):
37✔
1039
        """
1040
        Verify using constant time operation that the bytearray is not zero
1041

1042
        :raises TLSIllegalParameterException: if the value is all zero
1043
        """
1044
        summa = 0
59✔
1045
        for i in value:
59✔
1046
            summa |= i
59✔
1047
        if summa == 0:
59✔
1048
            raise TLSIllegalParameterException("Invalid key share")
59✔
1049

1050
    def __init__(self, group, version):
59✔
1051
        super(ECDHKeyExchange, self).__init__(group, version)
59✔
1052

1053
    def get_random_private_key(self):
59✔
1054
        """Return random private key value for the selected curve."""
1055
        if self.group in self._x_groups:
59✔
1056
            if self.group == GroupName.x25519:
59✔
1057
                return getRandomBytes(X25519_ORDER_SIZE)
59✔
1058
            else:
1059
                return getRandomBytes(X448_ORDER_SIZE)
59✔
1060
        else:
1061
            curve = getCurveByName(GroupName.toStr(self.group))
59✔
1062
            return ecdsa.keys.SigningKey.generate(curve)
59✔
1063

1064
    def _get_fun_gen_size(self):
59✔
1065
        """Return the function and generator for X25519/X448 KEX."""
1066
        if self.group == GroupName.x25519:
59✔
1067
            return x25519, bytearray(X25519_G), X25519_ORDER_SIZE
59✔
1068
        else:
1069
            return x448, bytearray(X448_G), X448_ORDER_SIZE
59✔
1070

1071
    def calc_public_value(self, private, point_format='uncompressed'):
59✔
1072
        """
1073
        Calculate public value for given private key.
1074

1075
        :param private: Private key for the selected key exchange group.
1076
        :param str point_format: The point format to use for the
1077
            ECDH public key. Applies only to NIST curves.
1078
        """
1079
        if isinstance(private, ecdsa.keys.SigningKey):
59✔
1080
            return private.verifying_key.to_string(point_format)
59✔
1081
        if self.group in self._x_groups:
59!
1082
            fun, generator, _ = self._get_fun_gen_size()
59✔
1083
            return fun(private, generator)
59✔
1084
        else:
1085
            curve = getCurveByName(GroupName.toStr(self.group))
×
1086
            point = curve.generator * private
×
1087
            return bytearray(point.to_bytes(point_format))
×
1088

1089
    def calc_shared_key(self, private, peer_share,
59✔
1090
            valid_point_formats=('uncompressed',)):
1091
        """
1092
        Calculate the shared key.
1093

1094
        :param bytearray | SigningKey private: private value
1095

1096
        :param bytearray peer_share: public value
1097

1098
        :param set(str) valid_point_formats: list of point formats that
1099
            the peer share can be in; ["uncompressed"] by default.
1100

1101
        :rtype: bytearray
1102
        :returns: shared key
1103

1104
        :raises TLSIllegalParameterException
1105
            when the paramentrs for point are invalid.
1106
        :raises TLSDecodeError
1107
            when the the valid_point_formats is empty.
1108

1109
        """
1110
        if self.group in self._x_groups:
59✔
1111
            fun, _, size = self._get_fun_gen_size()
59✔
1112
            if len(peer_share) != size:
59✔
1113
                raise TLSIllegalParameterException("Invalid key share")
59✔
1114
            if isinstance(private, ecdsa.keys.SigningKey):
59!
1115
                private = bytesToNumber(private.to_string())
×
1116
            S = fun(private, peer_share)
59✔
1117
            self._non_zero_check(S)
59✔
1118
            return S
59✔
1119

1120
        curve = getCurveByName(GroupName.toRepr(self.group))
59✔
1121
        try:
59✔
1122
            abstractPoint = ecdsa.ellipticcurve.AbstractPoint()
59✔
1123
            point = abstractPoint.from_bytes(
59✔
1124
                curve.curve, peer_share, valid_encodings=valid_point_formats)
1125
            ecdhYc = ecdsa.ellipticcurve.Point(
59✔
1126
                curve.curve, point[0], point[1])
1127

1128
        except AssertionError:
59!
1129
            raise TLSIllegalParameterException("Invalid ECC point")
59✔
1130
        except DecodeError:
×
1131
            raise TLSDecodeError("Empty point formats extension")
×
1132
        if isinstance(private, ecdsa.keys.SigningKey):
59!
1133
            ecdh = ecdsa.ecdh.ECDH(curve=curve, private_key=private)
59✔
1134
            ecdh.load_received_public_key_bytes(peer_share,
59✔
1135
                                                valid_encodings=
1136
                                                valid_point_formats)
1137
            return bytearray(ecdh.generate_sharedsecret_bytes())
59✔
1138
        S = ecdhYc * private
×
1139

1140
        return numberToByteArray(S.x(), getPointByteSize(ecdhYc))
×
1141

1142

1143
class KEMKeyExchange(object):
59✔
1144
    """
1145
    Implementation of the Hybrid KEM key exchange groups.
1146

1147
    Caution, KEMs are not symmetric! While they client calls the
1148
    same get_random_private_key(), calc_public_value(), and calc_shared_key()
1149
    as in FFDH or ECDH, the server calls just the encapsulate_key() method.
1150
    """
1151

1152
    def __init__(self, group, version):
59✔
1153
        if not ML_KEM_AVAILABLE:
10!
1154
            raise TLSInternalError("kyber-py library not installed!")
×
1155
        self.group = group
10✔
1156
        assert version == (3, 4)
10✔
1157
        del version
10✔
1158

1159
        if self.group not in GroupName.allKEM:
10✔
1160
            raise TLSInternalError("called with wrong group")
10✔
1161

1162
        if self.group == GroupName.secp256r1mlkem768:
10✔
1163
            self._classic_group = GroupName.secp256r1
10✔
1164
        elif self.group == GroupName.x25519mlkem768:
10✔
1165
            self._classic_group = GroupName.x25519
10✔
1166
        else:
1167
            assert self.group == GroupName.secp384r1mlkem1024
10✔
1168
            self._classic_group = GroupName.secp384r1
10✔
1169

1170
    def get_random_private_key(self):
59✔
1171
        """
1172
        Generates a random value to be used as the private key in KEM.
1173

1174
        To be used only to generate the KeyShare in ClientHello.
1175
        """
1176

1177
        if self.group not in GroupName.allKEM:
10!
1178
            raise TLSInternalError("called with wrong group")
×
1179
        if self.group in (GroupName.secp256r1mlkem768,
10✔
1180
                          GroupName.x25519mlkem768):
1181
            pqc_pub_key, pqc_priv_key = ML_KEM_768.keygen()
10✔
1182
        else:
1183
            pqc_pub_key, pqc_priv_key = ML_KEM_1024.keygen()
10✔
1184

1185
        classic_kex = ECDHKeyExchange(self._classic_group, (3, 4))
10✔
1186
        classic_key = classic_kex.get_random_private_key()
10✔
1187

1188
        return ((pqc_pub_key, pqc_priv_key), classic_key)
10✔
1189

1190
    def calc_public_value(self, private, point_format='uncompressed'):
59✔
1191
        """
1192
        Extract public values for the private key.
1193

1194
        To be used only to generate the KeyShare in ClientHello.
1195

1196
        :param str point_format: Point format of the ECDH portion of the
1197
            key exchange (effective only for NIST curves, valid is
1198
            'uncompressed' only)
1199
        """
1200
        classic_kex = ECDHKeyExchange(self._classic_group, (3, 4))
10✔
1201

1202
        classic_pub_key_share = classic_kex.calc_public_value(
10✔
1203
            private[1], point_format=point_format)
1204

1205
        if self.group == GroupName.x25519mlkem768:
10✔
1206
            return private[0][0] + classic_pub_key_share
10✔
1207
        return classic_pub_key_share + private[0][0]
10✔
1208

1209
    @staticmethod
59✔
1210
    def _split_key_shares(public, pqc_first, pqc_key_len, classic_key_len):
37✔
1211
        if len(public) != classic_key_len + pqc_key_len:
10✔
1212
            raise TLSIllegalParameterException(
10✔
1213
                "Invalid key size for the selected group. "
1214
                "Expected: {0}, received: {1}".format(
1215
                    classic_key_len + pqc_key_len,
1216
                    len(public)))
1217

1218
        if pqc_first:
10✔
1219
            pqc_key = public[:pqc_key_len]
10✔
1220
            classic_key_share = bytearray(public[pqc_key_len:])
10✔
1221
        else:
1222
            classic_key_share = bytearray(public[:classic_key_len])
10✔
1223
            pqc_key = public[classic_key_len:]
10✔
1224

1225
        return pqc_key, classic_key_share
10✔
1226

1227
    def _group_to_params(self):
59✔
1228
        """Returns a tuple:
1229
        classic_key_len, pqc_ek_key_len, pqc_ciphertext_len, pqc_first, ML_KEM
1230
        """
1231
        if self.group == GroupName.secp256r1mlkem768:
10✔
1232
            classic_key_len = 65
10✔
1233
            pqc_key_len = 1184
10✔
1234
            pqc_ciphertext_len = 1088
10✔
1235
            pqc_first = False
10✔
1236
            ml_kem = ML_KEM_768
10✔
1237
        elif self.group == GroupName.x25519mlkem768:
10✔
1238
            classic_key_len = 32
10✔
1239
            pqc_key_len = 1184
10✔
1240
            pqc_ciphertext_len = 1088
10✔
1241
            pqc_first = True
10✔
1242
            ml_kem = ML_KEM_768
10✔
1243
        else:
1244
            assert self.group == GroupName.secp384r1mlkem1024
10✔
1245
            classic_key_len = 97
10✔
1246
            pqc_key_len = 1568
10✔
1247
            pqc_ciphertext_len = 1568
10✔
1248
            pqc_first = False
10✔
1249
            ml_kem = ML_KEM_1024
10✔
1250

1251
        return classic_key_len, pqc_key_len, pqc_ciphertext_len, pqc_first, \
10✔
1252
            ml_kem
1253

1254
    def encapsulate_key(self, public):
59✔
1255
        """
1256
        Generate a random secret, encapsulate it given the public key,
1257
        and return both the random secret and encapsulation of it.
1258

1259
        To be used for generation of KeyShare in ServerHello.
1260
        """
1261
        classic_key_len, pqc_key_len, _, pqc_first, ml_kem = \
10✔
1262
            self._group_to_params()
1263

1264
        pqc_key, classic_key_share = self._split_key_shares(
10✔
1265
            public, pqc_first, pqc_key_len, classic_key_len)
1266

1267
        classic_kex = ECDHKeyExchange(self._classic_group, (3, 4))
10✔
1268
        classic_key = classic_kex.get_random_private_key()
10✔
1269
        classic_my_key_share = classic_kex.calc_public_value(classic_key)
10✔
1270
        classic_shared_secret = classic_kex.calc_shared_key(
10✔
1271
            classic_key, classic_key_share)
1272

1273
        try:
10✔
1274
            pqc_shared_secret, pqc_encaps = ml_kem.encaps(pqc_key)
10✔
1275
        except ValueError:
10✔
1276
            raise TLSIllegalParameterException(
10✔
1277
                "Invalid PQC key from peer")
1278

1279
        if pqc_first:
10✔
1280
            shared_secret = pqc_shared_secret + classic_shared_secret
10✔
1281
            key_encapsulation = pqc_encaps + classic_my_key_share
10✔
1282
        else:
1283
            shared_secret = classic_shared_secret + pqc_shared_secret
10✔
1284
            key_encapsulation = classic_my_key_share + pqc_encaps
10✔
1285

1286
        return shared_secret, key_encapsulation
10✔
1287

1288
    def calc_shared_key(self, private, key_encaps):
59✔
1289
        """
1290
        Decapsulate the key share received from server.
1291
        """
1292
        classic_key_len, _, pqc_key_len, pqc_first, ml_kem = \
10✔
1293
            self._group_to_params()
1294

1295
        pqc_key, classic_key_share = self._split_key_shares(
10✔
1296
            key_encaps, pqc_first, pqc_key_len, classic_key_len)
1297

1298
        classic_kex = ECDHKeyExchange(self._classic_group, (3, 4))
10✔
1299
        classic_shared_secret = classic_kex.calc_shared_key(
10✔
1300
                private[1], classic_key_share)
1301

1302
        try:
10✔
1303
            pqc_shared_secret = ml_kem.decaps(private[0][1], pqc_key)
10✔
1304
        except ValueError:
×
1305
            raise TLSIllegalParameterException(
×
1306
                "Error in KEM decapsulation")
1307

1308
        if pqc_first:
10✔
1309
            shared_secret = pqc_shared_secret + classic_shared_secret
10✔
1310
        else:
1311
            shared_secret = classic_shared_secret + pqc_shared_secret
10✔
1312

1313
        return shared_secret
10✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc