class Traffic:
def __init__(
self,
roads: Roads,
init_cars: list[np.ndarray],
car_bias_weight: float,
car_flow_weight: float,
):
self._roads = roads
# Number of cars heading to the intersection from each direction
self._num_car_from_north = init_cars[0]
self._num_car_from_south = init_cars[1]
self._num_car_from_east = init_cars[2]
self._num_car_from_west = init_cars[3]
# Weight for the cost function related to the car flow
self._car_flow_weight = car_flow_weight
# Weight for the cost function related to the car bias
self._car_bias_weight = car_bias_weight
@property
def num_car_from_north(self):
return self._num_car_from_north
@num_car_from_north.setter
def num_car_from_north(self, value):
self._num_car_from_north = value
@property
def num_car_from_south(self):
return self._num_car_from_south
@num_car_from_south.setter
def num_car_from_south(self, value):
self._num_car_from_south = value
@property
def num_car_from_east(self):
return self._num_car_from_east
@num_car_from_east.setter
def num_car_from_east(self, value):
self._num_car_from_east = value
@property
def num_car_from_west(self):
return self._num_car_from_west
@num_car_from_west.setter
def num_car_from_west(self, value):
self._num_car_from_west = value
@property
def car_flow_weight(self):
return self._car_flow_weight
@property
def car_bias_weight(self):
return self._car_bias_weight
# Calculating the number of cars at next time step
def _next_step_num_car(self, signal: Union[np.ndarray, PolyArray]) -> Union[
Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray],
Tuple[PolyArray, PolyArray, PolyArray, PolyArray],
]:
# Definition of variables to store the number of cases at next time step
if isinstance(signal, PolyArray):
temp_num_car_from_north = PolyArray(self._num_car_from_north)
temp_num_car_from_south = PolyArray(self._num_car_from_south)
temp_num_car_from_east = PolyArray(self._num_car_from_east)
temp_num_car_from_west = PolyArray(self._num_car_from_west)
else:
temp_num_car_from_north = self._num_car_from_north.copy()
temp_num_car_from_south = self._num_car_from_south.copy()
temp_num_car_from_east = self._num_car_from_east.copy()
temp_num_car_from_west = self._num_car_from_west.copy()
# At each intersection
for i, j in itertools.product(
range(1, self._roads.num_road + 1), range(1, self._roads.num_road + 1)
):
# Array for the number of cases entering from each direction
from_car_array = np.zeros(4)
from_car_array[0] = min(self._num_car_from_north[i, j], 1)
from_car_array[1] = min(self._num_car_from_south[i, j], 1)
from_car_array[2] = min(self._num_car_from_east[i, j], 1)
from_car_array[3] = min(self._num_car_from_west[i, j], 1)
# If the signal is green for north-south direction
NS_flow_flag = (-signal[i, j] * (signal[i, j] - 1)) / 2 + 1
# If the signal is green for west-east direction
EW_flow_flag = (-signal[i, j] * (signal[i, j] + 1)) / 2 + 1
# The matrix A_s_j to compute the number of cars outgoing from the intersection based on the number of cars heading to the intersection
if isinstance(signal, PolyArray):
from_to_array = PolyArray(np.zeros((4, 4)))
else:
from_to_array = np.zeros((4, 4))
# A factor to compute the number of cars heading towards north
fromS_toN = NS_flow_flag * (
(signal[i, j] ** 2) * (self._roads.straight_rate - 1) + 1
)
fromE_toN = EW_flow_flag * (
(1 - self._roads.can_go_to_south[i, j] / 2)
* (1 - (self._roads.straight_rate * self._roads.can_go_to_west[i, j]))
)
fromW_toN = EW_flow_flag * (
(1 - self._roads.can_go_to_south[i, j] / 2)
* (1 - (self._roads.straight_rate * self._roads.can_go_to_east[i, j]))
)
# A factor to compute the number of cars heading towards south
fromN_toS = NS_flow_flag * (
(signal[i, j] ** 2) * (self._roads.straight_rate - 1) + 1
)
fromE_toS = EW_flow_flag * (
(1 - self._roads.can_go_to_north[i, j] / 2)
* (1 - (self._roads.straight_rate * self._roads.can_go_to_west[i, j]))
)
fromW_toS = EW_flow_flag * (
(1 - self._roads.can_go_to_north[i, j] / 2)
* (1 - (self._roads.straight_rate * self._roads.can_go_to_east[i, j]))
)
# A factor to compute the number of cars heading towards east
fromN_toE = NS_flow_flag * (
(1 - self._roads.can_go_to_west[i, j] / 2)
* (1 - (self._roads.straight_rate * self._roads.can_go_to_south[i, j]))
)
fromS_toE = NS_flow_flag * (
(1 - self._roads.can_go_to_west[i, j] / 2)
* (1 - (self._roads.straight_rate * self._roads.can_go_to_north[i, j]))
)
fromW_toE = EW_flow_flag * (
(signal[i, j] ** 2) * (self._roads.straight_rate - 1) + 1
)
# A factor to compute the number of cars heading towards west
fromN_toW = NS_flow_flag * (
(1 - self._roads.can_go_to_east[i, j] / 2)
* (1 - (self._roads.straight_rate * self._roads.can_go_to_south[i, j]))
)
fromS_toW = NS_flow_flag * (
(1 - self._roads.can_go_to_east[i, j] / 2)
* (1 - (self._roads.straight_rate * self._roads.can_go_to_north[i, j]))
)
fromE_toW = EW_flow_flag * (
(signal[i, j] ** 2) * (self._roads.straight_rate - 1) + 1
)
# The array A_s_j to compute the number of cars outgoing from the intersection based on the number of cars heading to the intersection
# We multiply can_go_to_{north, south, east, west} as there might not be a road towards that direction from the intersection
from_to_array[0, 1] = fromS_toN * self._roads.can_go_to_north[i, j]
from_to_array[0, 2] = fromE_toN * self._roads.can_go_to_north[i, j]
from_to_array[0, 3] = fromW_toN * self._roads.can_go_to_north[i, j]
from_to_array[1, 0] = fromN_toS * self._roads.can_go_to_south[i, j]
from_to_array[1, 2] = fromE_toS * self._roads.can_go_to_south[i, j]
from_to_array[1, 3] = fromW_toS * self._roads.can_go_to_south[i, j]
from_to_array[2, 0] = fromN_toE * self._roads.can_go_to_east[i, j]
from_to_array[2, 1] = fromS_toE * self._roads.can_go_to_east[i, j]
from_to_array[2, 3] = fromW_toE * self._roads.can_go_to_east[i, j]
from_to_array[3, 0] = fromN_toW * self._roads.can_go_to_west[i, j]
from_to_array[3, 1] = fromS_toW * self._roads.can_go_to_west[i, j]
from_to_array[3, 2] = fromE_toW * self._roads.can_go_to_west[i, j]
# The number of cars outgoing from the intersection
to_car_array = (from_to_array * from_car_array).sum(axis=1)
# Update "temp_num_car", which is inflow of cars
temp_num_car_from_north[i + 1, j] += to_car_array[1]
temp_num_car_from_south[i - 1, j] += to_car_array[0]
temp_num_car_from_east[i, j - 1] += to_car_array[3]
temp_num_car_from_west[i, j + 1] += to_car_array[2]
# Update "temp_num_car", which is outflow of cars
# The outflow amount changes depending on the state of signal
temp_num_car_from_north[i, j] -= NS_flow_flag * from_car_array[0]
temp_num_car_from_south[i, j] -= NS_flow_flag * from_car_array[1]
temp_num_car_from_east[i, j] -= EW_flow_flag * from_car_array[2]
temp_num_car_from_west[i, j] -= EW_flow_flag * from_car_array[3]
return (
temp_num_car_from_north,
temp_num_car_from_south,
temp_num_car_from_east,
temp_num_car_from_west,
)
# Update the number of cars based on the results
def update_num_car(self, signal: np.ndarray):
(
self._num_car_from_north,
self._num_car_from_south,
self._num_car_from_east,
self._num_car_from_west,
) = self._next_step_num_car(signal)
# Compute the difference between the number of cars from east/west and from north/south
def current_car_bias(self) -> np.ndarray:
car_bias = (
(self._num_car_from_north + self._num_car_from_south)
- (self._num_car_from_east + self._num_car_from_west)
) / 2
return car_bias
# Compute the difference between the number of cars from east/west and from north/south at the next time step for the calculation of "car_bias_cost"
def car_bias_model(self, signal: PolyArray) -> PolyArray:
# Compute the number of cars in the next time step
(
temp_num_car_from_north,
temp_num_car_from_south,
temp_num_car_from_east,
temp_num_car_from_west,
) = self._next_step_num_car(signal)
# Compute the difference between the number of cars from east/west and from north/south based on the number of cars at the next time step
next_car_bias = (
(temp_num_car_from_north + temp_num_car_from_south)
- (temp_num_car_from_east + temp_num_car_from_west)
) / 2
return next_car_bias
# Compute the flow of the cars for "car_flow_cost"
def car_flow(self, signal: PolyArray) -> PolyArray:
# Initialize "car_flow"
car_flow = 0
# At each intersection
for i, j in itertools.product(
range(1, self._roads.num_road + 1), range(1, self._roads.num_road + 1)
):
# If there is a signal
if signal[i, j] != 0:
# If green in north/south direction
num_car_north_and_south = min(self._num_car_from_north[i, j], 1) + min(
self._num_car_from_south[i, j], 1
)
# If green in east/west direction
num_car_east_and_west = min(self._num_car_from_east[i, j], 1) + min(
self._num_car_from_west[i, j], 1
)
# The state of signal determines whether cars move in north/south direction or east/west direction
car_flow += (signal[i, j] + 1) / 2 * num_car_north_and_south - (
signal[i, j] - 1
) / 2 * num_car_east_and_west
# If there is no signal
# Or an intersection where cars cannot turn
else:
car_flow += (
min(self._num_car_from_north[i, j], 1)
* self._roads.can_come_from_north[i, j]
+ min(self._num_car_from_south[i, j], 1)
* self._roads.can_come_from_south[i, j]
+ min(self._num_car_from_east[i, j], 1)
* self._roads.can_come_from_east[i, j]
+ min(self._num_car_from_west[i, j], 1)
* self._roads.can_come_from_west[i, j]
)
return car_flow