• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

openwallet-foundation / acapy-vc-authn-oidc / 19545712621

20 Nov 2025 05:30PM UTC coverage: 92.267% (-0.1%) from 92.384%
19545712621

Pull #906

github

web-flow
Merge 26080a16b into 4906a7603
Pull Request #906: Feat/manage script testing

12 of 15 new or added lines in 3 files covered. (80.0%)

30 existing lines in 2 files now uncovered.

1587 of 1720 relevant lines covered (92.27%)

0.92 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

83.81
/oidc-controller/api/core/acapy/client.py
1
import json
1✔
2
from uuid import UUID
1✔
3

4
import requests
1✔
5
import structlog
1✔
6

7
from ..config import settings
1✔
8
from .config import AgentConfig, MultiTenantAcapy, SingleTenantAcapy
1✔
9
from .models import CreatePresentationResponse, OobCreateInvitationResponse, WalletDid
1✔
10

11
# HTTP timeout for all ACA-Py API calls (seconds)
12
ACAPY_HTTP_TIMEOUT = 10.0
1✔
13

14
_client = None
1✔
15
logger: structlog.typing.FilteringBoundLogger = structlog.getLogger(__name__)
1✔
16

17
WALLET_DID_URI = "/wallet/did"
1✔
18
PUBLIC_WALLET_DID_URI = "/wallet/did/public"
1✔
19
CREATE_PRESENTATION_REQUEST_URL = "/present-proof-2.0/create-request"
1✔
20
PRESENT_PROOF_RECORDS = "/present-proof-2.0/records"
1✔
21
SEND_PRESENTATION_REQUEST_URL = "/present-proof-2.0/send-request"
1✔
22
PRESENT_PROOF_PROBLEM_REPORT_URL = (
1✔
23
    "/present-proof-2.0/records/{pres_ex_id}/problem-report"
24
)
25
OOB_CREATE_INVITATION = "/out-of-band/create-invitation"
1✔
26
CONNECTIONS_URI = "/connections"
1✔
27

28

29
class AcapyClient:
1✔
30
    acapy_host = settings.ACAPY_ADMIN_URL
1✔
31
    service_endpoint = settings.ACAPY_AGENT_URL
1✔
32

33
    wallet_token: str | None = None
1✔
34
    agent_config: AgentConfig
1✔
35

36
    def __init__(self):
1✔
37
        if settings.ACAPY_TENANCY == "multi":
1✔
38
            self.agent_config = MultiTenantAcapy()
1✔
39
        elif settings.ACAPY_TENANCY == "single":
1✔
40
            self.agent_config = SingleTenantAcapy()
1✔
41
        else:
42
            logger.warning("ACAPY_TENANCY not set, assuming SingleTenantAcapy")
1✔
43
            self.agent_config = SingleTenantAcapy()
1✔
44

45
        if _client:
1✔
46
            return _client
×
47
        super().__init__()
1✔
48

49
    def create_presentation_request(
1✔
50
        self, presentation_request_configuration: dict
51
    ) -> CreatePresentationResponse:
52
        logger.debug(">>> create_presentation_request")
1✔
53

54
        format_key = settings.ACAPY_PROOF_FORMAT
1✔
55
        present_proof_payload = {
1✔
56
            "presentation_request": {format_key: presentation_request_configuration}
57
        }
58

59
        resp_raw = requests.post(
1✔
60
            self.acapy_host + CREATE_PRESENTATION_REQUEST_URL,
61
            headers=self.agent_config.get_headers(),
62
            json=present_proof_payload,
63
        )
64

65
        # TODO: Determine if this should assert it received a json object
66
        assert resp_raw.status_code == 200, resp_raw.content
1✔
67

68
        resp = json.loads(resp_raw.content)
1✔
69
        result = CreatePresentationResponse.model_validate(resp)
1✔
70

71
        logger.debug("<<< create_presenation_request")
1✔
72
        return result
1✔
73

74
    def get_presentation_request(self, presentation_exchange_id: UUID | str):
1✔
75
        logger.debug(">>> get_presentation_request")
1✔
76

77
        resp_raw = requests.get(
1✔
78
            self.acapy_host
79
            + PRESENT_PROOF_RECORDS
80
            + "/"
81
            + str(presentation_exchange_id),
82
            headers=self.agent_config.get_headers(),
83
        )
84

85
        # TODO: Determine if this should assert it received a json object
86
        assert resp_raw.status_code == 200, resp_raw.content
1✔
87

88
        resp = json.loads(resp_raw.content)
1✔
89

90
        logger.debug(f"<<< get_presentation_request -> {resp}")
1✔
91
        return resp
1✔
92

93
    def delete_presentation_record(self, presentation_exchange_id: UUID | str) -> bool:
1✔
94
        """Delete a presentation record by ID"""
95
        logger.debug(f">>> delete_presentation_record: {presentation_exchange_id}")
1✔
96

97
        try:
1✔
98
            resp_raw = requests.delete(
1✔
99
                f"{self.acapy_host}{PRESENT_PROOF_RECORDS}/{presentation_exchange_id}",
100
                headers=self.agent_config.get_headers(),
101
                timeout=ACAPY_HTTP_TIMEOUT,
102
            )
103

104
            success = resp_raw.status_code == 200
1✔
105
            if success:
1✔
106
                logger.debug(f"<<< delete_presentation_record -> Success")
1✔
107
            else:
108
                logger.warning(
1✔
109
                    f"<<< delete_presentation_record -> Failed: {resp_raw.status_code}, {resp_raw.content}"
110
                )
111
            return success
1✔
112

113
        except Exception as e:
1✔
114
            logger.error(
1✔
115
                f"Failed to delete presentation record {presentation_exchange_id}: {e}"
116
            )
117
            return False
1✔
118

119
    def get_all_presentation_records(self) -> list[dict]:
1✔
120
        """Get all presentation records for cleanup purposes"""
121
        logger.debug(">>> get_all_presentation_records")
1✔
122

123
        try:
1✔
124
            resp_raw = requests.get(
1✔
125
                f"{self.acapy_host}{PRESENT_PROOF_RECORDS}",
126
                headers=self.agent_config.get_headers(),
127
                timeout=ACAPY_HTTP_TIMEOUT,
128
            )
129

130
            if resp_raw.status_code != 200:
1✔
131
                logger.warning(
1✔
132
                    f"Failed to get presentation records: {resp_raw.status_code}, {resp_raw.content}"
133
                )
134
                return []
1✔
135

136
            resp = json.loads(resp_raw.content)
1✔
137
            records = resp.get("results", [])
1✔
138
            logger.debug(f"<<< get_all_presentation_records -> {len(records)} records")
1✔
139
            return records
1✔
140

141
        except Exception as e:
1✔
142
            logger.error(f"Failed to get all presentation records: {e}")
1✔
143
            return []
1✔
144

145
    def get_wallet_did(self, public=False) -> WalletDid:
1✔
146
        logger.debug(">>> get_wallet_did")
1✔
147
        url = None
1✔
148
        if public:
1✔
149
            url = self.acapy_host + PUBLIC_WALLET_DID_URI
1✔
150
        else:
151
            url = self.acapy_host + WALLET_DID_URI
1✔
152

153
        resp_raw = requests.get(
1✔
154
            url,
155
            headers=self.agent_config.get_headers(),
156
        )
157

158
        # TODO: Determine if this should assert it received a json object
159
        assert (
1✔
160
            resp_raw.status_code == 200
161
        ), f"{resp_raw.status_code}::{resp_raw.content}"
162

163
        resp = json.loads(resp_raw.content)
1✔
164

165
        if public:
1✔
166
            resp_payload = resp["result"]
1✔
167
        else:
168
            resp_payload = resp["results"][0]
1✔
169

170
        did = WalletDid.model_validate(resp_payload)
1✔
171

172
        logger.debug(f"<<< get_wallet_did -> {did}")
1✔
173
        return did
1✔
174

175
    def oob_create_invitation(
1✔
176
        self, presentation_exchange: dict, use_public_did: bool
177
    ) -> OobCreateInvitationResponse:
UNCOV
178
        logger.debug(">>> oob_create_invitation")
×
UNCOV
179
        create_invitation_payload = {
×
180
            "attachments": [
181
                {
182
                    "id": presentation_exchange["pres_ex_id"],
183
                    "type": "present-proof",
184
                    "data": {"json": presentation_exchange},
185
                }
186
            ],
187
            "use_public_did": use_public_did,
188
            "my_label": settings.INVITATION_LABEL,
189
        }
190

UNCOV
191
        resp_raw = requests.post(
×
192
            self.acapy_host + OOB_CREATE_INVITATION,
193
            headers=self.agent_config.get_headers(),
194
            json=create_invitation_payload,
195
        )
196

197
        assert resp_raw.status_code == 200, resp_raw.content
×
198

UNCOV
199
        resp = json.loads(resp_raw.content)
×
200
        result = OobCreateInvitationResponse.model_validate(resp)
×
201

UNCOV
202
        logger.debug("<<< oob_create_invitation")
×
UNCOV
203
        return result
×
204

205
    def send_presentation_request_by_connection(
1✔
206
        self, connection_id: str, presentation_request_configuration: dict
207
    ) -> CreatePresentationResponse:
208
        """
209
        Send a presentation request to an existing connection.
210

211
        Args:
212
            connection_id: The ID of the established connection
213
            presentation_request_configuration: The presentation request configuration
214

215
        Returns:
216
            CreatePresentationResponse: The response containing presentation exchange details
217
        """
218
        logger.debug(">>> send_presentation_request_by_connection")
1✔
219

220
        format_key = settings.ACAPY_PROOF_FORMAT
1✔
221
        present_proof_payload = {
1✔
222
            "connection_id": connection_id,
223
            "presentation_request": {format_key: presentation_request_configuration},
224
        }
225

226
        resp_raw = requests.post(
1✔
227
            self.acapy_host + SEND_PRESENTATION_REQUEST_URL,
228
            headers=self.agent_config.get_headers(),
229
            json=present_proof_payload,
230
        )
231

232
        assert resp_raw.status_code == 200, resp_raw.content
1✔
233

234
        resp = json.loads(resp_raw.content)
1✔
235
        result = CreatePresentationResponse.model_validate(resp)
1✔
236

237
        logger.debug("<<< send_presentation_request_by_connection")
1✔
238
        return result
1✔
239

240
    def get_connection(self, connection_id: str) -> dict:
1✔
241
        """
242
        Get details of a specific connection.
243

244
        Args:
245
            connection_id: The ID of the connection to retrieve
246

247
        Returns:
248
            dict: Connection details
249
        """
250
        logger.debug(">>> get_connection")
1✔
251

252
        resp_raw = requests.get(
1✔
253
            self.acapy_host + CONNECTIONS_URI + "/" + connection_id,
254
            headers=self.agent_config.get_headers(),
255
        )
256

257
        assert resp_raw.status_code == 200, resp_raw.content
1✔
258

259
        resp = json.loads(resp_raw.content)
1✔
260
        logger.debug(f"<<< get_connection -> {resp}")
1✔
261
        return resp
1✔
262

263
    def list_connections(self, state: str | None = None) -> list[dict]:
1✔
264
        """
265
        List all connections, optionally filtered by state.
266

267
        Args:
268
            state: Optional state filter (e.g., "active", "completed")
269

270
        Returns:
271
            list[dict]: List of connection records
272
        """
273
        logger.debug(">>> list_connections")
1✔
274

275
        params = {"state": state} if state else {}
1✔
276

277
        resp_raw = requests.get(
1✔
278
            self.acapy_host + CONNECTIONS_URI,
279
            headers=self.agent_config.get_headers(),
280
            params=params,
281
        )
282

283
        assert resp_raw.status_code == 200, resp_raw.content
1✔
284

285
        resp = json.loads(resp_raw.content)
1✔
286
        connections = resp.get("results", [])
1✔
287

288
        logger.debug(f"<<< list_connections -> {len(connections)} connections")
1✔
289
        return connections
1✔
290

291
    def _get_connections_page(
1✔
292
        self, state: str | None = None, limit: int = 100, offset: int = 0
293
    ) -> list[dict]:
294
        """
295
        Get a page of connections with pagination support.
296

297
        Args:
298
            state: Optional state filter (e.g., "invitation", "active")
299
            limit: Maximum number of connections to return
300
            offset: Number of connections to skip
301

302
        Returns:
303
            list[dict]: List of connection records for this page
304
        """
305
        logger.debug(
1✔
306
            f">>> _get_connections_page: state={state}, limit={limit}, offset={offset}"
307
        )
308

309
        params = {
1✔
310
            "limit": limit,
311
            "offset": offset,
312
            **({"state": state} if state else {}),
313
        }
314

315
        try:
1✔
316
            resp_raw = requests.get(
1✔
317
                self.acapy_host + CONNECTIONS_URI,
318
                headers=self.agent_config.get_headers(),
319
                params=params,
320
                timeout=ACAPY_HTTP_TIMEOUT,
321
            )
322

323
            if resp_raw.status_code != 200:
1✔
324
                logger.warning(
×
325
                    f"Failed to get connections page: {resp_raw.status_code}"
326
                )
UNCOV
327
                return []
×
328

329
            resp = json.loads(resp_raw.content)
1✔
330
            connections = resp.get("results", [])
1✔
331

332
            logger.debug(f"<<< _get_connections_page -> {len(connections)} connections")
1✔
333
            return connections
1✔
334

335
        except Exception as e:
1✔
336
            logger.error(f"Error getting connections page: {e}")
1✔
337
            return []
1✔
338

339
    def get_connections_batched(self, state: str = "invitation", batch_size: int = 100):
1✔
340
        """
341
        Get connections in batches using iterator pattern for memory efficiency.
342

343
        Args:
344
            state: State filter for connections (default: "invitation")
345
            batch_size: Number of connections per batch (default: 100)
346

347
        Yields:
348
            list[dict]: Batches of connection records
349
        """
350
        logger.debug(
1✔
351
            f">>> get_connections_batched: state={state}, batch_size={batch_size}"
352
        )
353

354
        offset = 0
1✔
355
        total_yielded = 0
1✔
356

357
        while True:
1✔
358
            batch = self._get_connections_page(state, batch_size, offset)
1✔
359

360
            if not batch:  # No more results
1✔
361
                break
1✔
362

363
            total_yielded += len(batch)
1✔
364
            logger.debug(
1✔
365
                f"Yielding batch of {len(batch)} connections (total so far: {total_yielded})"
366
            )
367
            yield batch
1✔
368

369
            # If we got less than batch_size, we've reached the end
370
            if len(batch) < batch_size:
1✔
371
                break
1✔
372

UNCOV
373
            offset += batch_size
×
374

375
        logger.debug(
1✔
376
            f"<<< get_connections_batched -> yielded {total_yielded} total connections"
377
        )
378

379
    def get_all_connections(self) -> list[dict]:
1✔
380
        """Get all connections for cleanup purposes"""
381
        logger.debug(">>> get_all_connections")
×
382

UNCOV
383
        try:
×
UNCOV
384
            resp_raw = requests.get(
×
385
                f"{self.acapy_host}{CONNECTIONS_URI}",
386
                headers=self.agent_config.get_headers(),
387
                timeout=ACAPY_HTTP_TIMEOUT,
388
            )
389

UNCOV
390
            if resp_raw.status_code != 200:
×
391
                logger.warning(
×
392
                    f"Failed to get connections: {resp_raw.status_code}, {resp_raw.content}"
393
                )
394
                return []
×
395

396
            resp = json.loads(resp_raw.content)
×
397
            connections = resp.get("results", [])
×
398

399
            logger.debug(f"<<< get_all_connections -> {len(connections)} connections")
×
400
            return connections
×
401

UNCOV
402
        except Exception as e:
×
UNCOV
403
            logger.error(f"Error getting all connections: {e}")
×
UNCOV
404
            return []
×
405

406
    def delete_connection(self, connection_id: str) -> bool:
1✔
407
        """
408
        Delete a connection.
409

410
        Args:
411
            connection_id: The ID of the connection to delete
412

413
        Returns:
414
            bool: True if deletion was successful
415
        """
416
        logger.debug(">>> delete_connection", connection_id=connection_id)
1✔
417

418
        try:
