• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

qiskit-community / qiskit-aqt-provider / 9255177207

27 May 2024 12:52PM UTC coverage: 99.736%. Remained the same
9255177207

push

github

web-flow
Prepare release 1.5.0 (#161)

2267 of 2273 relevant lines covered (99.74%)

3.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.37
/qiskit_aqt_provider/aqt_job.py
1
# This code is part of Qiskit.
2
#
3
# (C) Copyright IBM 2019, Alpine Quantum Technologies GmbH 2022.
4
#
5
# This code is licensed under the Apache License, Version 2.0. You may
6
# obtain a copy of this license in the LICENSE.txt file in the root directory
7
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
8
#
9
# Any modifications or derivative works of this code must retain this
10
# copyright notice, and modified files need to carry a notice indicating
11
# that they have been altered from the originals.
12

13
import uuid
4✔
14
from collections import Counter, defaultdict
4✔
15
from dataclasses import dataclass
4✔
16
from pathlib import Path
4✔
17
from typing import (
4✔
18
    TYPE_CHECKING,
19
    Any,
20
    ClassVar,
21
    NoReturn,
22
    Optional,
23
    Union,
24
)
25

26
import numpy as np
4✔
27
from qiskit import QuantumCircuit
4✔
28
from qiskit.providers import JobV1
4✔
29
from qiskit.providers.jobstatus import JobStatus
4✔
30
from qiskit.result.result import Result
4✔
31
from qiskit.utils.lazy_tester import contextlib
4✔
32
from tqdm import tqdm
4✔
33
from typing_extensions import Self, TypeAlias, assert_never
4✔
34

35
from qiskit_aqt_provider import api_models_generated, persistence
4✔
36
from qiskit_aqt_provider.aqt_options import AQTOptions
4✔
37
from qiskit_aqt_provider.circuit_to_aqt import circuits_to_aqt_job
4✔
38

39
if TYPE_CHECKING:  # pragma: no cover
40
    from qiskit_aqt_provider.aqt_resource import AQTResource
41

42

43
# Tags for the status of AQT API jobs
44

45

46
@dataclass
4✔
47
class JobFinished:
4✔
48
    """The job finished successfully."""
4✔
49

50
    status: ClassVar = JobStatus.DONE
4✔
51
    results: dict[int, list[list[int]]]
4✔
52

53

54
@dataclass
4✔
55
class JobFailed:
4✔
56
    """An error occurred during the job execution."""
4✔
57

58
    status: ClassVar = JobStatus.ERROR
4✔
59
    error: str
4✔
60

61

62
class JobQueued:
4✔
63
    """The job is queued."""
4✔
64

65
    status: ClassVar = JobStatus.QUEUED
4✔
66

67

68
@dataclass
4✔
69
class JobOngoing:
4✔
70
    """The job is running."""
4✔
71

72
    status: ClassVar = JobStatus.RUNNING
4✔
73
    finished_count: int
4✔
74

75

76
class JobCancelled:
4✔
77
    """The job was cancelled."""
4✔
78

79
    status = ClassVar = JobStatus.CANCELLED
4✔
80

81

82
JobStatusPayload: TypeAlias = Union[JobQueued, JobOngoing, JobFinished, JobFailed, JobCancelled]
4✔
83

84

85
@dataclass(frozen=True)
4✔
86
class Progress:
4✔
87
    """Progress information of a job."""
4✔
88

89
    finished_count: int
4✔
90
    """Number of completed circuits."""
3✔
91

92
    total_count: int
4✔
93
    """Total number of circuits in the job."""
4✔
94

95

96
@dataclass
4✔
97
class _MockProgressBar:
4✔
98
    """Minimal tqdm-compatible progress bar mock."""
4✔
99

100
    total: int
4✔
101
    """Total number of items in the job."""
3✔
102

103
    n: int = 0
4✔
104
    """Number of processed items."""
3✔
105

106
    def update(self, n: int = 1) -> None:
4✔
107
        """Update the number of processed items by `n`."""
108
        self.n += n
4✔
109

110
    def __enter__(self) -> Self:
4✔
111
        return self
4✔
112

113
    def __exit__(*args) -> None: ...
4✔
114

115

116
class AQTJob(JobV1):
4✔
117
    """Handle for quantum circuits jobs running on AQT backends.
4✔
118

119
    Jobs contain one or more quantum circuits that are executed with a common
120
    set of options (see :class:`AQTOptions <qiskit_aqt_provider.aqt_options.AQTOptions>`).
121

122
    Job handles should be retrieved from calls
123
    to :meth:`AQTResource.run <qiskit_aqt_provider.aqt_resource.AQTResource.run>`, which immediately
124
    returns after submitting the job. The :meth:`result` method allows blocking until a job
125
    completes:
126

127
    >>> import qiskit
128
    >>> from qiskit.providers import JobStatus
129
    >>> from qiskit_aqt_provider import AQTProvider
130
    >>>
131
    >>> backend = AQTProvider("").get_backend("offline_simulator_no_noise")
132
    >>>
133
    >>> qc = qiskit.QuantumCircuit(1)
134
    >>> _ = qc.rx(3.14, 0)
135
    >>> _ = qc.measure_all()
136
    >>> qc = qiskit.transpile(qc, backend)
137
    >>>
138
    >>> job = backend.run(qc, shots=100)
139
    >>> result = job.result()
140
    >>> job.status() is JobStatus.DONE
141
    True
142
    >>> result.success
143
    True
144
    >>> result.get_counts()
145
    {'1': 100}
146
    """
147

148
    _backend: "AQTResource"
4✔
149

150
    def __init__(
4✔
151
        self,
152
        backend: "AQTResource",
153
        circuits: list[QuantumCircuit],
154
        options: AQTOptions,
155
    ):
156
        """Initialize an :class:`AQTJob` instance.
157

158
        .. tip:: :class:`AQTJob` instances should not be created directly. Use
159
          :meth:`AQTResource.run <qiskit_aqt_provider.aqt_resource.AQTResource.run>`
160
          to submit circuits for execution and retrieve a job handle.
161

162
        Args:
163
            backend: backend to run the job on.
164
            circuits: list of circuits to execute.
165
            options: overridden resource options for this job.
166
        """
167
        super().__init__(backend, "")
4✔
168

169
        self.circuits = circuits
4✔
170
        self.options = options
4✔
171
        self.api_submit_payload = circuits_to_aqt_job(circuits, options.shots)
4✔
172

173
        self.status_payload: JobStatusPayload = JobQueued()
4✔
174

175
    @classmethod
4✔
176
    def restore(
4✔
177
        cls,
178
        job_id: str,
179
        *,
180
        access_token: Optional[str] = None,
181
        store_path: Optional[Path] = None,
182
        remove_from_store: bool = True,
183
    ) -> Self:
184
        """Restore a job handle from local persistent storage.
185

186
        .. warning:: The default local storage path depends on the `qiskit_aqt_provider`
187
            package version. Job persisted with a different package version will therefore
188
            **not** be found!
189

190
        .. hint:: If the job's execution backend is an offline simulator, the
191
            job is re-submitted to the simulation backend and the new job ID differs
192
            from the one passed to this function.
193

194
        Args:
195
            job_id: identifier of the job to retrieve.
196
            access_token: access token for the AQT cloud.
197
              See :class:`AQTProvider <qiskit_aqt_provider.aqt_provider.AQTProvider>`.
198
            store_path: local persistent storage directory.
199
              By default, use a standard cache directory.
200
            remove_from_store: if :data:`True`, remove the retrieved job's data from persistent
201
              storage after a successful load.
202

203
        Returns:
204
            A job handle for the passed `job_id`.
205

206
        Raises:
207
            JobNotFoundError: the target job was not found in persistent storage.
208
        """
209
        from qiskit_aqt_provider.aqt_provider import AQTProvider
4✔
210
        from qiskit_aqt_provider.aqt_resource import AQTResource, OfflineSimulatorResource
4✔
211

212
        store_path = persistence.get_store_path(store_path)
4✔
213
        data = persistence.Job.restore(job_id, store_path)
4✔
214

215
        # TODO: forward .env loading args?
216
        provider = AQTProvider(access_token)
4✔
217
        if data.resource.resource_type == "offline_simulator":
4✔
218
            # FIXME: persist with_noise_model and restore it
219
            resource = OfflineSimulatorResource(provider, data.resource, with_noise_model=False)
4✔
220
        else:
221
            resource = AQTResource(provider, data.resource)
4✔
222

223
        obj = cls(backend=resource, circuits=data.circuits.circuits, options=data.options)
4✔
224

225
        if data.resource.resource_type == "offline_simulator":
4✔
226
            # re-submit the job because we can't restore the backend state
227
            obj.submit()
4✔
228
        else:
229
            obj._job_id = job_id
4✔
230

231
        if remove_from_store:
4✔
232
            persistence.Job.remove_from_store(job_id, store_path)
4✔
233

234
        return obj
4✔
235

236
    def persist(self, *, store_path: Optional[Path] = None) -> Path:
4✔
237
        """Save this job to local persistent storage.
238

239
        .. warning:: Only jobs that have been submitted for execution
240
          can be persisted (a valid `job_id` is required).
241

242
        Args:
243
            store_path: local persistent storage directory.
244
              By default, use a standard cache directory.
245

246
        Returns:
247
            The path to the job data in local persistent storage.
248

249
        Raises:
250
            RuntimeError: the job was never submitted for execution.
251
        """
252
        if not self.job_id():
4✔
253
            raise RuntimeError("Can only persist submitted jobs.")
4✔
254

255
        store_path = persistence.get_store_path(store_path)
4✔
256
        data = persistence.Job(
4✔
257
            resource=self._backend.resource_id,
258
            circuits=persistence.Circuits(self.circuits),
259
            options=self.options,
260
        )
261

262
        return data.persist(self.job_id(), store_path)
4✔
263

264
    def submit(self) -> None:
4✔
265
        """Submit this job for execution.
266

267
        This operation is not blocking. Use :meth:`result()` to block until
268
        the job completes.
269

270
        Raises:
271
            RuntimeError: this job was already submitted.
272
        """
273
        if self.job_id():
4✔
274
            raise RuntimeError(f"Job already submitted (ID: {self.job_id()})")
4✔
275

276
        job_id = self._backend.submit(self)
4✔
277
        self._job_id = str(job_id)
4✔
278

279
    def status(self) -> JobStatus:
4✔
280
        """Query the job's status.
281

282
        Returns:
283
            Aggregated job status for all the circuits in this job.
284
        """
285
        payload = self._backend.result(uuid.UUID(self.job_id()))
4✔
286

287
        if isinstance(payload, api_models_generated.JobResponseRRQueued):
4✔
288
            self.status_payload = JobQueued()
4✔
289
        elif isinstance(payload, api_models_generated.JobResponseRROngoing):
4✔
290
            self.status_payload = JobOngoing(finished_count=payload.response.finished_count)
4✔
291
        elif isinstance(payload, api_models_generated.JobResponseRRFinished):
4✔
292
            self.status_payload = JobFinished(
4✔
293
                results={
294
                    int(circuit_index): [[sample.root for sample in shot] for shot in shots]
295
                    for circuit_index, shots in payload.response.result.items()
296
                }
297
            )
298
        elif isinstance(payload, api_models_generated.JobResponseRRError):
4✔
299
            self.status_payload = JobFailed(error=payload.response.message)
4✔
300
        elif isinstance(payload, api_models_generated.JobResponseRRCancelled):
4✔
301
            self.status_payload = JobCancelled()
4✔
302
        else:  # pragma: no cover
303
            assert_never(payload)
304

305
        return self.status_payload.status
4✔
306

307
    def progress(self) -> Progress:
4✔
308
        """Progress information for this job."""
309
        num_circuits = len(self.circuits)
4✔
310

311
        if isinstance(self.status_payload, JobQueued):
4✔
312
            return Progress(finished_count=0, total_count=num_circuits)
×
313

314
        if isinstance(self.status_payload, JobOngoing):
4✔
315
            return Progress(
4✔
316
                finished_count=self.status_payload.finished_count, total_count=num_circuits
317
            )
318

319
        # if the circuit is finished, failed, or cancelled, it is completed
320
        return Progress(finished_count=num_circuits, total_count=num_circuits)
4✔
321

322
    @property
4✔
323
    def error_message(self) -> Optional[str]:
4✔
324
        """Error message for this job (if any)."""
325
        if isinstance(self.status_payload, JobFailed):
4✔
326
            return self.status_payload.error
4✔
327

328
        return None
4✔
329

330
    def result(self) -> Result:
4✔
331
        """Block until all circuits have been evaluated and return the combined result.
332

333
        Success or error is signalled by the `success` field in the returned Result instance.
334

335
        Returns:
336
            The combined result of all circuit evaluations.
337
        """
338
        if self.options.with_progress_bar:
4✔
339
            context: Union[tqdm[NoReturn], _MockProgressBar] = tqdm(total=len(self.circuits))
4✔
340
        else:
341
            context = _MockProgressBar(total=len(self.circuits))
4✔
342

343
        with context as progress_bar:
4✔
344

345
            def callback(
4✔
346
                job_id: str,  # noqa: ARG001
347
                status: JobStatus,  # noqa: ARG001
348
                job: AQTJob,
349
            ) -> None:
350
                progress = job.progress()
4✔
351
                progress_bar.update(progress.finished_count - progress_bar.n)
4✔
352

353
            # one of DONE, CANCELLED, ERROR
354
            self.wait_for_final_state(
4✔
355
                timeout=self.options.query_timeout_seconds,
356
                wait=self.options.query_period_seconds,
357
                callback=callback,
358
            )
359

360
            # make sure the progress bar completes
361
            progress_bar.update(self.progress().finished_count - progress_bar.n)
4✔
362

363
        results = []
4✔
364

365
        if isinstance(self.status_payload, JobFinished):
4✔
366
            for circuit_index, circuit in enumerate(self.circuits):
4✔
367
                samples = self.status_payload.results[circuit_index]
4✔
368
                meas_map = _build_memory_mapping(circuit)
4✔
369
                data: dict[str, Any] = {
4✔
370
                    "counts": _format_counts(samples, meas_map),
371
                }
372

373
                if self.options.memory:
4✔
374
                    data["memory"] = [
4✔
375
                        "".join(str(x) for x in reversed(states)) for states in samples
376
                    ]
377

378
                results.append(
4✔
379
                    {
380
                        "shots": self.options.shots,
381
                        "success": True,
382
                        "status": JobStatus.DONE,
383
                        "data": data,
384
                        "header": {
385
                            "memory_slots": circuit.num_clbits,
386
                            "creg_sizes": [[reg.name, reg.size] for reg in circuit.cregs],
387
                            "qreg_sizes": [[reg.name, reg.size] for reg in circuit.qregs],
388
                            "name": circuit.name,
389
                            "metadata": circuit.metadata or {},
390
                        },
391
                    }
392
                )
393

394
        return Result.from_dict(
4✔
395
            {
396
                "backend_name": self._backend.name,
397
                "backend_version": self._backend.version,
398
                "qobj_id": id(self.circuits),
399
                "job_id": self.job_id(),
400
                "success": self.status_payload.status is JobStatus.DONE,
401
                "results": results,
402
                # Pass error message as metadata
403
                "error": self.error_message,
404
            }
405
        )
406

407

408
def _build_memory_mapping(circuit: QuantumCircuit) -> dict[int, set[int]]:
4✔
409
    """Scan the circuit for measurement instructions and collect qubit to classical bits mappings.
410

411
    Qubits can be mapped to multiple classical bits, possibly in different classical registers.
412
    The returned map only maps qubits referenced in a `measure` operation in the passed circuit.
413
    Qubits not targeted by a `measure` operation will not appear in the returned result.
414

415
    Parameters:
416
        circuit: the `QuantumCircuit` to analyze.
417

418
    Returns:
419
        the translation map for all measurement operations in the circuit.
420

421
    Examples:
422
        >>> qc = QuantumCircuit(2)
423
        >>> qc.measure_all()
424
        >>> _build_memory_mapping(qc)
425
        {0: {0}, 1: {1}}
426

427
        >>> qc = QuantumCircuit(2, 2)
428
        >>> _ = qc.measure([0, 1], [1, 0])
429
        >>> _build_memory_mapping(qc)
430
        {0: {1}, 1: {0}}
431

432
        >>> qc = QuantumCircuit(3, 2)
433
        >>> _ = qc.measure([0, 1], [0, 1])
434
        >>> _build_memory_mapping(qc)
435
        {0: {0}, 1: {1}}
436

437
        >>> qc = QuantumCircuit(4, 6)
438
        >>> _ = qc.measure([0, 1, 2, 3], [2, 3, 4, 5])
439
        >>> _build_memory_mapping(qc)
440
        {0: {2}, 1: {3}, 2: {4}, 3: {5}}
441

442
        >>> qc = QuantumCircuit(3, 4)
443
        >>> qc.measure_all(add_bits=False)
444
        >>> _build_memory_mapping(qc)
445
        {0: {0}, 1: {1}, 2: {2}}
446

447
        >>> qc = QuantumCircuit(3, 3)
448
        >>> _ = qc.x(0)
449
        >>> _ = qc.measure([0], [2])
450
        >>> _ = qc.y(1)
451
        >>> _ = qc.measure([1], [1])
452
        >>> _ = qc.x(2)
453
        >>> _ = qc.measure([2], [0])
454
        >>> _build_memory_mapping(qc)
455
        {0: {2}, 1: {1}, 2: {0}}
456

457
        5 qubits in two registers:
458

459
        >>> from qiskit import QuantumRegister, ClassicalRegister
460
        >>> qr0 = QuantumRegister(2)
461
        >>> qr1 = QuantumRegister(3)
462
        >>> cr = ClassicalRegister(2)
463
        >>> qc = QuantumCircuit(qr0, qr1, cr)
464
        >>> _ = qc.measure(qr0, cr)
465
        >>> _build_memory_mapping(qc)
466
        {0: {0}, 1: {1}}
467

468
        Multiple mapping of a qubit:
469

470
        >>> qc = QuantumCircuit(3, 3)
471
        >>> _ = qc.measure([0, 1], [0, 1])
472
        >>> _ = qc.measure([0], [2])
473
        >>> _build_memory_mapping(qc)
474
        {0: {0, 2}, 1: {1}}
475
    """
476
    qu2cl: defaultdict[int, set[int]] = defaultdict(set)
4✔
477

478
    for instruction in circuit.data:
4✔
479
        if instruction.operation.name == "measure":
4✔
480
            for qubit, clbit in zip(instruction.qubits, instruction.clbits):
4✔
481
                qu2cl[circuit.find_bit(qubit).index].add(circuit.find_bit(clbit).index)
4✔
482

483
    return dict(qu2cl)
4✔
484

485

486
def _shot_to_int(
4✔
487
    fluorescence_states: list[int], qubit_to_bit: Optional[dict[int, set[int]]] = None
488
) -> int:
489
    """Format the detected fluorescence states from a single shot as an integer.
490

491
    This follows the Qiskit ordering convention, where bit 0 in the classical register is mapped
492
    to bit 0 in the returned integer. The first classical register in the original circuit
493
    represents the least-significant bits in the integer representation.
494

495
    An optional translation map from the quantum to the classical register can be applied.
496
    If given, only the qubits registered in the translation map are present in the return value,
497
    at the index given by the translation map.
498

499
    Parameters:
500
        fluorescence_states: detected fluorescence states for this shot
501
        qubit_to_bit: optional translation map from quantum register to classical register positions
502

503
    Returns:
504
        integral representation of the shot result, with the translation map applied.
505

506
    Examples:
507
       Without a translation map, the natural mapping is used (n -> n):
508

509
        >>> _shot_to_int([1])
510
        1
511

512
        >>> _shot_to_int([0, 0, 1])
513
        4
514

515
        >>> _shot_to_int([0, 1, 1])
516
        6
517

518
        Swap qubits 1 and 2 in the classical register:
519

520
        >>> _shot_to_int([1, 0, 1], {0: {0}, 1: {2}, 2: {1}})
521
        3
522

523
        If the map is partial, only the mapped qubits are present in the output:
524

525
        >>> _shot_to_int([1, 0, 1], {1: {2}, 2: {1}})
526
        2
527

528
        One can translate into a classical register larger than the
529
        qubit register.
530

531
        Warning: the classical register is always initialized to 0.
532

533
        >>> _shot_to_int([1], {0: {1}})
534
        2
535

536
        >>> _shot_to_int([0, 1, 1], {0: {3}, 1: {4}, 2: {5}}) == (0b110 << 3)
537
        True
538

539
        or with a map larger than the qubit space:
540

541
        >>> _shot_to_int([1], {0: {0}, 1: {1}})
542
        1
543

544
        Consider the typical example of two quantum registers (the second one contains
545
        ancilla qubits) and one classical register:
546

547
        >>> from qiskit import QuantumRegister, ClassicalRegister
548
        >>> qr_meas = QuantumRegister(2)
549
        >>> qr_ancilla = QuantumRegister(3)
550
        >>> cr = ClassicalRegister(2)
551
        >>> qc = QuantumCircuit(qr_meas, qr_ancilla, cr)
552
        >>> _ = qc.measure(qr_meas, cr)
553
        >>> tr_map = _build_memory_mapping(qc)
554

555
        We assume that a single shot gave the result:
556

557
        >>> ancillas = [1, 1, 0]
558
        >>> meas = [1, 0]
559

560
        Then the corresponding output is 0b01 (measurement qubits mapped straight
561
        to the classical register of length 2):
562

563
        >>> _shot_to_int(meas + ancillas, tr_map) == 0b01
564
        True
565

566
        One can overwrite qr_meas[1] with qr_ancilla[0]:
567

568
        >>> _ = qc.measure(qr_ancilla[0], cr[1])
569
        >>> tr_map = _build_memory_mapping(qc)
570
        >>> _shot_to_int(meas + ancillas, tr_map) == 0b11
571
        True
572
    """
573
    tr_map = qubit_to_bit or {}
4✔
574

575
    if tr_map:
4✔
576
        # allocate a zero-initialized classical register
577
        # TODO: support pre-initialized classical registers
578
        clbits = max(max(d) for d in tr_map.values()) + 1
4✔
579
        creg = [0] * clbits
4✔
580

581
        for src_index, dest_indices in tr_map.items():
4✔
582
            # the translation map could map more than just the measured qubits
583
            with contextlib.suppress(IndexError):
4✔
584
                for dest_index in dest_indices:
4✔
585
                    creg[dest_index] = fluorescence_states[src_index]
4✔
586
    else:
587
        creg = fluorescence_states.copy()
4✔
588

589
    return int((np.left_shift(1, np.arange(len(creg))) * creg).sum())
4✔
590

591

592
def _format_counts(
4✔
593
    samples: list[list[int]], qubit_to_bit: Optional[dict[int, set[int]]] = None
594
) -> dict[str, int]:
595
    """Format all shots results from a circuit evaluation.
596

597
    The returned dictionary is compatible with Qiskit's `ExperimentResultData`
598
    `counts` field.
599

600
    Keys are hexadecimal string representations of the detected states, with the
601
    optional `QuantumRegister` to `ClassicalRegister` applied. Values are the occurrences
602
    of the keys.
603

604
    Parameters:
605
        samples: detected qubit fluorescence states for all shots
606
        qubit_to_bit: optional quantum to classical register translation map
607

608
    Returns:
609
        collected counts, for `ExperimentResultData`.
610

611
    Examples:
612
        >>> _format_counts([[1, 0, 0], [0, 1, 0], [1, 0, 0]])
613
        {'0x1': 2, '0x2': 1}
614

615
        >>> _format_counts([[1, 0, 0], [0, 1, 0], [1, 0, 0]], {0: {2}, 1: {1}, 2: {0}})
616
        {'0x4': 2, '0x2': 1}
617
    """
618
    return dict(Counter(hex(_shot_to_int(shot, qubit_to_bit)) for shot in samples))
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc