• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

openwallet-foundation / acapy-vc-authn-oidc / 16353326490

17 Jul 2025 06:44PM UTC coverage: 88.317% (+0.5%) from 87.841%
16353326490

Pull #802

github

web-flow
Merge 834c9ce74 into b68e6aa9a
Pull Request #802: Connection based verification

158 of 221 new or added lines in 7 files covered. (71.49%)

6 existing lines in 1 file now uncovered.

1013 of 1147 relevant lines covered (88.32%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.12
/oidc-controller/api/routers/acapy_handler.py
1
import json
1✔
2
from pydantic.plugin import Any
1✔
3
import structlog
1✔
4
from datetime import datetime, timedelta
1✔
5

6
from fastapi import APIRouter, Depends, Request
1✔
7
from pymongo.database import Database
1✔
8

9
from ..authSessions.crud import AuthSessionCRUD
1✔
10
from ..authSessions.models import AuthSession, AuthSessionPatch, AuthSessionState
1✔
11
from ..db.session import get_db
1✔
12
from ..core.acapy.client import AcapyClient
1✔
13
from ..verificationConfigs.crud import VerificationConfigCRUD
1✔
14

15
from ..core.config import settings
1✔
16
from ..routers.socketio import sio, connections_reload
1✔
17

18
logger: structlog.typing.FilteringBoundLogger = structlog.getLogger(__name__)
1✔
19

20
router = APIRouter()
1✔
21

22

23
async def _parse_webhook_body(request: Request) -> dict[Any, Any]:
1✔
24
    return json.loads((await request.body()).decode("ascii"))
1✔
25

26

27
@router.post("/topic/{topic}/")
1✔
28
async def post_topic(request: Request, topic: str, db: Database = Depends(get_db)):
1✔
29
    """Called by aca-py agent."""
30
    logger.info(f">>> post_topic : topic={topic}")
1✔
31
    logger.info(f">>> web hook post_body : {await _parse_webhook_body(request)}")
1✔
32

33
    match topic:
1✔
34
        case "connections":
1✔
35
            if settings.USE_CONNECTION_BASED_VERIFICATION:
1✔
36
                webhook_body = await _parse_webhook_body(request)
1✔
37
                logger.info(f">>>> connection_id: {webhook_body.get('connection_id')}")
1✔
38
                logger.info(f">>>> connection state: {webhook_body.get('state')}")
1✔
39

40
                # Log request state for debugging but don't act on it yet
41
                if webhook_body.get("state") == "request":
1✔
NEW
42
                    logger.info(
×
43
                        f"Connection {webhook_body.get('connection_id')} is in request state, waiting for active/completed"
44
                    )
45

46
                if webhook_body.get("state") in ["active", "completed"]:
1✔
47
                    # NEW: Connection established, now send presentation request
48
                    connection_id = webhook_body.get("connection_id")
1✔
49
                    invitation_msg_id = webhook_body.get("invitation_msg_id")
1✔
50

51
                    logger.info(f"Full webhook body: {webhook_body}")
1✔
52
                    logger.info(f"Available keys: {list(webhook_body.keys())}")
1✔
53

54
                    # Try multiple possible fields for invitation message ID
55
                    search_id = (
1✔
56
                        invitation_msg_id
57
                        or webhook_body.get("invi_msg_id")
58
                        or webhook_body.get("invitation_id")
59
                    )
60

61
                    # Find the auth session by invitation message ID (stored as connection_id initially)
62
                    logger.info(f"Looking for auth session with search_id: {search_id}")
1✔
63
                    auth_session = await AuthSessionCRUD(db).get_by_connection_id(
1✔
64
                        search_id
65
                    )
66

67
                    # If not found by invitation message ID, try by connection_id directly
68
                    if not auth_session and connection_id:
1✔
NEW
69
                        logger.info(
×
70
                            f"Trying to find auth session by connection_id: {connection_id}"
71
                        )
NEW
72
                        auth_session = await AuthSessionCRUD(db).get_by_connection_id(
×
73
                            connection_id
74
                        )
75

76
                    # If still not found, try searching by pres_exch_id pattern
77
                    if not auth_session and search_id:
1✔
NEW
78
                        logger.info(
×
79
                            f"Trying to find auth session by pres_exch_id pattern: pending-{search_id}"
80
                        )
NEW
81
                        try:
×
NEW
82
                            auth_session = await AuthSessionCRUD(
×
83
                                db
84
                            ).get_by_pres_exch_id(f"pending-{search_id}")
NEW
85
                        except:
×
NEW
86
                            pass  # This lookup might fail if the pattern doesn't match
×
87

88
                    if auth_session:
1✔
89
                        logger.info(f"Found auth session: {auth_session.id}")
1✔
90
                        logger.info(
1✔
91
                            f"Auth session has proof_request: {auth_session.proof_request is not None}"
92
                        )
93

94
                        if auth_session.proof_request:
1✔
95
                            logger.info(
1✔
96
                                f"Sending presentation request to connection {connection_id}"
97
                            )
98

99
                            # Send presentation request to the established connection
100
                            client = AcapyClient()
1✔
101
                            try:
1✔
102
                                pres_response = client.send_presentation_request_by_connection(
1✔
103
                                    connection_id=connection_id,
104
                                    presentation_request_configuration=auth_session.proof_request,
105
                                )
106

107
                                # Update auth session with presentation exchange details and real connection ID
108
                                auth_session.pres_exch_id = pres_response.pres_ex_id
1✔
109
                                auth_session.presentation_exchange = (
1✔
110
                                    pres_response.model_dump()
111
                                )
112
                                auth_session.connection_id = (
1✔
113
                                    connection_id  # Update with real connection ID
114
                                )
115
                                await AuthSessionCRUD(db).patch(
1✔
116
                                    str(auth_session.id),
117
                                    AuthSessionPatch(**auth_session.model_dump()),
118
                                )
119

120
                                logger.info(
1✔
121
                                    f"Presentation request sent successfully: {pres_response.pres_ex_id}"
122
                                )
123
                            except Exception as e:
1✔
124
                                logger.error(
1✔
125
                                    f"Failed to send presentation request: {e}"
126
                                )
127
                                # Set auth session to failed state
128
                                auth_session.proof_status = AuthSessionState.FAILED
1✔
129
                                await AuthSessionCRUD(db).patch(
1✔
130
                                    str(auth_session.id),
131
                                    AuthSessionPatch(**auth_session.model_dump()),
132
                                )
133

134
                                # Send problem report if we have a presentation exchange ID
135
                                if auth_session.pres_exch_id:
1✔
136
                                    try:
1✔
137
                                        client.send_problem_report(
1✔
138
                                            auth_session.pres_exch_id,
139
                                            f"Failed to send presentation request: {str(e)}",
140
                                        )
141
                                        logger.info(
1✔
142
                                            f"Problem report sent for pres_ex_id: {auth_session.pres_exch_id}"
143
                                        )
NEW
144
                                    except Exception as problem_report_error:
×
NEW
145
                                        logger.error(
×
146
                                            f"Failed to send problem report: {problem_report_error}"
147
                                        )
148

149
                                # Emit failure status to frontend
150
                                connections = connections_reload()
1✔
151
                                sid = connections.get(str(auth_session.id))
1✔
152
                                if sid:
1✔
153
                                    await sio.emit(
1✔
154
                                        "status", {"status": "failed"}, to=sid
155
                                    )
156
                        else:
NEW
157
                            logger.warning(
×
158
                                f"Auth session found but no proof_request: {auth_session.id}"
159
                            )
160
                    else:
NEW
161
                        logger.warning(
×
162
                            f"No auth session found for invitation_msg_id: {invitation_msg_id}"
163
                        )
164

165
        case "present_proof_v2_0":
1✔
166
            webhook_body = await _parse_webhook_body(request)
1✔
167
            logger.info(f">>>> pres_exch_id: {webhook_body['pres_ex_id']}")
1✔
168
            # logger.info(f">>>> web hook: {webhook_body}")
169
            auth_session: AuthSession = await AuthSessionCRUD(db).get_by_pres_exch_id(
1✔
170
                webhook_body["pres_ex_id"]
171
            )
172

173
            # Get the saved websocket session
174
            pid = str(auth_session.id)
1✔
175
            connections = connections_reload()
1✔
176
            sid = connections.get(pid)
1✔
177
            logger.debug(f"sid: {sid} found for pid: {pid}")
1✔
178

179
            if webhook_body["state"] == "presentation-received":
1✔
180
                logger.info("presentation-received")
×
181

182
            if webhook_body["state"] == "done":
1✔
183
                logger.info("VERIFIED")
1✔
184
                if webhook_body["verified"] == "true":
1✔
185
                    auth_session.proof_status = AuthSessionState.VERIFIED
1✔
186
                    auth_session.presentation_exchange = webhook_body["by_format"]
1✔
187
                    if sid:
1✔
188
                        await sio.emit("status", {"status": "verified"}, to=sid)
1✔
189
                else:
190
                    auth_session.proof_status = AuthSessionState.FAILED
1✔
191
                    if sid:
1✔
192
                        await sio.emit("status", {"status": "failed"}, to=sid)
1✔
193

194
                    # Send problem report for failed verification in connection-based flow
195
                    if (
1✔
196
                        settings.USE_CONNECTION_BASED_VERIFICATION
197
                        and auth_session.pres_exch_id
198
                    ):
199
                        try:
1✔
200
                            client = AcapyClient()
1✔
201
                            client.send_problem_report(
1✔
202
                                auth_session.pres_exch_id,
203
                                f"Presentation verification failed: {webhook_body.get('error_msg', 'Unknown error')}",
204
                            )
205
                            logger.info(
1✔
206
                                f"Problem report sent for failed verification: {auth_session.pres_exch_id}"
207
                            )
NEW
208
                        except Exception as problem_report_error:
×
NEW
209
                            logger.error(
×
210
                                f"Failed to send problem report for failed verification: {problem_report_error}"
211
                            )
212

213
                await AuthSessionCRUD(db).patch(
1✔
214
                    str(auth_session.id), AuthSessionPatch(**auth_session.model_dump())
215
                )
216

217
                # Cleanup connection after verification is complete (for connection-based flow)
218
                if (
1✔
219
                    settings.USE_CONNECTION_BASED_VERIFICATION
220
                    and auth_session.connection_id
221
                ):
222
                    try:
1✔
223
                        client = AcapyClient()
1✔
224
                        success = client.delete_connection(auth_session.connection_id)
1✔
225
                        if success:
1✔
226
                            logger.info(
1✔
227
                                f"Cleaned up connection {auth_session.connection_id} after verification"
228
                            )
229
                        else:
NEW
230
                            logger.warning(
×
231
                                f"Failed to cleanup connection {auth_session.connection_id}"
232
                            )
NEW
233
                    except Exception as e:
×
NEW
234
                        logger.error(
×
235
                            f"Error cleaning up connection {auth_session.connection_id}: {e}"
236
                        )
237

238
            # abandoned state
239
            if webhook_body["state"] == "abandoned":
1✔
240
                logger.info("ABANDONED")
1✔
241
                logger.info(webhook_body["error_msg"])
1✔
242
                auth_session.proof_status = AuthSessionState.ABANDONED
1✔
243
                if sid:
1✔
244
                    await sio.emit("status", {"status": "abandoned"}, to=sid)
1✔
245

246
                # Send problem report for abandoned presentation in connection-based flow
247
                if (
1✔
248
                    settings.USE_CONNECTION_BASED_VERIFICATION
249
                    and auth_session.pres_exch_id
250
                ):
251
                    try:
1✔
252
                        client = AcapyClient()
1✔
253
                        client.send_problem_report(
1✔
254
                            auth_session.pres_exch_id,
255
                            f"Presentation abandoned: {webhook_body.get('error_msg', 'Unknown error')}",
256
                        )
257
                        logger.info(
1✔
258
                            f"Problem report sent for abandoned presentation: {auth_session.pres_exch_id}"
259
                        )
NEW
260
                    except Exception as problem_report_error:
×
NEW
261
                        logger.error(
×
262
                            f"Failed to send problem report for abandoned presentation: {problem_report_error}"
263
                        )
264

265
                await AuthSessionCRUD(db).patch(
1✔
266
                    str(auth_session.id), AuthSessionPatch(**auth_session.model_dump())
267
                )
268

269
                # Cleanup connection after verification is abandoned (for connection-based flow)
270
                if (
1✔
271
                    settings.USE_CONNECTION_BASED_VERIFICATION
272
                    and auth_session.connection_id
273
                ):
274
                    try:
1✔
275
                        client = AcapyClient()
1✔
276
                        success = client.delete_connection(auth_session.connection_id)
1✔
277
                        if success:
1✔
278
                            logger.info(
1✔
279
                                f"Cleaned up connection {auth_session.connection_id} after abandonment"
280
                            )
281
                        else:
NEW
282
                            logger.warning(
×
283
                                f"Failed to cleanup connection {auth_session.connection_id}"
284
                            )
NEW
285
                    except Exception as e:
×
NEW
286
                        logger.error(
×
287
                            f"Error cleaning up connection {auth_session.connection_id}: {e}"
288
                        )
289

290
            # Calcuate the expiration time of the proof
291
            now_time = datetime.now()
1✔
292
            expired_time = now_time + timedelta(
1✔
293
                seconds=settings.CONTROLLER_PRESENTATION_EXPIRE_TIME
294
            )
295

296
            # Update the expiration time of the proof
297
            auth_session.expired_timestamp = expired_time
1✔
298
            await AuthSessionCRUD(db).patch(
1✔
299
                str(auth_session.id), AuthSessionPatch(**auth_session.model_dump())
300
            )
301

302
            # Check if expired. But only if the proof has not been started.
303
            if (
1✔
304
                expired_time < now_time
305
                and auth_session.proof_status == AuthSessionState.NOT_STARTED
306
            ):
307
                logger.info("EXPIRED")
×
308
                auth_session.proof_status = AuthSessionState.EXPIRED
×
309
                if sid:
×
310
                    await sio.emit("status", {"status": "expired"}, to=sid)
×
311

312
                # Send problem report for expired presentation in connection-based flow
NEW
313
                if (
×
314
                    settings.USE_CONNECTION_BASED_VERIFICATION
315
                    and auth_session.pres_exch_id
316
                ):
NEW
317
                    try:
×
NEW
318
                        client = AcapyClient()
×
NEW
319
                        client.send_problem_report(
×
320
                            auth_session.pres_exch_id,
321
                            f"Presentation expired: timeout after {settings.CONTROLLER_PRESENTATION_EXPIRE_TIME} seconds",
322
                        )
NEW
323
                        logger.info(
×
324
                            f"Problem report sent for expired presentation: {auth_session.pres_exch_id}"
325
                        )
NEW
326
                    except Exception as problem_report_error:
×
NEW
327
                        logger.error(
×
328
                            f"Failed to send problem report for expired presentation: {problem_report_error}"
329
                        )
330

331
                await AuthSessionCRUD(db).patch(
×
332
                    str(auth_session.id), AuthSessionPatch(**auth_session.model_dump())
333
                )
334

335
                # Cleanup connection after verification expires (for connection-based flow)
NEW
336
                if (
×
337
                    settings.USE_CONNECTION_BASED_VERIFICATION
338
                    and auth_session.connection_id
339
                ):
NEW
340
                    try:
×
NEW
341
                        client = AcapyClient()
×
NEW
342
                        success = client.delete_connection(auth_session.connection_id)
×
NEW
343
                        if success:
×
NEW
344
                            logger.info(
×
345
                                f"Cleaned up connection {auth_session.connection_id} after expiration"
346
                            )
347
                        else:
NEW
348
                            logger.warning(
×
349
                                f"Failed to cleanup connection {auth_session.connection_id}"
350
                            )
NEW
351
                    except Exception as e:
×
NEW
352
                        logger.error(
×
353
                            f"Error cleaning up connection {auth_session.connection_id}: {e}"
354
                        )
355

356
            pass
1✔
357
        case _:
×
358
            logger.debug("skipping webhook")
×
359

360
    return {}
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc