from amplify import VariableGenerator, solve, one_hot, sum
import matplotlib.pyplot as plt
import sys
import time
# FMQA for non-binary inputs
class FMQA_NB:
    """Black-box optimizer combining a factorization machine (FM) with a QUBO solver.

    Non-binary inputs are handled through the ``encoder``/``decoder`` pair and
    the per-variable bin counts (``nbins_array``) supplied by
    ``encoder_decoder``; each group of ``nbins_array[i]`` binary variables is
    tied together with a one-hot constraint when the QUBO is built.
    """

    def __init__(self, D, N, N0, k, true_func, client, encoder_decoder) -> None:
        """Store the optimization settings.

        Args:
            D: Number of binary variables (FM input size ``d`` and length of
                the binary variable array handed to the solver).
            N: Total number of objective evaluations; ``cycle`` runs
                ``N - N0`` iterations.
            N0: Number of initial training samples. Must be smaller than ``N``.
            k: FM hyperparameter ``k``; also the number of columns of the
                factor matrix summed over in the QUBO expression.
            true_func: Callable evaluated on each proposed binary vector.
            client: Solver client forwarded to ``solve``.
            encoder_decoder: Object exposing ``nbins_array``, ``encoder`` and
                ``decoder``.
        """
        assert N0 < N
        self.D = D
        self.N = N
        self.N0 = N0
        self.k = k
        self.true_func = true_func
        self.client = client
        self.nbins_array = encoder_decoder.nbins_array
        self.encoder = encoder_decoder.encoder
        self.decoder = encoder_decoder.decoder
        # History of objective values; populated by cycle().
        self.y = None

    def cycle(self, X, y, log: bool = False) -> np.ndarray:
        """Run ``N - N0`` FMQA iterations, growing the training data each time.

        Every iteration trains the FM and solves the resulting QUBO (see
        ``step``), replaces the proposal with a random neighbor if it is
        already in the training set, evaluates ``true_func`` on it and appends
        the new pair to the training data.

        Args:
            X: 2-D array of training inputs, one binary vector per row.
            y: 1-D array of the objective values matching the rows of ``X``.
            log: When true, print a message each time a duplicate proposal is
                replaced.

        Returns:
            The proposed input with the lowest objective value among the
            points evaluated during these cycles. The initial training data
            is not considered when selecting it.

        Raises:
            SystemExit: If ``step`` raises ``RuntimeError``.
        """
        # Weight for one-hot constraint
        constraint_weight = max([1, int(np.abs(y).max() + 0.5)])
        print("")
        print("###################################################")
        print(f" Starting FMQA cycles... {constraint_weight=}")
        print("###################################################")

        pred_x = X[0]
        pred_y = 1e18
        self.y = y

        for i in range(self.N - self.N0):
            print(f"FMQA Cycle #{i} ")
            start_time = time.perf_counter()
            try:
                x_hat = self.step(X, y, constraint_weight)
            except RuntimeError as err:
                # Surface the underlying cause instead of discarding it.
                sys.exit(f"Unknown error, i = {i}: {err}")

            # A proposal that already exists in the training data carries no
            # new information, so swap it for a neighboring point.
            x_hat = self._replace_duplicate(x_hat, X, i, log)

            # Evaluate the objective function f() with x_hat
            y_hat = self.true_func(x_hat)

            # Add the input-output pair [x_hat, y_hat] to the training data set
            X = np.vstack((X, x_hat))
            y = np.append(y, y_hat)
            self.y = y

            print(
                f"FMQA Cycle #{i} finished in {time.perf_counter() - start_time:.2f}s",
                end="",
            )
            # Keep the best pair seen so far.
            if pred_y > y_hat:
                pred_y = y_hat
                pred_x = x_hat
                print(f", variable updated, {pred_y=:.2f}")
            else:
                print()
            print("------------------------")

        return pred_x

    def _replace_duplicate(self, x_hat, X, i, log):
        """Return ``x_hat``, or a random neighbor of it that is not a row of ``X``.

        The neighbor is obtained by decoding ``x_hat`` to its integer (bin
        index) vector, shifting every component by a random amount in
        {-1, 0, +1}, clamping to the valid bin range and re-encoding.

        NOTE(review): the loop only ends once an unseen point is drawn; if
        every reachable neighbor is already in ``X`` it will not terminate.
        """
        upper = np.asarray(self.nbins_array) - 1
        while np.any(np.all(X == x_hat, axis=1)):
            # Work on a fresh array so the decoder's return value is never
            # mutated, and so list-returning decoders behave correctly.
            inputs = np.asarray(self.decoder(x_hat))
            inputs = inputs + np.random.randint(-1, 2, len(inputs))
            inputs = np.clip(inputs, 0, upper)
            # Encode from integer vector to binary vector
            x_hat = self.encoder(inputs)
            if log:
                print(f"{i=}, Identical x is found, {x_hat=}")
        return x_hat

    def step(self, X, y, constraint_weight) -> np.ndarray:
        """Perform one FMQA cycle: train the FM, then minimize it as a QUBO.

        Args:
            X: Training inputs passed to ``train``.
            y: Training targets passed to ``train``.
            constraint_weight: Multiplier applied to the summed one-hot
                constraints before they are added to the cost.

        Returns:
            The binary variable array evaluated at the best solution.

        Raises:
            RuntimeError: If the solver returns no solution.
        """
        # Train FM
        start_time = time.perf_counter()
        fm_model, corrcoef = train(
            X,
            y,
            model_class=TorchFM,
            model_params={"d": self.D, "k": self.k},
            batch_size=8,
            epochs=1000,
            criterion=nn.MSELoss(),
            optimizer_class=torch.optim.AdamW,
            # Set scheduler to reduce learning rate with number of epochs
            opt_params={"lr": 0.5},
            lr_sche_class=torch.optim.lr_scheduler.StepLR,
            lr_sche_params={"step_size": 50, "gamma": 0.9},
        )
        print(
            f"FM Training: {time.perf_counter() - start_time:.2f}s (corr:{corrcoef:.2f})"
        )

        start_time = time.perf_counter()

        # Extract FM parameters from the trained FM model.
        # Relies on the model yielding its parameters in the order v, w, w0.
        v, w, w0 = list(fm_model.parameters())
        v = v.detach().numpy()
        w = w.detach().numpy()[0]
        w0 = w0.detach().numpy()[0]

        gen = VariableGenerator()  # Instantiate Variable Generator
        q = gen.array("Binary", self.D)  # Generate a binary variable array
        # Define FM as a QUBO equation from FM parameters
        cost = self.__FM_as_QUBO(q, w0, w, v)

        # Construct one-hot constraints for non-binary variables: consecutive
        # slices of q, one slice of length nbins per original variable.
        constraint_list = []
        offset = 0
        for nbins in self.nbins_array:
            constraint_list.append(one_hot(q[offset : offset + nbins]))
            offset += nbins
        # `sum` is the one imported from amplify at the top of the file.
        constraints = sum(constraint_list)

        # Add up the objective function and constraints, and pass it to the
        # Fixstars Amplify solver
        qubo_model = cost + constraint_weight * constraints
        result = solve(qubo_model, self.client)
        # NOTE(review): if execution_time is a datetime.timedelta, `.seconds`
        # drops the sub-second part; total_seconds() may be intended - confirm.
        print(
            f"QUBO: {time.perf_counter() - start_time:.2f}s (AE execution: {result.execution_time.seconds:.2f}s)"
        )
        if len(result.solutions) == 0:
            raise RuntimeError("No solution was found.")

        q_values = q.evaluate(result.best.values)
        return q_values

    def __FM_as_QUBO(self, x, w0, w, v):
        """Express the FM with parameters ``w0``, ``w``, ``v`` as a function of ``x``.

        Computes ``w0 + x.w + 1/2 * sum_f [(sum_i v[i, f] x[i])^2
        - sum_i v[i, f]^2 x[i]]`` over the first ``self.k`` columns of ``v``,
        i.e. the acquisition-function form g(x) of the FM.
        """
        lin = w0 + (x.T @ w)
        out_1 = np.array([(x * v[:, i]).sum() ** 2 for i in range(self.k)]).sum()
        # Note that x[j] = x[j]^2 since x[j] is a binary variable in the
        # following equation.
        out_2 = np.array([(x * v[:, i] * v[:, i]).sum() for i in range(self.k)]).sum()
        return lin + (out_1 - out_2) / 2

    def plot_history(self):
        """Plot the history of objective function evaluations.

        The first ``N0`` values (initial training data) are drawn in blue, the
        values added during the FMQA cycles in red, and the running minimum in
        black.

        Returns:
            The created matplotlib figure.
        """
        assert self.y is not None, "cycle() must be run before plotting"
        history = np.asarray(self.y)
        # Derive every x-range from the recorded data so x and y always have
        # matching lengths.
        n_total = len(history)
        n_init = min(self.N0, n_total)

        fig = plt.figure(figsize=(6, 4))
        plt.plot(
            np.arange(n_init),
            history[:n_init],
            marker="o",
            linestyle="-",
            color="b",
        )  # Objective function evaluation values at the time of initial training data generation (random process)
        plt.plot(
            np.arange(n_init, n_total),
            history[n_init:],
            marker="o",
            linestyle="-",
            color="r",
        )  # Objective function evaluation values during the FMQA cycles (FMQA cycle process)
        plt.plot(
            np.arange(n_total),
            np.minimum.accumulate(history),
            linestyle="-",
            color="k",
        )  # Update history of minimum objective function values
        plt.xlabel("i-th evaluation of f(x)", fontsize=18)
        plt.ylabel("f(x)", fontsize=18)
        plt.tick_params(labelsize=18)
        return fig