PulseSchedule to QuEL-3 Sequencer Flow¶
This notebook demonstrates how a PulseSchedule is converted into a QuEL-3 execution payload and then compiled into a quelware-client-compatible sequencer.
The flow is:
- Build a
PulseScheduleandMeasurementSchedule - Build
Quel3ExecutionPayloadwithQuel3MeasurementBackendAdapter - Build payload into sequencer events/waveforms with
Quel3SequencerBuilder - Visualize both
MeasurementScheduleand sequencer timelines with shared plotting helpers
In [ ]:
Copied!
from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any, cast
import numpy as np
import qubex as qx
from qubex.backend.quel3 import Quel3ExecutionPayload, Quel3SequencerBuilder
from qubex.measurement.adapters import Quel3MeasurementBackendAdapter
from qubex.measurement.measurement_constraint_profile import (
MeasurementConstraintProfile,
)
from qubex.measurement.models.capture_schedule import Capture, CaptureSchedule
from qubex.measurement.models.measurement_config import MeasurementConfig
from qubex.measurement.models.measurement_schedule import MeasurementSchedule
from qubex.visualization import schedule_visualizer
from __future__ import annotations
from collections import defaultdict
from dataclasses import dataclass
from types import SimpleNamespace
from typing import Any, cast
import numpy as np
import qubex as qx
from qubex.backend.quel3 import Quel3ExecutionPayload, Quel3SequencerBuilder
from qubex.measurement.adapters import Quel3MeasurementBackendAdapter
from qubex.measurement.measurement_constraint_profile import (
MeasurementConstraintProfile,
)
from qubex.measurement.models.capture_schedule import Capture, CaptureSchedule
from qubex.measurement.models.measurement_config import MeasurementConfig
from qubex.measurement.models.measurement_schedule import MeasurementSchedule
from qubex.visualization import schedule_visualizer
In [ ]:
Copied!
# QuEL-3 default schedule dt
# NOTE: PulseSchedule currently assumes one common dt across channels.
SAMPLING_PERIOD = 0.4
# Apply notebook-wide default sampling period.
qx.pulse.set_sampling_period(SAMPLING_PERIOD)
# Build shape-equivalent control pulses (different only by scale/phase).
gaussian = qx.pulse.Gaussian(
duration=16.0,
amplitude=1.0,
sigma=4,
)
gaussian_shifted = gaussian.shifted(np.deg2rad(30.0))
# Build one readout waveform prototype with the same schedule dt.
flatop = qx.pulse.FlatTop(
duration=64.0,
amplitude=0.35,
tau=8.0,
)
pulse_schedule = qx.PulseSchedule()
with pulse_schedule as s:
s.add("Q00", qx.Blank(duration=4.0))
s.add("Q00", gaussian)
s.add("Q01", qx.Blank(duration=12.0))
s.add("Q01", gaussian_shifted)
s.barrier(["Q00", "RQ00"])
s.add("RQ00", flatop)
s.barrier()
s.add("Q00", qx.Blank(duration=4.0))
s.add("Q00", gaussian.scaled(0.5))
s.add("Q01", qx.Blank(duration=12.0))
s.add("Q01", gaussian_shifted.shifted(np.deg2rad(60.0)))
s.barrier(["Q00", "RQ00"])
s.add("RQ00", flatop)
s.barrier(["Q01", "RQ01"])
s.add("RQ01", flatop.shifted(np.deg2rad(30.0)))
print("pulse_schedule.duration =", pulse_schedule.duration)
print("pulse_schedule.labels =", pulse_schedule.labels)
pulse_schedule.plot()
# QuEL-3 default schedule dt
# NOTE: PulseSchedule currently assumes one common dt across channels.
SAMPLING_PERIOD = 0.4
# Apply notebook-wide default sampling period.
qx.pulse.set_sampling_period(SAMPLING_PERIOD)
# Build shape-equivalent control pulses (different only by scale/phase).
gaussian = qx.pulse.Gaussian(
duration=16.0,
amplitude=1.0,
sigma=4,
)
gaussian_shifted = gaussian.shifted(np.deg2rad(30.0))
# Build one readout waveform prototype with the same schedule dt.
flatop = qx.pulse.FlatTop(
duration=64.0,
amplitude=0.35,
tau=8.0,
)
pulse_schedule = qx.PulseSchedule()
with pulse_schedule as s:
s.add("Q00", qx.Blank(duration=4.0))
s.add("Q00", gaussian)
s.add("Q01", qx.Blank(duration=12.0))
s.add("Q01", gaussian_shifted)
s.barrier(["Q00", "RQ00"])
s.add("RQ00", flatop)
s.barrier()
s.add("Q00", qx.Blank(duration=4.0))
s.add("Q00", gaussian.scaled(0.5))
s.add("Q01", qx.Blank(duration=12.0))
s.add("Q01", gaussian_shifted.shifted(np.deg2rad(60.0)))
s.barrier(["Q00", "RQ00"])
s.add("RQ00", flatop)
s.barrier(["Q01", "RQ01"])
s.add("RQ01", flatop.shifted(np.deg2rad(30.0)))
print("pulse_schedule.duration =", pulse_schedule.duration)
print("pulse_schedule.labels =", pulse_schedule.labels)
pulse_schedule.plot()
In [ ]:
Copied!
blank_ranges = pulse_schedule.get_blank_ranges()
blank_ranges
blank_ranges = pulse_schedule.get_blank_ranges()
blank_ranges
In [ ]:
Copied!
pulse_ranges = pulse_schedule.get_pulse_ranges()
pulse_ranges
pulse_ranges = pulse_schedule.get_pulse_ranges()
pulse_ranges
In [ ]:
Copied!
readout_ranges = filtered = {k: pulse_ranges[k] for k in ("RQ00", "RQ01")}
readout_ranges
readout_ranges = filtered = {k: pulse_ranges[k] for k in ("RQ00", "RQ01")}
readout_ranges
In [ ]:
Copied!
captures = []
for target, readout_range in readout_ranges.items():
for readout_range in pulse_ranges[target]:
capture = Capture(
channels=[target],
start_time=readout_range.start * SAMPLING_PERIOD,
duration=(readout_range.stop - readout_range.start) * SAMPLING_PERIOD,
)
captures.append(capture)
measurement_schedule = MeasurementSchedule(
pulse_schedule=pulse_schedule,
capture_schedule=CaptureSchedule(captures=captures),
)
schedule_visualizer.plot_measurement_schedule(
measurement_schedule,
title="MeasurementSchedule",
)
captures = []
for target, readout_range in readout_ranges.items():
for readout_range in pulse_ranges[target]:
capture = Capture(
channels=[target],
start_time=readout_range.start * SAMPLING_PERIOD,
duration=(readout_range.stop - readout_range.start) * SAMPLING_PERIOD,
)
captures.append(capture)
measurement_schedule = MeasurementSchedule(
pulse_schedule=pulse_schedule,
capture_schedule=CaptureSchedule(captures=captures),
)
schedule_visualizer.plot_measurement_schedule(
measurement_schedule,
title="MeasurementSchedule",
)
In [ ]:
Copied!
class _BackendControllerStub:
DEFAULT_SAMPLING_PERIOD = SAMPLING_PERIOD
def __init__(self) -> None:
self._alias_map = {
"Q00": "ctrl-00",
"Q01": "ctrl-01",
"RQ00": "readout-00",
"RQ01": "readout-01",
}
def resolve_instrument_alias(self, target: str) -> str:
return self._alias_map.get(target, target)
class _ExperimentSystemStub:
@staticmethod
def get_awg_frequency(target: str) -> float:
return {
"Q00": 5.00e9,
"Q01": 5.10e9,
"RQ00": 6.80e9,
"RQ01": 6.90e9,
}[target]
measurement_config = MeasurementConfig(
mode="avg",
shots=1024,
interval=100.0,
frequencies={},
enable_dsp_demodulation=True,
enable_dsp_sum=False,
enable_dsp_classification=False,
line_param0=(1.0, 0.0, 0.0),
line_param1=(0.0, 1.0, 0.0),
)
adapter = Quel3MeasurementBackendAdapter(
backend_controller=_BackendControllerStub(),
experiment_system=cast(Any, _ExperimentSystemStub()),
constraint_profile=MeasurementConstraintProfile.quel3(
sampling_period_ns=SAMPLING_PERIOD
),
)
request = adapter.build_execution_request(
schedule=measurement_schedule,
config=measurement_config,
)
payload = request.payload
assert isinstance(payload, Quel3ExecutionPayload)
print(type(payload).__name__)
print(
"interval_ns=",
payload.interval_ns,
"repeats=",
payload.repeats,
"mode=",
payload.mode,
)
class _BackendControllerStub:
DEFAULT_SAMPLING_PERIOD = SAMPLING_PERIOD
def __init__(self) -> None:
self._alias_map = {
"Q00": "ctrl-00",
"Q01": "ctrl-01",
"RQ00": "readout-00",
"RQ01": "readout-01",
}
def resolve_instrument_alias(self, target: str) -> str:
return self._alias_map.get(target, target)
class _ExperimentSystemStub:
@staticmethod
def get_awg_frequency(target: str) -> float:
return {
"Q00": 5.00e9,
"Q01": 5.10e9,
"RQ00": 6.80e9,
"RQ01": 6.90e9,
}[target]
measurement_config = MeasurementConfig(
mode="avg",
shots=1024,
interval=100.0,
frequencies={},
enable_dsp_demodulation=True,
enable_dsp_sum=False,
enable_dsp_classification=False,
line_param0=(1.0, 0.0, 0.0),
line_param1=(0.0, 1.0, 0.0),
)
adapter = Quel3MeasurementBackendAdapter(
backend_controller=_BackendControllerStub(),
experiment_system=cast(Any, _ExperimentSystemStub()),
constraint_profile=MeasurementConstraintProfile.quel3(
sampling_period_ns=SAMPLING_PERIOD
),
)
request = adapter.build_execution_request(
schedule=measurement_schedule,
config=measurement_config,
)
payload = request.payload
assert isinstance(payload, Quel3ExecutionPayload)
print(type(payload).__name__)
print(
"interval_ns=",
payload.interval_ns,
"repeats=",
payload.repeats,
"mode=",
payload.mode,
)
In [ ]:
Copied!
payload.instrument_aliases
payload.instrument_aliases
In [ ]:
Copied!
for target, timeline in payload.timelines.items():
print(f"\n[{target}] alias={payload.instrument_aliases[target]}")
print(
f" timeline_dt={timeline.sampling_period_ns} ns "
f"length={timeline.length_ns} ns modulation_hz={timeline.modulation_frequency_hz}"
)
for idx, event in enumerate(timeline.events):
waveform_def = payload.waveform_library[event.waveform_name]
waveform_peak = float(np.max(np.abs(waveform_def.waveform)) * event.gain)
print(
f" event[{idx}] start={event.start_offset_ns:>4.1f} ns "
f"waveform={event.waveform_name} "
f"dt={waveform_def.sampling_period_ns} ns "
f"samples={len(waveform_def.waveform):>2d} "
f"gain={event.gain:.4f} "
f"phase={event.phase_offset_deg:.2f} deg "
f"peak={waveform_peak:.4f}"
)
for window in timeline.capture_windows:
print(
f" capture[{window.name}] start={window.start_offset_ns:.1f} ns "
f"length={window.length_ns:.1f} ns"
)
for target, timeline in payload.timelines.items():
print(f"\n[{target}] alias={payload.instrument_aliases[target]}")
print(
f" timeline_dt={timeline.sampling_period_ns} ns "
f"length={timeline.length_ns} ns modulation_hz={timeline.modulation_frequency_hz}"
)
for idx, event in enumerate(timeline.events):
waveform_def = payload.waveform_library[event.waveform_name]
waveform_peak = float(np.max(np.abs(waveform_def.waveform)) * event.gain)
print(
f" event[{idx}] start={event.start_offset_ns:>4.1f} ns "
f"waveform={event.waveform_name} "
f"dt={waveform_def.sampling_period_ns} ns "
f"samples={len(waveform_def.waveform):>2d} "
f"gain={event.gain:.4f} "
f"phase={event.phase_offset_deg:.2f} deg "
f"peak={waveform_peak:.4f}"
)
for window in timeline.capture_windows:
print(
f" capture[{window.name}] start={window.start_offset_ns:.1f} ns "
f"length={window.length_ns:.1f} ns"
)
In [ ]:
Copied!
# Inspect shape-level deduplication in the payload waveform library.
shape_to_waveform: dict[str, str] = {}
rows: list[tuple[str, int, str, str, str, str, str, str]] = []
for target in pulse_schedule.labels:
sequence = pulse_schedule.get_sequence(target, copy=False)
timeline = payload.timelines[target]
event_iter = iter(timeline.events)
pulse_index = 0
for waveform in sequence.get_flattened_waveforms(apply_frame_shifts=True):
if isinstance(waveform, qx.Blank):
continue
if not isinstance(waveform, qx.Pulse):
continue
shape_values = np.asarray(waveform.shape_values, dtype=np.complex128)
if (
shape_values.size == 0
or np.max(np.abs(shape_values)) <= 1e-12
or abs(float(waveform.scale)) <= 1e-12
):
continue
event = next(event_iter)
shape_hash = waveform.shape_hash
waveform_name = event.waveform_name
known_name = shape_to_waveform.get(shape_hash)
if known_name is None:
shape_to_waveform[shape_hash] = waveform_name
status = "new"
elif known_name == waveform_name:
status = "reused"
else:
status = f"mismatch(expected={known_name})"
rows.append(
(
target,
pulse_index,
waveform.__class__.__name__,
shape_hash[:10],
waveform_name,
f"{waveform.scale:.3f}",
f"{np.rad2deg(waveform.phase):.1f}",
status,
)
)
pulse_index += 1
remaining = list(event_iter)
if remaining:
print(f"[WARN] {target}: {len(remaining)} unmatched events")
print("unique shape hashes:", len(shape_to_waveform))
print("registered waveforms:", len(payload.waveform_library))
print("\nshape_hash -> waveform_name")
for shape_hash, waveform_name in sorted(shape_to_waveform.items(), key=lambda x: x[1]):
print(f" {shape_hash[:16]}... -> {waveform_name}")
print("\nper-pulse assignment")
print("target | idx | pulse | shape_hash | waveform_name | scale | phase_deg | status")
print("-|-|-|-|-|-|-|-")
for row in rows:
print(" | ".join(map(str, row)))
# Inspect shape-level deduplication in the payload waveform library.
shape_to_waveform: dict[str, str] = {}
rows: list[tuple[str, int, str, str, str, str, str, str]] = []
for target in pulse_schedule.labels:
sequence = pulse_schedule.get_sequence(target, copy=False)
timeline = payload.timelines[target]
event_iter = iter(timeline.events)
pulse_index = 0
for waveform in sequence.get_flattened_waveforms(apply_frame_shifts=True):
if isinstance(waveform, qx.Blank):
continue
if not isinstance(waveform, qx.Pulse):
continue
shape_values = np.asarray(waveform.shape_values, dtype=np.complex128)
if (
shape_values.size == 0
or np.max(np.abs(shape_values)) <= 1e-12
or abs(float(waveform.scale)) <= 1e-12
):
continue
event = next(event_iter)
shape_hash = waveform.shape_hash
waveform_name = event.waveform_name
known_name = shape_to_waveform.get(shape_hash)
if known_name is None:
shape_to_waveform[shape_hash] = waveform_name
status = "new"
elif known_name == waveform_name:
status = "reused"
else:
status = f"mismatch(expected={known_name})"
rows.append(
(
target,
pulse_index,
waveform.__class__.__name__,
shape_hash[:10],
waveform_name,
f"{waveform.scale:.3f}",
f"{np.rad2deg(waveform.phase):.1f}",
status,
)
)
pulse_index += 1
remaining = list(event_iter)
if remaining:
print(f"[WARN] {target}: {len(remaining)} unmatched events")
print("unique shape hashes:", len(shape_to_waveform))
print("registered waveforms:", len(payload.waveform_library))
print("\nshape_hash -> waveform_name")
for shape_hash, waveform_name in sorted(shape_to_waveform.items(), key=lambda x: x[1]):
print(f" {shape_hash[:16]}... -> {waveform_name}")
print("\nper-pulse assignment")
print("target | idx | pulse | shape_hash | waveform_name | scale | phase_deg | status")
print("-|-|-|-|-|-|-|-")
for row in rows:
print(" | ".join(map(str, row)))
In [ ]:
Copied!
@dataclass(frozen=True)
class _CompatEvent:
waveform_name: str
start_offset_ns: float
gain: float
phase_offset_deg: float
@dataclass(frozen=True)
class _CompatCaptureWindow:
name: str
start_offset_ns: float
length_ns: float
@dataclass(frozen=True)
class _CompatWaveform:
sampling_period_ns: float
iq_array: np.ndarray
class _CompatSequencer:
def __init__(self, default_sampling_period_ns: float):
self._waveform_library: dict[str, _CompatWaveform] = {}
self._alias_to_events: dict[str, list[_CompatEvent]] = defaultdict(list)
self._alias_to_capwin: dict[str, list[_CompatCaptureWindow]] = defaultdict(list)
self._length_ns = 0.0
self._default_sampling_period_ns = float(default_sampling_period_ns)
def register_waveform(
self, name: str, waveform, sampling_period_ns: float | None = None
):
dt = (
self._default_sampling_period_ns
if sampling_period_ns is None
else float(sampling_period_ns)
)
iq_array = np.asarray(waveform, dtype=np.complex128)
if np.any(np.abs(iq_array) > 1.0):
raise ValueError("The amplitude must be in the range -1 to 1.")
self._waveform_library[name] = _CompatWaveform(dt, iq_array)
def add_event(
self,
instrument_alias: str,
waveform_name: str,
start_offset_ns: float,
gain: float = 1.0,
phase_offset_deg: float = 0.0,
):
if waveform_name not in self._waveform_library:
raise ValueError(f"waveform '{waveform_name}' is not registered.")
self._alias_to_events[instrument_alias].append(
_CompatEvent(
waveform_name=waveform_name,
start_offset_ns=float(start_offset_ns),
gain=float(gain),
phase_offset_deg=float(phase_offset_deg),
)
)
waveform = self._waveform_library[waveform_name]
end_at_ns = (
float(start_offset_ns)
+ len(waveform.iq_array) * waveform.sampling_period_ns
)
self._length_ns = max(self._length_ns, end_at_ns)
def add_capture_window(
self,
instrument_alias: str,
window_name: str,
start_offset_ns: float,
length_ns: float,
):
self._alias_to_capwin[instrument_alias].append(
_CompatCaptureWindow(
name=window_name,
start_offset_ns=float(start_offset_ns),
length_ns=float(length_ns),
)
)
self._length_ns = max(
self._length_ns, float(start_offset_ns) + float(length_ns)
)
@property
def length_ns(self) -> float:
return self._length_ns
def export_set_fixed_timeline_directive(
self, instrument_alias: str, sampling_period_fs: int
):
name_to_index: dict[str, int] = {}
local_library = []
local_events = []
for event in self._alias_to_events[instrument_alias]:
if event.waveform_name in name_to_index:
index = name_to_index[event.waveform_name]
else:
waveform = self._waveform_library[event.waveform_name]
index = len(local_library)
name_to_index[event.waveform_name] = index
local_library.append(
SimpleNamespace(
sampling_period_fs=round(waveform.sampling_period_ns * 1e6),
iq_array=waveform.iq_array,
)
)
local_events.append(
SimpleNamespace(
waveform_index=index,
start_offset_samples=round(
event.start_offset_ns * 1e6 / sampling_period_fs
),
gain=event.gain,
phase_offset_deg=event.phase_offset_deg,
)
)
local_capture_windows = [
SimpleNamespace(
name=window.name,
start_offset_samples=round(
window.start_offset_ns * 1e6 / sampling_period_fs
),
length_samples=round(window.length_ns * 1e6 / sampling_period_fs),
)
for window in self._alias_to_capwin[instrument_alias]
]
return SimpleNamespace(
waveform_library=local_library,
events=local_events,
capture_windows=local_capture_windows,
length=round(self.length_ns * 1e6 / sampling_period_fs),
)
def _load_sequencer_class() -> tuple[type, str]:
try:
from quelware_client.client.helpers.sequencer import Sequencer
except Exception as exc:
return (
_CompatSequencer,
f"Using compatibility sequencer: {type(exc).__name__}: {exc}",
)
else:
return Sequencer, "Loaded Sequencer from installed quelware_client package."
SequencerClass, sequencer_note = _load_sequencer_class()
print(sequencer_note)
@dataclass(frozen=True)
class _CompatEvent:
waveform_name: str
start_offset_ns: float
gain: float
phase_offset_deg: float
@dataclass(frozen=True)
class _CompatCaptureWindow:
name: str
start_offset_ns: float
length_ns: float
@dataclass(frozen=True)
class _CompatWaveform:
sampling_period_ns: float
iq_array: np.ndarray
class _CompatSequencer:
def __init__(self, default_sampling_period_ns: float):
self._waveform_library: dict[str, _CompatWaveform] = {}
self._alias_to_events: dict[str, list[_CompatEvent]] = defaultdict(list)
self._alias_to_capwin: dict[str, list[_CompatCaptureWindow]] = defaultdict(list)
self._length_ns = 0.0
self._default_sampling_period_ns = float(default_sampling_period_ns)
def register_waveform(
self, name: str, waveform, sampling_period_ns: float | None = None
):
dt = (
self._default_sampling_period_ns
if sampling_period_ns is None
else float(sampling_period_ns)
)
iq_array = np.asarray(waveform, dtype=np.complex128)
if np.any(np.abs(iq_array) > 1.0):
raise ValueError("The amplitude must be in the range -1 to 1.")
self._waveform_library[name] = _CompatWaveform(dt, iq_array)
def add_event(
self,
instrument_alias: str,
waveform_name: str,
start_offset_ns: float,
gain: float = 1.0,
phase_offset_deg: float = 0.0,
):
if waveform_name not in self._waveform_library:
raise ValueError(f"waveform '{waveform_name}' is not registered.")
self._alias_to_events[instrument_alias].append(
_CompatEvent(
waveform_name=waveform_name,
start_offset_ns=float(start_offset_ns),
gain=float(gain),
phase_offset_deg=float(phase_offset_deg),
)
)
waveform = self._waveform_library[waveform_name]
end_at_ns = (
float(start_offset_ns)
+ len(waveform.iq_array) * waveform.sampling_period_ns
)
self._length_ns = max(self._length_ns, end_at_ns)
def add_capture_window(
self,
instrument_alias: str,
window_name: str,
start_offset_ns: float,
length_ns: float,
):
self._alias_to_capwin[instrument_alias].append(
_CompatCaptureWindow(
name=window_name,
start_offset_ns=float(start_offset_ns),
length_ns=float(length_ns),
)
)
self._length_ns = max(
self._length_ns, float(start_offset_ns) + float(length_ns)
)
@property
def length_ns(self) -> float:
return self._length_ns
def export_set_fixed_timeline_directive(
self, instrument_alias: str, sampling_period_fs: int
):
name_to_index: dict[str, int] = {}
local_library = []
local_events = []
for event in self._alias_to_events[instrument_alias]:
if event.waveform_name in name_to_index:
index = name_to_index[event.waveform_name]
else:
waveform = self._waveform_library[event.waveform_name]
index = len(local_library)
name_to_index[event.waveform_name] = index
local_library.append(
SimpleNamespace(
sampling_period_fs=round(waveform.sampling_period_ns * 1e6),
iq_array=waveform.iq_array,
)
)
local_events.append(
SimpleNamespace(
waveform_index=index,
start_offset_samples=round(
event.start_offset_ns * 1e6 / sampling_period_fs
),
gain=event.gain,
phase_offset_deg=event.phase_offset_deg,
)
)
local_capture_windows = [
SimpleNamespace(
name=window.name,
start_offset_samples=round(
window.start_offset_ns * 1e6 / sampling_period_fs
),
length_samples=round(window.length_ns * 1e6 / sampling_period_fs),
)
for window in self._alias_to_capwin[instrument_alias]
]
return SimpleNamespace(
waveform_library=local_library,
events=local_events,
capture_windows=local_capture_windows,
length=round(self.length_ns * 1e6 / sampling_period_fs),
)
def _load_sequencer_class() -> tuple[type, str]:
try:
from quelware_client.client.helpers.sequencer import Sequencer
except Exception as exc:
return (
_CompatSequencer,
f"Using compatibility sequencer: {type(exc).__name__}: {exc}",
)
else:
return Sequencer, "Loaded Sequencer from installed quelware_client package."
SequencerClass, sequencer_note = _load_sequencer_class()
print(sequencer_note)
In [ ]:
Copied!
builder = Quel3SequencerBuilder()
sequencer = builder.build(
payload=payload,
sequencer_factory=SequencerClass,
default_sampling_period_ns=SAMPLING_PERIOD,
)
schedule_visualizer.plot_sequencer_timeline(
sequencer,
title="Sequencer Timeline",
)
sequencer_state = vars(sequencer)
print("registered waveforms:", list(sequencer_state["_waveform_library"].keys()))
builder = Quel3SequencerBuilder()
sequencer = builder.build(
payload=payload,
sequencer_factory=SequencerClass,
default_sampling_period_ns=SAMPLING_PERIOD,
)
schedule_visualizer.plot_sequencer_timeline(
sequencer,
title="Sequencer Timeline",
)
sequencer_state = vars(sequencer)
print("registered waveforms:", list(sequencer_state["_waveform_library"].keys()))
In [ ]:
Copied!
sequencer_state = vars(sequencer)
ctrl_00_name = sequencer_state["_alias_to_events"]["ctrl-00"][0].waveform_name
ctrl_01_name = sequencer_state["_alias_to_events"]["ctrl-01"][0].waveform_name
print("shared waveform for ctrl-00 and ctrl-01:", ctrl_00_name == ctrl_01_name)
print("ctrl-00 waveform:", ctrl_00_name)
print("ctrl-01 waveform:", ctrl_01_name)
directive_ctrl_00 = sequencer.export_set_fixed_timeline_directive(
"ctrl-00",
sampling_period_fs=SAMPLING_PERIOD * 1e6,
)
directive_readout_00 = sequencer.export_set_fixed_timeline_directive(
"readout-00",
sampling_period_fs=SAMPLING_PERIOD * 1e6,
)
directive_readout_01 = sequencer.export_set_fixed_timeline_directive(
"readout-01",
sampling_period_fs=SAMPLING_PERIOD * 1e6,
)
print(
"ctrl-00 directive: events=",
len(directive_ctrl_00.events),
"waveforms=",
len(directive_ctrl_00.waveform_library),
"length_samples=",
directive_ctrl_00.length,
)
print(
"readout-00 directive: events=",
len(directive_readout_00.events),
"captures=",
len(directive_readout_00.capture_windows),
"length_samples=",
directive_readout_00.length,
)
print(
"readout-01 directive: events=",
len(directive_readout_01.events),
"captures=",
len(directive_readout_01.capture_windows),
"length_samples=",
directive_readout_01.length,
)
sequencer_state = vars(sequencer)
ctrl_00_name = sequencer_state["_alias_to_events"]["ctrl-00"][0].waveform_name
ctrl_01_name = sequencer_state["_alias_to_events"]["ctrl-01"][0].waveform_name
print("shared waveform for ctrl-00 and ctrl-01:", ctrl_00_name == ctrl_01_name)
print("ctrl-00 waveform:", ctrl_00_name)
print("ctrl-01 waveform:", ctrl_01_name)
directive_ctrl_00 = sequencer.export_set_fixed_timeline_directive(
"ctrl-00",
sampling_period_fs=SAMPLING_PERIOD * 1e6,
)
directive_readout_00 = sequencer.export_set_fixed_timeline_directive(
"readout-00",
sampling_period_fs=SAMPLING_PERIOD * 1e6,
)
directive_readout_01 = sequencer.export_set_fixed_timeline_directive(
"readout-01",
sampling_period_fs=SAMPLING_PERIOD * 1e6,
)
print(
"ctrl-00 directive: events=",
len(directive_ctrl_00.events),
"waveforms=",
len(directive_ctrl_00.waveform_library),
"length_samples=",
directive_ctrl_00.length,
)
print(
"readout-00 directive: events=",
len(directive_readout_00.events),
"captures=",
len(directive_readout_00.capture_windows),
"length_samples=",
directive_readout_00.length,
)
print(
"readout-01 directive: events=",
len(directive_readout_01.events),
"captures=",
len(directive_readout_01.capture_windows),
"length_samples=",
directive_readout_01.length,
)
What to check¶
- The control channels (
ctrl-00,ctrl-01) reuse the same waveform shape. - The shape-dedup inspection table shows
shape_hash -> waveform_namewithnew/reusedstatus. - Scale/phase differences are encoded in
add_event(..., gain, phase_offset_deg). PulseSchedulecurrently uses a common dt, so the waveform library entries usesampling_period_ns=0.4in this example.- Readout conversion is still demonstrated by exporting the readout directive with
sampling_period_fs=800_000. - Readout timelines are exported for both
readout-00(RQ00) andreadout-01(RQ01). - Multiple capture windows (
capture_0,capture_1) are present onRQ00, and one capture window (capture_0) is present onRQ01. - Capture windows are independent timeline entries and appear as separate lanes in the visualizer.