配送スケジューリング¶
イントロダクション¶
Fixstars Amplify Scheduling Engine (Amplify SE) は、トラックによる荷物輸送やタクシーによる旅客送迎などといった、配車の計画の最適化にも有用です。ここでは、Vehicle Routing Problem (VRP) に似た典型的な配送計画の立案を Amplify SE を用いて実施します。
問題設定¶
VRP とは、複数の車(vehicle)がスタート地点から予め与えられた複数の地点を重複や漏れなくめぐり、最後に再びスタート地点に戻るようなルートの中で 、最も総移動距離が短いルートを探索する問題です。
Amplify SE を用いた配送計画立案¶
Amplify SE のオブジェクトとの対応¶
Amplify SE を使って配送計画問題を解くために、配送計画問題の問題設定をどのように Amplify SE のオブジェクトに落とし込めばよいか考えましょう。
それぞれの車が複数の地点を順番に訪れて何らかの処理を行う、という状況なので、車をマシン、地点をジョブと考えることができます。すると、地点 A から地点 B への移動時間は「ジョブ A からジョブ B
に切り替える時間」と見なすことができ、setup_times
として扱えます。
ここで、Amplify SE では、全「ジョブ」が完了する時間であるメイクスパンの最小化を目的として最適化を行うため、トータルの移動距離最小化を目指す VRP とは厳密には異なることに注意してください。しかし、往々にして、メイクスパン最小化を実現する解は、VRP における目的である総移動距離も小さくなる傾向にあると考えられます。
要素 | Amplify SE の言葉 |
---|---|
車 | マシン |
地点 | ジョブ |
移動時間 | setup_times
|
実装¶
今回使うモジュールは以下の 5 つです。
# ! pip install networkx amplify_sched -q # Google Colab 場合、こちらのコメントアウトを外し、amplify_sched 等の必要ライブラリをインストールしてください。
from amplify_sched import *
import numpy as np
import itertools
import matplotlib.pyplot as plt
import networkx as nx
問題設定¶
まず、車と地点の情報を与え、地点間の距離を設定します。今回は、各地点の座標を乱数で生成することにします。また、各車のスタート地点を「ホーム」と呼ぶことにし、ホームの座標は (0, 0) であるとします。
# 乱数シードの固定
np.random.seed(1)
# 車の情報
num_vehicles = 3 # 台数
vehicles = [f"車{i + 1}" for i in range(num_vehicles)] # ラベル
# 地点の情報
num_locations = 20 # 地点数
locations = [f"地点{i + 1}" for i in range(num_locations)] # ラベル
# 各地点の座標を-50から50の間の乱数で与える(最後の1つはホームを表す)
pos_locations = np.append(100 * np.random.rand(num_locations, 2) - 50, [[0, 0]], axis=0)
# 地点間の距離(SEに渡すため、小数点以下を切り上げて整数にする)
distances = np.ceil(
np.array(
[
[max(np.linalg.norm(r2 - r1), 1) for r2 in pos_locations]
for r1 in pos_locations
]
)
)
確認のため、各地点を可視化してみましょう。
「0」と書かれた黄色い丸がホームを、その他の灰色の丸が各訪問地点を表します。
def visualize_locations():
# グラフオブジェクト
G = nx.Graph()
# 各地点をノードとして設定
for i in range(num_locations):
G.add_node(i + 1, pos=pos_locations[i], color="silver")
# ホームをノードとして設定
G.add_node(0, pos=(0, 0), color="gold")
pos = nx.get_node_attributes(G, "pos") # ノードの座標
node_color = [node["color"] for node in G.nodes.values()] # ノードの色
nx.draw_networkx(G, pos, node_color=node_color) # グラフの描画
plt.grid()
plt.show()
visualize_locations()
# モデルオブジェクト
model = Model()
# 地点ジョブの設定
for i in range(num_locations):
name = locations[i]
model.jobs.add(name)
model.jobs[name].append(Task())
Step 2¶
各車のホームへの帰還をジョブとして設定します(これを「帰還ジョブ」と呼ぶことにします)。
SE の仕様上、同じ名前のジョブを複数設定することはできないので、「帰還(車の名前)」を名前に持つ帰還ジョブを車ご とに用意します。
帰還ジョブは全ての地点ジョブの完了後に処理される必要があります。これを dependent_jobs
で表します。
# 帰還ジョブの設定
for v in vehicles:
name = f"帰還({v})" # 車vの帰還ジョブ
model.jobs.add(name)
model.jobs[name].append(Task())
# 全ての地点ジョブの完了後に処理される
for l in locations:
model.jobs[name].dependent_jobs.append(l)
Step 3¶
各車のホームからの出発をジョブとして設定します(これを「出発ジョブ」と呼ぶことにします)。
帰還ジョブと同様、出発ジョブも車ごとに用意します。
全ての地点ジョブおよび帰還ジョブは出発ジョブの完了後に処理される必要があります。
# 出発ジョブの設定
for v in vehicles:
name = f"出発({v})"
model.jobs.add(name)
model.jobs[name].append(Task())
# 地点ジョブより先に処理される
for l in locations:
model.jobs[l].dependent_jobs.append(name)
# 帰還ジョブより先に処理される
for v2 in vehicles:
model.jobs[f"帰還({v2})"].dependent_jobs.append(name)
# 車の情報
for v in vehicles:
model.machines.add(v)
# 地点間の距離をsetup_timeで与える
for j0, j1 in itertools.product(range(num_locations), range(num_locations)):
model.machines[v].setup_times.append(
(int(distances[j0, j1]), locations[j0], locations[j1])
)
# ホームとの距離をsetup_timeで与える
for j in range(num_locations):
model.machines[v].setup_times.append(
(int(distances[-1, j]), f"出発({v})", locations[j])
)
model.machines[v].setup_times.append(
(int(distances[-1, j]), locations[j], f"帰還({v})")
)
Step 5¶
地点ジョブ、帰還ジョブ、出発ジョブの処理時間をprocessing_times
として与えます。
ここではジョブの処理時間は常に 1 であるとします。
地点ジョブについては、各車が任意の地点に訪問できるので、全ての車に対して処理時間を設定します。
一方、出発・帰還ジョブについては名前に対応した車にのみ処理時間を設定します。
# ジョブの処理時間の設定
for j in model.jobs:
# 出発ジョブ
if j.name.startswith("出発"):
j[0].processing_times[j.name.strip("出発()")] = 1
# 帰還ジョブ
elif j.name.startswith("帰還"):
j[0].processing_times[j.name.strip("帰還()")] = 1
# 地点ジョブ
else:
for v in vehicles:
j[0].processing_times[v] = 1
token = "" # ローカル環境等で使用する場合は、Amplify SE のアクセストークンを入力してください。
result = model.solve(token=token, timeout=10) # ご自身のトークンを入れてください
result.status
結果¶
得られた結果を可視化してみましょう。
まず、オプション引数を与えずにresult.timeline()
を実行します。
ガントチャートの横軸は時刻で、各時刻において各地点がどの車に訪問されたかが見て取れます。
fig = result.timeline(width=1000, height=700)
fig.show()
次に、オプション引数としてmachine_view=True
を与えてガントチャートを生成しましょう。
今度は、各時刻で各車がどの地点を訪問したかが分かるようになっています。
fig = result.timeline(machine_view=True)
fig.show()
最後に、得られた結果をより見やすくするため、グラフとして可視化しましょう。
result.table
に解の情報が含まれているので、これを使ってグラフを描画します。
車 1, 2, 3 が訪問する地点がそれぞれ赤、青、緑で、ホームが黄色のノードで表されています。
def visualize_result(solution):
# 色の設定:車1->赤、車2->青、車3->緑
colors = {"車1": "lightcoral", "車2": "lightblue", "車3": "lightgreen"}
G = nx.Graph() # グラフオブジェクト
G.add_node(0, pos=(0, 0), color="gold") # ホーム
# tableを"Start"キーの値でソートする
df = solution.table.sort_values("Start")
for car in vehicles:
# dfから車の名前がcarであるものを取り出す
df_tmp = df[df["Machine"] == car]
# carが訪問する地点のリスト
jobs = df_tmp["Job"].values
# ホームと訪問地点をedgesに格納
edges = [0]
for i in range(1, len(jobs) - 1):
site = int(jobs[i].strip("地点"))
G.add_node(site, pos=pos_locations[site - 1], color=colors[car])
edges.append(site)
edges.append(0)
# edgesをグラフのパスとして追加
nx.add_path(G, edges, color=colors[car])
# 位置、ノードの色、エッジの色を抽出してグラフを描画
pos = nx.get_node_attributes(G, "pos")
node_color = [node["color"] for node in G.nodes.values()]
edge_color = [edge["color"] for edge in G.edges.values()]
nx.draw_networkx(G, pos, node_color=node_color, edge_color=edge_color)
plt.grid()
plt.show()
visualize_result(result)
今回説明したコードを 1 つにまとめると、以下のようになります。
from amplify_sched import *
import numpy as np
import itertools
import matplotlib.pyplot as plt
import networkx as nx
def visualize_result(solution):
# 色の設定:車1->赤、車2->青、車3->緑
colors = {"車1": "lightcoral", "車2": "lightblue", "車3": "lightgreen"}
G = nx.Graph() # グラフオブジェクト
G.add_node(0, pos=(0, 0), color="gold") # ホーム
# tableを"Start"キーの値でソートする
df = solution.table.sort_values("Start")
for car in vehicles:
# dfから車の名前がcarであるものを取り出す
df_tmp = df[df["Machine"] == car]
# carが訪問する地点のリスト
jobs = df_tmp["Job"].values
# ホームと訪問地点をedgesに格納
edges = [0]
for i in range(1, len(jobs) - 1):
site = int(jobs[i].strip("地点"))
G.add_node(site, pos=pos_locations[site - 1], color=colors[car])
edges.append(site)
edges.append(0)
# edgesをグラフのパスとして追加
nx.add_path(G, edges, color=colors[car])
# 位置、ノードの色、エッジの色を抽出してグラフを描画
pos = nx.get_node_attributes(G, "pos")
node_color = [node["color"] for node in G.nodes.values()]
edge_color = [edge["color"] for edge in G.edges.values()]
nx.draw_networkx(G, pos, node_color=node_color, edge_color=edge_color)
plt.grid()
plt.show()
def schedule_vrp():
# 乱数シードの固定
np.random.seed(1)
# 車の情報
num_vehicles = 3 # 台数
vehicles = [f"車{i + 1}" for i in range(num_vehicles)] # ラベル
# 地点の情報
num_locations = 20 # 地点数
locations = [f"地点{i + 1}" for i in range(num_locations)] # ラベル
# 各地点の座標を-50から50の間の乱数で与える(最後の1つはホームを表す)
pos_locations = np.append(
100 * np.random.rand(num_locations, 2) - 50, [[0, 0]], axis=0
)
# 地点間の距離(SEに渡すため、小数点以下を切り上げて整数にする)
distances = np.ceil(
np.array(
[
[max(np.linalg.norm(r2 - r1), 1) for r2 in pos_locations]
for r1 in pos_locations
]
)
)
# モデルオブジェクト
model = Model()
# 地点ジョブの設定
for i in range(num_locations):
name = locations[i]
model.jobs.add(name)
model.jobs[name].append(Task())
# 帰還ジョブの設定
for v in vehicles:
name = f"帰還({v})" # 車vの帰還ジョブ
model.jobs.add(name)
model.jobs[name].append(Task())
# 全ての地点ジョブの完了後に処理される
for l in locations:
model.jobs[name].dependent_jobs.append(l)
# 出発ジョブの設定
for v in vehicles:
name = f"出発({v})"
model.jobs.add(name)
model.jobs[name].append(Task())
# 地点ジョブより先に処理される
for l in locations:
model.jobs[l].dependent_jobs.append(name)
# 帰還ジョブより先に処理される
for v2 in vehicles:
model.jobs[f"帰還({v2})"].dependent_jobs.append(name)
# 車の情報
for v in vehicles:
model.machines.add(v)
# 地点間の距離をsetup_timeで与える
for j0, j1 in itertools.product(range(num_locations), range(num_locations)):
model.machines[v].setup_times.append(
(int(distances[j0, j1]), locations[j0], locations[j1])
)
# ホームとの距離をsetup_timeで与える
for j in range(num_locations):
model.machines[v].setup_times.append(
(int(distances[-1, j]), f"出発({v})", locations[j])
)
model.machines[v].setup_times.append(
(int(distances[-1, j]), locations[j], f"帰還({v})")
)
# ジョブの処理時間の設定
for j in model.jobs:
# 出発ジョブ
if j.name.startswith("出発"):
j[0].processing_times[j.name.strip("出発()")] = 1
# 帰還ジョブ
elif j.name.startswith("帰還"):
j[0].processing_times[j.name.strip("帰還()")] = 1
# 地点ジョブ
else:
for v in vehicles:
j[0].processing_times[v] = 1
token = "" # ローカル環境等で使用する場合は、Amplify SE のアクセストークンを入力してください。
result = model.solve(token=token, timeout=10) # ご自身のトークンを入れてください
print(result.status)
result.timeline()
visualize_result(result)
schedule_vrp()