• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

openwallet-foundation / acapy-vc-authn-oidc / 18390534355

09 Oct 2025 10:14PM UTC coverage: 91.885% (-0.2%) from 92.11%
18390534355

push

github

web-flow
Handle Redis Connection Failure (#856)

* Better handle redis failing and working without redis

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

* Remove REDIS_REQUIRED and instead default to crashing on error connecting to redis

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

* Migrate from os._exit(1) to sys.exit(1)

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

* formatting

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

* Extra testing

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

* Implement proper degradation for redis crashing

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

* Refactor Redis error handling in socketio.py

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

- Removed custom exception classes for Redis connection, configuration, and operation errors to simplify error handling.
- Updated error handling in `create_socket_manager()` to log failures without specific Redis error classification.
- Cleaned up imports in test files to reflect the removal of the custom exceptions.
- Enhanced comments for clarity on exception handling strategy.

* Rename Redis connection validation functions

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

- Renamed functions for clarity: `_validate_redis_before_manager_creation` to `can_we_reach_redis` and `validate_redis_connection` to `should_we_use_redis`.

* removed single use function

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

* Corrected tests to use safe_emit

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

---------

Signed-off-by: Gavin Jaeger-Freeborn <gavinfreeborn@gmail.com>

85 of 113 new or added lines in 3 files covered. (75.22%)

52 existing lines in 2 files now uncovered.

1472 of 1602 relevant lines covered (91.89%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.43
/oidc-controller/api/routers/socketio.py
1
import asyncio
1✔
2
import redis.asyncio as async_redis
1✔
3
import redis
1✔
4
import socketio  # For using websockets
1✔
5
import logging
1✔
6
import structlog
1✔
7
from fastapi import Depends
1✔
8
from pymongo.database import Database
1✔
9

10
from ..authSessions.crud import AuthSessionCRUD
1✔
11
from ..db.session import get_db, client
1✔
12
from ..core.config import settings
1✔
13

14
logger = structlog.getLogger(__name__)
1✔
15

16

17
class RedisErrorType:
    """String labels used to categorise Redis failures.

    The members are plain strings so they can be logged or compared directly.
    """

    # Connection-related failure (potentially recoverable).
    CONNECTION = "connection"
    # Configuration-related failure (wrong settings).
    CONFIGURATION = "configuration"
    # Operation-specific failure (runtime issue).
    OPERATION = "operation"
23

24

25
def _classify_redis_error(operation: str, error: Exception) -> str:
    """Classify a Redis failure from the failing operation and the error text.

    The classification is a case-insensitive substring match on ``str(error)``.
    Checks run in this order: connection markers, configuration markers, then
    the operation name; anything unmatched is treated as a connection error.

    Args:
        operation: Description of the operation that failed.
        error: The exception that occurred.

    Returns:
        One of the ``RedisErrorType`` string constants.
    """
    # Connection-related errors (potentially recoverable)
    connection_markers = (
        "connection refused",
        "connection failed",
        "connection timeout",
        "network is unreachable",
        "no route to host",
        "connection reset",
    )
    # Configuration-related errors (wrong settings). The last four entries
    # cover the replies current Redis servers send for credential problems
    # (NOAUTH / WRONGPASS / AUTH without a configured password), which the
    # older "invalid password" / "no password is set" wording does not match.
    configuration_markers = (
        "authentication failed",
        "wrong number of arguments",
        "unknown command",
        "invalid password",
        "no password is set",
        "authentication required",
        "noauth",
        "wrongpass",
        "without any password configured",
    )
    # Operations that only happen at runtime, after startup succeeded.
    runtime_operations = ("Socket.IO emit", "background thread")

    message = str(error).lower()

    if any(marker in message for marker in connection_markers):
        return RedisErrorType.CONNECTION

    if any(marker in message for marker in configuration_markers):
        return RedisErrorType.CONFIGURATION

    if operation in runtime_operations:
        return RedisErrorType.OPERATION

    # Default to connection error for startup operations
    return RedisErrorType.CONNECTION
62

63

64
def _handle_redis_failure(operation: str, error: Exception) -> str:
    """Log a Redis failure together with its classification.

    Emits two error-level records (the failure itself and its classification)
    followed by one warning announcing the fallback to degraded mode. The
    exception is not re-raised.

    Args:
        operation: Description of the operation that failed.
        error: The exception that occurred.

    Returns:
        The classification string produced by ``_classify_redis_error``.
    """
    classification = _classify_redis_error(operation, error)

    failure_message = f"Redis {operation} failed: {error}"
    classification_message = f"Error classified as: {classification}"
    fallback_message = f"Redis {operation} failed, falling back to degraded mode"

    logger.error(failure_message)
    logger.error(classification_message)
    logger.warning(fallback_message)

    return classification
82

83

84
def _should_use_redis_adapter():
    """Report whether the Redis adapter is both enabled and configured.

    Returns:
        True when ``settings.USE_REDIS_ADAPTER`` is truthy and
        ``settings.REDIS_HOST`` is set; False otherwise. The reason for a
        False result is logged.
    """
    if settings.USE_REDIS_ADAPTER:
        if settings.REDIS_HOST:
            # All required settings present
            return True
        logger.warning("REDIS_HOST not configured - falling back to default manager")
        return False

    logger.info("Redis adapter disabled - using default manager")
    return False
96

97

98
def _build_redis_url():
    """Build the Redis connection URL from settings.

    The password, when set, is percent-encoded before being placed in the
    URL. Without that, characters such as ``@``, ``/``, ``:``, ``#`` or ``%``
    in the password would change how the URL is parsed.

    Returns:
        ``redis://:<password>@<host>:<port>/<db>`` when a password is
        configured, otherwise ``redis://<host>:<port>/<db>``.
    """
    # Local import so this block does not depend on a file-level import.
    from urllib.parse import quote

    location = f"{settings.REDIS_HOST}:{settings.REDIS_PORT}/{settings.REDIS_DB}"

    if settings.REDIS_PASSWORD:
        # safe="" also encodes "/", which quote() leaves untouched by default.
        encoded_password = quote(str(settings.REDIS_PASSWORD), safe="")
        return f"redis://:{encoded_password}@{location}"

    return f"redis://{location}"
106

107

108
def can_we_reach_redis(redis_url):
    """Test whether Redis is reachable right now, before creating a manager.

    Args:
        redis_url: Connection URL passed to ``redis.from_url``.

    Returns:
        bool: True if the ping succeeded, False if the caller should fall
        back to the local manager. Failures are logged, never raised.
    """
    try:
        # Use synchronous Redis client to avoid event loop conflicts
        redis_client = redis.from_url(redis_url)
        try:
            redis_client.ping()
        finally:
            # Close the client even when the ping fails, so a failed probe
            # does not leave the client open.
            redis_client.close()
        return True

    except Exception as e:
        # Log the error and return False to indicate fallback should be used
        _handle_redis_failure("connectivity test before manager creation", e)
        return False
126

127

128
def _patch_redis_manager_for_graceful_failure(manager):
    """Patch a Redis manager so background failures degrade instead of crashing.

    Two attributes of ``manager`` are replaced in place:

    * ``_thread`` is wrapped so that an exception escaping the original
      coroutine triggers a restart with exponential backoff, up to
      ``settings.REDIS_THREAD_MAX_RETRIES`` failures.
    * ``_redis_listen_with_retries`` (only if the manager has it) is replaced
      by an async generator that reconnects after any ``Exception`` and gives
      up after ``settings.REDIS_PUBSUB_MAX_FAILURES`` consecutive failures.

    Args:
        manager: The manager instance to patch. ``None`` is a no-op.
    """
    if manager is None:
        return

    # Store original methods
    original_thread = manager._thread
    original_redis_listen_with_retries = getattr(
        manager, "_redis_listen_with_retries", None
    )

    async def graceful_redis_failure_thread():
        """Run the original ``_thread``, restarting it with backoff on failure."""
        # NOTE: retry_count is cumulative for the life of this coroutine; it
        # is never reset after a period of healthy operation.
        retry_count = 0
        max_retries = settings.REDIS_THREAD_MAX_RETRIES
        base_delay = settings.REDIS_RETRY_BASE_DELAY
        max_delay = settings.REDIS_RETRY_MAX_DELAY

        while retry_count < max_retries:
            try:
                await original_thread()
                # If we get here, the thread completed normally (shouldn't happen)
                logger.warning("Redis background thread completed unexpectedly")
                break
            except Exception as e:
                retry_count += 1
                error_type = _handle_redis_failure("background thread", e)

                if retry_count >= max_retries:
                    logger.error(
                        f"Redis background thread failed permanently after {max_retries} attempts (error type: {error_type})"
                    )
                    logger.warning("Redis manager switching to write-only mode")
                    break

                # Exponential backoff, capped at the configured maximum
                delay = min(base_delay * (2 ** (retry_count - 1)), max_delay)
                logger.warning(
                    f"Redis background thread failed (attempt {retry_count}/{max_retries}, error type: {error_type}), retrying in {delay}s"
                )

                await asyncio.sleep(delay)

    async def enhanced_redis_listen_with_retries():
        """Yield pubsub messages, reconnecting on any ``Exception``."""
        base_delay = settings.REDIS_RETRY_BASE_DELAY
        retry_sleep = base_delay
        connect = False  # the first pass reuses the existing subscription
        max_consecutive_failures = settings.REDIS_PUBSUB_MAX_FAILURES
        consecutive_failures = 0

        while consecutive_failures < max_consecutive_failures:
            try:
                if connect:
                    manager._redis_connect()
                    await manager.pubsub.subscribe(manager.channel)
                    # Restart the backoff from the configured base delay
                    # (not a hard-coded value) once the connection is back.
                    retry_sleep = base_delay
                    consecutive_failures = 0  # Reset on successful connection

                async for message in manager.pubsub.listen():
                    yield message

            except Exception as e:  # Catch all exceptions for robust error handling
                consecutive_failures += 1
                error_type = _handle_redis_failure("Redis pubsub listen", e)

                if consecutive_failures >= max_consecutive_failures:
                    logger.error(
                        f"Redis pubsub failed {consecutive_failures} consecutive times, giving up"
                    )
                    break

                logger.warning(
                    f"Redis pubsub error (type: {error_type}, failure {consecutive_failures}/{max_consecutive_failures}), retrying in {retry_sleep}s"
                )
                connect = True
                await asyncio.sleep(retry_sleep)
                retry_sleep = min(retry_sleep * 2, settings.REDIS_RETRY_MAX_DELAY)

    # Replace the background thread method
    manager._thread = graceful_redis_failure_thread

    # Also patch the _redis_listen_with_retries method if it exists
    if original_redis_listen_with_retries:
        manager._redis_listen_with_retries = enhanced_redis_listen_with_retries
215

216

217
def create_socket_manager():
    """Create the Socket.IO client manager, using Redis when it is usable.

    Returns:
        A patched ``socketio.AsyncRedisManager`` when the adapter is enabled,
        configured and reachable; ``None`` in every other case, including
        any unexpected error during setup.
    """
    if not _should_use_redis_adapter():
        logger.info("Redis adapter disabled - using default Socket.IO manager")
        return None

    try:
        redis_url = _build_redis_url()

        # Part 1: probe Redis BEFORE creating the manager, so background
        # threads never start against a bad Redis configuration.
        if can_we_reach_redis(redis_url):
            manager = socketio.AsyncRedisManager(redis_url)

            # Part 2: make background thread failures non-fatal.
            _patch_redis_manager_for_graceful_failure(manager)

            logger.info(
                f"Redis adapter configured: {settings.REDIS_HOST}:{settings.REDIS_PORT}"
            )
            return manager

        logger.warning(
            "Redis connectivity test failed - falling back to default Socket.IO manager"
        )
        return None

    except Exception as e:
        # Handle any unexpected errors gracefully
        error_type = _handle_redis_failure("adapter initialization", e)
        logger.warning(
            f"Unexpected error during Redis adapter initialization (type: {error_type}): {e}"
        )
        logger.info("Falling back to default Socket.IO manager")
        return None
257

258

259
async def safe_emit(event, data=None, **kwargs):
    """Emit a Socket.IO event, logging instead of raising on failure.

    Any ``Exception`` from ``sio.emit`` is caught and logged. With
    ``settings.USE_REDIS_ADAPTER`` enabled the failure is also classified via
    ``_handle_redis_failure``; otherwise a single warning is logged.

    Args:
        event: Event name passed to ``sio.emit``.
        data: Optional payload passed to ``sio.emit``.
        **kwargs: Extra keyword arguments forwarded to ``sio.emit``.
    """
    try:
        await sio.emit(event, data, **kwargs)
    except Exception as e:
        if not settings.USE_REDIS_ADAPTER:
            # Continue without Redis when adapter is disabled
            logger.warning(f"Socket.IO emit failed, continuing gracefully: {e}")
            return

        # Log the error but continue gracefully - don't crash the application
        error_type = _handle_redis_failure("Socket.IO emit", e)
        logger.warning(f"Socket.IO emit failed (type: {error_type}): {e}")
        logger.info(
            "Continuing without real-time Socket.IO communication for this event"
        )
279

280

281
# Module-level Socket.IO server. create_socket_manager() returns None when
# Redis is disabled or unusable, and that None is passed as client_manager.
sio = socketio.AsyncServer(
    client_manager=create_socket_manager(),
    async_mode="asgi",
    cors_allowed_origins="*",
)

# ASGI application wrapping the server, served under /ws/socket.io.
sio_app = socketio.ASGIApp(socketio_path="/ws/socket.io", socketio_server=sio)
287

288

289
def get_db_for_socketio():
    """Return the application database for use inside Socket.IO handlers.

    Socket.IO event handlers run outside FastAPI's request/response
    lifecycle, so the ``get_db()`` dependency cannot be injected there.
    Handlers call this function instead; regular FastAPI routes and
    dependencies should keep using ``get_db()``.

    Returns:
        The database named ``settings.DB_NAME`` on the shared ``client``.
    """
    database_name = settings.DB_NAME
    return client[database_name]
302

303

304
@sio.event
async def connect(sid, socket):
    """Log a new Socket.IO connection.

    Args:
        sid: Socket.IO session ID of the connecting client.
        socket: Second positional argument supplied by Socket.IO; unused here.
    """
    message = f">>> connect : sid={sid}"
    logger.info(message)
307

308

309
@sio.event
async def initialize(sid, data):
    """Store the websocket session ID in the AuthSession named by ``data["pid"]``.

    Args:
        sid: Socket.IO session ID of the calling client.
        data: Client-supplied payload, expected to be a dict with a ``pid``
            key. Any other payload shape is logged and ignored.
    """
    db = get_db_for_socketio()

    # The payload comes from the client; a non-dict value (None, string,
    # list) has no .get() and would otherwise raise out of the handler.
    if not isinstance(data, dict):
        logger.warning(f"Ignoring initialize with invalid payload for sid {sid}")
        return

    pid = data.get("pid")
    if pid:
        try:
            # Update only the socket_id field for efficiency
            await AuthSessionCRUD(db).update_socket_id(pid, sid)
            logger.debug(f"Stored socket_id {sid} for pid {pid}")
        except Exception as e:
            logger.error(f"Failed to store socket_id for pid {pid}: {e}")
321

322

323
@sio.event
async def disconnect(sid):
    """Log the disconnect and clear the socket ID from its AuthSession.

    Looks up the AuthSession by ``sid``; if one exists its ``socket_id`` is
    set to ``None``. Errors are logged and not raised.

    Args:
        sid: Socket.IO session ID of the disconnecting client.
    """
    logger.info(f">>> disconnect : sid={sid}")
    db = get_db_for_socketio()
    try:
        auth_session = await AuthSessionCRUD(db).get_by_socket_id(sid)
        if not auth_session:
            return

        # Clear only the socket_id field for efficiency
        session_pid = str(auth_session.id)
        await AuthSessionCRUD(db).update_socket_id(session_pid, None)
        logger.debug(f"Cleared socket_id {sid} for pid {auth_session.id}")
    except Exception as e:
        logger.error(f"Failed to clear socket_id {sid}: {e}")
336

337

338
async def get_socket_id_for_pid(pid: str, db: Database) -> str | None:
    """Look up the current socket ID for a presentation ID.

    Args:
        pid: Presentation ID identifying the AuthSession.
        db: Database handle passed to ``AuthSessionCRUD``.

    Returns:
        The session's ``socket_id``, or ``None`` if the lookup raised (the
        error is logged).
    """
    try:
        session = await AuthSessionCRUD(db).get(pid)
        socket_id = session.socket_id
    except Exception as e:
        logger.error(f"Failed to get socket_id for pid {pid}: {e}")
        return None
    return socket_id
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc