QAOA with Server Side Execution¶
This section describes how to run QAOA with the Server Side Execution (SSE) feature. It assumes that you can already run QURI Parts riqu using the method described in Sampling on riqu server.
What is SSE?¶
Consider a job that alternates between classical and quantum programs, such as a classical-quantum hybrid algorithm. If each quantum program has to wait in the job queue, the entire job takes a long time to complete.
SSE is a feature that reduces this latency by executing a Python program containing both the classical and the quantum parts at a location closer to the device than the job queue.
With SSE, the device is occupied for as long as the job is running.
Problem¶
In this tutorial, we use SSE to solve the MaxCut problem on four vertices with QAOA, following the example introduced in Quantum Native Dojo.
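As a classical reference point, the problem is small enough to solve by brute force. The sketch below assumes the graph is the four-vertex ring 0-1-2-3-0, which is what the `Z{i} Z{(i+1) % n_vertices}` terms of the cost observable in qaoa.py suggest; the helper names `cost` and `cut_size` are introduced here for illustration only.

```python
from itertools import product

n_vertices = 4
# Ring edges 0-1, 1-2, 2-3, 3-0 (assumed from the observable in qaoa.py).
edges = [(i, (i + 1) % n_vertices) for i in range(n_vertices)]


def cost(bits):
    """Classical value of sum over edges of 0.5 * z_i * z_j, with z = +1 for bit 0 and -1 for bit 1."""
    z = [1 - 2 * b for b in bits]
    return sum(0.5 * z[i] * z[j] for i, j in edges)


def cut_size(bits):
    """Number of edges whose endpoints are assigned to different groups."""
    return sum(bits[i] != bits[j] for i, j in edges)


# Enumerate all 2**4 assignments and keep the ones with the lowest cost.
costs = {bits: cost(bits) for bits in product((0, 1), repeat=n_vertices)}
best_cost = min(costs.values())
best = sorted(bits for bits, c in costs.items() if c == best_cost)

print(best_cost, best)  # -2.0 [(0, 1, 0, 1), (1, 0, 1, 0)]
```

Under this assumption each cut edge contributes -0.5 and each uncut edge +0.5, so the cost equals 2 minus the cut size, and a QAOA cost close to -2 indicates that the optimizer has found the maximum cut.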
QAOA Code¶
This code executes QAOA using QURI Parts. In this tutorial, this code is named qaoa.py. Details are explained after the code.
[ ]:
import traceback
import numpy as np
from scipy.optimize import minimize
from quri_parts.circuit import LinearMappedUnboundParametricQuantumCircuit
from quri_parts.core.estimator import QuantumEstimator
from quri_parts.core.estimator.sampling import create_sampling_estimator
from quri_parts.core.measurement import bitwise_commuting_pauli_measurement
from quri_parts.core.operator import Operator, pauli_label
from quri_parts.core.sampling import create_concurrent_sampler_from_sampling_backend
from quri_parts.core.sampling.shots_allocator import create_equipartition_shots_allocator
from quri_parts.core.state import quantum_state
# create estimator
def create_estimator(n_shots: int) -> QuantumEstimator:
    from quri_parts.riqu.backend import RiquSamplingBackend

    backend = RiquSamplingBackend()
    return create_estimator_from_backend(backend, n_shots)


def create_estimator_from_backend(backend, n_shots: int):
    sampler = create_concurrent_sampler_from_sampling_backend(backend)
    allocator = create_equipartition_shots_allocator()
    estimator = create_sampling_estimator(
        n_shots, sampler, bitwise_commuting_pauli_measurement, allocator
    )
    return estimator


# A function to add U_C(gamma) to a circuit
def add_U_C(
    n_vertices: int,
    circuit: LinearMappedUnboundParametricQuantumCircuit,
    gamma_idx: int,
) -> None:
    gamma = circuit.add_parameter(f"gamma_{gamma_idx}")
    for i in range(n_vertices):
        j = (i + 1) % n_vertices
        circuit.add_CNOT_gate(i, j)
        ## With QURI Parts, RZ(theta)=e^{-i*theta/2*Z}
        circuit.add_ParametricRZ_gate(j, {gamma: 2})
        circuit.add_CNOT_gate(i, j)


# A function to add U_X(beta) to a circuit (the circuit is modified in place)
def add_U_X(
    n_vertices: int,
    circuit: LinearMappedUnboundParametricQuantumCircuit,
    beta_idx: int,
) -> None:
    beta = circuit.add_parameter(f"beta_{beta_idx}")
    for i in range(n_vertices):
        circuit.add_ParametricRX_gate(i, {beta: 2})


def generate_parameter_order(n_layer: int):
    array = [0] * (2 * n_layer)
    for i in range(n_layer):
        array[i * 2] = n_layer + i
        array[i * 2 + 1] = i
    return array


def cost_function(
    n_vertices: int,
    observable: Operator,
    estimator: QuantumEstimator,
    n_layers: int,
    parameter_order: list[int],
):
    def qaoa_function(
        x: np.ndarray[float],
    ) -> float:
        circuit = LinearMappedUnboundParametricQuantumCircuit(n_vertices)
        ## to create superposition, apply Hadamard gate
        for i in range(n_vertices):
            circuit.add_H_gate(i)
        ## apply U_C, U_X
        for i in range(n_layers):
            add_U_C(n_vertices, circuit, i)
            add_U_X(n_vertices, circuit, i)
        # The input x is laid out as [beta_0, beta_1, gamma_0, gamma_1].
        # Reordering it with parameter_order (x[[2, 0, 3, 1]] for two layers)
        # makes it consistent with the circuit parameter order
        # [gamma_0, beta_0, gamma_1, beta_1].
        # You may check the circuit parameter order by running
        # `circuit.param_mapping.in_params`
        bound_circuit = circuit.bind_parameters(x[parameter_order])
        ## prepare |beta, gamma>
        state = quantum_state(n_vertices, circuit=bound_circuit)
        return estimator(observable, state).value.real

    return qaoa_function


try:
    # setting
    n_vertices = 4
    n_layers = 2
    n_shots = 10000
    # initial parameter
    x0 = np.array([0.1, 0.1, 0.2, 0.3])
    ## observable and cost function
    cost_observable = Operator({pauli_label(f"Z{i} Z{(i+1) % n_vertices}"): 0.5 for i in range(n_vertices)})
    estimator = create_estimator(n_shots)
    parameter_order = generate_parameter_order(n_layers)
    cost_fun = cost_function(n_vertices, cost_observable, estimator, n_layers, parameter_order)
    ## minimize with scipy.minimize
    cost_history = [cost_fun(x0)]
    result = minimize(
        cost_fun,
        x0,
        method="COBYLA",
        callback=lambda x: cost_history.append(cost_fun(x)),
        options={"maxiter": 500},
    )
    print("QAOA Cost:", result.fun)  # value after optimization
    print("Cost History:", cost_history)  # cost value at each iteration
    print("Optimized Parameter:", result.x)  # (beta, gamma) after optimization
except Exception as e:
    print("Exception:", e)
    traceback.print_exc()
In this tutorial, the number of layers is n_layers = 2. \(x_0 = [\beta_0, \beta_1, \gamma_0, \gamma_1]\) is the initial value of the rotation angle parameters embedded in the quantum circuit; parameter_order rearranges it into the circuit's own parameter order \([\gamma_0, \beta_0, \gamma_1, \beta_1]\) before binding. QAOA has 2 * n_layers parameters, so if the number of layers is changed, the length of \(x_0\) must be changed accordingly.
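The reordering can be checked without any quantum library. The sketch below copies generate_parameter_order from qaoa.py and, judging from how that function fills its array, assumes the optimizer vector holds all beta values first and all gamma values after them; `label_parameters` is a hypothetical helper added only for this check.

```python
def generate_parameter_order(n_layer):
    # Same logic as in qaoa.py: even slots take gamma indices, odd slots take beta indices.
    array = [0] * (2 * n_layer)
    for i in range(n_layer):
        array[i * 2] = n_layer + i
        array[i * 2 + 1] = i
    return array


def label_parameters(n_layers):
    """Return parameter names in the order they are bound to the circuit."""
    # Assumed optimizer layout: [beta_0, ..., beta_{p-1}, gamma_0, ..., gamma_{p-1}]
    x_names = [f"beta_{i}" for i in range(n_layers)]
    x_names += [f"gamma_{i}" for i in range(n_layers)]
    order = generate_parameter_order(n_layers)
    return [x_names[k] for k in order]


print(generate_parameter_order(2))  # [2, 0, 3, 1]
print(label_parameters(2))  # ['gamma_0', 'beta_0', 'gamma_1', 'beta_1']
```

The same check with three layers gives gamma and beta names alternating layer by layer, which matches the order in which add_U_C and add_U_X register their parameters.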
To execute a Python script with SSE, the create_estimator function must use RiquSamplingBackend. For testing, you can rewrite create_estimator to use any backend you like; for example, use QiskitSamplingBackend to run with Qiskit. Make sure the script works on your PC with such a backend before switching to RiquSamplingBackend.
The program must be written as a single Python script. Libraries such as numpy are available.
In SSE, the QPU is occupied while the Python script is running. There is a defined execution time limit, however, and a script that exceeds it is forcibly terminated. Please contact the administrator of the Quantum Computing Cloud Service to find out the time limit.
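One way to avoid losing everything to a forced termination is to let the script stop itself shortly before the limit. The following is a minimal sketch under stated assumptions: `TimeBudget`, `BudgetExceeded` and `with_budget` are hypothetical names that are not part of QURI Parts, and the limit value has to come from the service administrator.

```python
import time


class BudgetExceeded(Exception):
    """Raised when the self-imposed time budget is used up."""


class TimeBudget:
    def __init__(self, limit_seconds, margin_seconds=0.0, clock=time.monotonic):
        # The deadline is set a safety margin before the real limit.
        self._clock = clock
        self._deadline = clock() + limit_seconds - margin_seconds

    def remaining(self):
        return self._deadline - self._clock()

    def expired(self):
        return self.remaining() <= 0


def with_budget(cost_fun, budget):
    """Wrap a cost function so that it refuses to start a new evaluation after the deadline."""

    def wrapped(x):
        if budget.expired():
            raise BudgetExceeded("time budget used up")
        return cost_fun(x)

    return wrapped
```

In qaoa.py, the wrapped function could be passed to minimize in place of cost_fun, with BudgetExceeded caught around the minimize call so that the cost history collected so far is still printed before the script exits.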
The following information can be obtained by the user after the SSE execution is complete.
- counts and properties of the last quantum circuit executed
- the Python script’s output from the print function, as a log file
It is useful to print the history of QAOA cost values and the final cost value, as shown in the code above. The code also catches any Exception and prints a stack trace, which helps with debugging if an error occurs at runtime.
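Because print output is the main channel back to the user, it can help to print values in a form that is easy to parse afterwards. The sketch below uses one JSON object per line; `log_record` and `parse_log` are hypothetical helpers rather than part of QURI Parts, and numpy arrays would need `.tolist()` before being passed in.

```python
import json


def log_record(tag, **fields):
    """Print one JSON object per line so that the log can be parsed later."""
    line = json.dumps({"tag": tag, **fields}, sort_keys=True)
    print(line, flush=True)
    return line


def parse_log(text):
    """Collect the JSON records from a log, skipping any other printed lines."""
    records = []
    for line in text.splitlines():
        try:
            record = json.loads(line)
        except json.JSONDecodeError:
            continue  # ordinary print output or traceback lines
        if isinstance(record, dict) and "tag" in record:
            records.append(record)
    return records
```

For example, `log_record("cost", iteration=3, value=-1.87)` inside the optimizer callback would let the cost history be rebuilt from the log even if the script stops before its final print statements.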
Run SSE jobs¶
To execute qaoa.py with SSE, run the following code on your PC.
[ ]:
from quri_parts.riqu.backend import RiquSseJob, RiquConfig
sse = RiquSseJob(RiquConfig.from_file("default"))
job = sse.run_sse("<path to qaoa.py>", remark="qaoa with sse")
job_id = job.id
print(f"job_id={job_id}")
try:
    result = job.result()
    print(f"counts of last circuit: {result.counts}")
except Exception as e:
    print(e)
print(f"log file: {sse.download_log()}")
To run SSE, you need to instantiate RiquSseJob. To read authentication information from ~/.riqu, specify the section name in ~/.riqu as the argument of the RiquConfig.from_file function. If the RiquSseJob argument is omitted, the default section is read.
The argument of the run_sse function is the path to the Python script to be executed by SSE. The remark attribute of the job can also be specified, but it is optional.
If you call the result function after calling run_sse, it returns the result once the job has completed. Although multiple quantum circuits may be executed in a single SSE job, only the result of the last executed quantum circuit is stored in counts and properties.
Calling the download_log function downloads the Python script’s print output to the user’s PC as a zipped log file. The return value of download_log is the path to the zipped file.
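Since the log arrives as a zip archive, a small helper can read it without unpacking it by hand. This sketch uses only the standard library; the names of the files inside the archive and their encoding are not stated in this tutorial, so the helper reads every entry and assumes UTF-8. `read_zipped_log` is a hypothetical name.

```python
import zipfile


def read_zipped_log(zip_path, encoding="utf-8"):
    """Return a dict mapping each file name in the archive to its decoded text."""
    texts = {}
    with zipfile.ZipFile(zip_path) as archive:
        for name in archive.namelist():
            if name.endswith("/"):
                continue  # skip directory entries
            texts[name] = archive.read(name).decode(encoding, errors="replace")
    return texts
```

The path returned by download_log can be passed directly as `zip_path`, and the resulting text can then be searched for lines such as "QAOA Cost:" or "Exception:".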
How to get information about a job later¶
If you know the job_id, you can retrieve information about an SSE job at a later date by executing the following code:
[ ]:
from quri_parts.riqu.backend import RiquSamplingBackend, RiquConfig
# retrieve job information
backend = RiquSamplingBackend(RiquConfig.from_file("default"))
job = backend.retrieve_job("<job id>")
result = job.result()
print(f"counts of last circuit: {result.counts}")
[ ]:
from quri_parts.riqu.backend import RiquSseJob, RiquConfig
# download log
sse = RiquSseJob(RiquConfig.from_file("default"))
log_file_path = sse.download_log("<job id>")
print(f"log file: {log_file_path}")
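Retrieving a job later only works if its job_id was recorded at submission time. A minimal sketch for keeping the ids in a local JSON file follows; `save_job_id`, `load_job_ids` and the file layout are assumptions made for illustration and are not part of QURI Parts.

```python
import json
from pathlib import Path


def save_job_id(job_id, path, remark=""):
    """Append a job id (and an optional remark) to a local JSON file."""
    path = Path(path)
    records = json.loads(path.read_text()) if path.exists() else []
    records.append({"job_id": job_id, "remark": remark})
    path.write_text(json.dumps(records, indent=2))


def load_job_ids(path):
    """Return all recorded job ids, oldest first."""
    path = Path(path)
    if not path.exists():
        return []
    return [record["job_id"] for record in json.loads(path.read_text())]
```

Calling `save_job_id(job.id, "sse_jobs.json", remark="qaoa with sse")` right after run_sse would make the id available to the retrieval code above in a later session.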