Material Book of Statistics

統計、機械学習、プログラミングなどで実験的な試みを書いていきます。

強化学習 n本腕バンディットにおけるε-グリーディ行動評価手法の性能比較を再現する

はじめに

会社の勉強会で強化学習を応用したマーケティング手法の発表を聞いて「強化学習が面白そうだ」と思い、私も勉強することにしました。
勉強している本は、Sutton and Barto著、三上・皆川訳の強化学習(原題: Reinforcement Learning: An Introduction)です。

www.amazon.co.jp

2章を読み終えましたが、この章で主に扱われているn本腕バンディット(多腕バンディット)問題の理解が怪しいと感じたので、途中に出てくるシミュレーションを再現してみて、理解度をチェックしました。

なお、この記事のほとんどは上記の本を元にして書いていることを、予めお断りしておきます。

 n本腕バンディット問題の定義

 n本腕バンディット問題 ( n-armed bandit problem) とは、

 n種類の異なる行動選択肢があり、その中から1つを選択すると報酬がもらえる場合に、ある一定期間(または回数)の間に合計報酬の期待値を最大化する

問題のことです。
日本語では「多腕バンディット問題 (multi-armed bandit problem)」とも呼ばれていますが、ここでは本に習うことにします。
報酬は決まっているわけではなく、1回毎に選択した行動に依存する確率分布にしたがって決まります。
また、それぞれの行動選択をプレイ (play) と呼びます。

 n本腕バンディット問題の例

この問題を具体的にイメージするにはスロットマシンを考えるのが良いです。*1
ただし、普通にイメージするスロットマシンとは違い、ここでは下図のように n本のレバーを持っている機械について考えます。

f:id:ishiyama-katsuya:20190504131638p:plain
N本腕バンディット問題の具体例

図のように、1つのレバーを引くことが1回の行動選択に該当し、報酬は当たりで得られる利益に相当します。 同じレバーを引いても報酬が毎回異なることに注意してください。
このような状況下で、プレイヤーはプレイを繰り返しながら最良のレバーを探し、賞金を最大にすることを目指します。

行動価値手法

行動価値手法を説明するに当たって使う数式の準備をします。

行動 aの真の価値を Q^{*}(a)として、 t番目のプレイにおける Q^{*}(a)の推定量を[tex: Q{t}(a)]とします。
また、 t回目のプレイにおいて、それまでの間に行動 aが[tex: k
{a}]回選択されていたとして、それぞれの回での報酬を[tex: r{1}, r{2}, \dots, r{k{a}}]とします。

貪欲な手と探索的な手

強化学習教師あり学習とは違って、教師データは与えられません。 そのため、獲得する報酬を最大にするためには、過去に得た各プレイに対する報酬から各行動の価値を推定して、価値が最も高い行動を選択していきます。
このように、最も価値が高い行動を選択していくことを貪欲な(グリーディ; greedy)手を打つといいます。
これを数式で表すと、

[tex: \displaystyle Q{t}(a^{*}) = \max{a} Q_{t}(a)]

となります。

これに対して、ランダムに行動を選ぶこともあります。
これは、そうしなければ遭遇しなかったような状態を生じさせるということから探索的な(explore)手と呼ばれます。

行動価値推定方法: 標本平均手法(sample-averaging method)

行動価値を評価する方法は様々ですが、今回は単純な標本平均手法を使います。
行動 aの価値 Q_{t}(a)

[tex: \displaystyle Q{t}(a) = \frac{r{1} + r{2} + \dots + r{k{a}}}{k{a}}]

と推定します。
ただし、[tex: k{a} = 0]の場合は、任意の行動 aに対して[tex: Q{0}(a) = 0]とします。

行動選択方法:  \varepsilon-グリーディ手法

グリーディな手を選択し続ければ報酬が最大化できるかと言えば、そうではありません。
報酬はランダムに決定されるので、例えば10回プレイした結果で価値が最大になる行動を推測したとしても、それが間違っていることがあり得るからです。
そのため、ある一定の確率 \varepsilonで探索的な手を打つようにして、価値の推定値を改良することを狙います。これを \epsilon-グリーディ手法といいます。

 \varepsilon-グリーディ手法の性能評価シミュレーションを再現する

ここからはSutton and Barto著、三上・皆川訳(2000) 、強化学習のp.31の図2.1で行われた \varepsilon-グリーディ手法の性能評価を再現していきます。
目標にするグラフを転載します。

f:id:ishiyama-katsuya:20190503195642j:plain
Sutton and Barto著、三上・皆川訳、強化学習 p.31の図2.1

シミュレーションの設定

以下にシミュレーションの設定を示します。
使うアルゴリズムは10本腕バンディット(「 n本腕バンディット問題の例」を参照)です。
ここで、 N(\mu, \sigma^{2})は平均 \muと分散 \sigma^{2}正規分布を表します。
 \varepsilon-グリーディ手法の \varepsilonは0, 0.01, 0.1の3つを試しています。
なお、 \varepsilon = 0はグリーディ手法のみになることに注意してください。

  1. 各行動 aの真の価値 Q^{*}(a)は標準正規分布 N(0, 1)で選ぶ。
  2. 報酬は各プレイ毎に選択した行動 aによって N(Q^{*}(a), 1)で決定する。
  3.  \varepsilon-グリーディ手法で行動選択する。ただし、1回目のプレイの場合は探索的に行動を選択する。
  4. 選択した行動に応じて、2で決めた報酬を受け取る。
  5. 今まで受け取った報酬を元に、標本平均手法で行動価値推定を行う。
  6. 2〜5までの設定を1プレイとして、1000回プレイする。
  7. 1〜6までを1セットとして、2000回繰り返す。
  8. プレイ回数毎に2000回の平均値を算出する。

シミュレーションの環境

項目
OS Ubuntu 18.04.2 LTS
CPU Intel(R) Core(TM) i7-8086K CPU @ 4.00GHz
メモリ 64GB
python Python 3.6.8 :: Anaconda, Inc.

シミュレーションを実装する

ここからは実際にシミュレーションをpythonで実装していきます。
作成するスクリプトは次の3つです。

  1. bandit.py
  2. evaluation.py
  3. n_armed_bandit_simulation.ipynb

1.のbandit.pyはシミュレーション本体で、この中にシミュレーションを書いていきます。
2.のevaluation.pyはbandit.pyで得た結果データをグラフで使えるように集計するモジュールを提供します。
3.のn_armed_bandit_simulation.ipynbでは、2をモジュールとして読み込み、実際にグラフを描画します。
コードはhttps://github.com/Katsuya-Ishiyama/rl_suttonにあります。

bandit.py

シミュレーションの実装はエージェント(実際にプレイする人)と環境(スロットマシン)に分けて行います。
これはSutton and Barto著、三上・皆川訳、強化学習 p.56の図3.1を参考にしたためです。

f:id:ishiyama-katsuya:20190505003137j:plain
Sutton and Barto著、三上・皆川訳、強化学習 p.56より転載

NArmedBanditEnvironmentクラス

このクラスの役割は

  1. 行動の真の価値を決める
  2. 1つの行動が実行されたら対応する報酬を返す
  3. 真の価値が最も高い行動を教える

です。
2番目の役割は、上図のエージェントから行動を受け取りと対応する報酬を返す部分を担っています。
図には環境の状態をエージェントに返す部分が存在しますが、今回の設定では環境の状態は変わらないため、実装していません。
また、3番目の役割は本来ならば必要ありませんが、シミュレーションで性能を評価する上で正解は必要なので実装しています。

BanditLoggerクラス

このクラスの役割は

  1. プレイ毎にプレイ回数、行動、報酬、探索的な手を打つ確率、最良な行動を保持する
  2. 保持しているデータをCSVに吐き出す
  3. 保持しているデータをプレイ回数または行動毎にまとめてエージェントに渡す

です。

NArmedBanditAgentクラス

このクラスの役割は

  1. 行動を決定する
  2. 与えられた環境でプレイする
  3. 環境から報酬を受け取る
  4. 受け取った報酬をBanditLoggerに登録する
  5. 受け取った報酬を元に行動価値を推定する
  6. BanditLoggerに記録されたデータをCSVに吐き出す(BanditLoggerをラップしたもの)

です。
インスタンス化するときにNArmedBanditEnvironmentインスタンスを受け取るようにしていますが、これは久保(2019)を参考にしました。

simulate_n_armed_bandit関数

シミュレーションの設定にある手順1〜8を1つにまとめた関数です。
設定を下表の引数で指定することができます。

引数名 意味
arm シミュテーションする行動の数
exploratory_rate 探索的な手を打つ確率
play シミュレーションするプレイ回数
iterations シミュレーションの設定にある手順7の回数

main関数

実際にシミュレーションを実行します。

実際のコード

# -*- coding: utf-8 -*-

import csv
from datetime import datetime
import logging
import os
from typing import List, Dict
from tqdm import tqdm
import numpy as np

logger = logging.getLogger()


class NArmedBanditEnvironment(object):

    def __init__(self, arm: int):
        self.arm = arm
        self.arm_list = list(range(1, arm+1))
        self.true_action_values = None
        self.most_suitable_action_index = None

    def initialize(self):
        _this_class_name = __class__.__name__
        logger.info("Initializing {}...".format(_this_class_name))
        self._create_true_action_values()
        self._calculate_most_suitable_action_index()
        logger.info("{} has been initialized.".format(_this_class_name))

    def _calculate_most_suitable_action_index(self):
        _action_index_list = list(self.true_action_values.keys())
        _action_value_list = list(self.true_action_values.values())
        _max_action_value_list_index = np.argmax(_action_value_list)
        self.most_suitable_action_index = _action_index_list[_max_action_value_list_index]
        logger.info('Most suitable action: {}'.format(self.most_suitable_action_index))

    def _create_true_action_values(self):
        _true_action_values_list = np.random.normal(0, 1, self.arm).tolist()
        self.true_action_values = {i: v for i, v in zip(self.arm_list, _true_action_values_list)}
        logger.info("True action values: {}".format(self.true_action_values))

    def create_action_values(self) -> List[float]:
        if self.true_action_values is None:
            raise Exception("Create true action values before you run this method.")
        _true_action_values = list(self.true_action_values.values())
        _action_values_list = np.random.normal(_true_action_values, 1).tolist()
        _action_values = {i: v for i, v in zip(self.arm_list, _action_values_list)}
        logger.debug("Created action values: {}".format(_action_values))
        return _action_values

    def run(self, arm: int):
        _selected_arm = arm
        _action_values = self.create_action_values()
        reward: float = _action_values[arm]
        logger.debug("Selected arm index: {}, Reward: {}".format(_selected_arm, reward))
        return reward


class BanditLogger(object):

    def __init__(self):
        self._logs = []
        self.most_suitable_action = None

    def register(self, play_count, arm, reward, exploratory_rate):

        if self.most_suitable_action is None:
            logger.error('most_suitable_action is not set.')
            raise Exception('most_suitable_action is not set.')

        _log = {
            "play_count": play_count,
            "action": arm,
            "reward": reward,
            "exploratory_rate": exploratory_rate,
            "most_suitable_action": self.most_suitable_action
        }
        self._logs.append(_log)
        log_msg = 'Registered log. (Play count: {}, Action: {}, Reward: {}, Exploratory rate: {})'
        logger.debug(log_msg.format(play_count, arm, reward, exploratory_rate))

    def write_logs_to_csv(self, path):
        with open(path, 'w') as f:
            fieldnames = ['play_count', 'action', 'reward', 'most_suitable_action', 'exploratory_rate']
            csv_writer = csv.DictWriter(f, fieldnames=fieldnames)
            csv_writer.writeheader()
            csv_writer.writerows(self._logs)

    def get_rewards_by_action(self) -> Dict[int, List[float]]:
        _rewards_by_action: Dict[int, List[float]] = {}
        for _log in self._logs:
            _action: int = _log["action"]
            _reward: float = _log["reward"]
            _rewards_by_action.setdefault(_action, [])
            _rewards_by_action[_action].append(_reward)
        return _rewards_by_action

    def get_rewards_by_play_count(self) -> List[float]:
        _rewards_by_play_count = []
        for _log in self._logs:
            _rewards_by_play_count.append(_log["reward"])
        return _rewards_by_play_count


class NArmedBanditAgent(object):

    def __init__(self, environment):
        self.environment = environment
        self.arm_list = environment.arm_list
        self.play_count = 0
        self.play_log = BanditLogger()
        self.play_log.most_suitable_action = environment.most_suitable_action_index
        self.exploratory_rate = None

    def select_policy(self, eps=0.1):
        if (eps < 0) or (1 < eps):
            logger.error('eps cannot be handled.')
            raise ValueError('eps cannot be handled.')
        if eps == 0:
            logger.warning('The greedy policy will be selected at this time.')
        if eps == 1:
            logger.warning('The exploratory policy will be selected at this time.')
        is_exploratory = (np.random.uniform(0, 1) < eps) or (self.play_count == 1)
        if is_exploratory:
            _selected_action_index = self.select_policy_exploratory()
        else:
            _selected_action_index = self.select_policy_greedy()
        logger.info('Selected action: {}'.format(_selected_action_index))
        return _selected_action_index

    def select_policy_greedy(self) -> int:
        _estimated_action_values = self.estimate_action_values()
        _estimated_action_values_list = list(_estimated_action_values.values())
        _estimated_action_values_list_index = np.argmax(_estimated_action_values_list)
        _selected_action_value_index = self.arm_list[_estimated_action_values_list_index]
        logger.info('A greedy policy has been selected.')
        return _selected_action_value_index

    def select_policy_exploratory(self) -> int:
        _selected_action_value_index = np.random.choice(self.arm_list)
        logger.info('A exploratory policy has been selected.')
        return _selected_action_value_index

    def receive_reward(self, arm: int, reward: float):
        self.play_log.register(self.play_count, arm, reward, self.exploratory_rate)

    def estimate_action_values(self) -> Dict[int, float]:
        rewards = self.play_log.get_rewards_by_action()
        _estimated_action_values = {i: 0 for i in self.arm_list}
        for _arm, rewards_list in rewards.items():
            _estimated_action_values[_arm] = np.mean(rewards_list)
        logger.info('Estimated action values: {}'.format(_estimated_action_values))
        return _estimated_action_values

    def play(self, exploratory_rate=0.1):
        self.play_count += 1
        logger.info('Play counts: {}'.format(self.play_count))
        self.exploratory_rate = exploratory_rate
        logger.info('Exploratory rate: {}'.format(self.exploratory_rate))
        selected_action = self.select_policy(self.exploratory_rate)
        reward = self.environment.run(selected_action)
        logger.info('Reward: {}'.format(reward))
        self.receive_reward(selected_action, reward)

    def write_logs_to_csv(self, path):
        logger.info('Exporting logs... Path: {}'.format(path))
        self.play_log.write_logs_to_csv(path)
        logger.info('Exporting logs has done. Path: {}'.format(path))


def simulate_n_armed_bandit(arm, exploratory_rate, play, iterations):

    bandit_env = NArmedBanditEnvironment(arm=arm)

    for simulation_num in tqdm(range(1, iterations+1)):
        logger.info('--------------------------------------------------')
        logger.info('Start simulation No.{}'.format(simulation_num))
        logger.info('arm: {}, exploratory_rate: {}, play: {}, iterations: {}'.format(arm, exploratory_rate, play, iterations))
        bandit_env.initialize()
        agent = NArmedBanditAgent(bandit_env)
        for _ in range(play):
            agent.play(exploratory_rate)
        filename = 'n_armed_bandit_arm{}_exploratory{}_simulation{}.csv'.format(arm, exploratory_rate, simulation_num)
        save_path = os.path.join('output', 'exploratory{}'.format(exploratory_rate), filename)
        agent.write_logs_to_csv(path=save_path)


def main():
    ARM_NUM = 10
    MAX_PLAY_COUNT = 1000
    SIMULATION_COUNT = 2000

    simulate_n_armed_bandit(
        arm=ARM_NUM,
        exploratory_rate=0,
        play=MAX_PLAY_COUNT,
        iterations=SIMULATION_COUNT
    )

    simulate_n_armed_bandit(
        arm=ARM_NUM,
        exploratory_rate=0.01,
        play=MAX_PLAY_COUNT,
        iterations=SIMULATION_COUNT
    )

    simulate_n_armed_bandit(
        arm=ARM_NUM,
        exploratory_rate=0.1,
        play=MAX_PLAY_COUNT,
        iterations=SIMULATION_COUNT
    )


if __name__ == '__main__':
    current_time_str = datetime.now().strftime('%Y%m%d%H%M%S')
    file_handler = logging.FileHandler('bandit_simulation_{}.log'.format(current_time_str))
    logging.basicConfig(
        format="[%(asctime)s %(levelname)s] %(message)s",
        level=logging.INFO,
        handlers=[file_handler]
    )

    main()

evaluation.py

ここからは欲しいグラフのデータを計算するモジュールを実装していきます。
実装する関数は以下のとおりです。

calculate_average_by_play_count関数

各プレイ回数毎に平均値を計算します。
今回の場合、1つのプレイ回数につき2000個の報酬または最適な行動だったことを示すフラグを平均するために使います。

extract_exploratory_rate関数

結果が出力されているディレクトリ名から探索的な手を打つ確率(exploratory_rate)を抽出します。

白状しますと、各CSVファイルにexploratory_rateを出していたことを忘れていて、そのことを上の方針で実装している途中に思い出しました。。。
まぁ、「正規表現に苦手意識もあるし、練習にはちょうどいいかな」と、思った次第です(笑)

calculate_average_rewards関数

引数に指定されたディレクトリにある結果データを読み込んで、各プレイ回数毎の平均報酬を計算します。

calculate_average_suitable_action_rate関数

引数に指定されたディレクトリにある結果データを読み込んで、各プレイ回数毎の行動の最適度を計算します。
行動の最適度は、エージェントが選んだ行動が環境が持っている正解データmost_suitable_actionに一致する割合をプレイ回数で累積して計算します。
と、ここまで書いて、再現したい図2.1では、累積ではなく、各プレイ回数毎に2000回中の割合を求めていることに気付きました。。。
これは後日修正します。 (2019/05/05 更新)修正しました。

# -*- coding: utf-8 -*-

import logging
from pathlib import Path
import re
from typing import List
import numpy as np
import pandas as pd


logger = logging.getLogger(__name__)


def calculate_average_by_play_count(data: List[float]):
    data_array = np.array(data)
    _average_by_play_count = data_array.mean(axis=0)
    return _average_by_play_count


def extract_exploratory_rate(dir: Path):
    logger.debug('source text: {}'.format(dir.name))
    exploratory_rate_pattern = re.compile(r'^exploratory(0\.*\d*)$')
    _extract_result = exploratory_rate_pattern.findall(dir.name)
    logger.debug('extract result: {}'.format(_extract_result))
    exploratory_rate = float(_extract_result[0])
    return exploratory_rate


def calculate_average_rewards(output_dir):
    _output_dir = Path(output_dir)
    exploratory_rate = extract_exploratory_rate(_output_dir)
    logger.debug('Exploratory Rate: {}'.format(exploratory_rate))

    play_count_list = []
    rewards_list = []
    for file in _output_dir.iterdir():
        logger.debug('reading {}'.format(file))
        src_data = pd.read_csv(file)
        if not play_count_list:
            play_count_list.extend(src_data.play_count.tolist())
            logger.debug('Extracted Play Counts: {}'.format(play_count_list))
        rewards_list.append(src_data.reward.tolist())

    average_rewards_by_play_counts = pd.DataFrame(
        data={
            'play_count': play_count_list,
            'exploratory_rate': exploratory_rate,
            'average_reward': calculate_average_by_play_count(rewards_list)
        }
    )
    logger.debug('average_rewards_by_play_counts: {}'.format(average_rewards_by_play_counts.head()))

    return average_rewards_by_play_counts


def calculate_average_suitable_action_rate(output_dir):
    _output_dir = Path(output_dir)
    exploratory_rate = extract_exploratory_rate(_output_dir)
    logger.debug('Exploratory Rate: {}'.format(exploratory_rate))

    play_count_list = []
    suitable_action_flag_list = []
    for file in _output_dir.iterdir():
        src_data = pd.read_csv(file)
        if not play_count_list:
            play_count_list.extend(src_data.play_count.tolist())
            logger.debug('Extracted Play Counts: {}'.format(play_count_list))
        is_suitable_action = src_data.action == src_data.most_suitable_action
        suitable_action_flag = 1 * is_suitable_action
        suitable_action_flag_list.append(suitable_action_flag.tolist())

    average_suitable_action_rate_by_play_count = pd.DataFrame(
        data={
            'play_count': play_count_list,
            'exploratory_rate': exploratory_rate,
            'average_suitable_action_rate': calculate_average_by_play_count(suitable_action_flag_list)
        }
    )
    logger.debug('calculate_average_suitable_action_rate: {}'.format(average_suitable_action_rate_by_play_count.head()))

    return average_suitable_action_rate_by_play_count

シミュレーションの結果

ここからはJupyter Notebookを起動して実際にグラフを描いていきます。
下の内容はGitHubにn_armed_bandit_simulation.ipynbでアップロードしてあります。

まずは、必要なモジュールをインポート

from pathlib import Path
from matplotlib import pyplot as plt
import pandas as pd
from evaluation import calculate_average_rewards, calculate_average_suitable_action_rate

次に結果データを読み込みます。

OUTPUT_DIR = 'output/'
average_rewards = pd.DataFrame()
average_suitable_action_rate = pd.DataFrame()
for child_dir in Path(OUTPUT_DIR).iterdir():
    _reward = calculate_average_rewards(child_dir)
    average_rewards = average_rewards.append(_reward)

    _action_rate = calculate_average_suitable_action_rate(child_dir)
    average_suitable_action_rate = average_suitable_action_rate.append(_action_rate)

平均報酬のグラフを描画

exploratory_rate_list = sorted(average_rewards.exploratory_rate.unique().tolist())

for r in exploratory_rate_list:
    dat = average_rewards.loc[average_rewards.exploratory_rate == r, :]
    plt.plot(dat.play_count.tolist(), dat.average_reward.tolist(), label='eps = {:0.2f}'.format(r))
plt.xlabel('Play Count')
plt.ylabel('Average Reward')
plt.legend(loc='lower right')
plt.savefig('bandit_average_reward.png')
plt.show()

実行して得られるグラフは下のとおりです。

f:id:ishiyama-katsuya:20190505023556p:plain
プレイ回数毎の平均報酬

行動の最適度のグラフを描画

これは後日修正します (2019/05/05 更新)修正しました。

average_suitable_action_rate.average_suitable_action_rate *= 100

for r in exploratory_rate_list:
    dat = average_suitable_action_rate.loc[average_suitable_action_rate.exploratory_rate == r, :]
    plt.plot(dat.play_count.tolist(), dat.average_suitable_action_rate.tolist(), label='eps = {:0.2f}'.format(r))
plt.xlabel('Play Count')
plt.ylabel('Suitable Action Rate (%)')
plt.legend(loc='lower right')
plt.savefig('bandit_suitable_action_rate.png')
plt.show()

実行すると次のグラフが得られます。

f:id:ishiyama-katsuya:20190505112812p:plain
プレイ回数毎の行動の最適度

まとめ

Sutton and Barto著、三上・皆川訳(2000) のn本腕バンディット問題における \varepsilon-グリーディ行動評価手法の性能比較シミュレーションを再現しました。
結果は、p.31の図2.1とほぼ同じグラフが得られたため、私の理解に問題が無いことが分かりました。

課題

 \varepsilonを上げてシミュレーションした場合に、どのように平均報酬が変わるのかを試してみようと思います。
(ブログとしてアップしないと思います)

参考文献

脚注

*1:Sutton and Barto著、三上・皆川訳の強化学習 n本腕バンディット問題の由来は「「片腕のバンディット」と呼ばれている1本腕のスロットマシンから来ている」とある。