• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

openwallet-foundation / acapy-vc-authn-oidc / 18390534355

09 Oct 2025 10:14PM UTC coverage: 91.885% (-0.2%) from 92.11%
18390534355

push

github

web-flow
Handle Redis Connection Failure (#856)

* Better handle redis failing and working without redis

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

* Remove REDIS_REQUIRED and instead default to crashing on error connecting to redis

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

* Migrate from os._exit(1) to sys.exit(1)

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

* formatting

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

* Extra testing

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

* Implement proper degradation for redis crashing

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

* Refactor Redis error handling in socketio.py

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

- Removed custom exception classes for Redis connection, configuration, and operation errors to simplify error handling.
- Updated error handling in `create_socket_manager()` to log failures without specific Redis error classification.
- Cleaned up imports in test files to reflect the removal of the custom exceptions.
- Enhanced comments for clarity on exception handling strategy.

* Rename Redis connection validation functions

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

- Renamed functions for clarity: `_validate_redis_before_manager_creation` to `can_we_reach_redis` and `validate_redis_connection` to `should_we_use_redis`.

* removed single use function

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

* Corrected tests to use safe_emit

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

---------

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

85 of 113 new or added lines in 3 files covered. (75.22%)

52 existing lines in 2 files now uncovered.

1472 of 1602 relevant lines covered (91.89%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

91.67
/oidc-controller/api/routers/acapy_handler.py
1
import json
1✔
2
from pydantic.plugin import Any
1✔
3
import structlog
1✔
4
from datetime import datetime, timedelta, UTC
1✔
5

6
from fastapi import APIRouter, Depends, Request
1✔
7
from pymongo.database import Database
1✔
8

9
from ..authSessions.crud import AuthSessionCRUD
1✔
10
from ..authSessions.models import AuthSession, AuthSessionPatch, AuthSessionState
1✔
11
from ..db.session import get_db
1✔
12
from ..core.acapy.client import AcapyClient
1✔
13
from ..verificationConfigs.crud import VerificationConfigCRUD
1✔
14

15
from ..core.config import settings
1✔
16
from ..routers.socketio import sio, get_socket_id_for_pid, safe_emit
1✔
17

18
logger: structlog.typing.FilteringBoundLogger = structlog.getLogger(__name__)
1✔
19

20
router = APIRouter()
1✔
21

22

23
async def _send_problem_report_safely(
1✔
24
    client: AcapyClient, pres_ex_id: str, description: str
25
) -> None:
26
    """Send a problem report with error handling."""
27
    try:
1✔
28
        client.send_problem_report(pres_ex_id, description)
1✔
29
        logger.info(f"Problem report sent for pres_ex_id: {pres_ex_id}")
1✔
UNCOV
30
    except Exception as e:
×
UNCOV
31
        logger.error(f"Failed to send problem report: {e}")
×
32

33

34
async def _cleanup_presentation_and_connection(
1✔
35
    auth_session: AuthSession, pres_ex_id: str, context: str
36
) -> None:
37
    """Clean up presentation record and connection (if applicable) with proper error handling."""
38
    # Determine if connection should also be deleted based on verification type and multi-use flag
39
    connection_id_to_delete = (
1✔
40
        auth_session.connection_id
41
        if (
42
            settings.USE_CONNECTION_BASED_VERIFICATION
43
            and auth_session.connection_id
44
            and not auth_session.multi_use  # Only delete single-use connections
45
        )
46
        else None
47
    )
48

49
    try:
1✔
50
        client = AcapyClient()
1✔
51
        presentation_deleted, connection_deleted, errors = (
1✔
52
            client.delete_presentation_record_and_connection(
53
                pres_ex_id, connection_id_to_delete
54
            )
55
        )
56

57
        # Log results for presentation cleanup
58
        if presentation_deleted:
1✔
59
            logger.info(
1✔
60
                f"Successfully cleaned up presentation record {pres_ex_id} after {context}"
61
            )
62
        else:
63
            logger.warning(
1✔
64
                f"Failed to cleanup presentation record {pres_ex_id} after {context} - will be handled by background cleanup"
65
            )
66

67
        # Log results for connection cleanup (if attempted)
68
        if connection_deleted:
1✔
69
            logger.info(
1✔
70
                f"Successfully cleaned up single-use connection {connection_id_to_delete} after {context}"
71
            )
72
        elif connection_id_to_delete:
1✔
73
            logger.warning(
1✔
74
                f"Failed to cleanup single-use connection {connection_id_to_delete} after {context}"
75
            )
76
        elif (
1✔
77
            settings.USE_CONNECTION_BASED_VERIFICATION
78
            and auth_session.connection_id
79
            and auth_session.multi_use
80
        ):
81
            logger.info(
1✔
82
                f"Preserving multi-use connection {auth_session.connection_id} after {context}"
83
            )
84

85
        # Log any errors from the cleanup operation
86
        if errors:
1✔
87
            logger.warning(f"{context.capitalize()} cleanup errors: {errors}")
1✔
88

89
    except Exception as cleanup_error:
1✔
90
        logger.warning(
1✔
91
            f"Cleanup failed for presentation record {pres_ex_id} after {context}: {cleanup_error} - will be handled by background cleanup"
92
        )
93

94

95
async def _update_auth_session(db: Database, auth_session: AuthSession) -> None:
1✔
96
    """Update auth session in database with error handling."""
97
    await AuthSessionCRUD(db).patch(
1✔
98
        str(auth_session.id), AuthSessionPatch(**auth_session.model_dump())
99
    )
100

101

102
async def _emit_status_to_socket(
1✔
103
    db: Database, auth_session: AuthSession, status: str
104
) -> None:
105
    """Emit status update to socket if session ID exists."""
106
    pid = str(auth_session.id)
1✔
107
    sid = await get_socket_id_for_pid(pid, db)
1✔
108
    if sid:
1✔
109
        await safe_emit("status", {"status": status}, to=sid)
1✔
110

111

112
async def _parse_webhook_body(request: Request) -> dict[Any, Any]:
1✔
113
    return json.loads((await request.body()).decode("ascii"))
1✔
114

115

116
@router.post("/topic/{topic}/")
1✔
117
async def post_topic(request: Request, topic: str, db: Database = Depends(get_db)):
1✔
118
    """Called by aca-py agent."""
119
    logger.info(f">>> post_topic : topic={topic}")
1✔
120
    logger.info(f">>> web hook post_body : {await _parse_webhook_body(request)}")
1✔
121

122
    match topic:
1✔
123
        case "connections":
1✔
124
            if settings.USE_CONNECTION_BASED_VERIFICATION:
1✔
125
                webhook_body = await _parse_webhook_body(request)
1✔
126
                logger.info(f">>>> connection_id: {webhook_body.get('connection_id')}")
1✔
127
                logger.info(f">>>> connection state: {webhook_body.get('state')}")
1✔
128

129
                # Log request state for debugging but don't act on it yet
130
                if webhook_body.get("state") == "request":
1✔
131
                    logger.info(
1✔
132
                        f"Connection {webhook_body.get('connection_id')} is in request state, waiting for active/completed"
133
                    )
134

135
                if webhook_body.get("state") in ["active", "completed"]:
1✔
136
                    # Connection established, now send presentation request
137
                    connection_id = webhook_body.get("connection_id")
1✔
138
                    invitation_msg_id = webhook_body.get("invitation_msg_id")
1✔
139

140
                    logger.debug(f"Full webhook body: {webhook_body}")
1✔
141
                    logger.debug(f"Available keys: {list(webhook_body.keys())}")
1✔
142

143
                    # Try multiple possible fields for invitation message ID
144
                    search_id = (
1✔
145
                        invitation_msg_id
146
                        or webhook_body.get("invi_msg_id")
147
                        or webhook_body.get("invitation_id")
148
                    )
149

150
                    # Find the auth session by invitation message ID (stored as connection_id initially)
151
                    logger.info(f"Looking for auth session with search_id: {search_id}")
1✔
152
                    auth_session = await AuthSessionCRUD(db).get_by_connection_id(
1✔
153
                        search_id
154
                    )
155

156
                    # If not found by invitation message ID, try by connection_id directly
157
                    if not auth_session and connection_id:
1✔
158
                        logger.info(
1✔
159
                            f"Trying to find auth session by connection_id: {connection_id}"
160
                        )
161
                        auth_session = await AuthSessionCRUD(db).get_by_connection_id(
1✔
162
                            connection_id
163
                        )
164

165
                    # If still not found, try searching by pres_exch_id pattern
166
                    if not auth_session and search_id:
1✔
167
                        logger.info(
1✔
168
                            f"Trying to find auth session by pres_exch_id pattern: {search_id}"
169
                        )
170
                        try:
1✔
171
                            auth_session = await AuthSessionCRUD(
1✔
172
                                db
173
                            ).get_by_pres_exch_id(f"{search_id}")
174
                        except:
1✔
175
                            pass  # This lookup might fail if the pattern doesn't match
1✔
176

177
                    if auth_session:
1✔
178
                        logger.info(f"Found auth session: {auth_session.id}")
1✔
179
                        logger.info(
1✔
180
                            f"Auth session has proof_request: {auth_session.proof_request is not None}"
181
                        )
182

183
                        if auth_session.proof_request:
1✔
184
                            logger.info(
1✔
185
                                f"Sending presentation request to connection {connection_id}"
186
                            )
187

188
                            # Send presentation request to the established connection
189
                            client = AcapyClient()
1✔
190
                            try:
1✔
191
                                pres_response = client.send_presentation_request_by_connection(
1✔
192
                                    connection_id=connection_id,
193
                                    presentation_request_configuration=auth_session.proof_request,
194
                                )
195

196
                                # Update auth session with presentation exchange details and real connection ID
197
                                auth_session.pres_exch_id = pres_response.pres_ex_id
1✔
198
                                auth_session.presentation_exchange = (
1✔
199
                                    pres_response.model_dump()
200
                                )
201
                                auth_session.connection_id = (
1✔
202
                                    connection_id  # Update with real connection ID
203
                                )
204
                                await _update_auth_session(db, auth_session)
1✔
205

206
                                logger.info(
1✔
207
                                    f"Presentation request sent successfully: {pres_response.pres_ex_id}"
208
                                )
209
                            except Exception as e:
1✔
210
                                logger.error(
1✔
211
                                    f"Failed to send presentation request: {e}"
212
                                )
213
                                # Set auth session to failed state
214
                                auth_session.proof_status = AuthSessionState.FAILED
1✔
215
                                await _update_auth_session(db, auth_session)
1✔
216

217
                                # Send problem report if we have a presentation exchange ID
218
                                if auth_session.pres_exch_id:
1✔
219
                                    await _send_problem_report_safely(
1✔
220
                                        client,
221
                                        auth_session.pres_exch_id,
222
                                        f"Failed to send presentation request: {str(e)}",
223
                                    )
224

225
                                # Emit failure status to frontend
226
                                await _emit_status_to_socket(db, auth_session, "failed")
1✔
227
                        else:
228
                            logger.debug(
1✔
229
                                f"Auth session found but no proof_request: {auth_session.id}"
230
                            )
231
                    else:
232
                        logger.debug(
1✔
233
                            f"No auth session found for invitation_msg_id: {invitation_msg_id}"
234
                        )
235

236
        case "present_proof_v2_0":
1✔
237
            webhook_body = await _parse_webhook_body(request)
1✔
238
            logger.info(f">>>> pres_exch_id: {webhook_body['pres_ex_id']}")
1✔
239
            # logger.info(f">>>> web hook: {webhook_body}")
240
            auth_session: AuthSession = await AuthSessionCRUD(db).get_by_pres_exch_id(
1✔
241
                webhook_body["pres_ex_id"]
242
            )
243

244
            # Get the saved websocket session
245
            pid = str(auth_session.id)
1✔
246
            sid = await get_socket_id_for_pid(pid, db)
1✔
247
            logger.debug(f"sid: {sid} found for pid: {pid}")
1✔
248

249
            if webhook_body["state"] == "presentation-received":
1✔
UNCOV
250
                logger.info("presentation-received")
×
251

252
            if webhook_body["state"] == "done":
1✔
253
                logger.info("VERIFIED")
1✔
254
                if webhook_body["verified"] == "true":
1✔
255
                    auth_session.proof_status = AuthSessionState.VERIFIED
1✔
256

257
                    # Get presentation data via API call instead of webhook payload
258
                    client = AcapyClient()
1✔
259
                    presentation_data = client.get_presentation_request(
1✔
260
                        webhook_body["pres_ex_id"]
261
                    )
262

263
                    if not presentation_data:
1✔
264
                        raise ValueError(
1✔
265
                            f"Failed to retrieve presentation data for {webhook_body['pres_ex_id']} - record may have been deleted or is inaccessible"
266
                        )
267

268
                    auth_session.presentation_exchange = presentation_data.get(
1✔
269
                        "by_format", {}
270
                    )
271
                    logger.debug(
1✔
272
                        f"Retrieved presentation data via API for {webhook_body['pres_ex_id']}"
273
                    )
274

275
                    # Cleanup presentation record and connection after successful verification
276
                    await _cleanup_presentation_and_connection(
1✔
277
                        auth_session,
278
                        webhook_body["pres_ex_id"],
279
                        "successful verification",
280
                    )
281

282
                    await _emit_status_to_socket(db, auth_session, "verified")
1✔
283
                else:
284
                    auth_session.proof_status = AuthSessionState.FAILED
1✔
285
                    await _emit_status_to_socket(db, auth_session, "failed")
1✔
286

287
                    # Send problem report for failed verification in connection-based flow
288
                    if (
1✔
289
                        settings.USE_CONNECTION_BASED_VERIFICATION
290
                        and auth_session.pres_exch_id
291
                    ):
292
                        client = AcapyClient()
1✔
293
                        await _send_problem_report_safely(
1✔
294
                            client,
295
                            auth_session.pres_exch_id,
296
                            f"Presentation verification failed: {webhook_body.get('error_msg', 'Unknown error')}",
297
                        )
298

299
                await _update_auth_session(db, auth_session)
1✔
300

301
                # Connection cleanup is now handled above in the combined cleanup operation
302

303
            # abandoned state
304
            if webhook_body["state"] == "abandoned":
1✔
305
                logger.info("ABANDONED")
1✔
306
                logger.info(webhook_body["error_msg"])
1✔
307
                auth_session.proof_status = AuthSessionState.ABANDONED
1✔
308
                await _emit_status_to_socket(db, auth_session, "abandoned")
1✔
309

310
                # Send problem report for abandoned presentation in connection-based flow
311
                if (
1✔
312
                    settings.USE_CONNECTION_BASED_VERIFICATION
313
                    and auth_session.pres_exch_id
314
                ):
315
                    client = AcapyClient()
1✔
316
                    await _send_problem_report_safely(
1✔
317
                        client,
318
                        auth_session.pres_exch_id,
319
                        f"Presentation abandoned: {webhook_body.get('error_msg', 'Unknown error')}",
320
                    )
321

322
                await _update_auth_session(db, auth_session)
1✔
323

324
                # Cleanup presentation record and connection after abandonment
325
                await _cleanup_presentation_and_connection(
1✔
326
                    auth_session, webhook_body["pres_ex_id"], "abandonment"
327
                )
328

329
            # Calcuate the expiration time of the proof
330
            now_time = datetime.now(UTC)
1✔
331
            expired_time = now_time + timedelta(
1✔
332
                seconds=settings.CONTROLLER_PRESENTATION_EXPIRE_TIME
333
            )
334

335
            # Update the expiration time of the proof
336
            auth_session.expired_timestamp = expired_time
1✔
337
            await AuthSessionCRUD(db).patch(
1✔
338
                str(auth_session.id), AuthSessionPatch(**auth_session.model_dump())
339
            )
340

341
            # Check if expired. But only if the proof has not been started.
342
            # Handle comparison between timezone-aware and naive datetimes
343
            if auth_session.expired_timestamp.tzinfo is not None:
1✔
344
                # Use timezone-aware comparison if database has timezone-aware timestamp
345
                expired_time = datetime.now(UTC) + timedelta(
1✔
346
                    seconds=settings.CONTROLLER_PRESENTATION_EXPIRE_TIME
347
                )
348
                now_time = datetime.now(UTC)
1✔
349

350
            if (
1✔
351
                expired_time < now_time
352
                and auth_session.proof_status == AuthSessionState.NOT_STARTED
353
            ):
UNCOV
354
                logger.info("EXPIRED")
×
UNCOV
355
                auth_session.proof_status = AuthSessionState.EXPIRED
×
UNCOV
356
                await _emit_status_to_socket(db, auth_session, "expired")
×
357

358
                # Send problem report for expired presentation in connection-based flow
359
                if (
×
360
                    settings.USE_CONNECTION_BASED_VERIFICATION
361
                    and auth_session.pres_exch_id
362
                ):
UNCOV
363
                    client = AcapyClient()
×
UNCOV
364
                    await _send_problem_report_safely(
×
365
                        client,
366
                        auth_session.pres_exch_id,
367
                        f"Presentation expired: timeout after {settings.CONTROLLER_PRESENTATION_EXPIRE_TIME} seconds",
368
                    )
369

UNCOV
370
                await _update_auth_session(db, auth_session)
×
371

372
                # Cleanup presentation record and connection after expiration
UNCOV
373
                await _cleanup_presentation_and_connection(
×
374
                    auth_session, auth_session.pres_exch_id, "expiration"
375
                )
376

377
            pass
1✔
UNCOV
378
        case _:
×
UNCOV
379
            logger.debug("skipping webhook")
×
380

381
    return {}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc