• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

thorgate / django-esteid / 4354524044

pending completion
4354524044

push

github-actions

GitHub
Merge pull request #67 from thorgate/updates

404 of 571 branches covered (70.75%)

Branch coverage included in aggregate %.

12 of 12 new or added lines in 6 files covered. (100.0%)

1594 of 2012 relevant lines covered (79.22%)

0.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

27.24
/esteid/actions.py
1
# pragma: no cover
2
import binascii
3
import logging
1✔
4
import typing
1✔
5
import warnings
1✔
6
from tempfile import NamedTemporaryFile
1✔
7

8
from esteid_certificates import get_certificate
1✔
9

10
import pyasice
1✔
11
from pyasice import Container, finalize_signature, verify, XmlSignature
1✔
12

13
from esteid import settings
1✔
14
from esteid.exceptions import ActionInProgress, EsteidError, InvalidIdCode, UserNotRegistered
1✔
15
from esteid.mobileid.i18n import TranslatedMobileIDService
1✔
16
from esteid.smartid.i18n import TranslatedSmartIDService
1✔
17

18
from .session import delete_esteid_session, get_esteid_session, open_container, update_esteid_session
1✔
19
from .validators import id_code_ee_is_valid
1✔
20

21

22
if typing.TYPE_CHECKING:
1!
23
    from .generic import GenericDigitalSignViewMixin
×
24

25
# Module-level statement: the deprecation warning is emitted when this module is imported.
warnings.warn("The actions API is deprecated. Please use the new signing API", DeprecationWarning)
1✔
26

27
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
1✔
28

29

30
class BaseAction:
    """Common interface of the (deprecated) signing actions."""

    @classmethod
    def do_action(cls, view: "GenericDigitalSignViewMixin", params):
        """
        Run the action. Concrete actions are expected to override this method.

        :param view: the view the action is executed for
        :param params: action parameters
        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError()
×
34

35

36
class NoAction:
    """An action that does nothing and always reports success."""

    @classmethod
    def do_action(cls, view, params):
        """
        Ignore both arguments and return a successful result.

        :param view: unused
        :param params: unused
        :return: `{"success": True}`
        """
        result = {"success": True}
        return result
×
40

41

42
class IdCardPrepareAction(BaseAction):
    """First step of ID card signing: prepare the signature and return the digest to be signed."""

    @classmethod
    def do_action(cls, view: "GenericDigitalSignViewMixin", params: dict = None, *, certificate: str = None):
        """
        The old API is to pass a dict of params (previously confusingly named `action_kwargs`),
        the keyword args are added here for clarity as to what the method accepts

        Any existing esteid session is deleted first. On success, the intermediate signature XML
        and the container are written to temp files, and their names are stored in the esteid
        session together with the base64-encoded digest.

        :param view: the view; its `request`, `get_files()` and `get_bdoc_container_file()` are used
        :param params: legacy params dict; `params["certificate"]` is used if `certificate` is not given
        :param certificate: HEX-encoded certificate from the ID card
        :return: `{"success": True, "digest": <HEX digest>}` on success, otherwise
            `{"success": False, "code": <code>}` where the code is `BAD_CERTIFICATE`
            (certificate missing or not valid HEX) or `MIN_1_FILE` (nothing to sign)
        """
        request = view.request
        delete_esteid_session(request)

        if not certificate:
            certificate = (params or {}).get("certificate")

        if not certificate:
            return {
                "success": False,
                "code": "BAD_CERTIFICATE",
            }

        try:
            certificate = binascii.a2b_hex(certificate)
        except (ValueError, TypeError):
            # The value comes from the client. Malformed HEX (binascii.Error is a ValueError)
            # or a non-string value is reported as a bad certificate instead of crashing.
            return {
                "success": False,
                "code": "BAD_CERTIFICATE",
            }

        files = view.get_files()
        container_path = view.get_bdoc_container_file()

        if not (files or container_path):
            return {
                "success": False,
                "code": "MIN_1_FILE",
            }

        container = open_container(container_path, files)
        xml_sig = container.prepare_signature(certificate)

        # save intermediate signature XML to temp file
        # (delete=False: the file has to outlive this request, its name goes into the session)
        with NamedTemporaryFile(delete=False) as temp_signature_file:
            temp_signature_file.write(xml_sig.dump())

        # always save container to a temp file
        with NamedTemporaryFile(mode="wb", delete=False) as temp_container_file:
            temp_container_file.write(container.finalize().getbuffer())

        signed_digest = xml_sig.digest()
        digest_hash_b64 = binascii.b2a_base64(signed_digest).decode()

        update_esteid_session(
            request,
            signed_hash=digest_hash_b64,  # we can take the hash from the signature XML, but it'd take time to compute
            temp_signature_file=temp_signature_file.name,
            temp_container_file=temp_container_file.name,
        )

        return {
            "success": True,
            "digest": binascii.b2a_hex(signed_digest).decode(),
        }
102

103

104
class IdCardFinishAction(BaseAction):
    """Second step of ID card signing: verify the signature value and add the signature to the container."""

    @classmethod
    def do_action(cls, view: "GenericDigitalSignViewMixin", params: dict = None, *, signature_value: str = None):
        """
        The old API is to pass a dict of params (previously confusingly named `action_kwargs`),
        the keyword args are added here for clarity as to what the method accepts

        Reads the digest and the temp file names from the esteid session, verifies the
        signature value against the certificate in the stored signature XML, finalizes the
        signature and adds it to the container.

        :param view: the view; only its `request` is used
        :param params: legacy params dict; `params["signature_value"]` is used if `signature_value` is not given
        :param signature_value: a HEX encoded signature, as received from `hwcrypto.js`
        :return: `{"success": True, "container": <container>}` on success, otherwise
            `{"success": False, "code": <code>}` where the code is `NO_SESSION`,
            `BAD_SIGNATURE` (value missing or not valid HEX) or `SERVICE_ERROR`
        """
        request = view.request
        session_data = get_esteid_session(request)
        if not session_data:
            return {
                "success": False,
                "code": "NO_SESSION",
            }

        # An empty value is treated the same as a missing one.
        if not signature_value:
            signature_value = (params or {}).get("signature_value")

        if not signature_value:
            return {
                "success": False,
                "code": "BAD_SIGNATURE",
            }

        logger.debug("Signature HEX: %s", signature_value)

        signed_hash_b64 = session_data["signed_hash"]
        try:
            signature_value = binascii.a2b_hex(signature_value)
        except (ValueError, TypeError):
            # The value comes from the client. Malformed HEX (binascii.Error is a ValueError)
            # or a non-string value is reported as a bad signature instead of crashing.
            # The session is kept, as in the missing-value case above.
            return {
                "success": False,
                "code": "BAD_SIGNATURE",
            }

        temp_signature_file = session_data["temp_signature_file"]
        temp_container_file = session_data["temp_container_file"]

        with open(temp_signature_file, "rb") as f:
            xml_sig = XmlSignature(f.read())

        # Load the partially prepared BDoc from its temp file.
        # NOTE(review): both temp files are read before the session is deleted below;
        # presumably deleting the session also removes them - confirm before reordering.
        container = Container.open(temp_container_file)

        # now we don't need the session anymore
        delete_esteid_session(request)

        verify(xml_sig.get_certificate_value(), signature_value, binascii.a2b_base64(signed_hash_b64), prehashed=True)

        xml_sig.set_signature_value(signature_value)
        issuer_cert = get_certificate(xml_sig.get_certificate_issuer_common_name())

        try:
            finalize_signature(
                xml_sig,
                issuer_cert,
                lt_ts=settings.ESTEID_USE_LT_TS,
                ocsp_url=settings.OCSP_URL,
                tsa_url=settings.TSA_URL,
            )
        except pyasice.Error:
            logger.exception("Signature confirmation service error")
            return {
                "success": False,
                "code": "SERVICE_ERROR",
            }

        container.add_signature(xml_sig)

        return {
            "success": True,
            "container": container,
        }
175

176

177
class SignCompleteAction(BaseAction):
    """Action meant to hand back the signed container; not implemented here."""

    @classmethod
    def do_action(cls, view, params):
        """
        Return the signed container

        :raises NotImplementedError: always, in this implementation
        """
        raise NotImplementedError()
×
182

183

184
class MobileIdSignAction(BaseAction):
    """Start a Mobile ID signing session for the view's files or container."""

    @classmethod
    def do_action(
        cls,
        view: "GenericDigitalSignViewMixin",
        params: dict = None,
        *,
        phone_number: str = None,
        id_code: str = None,
        language: str = None,
    ):
        """
        The old API is to pass a dict of params (previously confusingly named `action_kwargs`),
        the keyword args are added here for clarity as to what the method accepts

        Any existing esteid session is deleted first. On success, the signing session id, the
        base64-encoded digest and the names of the temp files holding the signature XML and
        the container are stored in the esteid session.

        :param view: the view; its `request`, `get_files()` and `get_bdoc_container_file()` are used
        :param params: legacy params dict; `phone_number` and `id_code` are taken from it
            when not given as keyword arguments
        :param phone_number:
        :param id_code:
        :param language: passed on to the Mobile ID service when starting the signing session
        :return: `{"success": True, "challenge": ..., "verification_code": ...}` on success, otherwise
            `{"success": False, "code": <code>}` where the code is `BAD_PARAMS`, `INVALID_ID_CODE`,
            `MIN_1_FILE`, `NOT_A_MOBILEID_USER` or `SIGN_SESSION_FAILED`
        """
        request = view.request
        delete_esteid_session(request)

        # `params` defaults to None and may lack keys: missing values must end up
        # in the BAD_PARAMS branch below rather than raise TypeError/KeyError.
        params = params or {}
        if not phone_number:
            phone_number = params.get("phone_number")
        if not id_code:
            id_code = params.get("id_code")

        if not (phone_number and id_code):
            return {
                "success": False,
                "code": "BAD_PARAMS",
            }

        # NOTE: since EE and LT id codes use the same format, we are using the same function.
        if not id_code_ee_is_valid(id_code):
            return {
                "success": False,
                "code": "INVALID_ID_CODE",
            }

        files = view.get_files()
        container_path = view.get_bdoc_container_file()

        if not (files or container_path):
            return {
                "success": False,
                "code": "MIN_1_FILE",
            }

        service = TranslatedMobileIDService.get_instance()

        try:
            certificate = service.get_certificate(id_code, phone_number)
        except UserNotRegistered:
            return {
                "success": False,
                "code": "NOT_A_MOBILEID_USER",
            }

        container = open_container(container_path, files)
        xml_sig = container.prepare_signature(certificate)

        # Start the signing session before writing any temp files (same order as SmartIdSignAction):
        # a failure here then leaves no temp files behind that are not referenced by a session.
        try:
            sign_session = service.sign(id_code, phone_number, xml_sig.signed_data(), language=language)
        except EsteidError:
            logger.exception("Failed to start a Mobile ID signing session")
            return {
                "success": False,
                "code": "SIGN_SESSION_FAILED",
            }

        # save intermediate signature XML to temp file
        # (delete=False: the file has to outlive this request, its name goes into the session)
        with NamedTemporaryFile(delete=False) as temp_signature_file:
            temp_signature_file.write(xml_sig.dump())

        # always save container to a temp file
        with NamedTemporaryFile(mode="wb", delete=False) as temp_container_file:
            temp_container_file.write(container.finalize().getbuffer())

        signed_digest = sign_session.digest
        digest_hash_b64 = binascii.b2a_base64(signed_digest).decode()

        update_esteid_session(
            request,
            session_id=sign_session.session_id,
            digest_b64=digest_hash_b64,
            temp_signature_file=temp_signature_file.name,
            temp_container_file=temp_container_file.name,
        )

        # The verification code is returned under both keys, `challenge` and `verification_code`.
        return {
            "success": True,
            "challenge": sign_session.verification_code,
            "verification_code": sign_session.verification_code,
        }
