• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Quang00 / dqsweep / 13927664269

18 Mar 2025 03:52PM UTC coverage: 85.714% (-0.1%) from 85.854%
13927664269

push

github

Quang00
refactor code

23 of 52 new or added lines in 2 files covered. (44.23%)

1 existing line in 1 file now uncovered.

528 of 616 relevant lines covered (85.71%)

1.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

66.49
/experiments/utils.py
1
import itertools
2✔
2
import os
2✔
3
from typing import Callable, Generator, List, Union
2✔
4

5
import matplotlib.pyplot as plt
2✔
6
import numpy as np
2✔
7
import pandas as pd
2✔
8
from netqasm.sdk import Qubit
2✔
9
from netsquid.qubits.dmutil import dm_fidelity
2✔
10

11
from squidasm.run.stack.config import StackNetworkConfig
2✔
12
from squidasm.run.stack.run import run
2✔
13
from squidasm.sim.stack.program import ProgramContext
2✔
14
from squidasm.util import get_qubit_state
2✔
15
from squidasm.util.routines import teleport_recv, teleport_send
2✔
16

17
# =============================================================================
18
# Constants
19
# =============================================================================
20
# Names of sweep parameters whose value ranges are generated on a
# logarithmic grid (np.logspace) rather than a linear one (np.linspace).
LOG_SCALE_PARAMS = {
    "T1",
    "T2",
    "length",
    "single_qubit_gate_time",
    "two_qubit_gate_time",
}
27

28

29
# =============================================================================
30
# Helper Functions
31
# =============================================================================
32
def truncate_param(name: str, char: str = "_", n: int = 4) -> str:
    """Build a short, human-readable label from a parameter name.

    The name is split on `char`, only the first `n` tokens are kept, the
    tokens are joined with single spaces and the result is capitalized
    (first character upper-case, the rest lower-case).

    Args:
        name (str): Parameter name to shorten.
        char (str, optional): Separator used to split the name.
            Defaults to "_".
        n (int, optional): Maximum number of tokens to keep. Defaults to 4.

    Returns:
        str: The shortened, capitalized label.
    """
    tokens = name.split(char)
    kept = tokens[:n]
    label = " ".join(kept)
    return label.capitalize()
44

45

46
def create_subdir(
    directory: str, experiment: str, sweep_params: Union[list, str]
) -> str:
    """Creates a fresh, uniquely named subdirectory for experiment results.

    The subdirectory is named `<experiment>_<params>`, where `<params>` is
    the first token of every swept parameter joined by underscores, or
    "default" when no usable parameter name is given. If that directory
    already exists, a numeric suffix (`_1`, `_2`, ...) is appended until an
    unused name is found.

    Args:
        directory (str): The main results directory (e.g., "results").
            Created if it does not exist yet.
        experiment (str): The experiment name (e.g., "pingpong").
        sweep_params (list | str): Swept parameter names, either as a list
            or as a comma-separated string. Blank entries are ignored.

    Returns:
        str: Path to the newly created experiment-specific directory.
    """
    # exist_ok avoids the check-then-create race of os.path.exists().
    os.makedirs(directory, exist_ok=True)

    if isinstance(sweep_params, str):
        sweep_params = sweep_params.split(",")

    # Blank entries (empty string, trailing comma) would otherwise produce
    # names such as "exp_" instead of falling back to "default".
    names = [param.strip() for param in sweep_params if param.strip()]
    tag = "_".join(truncate_param(name, n=1) for name in names) or "default"
    base_dir = os.path.join(directory, f"{experiment}_{tag}")

    # Try to create the directory and react to the failure instead of
    # testing for existence first, so two concurrent runs cannot end up
    # sharing (or fighting over) the same directory.
    candidate = base_dir
    suffix = 1
    while True:
        try:
            os.makedirs(candidate)
            return candidate
        except FileExistsError:
            candidate = f"{base_dir}_{suffix}"
            suffix += 1
82

83

84
def parse_range(range_str: str, param_name: str) -> np.ndarray:
    """Turn a "start,end,points" string into an array of sweep values.

    Parameters listed in LOG_SCALE_PARAMS are generated with `np.logspace`,
    which interprets `start` and `end` as exponents (base 10 by default);
    all other parameters are generated with `np.linspace`.

    Args:
        range_str (str): Range in format "start,end,points".
        param_name (str): The name of the parameter being parsed.

    Returns:
        np.ndarray: `int(points)` values between the two bounds.

    Raises:
        ValueError: If the string cannot be parsed as three numbers or the
            underlying NumPy call rejects the values.
    """
    try:
        start, end, points = (float(tok) for tok in range_str.split(","))
        generator = np.linspace
        if param_name in LOG_SCALE_PARAMS:
            generator = np.logspace
        return generator(start, end, int(points))
    except ValueError:
        raise ValueError("Invalid format. Use 'start,end,points'.") from None
103

104

105
def compute_fidelity(
    qubit: Qubit, owner: str, dm_state: np.ndarray, full_state: bool = True
) -> float:
    """Fidelity of a simulated qubit state against a reference pure state.

    The reference density matrix is built as the outer product of
    `dm_state` with its complex conjugate and compared against the state
    returned by `get_qubit_state`.

    Args:
        qubit (Qubit): The qubit whose state is to be evaluated.
        owner (str): The owner of the qubit.
        dm_state (np.ndarray): The reference state vector.
        full_state (bool): Forwarded to `get_qubit_state` as `full_state`.

    Returns:
        float: The value returned by `dm_fidelity` for the two matrices.
    """
    actual_dm = get_qubit_state(qubit, owner, full_state=full_state)
    reference_dm = np.outer(dm_state, np.conjugate(dm_state))
    return dm_fidelity(actual_dm, reference_dm, dm_check=False)
125

126

127
def metric_correlation(
    df: pd.DataFrame,
    sweep_params: list,
    metric_cols: list,
    output_dir: str,
    experiment: str,
):
    """Write the parameter/metric correlation table to a text file.

    The correlation between every swept parameter and every metric is
    computed with `DataFrame.corr()` and written as a tab-separated table
    to `<output_dir>/<experiment>_corr.txt`, one row per parameter, with
    three decimals per value.

    Args:
        df (pd.DataFrame): Dataframe containing parameter values and metrics.
        sweep_params (list): Column names of the swept parameters.
        metric_cols (list): Column names of the performance metrics.
        output_dir (str): Directory in which the text file is written.
        experiment (str): Experiment name, used as the file name prefix.
    """
    columns = sweep_params + metric_cols
    corr = df[columns].corr().loc[sweep_params, metric_cols]
    filename = os.path.join(output_dir, f"{experiment}_corr.txt")

    header = "Parameter\t" + "\t".join(metric_cols) + "\n"
    with open(filename, "w") as f:
        f.write(header)
        for param in sweep_params:
            cells = "\t".join(
                f"{corr.loc[param, metric]:.3f}" for metric in metric_cols
            )
            f.write(f"{param}\t" + cells + "\n")

    print(f"Saved correlation values to {filename}")
154

155

156
def run_simulation(
    config: str,
    epr_rounds: int = 10,
    num_times: int = 10,
    classes: dict = None,
):
    """Load a network configuration and run the given programs on it.

    Every class in `classes` is instantiated with
    `num_epr_rounds=epr_rounds`, and the resulting programs are handed to
    `run` together with the loaded configuration.

    Args:
        config (str): Path to the network configuration YAML file.
        epr_rounds (int): Value passed to each program as `num_epr_rounds`.
        num_times (int): Number of simulation repetitions.
        classes (dict): A dictionary mapping program names to their classes.

    Returns:
        The value returned by `run` (simulation results).

    Raises:
        ValueError: If `classes` is None or empty.
    """
    # None and an empty mapping are both rejected.
    if not classes:
        raise ValueError("At least one class must be provided.")

    network_cfg = StackNetworkConfig.from_file(config)

    programs = {}
    for name, program_cls in classes.items():
        programs[name] = program_cls(num_epr_rounds=epr_rounds)

    return run(config=network_cfg, programs=programs, num_times=num_times)
192

193

194
def check_sweep_params_input(params: str, cfg: "StackNetworkConfig") -> bool:
    """Check that all sweep parameters are found in the configuration.

    The check is textual: the string form of `cfg` is split into words and
    every requested parameter must appear among them.

    Args:
        params (str): Comma-separated names of the parameters to sweep.
            Blank entries (e.g. from a trailing comma) are ignored.
        cfg (StackNetworkConfig): Network configuration.

    Returns:
        bool: True if all sweep parameters are found.

    Raises:
        ValueError: If no parameter is given, or if one or more sweep
            parameters are missing in the config.
    """
    requested = {name.strip() for name in params.split(",")} - {""}
    if not requested:
        raise ValueError("Please provide parameters that exist in the config.")

    # Replace quoting/bracketing characters with spaces before splitting.
    # Stripping only the token ends would leave a key glued to an opening
    # bracket (e.g. "{'T1':" -> "{'T1") and it would never be recognised.
    separators = "{}[]()'\",:="
    table = str.maketrans(separators, " " * len(separators))
    known = set(str(cfg).translate(table).split())

    missing = requested - known
    if missing:
        raise ValueError(
            "Please provide parameters that exist in the config. "
            f"Missing: {', '.join(sorted(missing))}"
        )
    return True
214

215

216
def update_cfg(cfg: "StackNetworkConfig", map_param: dict) -> None:
    """
    Update the configuration for every link and stack with the
    given parameter values.

    Each parameter is written into the `cfg` mapping of every link and the
    `qdevice_cfg` mapping of every stack, in place. Links or stacks whose
    mapping is missing or None are skipped.

    Args:
        cfg (StackNetworkConfig): Network configuration.
        map_param (dict): Map parameter names to their values.
    """
    for param, value in map_param.items():
        for link in cfg.links:
            # Guard the same attribute that is written to. Testing
            # `link_cfg` while writing `cfg` never applies the value to a
            # link that has no `link_cfg` attribute.
            link_cfg = getattr(link, "cfg", None)
            if link_cfg is not None:
                link_cfg[param] = value
        for stack in cfg.stacks:
            qdevice_cfg = getattr(stack, "qdevice_cfg", None)
            if qdevice_cfg is not None:
                qdevice_cfg[param] = value
232

233

234
# =============================================================================
235
# Plotting Functions
236
# =============================================================================
237
def plot_heatmap(
    ax,
    df: pd.DataFrame,
    p: str,
    q: str,
    metric: dict,
    params: dict,
    exp: str,
    rounds: int,
):
    """Draw one metric as a heatmap over a pair of swept parameters.

    The dataframe is pivoted with `p` as rows and `q` as columns, and the
    resulting matrix is shown with `imshow`. The axis extents are taken
    from the first and last entries of `params[q]` (x) and `params[p]` (y).
    A colorbar, axis labels and a title are added to the axes.

    Args:
        ax: Matplotlib Axes object.
        df (pd.DataFrame): Dataframe containing experiment results.
        p (str): Name of the parameter for the y-axis.
        q (str): Name of the parameter for the x-axis.
        metric (dict): Dictionary with keys 'name' and 'cmap'.
        params (dict): Dictionary mapping parameters to their ranges.
        exp (str): Name of the experiment.
        rounds (int): Number of epr rounds; shown in the title for the
            "pingpong" experiment only.
    """
    metric_name = metric["name"]
    grid = df.pivot_table(index=p, columns=q, values=metric_name).values

    x_range = params[q]
    y_range = params[p]
    bounds = [x_range[0], x_range[-1], y_range[0], y_range[-1]]

    image = ax.imshow(
        grid,
        extent=bounds,
        origin="lower",
        aspect="auto",
        cmap=metric["cmap"],
    )
    ax.figure.colorbar(image, ax=ax).set_label(metric_name, fontsize=14)

    x_label = truncate_param(q)
    y_label = truncate_param(p)
    ax.set_xlabel(x_label, fontsize=14)
    ax.set_ylabel(y_label, fontsize=14)

    if exp == "pingpong":
        title_suffix = f" with {rounds} hops"
    else:
        title_suffix = ""
    title = f"{exp.capitalize()}: {y_label} vs {x_label}{title_suffix}"
    ax.set_title(title, fontsize=16, fontweight="bold")
290

291

292
def save_single_heatmap(
    metric: dict,
    df: pd.DataFrame,
    pairs: list,
    params: dict,
    exp: str,
    rounds: int,
    dir: str,
) -> None:
    """Generate and save a heatmap for a single metric.

    One subplot is drawn per parameter pair, side by side. The figure is
    saved as `<exp>_heat_<file_label>.png`; for the "pingpong" experiment
    the value `(rounds + 1) // 2` is appended to the file name.

    Args:
        metric (dict): Metric details (keys: 'name', 'cmap', 'file_label').
        df (pd.DataFrame): DataFrame containing experiment results.
        pairs (list): List of parameter pairs generated from sweep_params.
        params (dict): Dictionary mapping parameters to their ranges.
        exp (str): Name of the experiment.
        rounds (int): Number of EPR rounds.
        dir (str): Directory to save the generated figures.

    Raises:
        ValueError: If `pairs` is empty.
    """
    if not pairs:
        raise ValueError(
            "At least one parameter pair is required to plot a heatmap."
        )

    n_cols = len(pairs)
    # squeeze=False always yields a 2-D array of axes, so the single-pair
    # case needs no special handling.
    fig, axes = plt.subplots(
        1, n_cols, figsize=(12 * n_cols, 8), squeeze=False
    )

    # Only join the suffix when there is one, so non-pingpong files do not
    # end in a dangling underscore ("..._fidelity_.png").
    name_parts = [exp, "heat", metric["file_label"]]
    if exp == "pingpong":
        name_parts.append(str((rounds + 1) // 2))
    filename = os.path.join(dir, "_".join(name_parts) + ".png")

    try:
        for ax, (p, q) in zip(axes[0], pairs):
            plot_heatmap(ax, df, p, q, metric, params, exp, rounds)
        fig.tight_layout()
        fig.savefig(filename, dpi=300)
    finally:
        # Release the figure even if plotting or saving fails.
        plt.close(fig)

    print(f"Saved {metric['name']} heatmap to {filename}")
329

330

331
def save_combined_heatmaps(
    metrics: list,
    df: pd.DataFrame,
    pairs: list,
    params: dict,
    exp: str,
    rounds: int,
    dir: str,
) -> None:
    """Generate and save a combined figure with heatmaps for all metrics.

    The figure has one row per metric and one column per parameter pair
    and is saved as `<exp>_heatmaps.png` in `dir`.

    Args:
        metrics (list): Metric details (keys: 'name', 'cmap', 'file_label').
        df (pd.DataFrame): DataFrame containing experiment results.
        pairs (list): List of parameter pairs generated from sweep_params.
        params (dict): Dictionary mapping parameters to their ranges.
        exp (str): Name of the experiment.
        rounds (int): Number of EPR rounds.
        dir (str): Directory to save the generated figures.

    Raises:
        ValueError: If `metrics` or `pairs` is empty.
    """
    if not metrics or not pairs:
        raise ValueError(
            "At least one metric and one parameter pair are required."
        )

    n_rows, n_cols = len(metrics), len(pairs)
    # squeeze=False keeps `axes` two-dimensional for every grid shape,
    # including a single metric with several pairs, which a manual reshape
    # for the single-pair case does not cover.
    fig, axes = plt.subplots(
        n_rows, n_cols, figsize=(12 * n_cols, 8 * n_rows), squeeze=False
    )
    filename = os.path.join(dir, f"{exp}_heatmaps.png")

    try:
        for axes_row, metric in zip(axes, metrics):
            for ax, (p, q) in zip(axes_row, pairs):
                plot_heatmap(ax, df, p, q, metric, params, exp, rounds)
        fig.tight_layout()
        fig.savefig(filename, dpi=300)
    finally:
        # Release the figure even if plotting or saving fails.
        plt.close(fig)

    print(f"Saved combined heatmaps to {filename}")
367

368

369
def plot_combined_heatmaps(
    df: pd.DataFrame,
    sweep_params: list,
    params: dict,
    dir: str,
    exp: str,
    rounds: int,
    separate_files: bool = False,
):
    """Plot fidelity and simulation-time heatmaps for every parameter pair.

    All 2-element combinations of `sweep_params` are plotted. With
    `separate_files` set, one figure per metric is written; otherwise both
    metrics are placed in a single combined figure.

    Args:
        df (pd.DataFrame): DataFrame containing experiment results.
        sweep_params (list): List of swept parameters.
        params (dict): Dictionary mapping parameters to their ranges.
        dir (str): Directory to save the generated figures.
        exp (str): Name of the experiment.
        rounds (int): Number of EPR rounds.
        separate_files (bool, optional): Whether to generate separate files.
    """
    fidelity_metric = {
        "name": "Average Fidelity (%)",
        "cmap": "magma",
        "file_label": "fidelity",
    }
    sim_time_metric = {
        "name": "Average Simulation Time (ms)",
        "cmap": "viridis",
        "file_label": "sim_times",
    }
    pairs = list(itertools.combinations(sweep_params, 2))
    metrics = [fidelity_metric, sim_time_metric]

    if not separate_files:
        save_combined_heatmaps(metrics, df, pairs, params, exp, rounds, dir)
        return

    for metric in metrics:
        save_single_heatmap(metric, df, pairs, params, exp, rounds, dir)
410

411

412
# =============================================================================
413
# Gates
414
# =============================================================================
415
def toffoli(control1: Qubit, control2: Qubit, target: Qubit) -> None:
    """Apply a Toffoli (CCNOT) gate built from elementary gates.

    The gate is decomposed into CNOTs, Hadamards and Z rotations by
    +/- pi/4 (playing the role of the T and T-dagger gates), following the
    standard circuit.

    See https://en.wikipedia.org/wiki/Toffoli_gate

    Args:
        control1 (Qubit): First control qubit.
        control2 (Qubit): Second control qubit.
        target (Qubit): Target qubit.
    """
    t_angle = np.pi / 4
    t_dagger_angle = -np.pi / 4

    # The gate order below is the decomposition itself; do not reorder.
    target.H()
    control2.cnot(target)
    target.rot_Z(angle=t_dagger_angle)
    control1.cnot(target)
    target.rot_Z(angle=t_angle)
    control2.cnot(target)
    target.rot_Z(angle=t_dagger_angle)
    control1.cnot(target)
    control2.rot_Z(angle=t_angle)
    target.rot_Z(angle=t_angle)
    target.H()
    control1.cnot(control2)
    control1.rot_Z(angle=t_angle)
    control2.rot_Z(angle=t_dagger_angle)
    control1.cnot(control2)
441

442

443
def ccz(control1: Qubit, control2: Qubit, target: Qubit) -> None:
    """Apply a doubly-controlled Z gate.

    Implemented as a Toffoli gate sandwiched between two Hadamards on the
    target qubit.

    Args:
        control1 (Qubit): First control qubit.
        control2 (Qubit): Second control qubit.
        target (Qubit): Target qubit.
    """
    target.H()  # Conjugating with H turns the controlled X into a Z.
    toffoli(control1, control2, target)
    target.H()
455

456

457
# =============================================================================
458
# Routines
459
# =============================================================================
460
def pingpong_initiator(
    qubit: Qubit, context: ProgramContext, peer_name: str, num_rounds: int = 3
):
    """Initiator side of the ping-pong teleportation protocol.

    Rounds are numbered from 0. On even rounds the currently held qubit is
    teleported to the peer; on odd rounds a qubit is received back and
    becomes the held qubit. The connection is flushed after the last round.
    This is a generator and must be driven with `yield from`.

    Args:
        qubit (Qubit): The qubit to be teleported.
        context (ProgramContext): Connection, csockets, and epr_sockets.
        peer_name (str): Name of the peer.
        num_rounds (int): Number of ping-pong rounds; must be odd.

    Raises:
        ValueError: If `num_rounds` is even.
    """
    if num_rounds % 2 == 0:
        raise ValueError("It must be odd for a complete ping-pong exchange.")

    for round_num in range(num_rounds):
        is_receive_round = round_num % 2 == 1
        if is_receive_round:
            # Odd round: the peer sends the qubit back.
            qubit = yield from teleport_recv(context, peer_name)
            continue
        # Even round: hand the qubit over to the peer.
        yield from teleport_send(qubit, context, peer_name)

    yield from context.connection.flush()
488

489

490
def pingpong_responder(
    context: ProgramContext, peer_name: str, num_rounds: int = 3
) -> Generator[None, None, Qubit]:
    """Responder side of the ping-pong teleportation protocol.

    Rounds are numbered from 0. The responder starts without a qubit,
    receives one on every even round and sends it back on every odd round.
    The connection is flushed after the last round and the qubit received
    last is returned. This is a generator and must be driven with
    `yield from`.

    Args:
        context (ProgramContext): Connection, csockets, and epr_sockets.
        peer_name (str): Name of the peer.
        num_rounds (int): Number of ping-pong rounds; must be odd.

    Returns:
        Generator[None, None, Qubit]: The final teleported qubit.

    Raises:
        ValueError: If `num_rounds` is even.
    """
    if num_rounds % 2 == 0:
        raise ValueError("It must be odd for a complete ping-pong exchange.")

    held_qubit = None

    for round_num in range(num_rounds):
        is_send_round = round_num % 2 == 1
        if is_send_round:
            # Odd round: return the held qubit to the peer.
            yield from teleport_send(held_qubit, context, peer_name)
            continue
        # Even round: take delivery of the qubit from the peer.
        held_qubit = yield from teleport_recv(context, peer_name)

    yield from context.connection.flush()

    return held_qubit
526

527

528
def distributed_n_qubit_controlled_u_control(
    context: ProgramContext, peer_name: str, ctrl_qubit: Qubit
) -> Generator[None, None, None]:
    """Control-node side of a distributed n-qubit controlled-U gate.

    The control qubit lives on this node and the target on the peer. An
    EPR half is created, entangled with the control via a CNOT and
    measured; the outcome is sent to the peer. The peer's reply decides
    whether a Z correction is applied to the control qubit. This is a
    generator and must be driven with `yield from`.

    Args:
        context (ProgramContext): Context of the current program.
        peer_name (str): Name of the peer.
        ctrl_qubit (Qubit): The control qubit.
    """
    classical_socket = context.csockets[peer_name]
    entanglement_socket = context.epr_sockets[peer_name]
    conn = context.connection

    epr_half = entanglement_socket.create_keep()[0]
    ctrl_qubit.cnot(epr_half)
    outcome = epr_half.measure()
    # Flush so the measurement outcome is available before it is sent.
    yield from conn.flush()

    classical_socket.send(str(outcome))
    reply = yield from classical_socket.recv()
    if reply == "1":
        ctrl_qubit.Z()

    yield from conn.flush()
556

557

558
def distributed_n_qubit_controlled_u_target(
    context: ProgramContext,
    peer_names: List[str],
    target_qubit: Qubit,
    controlled_u: Callable[..., None],
):
    """Target-node side of a distributed n-qubit controlled-U gate.

    The target qubit lives on this node and every control on a remote
    peer. One EPR half is received per peer and X-corrected according to
    that peer's classical message; `controlled_u` is then applied with the
    EPR halves as controls. Finally each EPR half is measured after a
    Hadamard and the outcome is sent back to its peer. This is a generator
    and must be driven with `yield from`.

    Args:
        context (ProgramContext): Context of the current program.
        peer_names (List[str]): Names of the peers holding the controls.
        target_qubit (Qubit): The target qubit.
        controlled_u (Callable[..., None]): The n-qubits gate U with this
        signature `U(control_qubit_1, control_qubit_2, ..., target_qubit)`.
    """
    conn = context.connection

    # One received EPR half per peer, keyed by peer name.
    epr_by_peer = {
        name: context.epr_sockets[name].recv_keep()[0] for name in peer_names
    }
    yield from conn.flush()

    for name, epr_half in epr_by_peer.items():
        message = yield from context.csockets[name].recv()
        if message == "1":
            epr_half.X()

    # Controls are passed in the order given by `peer_names`.
    controls = [epr_by_peer[name] for name in peer_names]
    controlled_u(*controls, target_qubit)

    outcomes = {}
    for name, epr_half in epr_by_peer.items():
        epr_half.H()
        outcomes[name] = epr_half.measure()
    # Flush so the outcomes are available before they are sent.
    yield from conn.flush()

    for name in peer_names:
        context.csockets[name].send(str(outcomes[name]))
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc