• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

qiskit-community / qiskit-aqt-provider / 7567300363

18 Jan 2024 08:32AM UTC coverage: 99.83%. Remained the same
7567300363

push

github

web-flow
Prepare for release 1.1.0 (#120)

2353 of 2357 relevant lines covered (99.83%)

3.97 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

99.38
/qiskit_aqt_provider/aqt_job.py
1
# This code is part of Qiskit.
2
#
3
# (C) Copyright IBM 2019, Alpine Quantum Technologies GmbH 2022.
4
#
5
# This code is licensed under the Apache License, Version 2.0. You may
6
# obtain a copy of this license in the LICENSE.txt file in the root directory
7
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
8
#
9
# Any modifications or derivative works of this code must retain this
10
# copyright notice, and modified files need to carry a notice indicating
11
# that they have been altered from the originals.
12

13
import uuid
4✔
14
from collections import Counter, defaultdict
4✔
15
from dataclasses import dataclass
4✔
16
from pathlib import Path
4✔
17
from typing import (
4✔
18
    TYPE_CHECKING,
19
    Any,
20
    ClassVar,
21
    DefaultDict,
22
    Dict,
23
    List,
24
    NoReturn,
25
    Optional,
26
    Set,
27
    Union,
28
)
29

30
import numpy as np
4✔
31
from qiskit import QuantumCircuit
4✔
32
from qiskit.providers import JobV1
4✔
33
from qiskit.providers.jobstatus import JobStatus
4✔
34
from qiskit.result.result import Result
4✔
35
from qiskit.utils.lazy_tester import contextlib
4✔
36
from tqdm import tqdm
4✔
37
from typing_extensions import Self, TypeAlias, assert_never
4✔
38

39
from qiskit_aqt_provider import api_models_generated, persistence
4✔
40
from qiskit_aqt_provider.aqt_options import AQTOptions
4✔
41
from qiskit_aqt_provider.circuit_to_aqt import circuits_to_aqt_job
4✔
42

43
if TYPE_CHECKING:  # pragma: no cover
44
    from qiskit_aqt_provider.aqt_resource import AQTResource
45

46

47
# Tags for the status of AQT API jobs
48

49

50
@dataclass
class JobFinished:
    """Status tag for a job whose circuits were all executed successfully."""

    # Qiskit job status associated with this tag (class-level, not a dataclass field).
    status: ClassVar[JobStatus] = JobStatus.DONE
    # Measured samples, keyed by circuit index: one list of states per shot.
    results: Dict[int, List[List[int]]]
4✔
56

57

58
@dataclass
class JobFailed:
    """Status tag for a job that ended with an error."""

    # Qiskit job status associated with this tag (class-level, not a dataclass field).
    status: ClassVar[JobStatus] = JobStatus.ERROR
    # Error message describing the failure.
    error: str
4✔
64

65

66
class JobQueued:
    """Status tag for a job that is waiting in the queue."""

    # Qiskit job status associated with this tag.
    status: ClassVar[JobStatus] = JobStatus.QUEUED
4✔
70

71

72
@dataclass
class JobOngoing:
    """Status tag for a job that is currently being executed."""

    # Qiskit job status associated with this tag (class-level, not a dataclass field).
    status: ClassVar[JobStatus] = JobStatus.RUNNING
    # Number of circuits of the job that are already completed.
    finished_count: int
4✔
78

79

80
class JobCancelled:
    """The job was cancelled."""

    # Class-level status tag, annotated (not chain-assigned) like the other
    # status payload classes so that no stray `ClassVar` attribute is created.
    status: ClassVar = JobStatus.CANCELLED
4✔
84

85

86
JobStatusPayload: TypeAlias = Union[JobQueued, JobOngoing, JobFinished, JobFailed, JobCancelled]
4✔
87

88

89
@dataclass(frozen=True)
class Progress:
    """Immutable snapshot of how far a job has advanced."""

    finished_count: int
    """Number of circuits that are already completed."""

    total_count: int
    """Number of circuits the job contains in total."""
4✔
98

99

100
@dataclass
4✔
101
class _MockProgressBar:
4✔
102
    """Minimal tqdm-compatible progress bar mock."""
4✔
103

104
    total: int
4✔
105
    """Total number of items in the job."""
2✔
106

107
    n: int = 0
4✔
108
    """Number of processed items."""
2✔
109

110
    def update(self, n: int = 1) -> None:
4✔
111
        """Update the number of processed items by `n`."""
112
        self.n += n
4✔
113

114
    def __enter__(self) -> Self:
4✔
115
        return self
4✔
116

117
    def __exit__(*args) -> None:
4✔
118
        ...
4✔
119

120

121
class AQTJob(JobV1):
4✔
122
    """Handle for quantum circuits jobs running on AQT backends.
4✔
123

124
    Jobs contain one or more quantum circuits that are executed with a common
125
    set of options (see :class:`AQTOptions <qiskit_aqt_provider.aqt_options.AQTOptions>`).
126

127
    Job handles should be retrieved from calls to
128
    :func:`qiskit.execute <qiskit.execute_function.execute>`
129
    or :meth:`AQTResource.run <qiskit_aqt_provider.aqt_resource.AQTResource.run>`, both of which
130
    immediately submit the job for execution. The :meth:`result` method allows blocking until a job
131
    completes:
132

133
    >>> import qiskit
134
    >>> from qiskit.providers import JobStatus
135
    >>> from qiskit_aqt_provider import AQTProvider
136
    ...
137
    >>> backend = AQTProvider("").get_backend("offline_simulator_no_noise")
138
    ...
139
    >>> qc = qiskit.QuantumCircuit(1)
140
    >>> _ = qc.rx(3.14, 0)
141
    >>> _ = qc.measure_all()
142
    ...
143
    >>> job = qiskit.execute(qc, backend, shots=100)
144
    >>> result = job.result()
145
    >>> job.status() is JobStatus.DONE
146
    True
147
    >>> result.success
148
    True
149
    >>> result.get_counts()
150
    {'1': 100}
151
    """
152

153
    _backend: "AQTResource"
4✔
154

155
    def __init__(
        self,
        backend: "AQTResource",
        circuits: List[QuantumCircuit],
        options: AQTOptions,
    ):
        """Create a job handle for `circuits` on `backend`, without submitting it.

        .. tip:: :class:`AQTJob` instances should not be created directly. Use
          :meth:`AQTResource.run <qiskit_aqt_provider.aqt_resource.AQTResource.run>`
          to submit circuits for execution and retrieve a job handle.

        Args:
            backend: backend to run the job on.
            circuits: list of circuits to execute.
            options: overridden resource options for this job.
        """
        # The job identifier is left empty here; it is filled in by `submit`.
        super().__init__(backend, "")

        self.options = options
        self.circuits = circuits
        # API payload for the submission, prepared once from the circuits and shot count.
        self.api_submit_payload = circuits_to_aqt_job(circuits, options.shots)

        # A new job handle starts in the queued state.
        self.status_payload: JobStatusPayload = JobQueued()
4✔
179

180
    @classmethod
    def restore(
        cls,
        job_id: str,
        *,
        access_token: Optional[str] = None,
        store_path: Optional[Path] = None,
        remove_from_store: bool = True,
    ) -> Self:
        """Re-create a job handle from data saved in local persistent storage.

        .. warning:: The default local storage path depends on the `qiskit_aqt_provider`
            package version. Jobs persisted with a different package version will therefore
            **not** be found!

        .. hint:: If the job's execution backend is an offline simulator, the
            job is re-submitted to the simulation backend and the new job ID differs
            from the one passed to this function.

        Args:
            job_id: identifier of the job to retrieve.
            access_token: access token for the AQT cloud.
              See :class:`AQTProvider <qiskit_aqt_provider.aqt_provider.AQTProvider>`.
            store_path: local persistent storage directory.
              By default, use a standard cache directory.
            remove_from_store: if :data:`True`, remove the retrieved job's data from persistent
              storage after a successful load.

        Returns:
            A job handle for the passed `job_id`.

        Raises:
            JobNotFoundError: the target job was not found in persistent storage.
        """
        from qiskit_aqt_provider.aqt_provider import AQTProvider
        from qiskit_aqt_provider.aqt_resource import AQTResource, OfflineSimulatorResource

        store_path = persistence.get_store_path(store_path)
        data = persistence.Job.restore(job_id, store_path)
        on_offline_simulator = data.resource.resource_type == "offline_simulator"

        # TODO: forward .env loading args?
        provider = AQTProvider(access_token)
        resource = (
            # FIXME: persist with_noise_model and restore it
            OfflineSimulatorResource(provider, data.resource, with_noise_model=False)
            if on_offline_simulator
            else AQTResource(provider, data.resource)
        )

        obj = cls(backend=resource, circuits=data.circuits.circuits, options=data.options)

        if not on_offline_simulator:
            # the job still exists on the remote side: only the handle is re-attached
            obj._job_id = job_id
        else:
            # re-submit the job because we can't restore the backend state
            obj.submit()

        if remove_from_store:
            persistence.Job.remove_from_store(job_id, store_path)

        return obj
4✔
240

241
    def persist(self, *, store_path: Optional[Path] = None) -> Path:
        """Write this job's resource, circuits and options to local persistent storage.

        .. warning:: Only jobs that have been submitted for execution
          can be persisted (a valid `job_id` is required).

        Args:
            store_path: local persistent storage directory.
              By default, use a standard cache directory.

        Returns:
            The path to the job data in local persistent storage.

        Raises:
            RuntimeError: the job was never submitted for execution.
        """
        job_id = self.job_id()
        if not job_id:
            raise RuntimeError("Can only persist submitted jobs.")

        target_dir = persistence.get_store_path(store_path)
        snapshot = persistence.Job(
            resource=self._backend.resource_id,
            circuits=persistence.Circuits(self.circuits),
            options=self.options,
        )

        return snapshot.persist(job_id, target_dir)
4✔
268

269
    def submit(self) -> None:
        """Hand this job over to its backend for execution.

        This operation is not blocking. Use :meth:`result()` to block until
        the job completes.

        Raises:
            RuntimeError: this job was already submitted.
        """
        # A non-empty job identifier means the job was already submitted.
        if self.job_id():
            raise RuntimeError(f"Job already submitted (ID: {self.job_id()})")

        # The identifier returned by the backend is stored in its string form.
        self._job_id = str(self._backend.submit(self))
4✔
283

284
    def status(self) -> JobStatus:
        """Fetch the job's current state from the backend and cache it.

        The backend response is converted to the matching status payload, which is
        stored in `status_payload` for later use by the other accessors.

        Returns:
            Aggregated job status for all the circuits in this job.
        """
        models = api_models_generated
        response = self._backend.result(uuid.UUID(self.job_id()))

        new_payload: JobStatusPayload
        if isinstance(response, models.JobResponseRRQueued):
            new_payload = JobQueued()
        elif isinstance(response, models.JobResponseRROngoing):
            new_payload = JobOngoing(finished_count=response.response.finished_count)
        elif isinstance(response, models.JobResponseRRFinished):
            # unwrap the samples and index them by integer circuit position
            samples_per_circuit: Dict[int, List[List[int]]] = {}
            for circuit_index, shots in response.response.result.items():
                samples_per_circuit[int(circuit_index)] = [
                    [sample.root for sample in shot] for shot in shots
                ]
            new_payload = JobFinished(results=samples_per_circuit)
        elif isinstance(response, models.JobResponseRRError):
            new_payload = JobFailed(error=response.response.message)
        elif isinstance(response, models.JobResponseRRCancelled):
            new_payload = JobCancelled()
        else:  # pragma: no cover
            assert_never(response)

        self.status_payload = new_payload
        return new_payload.status
4✔
311

312
    def progress(self) -> Progress:
        """Progress information for this job, based on the last known status payload."""
        total = len(self.circuits)
        payload = self.status_payload

        if isinstance(payload, JobOngoing):
            finished = payload.finished_count
        elif isinstance(payload, JobQueued):
            finished = 0
        else:
            # if the circuit is finished, failed, or cancelled, it is completed
            finished = total

        return Progress(finished_count=finished, total_count=total)
4✔
326

327
    @property
    def error_message(self) -> Optional[str]:
        """Error message of the last known status payload, or `None` if the job did not fail."""
        payload = self.status_payload
        return payload.error if isinstance(payload, JobFailed) else None
4✔
334

335
    def result(self) -> Result:
        """Wait until the job reaches a final state and assemble the combined result.

        While waiting, a progress bar is shown if the `with_progress_bar` option is set.
        Per-circuit results are only populated if the job finished successfully.

        Success or error is signalled by the `success` field in the returned Result instance.

        Returns:
            The combined result of all circuit evaluations.
        """
        num_circuits = len(self.circuits)
        context: Union[tqdm[NoReturn], _MockProgressBar] = (
            tqdm(total=num_circuits)
            if self.options.with_progress_bar
            else _MockProgressBar(total=num_circuits)
        )

        with context as progress_bar:

            def callback(
                job_id: str,  # noqa: ARG001
                status: JobStatus,  # noqa: ARG001
                job: AQTJob,
            ) -> None:
                # advance the bar by the number of newly completed circuits
                progress_bar.update(job.progress().finished_count - progress_bar.n)

            # one of DONE, CANCELLED, ERROR
            self.wait_for_final_state(
                timeout=self.options.query_timeout_seconds,
                wait=self.options.query_period_seconds,
                callback=callback,
            )

            # make sure the progress bar completes
            progress_bar.update(self.progress().finished_count - progress_bar.n)

        payload = self.status_payload
        experiment_results: List[Dict[str, Any]] = []

        if isinstance(payload, JobFinished):
            for circuit_index, circuit in enumerate(self.circuits):
                samples = payload.results[circuit_index]
                qubit_to_bit = _build_memory_mapping(circuit)

                data: Dict[str, Any] = {"counts": _format_counts(samples, qubit_to_bit)}
                if self.options.memory:
                    # one bitstring per shot, with the states in reversed order
                    data["memory"] = ["".join(map(str, reversed(states))) for states in samples]

                header = {
                    "memory_slots": circuit.num_clbits,
                    "creg_sizes": [[reg.name, reg.size] for reg in circuit.cregs],
                    "qreg_sizes": [[reg.name, reg.size] for reg in circuit.qregs],
                    "name": circuit.name,
                    "metadata": circuit.metadata or {},
                }
                experiment_results.append(
                    {
                        "shots": self.options.shots,
                        "success": True,
                        "status": JobStatus.DONE,
                        "data": data,
                        "header": header,
                    }
                )

        return Result.from_dict(
            {
                "backend_name": self._backend.name,
                "backend_version": self._backend.version,
                "qobj_id": id(self.circuits),
                "job_id": self.job_id(),
                "success": payload.status is JobStatus.DONE,
                "results": experiment_results,
                # Pass error message as metadata
                "error": self.error_message,
            }
        )
411

412

413
def _build_memory_mapping(circuit: QuantumCircuit) -> Dict[int, Set[int]]:
    """Collect, for each measured qubit, the classical bits it is measured into.

    Only `measure` instructions are considered. A qubit can be measured into several
    classical bits, possibly in different classical registers, hence the set-valued map.
    Qubits that are never the target of a `measure` operation are absent from the result.

    Parameters:
        circuit: the `QuantumCircuit` to analyze.

    Returns:
        the map from qubit index to classical bit indices, for all measurement
        operations in the circuit.

    Examples:
        >>> qc = QuantumCircuit(2)
        >>> qc.measure_all()
        >>> _build_memory_mapping(qc)
        {0: {0}, 1: {1}}

        >>> qc = QuantumCircuit(2, 2)
        >>> _ = qc.measure([0, 1], [1, 0])
        >>> _build_memory_mapping(qc)
        {0: {1}, 1: {0}}

        >>> qc = QuantumCircuit(3, 2)
        >>> _ = qc.measure([0, 1], [0, 1])
        >>> _build_memory_mapping(qc)
        {0: {0}, 1: {1}}

        >>> qc = QuantumCircuit(4, 6)
        >>> _ = qc.measure([0, 1, 2, 3], [2, 3, 4, 5])
        >>> _build_memory_mapping(qc)
        {0: {2}, 1: {3}, 2: {4}, 3: {5}}

        >>> qc = QuantumCircuit(3, 4)
        >>> qc.measure_all(add_bits=False)
        >>> _build_memory_mapping(qc)
        {0: {0}, 1: {1}, 2: {2}}

        >>> qc = QuantumCircuit(3, 3)
        >>> _ = qc.x(0)
        >>> _ = qc.measure([0], [2])
        >>> _ = qc.y(1)
        >>> _ = qc.measure([1], [1])
        >>> _ = qc.x(2)
        >>> _ = qc.measure([2], [0])
        >>> _build_memory_mapping(qc)
        {0: {2}, 1: {1}, 2: {0}}

        5 qubits in two registers:

        >>> from qiskit import QuantumRegister, ClassicalRegister
        >>> qr0 = QuantumRegister(2)
        >>> qr1 = QuantumRegister(3)
        >>> cr = ClassicalRegister(2)
        >>> qc = QuantumCircuit(qr0, qr1, cr)
        >>> _ = qc.measure(qr0, cr)
        >>> _build_memory_mapping(qc)
        {0: {0}, 1: {1}}

        Multiple mapping of a qubit:

        >>> qc = QuantumCircuit(3, 3)
        >>> _ = qc.measure([0, 1], [0, 1])
        >>> _ = qc.measure([0], [2])
        >>> _build_memory_mapping(qc)
        {0: {0, 2}, 1: {1}}
    """
    mapping: DefaultDict[int, Set[int]] = defaultdict(set)

    measurements = (
        instruction for instruction in circuit.data if instruction.operation.name == "measure"
    )
    for measurement in measurements:
        for qubit, clbit in zip(measurement.qubits, measurement.clbits):
            qubit_index = circuit.find_bit(qubit).index
            clbit_index = circuit.find_bit(clbit).index
            mapping[qubit_index].add(clbit_index)

    # hand out a plain dict so that lookups of unmeasured qubits fail instead of adding keys
    return dict(mapping)
4✔
489

490

491
def _shot_to_int(
4✔
492
    fluorescence_states: List[int], qubit_to_bit: Optional[Dict[int, Set[int]]] = None
493
) -> int:
494
    """Format the detected fluorescence states from a single shot as an integer.
495

496
    This follows the Qiskit ordering convention, where bit 0 in the classical register is mapped
497
    to bit 0 in the returned integer. The first classical register in the original circuit
498
    represents the least-significant bits in the interger representation.
499

500
    An optional translation map from the quantum to the classical register can be applied.
501
    If given, only the qubits registered in the translation map are present in the return value,
502
    at the index given by the translation map.
503

504
    Parameters:
505
        fluorescence_states: detected fluorescence states for this shot
506
        qubit_to_bit: optional translation map from quantum register to classical register positions
507

508
    Returns:
509
        integral representation of the shot result, with the translation map applied.
510

511
    Examples:
512
       Without a translation map, the natural mapping is used (n -> n):
513

514
        >>> _shot_to_int([1])
515
        1
516

517
        >>> _shot_to_int([0, 0, 1])
518
        4
519

520
        >>> _shot_to_int([0, 1, 1])
521
        6
522

523
        Swap qubits 1 and 2 in the classical register:
524

525
        >>> _shot_to_int([1, 0, 1], {0: {0}, 1: {2}, 2: {1}})
526
        3
527

528
        If the map is partial, only the mapped qubits are present in the output:
529

530
        >>> _shot_to_int([1, 0, 1], {1: {2}, 2: {1}})
531
        2
532

533
        One can translate into a classical register larger than the
534
        qubit register.
535

536
        Warning: the classical register is always initialized to 0.
537

538
        >>> _shot_to_int([1], {0: {1}})
539
        2
540

541
        >>> _shot_to_int([0, 1, 1], {0: {3}, 1: {4}, 2: {5}}) == (0b110 << 3)
542
        True
543

544
        or with a map larger than the qubit space:
545

546
        >>> _shot_to_int([1], {0: {0}, 1: {1}})
547
        1
548

549
        Consider the typical example of two quantum registers (the second one contains
550
        ancilla qubits) and one classical register:
551

552
        >>> from qiskit import QuantumRegister, ClassicalRegister
553
        >>> qr_meas = QuantumRegister(2)
554
        >>> qr_ancilla = QuantumRegister(3)
555
        >>> cr = ClassicalRegister(2)
556
        >>> qc = QuantumCircuit(qr_meas, qr_ancilla, cr)
557
        >>> _ = qc.measure(qr_meas, cr)
558
        >>> tr_map = _build_memory_mapping(qc)
559

560
        We assume that a single shot gave the result:
561

562
        >>> ancillas = [1, 1, 0]
563
        >>> meas = [1, 0]
564

565
        Then the corresponding output is 0b01 (measurement qubits mapped straight
566
        to the classical register of length 2):
567

568
        >>> _shot_to_int(meas + ancillas, tr_map) == 0b01
569
        True
570

571
        One can overwrite qr_meas[1] with qr_ancilla[0]:
572

573
        >>> _ = qc.measure(qr_ancilla[0], cr[1])
574
        >>> tr_map = _build_memory_mapping(qc)
575
        >>> _shot_to_int(meas + ancillas, tr_map) == 0b11
576
        True
577
    """
578
    tr_map = qubit_to_bit or {}
4✔
579

580
    if tr_map:
4✔
581
        # allocate a zero-initialized classical register
582
        # TODO: support pre-initialized classical registers
583
        clbits = max(max(d) for d in tr_map.values()) + 1
4✔
584
        creg = [0] * clbits
4✔
585

586
        for src_index, dest_indices in tr_map.items():
4✔
587
            # the translation map could map more than just the measured qubits
588
            with contextlib.suppress(IndexError):
4✔
589
                for dest_index in dest_indices:
4✔
590
                    creg[dest_index] = fluorescence_states[src_index]
4✔
591
    else:
592
        creg = fluorescence_states.copy()
4✔
593

594
    return int((np.left_shift(1, np.arange(len(creg))) * creg).sum())
4✔
595

596

597
def _format_counts(
    samples: List[List[int]], qubit_to_bit: Optional[Dict[int, Set[int]]] = None
) -> Dict[str, int]:
    """Tally the shots of a circuit evaluation by detected state.

    The returned dictionary is compatible with Qiskit's `ExperimentResultData`
    `counts` field.

    Each shot is converted to an integer with the optional `QuantumRegister` to
    `ClassicalRegister` translation map applied. Keys are the hexadecimal string
    representations of these integers, values are the number of shots that produced them.

    Parameters:
        samples: detected qubit fluorescence states for all shots
        qubit_to_bit: optional quantum to classical register translation map

    Returns:
        collected counts, for `ExperimentResultData`.

    Examples:
        >>> _format_counts([[1, 0, 0], [0, 1, 0], [1, 0, 0]])
        {'0x1': 2, '0x2': 1}

        >>> _format_counts([[1, 0, 0], [0, 1, 0], [1, 0, 0]], {0: {2}, 1: {1}, 2: {0}})
        {'0x4': 2, '0x2': 1}
    """
    tally: "Counter[str]" = Counter()

    for shot in samples:
        state_key = hex(_shot_to_int(shot, qubit_to_bit))
        tally[state_key] += 1

    return dict(tally)
4✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc