• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

openwallet-foundation / acapy-vc-authn-oidc / 22600817255

02 Mar 2026 11:36PM UTC coverage: 89.577% (+0.5%) from 89.038%
22600817255

Pull #971

github

web-flow
Merge c3d09f238 into 34fe2d17c
Pull Request #971: Replace requests with httpx for async HTTP

169 of 179 new or added lines in 9 files covered. (94.41%)

9 existing lines in 2 files now uncovered.

2458 of 2744 relevant lines covered (89.58%)

0.9 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.99
/oidc-controller/api/routers/acapy_handler.py
1
import json
1✔
2
import time
1✔
3
from datetime import UTC, datetime, timedelta
1✔
4

5
import structlog
1✔
6
from fastapi import APIRouter, Depends, Request
1✔
7
from pydantic.plugin import Any
1✔
8
from pymongo.database import Database
1✔
9

10
from ..authSessions.crud import AuthSessionCRUD
1✔
11
from ..authSessions.models import AuthSession, AuthSessionPatch, AuthSessionState
1✔
12
from ..core.acapy.client import AcapyClient
1✔
13
from ..core.config import settings
1✔
14
from ..routers.socketio import sio, get_socket_id_for_pid, safe_emit
1✔
15
from ..core.siem_audit import (
1✔
16
    audit_proof_verification_failed,
17
    audit_proof_verified,
18
    audit_session_abandoned,
19
    audit_session_expired,
20
    audit_webhook_received,
21
)
22
from ..db.session import get_db
1✔
23
from ..verificationConfigs.crud import VerificationConfigCRUD
1✔
24

25
logger: structlog.typing.FilteringBoundLogger = structlog.getLogger(__name__)
1✔
26

27
router = APIRouter()
1✔
28

29

30
def _extract_credential_schemas(presentation_data: dict) -> list[str]:
1✔
31
    """
32
    Extract schema names from verified presentation data.
33

34
    Safe to log - schema names are public metadata, not PII.
35
    """
36
    schemas = set()
1✔
37
    try:
1✔
38
        # Try to extract from various proof formats
39
        by_format = presentation_data.get("by_format", {})
1✔
40
        for format_key in ["indy", "anoncreds"]:
1✔
41
            if format_key in by_format:
1✔
42
                pres = by_format[format_key].get("pres", {})
1✔
43
                identifiers = pres.get("identifiers", [])
1✔
44
                for identifier in identifiers:
1✔
45
                    if schema_id := identifier.get("schema_id"):
1✔
46
                        # schema_id format: <issuer_did>:2:<schema_name>:<version>
47
                        # The issuer DID may itself contain colons (e.g. did:sov:ABC),
48
                        # so we locate the ":2:" marker to split reliably.
49
                        marker = ":2:"
1✔
50
                        marker_pos = schema_id.find(marker)
1✔
51
                        if marker_pos != -1:
1✔
52
                            remainder = schema_id[marker_pos + len(marker) :]
1✔
53
                            # remainder is "<schema_name>:<version>"
54
                            rem_parts = remainder.split(":")
1✔
55
                            if len(rem_parts) >= 2:
1✔
56
                                schemas.add(rem_parts[0])
1✔
57
                            else:
58
                                schemas.add(remainder)
1✔
59
                        else:
60
                            schemas.add(schema_id)
1✔
61
    except Exception as e:
×
62
        # Return empty list if extraction fails
63
        logger.debug(f"Failed to extract schemas from presentation data: {e}")
×
64
    return sorted(list(schemas))
1✔
65

66

67
def _extract_issuer_dids(presentation_data: dict) -> list[str]:
1✔
68
    """
69
    Extract issuer DIDs from verified presentation data.
70

71
    Safe to log - DIDs are public identifiers, not PII.
72
    """
73
    issuers = set()
1✔
74
    try:
1✔
75
        by_format = presentation_data.get("by_format", {})
1✔
76
        for format_key in ["indy", "anoncreds"]:
1✔
77
            if format_key in by_format:
1✔
78
                pres = by_format[format_key].get("pres", {})
1✔
79
                identifiers = pres.get("identifiers", [])
1✔
80
                for identifier in identifiers:
1✔
81
                    if cred_def_id := identifier.get("cred_def_id"):
1✔
82
                        # cred_def_id format: <issuer_did>:3:CL:<schema_seq_no>:<tag>
83
                        # The issuer DID may itself contain colons (e.g. did:sov:ABC),
84
                        # so we locate the ":3:CL:" marker to extract the DID prefix.
85
                        marker = ":3:CL:"
1✔
86
                        marker_pos = cred_def_id.find(marker)
1✔
87
                        if marker_pos != -1:
1✔
88
                            issuers.add(cred_def_id[:marker_pos])
1✔
89
                        else:
90
                            issuers.add(cred_def_id)
1✔
91
    except Exception as e:
×
92
        # Return empty list if extraction fails
93
        logger.debug(f"Failed to extract issuer DIDs from presentation data: {e}")
×
94
    return sorted(list(issuers))
1✔
95

96

97
async def _send_problem_report_safely(
1✔
98
    client: AcapyClient, pres_ex_id: str, description: str
99
) -> None:
100
    """Send a problem report with error handling."""
101
    try:
1✔
102
        await client.send_problem_report(pres_ex_id, description)
1✔
103
        logger.info(f"Problem report sent for pres_ex_id: {pres_ex_id}")
1✔
104
    except Exception as e:
1✔
105
        logger.error(f"Failed to send problem report: {e}")
1✔
106

107

108
async def _cleanup_presentation_and_connection(
1✔
109
    client: AcapyClient, auth_session: AuthSession, pres_ex_id: str, context: str
110
) -> None:
111
    """Clean up presentation record and connection (if applicable) with proper error handling."""
112
    connection_id_to_delete = (
1✔
113
        auth_session.connection_id
114
        if (
115
            settings.USE_CONNECTION_BASED_VERIFICATION
116
            and auth_session.connection_id
117
            and not auth_session.multi_use  # Only delete single-use connections
118
        )
119
        else None
120
    )
121

122
    try:
1✔
123
        (
1✔
124
            presentation_deleted,
125
            connection_deleted,
126
            errors,
127
        ) = await client.delete_presentation_record_and_connection(
128
            pres_ex_id, connection_id_to_delete
129
        )
130

131
        if presentation_deleted:
1✔
132
            logger.info(
1✔
133
                f"Successfully cleaned up presentation record {pres_ex_id} after {context}"
134
            )
135
        else:
136
            logger.warning(
1✔
137
                f"Failed to cleanup presentation record {pres_ex_id} after {context} - will be handled by background cleanup"
138
            )
139

140
        if connection_deleted:
1✔
141
            logger.info(
1✔
142
                f"Successfully cleaned up single-use connection {connection_id_to_delete} after {context}"
143
            )
144
        elif connection_id_to_delete:
1✔
145
            logger.warning(
1✔
146
                f"Failed to cleanup single-use connection {connection_id_to_delete} after {context}"
147
            )
148
        elif (
1✔
149
            settings.USE_CONNECTION_BASED_VERIFICATION
150
            and auth_session.connection_id
151
            and auth_session.multi_use
152
        ):
153
            logger.info(
1✔
154
                f"Preserving multi-use connection {auth_session.connection_id} after {context}"
155
            )
156

157
        if errors:
1✔
158
            logger.warning(f"{context.capitalize()} cleanup errors: {errors}")
1✔
159

160
    except Exception as cleanup_error:
1✔
161
        logger.warning(
1✔
162
            f"Cleanup failed for presentation record {pres_ex_id} after {context}: {cleanup_error} - will be handled by background cleanup"
163
        )
164

165

166
async def _update_auth_session(db: Database, auth_session: AuthSession) -> None:
1✔
167
    """Update auth session in database with error handling."""
168
    await AuthSessionCRUD(db).patch(
1✔
169
        str(auth_session.id), AuthSessionPatch(**auth_session.model_dump())
170
    )
171

172

173
async def _emit_status_to_socket(
1✔
174
    db: Database, auth_session: AuthSession, status: str
175
) -> None:
176
    """Emit status update to socket if session ID exists."""
177
    pid = str(auth_session.id)
1✔
178
    sid = await get_socket_id_for_pid(pid, db)
1✔
179
    if sid:
1✔
180
        await safe_emit("status", {"status": status}, to=sid)
1✔
181

182

183
async def _parse_webhook_body(request: Request) -> dict[Any, Any]:
1✔
184
    return json.loads((await request.body()).decode("ascii"))
1✔
185

186

187
@router.post("/topic/{topic}/")
1✔
188
async def post_topic(request: Request, topic: str, db: Database = Depends(get_db)):
1✔
189
    """Called by aca-py agent."""
190
    webhook_start_time = time.time()
1✔
191
    logger.info(f">>> post_topic : topic={topic}")
1✔
192
    # Note: Full webhook body is logged at DEBUG level only to protect privacy
193
    logger.debug(f">>> web hook received for topic: {topic}")
1✔
194

195
    client = AcapyClient(request.app.state.http_client)
1✔
196

197
    match topic:
1✔
198
        case "connections":
1✔
199
            if settings.USE_CONNECTION_BASED_VERIFICATION:
1✔
200
                webhook_body = await _parse_webhook_body(request)
1✔
201
                logger.info(f">>>> connection_id: {webhook_body.get('connection_id')}")
1✔
202
                logger.info(f">>>> connection state: {webhook_body.get('state')}")
1✔
203

204
                # Log request state for debugging but don't act on it yet
205
                if webhook_body.get("state") == "request":
1✔
206
                    logger.info(
1✔
207
                        f"Connection {webhook_body.get('connection_id')} is in request state, waiting for active/completed"
208
                    )
209

210
                if webhook_body.get("state") in ["active", "completed"]:
1✔
211
                    # Connection established, now send presentation request
212
                    connection_id = webhook_body.get("connection_id")
1✔
213
                    invitation_msg_id = webhook_body.get("invitation_msg_id")
1✔
214

215
                    logger.debug(f"Full webhook body: {webhook_body}")
1✔
216
                    logger.debug(f"Available keys: {list(webhook_body.keys())}")
1✔
217

218
                    # Try multiple possible fields for invitation message ID
219
                    search_id = (
1✔
220
                        invitation_msg_id
221
                        or webhook_body.get("invi_msg_id")
222
                        or webhook_body.get("invitation_id")
223
                    )
224

225
                    # Find the auth session by invitation message ID (stored as connection_id initially)
226
                    logger.info(f"Looking for auth session with search_id: {search_id}")
1✔
227
                    auth_session = await AuthSessionCRUD(db).get_by_connection_id(
1✔
228
                        search_id
229
                    )
230

231
                    # If not found by invitation message ID, try by connection_id directly
232
                    if not auth_session and connection_id:
1✔
233
                        logger.info(
1✔
234
                            f"Trying to find auth session by connection_id: {connection_id}"
235
                        )
236
                        auth_session = await AuthSessionCRUD(db).get_by_connection_id(
1✔
237
                            connection_id
238
                        )
239

240
                    # If still not found, try searching by pres_exch_id pattern
241
                    if not auth_session and search_id:
1✔
242
                        logger.info(
1✔
243
                            f"Trying to find auth session by pres_exch_id pattern: {search_id}"
244
                        )
245
                        try:
1✔
246
                            auth_session = await AuthSessionCRUD(
1✔
247
                                db
248
                            ).get_by_pres_exch_id(f"{search_id}")
249
                        except:
1✔
250
                            pass  # This lookup might fail if the pattern doesn't match
1✔
251

252
                    if auth_session:
1✔
253
                        logger.info(f"Found auth session: {auth_session.id}")
1✔
254
                        logger.info(
1✔
255
                            f"Auth session has proof_request: {auth_session.proof_request is not None}"
256
                        )
257

258
                        if auth_session.proof_request:
1✔
259
                            logger.info(
1✔
260
                                f"Sending presentation request to connection {connection_id}"
261
                            )
262

263
                            # Send presentation request to the established connection
264
                            try:
1✔
265
                                pres_response = await client.send_presentation_request_by_connection(
1✔
266
                                    connection_id=connection_id,
267
                                    presentation_request_configuration=auth_session.proof_request,
268
                                )
269

270
                                # Update auth session with presentation exchange details and real connection ID
UNCOV
271
                                auth_session.pres_exch_id = pres_response.pres_ex_id
×
UNCOV
272
                                auth_session.presentation_exchange = (
×
273
                                    pres_response.model_dump()
274
                                )
UNCOV
275
                                auth_session.connection_id = (
×
276
                                    connection_id  # Update with real connection ID
277
                                )
UNCOV
278
                                await _update_auth_session(db, auth_session)
×
279

UNCOV
280
                                logger.info(
×
281
                                    f"Presentation request sent successfully: {pres_response.pres_ex_id}"
282
                                )
283
                            except Exception as e:
1✔
284
                                logger.error(
1✔
285
                                    f"Failed to send presentation request: {e}"
286
                                )
287
                                # Set auth session to failed state
288
                                auth_session.proof_status = AuthSessionState.FAILED
1✔
289
                                await _update_auth_session(db, auth_session)
1✔
290

291
                                # Send problem report if we have a presentation exchange ID
292
                                if auth_session.pres_exch_id:
1✔
293
                                    await _send_problem_report_safely(
1✔
294
                                        client,
295
                                        auth_session.pres_exch_id,
296
                                        f"Failed to send presentation request: {str(e)}",
297
                                    )
298

299
                                # Emit failure status to frontend
300
                                await _emit_status_to_socket(db, auth_session, "failed")
1✔
301
                        else:
302
                            logger.debug(
1✔
303
                                f"Auth session found but no proof_request: {auth_session.id}"
304
                            )
305
                    else:
306
                        logger.debug(
1✔
307
                            f"No auth session found for invitation_msg_id: {invitation_msg_id}"
308
                        )
309

310
        case "present_proof_v2_0":
1✔
311
            webhook_body = await _parse_webhook_body(request)
1✔
312

313
            state = webhook_body.get("state")
1✔
314
            role = webhook_body.get("role")
1✔
315

316
            # SIEM Audit: Log webhook receipt (safe metadata only)
317
            audit_webhook_received(
1✔
318
                topic="present_proof_v2_0",
319
                state=state,
320
                role=role,
321
            )
322

323
            logger.info(
1✔
324
                f">>>> pres_exch_id: {webhook_body['pres_ex_id']}, state: {state}"
325
            )
326

327
            # Check for prover-role (issue #898)
328

329
            if role == "prover":
1✔
330
                # Handle prover-role separately - VC-AuthN is responding to a proof request
331
                pres_ex_id = webhook_body.get("pres_ex_id")
1✔
332
                connection_id = webhook_body.get("connection_id")
1✔
333
                state = webhook_body.get("state")
1✔
334

335
                deleted = False
1✔
336
                delete_error = None
1✔
337

338
                # Clean up presentation records in terminal states
339
                if pres_ex_id and state in ["done", "abandoned", "declined"]:
1✔
340
                    try:
1✔
341
                        deleted = await client.delete_presentation_record(pres_ex_id)
1✔
342
                        if not deleted:
1✔
343
                            logger.warning(
1✔
344
                                "Failed to delete prover-role presentation record",
345
                                pres_ex_id=pres_ex_id,
346
                                state=state,
347
                            )
348
                    except Exception as e:
1✔
349
                        delete_error = str(e)
1✔
350
                        logger.error(
1✔
351
                            "Error deleting prover-role presentation record",
352
                            pres_ex_id=pres_ex_id,
353
                            error=delete_error,
354
                        )
355

356
                logger.info(
1✔
357
                    f"Prover-role webhook received: {state}",
358
                    pres_ex_id=pres_ex_id,
359
                    connection_id=connection_id,
360
                    deleted=deleted,
361
                    delete_error=delete_error,
362
                    role=role,
363
                    state=state,
364
                    timestamp=datetime.now(UTC).isoformat(),
365
                )
366

367
                # Return early - do NOT trigger verifier-role logic or cleanup
368
                return {"status": "prover-role event logged"}
1✔
369

370
            # Existing verifier-role code continues below...
371
            auth_session: AuthSession = await AuthSessionCRUD(db).get_by_pres_exch_id(
1✔
372
                webhook_body["pres_ex_id"]
373
            )
374

375
            # Get the saved websocket session
376
            pid = str(auth_session.id)
1✔
377
            sid = await get_socket_id_for_pid(pid, db)
1✔
378
            logger.debug(f"sid: {sid} found for pid: {pid}")
1✔
379

380
            if webhook_body["state"] == "presentation-received":
1✔
381
                logger.info("presentation-received")
×
382

383
            if webhook_body["state"] == "done":
1✔
384
                duration_ms = int((time.time() - webhook_start_time) * 1000)
1✔
385

386
                if webhook_body["verified"] == "true":
1✔
387
                    logger.info("VERIFIED")
1✔
388
                    auth_session.proof_status = AuthSessionState.VERIFIED
1✔
389

390
                    # Get presentation data via API call instead of webhook payload
391
                    presentation_data = await client.get_presentation_request(
1✔
392
                        webhook_body["pres_ex_id"]
393
                    )
394

395
                    if not presentation_data:
1✔
396
                        raise ValueError(
1✔
397
                            f"Failed to retrieve presentation data for {webhook_body['pres_ex_id']} - record may have been deleted or is inaccessible"
398
                        )
399

400
                    auth_session.presentation_exchange = presentation_data.get(
1✔
401
                        "by_format", {}
402
                    )
403
                    logger.debug(
1✔
404
                        f"Retrieved presentation data via API for {webhook_body['pres_ex_id']}"
405
                    )
406

407
                    # SIEM Audit: Log successful verification (metadata only, no PII)
408
                    # Extract schema names from presentation for audit
409
                    credential_schemas = _extract_credential_schemas(presentation_data)
1✔
410
                    issuer_dids = _extract_issuer_dids(presentation_data)
1✔
411

412
                    audit_proof_verified(
1✔
413
                        session_id=str(auth_session.id),
414
                        ver_config_id=auth_session.ver_config_id,
415
                        credential_schemas=credential_schemas,
416
                        issuer_dids=issuer_dids,
417
                        duration_ms=duration_ms,
418
                        revocation_checked=settings.SET_NON_REVOKED,
419
                    )
420

421
                    # Cleanup presentation record and connection after successful verification
422
                    await _cleanup_presentation_and_connection(
1✔
423
                        client,
424
                        auth_session,
425
                        webhook_body["pres_ex_id"],
426
                        "successful verification",
427
                    )
428

429
                    await _emit_status_to_socket(db, auth_session, "verified")
1✔
430
                else:
431
                    logger.info("VERIFICATION FAILED")
1✔
432
                    auth_session.proof_status = AuthSessionState.FAILED
1✔
433

434
                    # SIEM Audit: Log failed verification
435
                    audit_proof_verification_failed(
1✔
436
                        session_id=str(auth_session.id),
437
                        ver_config_id=auth_session.ver_config_id,
438
                        failure_category="unknown",  # ACA-Py doesn't provide detailed failure reason
439
                        duration_ms=duration_ms,
440
                    )
441

442
                    await _emit_status_to_socket(db, auth_session, "failed")
1✔
443

444
                    # Send problem report for failed verification in connection-based flow
445
                    if (
1✔
446
                        settings.USE_CONNECTION_BASED_VERIFICATION
447
                        and auth_session.pres_exch_id
448
                    ):
449
                        await _send_problem_report_safely(
1✔
450
                            client,
451
                            auth_session.pres_exch_id,
452
                            f"Presentation verification failed: {webhook_body.get('error_msg', 'Unknown error')}",
453
                        )
454

455
                await _update_auth_session(db, auth_session)
1✔
456

457
                # Connection cleanup is now handled above in the combined cleanup operation
458
            if webhook_body["state"] == "abandoned":
1✔
459
                logger.info("ABANDONED")
1✔
460
                # Note: error_msg may contain sensitive info, log at debug level only
461
                logger.debug(
1✔
462
                    f"Abandonment reason: {webhook_body.get('error_msg', 'No reason provided')}"
463
                )
464
                auth_session.proof_status = AuthSessionState.ABANDONED
1✔
465

466
                # SIEM Audit: Log session abandonment
467
                duration_ms = int((time.time() - webhook_start_time) * 1000)
1✔
468
                audit_session_abandoned(
1✔
469
                    session_id=str(auth_session.id),
470
                    ver_config_id=auth_session.ver_config_id,
471
                    phase="wallet_response",
472
                    duration_ms=duration_ms,
473
                )
474

475
                await _emit_status_to_socket(db, auth_session, "abandoned")
1✔
476

477
                # Send problem report for abandoned presentation in connection-based flow
478
                if (
1✔
479
                    settings.USE_CONNECTION_BASED_VERIFICATION
480
                    and auth_session.pres_exch_id
481
                ):
482
                    await _send_problem_report_safely(
1✔
483
                        client,
484
                        auth_session.pres_exch_id,
485
                        f"Presentation abandoned: {webhook_body.get('error_msg', 'Unknown error')}",
486
                    )
487

488
                await _update_auth_session(db, auth_session)
1✔
489

490
                # Cleanup presentation record and connection after abandonment
491
                await _cleanup_presentation_and_connection(
1✔
492
                    client, auth_session, webhook_body["pres_ex_id"], "abandonment"
493
                )
494

495
            # Calcuate the expiration time of the proof
496
            now_time = datetime.now(UTC)
1✔
497
            expired_time = now_time + timedelta(
1✔
498
                seconds=settings.CONTROLLER_PRESENTATION_EXPIRE_TIME
499
            )
500

501
            # Update the expiration time of the proof
502
            auth_session.expired_timestamp = expired_time
1✔
503
            await AuthSessionCRUD(db).patch(
1✔
504
                str(auth_session.id), AuthSessionPatch(**auth_session.model_dump())
505
            )
506

507
            # Check if expired. But only if the proof has not been started.
508
            # Handle comparison between timezone-aware and naive datetimes
509
            if auth_session.expired_timestamp.tzinfo is not None:
1✔
510
                # Use timezone-aware comparison if database has timezone-aware timestamp
511
                expired_time = datetime.now(UTC) + timedelta(
1✔
512
                    seconds=settings.CONTROLLER_PRESENTATION_EXPIRE_TIME
513
                )
514
                now_time = datetime.now(UTC)
1✔
515

516
            if (
1✔
517
                expired_time < now_time
518
                and auth_session.proof_status == AuthSessionState.NOT_STARTED
519
            ):
520
                logger.info("EXPIRED")
×
521
                auth_session.proof_status = AuthSessionState.EXPIRED
×
522

523
                # SIEM Audit: Log session expiration
524
                audit_session_expired(
×
525
                    session_id=str(auth_session.id),
526
                    ver_config_id=auth_session.ver_config_id,
527
                    phase="qr_scan",
528
                    timeout_seconds=settings.CONTROLLER_PRESENTATION_EXPIRE_TIME,
529
                )
530

531
                await _emit_status_to_socket(db, auth_session, "expired")
×
532

533
                # Send problem report for expired presentation in connection-based flow
534
                if (
×
535
                    settings.USE_CONNECTION_BASED_VERIFICATION
536
                    and auth_session.pres_exch_id
537
                ):
538
                    await _send_problem_report_safely(
×
539
                        client,
540
                        auth_session.pres_exch_id,
541
                        f"Presentation expired: timeout after {settings.CONTROLLER_PRESENTATION_EXPIRE_TIME} seconds",
542
                    )
543

544
                await _update_auth_session(db, auth_session)
×
545

546
                # Cleanup presentation record and connection after expiration
547
                await _cleanup_presentation_and_connection(
×
548
                    client, auth_session, auth_session.pres_exch_id, "expiration"
549
                )
550

551
            pass
1✔
552
        case _:
×
553
            logger.debug("skipping webhook")
×
554

555
    return {}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc