Black-Box Optimization of Traffic Light Control¶
Alleviating traffic congestion in urban areas is essential not only from an economic perspective but also from the perspective of environmental preservation. Especially in areas with high-rise condominiums and large shopping malls, where many traffic routes converge, local traffic congestion can propagate over a wide area, so congestion mitigation measures must be comprehensive, considering the entire region.
This example program uses black-box optimization with Fixstars Amplify to optimally control traffic signals so as to maximize the average speed of all vehicles traveling in the city. Although we evaluate the objective function (average vehicle speed) using a multi-agent simulation (MAS) in this example, we could perform the optimization in the same way if the objective function were the average vehicle speed observed in actual traffic conditions, or the number of vehicles entering a particular intersection per unit of time.
For basic knowledge of the black-box optimization with machine learning and quantum annealing/Ising machines used here, see "Black-Box Optimization with Quantum Annealing and Ising Machines". Also, for other applied model cases utilizing this black-box optimization method, see:
- Black-Box Optimization of Operating Condition in a Chemical Reactor
- Black-Box Optimization of Airfoil Geometry by Fluid Flow Simulation
Also, please refer to "QUBO-based traffic light optimization" for a similar approach to the signal control treated in this sample program, where the signal control is considered as a combinatorial optimization problem based on the QUBO formulation.
This sample program (optimal signal control by black-box optimization) consists of the following:
1. MAS for urban congestion
    - 1.1. What is MAS?
    - 1.2. Traffic congestion model
    - 1.3. MAS traffic simulator
2. FMQA program implementation (integer input values)
3. Optimization of traffic light control
4. Summary and practical guidelines
*In this online demo and tutorial environment, the continuous execution time is limited to about 20 minutes. If you expect the execution time to exceed 20 minutes, for example, when trying optimization by changing conditions, please copy this sample program to your local environment before execution. In that case, download the following libraries related to the MAS traffic simulator as appropriate and save them in the following directory structure.
├ fmqa_4_traffic.ipynb (this program)
└ utils/
    ├ __init__.py (blank file)
    └ traffic_model.py
1. MAS for urban congestion¶
1.1. What is MAS?¶
Multi-agent simulation (MAS) is a simulation technique that mimics the behavior of multiple agents interacting with each other, and it may be employed to reproduce complex real-world systems and phenomena. In a simulation, each agent makes individual decisions and behaves in response to its surroundings, and the result as a whole is characterized by complex interactions.
MAS is used in a variety of fields. Examples include the following areas:

- Social systems: MAS is used to study human behavior and social interactions and to understand the impact of policies and social systems. We can use MAS in economics, sociology, and political science.
- Transportation systems: MAS simulates the movement of vehicles and pedestrians within a transportation network to study measures for optimizing traffic flow and reducing congestion.
- Natural disasters: Natural disasters such as earthquakes, tsunamis, and volcanic eruptions are simulated to assess damage and study disaster prevention measures.
- Organizational dynamics: MAS mimics the interactions of individuals and departments within a company or organization to analyze organizational efficiency and decision-making.
In this section, we explain MAS as applied to automobile traffic in cities and perform an actual traffic simulation.
1.2. Traffic congestion model¶
When simulating urban automobile traffic with MAS, each car must make its own decisions. In other words, control is not exercised from the viewpoint of an observer who can see the entire simulation, but from the perspective of each car: the decision is based on the distance to the car in front (and, if necessary, behind). The key here is the following model, called the optimal velocity model (Bando et al., Jpn. J. Ind. Appl. Math. (1994); Bando et al., Phys. Rev. E (1995)).
$$ \ddot x_i = a \{ V(\Delta x_i) - \dot x_i \} $$
$$ \Delta x_i = x_{i+1} - x_i $$
$$ v_i = \dot x_i $$
Here, $x_i$ and $v_i$ are the position and speed of the $i$-th car (yours), and $i+1$ is the index of the vehicle in front of you. Also, $V(\Delta x_i)$ denotes the optimal target speed for the current distance $\Delta x_i$ to the car in front. Finally, $a$ is the sensitivity, a parameter corresponding to the speed of the driver's reflexes. The model thus describes the driver's acceleration and deceleration toward the optimal speed determined from the inter-vehicle distance. This example code uses the following form for the optimal speed.
$$ V(\Delta x_i)=\tanh(\Delta x_i-2) + \tanh(2) $$
In this simulation, each car (driver) implements speed control based on these model equations and drives autonomously to the destination while observing signals along the route. Therefore, we expect that this simulation naturally reproduces traffic congestion caused directly or indirectly by the presence of large commercial facilities, etc.
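To make the model equations concrete, here is a minimal, self-contained sketch (not part of the sample's simulator; the function names `optimal_velocity` and `simulate_ring` are illustrative) that integrates the optimal velocity model for cars on a circular road with a simple Euler scheme:

```python
import numpy as np

def optimal_velocity(dx):
    """Optimal target speed V(dx) = tanh(dx - 2) + tanh(2) used in this example."""
    return np.tanh(dx - 2.0) + np.tanh(2.0)

def simulate_ring(n=10, road_len=30.0, a=1.0, dt=0.05, steps=2000):
    """Euler integration of ddot(x_i) = a * (V(dx_i) - v_i) on a periodic road."""
    x = np.linspace(0.0, road_len, n, endpoint=False)  # initial positions, evenly spaced
    v = np.zeros(n)                                    # initial speeds
    for _ in range(steps):
        # headway to the car in front (periodic boundary)
        dx = (np.roll(x, -1) - x) % road_len
        v += a * (optimal_velocity(dx) - v) * dt       # acceleration toward V(dx)
        x = (x + v * dt) % road_len
    return x, v

x, v = simulate_ring()
# With uniform spacing, the flow relaxes toward the optimal speed V(road_len / n).
```

Starting from evenly spaced, stationary cars, every car sees the same headway, so the fleet relaxes uniformly toward $V(\Delta x)$; congestion waves emerge only when perturbations (or traffic lights, as in the simulator below) break this symmetry.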
1.3. MAS traffic simulator¶
We will now introduce the MAS-based traffic simulator TrafficSimulator2Malls that we will use here. The city considered in this simulator has a simple shape consisting of city_size $\times$ city_size blocks, with roads in a grid pattern between the blocks. All intersections have traffic lights, meaning we can control the traffic signals installed at city_size $\times$ city_size intersections. The city has two popular shopping malls, and heavy traffic is likely to appear locally around these malls. To simplify the problem, the boundaries surrounding the city are periodic (e.g., what leaves the city on the left re-enters from the right).
Our sample code considers three control parameters for the traffic signals at each intersection: [red light length, green light length, phase] (all in seconds). For example, if you use [15.0, 12.0, 5.0] as the control parameters for the traffic light at intersection $i$, then **5.0 seconds after the simulation start time, the north-south signal at that intersection will turn red for the next 15.0 seconds, then green for 12.0 seconds, then red for 15.0 seconds, then green for 12.0 seconds, then red for 15.0 seconds, ...** and so on.
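As a concrete reading of the [red, green, phase] convention above, the following sketch computes whether the north-south signal is red at a given simulation time. The function `ns_signal_is_red` is a hypothetical helper, not part of the simulator's API, and the signal state before the phase offset elapses is an assumption here:

```python
def ns_signal_is_red(t: float, red: float, green: float, phase: float) -> bool:
    """Return True if the north-south signal is red at simulation time t (seconds).

    The cycle starts `phase` seconds after the simulation start, beginning with
    a red interval of length `red`, followed by a green interval of length `green`.
    """
    if t < phase:
        return False  # assumed state before the first red interval begins
    position_in_cycle = (t - phase) % (red + green)
    return position_in_cycle < red  # each cycle opens with the red interval

# With the parameters [15.0, 12.0, 5.0] from the text, the north-south signal
# is red on [5, 20), green on [20, 32), red on [32, 47), and so on.
```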
Here, the input parameters to the traffic lights are real numbers. For applicability to black-box optimization, we use integer indices corresponding to these real numbers as the simulation input. To see how an integer index works, consider a real-number parameter that can take values between 0.0 and 1.0, represented by six integer indices. The correspondence between each integer index and the parameter value would be as follows.
| Integer index value | 0 | 1 | 2 | 3 | 4 | 5 |
|---|---|---|---|---|---|---|
| Parameter value | 0.0 | 0.2 | 0.4 | 0.6 | 0.8 | 1.0 |
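The table above is simply a uniform discretization of the parameter range; a short sketch of the mapping (the function name `index_to_value` is illustrative, not from the sample's library):

```python
import numpy as np

def index_to_value(index: int, lo: float = 0.0, hi: float = 1.0, nbins: int = 6) -> float:
    """Map an integer index to one of `nbins` equally spaced values on [lo, hi]."""
    return float(np.linspace(lo, hi, nbins)[index])

# Index 2 corresponds to 0.4 and index 5 to 1.0, as in the table above.
```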
The following defines the correspondence between integer indices and real-number parameters in the parameter generation class for signal control, SignalController. In the default configuration, the values taken by the three parameters of each traffic light are all between 1 and 20 seconds, represented by 20 integer indices. Running the following code cell displays the correspondence table between the integer indices considered and the corresponding parameter values. For example, with the default settings, the parameter value corresponding to an integer index of 10 is 11.0 seconds.
%matplotlib widget
from utils.traffic_model import SignalController
# Specifies the size of the grid city. Cities are represented by a grid map of city_size*city_size
city_size = 3
# Instantiate the class to generate signal control parameters for a given city size
signal_controller = SignalController(city_size)
# Display the table of correspondence between integer indices and parameter values
signal_controller.show_index_table()
Now that we have instantiated the class that maps integer indices to real-number parameters, we can use this class to create traffic light control parameters (real numbers). In the following, we generate integer indices randomly, drawing integers from 0 to 19. The number of integer indices we need to generate is [number of parameters per traffic light] $\times$ [number of intersections] = 3 $\times$ city_size $\times$ city_size.
Next, the generated integer indices are converted (decoded) to the corresponding real-number parameter values. We perform this conversion using the decode() method of the SignalController class. The argument disp_flag=True of the decode() method displays the parameter values after decoding. Let's check which values we obtained as the randomly chosen signal control parameters.
import numpy as np
# Generate [3 * city_size * city_size] integer indices ranging from 0 to 19 using integer random numbers
index_list = np.random.randint(0, 20, 3 * city_size * city_size)
# Convert the integer indices to corresponding parameter values (decode)
signal_info = signal_controller.decode(index_list=index_list, disp_flag=True)
Finally, we perform a traffic simulation using the MAS traffic simulator class TrafficSimulator2Malls, where the traffic signals are controlled based on the generated control parameters signal_info. When instantiating TrafficSimulator2Malls, we set the upper limit on the number of cars traveling in the grid city to 20 (num_cars=20), with 50% of the vehicles having one of the two shopping malls as their destination (ratio_mall=0.5). The origin and destination of each car considered in the traffic simulation are determined using random numbers, and the fourth argument, seed, corresponds to the seed value of those random numbers.
Here, during the simulation, all the cars go back and forth between (1) their "home," which is set by a random number, and (2) a randomly set location or a shopping mall. After arriving at the destination, a car remains at that location for a certain period and is hidden from the road until the return trip begins. The displayed cars are colored according to their routes and make round trips along the following paths:
- Gray $\rightarrow$ round trip between home and Shopping Mall 1
- Black $\rightarrow$ round trip between home and Shopping Mall 2
- White $\rightarrow$ round trip between home and another location
The current destination of each car on the road is indicated by a corresponding colored circle, except when the destination is a shopping mall. Observe how each vehicle drives autonomously to its destination while obeying the traffic signals.
Also, although not an essential part of this optimization, all simulation results are displayed in SI units (m, s, m/s).
from utils.traffic_model import TrafficSimulator2Malls
# Instantiate simulator
traffic_sim = TrafficSimulator2Malls(
    signal_info=signal_info, num_cars=20, ratio_mall=0.5, seed=0
)
# Run the simulation (display result in animation)
traffic_sim.run(steps=500, animation=True)
# Run simulation (output gif animation)
# *About 1 minute of processing time with the following argument settings.
# *Please try gif output in local environment.
# traffic_sim.run(steps=200, gif="./anim.gif")
2. FMQA program implementation (integer input values)¶
This section describes the program implementation of FMQA, much of which is the same as in "Black-Box Optimization with Quantum Annealing/Ising Machines". Please refer to that document for details.
Please note that the only difference is that the present black-box optimization considers integer vectors as input values to the objective function instead of binary variables. For the handling of integer input values in FMQA, see "One-hot encoder/decoder implementation".
2.1. Random seed initialization¶
Define a function seed_everything to initialize random seed values so that the initial training data and machine learning results do not change between runs.
import os
import torch

def seed_everything(seed=0):
    os.environ["PYTHONHASHSEED"] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True
2.2. Configuration of Amplify client¶
Here, we create an Amplify client and set the necessary parameters. In the following, we set the timeout for a single search by the Ising machine to 5 seconds.
from amplify import FixstarsClient
from datetime import timedelta
client = FixstarsClient()
client.parameters.timeout = timedelta(milliseconds=5000) # 5000 msec
# client.token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" # If you use Amplify in a local environment or Google Colaboratory, enter your Amplify API token.
2.3. Implementing FM with PyTorch¶
This example code implements the FM with PyTorch. In the TorchFM class, we define the acquisition function $g(x)$ as a machine learning model. Each term in $g(x)$ directly corresponds to out_lin, out_1, out_2, and out_inter in the TorchFM class, as in the following equation.
$$ \begin{aligned} g(\boldsymbol{x} \mid \boldsymbol{w}, \boldsymbol{v}) &= \underset{\color{red}{\mathtt{out\_lin}}}{\underline{ w_0 + \sum_{i=1}^n w_i x_i} } + \underset{\color{red}{\mathtt{out\_inter}}}{\underline{\frac{1}{2} \left[\underset{\color{red}{\mathtt{out\_1}}}{\underline{ \sum_{f=1}^k\left(\sum_{i=1}^n v_{i f} x_i\right)^2 }} - \underset{\color{red}{\mathtt{out\_2}}}{\underline{ \sum_{f=1}^k\sum_{i=1}^n v_{i f}^2 x_i^2 }} \right] }} \end{aligned} $$
import torch.nn as nn

class TorchFM(nn.Module):
    def __init__(self, d: int, k: int) -> None:
        super().__init__()
        self.V = nn.Parameter(torch.randn(d, k), requires_grad=True)
        self.lin = nn.Linear(d, 1)

    def forward(self, x):
        out_1 = torch.matmul(x, self.V).pow(2).sum(1, keepdim=True)
        out_2 = torch.matmul(x.pow(2), self.V.pow(2)).sum(1, keepdim=True)
        out_inter = 0.5 * (out_1 - out_2)
        out_lin = self.lin(x)
        out = out_inter + out_lin
        return out
Next, a function train() is defined to train the FM on the training data sets. As in general machine learning methods, this function divides the data sets into training and validation data, optimizes the FM parameters using the training data, and validates the model during training using the validation data. The train() function returns the model with the highest prediction accuracy on the validation data.
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from typing import Type
import copy

def train(
    X: np.ndarray,
    y: np.ndarray,
    model_class: Type[nn.Module],
    model_params: dict[str, int | float],
    batch_size=1024,
    epochs=3000,
    criterion=nn.MSELoss(),
    optimizer_class=torch.optim.AdamW,
    opt_params={"lr": 1},
    lr_sche_class=None,
    lr_sche_params=None,
):
    X_tensor, y_tensor = (
        torch.from_numpy(X).float(),
        torch.from_numpy(y).float(),
    )
    indices = np.array(range(X.shape[0]))
    indices_train, indices_valid = train_test_split(
        indices, test_size=0.2, random_state=42
    )
    train_set = TensorDataset(X_tensor[indices_train], y_tensor[indices_train])
    valid_set = TensorDataset(X_tensor[indices_valid], y_tensor[indices_valid])
    loaders = {
        "train": DataLoader(train_set, batch_size=batch_size, shuffle=True),
        "valid": DataLoader(valid_set, batch_size=batch_size, shuffle=False),
    }
    model = model_class(**model_params)
    best_model_wts = copy.deepcopy(model.state_dict())
    optimizer = optimizer_class(model.parameters(), **opt_params)
    scheduler = None
    if lr_sche_class is not None:
        scheduler = lr_sche_class(optimizer, **lr_sche_params)
    best_score = 1e18
    for _ in range(epochs):
        losses = {"train": 0.0, "valid": 0.0}
        for phase in ["train", "valid"]:
            if phase == "train":
                model.train()
            else:
                model.eval()
            for batch_x, batch_y in loaders[phase]:
                optimizer.zero_grad()
                out = model(batch_x).T[0]
                loss = criterion(out, batch_y)
                losses[phase] += loss.item() * batch_x.size(0)
                with torch.set_grad_enabled(phase == "train"):
                    if phase == "train":
                        loss.backward()
                        optimizer.step()
            losses[phase] /= len(loaders[phase].dataset)
        with torch.no_grad():
            model.eval()
            if best_score > losses["valid"]:
                best_model_wts = copy.deepcopy(model.state_dict())
                best_score = losses["valid"]
        if scheduler is not None:
            scheduler.step()
    with torch.no_grad():
        model.load_state_dict(best_model_wts)
        model.eval()
        out = model(X_tensor).T[0]
        coef = torch.corrcoef(torch.stack((out, y_tensor)))[0, 1].detach()
    return model, coef.item()
2.4. Construction of initial training data¶
The gen_training_data_flow_onehot function evaluates the objective function $f(\boldsymbol{x})$ for input values $\boldsymbol{x}$ to produce $N_0$ input-output pairs (initial training data). We can determine the input values $\boldsymbol{x}$ in various ways, such as using random numbers or values suitable for machine learning based on prior knowledge. You can also build up the training data from the results of previous experiments or simulations.
To handle non-binary integer input values, we use the input_generator function, which generates an integer input vector with random numbers and converts it to a binary vector based on one-hot encoding. The function passed as input_generator is defined in the one-hot encoder/decoder class EncDecOneHot, which we will introduce later.
def gen_training_data_flow_onehot(D: int, N0: int, true_func, input_generator):
    X = np.zeros((N0, D))
    y = np.zeros(N0)
    print("")
    print("###################################################")
    print("          Making initial training data")
    print("###################################################")
    for i in range(N0):
        print(f"Generating training data set #{i}.")
        # Randomly generate an input vector consisting of 0s and 1s
        x = input_generator()
        # If the same input vector already exists, regenerate the input values
        is_identical = True
        while is_identical:
            is_identical = False
            for j in range(i + 1):
                if np.all(x == X[j, :]):
                    x = input_generator()
                    is_identical = True
                    break
        # Evaluate input value x with the objective function f(x) and copy the input-output pair (x, f(x)) to the training data
        X[i, :] = x
        y[i] = true_func(x)
        print("")
    return X, y
2.5. Execution class for FMQA cycle¶
FMQA_NB.cycle() executes the FMQA cycle $N - N_0$ times using the pre-prepared initial training data. FMQA_NB.step() is a function that performs a single FMQA cycle and is called $N - N_0$ times by FMQA_NB.cycle().
For FMQA with non-binary variables, we need to encode each real/integer variable as a set of binary variables. The FMQA_NB class assumes one-hot encoding. The encoder and decoder for the non-binary variables are specified in this class as encoder_decoder.encoder and encoder_decoder.decoder, respectively. Also, nbins_array is a list of the number of bins used to encode each non-binary variable.
from amplify import VariableGenerator, solve, one_hot, sum
import matplotlib.pyplot as plt
import sys
import time

# FMQA for non-binary inputs
class FMQA_NB:
    def __init__(self, D, N, N0, k, true_func, client, encoder_decoder) -> None:
        assert N0 < N
        self.D = D
        self.N = N
        self.N0 = N0
        self.k = k
        self.true_func = true_func
        self.client = client
        self.nbins_array = encoder_decoder.nbins_array
        self.encoder = encoder_decoder.encoder
        self.decoder = encoder_decoder.decoder
        self.y = None

    # A member function that repeatedly performs FMQA (N-N0) times based on the training data, adding new training data each cycle
    def cycle(self, X, y, log: bool = False) -> np.ndarray:
        # Weight for the one-hot constraints
        constraint_weight = max([1, int(np.abs(y).max() + 0.5)])
        print("")
        print("###################################################")
        print(f"          Starting FMQA cycles... {constraint_weight=}")
        print("###################################################")
        pred_x = X[0]
        pred_y = 1e18
        self.y = y
        for i in range(self.N - self.N0):
            print(f"FMQA Cycle #{i} ")
            start_time = time.perf_counter()
            try:
                x_hat = self.step(X, y, constraint_weight)
            except RuntimeError:
                sys.exit(f"Unknown error, i = {i}")
            # If an input value identical to the found x_hat already exists in the current training data set, a neighboring value is used as the new x_hat.
            is_identical = True
            while is_identical:
                is_identical = False
                for j in range(i + self.N0):
                    if np.all(x_hat == X[j, :]):
                        # Decode the binary vector to an integer vector and copy it to inputs
                        inputs = self.decoder(x_hat)
                        # Perturb to the surrounding values (inputs is an integer vector)
                        inputs += np.random.randint(-1, 2, len(inputs))
                        for i_inp in range(len(inputs)):
                            if inputs[i_inp] < 0:
                                inputs[i_inp] = 0
                            elif inputs[i_inp] > self.nbins_array[i_inp] - 1:
                                inputs[i_inp] = self.nbins_array[i_inp] - 1
                        # Encode the integer vector back to a binary vector and copy it to x_hat
                        x_hat = self.encoder(inputs)
                        if log:
                            print(f"{i=}, Identical x is found, {x_hat=}")
                        is_identical = True
                        break
            # Evaluate the objective function f() with x_hat
            y_hat = self.true_func(x_hat)
            # Add the input-output pair [x_hat, y_hat] to the training data set
            X = np.vstack((X, x_hat))
            y = np.append(y, y_hat)
            self.y = np.append(self.y, y_hat)
            # Copy the input-output pair to [pred_x, pred_y] when the evaluated value of the objective function updates the minimum value
            print(
                f"FMQA Cycle #{i} finished in {time.perf_counter() - start_time:.2f}s",
                end="",
            )
            if pred_y > y_hat:
                pred_y = y_hat
                pred_x = x_hat
                print(f", variable updated, {pred_y=:.2f}")
            else:
                print()
        print("")
        return pred_x
    # Member function to perform one FMQA cycle
    def step(self, X, y, constraint_weight) -> np.ndarray:
        # Train the FM
        start_time = time.perf_counter()
        model, corrcoef = train(
            X,
            y,
            model_class=TorchFM,
            model_params={"d": self.D, "k": self.k},
            batch_size=8,
            epochs=1000,
            criterion=nn.MSELoss(),
            optimizer_class=torch.optim.AdamW,
            opt_params={"lr": 0.5},
            # Set a scheduler to reduce the learning rate with the number of epochs
            lr_sche_class=torch.optim.lr_scheduler.StepLR,
            lr_sche_params={"step_size": 50, "gamma": 0.9},
        )
        print(
            f"FM Training: {time.perf_counter() - start_time:.2f}s (corr: {corrcoef:.2f})"
        )
        start_time = time.perf_counter()
        # Extract the FM parameters from the trained FM model
        v, w, w0 = list(model.parameters())
        v = v.detach().numpy()
        w = w.detach().numpy()[0]
        w0 = w0.detach().numpy()[0]
        gen = VariableGenerator()  # Instantiate a variable generator
        q = gen.array("Binary", self.D)  # Generate a binary variable array
        cost = self.__FM_as_QUBO(
            q, w0, w, v
        )  # Define the FM as a QUBO equation from the FM parameters
        # Construct one-hot constraints for the non-binary variables
        constraint_list = []
        ista = 0
        iend = 0
        for i in range(len(self.nbins_array)):
            iend += self.nbins_array[i]
            constraint_list.append(one_hot(q[ista:iend]))
            ista = iend
        constraints = sum(constraint_list)
        # Add up the objective function and the constraints, and pass the model to the Fixstars Amplify solver
        model = cost + constraint_weight * constraints
        result = solve(model, self.client)
        print(
            f"QUBO: {time.perf_counter() - start_time:.2f}s (AE execution: {result.execution_time.seconds:.2f}s)"
        )
        if len(result.solutions) == 0:
            raise RuntimeError("No solution was found.")
        q_values = q.evaluate(result.best.values)
        return q_values

    # A function that defines the FM as a QUBO equation from the FM parameters. As with the previously defined TorchFM class, the formula follows the acquisition function form of g(x).
    def __FM_as_QUBO(self, x, w0, w, v):
        lin = w0 + (x.T @ w)
        out_1 = np.array([(x * v[:, i]).sum() ** 2 for i in range(self.k)]).sum()
        # Note that x[j] = x[j]^2 since x[j] is a binary variable in the following equation.
        out_2 = np.array([(x * v[:, i] * v[:, i]).sum() for i in range(self.k)]).sum()
        return lin + (out_1 - out_2) / 2
    # A function to plot the history of the i-th objective function evaluations performed during the initial training data construction (blue) and the FMQA cycles (red).
    def plot_history(self):
        assert self.y is not None
        fig = plt.figure(figsize=(6, 4))
        plt.plot(
            [i for i in range(self.N0)],
            self.y[: self.N0],
            marker="o",
            linestyle="",
            color="b",
        )  # Objective function evaluations during initial training data generation (random process)
        plt.plot(
            [i for i in range(self.N0, self.N)],
            self.y[self.N0 :],
            marker="o",
            linestyle="",
            color="r",
        )  # Objective function evaluations during the FMQA cycles (FMQA cycle process)
        plt.plot(
            [i for i in range(self.N)],
            [self.y[:i].min() for i in range(1, self.N + 1)],
            linestyle="-",
            color="k",
        )  # Update history of the minimum objective function value
        plt.xlabel("i-th evaluation of f(x)", fontsize=18)
        plt.ylabel("f(x)", fontsize=18)
        plt.tick_params(labelsize=18)
        return fig
2.6. Onehot encoder/decoder implementation¶
As introduced in Section 1.3, an integer index is required as the input when using the parameter generation class for signal control, SignalController. On the other hand, the input to a QUBO must be binary variables, so an encoding from integer indices to binary variables is required. In this tutorial, we use one-hot encoding. For example, in one-hot encoding with a 4-bit representation of an integer that can take values from 1 to 4,

- 1 is encoded to [1, 0, 0, 0]
- 2 is encoded to [0, 1, 0, 0]
- 3 is encoded to [0, 0, 1, 0]
- 4 is encoded to [0, 0, 0, 1]
The following EncDecOneHot class defines the functions for converting an integer index to binary variables and vice versa (encoder(), decoder()), as well as a function, gen_random_input(), that generates a random integer index input vector and returns it as a one-hot binary vector.
class EncDecOneHot:
    def __init__(self, D: int, nbins_array):
        self.D = D
        self.nbins_array = nbins_array

    def gen_random_input(self):
        ista = 0
        iend = 0
        x = np.zeros(self.D, int)
        for i in range(len(self.nbins_array)):
            iend += self.nbins_array[i]
            idx = np.random.randint(ista, iend, 1)[0]
            x[idx] = 1
            ista = iend
        return x

    def encoder(self, inputs):
        if len(inputs) != len(self.nbins_array):
            raise RuntimeError("inputs should be the same length as nbins_array!")
        x = np.zeros(self.D, int)
        i_offset = 0
        for i in range(len(inputs)):
            x[inputs[i] + i_offset] = 1
            i_offset += self.nbins_array[i]
        return x

    def decoder(self, x):
        bit_locs = np.where(x == 1)[0]
        if len(bit_locs) != len(self.nbins_array):
            raise RuntimeError("bit_locs should be the same length as nbins_array!")
        i_offset = 0
        for i in range(1, len(bit_locs)):
            i_offset += self.nbins_array[i - 1]
            bit_locs[i] -= i_offset
        return bit_locs
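As a quick, standalone sanity check of this encoding scheme, the round trip can be sketched in a few lines of NumPy. This sketch is independent of the EncDecOneHot class above, and the bin sizes used here are made-up values for illustration:

```python
import numpy as np

# Each integer variable i with nbins_array[i] possible values occupies its own
# block of bits, and exactly one bit per block is set (one-hot).
nbins_array = np.array([4, 3, 5])  # three integer variables (illustrative sizes)
offsets = np.concatenate(([0], np.cumsum(nbins_array)[:-1]))  # block start positions

def encode(inputs):
    x = np.zeros(nbins_array.sum(), int)
    x[offsets + inputs] = 1            # set one bit per variable block
    return x

def decode(x):
    return np.where(x == 1)[0] - offsets  # bit position minus its block offset

inputs = np.array([2, 0, 4])
assert np.all(decode(encode(inputs)) == inputs)  # round trip recovers the indices
```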
3. Optimization of traffic light control¶
3.1. FMQA execution example¶
Now, we use the black-box optimization class FMQA_NB implemented in Section 2 to optimize the control of the traffic light network installed in the target city. Here, the objective function is the negative of the average speed of all vehicles predicted by the MAS traffic simulator (Section 1). Since the optimization proceeds by minimizing the objective function value, we expect the optimized traffic light control to maximize the average speed of all vehicles.
We can obtain the average vehicle speed after the simulation with get_latest_mean(). For each objective function evaluation during the optimization process, n_cases simulation cases with randomly determined initial conditions (departure and destination locations) are considered, and the objective function returns their average as the objective function value.
In the following code, due to the runtime limit in the demo/tutorial environment, the number of times we can evaluate the objective function, $N$, is 3, of which $N_0 = 2$ evaluations are used to generate the initial data (thus, a total of $N - N_0 = 1$ optimization cycle).
Although such a limited number of optimization cycles ($N - N_0 = 1$) may not always find a better solution than the initial data, which corresponds to a random search, you can see how the entire optimization progresses.
To perform the optimization properly with a sufficient number of cycles, please download the sample code and run the optimization in a local environment after increasing the number of objective function evaluations $N$ (our demo/tutorial environment limits the runtime to about 20 minutes). For example, with $N=100$, you can achieve an average vehicle speed of about 8 m/s at the end of the optimization process. In this case, it takes about 2-3 hours for the entire optimization cycle to complete.
An example output for $N=100$ and a simulation animation comparing traffic simulation results with and without optimal traffic light control are shown in "3.4. Example output from this FMQA sample program".
import time

# Initialize random seed values
seed_everything()

# Specify the size of the grid city, a [city_size*city_size] lattice map
city_size = 4

# Instantiate the class to generate signal control parameters for a given city size
signal_controller = SignalController(city_size)

# 1D array of bin sizes in one-hot encoding for each variable
nbins_array = np.array(
    [
        [
            signal_controller.nbins_red_duration,
            signal_controller.nbins_green_duration,
            signal_controller.nbins_phase,
        ]
        for i in range(city_size**2)
    ]
).reshape(-1)

# Overall size of binary variables (size of binary decision variables handled by QUBO)
D = nbins_array.sum()
print(f"Number of binary decision variables: {D}")

# Instantiate the one-hot encoder-decoder class (EncDecOneHot)
enc_dec_one_hot = EncDecOneHot(D, nbins_array)

# Counter for the number of times the objective function is evaluated
n_eval = 0
num_cars = 100
ratio_mall = 0.5

# Objective function (obtain traffic light control parameters -> run traffic simulation -> calculate objective function value)
def obj_func(x):
    global n_eval
    # Convert the solution (binary) to integer indices representing the traffic light control parameters (red period, green period, phase)
    index_list = enc_dec_one_hot.decoder(x)
    # Then the signal_controller.decode method converts the integer indices to signal control parameters (real numbers)
    signal_info = signal_controller.decode(index_list)
    # Run simulations with different initializations n_cases times with the same signal control
    n_cases = 5
    costs = []
    print("Cost convergence: ", end="")
    start_time = time.perf_counter()
    for i in range(n_cases):
        # Instantiate TrafficSimulator2Malls (the seed value determining the initial conditions is a random number)
        traffic_sim = TrafficSimulator2Malls(
            signal_info=signal_info,
            num_cars=num_cars,
            ratio_mall=ratio_mall,
            seed=np.random.randint(0, 100),
        )
        # Run the simulation
        traffic_sim.run(steps=20000)
        # Append the negative of the average speed of all cars in each simulation to costs
        costs.append(-traffic_sim.get_latest_mean())
        print(f"{sum(costs) / (i + 1):.2f} ", end="")
    # The average over the n_cases simulations is the final objective function value
    cost = sum(costs) / n_cases
    print(
        f"\nEvaluation #{n_eval} finished in {time.perf_counter() - start_time:.2f}s. cost:{cost:.2f}"
    )
    n_eval += 1
    return cost

# N and N0 should generally be increased for meaningful optimization. See the description in Sec. 3.1 above.
N = 3  # Number of times the objective function can be evaluated
N0 = 2  # Number of samples in the initial training data
k = 20  # Dimension of the vectors in the FM (hyperparameter)

# Start time for optimization
start_time = time.perf_counter()

# Generate initial training data
X, y = gen_training_data_flow_onehot(D, N0, obj_func, enc_dec_one_hot.gen_random_input)

# Instantiate the FMQA class
fmqa_solver = FMQA_NB(
    D=D,
    N=N,
    N0=N0,
    k=k,
    true_func=obj_func,
    client=client,
    encoder_decoder=enc_dec_one_hot,
)

# Execute FMQA cycles
pred_x = fmqa_solver.cycle(X, y)

# Output optimization results
print("###################################################")
print("     Optimal input values and corresponding output")
print("###################################################")
print(f"pred x: {pred_x}")
print(f"pred value: {obj_func(pred_x):.2f}")
print(f"Elapsed time: {time.perf_counter() - start_time:.2f}s")
# N and N0 should be generally increased for meaningful optimization. See the description in Sec. 3.1 above.
N = 3 # Number of times the objective function can be evaluated
N0 = 2 # Number of samples in initial training data
k = 20 # Dimensions of vectors in FM (hyperparameters)
# Start time for optimization
start_time = time.perf_counter()
# Generate initial training data
X, y = gen_training_data_flow_onehot(D, N0, obj_func, enc_dec_one_hot.gen_random_input)
# Instantiate the FMQA class
fmqa_solver = FMQA_NB(
D=D,
N=N,
N0=N0,
k=k,
true_func=obj_func,
client=client,
encoder_decoder=enc_dec_one_hot,
)
# Execute FMQA cycles
pred_x = fmqa_solver.cycle(X, y)
# Output optimization results
print("###################################################")
print(" Optimal input values and corresponding output")
print("###################################################")
print(f"pred x: {pred_x}")
print(f"pred value: {obj_func(pred_x):.2f}")
print(f"Elapsed time: {time.perf_counter()  start_time:.2f}s")
3.2. Transition of objective function values during the optimization process¶
Below, the $N_0$ objective function values obtained for randomly generated input values during initial training data generation, and the evolution of the objective function values during the $N - N_0$ FMQA optimization cycles, are shown in blue and red, respectively. An example output is explained in "3.4. Example output from this FMQA sample program". When $N$ is large enough, the plot shows how the minimum value of the objective function is successively updated by the input values $\hat{x}$ obtained in the FMQA optimization cycles.
fig = fmqa_solver.plot_history()
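If you want to reproduce a plot like the one fmqa_solver.plot_history() returns from your own record of evaluations, a minimal sketch follows. The list y of objective values in evaluation order and the split point n0 are assumed inputs; the actual internals of FMQA_NB may differ:

```python
import numpy as np
import matplotlib

matplotlib.use("Agg")  # render off-screen (no display needed)
import matplotlib.pyplot as plt


def plot_history(y, n0):
    """Plot initial random samples (blue), FMQA-cycle samples (red),
    and the running minimum ("best so far") of the objective value."""
    y = np.asarray(y, dtype=float)
    best_so_far = np.minimum.accumulate(y)
    fig, ax = plt.subplots()
    ax.plot(range(n0), y[:n0], "bo", label="initial training data")
    ax.plot(range(n0, len(y)), y[n0:], "rs", label="FMQA cycles")
    ax.plot(range(len(y)), best_so_far, "k--", label="best so far")
    ax.set_xlabel("number of evaluations")
    ax.set_ylabel("objective value (negative mean speed)")
    ax.legend()
    return fig, best_so_far


# Toy history: 3 random samples followed by 3 FMQA evaluations
fig, best = plot_history([-3.0, -5.5, -4.1, -5.9, -4.4, -6.1], n0=3)
print(best.tolist())  # → [-3.0, -5.5, -5.5, -5.9, -5.9, -6.1]
```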
3.3. MAS traffic simulation with optimized traffic light settings¶
We rerun the traffic simulation with the optimal control parameters obtained from the black-box optimization. An example animation of such a simulation is shown in "3.4. Example output from this FMQA sample program".
# Same condition as the simulation during optimization
city_size = 4
num_cars = 100
ratio_mall = 0.5
signal_controller = SignalController(city_size)
# Construct integer indices from optimization result pred_x
index_list = enc_dec_one_hot.decoder(pred_x)
print(f"{index_list=}")
# Convert integer indices to signal control parameters (real numbers)
signal_info = signal_controller.decode(index_list)
# Instantiate simulator
traffic_sim = TrafficSimulator2Malls(
    signal_info=signal_info, num_cars=num_cars, ratio_mall=ratio_mall
)
# Run the traffic simulation with the optimized signal control
traffic_sim.run(2000, animation=True, interval=100)
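Decoding a flattened one-hot vector such as pred_x into integer indices amounts to taking the argmax within each one-hot block. A hypothetical standalone sketch of what enc_dec_one_hot.decoder does, assuming all blocks have the same length (the actual implementation may differ):

```python
import numpy as np


def decode_one_hot(x, num_choices):
    """Decode a flattened one-hot vector into a list of integer indices.

    x           : 1-D sequence of 0/1 values, one block per decision variable
    num_choices : number of options per variable (length of each block)
    Each block should contain exactly one 1; argmax returns its position.
    """
    blocks = np.asarray(x).reshape(-1, num_choices)
    return blocks.argmax(axis=1).tolist()


# Two variables with 4 choices each, encoded as indices 2 and 0
x = [0, 0, 1, 0, 1, 0, 0, 0]
print(decode_one_hot(x, 4))  # → [2, 0]
```

Note that argmax silently picks index 0 for an all-zero block, which is why FMQA runs typically add a one-hot constraint penalty so the annealer returns exactly one 1 per block.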
3.4. Example output from this FMQA sample program¶
In general, the obtained solutions are not entirely reproducible due to the heuristic nature of the algorithm used in FixstarsClient. Still, typical standard output and image output obtained when executing this example code with $N=100, N_0=10$ are shown below. Note that the values you obtain may differ.

If you execute the code described in "3.1. FMQA execution example" with $N=100, N_0=10$, the following standard output appears sequentially.

First, the results (cost) of the traffic simulations for traffic lights controlled by randomly generated parameters during the initial training data generation are output sequentially ($N_0$ times).
Number of binary decision variables: 960
###################################################
Making initial training data
###################################################
Generating training data set #0. Cost convergence: -2.40 -3.41 -3.32 -3.53 -3.95
Evaluation #0 finished in 52.88s. cost:-3.95
Generating training data set #1. Cost convergence: -6.56 -6.06 -6.13 -6.00 -5.50
Evaluation #1 finished in 50.06s. cost:-5.50
Generating training data set #2. Cost convergence: -6.19 -6.58 -6.59 -6.56 -6.49
Evaluation #2 finished in 47.57s. cost:-6.49
Generating training data set #3. Cost convergence: -4.18 -4.01 -4.07 -4.10 -4.13
Evaluation #3 finished in 53.65s. cost:-4.13
Generating training data set #4. Cost convergence: -3.79 -4.32 -4.19 -3.73 -3.90
Evaluation #4 finished in 55.38s. cost:-3.90
Generating training data set #5. Cost convergence: -1.68 -3.16 -3.20 -3.22 -3.06
Evaluation #5 finished in 56.32s. cost:-3.06
Generating training data set #6. Cost convergence: -6.94 -5.61 -5.75 -5.71 -5.90
Evaluation #6 finished in 50.48s. cost:-5.90
Generating training data set #7. Cost convergence: -5.38 -6.08 -5.45 -5.38 -5.43
Evaluation #7 finished in 51.79s. cost:-5.43
Generating training data set #8. Cost convergence: -3.43 -3.91 -4.12 -4.55 -4.85
Evaluation #8 finished in 50.39s. cost:-4.85
Generating training data set #9. Cost convergence: -3.77 -4.18 -4.31 -4.37 -4.08
Evaluation #9 finished in 54.33s. cost:-4.08

After completing the initial training data construction, the $N-N_0$ optimization cycles begin. The following output is displayed sequentially during each optimization cycle.
###################################################
Starting FMQA cycles... constraint_weight=6
###################################################
FMQA Cycle #0
FM Training: 1.55s (corr:0.61) QUBO: 11.52s (AE execution: 5.05s)
Cost convergence: -3.95 -3.91 -3.59 -3.63 -3.58
Evaluation #10 finished in 51.21s. cost:-3.58
FMQA Cycle #0 finished in 64.31s, variable updated, pred_y=-3.58
FMQA Cycle #1
FM Training: 1.62s (corr:0.04) QUBO: 10.90s (AE execution: 5.00s)
Cost convergence: -3.06 -3.52 -3.99 -4.14 -4.40
Evaluation #11 finished in 52.14s. cost:-4.40
FMQA Cycle #1 finished in 64.69s, variable updated, pred_y=-4.40
FMQA Cycle #2
FM Training: 2.53s (corr:0.73) QUBO: 10.34s (AE execution: 4.92s)
Cost convergence: -6.94 -5.87 -5.73 -5.74 -6.05
Evaluation #12 finished in 48.41s. cost:-6.05
FMQA Cycle #2 finished in 61.32s, variable updated, pred_y=-6.05
FMQA Cycle #3
FM Training: 3.15s (corr:0.85) QUBO: 10.34s (AE execution: 4.93s)
Cost convergence: -4.43 -5.11 -5.31 -5.06 -5.11
Evaluation #13 finished in 45.11s. cost:-5.11
FMQA Cycle #3 finished in 58.63s
・・・
FMQA Cycle #34
FM Training: 5.82s (corr:0.95) QUBO: 10.98s (AE execution: 4.97s)
Cost convergence: -7.84 -8.14 -7.96 -7.85 -7.89
Evaluation #44 finished in 48.05s. cost:-7.89
FMQA Cycle #34 finished in 64.88s
FMQA Cycle #35
FM Training: 9.23s (corr:0.95) QUBO: 14.44s (AE execution: 4.94s)
Cost convergence: -7.47 -7.20 -6.80 -6.60 -6.76
Evaluation #45 finished in 59.24s. cost:-6.76
FMQA Cycle #35 finished in 82.94s
FMQA Cycle #36
FM Training: 12.03s (corr:0.95) QUBO: 14.06s (AE execution: 4.99s)
Cost convergence: -7.80 -8.14 -8.10 -8.20 -8.21
Evaluation #46 finished in 60.88s. cost:-8.21
FMQA Cycle #36 finished in 87.03s, variable updated, pred_y=-8.21
・・・
FMQA Cycle #88
FM Training: 8.81s (corr:0.92) QUBO: 10.11s (AE execution: 4.95s)
Cost convergence: -5.62 -5.64 -5.83 -5.90 -5.96
Evaluation #98 finished in 45.61s. cost:-5.96
FMQA Cycle #88 finished in 64.57s
FMQA Cycle #89
FM Training: 8.72s (corr:0.94) QUBO: 11.70s (AE execution: 4.99s)
Cost convergence: -5.34 -5.79 -6.10 -5.93 -6.14
Evaluation #99 finished in 44.31s. cost:-6.14
FMQA Cycle #89 finished in 64.76s
###################################################
Optimal input values and corresponding output
###################################################
pred x: [0. 0. 1. 0. 0. 0. ... 0. 0.]  (960-element one-hot vector; full printout omitted)
Cost convergence: -7.43 -7.56 -7.83 -7.85 -7.91
Evaluation #100 finished in 45.37s. cost:-7.91
pred value: -7.91
Elapsed time: 6319.51s

The output image of fmqa_solver.plot_history() described in "3.2. Transition of objective function values during the optimization process" is shown below, illustrating how the minimum value of the objective function is updated repeatedly during the black-box optimization process. For the $N=100$ condition, the average vehicle speed under the traffic light control obtained by black-box optimization is about 25% higher than the average speed under the best traffic light control found during the random search (the initial training data construction, shown in blue).
"3.3. MAS traffic simulation with optimized traffic light settings" above plays an animation of the traffic simulation with the optimized traffic signal control. Below, we show the simulation results with traffic light control based on (1) the final result of the optimization transition described above, along with (2) the best traffic light control found during the random search. Both simulations start from the same initial conditions.
Unlike (1), traffic light control based on the final optimization result, case (2), traffic light control based on the best result of the random search, exhibits relatively long traffic jams at the intersections around the shopping malls, which in turn prevent vehicles from entering those intersections from neighboring ones.

(1) Signal control based on final optimization results (average vehicle speed for all vehicles: 8.55 m/s)

