強化学習 n本腕バンディットにおけるε-グリーディ行動評価手法の性能比較を再現する
はじめに
会社の勉強会で強化学習を応用したマーケティング手法の発表を聞いて「強化学習が面白そうだ」と思い、私も勉強することにしました。
勉強している本は、Sutton and Barto著、三上・皆川訳の強化学習(原題: Reinforcement Learning: An Introduction)です。
2章を読み終えましたが、この章で主に扱われているn本腕バンディット(多腕バンディット)問題の理解が怪しいと感じたので、途中に出てくるシミュレーションを再現してみて、理解度をチェックしました。
なお、この記事のほとんどは上記の本を元にして書いていることを、予めお断りしておきます。
本腕バンディット問題の定義
本腕バンディット問題 (-armed bandit problem) とは、
種類の異なる行動選択肢があり、その中から1つを選択すると報酬がもらえる場合に、ある一定期間(または回数)の間に合計報酬の期待値を最大化する
問題のことです。
日本語では「多腕バンディット問題 (multi-armed bandit problem)」とも呼ばれていますが、ここでは本に習うことにします。
報酬は決まっているわけではなく、1回毎に選択した行動に依存する確率分布にしたがって決まります。
また、それぞれの行動選択をプレイ (play) と呼びます。
本腕バンディット問題の例
この問題を具体的にイメージするにはスロットマシンを考えるのが良いです。*1
ただし、普通にイメージするスロットマシンとは違い、ここでは下図のように本のレバーを持っている機械について考えます。
図のように、1つのレバーを引くことが1回の行動選択に該当し、報酬は当たりで得られる利益に相当します。
同じレバーを引いても報酬が毎回異なることに注意してください。
このような状況下で、プレイヤーはプレイを繰り返しながら最良のレバーを探し、賞金を最大にすることを目指します。
行動価値手法
行動価値手法を説明するに当たって使う数式の準備をします。
行動の真の価値をとして、番目のプレイにおけるの推定量を[tex: Q{t}(a)]とします。
また、回目のプレイにおいて、それまでの間に行動が[tex: k{a}]回選択されていたとして、それぞれの回での報酬を[tex: r{1}, r{2}, \dots, r{k{a}}]とします。
貪欲な手と探索的な手
強化学習は教師あり学習とは違って、教師データは与えられません。
そのため、獲得する報酬を最大にするためには、過去に得た各プレイに対する報酬から各行動の価値を推定して、価値が最も高い行動を選択していきます。
このように、最も価値が高い行動を選択していくことを貪欲な(グリーディ; greedy)手を打つといいます。
これを数式で表すと、
[tex: \displaystyle Q{t}(a^{*}) = \max{a} Q_{t}(a)]
となります。
これに対して、ランダムに行動を選ぶこともあります。
これは、そうしなければ遭遇しなかったような状態を生じさせるということから探索的な(explore)手と呼ばれます。
行動価値推定方法: 標本平均手法(sample-averaging method)
行動価値を評価する方法は様々ですが、今回は単純な標本平均手法を使います。
行動の価値を
[tex: \displaystyle Q{t}(a) = \frac{r{1} + r{2} + \dots + r{k{a}}}{k{a}}]
と推定します。
ただし、[tex: k{a} = 0]の場合は、任意の行動に対して[tex: Q{0}(a) = 0]とします。
行動選択方法: -グリーディ手法
グリーディな手を選択し続ければ報酬が最大化できるかと言えば、そうではありません。
報酬はランダムに決定されるので、例えば10回プレイした結果で価値が最大になる行動を推測したとしても、それが間違っていることがあり得るからです。
そのため、ある一定の確率で探索的な手を打つようにして、価値の推定値を改良することを狙います。これを-グリーディ手法といいます。
-グリーディ手法の性能評価シミュレーションを再現する
ここからはSutton and Barto著、三上・皆川訳(2000) 、強化学習のp.31の図2.1で行われた-グリーディ手法の性能評価を再現していきます。
目標にするグラフを転載します。
シミュレーションの設定
以下にシミュレーションの設定を示します。
使うアルゴリズムは10本腕バンディット(「本腕バンディット問題の例」を参照)です。
ここで、は平均と分散の正規分布を表します。
-グリーディ手法のは0, 0.01, 0.1の3つを試しています。
なお、はグリーディ手法のみになることに注意してください。
- 各行動の真の価値は標準正規分布で選ぶ。
- 報酬は各プレイ毎に選択した行動によってで決定する。
- -グリーディ手法で行動選択する。ただし、1回目のプレイの場合は探索的に行動を選択する。
- 選択した行動に応じて、2で決めた報酬を受け取る。
- 今まで受け取った報酬を元に、標本平均手法で行動価値推定を行う。
- 2〜5までの設定を1プレイとして、1000回プレイする。
- 1〜6までを1セットとして、2000回繰り返す。
- プレイ回数毎に2000回の平均値を算出する。
シミュレーションの環境
項目 | 値 |
---|---|
OS | Ubuntu 18.04.2 LTS |
CPU | Intel(R) Core(TM) i7-8086K CPU @ 4.00GHz |
メモリ | 64GB |
python | Python 3.6.8 :: Anaconda, Inc. |
シミュレーションを実装する
ここからは実際にシミュレーションをpythonで実装していきます。
作成するスクリプトは次の3つです。
- bandit.py
- evaluation.py
- n_armed_bandit_simulation.ipynb
1.のbandit.pyはシミュレーション本体で、この中にシミュレーションを書いていきます。
2.のevaluation.pyはbandit.pyで得た結果データをグラフで使えるように集計するモジュールを提供します。
3.のn_armed_bandit_simulation.ipynbでは、2をモジュールとして読み込み、実際にグラフを描画します。
コードはhttps://github.com/Katsuya-Ishiyama/rl_suttonにあります。
bandit.py
シミュレーションの実装はエージェント(実際にプレイする人)と環境(スロットマシン)に分けて行います。
これはSutton and Barto著、三上・皆川訳、強化学習 p.56の図3.1を参考にしたためです。
NArmedBanditEnvironmentクラス
このクラスの役割は
- 行動の真の価値を決める
- 1つの行動が実行されたら対応する報酬を返す
- 真の価値が最も高い行動を教える
です。
2番目の役割は、上図のエージェントから行動を受け取りと対応する報酬を返す部分を担っています。
図には環境の状態をエージェントに返す部分が存在しますが、今回の設定では環境の状態は変わらないため、実装していません。
また、3番目の役割は本来ならば必要ありませんが、シミュレーションで性能を評価する上で正解は必要なので実装しています。
BanditLoggerクラス
このクラスの役割は
- プレイ毎にプレイ回数、行動、報酬、探索的な手を打つ確率、最良な行動を保持する
- 保持しているデータをCSVに吐き出す
- 保持しているデータをプレイ回数または行動毎にまとめてエージェントに渡す
です。
NArmedBanditAgentクラス
このクラスの役割は
- 行動を決定する
- 与えられた環境でプレイする
- 環境から報酬を受け取る
- 受け取った報酬をBanditLoggerに登録する
- 受け取った報酬を元に行動価値を推定する
- BanditLoggerに記録されたデータをCSVに吐き出す(BanditLoggerをラップしたもの)
です。
インスタンス化するときにNArmedBanditEnvironmentインスタンスを受け取るようにしていますが、これは久保(2019)を参考にしました。
simulate_n_armed_bandit関数
シミュレーションの設定にある手順1〜8を1つにまとめた関数です。
設定を下表の引数で指定することができます。
引数名 | 意味 |
---|---|
arm | シミュテーションする行動の数 |
exploratory_rate | 探索的な手を打つ確率 |
play | シミュレーションするプレイ回数 |
iterations | シミュレーションの設定にある手順7の回数 |
main関数
実際にシミュレーションを実行します。
実際のコード
# -*- coding: utf-8 -*- import csv from datetime import datetime import logging import os from typing import List, Dict from tqdm import tqdm import numpy as np logger = logging.getLogger() class NArmedBanditEnvironment(object): def __init__(self, arm: int): self.arm = arm self.arm_list = list(range(1, arm+1)) self.true_action_values = None self.most_suitable_action_index = None def initialize(self): _this_class_name = __class__.__name__ logger.info("Initializing {}...".format(_this_class_name)) self._create_true_action_values() self._calculate_most_suitable_action_index() logger.info("{} has been initialized.".format(_this_class_name)) def _calculate_most_suitable_action_index(self): _action_index_list = list(self.true_action_values.keys()) _action_value_list = list(self.true_action_values.values()) _max_action_value_list_index = np.argmax(_action_value_list) self.most_suitable_action_index = _action_index_list[_max_action_value_list_index] logger.info('Most suitable action: {}'.format(self.most_suitable_action_index)) def _create_true_action_values(self): _true_action_values_list = np.random.normal(0, 1, self.arm).tolist() self.true_action_values = {i: v for i, v in zip(self.arm_list, _true_action_values_list)} logger.info("True action values: {}".format(self.true_action_values)) def create_action_values(self) -> List[float]: if self.true_action_values is None: raise Exception("Create true action values before you run this method.") _true_action_values = list(self.true_action_values.values()) _action_values_list = np.random.normal(_true_action_values, 1).tolist() _action_values = {i: v for i, v in zip(self.arm_list, _action_values_list)} logger.debug("Created action values: {}".format(_action_values)) return _action_values def run(self, arm: int): _selected_arm = arm _action_values = self.create_action_values() reward: float = _action_values[arm] logger.debug("Selected arm index: {}, Reward: {}".format(_selected_arm, reward)) return reward class BanditLogger(object): def __init__(self): self._logs = [] self.most_suitable_action = None def register(self, play_count, arm, reward, exploratory_rate): if self.most_suitable_action is None: logger.error('most_suitable_action is not set.') raise Exception('most_suitable_action is not set.') _log = { "play_count": play_count, "action": arm, "reward": reward, "exploratory_rate": exploratory_rate, "most_suitable_action": self.most_suitable_action } self._logs.append(_log) log_msg = 'Registered log. (Play count: {}, Action: {}, Reward: {}, Exploratory rate: {})' logger.debug(log_msg.format(play_count, arm, reward, exploratory_rate)) def write_logs_to_csv(self, path): with open(path, 'w') as f: fieldnames = ['play_count', 'action', 'reward', 'most_suitable_action', 'exploratory_rate'] csv_writer = csv.DictWriter(f, fieldnames=fieldnames) csv_writer.writeheader() csv_writer.writerows(self._logs) def get_rewards_by_action(self) -> Dict[int, List[float]]: _rewards_by_action: Dict[int, List[float]] = {} for _log in self._logs: _action: int = _log["action"] _reward: float = _log["reward"] _rewards_by_action.setdefault(_action, []) _rewards_by_action[_action].append(_reward) return _rewards_by_action def get_rewards_by_play_count(self) -> List[float]: _rewards_by_play_count = [] for _log in self._logs: _rewards_by_play_count.append(_log["reward"]) return _rewards_by_play_count class NArmedBanditAgent(object): def __init__(self, environment): self.environment = environment self.arm_list = environment.arm_list self.play_count = 0 self.play_log = BanditLogger() self.play_log.most_suitable_action = environment.most_suitable_action_index self.exploratory_rate = None def select_policy(self, eps=0.1): if (eps < 0) or (1 < eps): logger.error('eps cannot be handled.') raise ValueError('eps cannot be handled.') if eps == 0: logger.warning('The greedy policy will be selected at this time.') if eps == 1: logger.warning('The exploratory policy will be selected at this time.') is_exploratory = (np.random.uniform(0, 1) < eps) or (self.play_count == 1) if is_exploratory: _selected_action_index = self.select_policy_exploratory() else: _selected_action_index = self.select_policy_greedy() logger.info('Selected action: {}'.format(_selected_action_index)) return _selected_action_index def select_policy_greedy(self) -> int: _estimated_action_values = self.estimate_action_values() _estimated_action_values_list = list(_estimated_action_values.values()) _estimated_action_values_list_index = np.argmax(_estimated_action_values_list) _selected_action_value_index = self.arm_list[_estimated_action_values_list_index] logger.info('A greedy policy has been selected.') return _selected_action_value_index def select_policy_exploratory(self) -> int: _selected_action_value_index = np.random.choice(self.arm_list) logger.info('A exploratory policy has been selected.') return _selected_action_value_index def receive_reward(self, arm: int, reward: float): self.play_log.register(self.play_count, arm, reward, self.exploratory_rate) def estimate_action_values(self) -> Dict[int, float]: rewards = self.play_log.get_rewards_by_action() _estimated_action_values = {i: 0 for i in self.arm_list} for _arm, rewards_list in rewards.items(): _estimated_action_values[_arm] = np.mean(rewards_list) logger.info('Estimated action values: {}'.format(_estimated_action_values)) return _estimated_action_values def play(self, exploratory_rate=0.1): self.play_count += 1 logger.info('Play counts: {}'.format(self.play_count)) self.exploratory_rate = exploratory_rate logger.info('Exploratory rate: {}'.format(self.exploratory_rate)) selected_action = self.select_policy(self.exploratory_rate) reward = self.environment.run(selected_action) logger.info('Reward: {}'.format(reward)) self.receive_reward(selected_action, reward) def write_logs_to_csv(self, path): logger.info('Exporting logs... Path: {}'.format(path)) self.play_log.write_logs_to_csv(path) logger.info('Exporting logs has done. Path: {}'.format(path)) def simulate_n_armed_bandit(arm, exploratory_rate, play, iterations): bandit_env = NArmedBanditEnvironment(arm=arm) for simulation_num in tqdm(range(1, iterations+1)): logger.info('--------------------------------------------------') logger.info('Start simulation No.{}'.format(simulation_num)) logger.info('arm: {}, exploratory_rate: {}, play: {}, iterations: {}'.format(arm, exploratory_rate, play, iterations)) bandit_env.initialize() agent = NArmedBanditAgent(bandit_env) for _ in range(play): agent.play(exploratory_rate) filename = 'n_armed_bandit_arm{}_exploratory{}_simulation{}.csv'.format(arm, exploratory_rate, simulation_num) save_path = os.path.join('output', 'exploratory{}'.format(exploratory_rate), filename) agent.write_logs_to_csv(path=save_path) def main(): ARM_NUM = 10 MAX_PLAY_COUNT = 1000 SIMULATION_COUNT = 2000 simulate_n_armed_bandit( arm=ARM_NUM, exploratory_rate=0, play=MAX_PLAY_COUNT, iterations=SIMULATION_COUNT ) simulate_n_armed_bandit( arm=ARM_NUM, exploratory_rate=0.01, play=MAX_PLAY_COUNT, iterations=SIMULATION_COUNT ) simulate_n_armed_bandit( arm=ARM_NUM, exploratory_rate=0.1, play=MAX_PLAY_COUNT, iterations=SIMULATION_COUNT ) if __name__ == '__main__': current_time_str = datetime.now().strftime('%Y%m%d%H%M%S') file_handler = logging.FileHandler('bandit_simulation_{}.log'.format(current_time_str)) logging.basicConfig( format="[%(asctime)s %(levelname)s] %(message)s", level=logging.INFO, handlers=[file_handler] ) main()
evaluation.py
ここからは欲しいグラフのデータを計算するモジュールを実装していきます。
実装する関数は以下のとおりです。
calculate_average_by_play_count関数
各プレイ回数毎に平均値を計算します。
今回の場合、1つのプレイ回数につき2000個の報酬または最適な行動だったことを示すフラグを平均するために使います。
extract_exploratory_rate関数
結果が出力されているディレクトリ名から探索的な手を打つ確率(exploratory_rate)を抽出します。
白状しますと、各CSVファイルにexploratory_rateを出していたことを忘れていて、そのことを上の方針で実装している途中に思い出しました。。。
まぁ、「正規表現に苦手意識もあるし、練習にはちょうどいいかな」と、思った次第です(笑)
calculate_average_rewards関数
引数に指定されたディレクトリにある結果データを読み込んで、各プレイ回数毎の平均報酬を計算します。
calculate_average_suitable_action_rate関数
引数に指定されたディレクトリにある結果データを読み込んで、各プレイ回数毎の行動の最適度を計算します。
行動の最適度は、エージェントが選んだ行動が環境が持っている正解データmost_suitable_actionに一致する割合をプレイ回数で累積して計算します。
と、ここまで書いて、再現したい図2.1では、累積ではなく、各プレイ回数毎に2000回中の割合を求めていることに気付きました。。。
これは後日修正します。 (2019/05/05 更新)修正しました。
# -*- coding: utf-8 -*- import logging from pathlib import Path import re from typing import List import numpy as np import pandas as pd logger = logging.getLogger(__name__) def calculate_average_by_play_count(data: List[float]): data_array = np.array(data) _average_by_play_count = data_array.mean(axis=0) return _average_by_play_count def extract_exploratory_rate(dir: Path): logger.debug('source text: {}'.format(dir.name)) exploratory_rate_pattern = re.compile(r'^exploratory(0\.*\d*)$') _extract_result = exploratory_rate_pattern.findall(dir.name) logger.debug('extract result: {}'.format(_extract_result)) exploratory_rate = float(_extract_result[0]) return exploratory_rate def calculate_average_rewards(output_dir): _output_dir = Path(output_dir) exploratory_rate = extract_exploratory_rate(_output_dir) logger.debug('Exploratory Rate: {}'.format(exploratory_rate)) play_count_list = [] rewards_list = [] for file in _output_dir.iterdir(): logger.debug('reading {}'.format(file)) src_data = pd.read_csv(file) if not play_count_list: play_count_list.extend(src_data.play_count.tolist()) logger.debug('Extracted Play Counts: {}'.format(play_count_list)) rewards_list.append(src_data.reward.tolist()) average_rewards_by_play_counts = pd.DataFrame( data={ 'play_count': play_count_list, 'exploratory_rate': exploratory_rate, 'average_reward': calculate_average_by_play_count(rewards_list) } ) logger.debug('average_rewards_by_play_counts: {}'.format(average_rewards_by_play_counts.head())) return average_rewards_by_play_counts def calculate_average_suitable_action_rate(output_dir): _output_dir = Path(output_dir) exploratory_rate = extract_exploratory_rate(_output_dir) logger.debug('Exploratory Rate: {}'.format(exploratory_rate)) play_count_list = [] suitable_action_flag_list = [] for file in _output_dir.iterdir(): src_data = pd.read_csv(file) if not play_count_list: play_count_list.extend(src_data.play_count.tolist()) logger.debug('Extracted Play Counts: {}'.format(play_count_list)) is_suitable_action = src_data.action == src_data.most_suitable_action suitable_action_flag = 1 * is_suitable_action suitable_action_flag_list.append(suitable_action_flag.tolist()) average_suitable_action_rate_by_play_count = pd.DataFrame( data={ 'play_count': play_count_list, 'exploratory_rate': exploratory_rate, 'average_suitable_action_rate': calculate_average_by_play_count(suitable_action_flag_list) } ) logger.debug('calculate_average_suitable_action_rate: {}'.format(average_suitable_action_rate_by_play_count.head())) return average_suitable_action_rate_by_play_count
シミュレーションの結果
ここからはJupyter Notebookを起動して実際にグラフを描いていきます。
下の内容はGitHubにn_armed_bandit_simulation.ipynbでアップロードしてあります。
まずは、必要なモジュールをインポート
from pathlib import Path from matplotlib import pyplot as plt import pandas as pd from evaluation import calculate_average_rewards, calculate_average_suitable_action_rate
次に結果データを読み込みます。
OUTPUT_DIR = 'output/' average_rewards = pd.DataFrame() average_suitable_action_rate = pd.DataFrame() for child_dir in Path(OUTPUT_DIR).iterdir(): _reward = calculate_average_rewards(child_dir) average_rewards = average_rewards.append(_reward) _action_rate = calculate_average_suitable_action_rate(child_dir) average_suitable_action_rate = average_suitable_action_rate.append(_action_rate)
平均報酬のグラフを描画
exploratory_rate_list = sorted(average_rewards.exploratory_rate.unique().tolist()) for r in exploratory_rate_list: dat = average_rewards.loc[average_rewards.exploratory_rate == r, :] plt.plot(dat.play_count.tolist(), dat.average_reward.tolist(), label='eps = {:0.2f}'.format(r)) plt.xlabel('Play Count') plt.ylabel('Average Reward') plt.legend(loc='lower right') plt.savefig('bandit_average_reward.png') plt.show()
実行して得られるグラフは下のとおりです。
行動の最適度のグラフを描画
これは後日修正します (2019/05/05 更新)修正しました。
average_suitable_action_rate.average_suitable_action_rate *= 100 for r in exploratory_rate_list: dat = average_suitable_action_rate.loc[average_suitable_action_rate.exploratory_rate == r, :] plt.plot(dat.play_count.tolist(), dat.average_suitable_action_rate.tolist(), label='eps = {:0.2f}'.format(r)) plt.xlabel('Play Count') plt.ylabel('Suitable Action Rate (%)') plt.legend(loc='lower right') plt.savefig('bandit_suitable_action_rate.png') plt.show()
実行すると次のグラフが得られます。
まとめ
Sutton and Barto著、三上・皆川訳(2000) のn本腕バンディット問題における-グリーディ行動評価手法の性能比較シミュレーションを再現しました。
結果は、p.31の図2.1とほぼ同じグラフが得られたため、私の理解に問題が無いことが分かりました。
課題
を上げてシミュレーションした場合に、どのように平均報酬が変わるのかを試してみようと思います。
(ブログとしてアップしないと思います)
参考文献
- Sutton and Barto著、三上・皆川訳(2000) 、強化学習、森北出版
- 久保(2019)、Pythonで学ぶ強化学習、講談社
- 「スロットマシン」(2018年10月1日 (月) 05:47 UTCの版)『ウィキペディア日本語版』。https://ja.wikipedia.org/wiki/%E3%82%B9%E3%83%AD%E3%83%83%E3%83%88%E3%83%9E%E3%82%B7%E3%83%B3