281

282

283
class MobileIdStatusAction(BaseAction):
    """Poll a Mobile ID signing session and, once it is signed, add the signature to the container."""

    @classmethod
    def do_action(cls, view: "GenericDigitalSignViewMixin", params: dict = None):
        """
        Check the status of the signing session recorded in the esteid session.

        :param view: the view; only its `request` is used
        :param params: not used by this action
        :return: `{"success": True, "container": <container>}` when signing is complete,
            `{"success": False, "pending": True}` while the action is still in progress,
            or `{"success": False, "code": <code>}` with code `NO_SESSION` or `SERVICE_ERROR`
        :raises Exception: any other error from the status call is re-raised after the session is deleted
        """
        request = view.request
        esteid_session = get_esteid_session(request)
        if not esteid_session:
            return {"success": False, "code": "NO_SESSION"}

        session_id = esteid_session["session_id"]
        digest_b64 = esteid_session["digest_b64"]
        signature_path = esteid_session["temp_signature_file"]
        container_path = esteid_session["temp_container_file"]

        with open(signature_path, "rb") as signature_fp:
            xml_sig = XmlSignature(signature_fp.read())

        # Load the partially prepared BDoc from its temp file.
        # NOTE(review): both temp files are read before any session deletion below;
        # presumably deleting the session also removes them - confirm before reordering.
        container = Container.open(container_path)

        mobile_id = TranslatedMobileIDService.get_instance()
        try:
            status = mobile_id.sign_status(
                session_id,
                xml_sig.get_certificate_value(),
                binascii.a2b_base64(digest_b64),
            )
        except ActionInProgress:
            # The session must survive so that the caller can poll again.
            return {"success": False, "pending": True}
        except Exception:
            # NOTE: some exceptions might not require cleanup, but telling them apart
            # also needs support from the party polling this action.
            # Most likely the whole process has to be restarted anyway.
            delete_esteid_session(request)
            raise

        # The session is no longer needed from here on.
        delete_esteid_session(request)

        xml_sig.set_signature_value(status.signature)
        issuer_name = xml_sig.get_certificate_issuer_common_name()
        issuer_cert = get_certificate(issuer_name)

        try:
            finalize_signature(
                xml_sig,
                issuer_cert,
                lt_ts=settings.ESTEID_USE_LT_TS,
                ocsp_url=settings.OCSP_URL,
                tsa_url=settings.TSA_URL,
            )
        except pyasice.Error:
            logger.exception("Signature confirmation service error")
            return {"success": False, "code": "SERVICE_ERROR"}

        container.add_signature(xml_sig)
        return {"success": True, "container": container}