1✔
419
            resp_raw = requests.delete(
1✔
420
                self.acapy_host + CONNECTIONS_URI + "/" + connection_id,
421
                headers=self.agent_config.get_headers(),
422
                timeout=ACAPY_HTTP_TIMEOUT,
423
            )
424

425
            success = resp_raw.status_code == 200
1✔
426
            if success:
1✔
427
                logger.debug(f"<<< delete_connection -> Success")
1✔
428
            else:
429
                logger.warning(
1✔
430
                    f"<<< delete_connection -> Failed: {resp_raw.status_code}, {resp_raw.content}"
431
                )
432
            return success
1✔
433

UNCOV
434
        except Exception as e:
×
UNCOV
435
            logger.error(f"Failed to delete connection {connection_id}: {e}")
×
UNCOV
436
            return False
×
437

438
    def delete_presentation_record_and_connection(
1✔
439
        self, presentation_exchange_id: UUID | str, connection_id: str | None = None
440
    ) -> tuple[bool, bool | None, list[str]]:
441
        """
442
        Delete a presentation record and optionally its associated connection.
443

444
        This is the recommended method for cleanup as it handles both the presentation
445
        record and connection deletion in the proper order.
446

447
        Args:
448
            presentation_exchange_id: The presentation exchange ID to delete
449
            connection_id: Optional connection ID to delete. If not provided,
450
                         only the presentation record will be deleted.
451

452
        Returns:
453
            tuple[bool, bool | None, list[str]]: A tuple containing:
454
                - presentation_deleted: bool - True if presentation record was successfully deleted
455
                - connection_deleted: bool | None - True/False if connection deletion was attempted, None if not
456
                - errors: list[str] - List of error messages from failed operations
457
        """
458
        logger.debug(
1✔
459
            f">>> delete_presentation_record_and_connection: pres_ex={presentation_exchange_id}, conn={connection_id}"
460
        )
461

462
        presentation_deleted = False
1✔
463
        connection_deleted = None
1✔
464
        errors = []
1✔
465

466
        # First, delete the presentation record
467
        if presentation_exchange_id:
1✔
468
            try:
1✔
469
                presentation_deleted = self.delete_presentation_record(
1✔
470
                    presentation_exchange_id
471
                )
472

473
                if not presentation_deleted:
1✔
474
                    errors.append(
1✔
475
                        f"Failed to delete presentation record {presentation_exchange_id}"
476
                    )
477

478
            except Exception as e:
×
UNCOV
479
                error_msg = f"Error deleting presentation record {presentation_exchange_id}: {e}"
×
UNCOV
480
                errors.append(error_msg)
×
UNCOV
481
                logger.error(error_msg)
×
482

483
        # Second, delete the connection if provided
484
        # TODO: make mandatory when we drop OOB
485
        if connection_id:
1✔
486
            try:
1✔
487
                connection_deleted = self.delete_connection(connection_id)
1✔
488

489
                if not connection_deleted:
1✔
490
                    errors.append(f"Failed to delete connection {connection_id}")
1✔
491

492
            except Exception as e:
×
UNCOV
493
                error_msg = f"Error deleting connection {connection_id}: {e}"
×
UNCOV
494
                errors.append(error_msg)
×
UNCOV
495
                logger.error(error_msg)
×
496

497
        logger.debug(
1✔
498
            f"<<< delete_presentation_record_and_connection -> pres:{presentation_deleted}, conn:{connection_deleted}"
499
        )
500
        return presentation_deleted, connection_deleted, errors
1✔
501

502
    def send_problem_report(self, pres_ex_id: str, description: str) -> bool:
1✔
503
        """
504
        Send a problem report for a presentation exchange.
505

506
        Args:
507
            pres_ex_id: The presentation exchange ID
508
            description: Description of the problem
509

510
        Returns:
511
            bool: True if problem report was sent successfully
512
        """
513
        logger.debug(">>> send_problem_report")
1✔
514

515
        problem_report_payload = {"description": description}
1✔
516

517
        try:
1✔
518
            resp_raw = requests.post(
1✔
519
                self.acapy_host
520
                + PRESENT_PROOF_PROBLEM_REPORT_URL.format(pres_ex_id=pres_ex_id),
521
                json=problem_report_payload,
522
                headers=self.agent_config.get_headers(),
523
            )
524

525
            success = resp_raw.status_code == 200
1✔
526
            logger.debug(f"<<< send_problem_report -> {success}")
1✔
527

528
            if not success:
1✔
529
                logger.error(
1✔
530
                    f"Failed to send problem report: {resp_raw.status_code} - {resp_raw.content}"
531
                )
532

533
            return success
1✔
534

535
        except Exception as e:
1✔
536
            logger.error(f"Error sending problem report: {e}")
1✔
537
            return False
1✔
538

539
    def create_connection_invitation(
1✔
540
        self,
541
        multi_use: bool = False,
542
        presentation_exchange: dict | None = None,
543
        use_public_did: bool = False,
544
        alias: str | None = None,
545
        auto_accept: bool | None = None,
546
        metadata: dict | None = None,
547
    ) -> OobCreateInvitationResponse:
548
        """
549
        Create an out-of-band invitation for either ephemeral or persistent connections.
550

551
        Args:
552
            multi_use: Whether this is an non ephemeral (multi_use) connection (default: False)
553
            presentation_exchange: Optional presentation exchange to attach to invitation
554
            use_public_did: Whether to use public DID for the invitation (default: False)
555
            alias: Optional alias for the connection (default: None)
556
            auto_accept: Whether to auto-accept the connection (default: None - use configuration)
557
            metadata: Optional metadata to attach to the connection (default: None)
558

559
        Returns:
560
            OobCreateInvitationResponse: The response containing invitation details
561
        """
562
        logger.debug(">>> create_connection_invitation")
1✔
563

564
        # Determine connection type and goal code
565
        if multi_use:
1✔
566
            goal_code = "aries.vc.verify"
1✔
567
            goal = "Verify credentials for authentication"
1✔
568
            multi_use = True
1✔
569
        else:
570
            goal_code = "aries.vc.verify.once"
1✔
571
            goal = "Verify credentials for single-use authentication"
1✔
572
            multi_use = False
1✔
573
        # Prepare the payload for the invitation creation
574
        create_invitation_payload = {
1✔
575
            "use_public_did": use_public_did,
576
            "my_label": settings.INVITATION_LABEL,
577
            "goal_code": goal_code,
578
            "goal": goal,
579
        }
580

581
        # Add handshake protocols if no presentation attachment is provided
582
        if not presentation_exchange:
1✔
583
            create_invitation_payload["handshake_protocols"] = [
1✔
584
                "https://didcomm.org/didexchange/1.0",
585
                "https://didcomm.org/connections/1.0",
586
            ]
587

588
        # Add presentation exchange attachment if provided
589
        if presentation_exchange:
1✔
UNCOV
590
            create_invitation_payload["attachments"] = [
×
591
                {
592
                    "id": presentation_exchange["pres_ex_id"],
593
                    "type": "present-proof",
594
                    "data": {"json": presentation_exchange},
595
                }
596
            ]
597

598
        # Add optional body parameters if provided
599
        if alias is not None:
1✔
UNCOV
600
            create_invitation_payload["alias"] = alias
×
601
        if metadata:
1✔
UNCOV
602
            create_invitation_payload["metadata"] = metadata
×
603

604
        # Prepare query parameters
605
        params = {"multi_use": str(multi_use).lower()}
1✔
606
        if auto_accept is not None:
1✔
UNCOV
607
            params["auto_accept"] = str(auto_accept).lower()
×
608

609
        # Make the request to ACA-Py
610
        resp_raw = requests.post(
1✔
611
            self.acapy_host + OOB_CREATE_INVITATION,
612
            headers=self.agent_config.get_headers(),
613
            json=create_invitation_payload,
614
            params=params,
615
        )
616

617
        # Validate the response
618
        assert resp_raw.status_code == 200, resp_raw.content
1✔
619

620
        # Parse and validate the response
621
        resp = json.loads(resp_raw.content)
1✔
622
        result = OobCreateInvitationResponse.model_validate(resp)
1✔
623

624
        logger.debug("<<< create_connection_invitation")
1✔
625
        return result
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc