FrozenLake-v1¶

In [1]:
import sys
import logging
import itertools

import numpy as np
np.random.seed(0)
import gym

logging.basicConfig(level=logging.INFO,
        format='%(asctime)s [%(levelname)s] %(message)s',
        stream=sys.stdout, datefmt='%H:%M:%S')

Use Environment¶

In [2]:
env = gym.make('FrozenLake-v1')
logging.info('observation space = %s', env.observation_space)
logging.info('action space = %s', env.action_space)
logging.info('number of states = %s', env.observation_space.n)
logging.info('number of actions = %s', env.action_space.n)
logging.info('P[14] = %s', env.P[14])
logging.info('P[14][2] = %s', env.P[14][2])
logging.info('reward threshold = %s', env.spec.reward_threshold)
logging.info('max episode steps = %s', env.spec.max_episode_steps)
00:00:00 [INFO] observation space = Discrete(16)
00:00:00 [INFO] action space = Discrete(4)
00:00:00 [INFO] number of states = 16
00:00:00 [INFO] number of actions = 4
00:00:00 [INFO] P[14] = {0: [(0.3333333333333333, 10, 0.0, False), (0.3333333333333333, 13, 0.0, False), (0.3333333333333333, 14, 0.0, False)], 1: [(0.3333333333333333, 13, 0.0, False), (0.3333333333333333, 14, 0.0, False), (0.3333333333333333, 15, 1.0, True)], 2: [(0.3333333333333333, 14, 0.0, False), (0.3333333333333333, 15, 1.0, True), (0.3333333333333333, 10, 0.0, False)], 3: [(0.3333333333333333, 15, 1.0, True), (0.3333333333333333, 10, 0.0, False), (0.3333333333333333, 13, 0.0, False)]}
00:00:00 [INFO] P[14][2] = [(0.3333333333333333, 14, 0.0, False), (0.3333333333333333, 15, 1.0, True), (0.3333333333333333, 10, 0.0, False)]
00:00:00 [INFO] reward threshold = 0.7
00:00:00 [INFO] max episode steps = 100

Play Using Random Policy

In [3]:
def play_policy(env, policy, render=False):
    episode_reward = 0.
    observation, _ = env.reset()
    while True:
        if render:
            env.render()  # render the environment
        action = np.random.choice(env.action_space.n, p=policy[observation])
        observation, reward, terminated, truncated, _ = env.step(action)
        episode_reward += reward
        if terminated or truncated:
            break
    return episode_reward
In [4]:
logging.info('==== Random policy ====')
random_policy = np.ones((env.observation_space.n, env.action_space.n)) / \
        env.action_space.n

episode_rewards = [play_policy(env, random_policy)  for _ in range(100)]
logging.info('average episode reward = %.2f ± %.2f',
        np.mean(episode_rewards), np.std(episode_rewards))
00:00:00 [INFO] ==== Random policy ====
00:00:00 [INFO] average episode reward = 0.04 ± 0.20

Evaluate Policy¶

In [5]:
def v2q(env, v, state=None, gamma=1.):  # calculate action value from state value
    if state is not None:  # solve for single state
        q = np.zeros(env.action_space.n)
        for action in range(env.action_space.n):
            for prob, next_state, reward, terminated in env.P[state][action]:
                q[action] += prob * \
                        (reward + gamma * v[next_state] * (1. - terminated))
    else:  # solve for all states
        q = np.zeros((env.observation_space.n, env.action_space.n))
        for state in range(env.observation_space.n):
            q[state] = v2q(env, v, state, gamma)
    return q

def evaluate_policy(env, policy, gamma=1., tolerant=1e-6):
    v = np.zeros(env.observation_space.n)  # initialize state values
    while True:
        delta = 0
        for state in range(env.observation_space.n):
            vs = sum(policy[state] * v2q(env, v, state, gamma))  # update state value
            delta = max(delta, abs(v[state]-vs))  # update max error
            v[state] = vs
        if delta < tolerant:  # check whether iterations can finish
            break
    return v

Evaluate Random Policy

In [6]:
v_random = evaluate_policy(env, random_policy)
logging.info('state value:\n%s', v_random.reshape(4, 4))

q_random = v2q(env, v_random)
logging.info('action value:\n%s', q_random)
00:00:00 [INFO] state value:
[[0.0139372  0.01162942 0.02095187 0.01047569]
 [0.01624741 0.         0.04075119 0.        ]
 [0.03480561 0.08816967 0.14205297 0.        ]
 [0.         0.17582021 0.43929104 0.        ]]
00:00:00 [INFO] action value:
[[0.01470727 0.01393801 0.01393801 0.01316794]
 [0.00852221 0.01162969 0.01086043 0.01550616]
 [0.02444416 0.0209521  0.02405958 0.01435233]
 [0.01047585 0.01047585 0.00698379 0.01396775]
 [0.02166341 0.01701767 0.0162476  0.01006154]
 [0.         0.         0.         0.        ]
 [0.05433495 0.04735099 0.05433495 0.00698396]
 [0.         0.         0.         0.        ]
 [0.01701767 0.04099176 0.03480569 0.04640756]
 [0.0702086  0.11755959 0.10595772 0.05895286]
 [0.18940397 0.17582024 0.16001408 0.04297362]
 [0.         0.         0.         0.        ]
 [0.         0.         0.         0.        ]
 [0.08799662 0.20503708 0.23442697 0.17582024]
 [0.25238807 0.53837042 0.52711467 0.43929106]
 [0.         0.         0.         0.        ]]

Improve Policy¶

In [7]:
def improve_policy(env, v, policy, gamma=1.):
    optimal = True
    for state in range(env.observation_space.n):
        q = v2q(env, v, state, gamma)
        action = np.argmax(q)
        if policy[state][action] != 1.:
            optimal = False
            policy[state] = 0.
            policy[state][action] = 1.
    return optimal

Improve from random policy

In [8]:
policy = random_policy.copy()
optimal = improve_policy(env, v_random, policy)
if optimal:
    logging.info('No update. Optimal policy is:\n%s', policy)
else:
    logging.info('Updating completes. Updated policy is:\n%s', policy)
00:00:00 [INFO] Updating completes. Updated policy is:
[[1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 0. 1.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [1. 0. 0. 0.]
 [0. 0. 1. 0.]
 [0. 1. 0. 0.]
 [1. 0. 0. 0.]]

Iterate Policy¶

In [9]:
def iterate_policy(env, gamma=1., tolerant=1e-6):
    policy = np.ones((env.observation_space.n,
            env.action_space.n)) / env.action_space.n  # initialize
    while True:
        v = evaluate_policy(env, policy, gamma, tolerant)
        if improve_policy(env, v, policy):
            break
    return policy, v

policy_pi, v_pi = iterate_policy(env)
logging.info('optimal state value =\n%s', v_pi.reshape(4, 4))
logging.info('optimal policy =\n%s', np.argmax(policy_pi, axis=1).reshape(4, 4))
00:00:00 [INFO] optimal state value =
[[0.82351246 0.82350689 0.82350303 0.82350106]
 [0.82351416 0.         0.5294002  0.        ]
 [0.82351683 0.82352026 0.76469786 0.        ]
 [0.         0.88234658 0.94117323 0.        ]]
00:00:00 [INFO] optimal policy =
[[0 3 3 3]
 [0 0 0 0]
 [3 1 0 0]
 [0 2 1 0]]

Test Policy

In [10]:
episode_rewards = [play_policy(env, policy_pi)  for _ in range(100)]
logging.info('average episode reward = %.2f ± %.2f',
        np.mean(episode_rewards), np.std(episode_rewards))
00:00:00 [INFO] average episode reward = 0.77 ± 0.42

Value Iteration¶

In [11]:
def iterate_value(env, gamma=1, tolerant=1e-6):
    v = np.zeros(env.observation_space.n)  # initialization
    while True:
        delta = 0
        for state in range(env.observation_space.n):
            vmax = max(v2q(env, v, state, gamma))  # update state value
            delta = max(delta, abs(v[state]-vmax))
            v[state] = vmax
        if delta < tolerant:  # check whether iterations can finish
            break

    # calculate optimal policy
    policy = np.zeros((env.observation_space.n, env.action_space.n))
    for state in range(env.observation_space.n):
        action = np.argmax(v2q(env, v, state, gamma))
        policy[state][action] = 1.
    return policy, v

policy_vi, v_vi = iterate_value(env)
logging.info('optimal state value =\n%s', v_vi.reshape(4, 4))
logging.info('optimal policy = \n%s', np.argmax(policy_vi, axis=1).reshape(4, 4))
00:00:00 [INFO] optimal state value =
[[0.82351232 0.82350671 0.82350281 0.82350083]
 [0.82351404 0.         0.52940011 0.        ]
 [0.82351673 0.82352018 0.76469779 0.        ]
 [0.         0.88234653 0.94117321 0.        ]]
00:00:00 [INFO] optimal policy =
[[0 3 3 3]
 [0 0 0 0]
 [3 1 0 0]
 [0 2 1 0]]

Test Policy

In [12]:
episode_rewards = [play_policy(env, policy_vi) for _ in range(100)]
logging.info('average episode reward = %.2f ± %.2f',
        np.mean(episode_rewards), np.std(episode_rewards))
00:00:00 [INFO] average episode reward = 0.70 ± 0.46