351

352

353
class SmartIdSignAction(BaseAction):
    """Start a Smart ID signing session for the view's files or container."""

    @classmethod
    def do_action(
        cls,
        view: "GenericDigitalSignViewMixin",
        params: dict = None,
        *,
        id_code: str = None,
        country: str = None,
        language: str = None,
    ):
        """
        The old API is to pass a dict of params (previously confusingly named `action_kwargs`),
        the keyword args are added here for clarity as to what the method accepts

        Any existing esteid session is deleted first. On success, the signing session id, the
        base64-encoded digest and the names of the temp files holding the container and the
        signature XML are stored in the esteid session.

        :param view: the view; its `request`, `get_files()` and `get_bdoc_container_file()` are used
        :param params: legacy params dict; `id_code` and `country` are taken from it
            when not given as keyword arguments
        :param id_code:
        :param country: falls back to `params["country"]`, then to `settings.ESTEID_COUNTRY`
        :param language: accepted but not used anywhere in this method
        :return: `{"success": True, "verification_code": ...}` on success, otherwise
            `{"success": False, "code": <code>}` where the code is `MIN_1_FILE`,
            `INVALID_ID_CODE` or `NOT_A_SMARTID_USER`
        :raises EsteidError: other certificate selection errors are logged and re-raised
        """
        request = view.request
        delete_esteid_session(request)

        legacy_params = params or {}
        id_code = id_code or legacy_params.get("id_code")
        country = country or legacy_params.get("country") or settings.ESTEID_COUNTRY

        files = view.get_files()
        container_path = view.get_bdoc_container_file()

        nothing_to_sign = not files and not container_path
        if nothing_to_sign:
            return {"success": False, "code": "MIN_1_FILE"}

        smart_id = TranslatedSmartIDService.get_instance()

        # The order of the handlers is significant if the first two exception types derive from EsteidError.
        try:
            certificate, document_number = smart_id.select_signing_certificate(id_code=id_code, country=country)
        except InvalidIdCode:
            return {"success": False, "code": "INVALID_ID_CODE"}
        except UserNotRegistered:
            return {"success": False, "code": "NOT_A_SMARTID_USER"}
        except EsteidError:
            logger.exception("An error occurred during SmartID certificate selection")
            raise

        container = open_container(container_path, files)
        xml_sig = container.prepare_signature(certificate)

        sign_result = smart_id.sign_by_document_number(document_number, xml_sig.signed_data())

        digest_b64 = binascii.b2a_base64(sign_result.digest).decode()

        # The container always goes to a temp file
        # (delete=False: the file has to outlive this request, its name goes into the session).
        with NamedTemporaryFile(mode="wb", delete=False) as container_tmp:
            container_tmp.write(container.finalize().getbuffer())

        # So does the intermediate signature XML.
        with NamedTemporaryFile(delete=False) as signature_tmp:
            signature_tmp.write(xml_sig.dump())

        update_esteid_session(
            request,
            session_id=sign_result.session_id,
            digest_b64=digest_b64,
            temp_container_file=container_tmp.name,
            temp_signature_file=signature_tmp.name,
        )

        return {"success": True, "verification_code": sign_result.verification_code}
