このチュートリアルではポートフォリオ最適化を扱います。ポートフォリオとは株や債券などの金融商品の組み合わせのことです。金融商品に投資を行うことはリスクを伴いつつ確実な利益を狙うことと言えます。したがって、なるべくリスク抑えるために分散投資をしつつ利益の期待値を最大化するポートフォリオの最適化が必要になります。
今回は、過去の価格を格納したデータを用いて、リスクを抑えつつ、利益を最大化する平均分散モデルによってポートフォリオを求めます。
まず、利益とリスクを下記に定義します。
上記の二つの目的を同時に達成するための定式化に必要な定数・変数を定義します。
$n$: 金融商品の総数、 $R_i$: 金融商品$i$の収益率、 $K$: 最大資産口数
$w_i $: 金融商品$i$に投資する口数を表す非負変数
この問題では、リスクを抑えつつ、利益を最大化することを目的関数として定式化します。
期待収益率を最大化する式は、$E(\cdot)$を期待値として、 \begin{align*} \max_{\bf w} E_{\bf w}(R) \Leftrightarrow \max_{\bf w} \sum_{i=1}^n w_i E(R_i) \end{align*} と表されます。
ポートフォリオ収益の分散の最小化は、$Cov(\cdot, \cdot)$を共分散として、
\begin{align*} \min_{\bf w} V_{\bf w}(R) = \min_{\bf w} \sum_{i=1}^n \sum_{j=1}^n w_i w_j Cov(R_i, R_j) \end{align*}と表されます。
これら期待収益率の最大化とポートフォリオ収益の分散の最小化を目的関数とします。ポートフォリオ収益の分散を考慮する非負のパラメータを$\gamma $として、
\begin{align*} \max_{\bf w} E_{\bf w}(R) - \frac{\gamma}{2} V_{\bf w}(R) \end{align*}として目的関数ができました。$\gamma=0$であれば、リスクを無視した利益追求型のポートフォリオが出力されることが期待されます。
最大で投資できる資産の口数は$K$であるため、各金融商品$i$に対する投資口数$w_i$の総和が$K$である必要があるため、制約式として表現します。
\begin{align*} \sum_{i=1}^n w_i = K \end{align*}ここまで、非負変数$w_i$を用いて記述してましたが、アニーリングマシンに入力するためには、非負変数をバイナリ変数で表現する必要があります。ここでは、非負変数をバイナリ変数で表現す方法を紹介します。
一般的には、非負変数$w_i$に対して、$D$次元バイナリベクトル${\bf x}_i$が存在し、 $$ w_i = \sum_{d=1}^D f(d) x_{id} $$ と表現できる関数$f$を定めます。$f$を定め方として次のUnary法が知られています。
Unary法では、$f$を次のように定めます。 $$ f(d)=1 $$
Unary法は最も単純な方法です。特性として、非負変数の最大値(定式化中の$K$)と同じだけのバイナリ変数が必要となり、変数が多くなる点が挙げられます。
このサンプルコードでは、Unary法を採用して実装を行います。
以上より、ポートフォリオ最適化問題は以下のように定式化されます。
\begin{matrix} \max & \displaystyle E_{\bf w}(R) - \frac{\gamma}{2} V_{\bf w}(R) \\ {\text{s.t}} & \displaystyle \sum_{i=1}^n w_i = K \\ & \displaystyle w_i = \sum_{d=1}^D f(d) x_{id}\\ & \displaystyle x_{id} \in \{0, 1\} \end{matrix}株価の時系列データは pandas-datareader
[2] モジュールを使用して取得します。
先程の定式化で使用している金融商品$i$の期待収益率$E(R_i)$と、金融商品$i,j$を同時に購入した際のポートフォリオ収益の分散$Cov(R_i, R_j)$を推定する方法について、ここで説明します。
ここでは、推定方法の1つである、ヒストリカルデータ方式について紹介[3]します。
ヒストリカルデータ方式は、過去の価格データの平均値などを用いて資産の期待収益率を求める推定方法です。ある資産の価格データが、過去のデータと同じ値動きをする条件下では、信頼できる期待収益率の推定方法です。一方、過去データに外れ値が含まれていると、期待収益率の推定値の信頼性が低くなる可能性があります。
今回のサンプルコードではヒストリカルデータ方式を採用して、期待収益率とポートフォリオ収益の共分散を推定します。
株価データを読み込む関数を作成します。 今回使用するデータは次のようなcsvファイルとして事前に取得しています。 データの粒度は日単位です。
date
: 日付High
: 高値Low
: 安値Open
: 始値Close
: 終値Volume
: ボリューム(取引数量)Adj Close
: 調整後終値をカラムとしてデータが格納されています。
今回は、日単位での取引を前提にポートフォリオ最適化の入力データを作成します。日単位でのポートフォリオを作成するために、
ある日における株の値段を高値と安値の平均値とします。以下の関数は、.csv
ファイルのあるディレクトリのroot
を指定して、それ以下に存在する.csv
ファイルを読み込み、毎日の値動きをnumpy.array
に格納して返します。.csv
ファイルの列挙にはglob
を用いており、.csv
ファイルの読み込みにはpandas
を使用しています。
import os
import glob
import pandas as pd
import numpy as np
def read_csv_data_yahoo(root):
"""
Parameters
----------
root: str
データのあるディレクトリ名
Returns
-------
asset: np.array
銘柄と期間中のkeyの変動を格納した行列, shape is (num of brands, num of dates)
"""
asset = list()
paths = glob.glob(f"{root}/*.csv")
for path in paths:
df = pd.read_csv(
path, index_col="Date", infer_datetime_format=True
) # showするならここに挿入
df.index = pd.to_datetime(df.index)
high_value = df["High"].to_numpy().reshape(1, -1)
low_value = df["Low"].to_numpy().reshape(1, -1)
# highとlowの平均値
value = (high_value + low_value) * 0.5
asset.append(value)
dates = df.index.to_pydatetime() # pandas.Timestamp → datetime
asset = np.concatenate(asset, axis=0)
names = [os.path.basename(path).replace(".csv", "") for path in paths]
return asset, names, dates
ヒストリカルデータ方式では、過去データにおける収益率の平均を期待収益率として推定します。ここでは、具体的に期待収益率とリスク分散の計算方法について説明します。
ある金融商品$i$について、$N$日間の過去の値動きデータ$x_0, \cdots, x_{N-1}$が得られたとします。このとき$N$日以降の$D$日間のポートフォリオを考えます。
まず、$D$日間の時系列データの組を作成します。
これらの組それぞれにおける収益率$\mu_{i, d}$を計算します。
$$ \mu_{i,d} = \frac{x_d - x_{d+D-1}}{x_d} ~ d\in \{0, \cdots, N-D\} $$各組で計算された収益率の平均を、この金融商品$i$における期待収益率$E(R_i)$とします。 $$ E(R_i) = \frac{1}{N-D+1} \sum_{d=0}^{N-D}\mu_{i, d} $$
また、$E(R_i)$を用いて銘柄$i, j$に関する共分散行列$Cov(R_i, R_j)$は、次のように求められます。 $$ Cov(R_i, R_j) = \frac{1}{N-D+1}\sum_{d=0}^{N-D}\left(\mu_{i, d}-E(R_i))(\mu_{j, d} - E(R_j)\right) $$
実装では、$D$日間の収益率をリストに格納し、numpy.array
として銘柄で結合(numpy.concatenate
)します。期待収益率の計算では、銘柄ごとに収益率の平均を計算(numpy.mean
)し、銘柄間の分散共分散は、numpy.cov
を用いて計算しています。
def historical_data_method(asset, D):
"""ヒストリカルデータ方式による期待収益率・分散の計算
Parameters
----------
asset: np.ndarray
各銘柄の過去の価格値
D: int
投資して回収するまでの期間
Returns
-------
expected_rate_of_return: array
shape is (num of brands, N - D)
covariance_rate_of_return; array
shape is (num of brands, num of brands)
"""
assert isinstance(D, int)
_, N = asset.shape
# N - Dまで欲しいので + 1
list_rate_of_return_per_term = list()
for j in range(N - D + 1):
start = j
end = j + D - 1
rate_of_return = np.divide(
asset[:, end] - asset[:, start], asset[:, start]
).reshape(-1, 1)
list_rate_of_return_per_term.append(rate_of_return)
rate_of_return_per_term = np.concatenate(list_rate_of_return_per_term, axis=1)
# 各銘柄に対する期待収益率
expected_rate_of_return = np.mean(rate_of_return_per_term, axis=1)
# 銘柄間の共分散
covariance_rate_of_return = np.cov(rate_of_return_per_term)
return expected_rate_of_return, covariance_rate_of_return
定式化中の$K$と投資してから回収するまでの期間$D$日を定めます。ここでは回収するまでの期間を一週間($D=7$)とします。 また、定式化中のリスク分散を考慮するパラメータ$\gamma$を定めます。
その後、過去の価格データを格納しているcsvからデータを読み込み、ヒストリカルデータ方式で各銘柄の期待収益率とリスク分散を計算します。
## 最大投資口数 K
K = 10
## 回収期間D
D = 7
## 共分散を考慮するパラメータ
gamma = 20
# storage以下のデータの読み込み
asset, names, _ = read_csv_data_yahoo(root="../../../storage/portfolio")
num_brand, num_date = asset.shape
# ヒストリカルデータ方式による期待収益率とリスク分散の計算
expected_rate_of_return, variance_rate_of_return = historical_data_method(asset, D)
まず、銘柄$i$の投資口数を表す非負変数$w_i$を作成します。アニーリングマシンに入力するためには、非負変数をバイナリ変数で表現するために、 Unary法を用いて$w_i$を表現します。
$$ w_i = \sum_{d=1}^K x_{id} $$まず、BinarySymbolGenerator
クラスを用いて、銘柄$i$に対して$K$次元バイナリベクトル${\bf
x}_i$を用意します。(num_brand
$\times$K
)の行列をイメージして頂ければわかりやすいと思います。
その後、銘柄$i$に対して$K$次元ベクトル${\bf x }_i$の全ての要素を足し合わせることで、非負変数$w_i$を配列として作成します。
from amplify import BinarySymbolGenerator
# 変数の定義
x = BinarySymbolGenerator().array(num_brand, K)
# Unary法による非負整数wのバイナリ変数xによる表現
w = x.sum(axis=1)
次に目的関数を定義します。 目的関数は、期待収益率の最大化かつポートフォリオ収益の分の最小化です。ポートフォリオ収益の分を考慮する非負パラメータを$\gamma$として、 \begin{align*} &\max_{ {\bf w} \in \mathbf{Z}^n_{\geq 0}} E_{\bf w}(R) - \frac{\gamma}{2} V_{\bf w}(R) = \sum_{i=1}^n w_i E(R_i) - \frac{\gamma}{2} \sum_{i=1}^n \sum_{j=1}^n w_i w_j Cov(R_i, R_j) \end{align*} と定式化されています。
ここでは、einsum
関数を用いて最もシンプルな形式で数式を記述しています。einsum
関数の詳細については こちら
を参照してください。数式と einsum
関数の引数を見比べると理解しやすいと思います。
最後に最大化問題を最小化問題に変換するため $(-1)$ 倍しています。
from amplify import einsum
# 目的関数の定義
def setObjective(w, num_brand, expected_rate_of_return, variance_rate_of_return, gamma):
"""目的関数を定義
----------
w: BinaryPolyArray
各銘柄の購入口数を表す非負変数
num_brand: int
ポートフォリオを組む銘柄数
expected_rate_of_return: np.ndarray
期待収益率
variance_rate_of_return: np.ndarray
リスク分散
gamma: float
期待リスクの重要度を表すパラメータ
Notes
-----
期待収益率の最大化と期待リスクの最小化
期待収益率: E_w(R)
期待リスク: V_w(R)
目的関数 = E_w(R) - gamma / 2 * V_w(R)
"""
# 期待収益率の定義
profit = einsum("i,i->", w, expected_rate_of_return)
# 分散の定義
risk = einsum("i,j,ij->", w, w, variance_rate_of_return)
# 目的関数の定義
objective = -profit + gamma * 0.5 * risk
return objective, profit, risk
次に制約式を実装します。各金融商品$i$に対する投資口数$w_i$の総和が$K$であるという制約が必要です。
\begin{align*} \sum_{i=1}^n w_i = K \end{align*}等式制約であるため equal_to
を使用して、引数に上式の左辺を表す配列と右辺を与えて制約条件オブジェクトを作成します。
# 制約条件の定義
from amplify.constraint import equal_to
def setConstraint(w, K, num_brand):
"""制約式を定義
Parameters
----------
w: BinaryPoly
各銘柄の購入口数を表す非負変数
K: int
全投資資産口数
num_brand: int
ポートフォリオを組む銘柄数
Notes
-----
全投資資産はK口だけ
\sum_{i=1}^n w_i = K
"""
constraint = equal_to(w, K)
return constraint
上記の目的関数と制約条件を足し合わせ、QUBOモデルを構築します。足し合わせる際に、制約条件の強さを表す係数priority
をかけます。
# ===================
# 目的関数と制約式の構築
# ===================
objective, _, _ = setObjective(
w, num_brand, expected_rate_of_return, variance_rate_of_return, gamma
)
constraint = setConstraint(w, K, num_brand)
# 制約式の強さを表す係数
priority = 0.05
model = objective + priority * constraint
イジングマシンのクライアントをFixstarsClient
に設定、ソルバーを作成して以下のように問題を解きます。
from amplify import Solver
from amplify.client import FixstarsClient
client = FixstarsClient()
# ローカル環境では Amplify AEのアクセストークンを入力してください
# client.token = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
client.parameters.timeout = 10000
# ソルバーを定義して実行
solver = Solver(client)
result = solver.solve(model)
続いて、得られた解を確認していきます。
変数配列の decode
メソッドを用いて銘柄$i$に対する投資口数を取得します。
その後、得られた解を元に、投資口数の多い順に銘柄名と投資口数を pandas.DataFrame
を用いて出力します。
def showSortedDataFrame(names, w_values, return_rates=None):
"""
Parameters
----------
names: list
銘柄の名前
return_rates: np.ndarray
各銘柄の期待収益率
w_values: BinaryPolyArray
各銘柄の投資口数
"""
if return_rates is None:
df = pd.DataFrame(dict(brand_name=names, count=w_values.tolist()))
else:
df = pd.DataFrame(
dict(brand_name=names, return_rate=return_rates, count=w_values.tolist())
)
# 投資口数が多い順にソート
sorted_df = df.sort_values("count", ascending=False)
# 表示する際には投資口数0の銘柄を表示しない
display_df = sorted_df.query("count > 0")
display(display_df.reset_index(drop=True))
return display_df
for solution in result:
values = solution.values
# 各銘柄の投資口数が解である.
w_values = w.decode(result[0].values).astype(int)
# 投資口数の多い順に出力する
showSortedDataFrame(
names=names, return_rates=expected_rate_of_return, w_values=w_values
)
上記の例では期待収益率と共分散のバランスを表すパラメータとして gamma = 20
での実行例を示しましたが、gamma
は小さいほど期待収益率の最大化を優先し、大きいほど共分散が小さくなります。gamma
を変更し上記のコードを再実行することでこの様子を確認できます。
[1] 田中宗, et al. "量子アニーリングの基礎と応用事例の現状." 低温工学 53.5 (2018): 287-294.
最後に、ここまで実装してきたポートフォリオ最適化をシミュレーションします。
NASDAQ100を構成する銘柄の2021/2/1~2021/8/31までの日単位での価格。
最適化実施日を8/1~8/24とします。 最適化実施日それぞれにおいて、その前日までのデータを用いて、ヒストリカルデータ方式で期待収益率と分散を計算します。 その後、得られた期待収益率と分散をポートフォリオ最適化の入力として、最適化します。
投資してから回収するまでの期間を7日間とし、最適化実施日から7日後の価格と最適化実施日の価格を用いて収益率を計算します。 最適化結果と7日間の収益率を用いて実績の利益(actual profit)を計算します。
まず、データを管理するクラス(Dataloader)を定義します。
これまでは、得られているデータを全て読み込み、ヒストリカルデータ方式で期待収益率と分散を計算していました。
これ以降は、最適化実施日までのデータ用いて処理を行う必要があるので、日付を指定すればその期間のデータを得られるようにDataloaderを定義します。
ただし、NASDAQ100のデータが取得できない日が存在するので、その日付に関しては、スキップします。
その処理を表しているのが、getFromDate
メソッド内で使用されているSamplingError
です。
getFromDates
:
得たい期間の最初の日付(mindate
)から、最後の日付(maxdate
)を指定すれば、その期間のデータを返すメソッドです。# Dataloaderの定義
import datetime
from datetime import datetime as dt
import numpy as np
class SamplingError(Exception):
def __init__(self, *args, **kwargs):
pass
class DataLoader(object):
name = "dateloader"
def __init__(self, root: str = "../../../storage/portfolio", *args, **kwargs):
asset, names, dates = read_csv_data_yahoo(root=root)
self.asset = asset
self.names = names
self.dates = dates
def getFromDates(self, mindate: dt, maxdate: dt) -> np.array:
"""
Parameters
----------
mindate: datetime.datetime
データを取り出す最初の日付
maxdate: datetime.datetime
データを取り出す最後の日付
Notes
-----
np.arrayのスライシングとは異なり、最終日までデータを含めて取得します。
"""
min_index = self.getIndexFromDate(mindate)
max_index = self.getIndexFromDate(maxdate)
assert min_index <= max_index
return self.asset[:, min_index : max_index + 1] # +1で最数日まで含めてデータを渡す。
def getFromDate(self, date: dt) -> np.array:
index = self.getIndexFromDate(date)
return self.asset[:, index]
def getFromName(self, date: dt, name: str):
assert name in self.names
name_index = np.where((np.array(self.names) == name))[0].item()
date_index = self.getIndexFromDate(date)
return self.asset[name_index, date_index]
def getIndexFromDate(self, date) -> int:
assert isinstance(date, dt)
if not date in self.dates:
raise SamplingError
return np.where(self.dates == date)[0].item()
これまで定義してきた、目的関数を記述しているsetObjective
関数や、制約条件を記述しているsetConstraint
関数をもう一度定義すると処理の記述が長くなり、分かりにくくなるため、クラス化することで可読性を上げます。
# 問題クラスの作成
class Portfolio(object):
def __init__(self, D, K, gamma, date, dataloader, timeout, *args, **kwargs):
self.D = D
self.K = K
self.gamma = gamma
self.date = date
self.dataloader = dataloader
self.timeout = timeout
maximum_asset = K
mindate = dataloader.dates.min()
asset = dataloader.getFromDates(
mindate=mindate, maxdate=date - datetime.timedelta(days=1)
)
num_brand, num_date = asset.shape
names = dataloader.names
expected_rate_of_return, variance_rate_of_return = historical_data_method(
asset, D
)
self.num_brand = num_brand
self.num_date = num_date
self.names = names
self.expected_rate_of_return = expected_rate_of_return
self.variance_rate_of_return = variance_rate_of_return
def construct(self):
# 変数の定義
x = BinarySymbolGenerator().array(self.num_brand, self.K)
# Unary法による非負整数wのバイナリ変数xによる表現
w = x.sum(axis=1)
self.w = w
objective, profit, risk = self.setObjective()
self.profit, self.risk = profit, risk
constraint = self.setConstraint()
# 制約式の強さを表す係数
priority = 0.05
model = objective + priority * constraint
return model
def solve(self):
model = self.construct()
client = FixstarsClient()
client.parameters.timeout = self.timeout
solver = Solver(client)
result = solver.solve(model)
if len(result) == 0:
raise RuntimeError("The given constraints are not satisfied")
return result
def setObjective(self):
"""w, num_brand, expected_rate_of_return, variance_rate_of_return, gamma"""
return setObjective(
self.w,
self.num_brand,
self.expected_rate_of_return,
self.variance_rate_of_return,
self.gamma,
)
def setConstraint(self):
"""w, K, num_brand"""
return setConstraint(self.w, self.K, self.num_brand)
def eval(self, result):
"""Amplifyをもちいて解いた結果から評価を行う関数.
Parameters
----------
result: amplify.SolverResult
ソルバーから返された結果を格納している
"""
for solution in result:
energy = solution.energy
print(f"energy = {energy}")
# 各銘柄の投資口数が解である.
x_dict = result[0].values
w_values = self.w.decode(result[0].values).astype(int)
n = self.num_brand
# 分散投資の結果
display_df = showSortedDataFrame(names=self.names, w_values=w_values)
return display_df
今回の評価期間2021/8/1~2021/8/24をstart_date
とend_date
に格納します。
また、総資産K
や分散を考慮するパラメータgamma
、実行時間制限timeout
を設定します。繰り返しになりますが、gamma
を変更することで銘柄間の期待収益率の分散(相関)考慮具合を変更できます。
# パラメータの設定
K = 10
gamma = 20.0
timeout = 10000
start_date = datetime.datetime(2021, 8, 1)
end_date = datetime.datetime(2021, 8, 24)
# dataloaderのインスタンス化
dataloader = DataLoader(root="../../../storage/portfolio/longterm")
while
を用いて、評価期間全てで最適化を行います。
ただし、先程定義したSamplingError
を用いて、最適化実施日の7日後のデータが存在しなければ、その日に最適化を行わないようにしています。
lt_result
リストに、日付と最適化結果と期待収益率、分散を格納しています。
lt_result = list()
date = start_date
while date <= end_date:
print(date)
try:
dataloader.getFromDate(date=date + datetime.timedelta(days=D))
problem = Portfolio(
D=D, K=K, gamma=gamma, date=date, dataloader=dataloader, timeout=timeout
)
result = problem.solve()
df = problem.eval(result)
x_dict = result[0].values
predict_profit = (-1.0) * problem.profit.replace_all(x_dict)
predict_cov = problem.risk.replace_all(x_dict)
lt_result.append((date, df, predict_profit, predict_cov))
except KeyboardInterrupt:
exit(1)
except SamplingError:
print("skip")
date += datetime.timedelta(days=1)
postprocess
関数では、最適化結果を格納しているlt_result
リストを用いて、7日後に得られる利益を計算します。
得られた実績の利益を辞書dict_profit
に格納します。また、各最適化実施日の最適化結果から得られた期待収益率と分散もdict_profit
に格納します。
from collections import defaultdict
def postprocess(lt_result, D, dataloader):
dict_profit = defaultdict(list)
for date, df_result, predict_profit, predict_cov in lt_result:
date_execute = date
date_return = date + datetime.timedelta(D)
profit_per_day = 0.0
for name, num in zip(df_result["brand_name"], df_result["count"]):
start_value = dataloader.getFromName(date=date_execute, name=name)
end_value = dataloader.getFromName(date=date_return, name=name)
profit_rate = (end_value - start_value) / start_value
get_profit = profit_rate * num
profit_per_day += get_profit
for key, value in zip(
["date", "actual_profit", "predict_profit", "predict_cov"],
[date, profit_per_day, predict_profit, predict_cov],
):
dict_profit[key].append(value)
return dict_profit
各最適化実施日の実績収益率・期待収益率・分散を格納しているdict_profit
を用いて、利益を可視化します。
可視化方法は、それぞれ折れ線グラフを用いています。
import matplotlib.pyplot as plt
from matplotlib.dates import DateFormatter
dict_profit = postprocess(lt_result=lt_result, D=D, dataloader=dataloader)
dates = dict_profit["date"]
actual_profit = np.array(dict_profit["actual_profit"]) * 100
predict_profit = (-1) * np.array(dict_profit["predict_profit"]) * 100
predict_cov = np.array(dict_profit["predict_cov"]) * 100
figure_ = plt.figure(1)
axes_ = figure_.add_subplot(111)
xaxis_ = axes_.xaxis
xaxis_.set_major_formatter(DateFormatter("%Y-%m-%d"))
axes_.tick_params(axis="x", rotation=30)
# axes_.bar(x=list(range(len(left))), height=height, tick_label=left, color=color)
axes_.plot(dates, actual_profit, color="blue", marker="o", label="actual profit(%)")
axes_.plot(dates, predict_profit, color="red", marker=">", label="predict profit(%)")
axes_.plot(
dates, predict_cov, color="orange", marker="^", label="predict covariance(%)"
)
plt.title("Portfolio optimization using Amplify")
plt.legend()
plt.show()
plt.clf()
plt.close()
一定期間の実績データとこの可視化図を用いて、お好きなポートフォリオを作成することができます。今回のポートフォリオ最適化においては、
gamma
: 各銘柄間の期待収益率の分散(相関)を考慮するパラメータ。D
: 投資してから回収するまでの期間を表すパラメータ。K
: 投資口数を表すパラメータ。を変更しながら、上記の可視化図を用いてパラメータの検証ができます。