from amplify import VariableGenerator, solve, one_hot, sum  # note: amplify's sum shadows the built-in sum

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import sys

# `TorchFM` (the factorization machine model) and `train` (its training loop) are
# assumed to be defined in the preceding section.
# FMQA for non-binary inputs
class FMQA_NB:
    def __init__(self, D, N, N0, k, true_func, client, encoder_decoder) -> None:
        assert N0 < N
        self.D = D  # Number of binary decision variables (total number of one-hot bits)
        self.N = N  # Total number of objective function evaluations
        self.N0 = N0  # Number of initial training samples
        self.k = k  # Dimension of the FM latent factors
        self.true_func = true_func  # Black-box objective function, evaluated on binary vectors
        self.client = client  # Amplify solver client
        self.nbins_array = encoder_decoder.nbins_array  # Number of bins for each integer input
        self.encoder = encoder_decoder.encoder  # Integer vector -> one-hot binary vector
        self.decoder = encoder_decoder.decoder  # One-hot binary vector -> integer vector
        self.y = None
    # Performs (N - N0) FMQA cycles, appending a newly evaluated sample to the training
    # data at each cycle, and returns the best input found.
    def cycle(self, X, y, log=False) -> np.ndarray:
        # Weight for the one-hot constraints, scaled to the magnitude of the objective values
        constraint_weight = max(1, int(np.abs(y).max() + 0.5))
        print("")
        print("###################################################")
        print(f" Starting FMQA cycles... {constraint_weight=}")
        print("###################################################")
        pred_x = X[0]
        pred_y = 1e18
        for i in range(self.N - self.N0):
            print(f"FMQA Cycle #{i} ")
            try:
                x_hat = self.step(X, y, constraint_weight)
            except RuntimeError:
                sys.exit(f"Unknown error, i = {i}")
            # If an input identical to the obtained x_hat already exists in the training
            # data, perturb x_hat to a neighboring value and check again.
            is_identical = True
            while is_identical:
                is_identical = False
                for j in range(i + self.N0):
                    if np.all(x_hat == X[j, :]):
                        # Decode the binary vector to an integer vector
                        inputs = self.decoder(x_hat)
                        # Perturb each integer by -1, 0, or +1
                        inputs += np.random.randint(-1, 2, len(inputs))
                        # Clamp each integer to its valid range [0, nbins - 1]
                        for i_inp in range(len(inputs)):
                            if inputs[i_inp] < 0:
                                inputs[i_inp] = 0
                            elif inputs[i_inp] > self.nbins_array[i_inp] - 1:
                                inputs[i_inp] = self.nbins_array[i_inp] - 1
                        # Encode the integer vector back to a binary vector
                        x_hat = self.encoder(inputs)
                        if log:
                            print(f"{i=}, Identical x is found, {x_hat=}")
                        is_identical = True
                        break
            # Evaluate the objective function f() at x_hat
            y_hat = self.true_func(x_hat)
            # Append the input-output pair [x_hat, y_hat] to the training data
            X = np.vstack((X, x_hat))
            y = np.append(y, y_hat)
            # Update [pred_x, pred_y] whenever the evaluated objective value improves on the minimum so far
            if pred_y > y_hat:
                pred_y = y_hat
                pred_x = x_hat
                print(f"variable updated, {pred_y=}")
            else:
                print("")
            print("------------------------")
        self.y = y
        return pred_x
    # Performs one FMQA cycle: train the FM, then minimize it as a QUBO
    def step(self, X, y, constraint_weight) -> np.ndarray:
        # Train the FM (TorchFM and train are defined in the preceding section)
        model = train(
            X,
            y,
            model_class=TorchFM,
            model_params={"d": self.D, "k": self.k},
            batch_size=8,
            epochs=2000,
            criterion=nn.MSELoss(),
            optimizer_class=torch.optim.AdamW,
            opt_params={"lr": 0.5},
            # Scheduler that reduces the learning rate as the epochs progress
            lr_sche_class=torch.optim.lr_scheduler.StepLR,
            lr_sche_params={"step_size": 50, "gamma": 0.9},
        )
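        # With StepLR(step_size=50, gamma=0.9) and lr=0.5, and assuming train() steps the
        # scheduler once per epoch, the learning rate after 2000 epochs is
        # 0.5 * 0.9 ** (2000 // 50) = 0.5 * 0.9 ** 40 ≈ 7.4e-3, i.e. roughly a 70-fold decay.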
        # Extract the FM parameters from the trained model
        v, w, w0 = list(model.parameters())
        v = v.detach().numpy()
        w = w.detach().numpy()[0]
        w0 = w0.detach().numpy()[0]
        # Solve the QUBO problem using a quantum annealing or Ising machine
        gen = VariableGenerator()  # Declare a variable generator
        q = gen.array("Binary", self.D)  # Generate binary decision variables
        cost = self.__FM_as_QUBO(q, w0, w, v)  # Define the FM as a QUBO expression
        # Construct one-hot constraints for the non-binary variables: the binary variables are
        # partitioned into consecutive blocks of nbins_array[i] bits, one block per integer input
        constraint_list = []
        ista = 0
        iend = 0
        for i in range(len(self.nbins_array)):
            iend += self.nbins_array[i]
            constraint_list.append(one_hot(q[ista:iend]))
            ista = iend
        constraints = sum(constraint_list)
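        # Example: with nbins_array = [3, 2], the five binary variables are partitioned as
        # q[0:3] and q[3:5], and one_hot enforces q0 + q1 + q2 = 1 and q3 + q4 = 1, so each
        # block one-hot encodes a single integer input.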
        # Combine the objective function and the weighted constraints into a single model
        qubo_model = cost + constraint_weight * constraints
        result = solve(qubo_model, self.client)  # Pass the model to the Fixstars Amplify solver
        if len(result.solutions) == 0:
            raise RuntimeError("No solution was found.")
        q_values = q.evaluate(result.best.values)
        return q_values
    # Defines the FM as a QUBO expression from the FM parameters. As with the TorchFM class
    # defined earlier, the formula follows the acquisition-function form g(x).
    def __FM_as_QUBO(self, x, w0, w, v):
        lin = w0 + (x.T @ w)
        out_1 = np.array([(x * v[:, i]).sum() ** 2 for i in range(self.k)]).sum()
        # Note that x[j] = x[j]^2 in the following expression, since x[j] is a binary variable
        out_2 = np.array([(x * v[:, i] * v[:, i]).sum() for i in range(self.k)]).sum()
        return lin + (out_1 - out_2) / 2
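    # Worked check of the identity used above, for k = 1 and three binary variables:
    #   (v1*x1 + v2*x2 + v3*x3)^2 - (v1^2*x1 + v2^2*x2 + v3^2*x3)
    #     = 2 * (v1*v2*x1*x2 + v1*v3*x1*x3 + v2*v3*x2*x3)
    # (using x_i^2 = x_i for binary x_i), so halving the difference recovers the FM
    # pairwise interaction terms sum_{i<j} <v_i, v_j> x_i x_j.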
"""The sum_poly used in __FM_as_QUBO above is inefficient in terms of computation speed and
memory. In the case of FM, where the interaction terms of the decision variables are generally
nonzero, the following implementation using BinaryMatrix is more efficient. Here, the quadratic
terms in BinaryMatrix correspond to the non-diagonal terms represented by the upper triangular
matrix, so x(1/2) for the quadratic terms in the FM formula is unnecessary. Also, although x is
taken as an argument just to match the function signature with __FM_as_QUBO above (implementation
using sum_poly), it is not needed in this implementation using BinaryMatrix.
def __FM_as_QUBO(self, x, w0, w, v):
out_1_matrix = v @ v.T
out_2_matrix = np.diag((v * v).sum(axis=1))
matrix = BinaryMatrix(out_1_matrix - out_2_matrix + np.diag(w))
# Do not forget to put the constant term w0 in the second argument of BinaryQuadraticModel.
model = BinaryQuadraticModel(matrix, w0)
return model
"""
    # Plots the history of the objective function evaluations: the initial training data
    # construction (blue) and the FMQA cycles (red)
    def plot_history(self):
        assert self.y is not None
        fig = plt.figure(figsize=(6, 4))
        # Objective function values evaluated during initial training data generation (random sampling)
        plt.plot(
            range(self.N0),
            self.y[: self.N0],
            marker="o",
            linestyle="-",
            color="b",
        )
        # Objective function values evaluated during the FMQA cycles
        plt.plot(
            range(self.N0, self.N),
            self.y[self.N0 :],
            marker="o",
            linestyle="-",
            color="r",
        )
        plt.xlabel("i-th evaluation of f(x)", fontsize=18)
        plt.ylabel("f(x)", fontsize=18)
        plt.tick_params(labelsize=18)
        return fig
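

# ---------------------------------------------------------------------------
# A minimal usage sketch (illustrative only). `OneHotEncoderDecoder`, `toy_func`, and the
# parameter values below are hypothetical, not part of the class above; the client token is a
# placeholder, and TorchFM/train from the preceding section must be defined for cycle() to run.
from datetime import timedelta

from amplify import FixstarsClient


class OneHotEncoderDecoder:
    def __init__(self, nbins_array):
        self.nbins_array = np.array(nbins_array)

    def encoder(self, inputs):
        # Integer vector -> concatenated one-hot binary vector
        x = np.zeros(self.nbins_array.sum(), dtype=int)
        offset = 0
        for val, nbins in zip(inputs, self.nbins_array):
            x[offset + int(val)] = 1
            offset += nbins
        return x

    def decoder(self, x):
        # Concatenated one-hot binary vector -> integer vector
        inputs, offset = [], 0
        for nbins in self.nbins_array:
            inputs.append(int(np.argmax(x[offset : offset + nbins])))
            offset += nbins
        return np.array(inputs)


nbins = [8, 8]  # two integer inputs, each taking values 0..7
enc_dec = OneHotEncoderDecoder(nbins)


def toy_func(x):
    # Toy black-box objective: squared distance of the decoded integers from (5, 2)
    z = enc_dec.decoder(x)
    return float((z[0] - 5) ** 2 + (z[1] - 2) ** 2)


client = FixstarsClient()
# client.token = "YOUR TOKEN"  # placeholder: a valid token is required to run
client.parameters.timeout = timedelta(milliseconds=1000)

N0, N, k = 10, 20, 4  # initial samples, total evaluations, FM factor dimension
rng = np.random.default_rng(0)
X = np.array([enc_dec.encoder(rng.integers(0, nbins)) for _ in range(N0)])
y = np.array([toy_func(x) for x in X])

fmqa = FMQA_NB(int(np.sum(nbins)), N, N0, k, toy_func, client, enc_dec)
best_x = fmqa.cycle(X, y, log=True)
print("best inputs:", enc_dec.decoder(best_x), "best value:", fmqa.y.min())
fmqa.plot_history()
plt.show()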