433

434

435
class SmartIdStatusAction(BaseAction):
    """Poll a Smart ID signing session and, once it is signed, add the signature to the container."""

    @classmethod
    def do_action(cls, view: "GenericDigitalSignViewMixin", params: dict = None):
        """
        Check the status of the signing session recorded in the esteid session.

        :param view: the view; only its `request` is used
        :param params: not used by this action
        :return: `{"success": True, "container": <container>}` when signing is complete,
            `{"success": False, "pending": True}` while the action is still in progress,
            or `{"success": False, "code": <code>}` with code `NO_SESSION` or `SERVICE_ERROR`
        :raises Exception: any other error from the status call is logged and re-raised
            after the session is deleted
        """
        request = view.request
        esteid_session = get_esteid_session(request)
        if not esteid_session:
            return {"success": False, "code": "NO_SESSION"}

        # Carry on with signing: poll the status, then finalize.
        logger.debug("SmartID: polling status of signing")

        session_id = esteid_session["session_id"]
        digest_b64 = esteid_session["digest_b64"]
        signature_path = esteid_session["temp_signature_file"]
        container_path = esteid_session["temp_container_file"]

        smart_id = TranslatedSmartIDService.get_instance()
        try:
            status = smart_id.sign_status(session_id, binascii.a2b_base64(digest_b64))
        except ActionInProgress:
            # The session must survive so that the caller can poll again.
            return {"success": False, "pending": True}
        except Exception:
            logger.exception("Failed to get signing status from Smart ID service")
            # NOTE: some exceptions might not require cleanup, but telling them apart
            # also needs support from the party polling this action.
            # Most likely the whole process has to be restarted anyway.
            delete_esteid_session(request)
            raise

        logger.debug("SmartID Signing complete")

        with open(signature_path, "rb") as signature_fp:
            xml_sig = XmlSignature(signature_fp.read())

        # Load the partially prepared BDoc from its temp file.
        # NOTE(review): both temp files are read before the session is deleted below;
        # presumably deleting the session also removes them - confirm before reordering.
        container = Container.open(container_path)

        # The session is no longer needed from here on.
        delete_esteid_session(request)

        xml_sig.set_signature_value(status.signature)
        issuer_name = xml_sig.get_certificate_issuer_common_name()
        issuer_cert = get_certificate(issuer_name)

        try:
            finalize_signature(
                xml_sig,
                issuer_cert,
                lt_ts=settings.ESTEID_USE_LT_TS,
                ocsp_url=settings.OCSP_URL,
                tsa_url=settings.TSA_URL,
            )
        except pyasice.Error:
            logger.exception("Signature confirmation service error")
            return {"success": False, "code": "SERVICE_ERROR"}

        container.add_signature(xml_sig)
        return {"success": True, "container": container}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc