SweepMeasurementExecutor T1-Like Example
This notebook shows how to execute a T1-like qxschema.SweepMeasurementConfig with SweepMeasurementExecutor.
The sequence applies a pulse on Q00, waits for a swept delay, and then adds a readout pulse on RQ00.
The example uses a stubbed Measurement runtime so it can be read and tried without hardware access.
In [ ]:
Copied!
import numpy as np
import plotly.graph_objects as go
from qxcore import Frequency, Time
from qxschema import (
DataAcquisitionConfig,
FrequencyConfig,
ParameterSweepConfig,
ParameterSweepContent,
ParametricSequenceConfig,
ParametricSequencePulseCommand,
SweepMeasurementConfig,
)
from qubex.measurement import Measurement, SweepMeasurementExecutor
from qubex.measurement.models import (
CaptureData,
MeasurementConfig,
MeasurementResult,
MeasurementSchedule,
)
import numpy as np
import plotly.graph_objects as go
from qxcore import Frequency, Time
from qxschema import (
DataAcquisitionConfig,
FrequencyConfig,
ParameterSweepConfig,
ParameterSweepContent,
ParametricSequenceConfig,
ParametricSequencePulseCommand,
SweepMeasurementConfig,
)
from qubex.measurement import Measurement, SweepMeasurementExecutor
from qubex.measurement.models import (
CaptureData,
MeasurementConfig,
MeasurementResult,
MeasurementSchedule,
)
In [ ]:
Copied!
# T1-like sweep: excite `Q00` with a Gaussian pulse, idle for the swept
# `wait` variable, then read out on `RQ00` with a FlatTop pulse.

# Pulse shape arguments (bare floats handed to the pulse commands below).
readout_duration = 24.0
readout_tau = 6.0
control_duration = 16.0
control_sigma = 4.0

# The capture range is chosen to match the FlatTop pulse exactly.
capture_duration = readout_duration
capture_delay = 0.0

# Values substituted for the `wait` sequence variable, one per sweep point.
wait_values = np.asarray([0.0, 16.0, 32.0, 64.0, 96.0, 160.0])

sequence_config = ParametricSequenceConfig(
    delta_time=Time(2.0, "ns"),
    variable_list=["wait"],
    command_list=[
        # Control line: Gaussian excitation followed by the swept idle.
        ParametricSequencePulseCommand(
            name="Gaussian",
            channel_list=["Q00"],
            argument_list=[control_duration, 1.0, control_sigma],
        ),
        ParametricSequencePulseCommand(
            name="Blank",
            channel_list=["Q00"],
            argument_list=["wait"],
        ),
        # Readout line: blank for the control pulse plus the wait, then the
        # FlatTop readout pulse.
        ParametricSequencePulseCommand(
            name="Blank",
            channel_list=["RQ00"],
            argument_list=[f"{control_duration} + wait"],
        ),
        ParametricSequencePulseCommand(
            name="FlatTop",
            channel_list=["RQ00"],
            argument_list=[readout_duration, 0.25, readout_tau],
        ),
    ],
)

frequency_config = FrequencyConfig(
    channel_to_frequency={
        "Q00": Frequency(5.0, "GHz"),
        "RQ00": Frequency(6.5, "GHz"),
    },
    # `channel_to_frequency_reference` is currently unsupported and must be empty.
    channel_to_frequency_reference={},
    channel_to_frequency_shift={
        "Q00": Frequency(0.0, "MHz"),
        "RQ00": Frequency(0.0, "MHz"),
    },
    # `keep_oscillator_relative_phase` is currently unsupported and must remain True.
    keep_oscillator_relative_phase=True,
)

acquisition_config = DataAcquisitionConfig(
    shot_count=16,
    shot_repetition_margin=Time(200.0, "ns"),
    data_acquisition_duration=Time(capture_duration, "ns"),
    data_acquisition_delay=Time(capture_delay, "ns"),
    data_acquisition_timeout=Time(10.0, "ms"),
    flag_average_waveform=True,
    flag_average_shots=True,
    delta_time=Time(2.0, "ns"),
    channel_to_averaging_time={"RQ00": Time(capture_duration, "ns")},
    channel_to_averaging_window={"RQ00": np.asarray([1.0 + 0.0j])},
)

# One sweep axis holding a single sweep that drives the `wait` variable.
sweep_config = ParameterSweepConfig(
    sweep_content_list={
        "wait_sweep": ParameterSweepContent(
            category="sequence_variable",
            sweep_target=["wait"],
            value_list=wait_values,
        )
    },
    sweep_axis=[["wait_sweep"]],
)

config = SweepMeasurementConfig(
    channel_list=["Q00", "RQ00"],
    sequence=sequence_config,
    frequency=frequency_config,
    data_acquisition=acquisition_config,
    sweep_parameter=sweep_config,
)

# Last expression of the cell, so the notebook displays the serialized config.
config.to_dict()
# T1-like sweep: excite `Q00` with a Gaussian pulse, idle for the swept
# `wait` variable, then read out on `RQ00` with a FlatTop pulse.

# Pulse shape arguments (bare floats handed to the pulse commands below).
readout_duration = 24.0
readout_tau = 6.0
control_duration = 16.0
control_sigma = 4.0

# The capture range is chosen to match the FlatTop pulse exactly.
capture_duration = readout_duration
capture_delay = 0.0

# Values substituted for the `wait` sequence variable, one per sweep point.
wait_values = np.asarray([0.0, 16.0, 32.0, 64.0, 96.0, 160.0])

sequence_config = ParametricSequenceConfig(
    delta_time=Time(2.0, "ns"),
    variable_list=["wait"],
    command_list=[
        # Control line: Gaussian excitation followed by the swept idle.
        ParametricSequencePulseCommand(
            name="Gaussian",
            channel_list=["Q00"],
            argument_list=[control_duration, 1.0, control_sigma],
        ),
        ParametricSequencePulseCommand(
            name="Blank",
            channel_list=["Q00"],
            argument_list=["wait"],
        ),
        # Readout line: blank for the control pulse plus the wait, then the
        # FlatTop readout pulse.
        ParametricSequencePulseCommand(
            name="Blank",
            channel_list=["RQ00"],
            argument_list=[f"{control_duration} + wait"],
        ),
        ParametricSequencePulseCommand(
            name="FlatTop",
            channel_list=["RQ00"],
            argument_list=[readout_duration, 0.25, readout_tau],
        ),
    ],
)

frequency_config = FrequencyConfig(
    channel_to_frequency={
        "Q00": Frequency(5.0, "GHz"),
        "RQ00": Frequency(6.5, "GHz"),
    },
    # `channel_to_frequency_reference` is currently unsupported and must be empty.
    channel_to_frequency_reference={},
    channel_to_frequency_shift={
        "Q00": Frequency(0.0, "MHz"),
        "RQ00": Frequency(0.0, "MHz"),
    },
    # `keep_oscillator_relative_phase` is currently unsupported and must remain True.
    keep_oscillator_relative_phase=True,
)

acquisition_config = DataAcquisitionConfig(
    shot_count=16,
    shot_repetition_margin=Time(200.0, "ns"),
    data_acquisition_duration=Time(capture_duration, "ns"),
    data_acquisition_delay=Time(capture_delay, "ns"),
    data_acquisition_timeout=Time(10.0, "ms"),
    flag_average_waveform=True,
    flag_average_shots=True,
    delta_time=Time(2.0, "ns"),
    channel_to_averaging_time={"RQ00": Time(capture_duration, "ns")},
    channel_to_averaging_window={"RQ00": np.asarray([1.0 + 0.0j])},
)

# One sweep axis holding a single sweep that drives the `wait` variable.
sweep_config = ParameterSweepConfig(
    sweep_content_list={
        "wait_sweep": ParameterSweepContent(
            category="sequence_variable",
            sweep_target=["wait"],
            value_list=wait_values,
        )
    },
    sweep_axis=[["wait_sweep"]],
)

config = SweepMeasurementConfig(
    channel_list=["Q00", "RQ00"],
    sequence=sequence_config,
    frequency=frequency_config,
    data_acquisition=acquisition_config,
    sweep_parameter=sweep_config,
)

# Last expression of the cell, so the notebook displays the serialized config.
config.to_dict()
In [ ]:
Copied!
class DemoMeasurement(Measurement):
    """Hardware-free `Measurement` stand-in for exercising the sweep executor.

    The base class is initialized with config loading and device connection
    switched off. Each schedule handed to `run_measurement` is recorded in
    `executed_schedules`, and a synthetic IQ point is returned in place of
    real capture data.
    """

    def __init__(self) -> None:
        """Initialize the base class offline and prepare the stub state."""
        super().__init__(
            chip_id="DEMO",
            qubits=["Q00"],
            load_configs=False,
            connect_devices=False,
        )
        # Opaque placeholder returned by `backend_controller`.
        self._backend_controller = object()
        # Schedules received by `run_measurement`, in call order.
        self.executed_schedules: list[MeasurementSchedule] = []

    @property
    def targets(self) -> dict[str, object]:
        """Return placeholder target objects keyed by `Q00` and `RQ00`."""
        return {label: object() for label in ("Q00", "RQ00")}

    @property
    def backend_controller(self) -> object:
        """Return the placeholder backend object created in `__init__`."""
        return self._backend_controller

    def create_measurement_config(
        self,
        *,
        n_shots: int | None = None,
        shot_interval: float | None = None,
        shot_averaging: bool | None = None,
        time_integration: bool | None = None,
        state_classification: bool | None = None,
    ) -> MeasurementConfig:
        """Build a `MeasurementConfig` from fully specified keyword values.

        Raises
        ------
        ValueError
            If any of the keyword arguments is left as None.
        """
        settings = {
            "n_shots": n_shots,
            "shot_interval": shot_interval,
            "shot_averaging": shot_averaging,
            "time_integration": time_integration,
            "state_classification": state_classification,
        }
        # The stub has no defaults to fall back on, so every value is required.
        if any(value is None for value in settings.values()):
            raise ValueError("DemoMeasurement requires explicit config values.")
        return MeasurementConfig(**settings)

    async def run_measurement(
        self,
        schedule: MeasurementSchedule,
        *,
        config: MeasurementConfig,
    ) -> MeasurementResult:
        """Record the schedule and return a synthetic decaying IQ point.

        The wait time is recovered from the first capture's start time minus
        `control_duration`, which is read from the enclosing notebook scope.
        The result depends only on that start time, so repeated runs of the
        same schedule yield the same data.
        """
        self.executed_schedules.append(schedule)

        first_capture = schedule.capture_schedule.captures[0]
        elapsed_wait = first_capture.start_time - control_duration

        # Exponential decay with a fixed constant of 80.0; the imaginary part
        # grows as the real part decays.
        remaining = float(np.exp(-elapsed_wait / 80.0))
        quadrature = 0.08 * (1.0 - remaining)
        sample = np.asarray(complex(remaining, quadrature))

        readout = CaptureData.from_primary_data(
            target="RQ00",
            data=sample,
            config=config,
            sampling_period=2.0,
        )
        return MeasurementResult(
            data={"RQ00": [readout]},
            measurement_config=config,
            device_config={"backend": "demo"},
        )
class DemoMeasurement(Measurement):
    """Hardware-free `Measurement` stand-in for exercising the sweep executor.

    The base class is initialized with config loading and device connection
    switched off. Each schedule handed to `run_measurement` is recorded in
    `executed_schedules`, and a synthetic IQ point is returned in place of
    real capture data.
    """

    def __init__(self) -> None:
        """Initialize the base class offline and prepare the stub state."""
        super().__init__(
            chip_id="DEMO",
            qubits=["Q00"],
            load_configs=False,
            connect_devices=False,
        )
        # Opaque placeholder returned by `backend_controller`.
        self._backend_controller = object()
        # Schedules received by `run_measurement`, in call order.
        self.executed_schedules: list[MeasurementSchedule] = []

    @property
    def targets(self) -> dict[str, object]:
        """Return placeholder target objects keyed by `Q00` and `RQ00`."""
        return {label: object() for label in ("Q00", "RQ00")}

    @property
    def backend_controller(self) -> object:
        """Return the placeholder backend object created in `__init__`."""
        return self._backend_controller

    def create_measurement_config(
        self,
        *,
        n_shots: int | None = None,
        shot_interval: float | None = None,
        shot_averaging: bool | None = None,
        time_integration: bool | None = None,
        state_classification: bool | None = None,
    ) -> MeasurementConfig:
        """Build a `MeasurementConfig` from fully specified keyword values.

        Raises
        ------
        ValueError
            If any of the keyword arguments is left as None.
        """
        settings = {
            "n_shots": n_shots,
            "shot_interval": shot_interval,
            "shot_averaging": shot_averaging,
            "time_integration": time_integration,
            "state_classification": state_classification,
        }
        # The stub has no defaults to fall back on, so every value is required.
        if any(value is None for value in settings.values()):
            raise ValueError("DemoMeasurement requires explicit config values.")
        return MeasurementConfig(**settings)

    async def run_measurement(
        self,
        schedule: MeasurementSchedule,
        *,
        config: MeasurementConfig,
    ) -> MeasurementResult:
        """Record the schedule and return a synthetic decaying IQ point.

        The wait time is recovered from the first capture's start time minus
        `control_duration`, which is read from the enclosing notebook scope.
        The result depends only on that start time, so repeated runs of the
        same schedule yield the same data.
        """
        self.executed_schedules.append(schedule)

        first_capture = schedule.capture_schedule.captures[0]
        elapsed_wait = first_capture.start_time - control_duration

        # Exponential decay with a fixed constant of 80.0; the imaginary part
        # grows as the real part decays.
        remaining = float(np.exp(-elapsed_wait / 80.0))
        quadrature = 0.08 * (1.0 - remaining)
        sample = np.asarray(complex(remaining, quadrature))

        readout = CaptureData.from_primary_data(
            target="RQ00",
            data=sample,
            config=config,
            sampling_period=2.0,
        )
        return MeasurementResult(
            data={"RQ00": [readout]},
            measurement_config=config,
            device_config={"backend": "demo"},
        )
In [ ]:
Copied!
measurement = DemoMeasurement()
executor = SweepMeasurementExecutor(measurement=measurement)
result = await executor.run(config)
print("sweep_key_list:", result.sweep_key_list)
print("data_key_list:", result.data_key_list)
print("data_shape:", result.data_shape)
result.data
measurement = DemoMeasurement()
executor = SweepMeasurementExecutor(measurement=measurement)
result = await executor.run(config)
print("sweep_key_list:", result.sweep_key_list)
print("data_key_list:", result.data_key_list)
print("data_shape:", result.data_shape)
result.data
In [ ]:
Copied!
# Inspect the schedule built for the first sweep point and its first capture.
first_schedule = measurement.executed_schedules[0]
capture = first_schedule.capture_schedule.captures[0]

for label, value in (
    ("pulse duration:", first_schedule.pulse_schedule.duration),
    ("capture channels:", capture.channels),
    ("capture start:", capture.start_time),
    ("capture duration:", capture.duration),
):
    print(label, value)
# Inspect the schedule built for the first sweep point and its first capture.
first_schedule = measurement.executed_schedules[0]
capture = first_schedule.capture_schedule.captures[0]

for label, value in (
    ("pulse duration:", first_schedule.pulse_schedule.duration),
    ("capture channels:", capture.channels),
    ("capture start:", capture.start_time),
    ("capture duration:", capture.duration),
):
    print(label, value)
In [ ]:
Copied!
# Drop the length-1 axis 1 of the sweep data (presumably the data-key axis;
# confirm against `result.data_shape`) and keep the IQ magnitude per wait value.
iq_per_wait = np.squeeze(result.data, axis=1)
t1_signal = np.abs(iq_per_wait)

decay_trace = go.Scatter(
    x=wait_values,
    y=t1_signal,
    mode="lines+markers",
    name="|IQ|",
)
figure_layout = go.Layout(
    title="T1-Like Sweep Result",
    xaxis_title="Wait Time (ns)",
    yaxis_title="Signal Magnitude (arb. units)",
)

# Last expression of the cell, so the notebook renders the figure.
go.Figure(data=[decay_trace], layout=figure_layout)
# Drop the length-1 axis 1 of the sweep data (presumably the data-key axis;
# confirm against `result.data_shape`) and keep the IQ magnitude per wait value.
iq_per_wait = np.squeeze(result.data, axis=1)
t1_signal = np.abs(iq_per_wait)

decay_trace = go.Scatter(
    x=wait_values,
    y=t1_signal,
    mode="lines+markers",
    name="|IQ|",
)
figure_layout = go.Layout(
    title="T1-Like Sweep Result",
    xaxis_title="Wait Time (ns)",
    yaxis_title="Signal Magnitude (arb. units)",
)

# Last expression of the cell, so the notebook renders the figure.
go.Figure(data=[decay_trace], layout=figure_layout)
In [ ]:
Copied!
# Plot every schedule the stub recorded, labelled by its sweep-point index.
for index, schedule in enumerate(measurement.executed_schedules):
    title = f"Sweep Point {index} Measurement Schedule"
    schedule.plot(title=title)
# Plot every schedule the stub recorded, labelled by its sweep-point index.
for index, schedule in enumerate(measurement.executed_schedules):
    title = f"Sweep Point {index} Measurement Schedule"
    schedule.plot(title=title)
For a real session, replace DemoMeasurement with a configured Measurement instance.
In a notebook cell, use top-level await.
session = Measurement(
chip_id="64Qv3",
qubits=["Q00"],
config_dir="/path/to/config",
params_dir="/path/to/params",
)
session.connect()
executor = SweepMeasurementExecutor(measurement=session)
result = await executor.run(config)
In a Python script, use asyncio.run(...).
import asyncio

# Settings for a hardware-backed session; the directory paths are placeholders.
session_settings = {
    "chip_id": "64Qv3",
    "qubits": ["Q00"],
    "config_dir": "/path/to/config",
    "params_dir": "/path/to/params",
}
session = Measurement(**session_settings)
session.connect()

# A plain script has no running event loop, so `asyncio.run` drives the sweep.
executor = SweepMeasurementExecutor(measurement=session)
sweep = executor.run(config)
result = asyncio.run(sweep)