ブラックボックス最適化による信号制御最適化¶
都市部における渋滞の緩和は、時間・経済的な側面だけでなく、環境保全の観点からも重要であると考えられます。特に多くの世帯の動線が集中する高層マンション群や大型ショッピングモール等が立ち並ぶ地域では、局所的な交通渋滞の影響が幅広いエリアに伝播し得るため、渋滞緩和の施策は、地域全体を考慮し、包括的に実施する必要があります。
本サンプルコードでは、Fixstars Amplify によるブラックボックス最適化を用いて、都市を走行する全自動車の平均車速を最大化するような信号機の最適制御を実施します。最適化に際して、マルチ・エージェント・シミュレーション (MAS) による交通シミュレーションを用いて目的関数(平均車速)を評価しますが、実際の交通状況から観測される平均車速や特定の交差点への単位時間当たりの自動車進入台数を目的関数とした場合でも、同様に最適化が可能と考えられます。
本サンプルコードで用いる、機械学習と量子アニーリング・イジングマシンによるブラックボックス最適化の基本知識については、『量子アニーリング・イジングマシンによるブラックボックス最適化』をご覧ください。また、本ブラックボックス最適化手法を活用した他の応用的なモデルケースとして、
も紹介されていますので、ご覧ください。また、本サンプルプログラムで取り扱う信号制御と同様の取り組みとして、信号制御を QUBO 定式化に基づく組合せ最適化問題として考慮する場合のサンプルをこちらで紹介していますので、合わせてご覧ください。
本サンプルプログラム(ブラックボックス最適化による最適信号制御)は以下の構成となっています。
- 1. 都市渋滞の MAS
- 1.1. MAS とは
- 1.2. 交通渋滞モデル
- 1.3. MAS 交通シミュレータ
- 2. FMQA のプログラム実装(整数入力値)
- 2.1. 乱数の初期化
- 2.2. クライアントの設定
- 2.3. PyTorch による FM の実装
- 2.4. 初期教師データの作成
- 2.5. 最適化サイクルの実行クラス(非バイナリ決定変数)
- 2.6. One-hot エンコーダー・デコーダーの実装
- 3. ブラックボックス最適化による信号機制御の最適化
- 3.1. 最適化の実行
- 3.2. 最適化過程における目的関数値の推移
- 3.3. 最適化された信号機設定での交通シミュレーション
- 3.4. 本サンプルコードによる実行例
- 4. まとめ及び実運用の指針
※本オンラインデモ & チュートリアル環境では、連続実行時間が20分程度に制限されています。条件を変更して最適化を試される場合など、実行時間が20分を超えることが想定される場合、本サンプルプログラムをご自身の環境にコピーした上で実行してください。その場合、MAS 交通シミュレータに関する以下のライブラリを適宜ダウンロードし、次のようなディレクトリ構成で保存してください。
├ fmqa_4_traffic.ipynb(本サンプルプログラム)
└ utils/
├ __init__.py
(空ファイル)
└ traffic_model.py
1. 都市渋滞の MAS¶
1.1. MASとは¶
マルチ・エージェント・シミュレーション (MAS) は、複数のエージェントが相互に影響しながら振る舞う様子を模倣するシミュレーション手法であり、現実世界のシステムや現象を再現するために採用されることがあります。シミュレーションでは、各エージェントは個別の意思決定を行い、周囲の環境に反応して振る舞います。その結果、全体として複雑な相互作用が生じることが特徴です。
MAS は、さまざまな分野で利用されています。例えば、次のような分野が挙げられます。
-
社会システム
人間の行動や社会の相互作用を研究し、政策や社会システムの影響を理解するために使用されます。経済学、社会学、政治学などの分野で利用されることがあります。 -
交通システム
車両や歩行者などの交通ネットワーク内での動きをシミュレートし、交通流の最適化や混雑の緩和策を検討します。 -
自然災害
地震、津波、火山噴火などの自然災害をシミュレートして、被害の評価や防災対策の検討に活用されます。 -
組織内動態
企業や組織内の個人や部門の相互作用を模倣して、組織の効率性や意思決定の影響を分析します。
本節では、都市における自動車交通に関する MAS について簡単に解説し、実際に交通シミュレーションを実施します。
1.2. 交通渋滞モデル¶
MAS により都市の(自動車による)交通をシミュレーションを実施する場合、個々の自動車は個別の意思決定を行う必要があります。つまり、シミュレーション全体を見渡せる観測者の視点での制御ではなく、個々の自動車の視点による制御、つまり前(と必要により後ろ)の車との距離に応じた意思決定となります。ここで重要となるのが最適速度モデルと呼ばれる次のモデルです (Bando et al., Jpn. J. Ind. Appl. Math. (1994); Bando et al., Phys. Rev. E (1995)) 。
$$ \ddot x_i = a \{ V(\Delta x_i) - \dot x_i \} $$
$$ \Delta x_i = x_{i+1} - x_i $$
$$ v_i = \dot x_i $$
ここで、$x_i$ と $v_i$ は $i$ 番目の自動車の位置と速度であり、$i+1$ は自分の前の車の番号を示します。また、$V(\Delta x_i)$ は、現在の前の自動車との車間距離 $\Delta x_i$ における最適目標速度を示します。最後に、$a$ は感応度であり、運転手の反射の速さに相当するパラメータとなります。これにより、車間距離から決まる最適速度に合わせるようなドライバーによる加減速をモデル化します。ここで、本サンプルコードでは、最適速度として次のようなモデルを用います。
$$ V(\Delta x_i)=\tanh(\Delta x_i-2) + \tanh(2) $$
本シミュレーションでは、これらのモデル式に基づく速度制御を個々の自動車(ドライバー)が個別に実施し、経路途中の信号を守りながら、目的地まで自律的に走行します。従って、本シミュレーションでは、大型商業施設等の存在に直接的・間接的に起因する渋滞を自然に再現することが可能と考えられます。
1.3. MAS 交通シミュレータ¶
それでは今回用いる MAS に基づく交通シミュレータ TrafficSimulator2Malls
について紹介します。本シミュレータで考慮するのは、city_size
$\times$city_size
個のブロックから構成される簡易的な形状を有する都市であり、ブロック間に格子状に道路が存在します。また、全ての交差点に信号機が設置されています。つまり、都市全体で、city_size
$\times$city_size
箇所の交差点に設置された信号機を制御することが可能です。都市の中には 2
つのショッピングモールが存在し、局所的に交通が集中しやすい状況を模しています。問題の簡略化のために、都市を取り囲む境界は周期境界(例:都市の左から出たものは右から入ってくる)としています。
今回のサンプルコードでは、各交差点における信号機の制御パラメータとして、[赤信号の長さ, 青信号の長さ, 位相]
(単位はいずれも秒)の3つのパラメータを考慮します。例えば、ある交差点
$i$ の信号機に対して、制御パラメータとして [15.0, 12.0, 5.0]
を入力すれば、シミュレーション開始時刻から5.0秒後に当該交差点に設置の南北方向の信号が赤となり、その後15.0秒間同信号は赤、その後12.0秒間は青、その後15秒間赤、・・・
という風に信号機は制御されます。
ここで、信号機への入力パラメータは実数ですが、ブラックボックス最適化への適用性の観点から、整数インデックスを用いて入力します。整数インデックスとは、例えば、0.0 から 1.0 の間の値を取り得る実数パラメータを、6個の整数インデックスで表すことを考えます。各整数インデックスと、それに対応するパラメータ値との対応表は、次のようになるでしょう。
整数インデックス値 | 0 | 1 | 2 | 3 | 4 | 5 |
---|---|---|---|---|---|---|
パラメータ値 | 0.0 | 0.2 | 0.4 | 0.6 | 0.8 | 1.0 |
以下では、このような整数インデックスと実数パラメータとの対応を、信号機制御パラメータ生成クラス SignalController
で定義します。ここで、デフォルト設定では、各信号機の3つのパラメータが取る値は、いずれも
1~20秒であり、これを20個の整数インデックスで表現します。以下のコードセルを実行すると、考慮される整数インデックスと、対応するパラメータ値の対応表が表示されます。例えばデフォルト設定では、整数インデックス
10 に対応するパラメータ値は 11.0 秒となります。
%matplotlib widget
from utils.traffic_model import SignalController
# 碁盤目状都市の大きさを指定。都市は city_size*city_size の格子状のマップで表現される
city_size = 3
# 与えられた都市の大きさに対応した信号機制御パラメータ生成クラスのインスタンス化
signal_controller = SignalController(city_size)
# 整数インデックスとパラメータ値の対応表の表示
signal_controller.show_index_table()
整数インデックスと実数パラメータとの対応クラスをインスタンス化したので、実際に本クラスを使って、信号機制御パラメータを作成します。以下では、0~19までの整数乱数を用い、ランダムに整数インデックスを生成します。生成する整数インデックスの数は、各信号機のパラメータ数
$\times$ 交差点の数 = 3
$\times$ city_size
$\times$ city_size
となります。
次に、生成した整数インデックスを対応する実数パラメータ値に変換(デコード)します。これには、SignalController
クラスの
decode()
メソッドを用います。decode()
メソッドの引数 disp_flag=True
により、デコード後のパラメータ値を表示することができます。どの様な値を信号機制御パラメータとして設定されるのか確認してみましょう。
import numpy as np
# 整数乱数により、0 から 19 までの整数インデックスを 3 * city_size * city_size 個生成
index_list = np.random.randint(0, 20, 3 * city_size * city_size)
# 整数インデックスを対応するパラメータ値に変換(デコード)し、制御パラメータを生成
signal_info = signal_controller.decode(index_list=index_list, disp_flag=True)
最後に、生成した制御パラメータ signal_info
に基づき信号機を制御した場合の交通シミュレーションを、MAS 交通シミュレータクラス
TrafficSimulator2Malls
を用いて実施します。TrafficSimulator2Malls
のインスタンス化に際して、碁盤目状都市内を走行する自動車の台数の上限を 20 台とし(num_cars=20
)、そのうち 50%
の自動車が2つのショッピングモールのいずれかを目的地とする(ratio_mall=0.5
)ような条件を考慮します。交通シミュレーションで考慮される各自動車の出発地及び目的地は乱数を用いて決定されますが、4番目の引数、seed
はその乱数のシード値に相当します。
ここで、シミュレーション中、全ての自動車は、①乱数により設定された自宅と②ランダムに設定された場所又はショッピングモールの 2 点間を往復します。また、目的地に到着後は、一定時間その場所に留まり、復路が開始するまで道路からは非表示となります。表示される自動車は、経路に対応する色で表示されており、次のような経路を往復します。
- 灰色 $\rightarrow$ 自宅とショッピングモール 1 を往復
- 黒色 $\rightarrow$ 自宅とショッピングモール 2 を往復
- 白色 $\rightarrow$ 自宅とその他の場所を往復
目的地がショッピングモールの場合を除いて、道路を走行中の各自動車の現在の目的地は、対応する色の〇で表示されています。各自動車が、信号を守りながら目的地へ自律的に走行している様子を確認してください。
また、本最適化の本質ではないですが、シミュレーション結果は全てSI単位 (m, s, m/s) で表示されます。
from utils.traffic_model import TrafficSimulator2Malls
# シミュレータのインスタンス化
traffic_sim = TrafficSimulator2Malls(
signal_info=signal_info, num_cars=20, ratio_mall=0.5, seed=0
)
# シミュレーションの実行(アニメーション表示)
traffic_sim.run(steps=500, animation=True)
# シミュレーションの実行(gif アニメーションの出力)
# ※下記引数の設定でおよそ1分程度の処理時間
# ※gif出力はローカル環境でお試しください。
# traffic_sim.run(steps=200, gif="./anim.gif")
2. FMQA のプログラム実装(整数入力値)¶
ここでは、FMQA のプログラム実装を行います。FMQA 部分の実装の多くは、『量子アニーリング・イジングマシンによるブラックボックス最適化』と同様ですので、詳細はそちらの解説をご覧ください。
今回の FMQA では、バイナリ変数ではなく、整数ベクトルを目的関数への入力値として考慮している、という相違点があることにご注意ください。FMQA における整数入力値の取り扱いは、『2.6. One-hot エンコーダー・デコーダーの実装』をご覧ください。
import os
import torch
def seed_everything(seed=0):
os.environ["PYTHONHASHSEED"] = str(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed(seed)
torch.backends.cudnn.deterministic = True
from amplify import FixstarsClient
from datetime import timedelta
client = FixstarsClient()
client.parameters.timeout = timedelta(milliseconds=5000) # 5000 ミリ秒
# client.token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx" # ローカル環境等で使用する場合は、Amplify AEのアクセストークンを入力してください。
2.3. PyTorch による FM の実装¶
FM の学習と推論を PyTorch で行います。TorchFM
クラスでは、機械学習モデルとしての獲得関数 $g(\boldsymbol{x})$
を定義します。下式の通り、$g(\boldsymbol{x})$ 内の各項は、TorchFM
クラス内の
out_lin
、out_1
、out_2
、out_inter
に直接対応します。
$$ \begin{aligned} g(\boldsymbol{x} | \boldsymbol{w}, \boldsymbol{v}) &= \underset{\color{red}{\mathtt{out\_lin}}}{\underline{ w_0 + \sum_{i=1}^n w_i x_i} } + \underset{\color{red}{\mathtt{out\_inter}}}{\underline{\frac{1}{2} \left[\underset{\color{red}{\mathtt{out\_1}}}{\underline{ \sum_{f=1}^k\left(\sum_{i=1}^n v_{i f} x_i\right)^2 }} - \underset{\color{red}{\mathtt{out\_2}}}{\underline{ \sum_{f=1}^k\sum_{i=1}^n v_{i f}^2 x_i^2 }} \right] }} \end{aligned} $$
import torch.nn as nn
class TorchFM(nn.Module):
def __init__(self, d: int, k: int) -> None:
super().__init__()
self.V = nn.Parameter(torch.randn(d, k), requires_grad=True)
self.lin = nn.Linear(d, 1)
def forward(self, x):
out_1 = torch.matmul(x, self.V).pow(2).sum(1, keepdim=True)
out_2 = torch.matmul(x.pow(2), self.V.pow(2)).sum(1, keepdim=True)
out_inter = 0.5 * (out_1 - out_2)
out_lin = self.lin(x)
out = out_inter + out_lin
return out
次に、入出力データから FM を機械学習する関数 train
を定義します。一般的な機械学習と同様に、教師データを学習データと検証データに分割し、学習データを用いてパラメータの最適化、検証データを用いて学習中のモデル検証を行います。train
関数は、検証データに対して最も予測精度の高かったモデルを返します。
from torch.utils.data import TensorDataset, DataLoader
from sklearn.model_selection import train_test_split
from typing import Type
import copy
def train(
X: np.ndarray,
y: np.ndarray,
model_class: Type[nn.Module],
model_params: dict[str, int | float],
batch_size=1024,
epochs=3000,
criterion=nn.MSELoss(),
optimizer_class=torch.optim.AdamW,
opt_params={"lr": 1},
lr_sche_class=None,
lr_sche_params=None,
):
X_tensor, y_tensor = (
torch.from_numpy(X).float(),
torch.from_numpy(y).float(),
)
indices = np.array(range(X.shape[0]))
indices_train, indices_valid = train_test_split(
indices, test_size=0.2, random_state=42
)
train_set = TensorDataset(X_tensor[indices_train], y_tensor[indices_train])
valid_set = TensorDataset(X_tensor[indices_valid], y_tensor[indices_valid])
loaders = {
"train": DataLoader(train_set, batch_size=batch_size, shuffle=True),
"valid": DataLoader(valid_set, batch_size=batch_size, shuffle=False),
}
model = model_class(**model_params)
best_model_wts = copy.deepcopy(model.state_dict())
optimizer = optimizer_class(model.parameters(), **opt_params)
scheduler = None
if lr_sche_class is not None:
scheduler = lr_sche_class(optimizer, **lr_sche_params)
best_score = 1e18
for _ in range(epochs):
losses = {"train": 0.0, "valid": 0.0}
for phase in ["train", "valid"]:
if phase == "train":
model.train()
else:
model.eval()
for batch_x, batch_y in loaders[phase]:
optimizer.zero_grad()
out = model(batch_x).T[0]
loss = criterion(out, batch_y)
losses[phase] += loss.item() * batch_x.size(0)
with torch.set_grad_enabled(phase == "train"):
if phase == "train":
loss.backward()
optimizer.step()
losses[phase] /= len(loaders[phase].dataset)
with torch.no_grad():
model.eval()
if best_score > losses["valid"]:
best_model_wts = copy.deepcopy(model.state_dict())
best_score = losses["valid"]
if scheduler is not None:
scheduler.step()
with torch.no_grad():
model.load_state_dict(best_model_wts)
model.eval()
out = model(X_tensor).T[0]
coef = torch.corrcoef(torch.stack((out, y_tensor)))[0, 1].detach()
return model, coef.item()
2.4. 初期教師データの作成¶
入力値 $\boldsymbol{x}$ に対して目的関数 $f(\boldsymbol{x})$ を評価し、$N_0$ 個の入出力ペア(初期教師データ)を作成します。ここでの入力値 $\boldsymbol{x}$ の決め方は様々ですが、乱数を用いたり、現象に対する知見に基づき機械学習に適した値を用いたりします。FMQA 毎に新規に初期教師データを構築する必要はなく、過去に実施した実験やシミュレーションの結果から、教師データを構築しても構いません。
今回は、非バイナリである整数入力値を考慮するため、乱数で整数入力ベクトルを生成し、さらに one-hot に基づいてバイナリベクトルに変換する関数
input_generator
を用います。input_generator
は、後に紹介する one-hot エンコーダー・デコーダークラス EncDecOneHot
内で定義されています。
def gen_training_data_flow_onehot(D: int, N0: int, true_func, input_generator):
X = np.zeros((N0, D))
y = np.zeros(N0)
print("")
print("###################################################")
print(" Making initial training data")
print("###################################################")
for i in range(N0):
print(f"Generating training data set #{i}.")
# (0 or 1)の要素からなる入力ベクトルをランダムに決定
x = input_generator()
# 既に全く同じ入力ベクトルが与えられている場合、入力値を再生成
is_identical = True
while is_identical:
is_identical = False
for j in range(i + 1):
if np.all(x == X[j, :]):
x = input_generator()
is_identical = True
break
# 目的関数 f(x) で入力値 x を評価し、入出力ペア (x,f(x)) を教師データにコピー
X[i, :] = x
y[i] = true_func(x)
print(f"------------------------")
return X, y
2.5. FMQA サイクルの実行クラス¶
FMQA_NB.cycle
では、事前に準備した初期教師データを用い、FMQA サイクルを $N-N_0$ 回実施します。FMQA_NB.step
は、FMQA を1サイクルのみ行う関数で、FMQA_NB.cycle
から $N-N_0$ 回呼び出されます。
非バイナリ変数を対象とした FMQA の場合、各変数をバイナリ変数にエンコードする必要があります。FMQA_NB
クラスでは、one-hot
エンコードを前提とします。非バイナリ変数のエンコード及びデコードは、それぞれ encoder_decoder.encoder
及び
encoder_decoder.decoder
として本クラスのインスタンス時に与えられます。また、nbins_array
は、各非バイナリ変数のエンコードに用いる総 bin 数(可変)のリストです。
from amplify import VariableGenerator, solve, one_hot, sum
import matplotlib.pyplot as plt
import sys
import time
# FMQA for non-binary inputs
class FMQA_NB:
def __init__(self, D, N, N0, k, true_func, client, encoder_decoder) -> None:
assert N0 < N
self.D = D
self.N = N
self.N0 = N0
self.k = k
self.true_func = true_func
self.client = client
self.nbins_array = encoder_decoder.nbins_array
self.encoder = encoder_decoder.encoder
self.decoder = encoder_decoder.decoder
self.y = None
# 教師データに基づいて N-N0 回の FMQA を教師データを追加しながら繰り返し実施するメンバー関数
def cycle(self, X, y, log: bool = False) -> np.ndarray:
# one-hot 制約条件の重み係数
constraint_weight = max([1, int(np.abs(y).max() + 0.5)])
print("")
print("###################################################")
print(f" Starting FMQA cycles... {constraint_weight=}")
print("###################################################")
pred_x = X[0]
pred_y = 1e18
self.y = y
i_sta = 0
for i in range(self.N - self.N0):
print(f"FMQA Cycle #{i} ")
start_time = time.perf_counter()
try:
x_hat = self.step(X, y, constraint_weight)
except RuntimeError:
sys.exit(f"Unknown error, i = {i}")
# x_hat として既に全く同じ入力が教師データ内に存在する場合、その周辺の値を x_hat とする。
is_identical = True
while is_identical:
is_identical = False
for j in range(i + self.N0):
if np.all(x_hat == X[j, :]):
# バイナリベクトルから整数ベクトルへデコードし、inputs へコピー
inputs = self.decoder(x_hat)
# 周辺の値を取得(inputs は整数ベクトル)
inputs += np.random.randint(-1, 2, len(inputs))
for i_inp in range(len(inputs)):
if inputs[i_inp] < 0:
inputs[i_inp] = 0
elif inputs[i_inp] > self.nbins_array[i_inp] - 1:
inputs[i_inp] = self.nbins_array[i_inp] - 1
# 整数ベクトルからバイナリベクトルへエンコードし、x_hat へコピー
x_hat = self.encoder(inputs)
if log:
print(f"{i=}, Identical x is found, {x_hat=}")
is_identical = True
break
# hat{x} で目的関数 f() を評価
y_hat = self.true_func(x_hat)
# 最適点近傍における入出力ペア [x_hat, y_hat] を教師データに追加
X = np.vstack((X, x_hat))
y = np.append(y, y_hat)
self.y = np.append(self.y, y_hat)
# 目的関数の評価値が最小値を更新したら、その入出力ペアを [pred_x, pred_y] へコピー
print(
f"FMQA Cycle #{i} finished in {time.perf_counter() - start_time:.2f}s",
end="",
)
if pred_y > y_hat:
pred_y = y_hat
pred_x = x_hat
print(f", variable updated, {pred_y=:.2f}")
else:
print()
print(f"------------------------")
return pred_x
# 1回のFMQAを実施するメンバー関数
def step(self, X, y, constraint_weight) -> np.ndarray:
# FM を機械学習
start_time = time.perf_counter()
model, corrcoef = train(
X,
y,
model_class=TorchFM,
model_params={"d": self.D, "k": self.k},
batch_size=8,
epochs=1000,
criterion=nn.MSELoss(),
optimizer_class=torch.optim.AdamW,
# 学習率をエポック数とともに小さくするスケジューラーを設定
opt_params={"lr": 0.5},
lr_sche_class=torch.optim.lr_scheduler.StepLR,
lr_sche_params={"step_size": 50, "gamma": 0.9},
)
print(
f"FM Training: {time.perf_counter() - start_time:.2f}s (corr:{corrcoef:.2f})"
)
start_time = time.perf_counter()
# 学習済みモデルから、FM パラメータの抽出
v, w, w0 = list(model.parameters())
v = v.detach().numpy()
w = w.detach().numpy()[0]
w0 = w0.detach().numpy()[0]
# ここから量子アニーリング (QA) を実施
gen = VariableGenerator() # 変数ジェネレータを宣言
q = gen.array("Binary", self.D) # 決定変数の作成
cost = self.__FM_as_QUBO(q, w0, w, v) # FM パラメータから QUBO として FM を定義
# 一つの変数が一つの値を持つように one-hot 制約
constraint_list = []
ista = 0
iend = 0
for i in range(len(self.nbins_array)):
iend += self.nbins_array[i]
constraint_list.append(one_hot(q[ista:iend]))
ista = iend
constraints = sum(constraint_list)
# 目的関数と制約条件を足し合わし、Amplify のソルバーに受け渡し
model = cost + constraint_weight * constraints
result = solve(model, self.client) # 目的関数を Amplify のソルバーに受け渡し
print(
f"QUBO: {time.perf_counter() - start_time:.2f}s (AE execution: {result.execution_time.seconds:.2f}s)"
)
if len(result.solutions) == 0:
raise RuntimeError("No solution was found.")
q_values = q.evaluate(result.best.values)
return q_values
# FM パラメータから QUBO として FM を定義する関数。前定義の TorchFM クラスと同様に、g(x) の関数形通りに数式を記述。
def __FM_as_QUBO(self, x, w0, w, v):
lin = w0 + (x.T @ w)
out_1 = np.array([(x * v[:, i]).sum() ** 2 for i in range(self.k)]).sum()
# 次式において、x[j] はバイナリ変数なので、x[j] = x[j]^2 であることに注意。
out_2 = np.array([(x * v[:, i] * v[:, i]).sum() for i in range(self.k)]).sum()
return lin + (out_1 - out_2) / 2
# 初期教師データ及び各 FMQA サイクル内で実施した N 回の目的関数評価値の履歴をプロットする関数
def plot_history(self):
assert self.y is not None
fig = plt.figure(figsize=(6, 4))
plt.plot(
[i for i in range(self.N0)],
self.y[: self.N0],
marker="o",
linestyle="-",
color="b",
) # 初期教師データ生成時の目的関数評価値
plt.plot(
[i for i in range(self.N0, self.N)],
self.y[self.N0 :],
marker="o",
linestyle="-",
color="r",
) # FMQA サイクル時の目的関数評価値
plt.plot(
[i for i in range(self.N)],
[self.y[:i].min() for i in range(1, self.N + 1)],
linestyle="-",
color="k",
) # 目的関数最小値の更新履歴
plt.xlabel("i-th evaluation of f(x)", fontsize=18)
plt.ylabel("f(x)", fontsize=18)
plt.tick_params(labelsize=18)
return fig
2.6. One-hot エンコーダー・デコーダーの実装¶
1.3 節で紹介したように、信号機制御パラメータ生成クラス SignalController
を使う際、入力として整数インデックスが必要です。一方、QUBO への入力は、バイナリ変数である必要があるため、整数インデックスからバイナリ変数へのエンコーディングが必要となります。今回は、one-hot
エンコーディングを用います。例えば、1から4の値を取りうる整数を4ビットで表す one-hot エンコーディングでは、
- 1 は
[1, 0, 0, 0]
- 2 は
[0, 1, 0, 0]
- 3 は
[0, 0, 1, 0]
- 4 は
[0, 0, 0, 1]
のようにバイナリベクトルで表現されます。以下の EncDecOneHot
クラスでは、整数インデックスからバイナリ変数への変換・逆変換を行う関数(encoder()
、decoder()
)や、整数インデックス入力ベクトルを乱数で生成し、バイナリ変数行列へ変換する関数
gen_random_input()
が定義されています。
class EncDecOneHot:
def __init__(self, D: int, nbins_array):
self.D = D
self.nbins_array = nbins_array
def gen_random_input(self):
ista = 0
iend = 0
x = np.zeros(self.D, int)
for i in range(len(self.nbins_array)):
iend += self.nbins_array[i]
idx = np.random.randint(ista, iend, 1)[0]
x[idx] = 1
ista = iend
return x
def encoder(self, inputs):
if len(inputs) != len(self.nbins_array):
raise RuntimeError("inputs should be the same length as nbins_array!")
x = np.zeros(self.D, int)
i_offset = 0
for i in range(len(inputs)):
x[inputs[i] + i_offset] = 1
i_offset += self.nbins_array[i]
return x
def decoder(self, x):
bit_locs = np.where(x == 1)[0]
if len(bit_locs) != len(self.nbins_array):
raise RuntimeError("bit_locs should be the same length as nbins_array!")
i_offset = 0
for i in range(1, len(bit_locs)):
i_offset += self.nbins_array[i - 1]
bit_locs[i] -= i_offset
return bit_locs
3. ブラックボックス最適化による信号機制御の最適化¶
3.1. 最適化の実行¶
それでは、MAS 交通シミュレータ(1節)で予測された全車の平均車速の負値を目的関数として、2節で実装したブラックボックス最適化クラス
FMQA
により、対象都市に設置された信号機網の制御を最適化します。最適化は、目的関数値を最小化する方向に進めますので、最適化後の信号機制御により全自動車の平均車速が最大化することが期待されます。本交通シミュレータにおいて、平均車速はシミュレーション後に
get_latest_mean()
で取得することが可能です。最適化過程における目的関数の各評価では、初期条件(出発地と目的地)がランダムに決定された
n_cases
個のシミュレーションケースを考慮し、その平均値を目的関数値としています。
以下では、デモ・チュートリアルの環境における実行時間制限のために、最低限の動作確認を目的として、目的関数を評価できる回数 $N$ を 3 回、そのうち初期データの生成のための評価回数 $N_0$ を 2 回としています(従って全 $N-N_0=1$ 回の最適化サイクル数を考慮)。1 回の最適化サイクルという非常に限られた最適化のため、ランダム探索に相当する初期データより良い解が見つからない場合もありますが、一連の動作をご確認ください。
十分なサイクル数を考慮した適切な最適化を実施するには、サンプルコードを手元の環境にダウンロード頂き、目的関数の評価回数 $N$ を増加させた上で最適化をお試しください。例えば $N=100$ 程度の条件では最適化過程終了時には、平均車速としておよそ 8 m/s程度の交通状況を実現することができます。この場合、全最適化サイクルが終了するまでに2時間程度要する点にご注意ください。
$N_0=100$ における出力例及び最適信号機制御に基づく/基づかない場合の交通シミュレーション結果の比較アニメーションを、『3.4. 本サンプルコードによる実行例』に紹介しています。
import time
# 乱数シード値を初期化
seed_everything()
# 碁盤目状都市の大きさを指定。city_size*city_size の格子マップ
city_size = 4
# 信号設定生成クラスのインスタンス化
signal_controller = SignalController(city_size)
# 各変数の one-hot エンコーディングの bin サイズの1次元配列
nbins_array = np.array(
[
[
signal_controller.nbins_red_duration,
signal_controller.nbins_green_duration,
signal_controller.nbins_phase,
]
for i in range(city_size**2)
]
).reshape(-1)
# バイナリ変数の全体のサイズ(QUBO が取り扱うバイナリ決定変数のサイズ)
D = nbins_array.sum()
print(f"Number of binary decision variables: {D}")
# One-hot エンコーダー・デコーダークラス(EncDecOneHot)のインスタンス化
enc_dec_one_hot = EncDecOneHot(D, nbins_array)
# 目的関数評価回数のカウンター
n_eval = 0
num_cars = 100
ratio_mall = 0.5
# 目的関数(信号機制御パラメータの取得 → 交通シミュレーション実施 → 目的関数値の計算)
def obj_func(x):
global n_eval
# 求解結果(バイナリ)から、信号機制御パラメータ(赤の期間、緑の期間、位相)を表現する整数インデックスに変換する。
index_list = enc_dec_one_hot.decoder(x)
# さらに、signal_controller.decode メソッドにより、整数インデックスから信号機制御パラメータ(実数)に変換する。
signal_info = signal_controller.decode(index_list)
# 同じ信号機制御で、n_cases 回の異なるシミュレーションを実施。
n_cases = 5
costs = []
print("Cost convergence: ", end="")
start_time = time.perf_counter()
for i in range(n_cases):
# TrafficSimulator2Malls のインスタンス化(初期条件を決定するシード値は乱数)
traffic_sim = TrafficSimulator2Malls(
signal_info=signal_info,
num_cars=num_cars,
ratio_mall=ratio_mall,
seed=np.random.randint(0, 100),
)
# シミュレーションの実行
traffic_sim.run(steps=20000)
# 各シミュレーションにおける全自動車の平均速度の負値を取得し、costs に追加
costs.append(-traffic_sim.get_latest_mean())
print(f"{sum(costs) / (i + 1):.2f} ", end="")
# n_cases 回のシミュレーションの平均出力値を最終的な目的関数値とする。
cost = sum(costs) / n_cases
print(
f"\nEvaluation #{n_eval} finished in {time.perf_counter() - start_time:.2f}s. cost:{cost:.2f}"
)
n_eval += 1
return cost
N = 3 # 関数を評価できる回数。『3.4. 本サンプルコードによる FMQA 実行例』では 100 と設定
N0 = 2 # 初期教師データのサンプル数。『3.4. 本サンプルコードによる FMQA 実行例』では 10 と設定
k = 20 # FMにおけるベクトルの次元(ハイパーパラメータ)
# 時間計測用開始時刻
start_time = time.perf_counter()
# 初期教師データの生成
X, y = gen_training_data_flow_onehot(D, N0, obj_func, enc_dec_one_hot.gen_random_input)
# FMQA のインスタンス化
fmqa_solver = FMQA_NB(
D=D,
N=N,
N0=N0,
k=k,
true_func=obj_func,
client=client,
encoder_decoder=enc_dec_one_hot,
)
# FMQA サイクルの実行
pred_x = fmqa_solver.cycle(X, y)
# 最適化結果の出力
print("###################################################")
print(" Optimal input values and corresponding output")
print("###################################################")
print(f"pred x: {pred_x}")
print(f"pred value: {obj_func(pred_x):.2f}")
print(f"Elapsed time: {time.perf_counter() - start_time:.2f}s")
3.2. 最適化過程における目的関数値の推移¶
初期教師データ作成時にランダムに生成した入力値に対して得られた $N_0$ 個の目標関数値及び $N-N_0$ サイクルの FMQA 最適化過程における目標関数値の推移を以下にプロットします。それぞれ、青色及び赤色で示されています。また、出力例を、『3.4. 本サンプルコードによる実行例』に紹介しています。十分大きな $N$ を設定し、FMQA 最適化サイクルにより得られた入力値 $\hat{x}$ により、目的関数値の最小値が次々と更新される様子を確認してください。
fig = fmqa_solver.plot_history()
3.3. 最適化された信号機設定での交通シミュレーション¶
ブラックボックス最適化で得られた最適制御パラメータを用いて、交通シミュレーションを再度実施します。実施されたシミュレーションの一例を、アニメーションで『3.3. 本サンプルコードによる FMQA 実行例』に紹介していますのでご覧ください。
# 最適化の際のシミュレーションと同じ条件設定
city_size = 4
num_cars = 100
ratio_mall = 0.5
signal_controller = SignalController(city_size)
# 最適化結果 pred_x から整数インデックスを構築
index_list = enc_dec_one_hot.decoder(pred_x)
print(f"{index_list=}")
# 整数インデックスを信号機制御パラメータに変換
signal_info = signal_controller.decode(index_list)
# シミュレータのインスタンス化
traffic_sim = TrafficSimulator2Malls(
signal_info=signal_info, num_cars=num_cars, ratio_mall=ratio_mall
)
# シミュレーションの実施
traffic_sim.run(2000, animation=True, interval=100)
3.4. 本サンプルコードによる実行例¶
一般的に、FixstarsClient
で採用されているヒューリスティクスというアルゴリズムの原理上、得られる解に完全な再現性はありませんが、$N=100, N_0=10$
の条件において本サンプルコードを実行した際に得られる、典型的な標準出力及び画像出力を以下に紹介します。※得られる値が異なる場合があります。
-
『3.1. 最適化の実行』に記載のコードを条件を変えずに実行すると、次のような標準出力が逐次表示されます。
-
まず、初期教師データ作成中に生成される、乱数に基づいて制御された信号機に対する交通シミュレーション結果が逐次出力されます($N_0$ 回)。
Number of binary decision variables: 960 ################################################### Making initial training data ################################################### Generating training data set #0. Cost convergence: -2.40 -3.41 -3.32 -3.53 -3.95 Evaluation #0 finished in 52.88s. cost:-3.95 ------------------------ Generating training data set #1. Cost convergence: -6.56 -6.06 -6.13 -6.00 -5.50 Evaluation #1 finished in 50.06s. cost:-5.50 ------------------------ Generating training data set #2. Cost convergence: -6.19 -6.58 -6.59 -6.56 -6.49 Evaluation #2 finished in 47.57s. cost:-6.49 ------------------------ Generating training data set #3. Cost convergence: -4.18 -4.01 -4.07 -4.10 -4.13 Evaluation #3 finished in 53.65s. cost:-4.13 ------------------------ Generating training data set #4. Cost convergence: -3.79 -4.32 -4.19 -3.73 -3.90 Evaluation #4 finished in 55.38s. cost:-3.90 ------------------------ Generating training data set #5. Cost convergence: -1.68 -3.16 -3.20 -3.22 -3.06 Evaluation #5 finished in 56.32s. cost:-3.06 ------------------------ Generating training data set #6. Cost convergence: -6.94 -5.61 -5.75 -5.71 -5.90 Evaluation #6 finished in 50.48s. cost:-5.90 ------------------------ Generating training data set #7. Cost convergence: -5.38 -6.08 -5.45 -5.38 -5.43 Evaluation #7 finished in 51.79s. cost:-5.43 ------------------------ Generating training data set #8. Cost convergence: -3.43 -3.91 -4.12 -4.55 -4.85 Evaluation #8 finished in 50.39s. cost:-4.85 ------------------------ Generating training data set #9. Cost convergence: -3.77 -4.18 -4.31 -4.37 -4.08 Evaluation #9 finished in 54.33s. cost:-4.08 ------------------------
-
初期教師データ構築が終了後、 $N-N_0$ 回の 最適化サイクルが始まります。サイクル中は、最適化の一度の試行ごとに次のような出力が逐次表示されます。
################################################### Starting FMQA cycles... constraint_weight=6 ################################################### FMQA Cycle #0 FM Training: 1.55s (corr:0.61) QUBO: 11.52s (AE execution: 5.05s) Cost convergence: -3.95 -3.91 -3.59 -3.63 -3.58 Evaluation #10 finished in 51.21s. cost:-3.58 FMQA Cycle #0 finished in 64.31s, variable updated, pred_y=-3.58 ------------------------ FMQA Cycle #1 FM Training: 1.62s (corr:0.04) QUBO: 10.90s (AE execution: 5.00s) Cost convergence: -3.06 -3.52 -3.99 -4.14 -4.40 Evaluation #11 finished in 52.14s. cost:-4.40 FMQA Cycle #1 finished in 64.69s, variable updated, pred_y=-4.40 ------------------------ FMQA Cycle #2 FM Training: 2.53s (corr:0.73) QUBO: 10.34s (AE execution: 4.92s) Cost convergence: -6.94 -5.87 -5.73 -5.74 -6.05 Evaluation #12 finished in 48.41s. cost:-6.05 FMQA Cycle #2 finished in 61.32s, variable updated, pred_y=-6.05 ------------------------ FMQA Cycle #3 FM Training: 3.15s (corr:0.85) QUBO: 10.34s (AE execution: 4.93s) Cost convergence: -4.43 -5.11 -5.31 -5.06 -5.11 Evaluation #13 finished in 45.11s. cost:-5.11 FMQA Cycle #3 finished in 58.63s ------------------------ ・ ・ ・ ------------------------ FMQA Cycle #34 FM Training: 5.82s (corr:0.95) QUBO: 10.98s (AE execution: 4.97s) Cost convergence: -7.84 -8.14 -7.96 -7.85 -7.89 Evaluation #44 finished in 48.05s. cost:-7.89 FMQA Cycle #34 finished in 64.88s ------------------------ FMQA Cycle #35 FM Training: 9.23s (corr:0.95) QUBO: 14.44s (AE execution: 4.94s) Cost convergence: -7.47 -7.20 -6.80 -6.60 -6.76 Evaluation #45 finished in 59.24s. cost:-6.76 FMQA Cycle #35 finished in 82.94s ------------------------ FMQA Cycle #36 FM Training: 12.03s (corr:0.95) QUBO: 14.06s (AE execution: 4.99s) Cost convergence: -7.80 -8.14 -8.10 -8.20 -8.21 Evaluation #46 finished in 60.88s. cost:-8.21 FMQA Cycle #36 finished in 87.03s, variable updated, pred_y=-8.21 ------------------------ ・ ・ ・ ------------------------ FMQA Cycle #88 FM Training: 8.81s (corr:0.92) QUBO: 10.11s (AE execution: 4.95s) Cost convergence: -5.62 -5.64 -5.83 -5.90 -5.96 Evaluation #98 finished in 45.61s. cost:-5.96 FMQA Cycle #88 finished in 64.57s ------------------------ FMQA Cycle #89 FM Training: 8.72s (corr:0.94) QUBO: 11.70s (AE execution: 4.99s) Cost convergence: -5.34 -5.79 -6.10 -5.93 -6.14 Evaluation #99 finished in 44.31s. cost:-6.14 FMQA Cycle #89 finished in 64.76s ------------------------ ################################################### Optimal input values and corresponding output ################################################### pred x: [0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0.] Cost convergence: -7.43 -7.56 -7.83 -7.85 -7.91 Evaluation #100 finished in 45.37s. cost:-7.91 pred value: -7.91 Elapsed time: 6319.51s
-
『3.2. FMQA 最適化過程における目的関数値の推移』に記載の
fmqa_reactor.plot_history()
による出力画像は次のようになり、ブラックボックス最適化過程において、目的関数値の最小値が次々更新される様子が示されています。例えば、$N=100$ の試行では、ブラックボックス最適化により最終的に得られた信号機制御パターンによる平均車速は、ランダム探索(青色で示される初期教師データ構築)過程で取得された最も良い信号機制御における平均車速に比べて 25% 程度向上しています。 -
『3.3. 最適化された信号機設定での交通シミュレーション』では、最適化された信号機制御を考慮した場合の交通シミュレーション結果をアニメーションで表示します。上記の最適化推移において取得された①最終的な最適化結果に基づく信号機制御による交通シミュレーション結果を、②ランダム探索過程における最良信号機制御に基づくシミュレーション結果とともに以下に示します。両方のシミュレーションにおいて、同じ初期条件を考慮しています。
①最終的な最適化結果に基づく信号機制御の場合とは異なり、②ランダム探索過程における最良結果に基づく信号機制御では、ショッピングモール周辺の交差点において比較的長い渋滞が発生し、さらにその周辺の交差点からの自動車の進入が妨げられている現象が複数観察されます。
-
①最終的な最適化結果に基づく信号機制御(全車平均車速:8.55 m/s)
-
-