交通信号の最適化-QUBO 定式化に基づく実装¶
交通渋滞は経済及び環境保全の観点から重要な社会問題であると考えられます。本サンプルコードでは、都市において渋滞がなるべき起きないように、刻一刻と変化する交通状況に応じてリアルタイムに信号機制御することを考えます。その様な信号制御を QUBO 定式化に基づく組合せ最適化問題として考慮し、交通量をシミュレーションします。
同様の信号機制御への取り組みとして、ブラックボックス最適化(機械学習と組合せ最適化の融合)とマルチ・エージェント・シミュレーションによる応用例をこちらで紹介していますので、合わせてご覧ください。
QUBO 式に基づいて信号制御を行う、本サンプルプログラムの問題設定、信号機制御に関する定式化及び実装は次の順番で行います。
- 1. 問題設定
- 2. 定式化
- 3. 信号機最適制御の実装
- 3.1. 前準備
- 3.2. 道路に関するクラス
Roads
- 3.3. 信号に関するクラス
Signals
- 3.4. 車数に関するクラス
Traffic
- 3.5. イジング変数に関するクラス
IsingVariable
- 3.6. 初期状態の決定
- 3.7. シミュレーションに関するクラス
Simulation
- 3.8.
main
関数
- 4. まとめ
1. 問題設定¶
本サンプルコードで取り扱う都市やその交通に関し、簡単のため以下のような仮定を設けます。
-
都市は正方形の格子状道路網を有する
-
各交差点には信号機があり、時刻ごとに南北方向と東西方向の通行を制御している
-
都市外部に対する車の流出や流入はない
-
全ての交差点において、直進、右左折する車の割合は、一定である(つまり各車においては特定の出発地や目的地を持たない)
上記の単純化に基づき、本サンプルコードでは、次の3つの方針に基づいて信号の最適制御を実施します。
-
車の流れを良くする
今回は、「車の流れが良い」状態を「各交差点において、南北からの流入数と東西からの流入数に偏りが小さい」状態と定義します。
-
信号の色はできるだけ同じ色であり続ける
現実の信号は頻繁に変化しないためです。
-
各時刻において流れる車数を最大化する
流れる車数が多いということは、信号待ちしている車が少ないということになります。
以上の方針をできるだけ満たすような信号機の状態を組合せ最適化として求解します。
2. 定式化¶
2.1. 考え方¶
以上の方針で最適な信号機の制御を実現するために、必要な条件を具体化していきます。まず、定式化に必要な定数・変数を定義します。
定数¶
-
$N$:南北・東西の信号機の数
-
$a$:各交差点において直進する確率
-
$\displaystyle\frac{1-a}{2}$:左右それぞれに曲がる確率
変数¶
-
$s_i(t)$:時刻 $t$ における交差点 $i$ の信号機の状態
-
$s_i(t)=1$:南北方向が青(東西方向が赤)
-
$s_i(t)=-1$:東西方向が青(南北方向が赤)
-
-
$q_i(t)$:時刻 $t$ における隣接する交差点から交差点 $i$ に向かう車数(各時刻で交差点を流れることができる最大車数を1に正規化)
-
$q_i^n(t)$:北方向から流入する車数
-
$q_i^s(t)$:南方向から流入する車数
-
$q_i^e(t)$:東方向から流入する車数
-
$q_i^w(t)$:西方向から流入する車数
-
車数の時間変化¶
車数 $q_i$ は、以下に示す保存則に従って、毎時刻更新されます。以下に $q_i$ の各時刻での更新方法を示します。
まず、$q_i$ は次の時刻 $t+1$ では、直進率 $a$ と信号機の状態をもとに $q_i^n(t)$~$q_i^w(t)$ が交差点 $i$ から出る道路に分配されます。下図では、$s_i(t)=1$ のときに $q_i^n(t)$ がどのように分配されるかを表しています。
-
車の流入量
$q_i$ の各時刻の流入量は、交差点 $i$ と接続している交差点 $j$ から $i$ に向かって出ていく車数 $r_j$ と等しくなります。
この時の交差点 $j$ について考えます。$r_j(t)$ は、以下のような行列の計算式を定義することで $q_j^n(t)$~$q_j^w(t)$ から計算できます。
$$ \begin{bmatrix} r_j^n(t) \\ r_j^s(t) \\ r_j^e(t) \\ r_j^w(t) \end{bmatrix} = A_{s_j} \: \begin{bmatrix} q_j^n(t) \\ q_j^s(t) \\ q_j^e(t) \\ q_j^w(t) \end{bmatrix} $$
ここで、$A_{s_j}$ は、信号機の状態と直進率から決定される定数を要素とする $4$ 次正方行列です。
$$ A_{s_j=1} = \begin{bmatrix} 0 & a & 0 & 0 \\ a & 0 & 0 & 0 \\ \displaystyle\frac{1-a}{2} & \displaystyle\frac{1-a}{2} & 0 & 0 \\ \displaystyle\frac{1-a}{2} & \displaystyle\frac{1-a}{2} & 0 & 0 \\ \end{bmatrix} $$ $$ A_{s_j=-1} = \begin{bmatrix} 0 & 0 & \displaystyle\frac{1-a}{2} & \displaystyle\frac{1-a}{2} \\ 0 & 0 & \displaystyle\frac{1-a}{2} & \displaystyle\frac{1-a}{2} \\ 0 & 0 & a & 0 \\ 0 & 0 & 0 & a \\ \end{bmatrix} $$
上記の行列計算により、$q_i$ の流入量 $r_j$ が求められます。
-
車の流出量
次に、$q_i$ の各時刻の流出量について記述します。各時刻で交差点を流れることができる最大車数を1に正規化したので、$q_i^n(t)$~$q_i^w(t)$ の流出量はそれぞれ $\text{min}$ を取って以下のように書けます。
$$ \begin{bmatrix} d_j^n(t) \\ d_j^s(t) \\ d_j^e(t) \\ d_j^w(t) \end{bmatrix} = \begin{bmatrix} \displaystyle\frac{1+s_i}{2} \text{min}(q_i^n(t), 1) \\ \displaystyle\frac{1+s_i}{2} \text{min}(q_i^s(t), 1) \\ \displaystyle\frac{1-s_i}{2} \text{min}(q_i^e(t), 1) \\ \displaystyle\frac{1-s_i}{2} \text{min}(q_i^w(t), 1) \end{bmatrix} $$
例として、南北方向が青の場合、$q_j^n(t)$ と $q_j^s(t)$ からは、それぞれ $\text{min}(q_j^n(t), 1)$、$\text{min}(q_j^s(t), 1)$ 流出します。また、$q_j^e(t)$ と $q_j^w(t)$ は東西方向の信号が赤なので流出しません。
以上より、$q_i$ の変化量(=流出量+流出量)が求められます。現在の車数 $q_i(t)$ と求めた変化量をもとに、次の時刻における車数 $q_i(t+1)$ として更新されます。
2.2. 定式化¶
『1. 問題設定』で紹介した3つの方針に、次のような時刻の概念を導入し、目的関数の定式化を行います。
-
次の時刻 $t+1$ で、各交差点において南北方向と東西方向から流入する車数に偏りがない
南北方向からの車数と東西方向からの車数の差を最小化する
$$ \text{minimize}\quad\sum_{i}^{N^2}\Bigl(\bigl(q_i^n(t+1)+q_i^s(t+1)\bigr)-\bigl(q_i^e(t+1)+q_i^w(t+1)\bigr)\Bigr)^2 $$
-
信号の色ができるだけ同じ色であり続ける
前の時刻 $t-1$ との信号機の状態の差を最小化する
$$ \text{minimize}\quad\sum_{i}^{N^2}\left(\frac{s_i(t)-s_i(t-1)}{2}\right)^2 $$
-
次の時刻 $t+1$ で、流れる車数を最大化する
南北方向と東西方向のどちらを青にすれば流れる車数を最大化できるか
$$ \text{maximize}\quad\sum_{i}^{N^2}\left(\frac{1+s_i(t)}{2}\bigl(q_i^n(t)+q_i^s(t)\bigr)-\frac{1-s_i(t)}{2}\bigl(q_i^e(t)+q_i^w(t)\bigr)\right) $$
これらの関数を足し合わせることで、最終的な目的関数 $\text{cost}$ を得ることができます。
-
car_bias_cost
$\displaystyle= \sum_{i}^{N^2}\Bigl(\bigl(q_i^n(t+1)+q_i^s(t+1)\bigr)-\bigl(q_i^e(t+1)+q_i^w(t+1)\bigr)\Bigr)^2$ -
car_bias_cost
$\displaystyle= \sum_{i}^{N^2}\Bigl(\bigl(q_i^n(t+1)+q_i^s(t+1)\bigr)-\bigl(q_i^e(t+1)+q_i^w(t+1)\bigr)\Bigr)^2$ -
signal_cost
$\displaystyle= \sum_{i}^{N^2}\left(\frac{s_i(t)-s_i(t-1)}{2}\right)^2$ -
car_flow_cost
$\displaystyle= -\sum_{i}^{N^2}\left(\frac{1+s_i(t)}{2}\bigl(q_i^n(t)+q_i^s(t)\bigr)-\frac{1-s_i(t)}{2}\bigl(q_i^e(t)+q_i^w(t)\bigr)\right)$ -
cost
$\displaystyle= \alpha \times$car_bias_cost
$\displaystyle+ \beta \times$signal_cost
$\displaystyle+ \gamma \times$car_flow_cost
ここで、$\alpha$、$\beta$、$\gamma$ はそれぞれの目的関数の項の重みであり、これらを調整することでより良い結果を得ることができます。 ここで、$\alpha,\beta,\gamma > 0$ です。
本サンプルコードでは、この目的関数 $\text{cost}$ を最小化するような現在の時刻 $t$ での信号の状態 $s(t)$ を組合せ最適化により求める、という操作を各時刻で繰り返します。
2.3. 定式化のまとめ¶
以上から、交通信号最適化問題は以下のように定式化できます。制約条件はありません。
-
minimize
cost
$= \alpha\: \times$car_bias_cost
$+ \beta\: \times$signal_cost
$+ \gamma\: \times$car_flow_cost
-
car_bias_cost
$\displaystyle= \sum_{i}^{N^2}\Bigl(\bigl(q_i^n(t+1)+q_i^s(t+1)\bigr)-\bigl(q_i^e(t+1)+q_i^w(t+1)\bigr)\Bigr)^2$ -
signal_cost
$\displaystyle= \sum_{i}^{N^2}\left(\frac{s_i(t)-s_i(t-1)}{2}\right)^2$ -
car_flow_cost
$\displaystyle= \sum_{i}^{N^2}-\left(\frac{1+s_i(t)}{2}\bigl(q_i^n(t)+q_i^s(t)\bigr)-\frac{1-s_i(t)}{2}\bigl(q_i^e(t)+q_i^w(t)\bigr)\right)$ -
$s_i(t) \in \{-1, 1\}$
%matplotlib widget
from __future__ import annotations
import itertools
import random
import numpy as np
from amplify import IsingSymbolGenerator, IsingPolyArray, sum_poly, Solver, IsingPoly
from matplotlib.animation import FuncAnimation
import matplotlib.pyplot as plt
from sigfig import round
from amplify.client import FixstarsClient
from typing import Union, Tuple
# Fixstars Amplify クライアントを作成
client = FixstarsClient()
# client.token = "API トークンを入力してください"
client.parameters.timeout = 1000 # タイムアウト1秒
class Roads:
def __init__(
self,
num_road: int,
init_can_come_from: list[np.ndarray],
straight_rate: float = 0.6,
):
# 交差点にその方角から進入できるか
self._can_come_from_north = init_can_come_from[0]
self._can_come_from_south = init_can_come_from[1]
self._can_come_from_east = init_can_come_from[2]
self._can_come_from_west = init_can_come_from[3]
# 与えられた変数の条件をチェック
assert num_road == len(self._can_come_from_north) - 2
assert self._can_come_from_north.shape == self._can_come_from_south.shape
assert self._can_come_from_north.shape == self._can_come_from_east.shape
assert self._can_come_from_north.shape == self._can_come_from_west.shape
assert self._can_come_from_north.shape[0] == self._can_come_from_north.shape[1]
assert 0 <= straight_rate <= 1
# 都市の1辺にある交差点の個数
self._num_road = num_road
# 交差点での直進する車数の割合
self._straight_rate = straight_rate
# 交差点からどの方角に進めるか=交差点にどの方角から進入できるかのroll
self._can_go_to_north = np.roll(self._can_come_from_south, 1, axis=0)
self._can_go_to_south = np.roll(self._can_come_from_north, -1, axis=0)
self._can_go_to_east = np.roll(self._can_come_from_west, -1, axis=1)
self._can_go_to_west = np.roll(self._can_come_from_east, 1, axis=1)
@property
def num_road(self):
return self._num_road
@property
def straight_rate(self):
return self._straight_rate
@property
def can_come_from_north(self):
return self._can_come_from_north
@property
def can_come_from_south(self):
return self._can_come_from_south
@property
def can_come_from_east(self):
return self._can_come_from_east
@property
def can_come_from_west(self):
return self._can_come_from_west
@property
def can_go_to_north(self):
return self._can_go_to_north
@property
def can_go_to_south(self):
return self._can_go_to_south
@property
def can_go_to_east(self):
return self._can_go_to_east
@property
def can_go_to_west(self):
return self._can_go_to_west
3.3.
信号に関するクラス Signals
¶
Signals
クラスでは、信号機の状態を保存する次の変数を管理しています。
- 信号機の状態
init_signal
,pre_signal
,current_signal
:信号が南北方向に青か東西方向に青かsignal_weight
:コスト関数での信号機に関する項の重み
class Signals:
def __init__(self, roads: Roads, signal_weight: float, seed: int = 0):
self._roads = roads
# コスト関数での信号機に関する項の重み
self._signal_weight = signal_weight
# 信号機の初期状態
init_signal = np.zeros((self._roads.num_road + 2, self._roads.num_road + 2))
# 信号機の初期化
random.seed(seed)
for i, j in itertools.product(
range(1, self._roads.num_road + 1), range(1, self._roads.num_road + 1)
):
if (random.randint(0, 1)) == 0:
init_signal[i, j] = -1
else:
init_signal[i, j] = 1
# ひとつ前の信号機の状態
self._pre_signal = np.copy(init_signal)
# 現在の信号機の状態(式では時刻t)
self._current_signal = init_signal
@property
def signal_weight(self):
return self._signal_weight
@property
def pre_signal(self):
return self._pre_signal
@pre_signal.setter
def pre_signal(self, value):
self._pre_signal = value
@property
def current_signal(self):
return self._current_signal
@current_signal.setter
def current_signal(self, value):
self._current_signal = value
3.4.
車数に関するクラス Traffic
¶
Traffic
クラスでは、車数に関する処理を管理しています。
next_step_num_car
では、まず『2. 定式化準備』で導入した $A_{s_j}$
を求め、車の流入量及び流出量を計算します。それらをもとに次の時刻における車数を計算します。この関数では、引数である信号機の状態がイジング変数を含む場合と NumPy 配列の場合の 2
パターンあるので、それによって異なる処理をしている箇所があります。
また、car_bias_model
と car_flow
では、コスト関数の car_bias_cost
と
car_flow_cost
のための計算をしています。
class Traffic:
def __init__(
self,
roads: Roads,
init_cars: list[np.ndarray],
car_bias_weight: float,
car_flow_weight: float,
):
self._roads = roads
# 交差点にそれぞれの方角から向かう車数
self._num_car_from_north = init_cars[0]
self._num_car_from_south = init_cars[1]
self._num_car_from_east = init_cars[2]
self._num_car_from_west = init_cars[3]
# コスト関数での流れる車数に関する項の重み
self._car_flow_weight = car_flow_weight
# コスト関数での車数の偏りに関する項の重み
self._car_bias_weight = car_bias_weight
@property
def num_car_from_north(self):
return self._num_car_from_north
@num_car_from_north.setter
def num_car_from_north(self, value):
self._num_car_from_north = value
@property
def num_car_from_south(self):
return self._num_car_from_south
@num_car_from_south.setter
def num_car_from_south(self, value):
self._num_car_from_south = value
@property
def num_car_from_east(self):
return self._num_car_from_east
@num_car_from_east.setter
def num_car_from_east(self, value):
self._num_car_from_east = value
@property
def num_car_from_west(self):
return self._num_car_from_west
@num_car_from_west.setter
def num_car_from_west(self, value):
self._num_car_from_west = value
@property
def car_flow_weight(self):
return self._car_flow_weight
@property
def car_bias_weight(self):
return self._car_bias_weight
# 次の時刻の車数の計算
def _next_step_num_car(
self, signal: Union[np.ndarray, IsingPolyArray]
) -> Union[
Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray],
Tuple[IsingPolyArray, IsingPolyArray, IsingPolyArray, IsingPolyArray],
]:
# 次の時刻の車数を保存する変数を定義
if isinstance(signal, IsingPolyArray):
temp_num_car_from_north = IsingPolyArray(self._num_car_from_north)
temp_num_car_from_south = IsingPolyArray(self._num_car_from_south)
temp_num_car_from_east = IsingPolyArray(self._num_car_from_east)
temp_num_car_from_west = IsingPolyArray(self._num_car_from_west)
else:
temp_num_car_from_north = self._num_car_from_north.copy()
temp_num_car_from_south = self._num_car_from_south.copy()
temp_num_car_from_east = self._num_car_from_east.copy()
temp_num_car_from_west = self._num_car_from_west.copy()
# 各交差点について
for i, j in itertools.product(
range(1, self._roads.num_road + 1), range(1, self._roads.num_road + 1)
):
# 各方角から進入してくる車数の配列
from_car_array = np.zeros(4)
from_car_array[0] = min(self._num_car_from_north[i, j], 1)
from_car_array[1] = min(self._num_car_from_south[i, j], 1)
from_car_array[2] = min(self._num_car_from_east[i, j], 1)
from_car_array[3] = min(self._num_car_from_west[i, j], 1)
# 南北方向が青か
NS_flow_flag = (-signal[i, j] * (signal[i, j] - 1)) / 2 + 1
# 東西方向が青か
EW_flow_flag = (-signal[i, j] * (signal[i, j] + 1)) / 2 + 1
# 交差点から出ていく車数を交差点に向かう車数から計算するための行列 A_s_j
if isinstance(signal, IsingPolyArray):
from_to_array = IsingPolyArray(np.zeros((4, 4)))
else:
from_to_array = np.zeros((4, 4))
# 北方向に向かう車数を計算するための行列要素
fromS_toN = NS_flow_flag * (
(signal[i, j] ** 2) * (self._roads.straight_rate - 1) + 1
)
fromE_toN = EW_flow_flag * (
(1 - self._roads.can_go_to_south[i, j] / 2)
* (1 - (self._roads.straight_rate * self._roads.can_go_to_west[i, j]))
)
fromW_toN = EW_flow_flag * (
(1 - self._roads.can_go_to_south[i, j] / 2)
* (1 - (self._roads.straight_rate * self._roads.can_go_to_east[i, j]))
)
# 南方向に向かう車数を計算するための行列要素
fromN_toS = NS_flow_flag * (
(signal[i, j] ** 2) * (self._roads.straight_rate - 1) + 1
)
fromE_toS = EW_flow_flag * (
(1 - self._roads.can_go_to_north[i, j] / 2)
* (1 - (self._roads.straight_rate * self._roads.can_go_to_west[i, j]))
)
fromW_toS = EW_flow_flag * (
(1 - self._roads.can_go_to_north[i, j] / 2)
* (1 - (self._roads.straight_rate * self._roads.can_go_to_east[i, j]))
)
# 東方向に向かう車数を計算するための行列要素
fromN_toE = NS_flow_flag * (
(1 - self._roads.can_go_to_west[i, j] / 2)
* (1 - (self._roads.straight_rate * self._roads.can_go_to_south[i, j]))
)
fromS_toE = NS_flow_flag * (
(1 - self._roads.can_go_to_west[i, j] / 2)
* (1 - (self._roads.straight_rate * self._roads.can_go_to_north[i, j]))
)
fromW_toE = EW_flow_flag * (
(signal[i, j] ** 2) * (self._roads.straight_rate - 1) + 1
)
# 西方向に向かう車数を計算するための行列要素
fromN_toW = NS_flow_flag * (
(1 - self._roads.can_go_to_east[i, j] / 2)
* (1 - (self._roads.straight_rate * self._roads.can_go_to_south[i, j]))
)
fromS_toW = NS_flow_flag * (
(1 - self._roads.can_go_to_east[i, j] / 2)
* (1 - (self._roads.straight_rate * self._roads.can_go_to_north[i, j]))
)
fromE_toW = EW_flow_flag * (
(signal[i, j] ** 2) * (self._roads.straight_rate - 1) + 1
)
# 行列要素の計算結果をまとめる
# 交差点から出ていく道路が存在しない可能性もあるので、can_go_to_{north, south, east, west} をそれぞれかける
from_to_array[0, 1] = fromS_toN * self._roads.can_go_to_north[i, j]
from_to_array[0, 2] = fromE_toN * self._roads.can_go_to_north[i, j]
from_to_array[0, 3] = fromW_toN * self._roads.can_go_to_north[i, j]
from_to_array[1, 0] = fromN_toS * self._roads.can_go_to_south[i, j]
from_to_array[1, 2] = fromE_toS * self._roads.can_go_to_south[i, j]
from_to_array[1, 3] = fromW_toS * self._roads.can_go_to_south[i, j]
from_to_array[2, 0] = fromN_toE * self._roads.can_go_to_east[i, j]
from_to_array[2, 1] = fromS_toE * self._roads.can_go_to_east[i, j]
from_to_array[2, 3] = fromW_toE * self._roads.can_go_to_east[i, j]
from_to_array[3, 0] = fromN_toW * self._roads.can_go_to_west[i, j]
from_to_array[3, 1] = fromS_toW * self._roads.can_go_to_west[i, j]
from_to_array[3, 2] = fromE_toW * self._roads.can_go_to_west[i, j]
# 交差点から出ていく車数の計算
to_car_array = (from_to_array * from_car_array).sum(axis=1)
# temp_num_car を更新(流入分)
temp_num_car_from_north[i + 1, j] += to_car_array[1]
temp_num_car_from_south[i - 1, j] += to_car_array[0]
temp_num_car_from_east[i, j - 1] += to_car_array[3]
temp_num_car_from_west[i, j + 1] += to_car_array[2]
# temp_num_car を更新(流出分)
# 信号機の状態によって流出量が変わる
temp_num_car_from_north[i, j] -= NS_flow_flag * from_car_array[0]
temp_num_car_from_south[i, j] -= NS_flow_flag * from_car_array[1]
temp_num_car_from_east[i, j] -= EW_flow_flag * from_car_array[2]
temp_num_car_from_west[i, j] -= EW_flow_flag * from_car_array[3]
return (
temp_num_car_from_north,
temp_num_car_from_south,
temp_num_car_from_east,
temp_num_car_from_west,
)
# 計算結果をもとに車数を更新する
def update_num_car(self, signal: np.ndarray):
(
self._num_car_from_north,
self._num_car_from_south,
self._num_car_from_east,
self._num_car_from_west,
) = self._next_step_num_car(signal)
# 現在の東西方向からの車数と東西方向からの車数の差を計算
def current_car_bias(self) -> np.ndarray:
car_bias = (
(self._num_car_from_north + self._num_car_from_south)
- (self._num_car_from_east + self._num_car_from_west)
) / 2
return car_bias
# コスト関数の car_bias_cost の計算のために次の時刻の東西方向と南北方向の車数の差を計算
def car_bias_model(self, signal: IsingPolyArray) -> IsingPolyArray:
# イジング変数が含まれた状態の次の時刻の車数を計算
(
temp_num_car_from_north,
temp_num_car_from_south,
temp_num_car_from_east,
temp_num_car_from_west,
) = self._next_step_num_car(signal)
# 計算した次の時刻の車数を用いて東西方向と南北方向の車数の差を計算
next_car_bias = (
(temp_num_car_from_north + temp_num_car_from_south)
- (temp_num_car_from_east + temp_num_car_from_west)
) / 2
return next_car_bias
# コスト関数の car_flow_cost のための流れる車数を計算
def car_flow(self, signal: IsingPolyArray) -> IsingPoly:
# 流れる車数
car_flow = 0
# 各交差点について
for i, j in itertools.product(
range(1, self._roads.num_road + 1), range(1, self._roads.num_road + 1)
):
# 信号機が存在するとき
if signal[i, j] != 0:
# 南北方向が青の場合
num_car_north_and_south = min(self._num_car_from_north[i, j], 1) + min(
self._num_car_from_south[i, j], 1
)
# 東西方向が青の場合
num_car_east_and_west = min(self._num_car_from_east[i, j], 1) + min(
self._num_car_from_west[i, j], 1
)
# 信号機の状態によって南北方向が流れるか東西方向が流れるかが決まる
# イジング変数が含まれた状態の流れる車数を計算
car_flow += (signal[i, j] + 1) / 2 * num_car_north_and_south - (
signal[i, j] - 1
) / 2 * num_car_east_and_west
# 信号機が存在しないとき
# 角もしくは曲がれない交差点の場合
else:
car_flow += (
min(self._num_car_from_north[i, j], 1)
* self._roads.can_come_from_north[i, j]
+ min(self._num_car_from_south[i, j], 1)
* self._roads.can_come_from_south[i, j]
+ min(self._num_car_from_east[i, j], 1)
* self._roads.can_come_from_east[i, j]
+ min(self._num_car_from_west[i, j], 1)
* self._roads.can_come_from_west[i, j]
)
return car_flow
class IsingVariable:
def __init__(self, roads: Roads):
# イジング変数 sigma の初期化
gen = IsingSymbolGenerator()
self._variables = gen.array(roads.num_road + 2, roads.num_road + 2)
self._variables[0, :] = 0
self._variables[roads.num_road + 1, :] = 0
self._variables[:, 0] = 0
self._variables[:, roads.num_road + 1] = 0
# 各交差点について
# 信号機がない場合について考える
for i, j in itertools.product(
range(1, roads.num_road + 1), range(1, roads.num_road + 1)
):
# 角のとき
if (
(roads.can_come_from_north[i, j] == 1)
^ (roads.can_come_from_south[i, j] == 1)
) and (
(roads.can_come_from_east[i, j] == 1)
^ (roads.can_come_from_west[i, j] == 1)
):
self._variables[i, j] = 0
# 直進しかないとき
if (
abs(
(roads.can_come_from_north[i, j] + roads.can_come_from_south[i, j])
- (roads.can_come_from_east[i, j] + roads.can_come_from_west[i, j])
)
== 2
):
self._variables[i, j] = 0
# 道路がないとき
if (
roads.can_come_from_north[i, j]
+ roads.can_come_from_south[i, j]
+ roads.can_come_from_east[i, j]
+ roads.can_come_from_west[i, j]
== 0
):
self._variables[i, j] = 0
@property
def variables(self):
return self._variables
import numpy as np
import itertools
import random
class City:
def __init__(self, num_road: int):
self.num_road = num_road
self.can_come_from_north = np.zeros((num_road + 2, num_road + 2))
self.can_come_from_south = np.zeros((num_road + 2, num_road + 2))
self.can_come_from_east = np.zeros((num_road + 2, num_road + 2))
self.can_come_from_west = np.zeros((num_road + 2, num_road + 2))
self.num_car_from_north = np.zeros((num_road + 2, num_road + 2))
self.num_car_from_south = np.zeros((num_road + 2, num_road + 2))
self.num_car_from_east = np.zeros((num_road + 2, num_road + 2))
self.num_car_from_west = np.zeros((num_road + 2, num_road + 2))
# 乱数を用いて車数を決定する
def set_num_cars(self, variance: float, average: float):
random.seed(314)
self.num_car_from_north = (
np.random.lognormal(
average, variance, (self.num_road + 2, self.num_road + 2)
)
* self.can_come_from_north
)
self.num_car_from_south = (
np.random.lognormal(
average, variance, (self.num_road + 2, self.num_road + 2)
)
* self.can_come_from_south
)
self.num_car_from_east = (
np.random.lognormal(
average, variance, (self.num_road + 2, self.num_road + 2)
)
* self.can_come_from_east
)
self.num_car_from_west = (
np.random.lognormal(
average, variance, (self.num_road + 2, self.num_road + 2)
)
* self.can_come_from_west
)
def Grid(num_road: int, variance: float, average: float):
grid = City(num_road)
# 各交差点にどの方角から進入できるか
# このコードでは格子状
for i, j in itertools.product(range(1, num_road + 1), range(1, num_road + 1)):
if i == 1:
grid.can_come_from_north[i, j] = 0
else:
grid.can_come_from_north[i, j] = 1
if i == num_road:
grid.can_come_from_south[i, j] = 0
else:
grid.can_come_from_south[i, j] = 1
if j == num_road:
grid.can_come_from_east[i, j] = 0
else:
grid.can_come_from_east[i, j] = 1
if j == 1:
grid.can_come_from_west[i, j] = 0
else:
grid.can_come_from_west[i, j] = 1
# 与えられた分散と平均から車数を決定する
grid.set_num_cars(variance, average)
return grid
3.7. シミュレーションに関するクラス Simulation
¶
Simulation
クラスでは、step()
で 1
ステップ分の信号機の最適制御とそれに基づく車数の更新を行います。また、plot_traffic()
で道路網・信号機・車数のプロットを行います。結果確認用の画像生成も本クラスで実施します。
class Simulation:
def __init__(
self,
city: City,
straight_rate: float,
car_bias_weight: float,
car_flow_weight: float,
signal_weight: float,
fig,
ax,
):
# 与えられた道路網と車数を用いて全てのクラスを初期化
init_can_come_from = [
city.can_come_from_north,
city.can_come_from_south,
city.can_come_from_east,
city.can_come_from_west,
]
init_cars = [
city.num_car_from_north,
city.num_car_from_south,
city.num_car_from_east,
city.num_car_from_west,
]
num_road = city.num_road
self._roads = Roads(num_road, init_can_come_from, straight_rate)
self._signals = Signals(self._roads, signal_weight)
self._cars = Traffic(self._roads, init_cars, car_bias_weight, car_flow_weight)
self._ising = IsingVariable(self._roads)
self.num_step = 0
self.fig = fig
self.ax = ax
# 道路網と車数のプロット
def plot_traffic(self) -> plt.figure:
# self.fig = plt.figure(
# figsize=(self._roads.num_road * 1.5, self._roads.num_road * 1.5)
# )
# self.ax = self.fig.add_subplot(111)
self.fig.set_size_inches(self._roads.num_road, self._roads.num_road)
self.ax.clear()
self.ax.set_xlim(0, self._roads.num_road + 1)
self.ax.set_ylim(0, self._roads.num_road + 1)
for i, j in itertools.product(
range(1, self._roads.num_road + 1), range(1, self._roads.num_road + 1)
):
x = j
y = self._roads.num_road + 1 - i
# 道路
if self._roads.can_come_from_north[i, j] == 1:
self.ax.vlines(x, y, y + 1, color="black", zorder=1) # north
if self._roads.can_come_from_south[i, j] == 1:
self.ax.vlines(x, y - 1, y, color="black", zorder=1) # south
if self._roads.can_come_from_east[i, j] == 1:
self.ax.hlines(y, x, x + 1, color="black", zorder=1) # east
if self._roads.can_come_from_west[i, j] == 1:
self.ax.hlines(y, x - 1, x, color="black", zorder=1) # west
current_car_bias = self.get_car_bias()
# 信号について
if self._signals.current_signal[i, j] != 0:
if self._signals.current_signal[i, j] == 1: # 南北方向が青
self.ax.plot(
x,
y,
marker="$⇅$",
color="blue",
markersize=abs(current_car_bias[i, j]) * 20,
zorder=2,
)
else: # 東西方向が赤
self.ax.plot(
x,
y,
marker="$⇆$",
color="red",
markersize=abs(current_car_bias[i, j]) * 20,
zorder=2,
)
fontsize = 8
# 車数について
if self._roads.can_come_from_north[i, j] == 1:
self.ax.text(
x + 0.1,
y + 0.5,
"↓" + str(round(self._cars.num_car_from_north[i, j], sigfigs=1)),
ha="left",
va="center",
size=fontsize,
)
if self._roads.can_come_from_south[i, j] == 1:
self.ax.text(
x - 0.1,
y - 0.5,
str(round(self._cars.num_car_from_south[i, j], sigfigs=1)) + "↑",
ha="right",
va="center",
size=fontsize,
)
if self._roads.can_come_from_east[i, j] == 1:
self.ax.text(
x + 0.5,
y - 0.1,
"← " + str(round(self._cars.num_car_from_east[i, j], sigfigs=1)),
ha="center",
va="top",
size=fontsize,
)
if self._roads.can_come_from_west[i, j] == 1:
self.ax.text(
x - 0.5,
y + 0.1,
str(round(self._cars.num_car_from_west[i, j], sigfigs=1)) + " →",
ha="center",
va="bottom",
size=fontsize,
)
# タイトル・説明
self.ax.set_title(f"t = {self.num_step}")
[
self.ax.spines[side].set_visible(False)
for side in ["right", "left", "top", "bottom"]
]
self.ax.tick_params(
labelbottom=False,
labelleft=False,
labelright=False,
labeltop=False,
bottom=False,
left=False,
right=False,
top=False,
)
return None
# コスト関数の計算
# 車数の偏りについて
def calc_car_bias_cost(self) -> IsingPoly:
# 次の時刻の車数の東西方向と南北方向の車数の差をイジング変数が含まれた状態で計算
next_car_bias = self._cars.car_bias_model(self._ising.variables)
# 二乗して全ての交差点について足し合わせる
car_bias_cost = (next_car_bias * next_car_bias).sum()
return car_bias_cost
# 信号機の状態について
def calc_signal_cost(self) -> IsingPoly:
# 前の時刻の信号機の状態との差を取る
signal_cost = sum_poly(
((self._ising.variables - self._signals.pre_signal) / 2) ** 2
)
return signal_cost
# 流れる車数について
def calc_car_flow_cost(self) -> IsingPoly:
# 流れる車数をイジング変数が含まれた状態で計算し、マイナスを取る
car_flow_cost = -self._cars.car_flow(self._ising.variables)
return car_flow_cost
# コスト関数の項をすべて足し合わせる
def calc_cost(self) -> IsingPoly:
car_bias_cost = self.calc_car_bias_cost()
signal_cost = self.calc_signal_cost()
car_flow_cost = self.calc_car_flow_cost()
# それぞれの項に重みをかける
cost = (
self._cars.car_bias_weight * car_bias_cost
+ self._signals.signal_weight * signal_cost
+ self._cars.car_flow_weight * car_flow_cost
)
return cost
# 1ステップ分実行
def step(self):
# コスト関数の計算
cost = self.calc_cost()
solver = Solver(client)
# コスト関数をソルバーに与えて求解を実行
result = solver.solve(cost)
if len(result) == 0:
raise RuntimeError("Any one of constraints is not satisfied.")
values = result[0].values
# 求解結果をもとに信号機の状態を更新
self._signals.current_signal = self._ising.variables.decode(values)
# 求解結果をもとに車数を更新
self._cars.update_num_car(self._signals.current_signal)
# ひとつ前の信号を記憶
self._signals.pre_signal = np.copy(self._signals.current_signal)
self.num_step += 1
def get_car_bias(self):
return self._cars.current_car_bias()
3.8. main
関数¶
main
関数では、以下のことを行います。
-
都市の各パラメータの初期値の決定
-
コスト関数の重みの設定
-
所望のステップ分、信号機最適制御を実施した場合の交通シミュレーションの実行
# ステップ数
num_step = 50
# 道路数、初期車数の分散・平均
num_road = 10
init_variance = 0.5 # 0~1が妥当
init_average = 0
# 都市の初期化
city = Grid(num_road, init_variance, init_average)
# 各交差点において直進する割合
straight_rate = 0.6
# コスト関数の重み(各コストの値のオーダーがおよそ同様になる様に決定)
car_bias_weight = 2.0
car_flow_weight = 1.0
signal_weight = 1.0
fig, ax = plt.subplots()
sim = Simulation(
city,
straight_rate,
car_bias_weight,
car_flow_weight,
signal_weight,
fig,
ax,
)
# シミュレーションの実行関数
def update(i: int):
sim.plot_traffic()
sim.step()
# シミュレーションの実施とともにアニメーションの表示
anim = FuncAnimation(fig, update, frames=num_step, repeat=False)
4. まとめ¶
本サンプルコードでは、単純化された都市交通に対して、組合せ最適化に基づくリアルタイムな信号機制御を導入し、そのような制御を行った場合の交通シミュレーションを実施しました。単純化には、全ての交差点において、直進・右左折する車の割合を定数に置く、という仮定がありますが、過去の交通データに基づき、本割合を各交差点ごと・各時刻ごとに設定にすることで、より現実的な交通における信号機制御が可能になると考えられます。
本サンプルコードでは、刻一刻と変化する交通状態に応じたリアルタイムな信号機制御を実施しましたが、過去の交通状況の観測データ(又はシミュレーション結果)に基づき、統計的に最適な信号機制御をブラックボックス最適化を用いて実現することも可能です。そのようなブラックボックス最適化に関するサンプルコードはこちらに紹介しています。