强化学习-马尔科夫及Q-learning及python代码实现
马尔科夫决策过程
马尔科夫决策过程由5个元素构成:
- S:表示状态集(states)
- A:表示一组动作(actions)
- P:表示状态转移概率.a表示在当前sES状态下,经过aEA作用后,会转移到的其他状态的概率分布情况
- R:奖励函数(reward function)表示agent采取某个动作后的即时奖励46.2
- y:折扣系数意味着当下的reward比未来反馈的reward更重要
1.智能体初始状态为S0
2.选择一个动作a0
3.按概率转移矩阵Psa转移到了下一个状态S1然后。。。
状态价值函数:
t时刻的状态s能获得的未来回报的期望价值函数用来衡量某一状态或状态-动作对的优劣价,累计奖励的期望最优价值函数:所有策略下的最优累计奖励期望
策略:已知状态下可能产生动作的概率分布
Bellman方程:当前状态的价值和下一步的价值及当前的奖励(Reward)有关
价值函数分解为当前的奖励和下一步的价值两部分
Bellman最优化方程:
在某个状态(state)下最优价值函数的值,就是智能体(agent)在该状态下,所能获得的累积期望奖励值(cumulative expective rewards)的最大值.
Q-learning
具体步骤:
首先取学习参数y=0.8,初始状态为房间1,并将Q初始化为一个零矩阵
观察矩阵R的第二行(对应房间1或状态1),它包含两个非负值,即当前状态1的下一步行为有两种可能:转至状态3或转至状态5.随机地,我们选取转至状态5.
想象一下,当我们的agent位于状态5以后,会发生什么事情呢?观察矩阵R的第6行它对应三个可能的行为:转至状态1,4或5.
接下来,进行下一次episode的迭代,首先随机地选取一个初始状态,这次我们选取状态3作为初始状态.
观察矩阵R的第四行(对应状态3),它对应三个可能的行为:转至状态1,2或4.随机地,我们选取转至状态1.因此观察矩阵R的第二行(对应状态1),它对应两个可能的行为:转至状态3或5.
现在状态1变成了当前状态.因为状态1还不是目标状态,因此我们需要继续往前探索.状态1对应三个可能的行为:转至状态3或5.不妨假定我们幸运地选择了状态5.
代码
1、自己编写gridworld.py 实现生成矩阵
import numpy as np
import sys
from gym.envs.toy_text import discrete
UP = 0
RIGHT = 1
DOWN = 2
LEFT = 3
class GridworldEnv(discrete.DiscreteEnv):
"""
Grid World environment from Sutton's Reinforcement Learning book chapter 4.
You are an agent on an MxN grid and your goal is to reach the terminal
state at the top left or the bottom right corner.
For example, a 4x4 grid looks as follows:
T o o o
o x o o
o o o o
o o o T
x is your position and T are the two terminal states.
You can take actions in each direction (UP=0, RIGHT=1, DOWN=2, LEFT=3).
Actions going off the edge leave you in your current state.
You receive a reward of -1 at each step until you reach a terminal state.
"""
metadata = {'render.modes': ['human', 'ansi']}
def __init__(self, shape=[4,4]):
if not isinstance(shape, (list, tuple)) or not len(shape) == 2:
raise ValueError('shape argument must be a list/tuple of length 2')
self.shape = shape
nS = np.prod(shape)
nA = 4
MAX_Y = shape[0]
MAX_X = shape[1]
P = {}
grid = np.arange(nS).reshape(shape)
it = np.nditer(grid, flags=['multi_index'])
while not it.finished:
s = it.iterindex
y, x = it.multi_index
P[s] = {a : [] for a in range(nA)}
is_done = lambda s: s == 0 or s == (nS - 1)
reward = 0.0 if is_done(s) else -1.0
# We're stuck in a terminal state
if is_done(s):
P[s][UP] = [(1.0, s, reward, True)]
P[s][RIGHT] = [(1.0, s, reward, True)]
P[s][DOWN] = [(1.0, s, reward, True)]
P[s][LEFT] = [(1.0, s, reward, True)]
# Not a terminal state
else:
ns_up = s if y == 0 else s - MAX_X
ns_right = s if x == (MAX_X - 1) else s + 1
ns_down = s if y == (MAX_Y - 1) else s + MAX_X
ns_left = s if x == 0 else s - 1
P[s][UP] = [(1.0, ns_up, reward, is_done(ns_up))]
P[s][RIGHT] = [(1.0, ns_right, reward, is_done(ns_right))]
P[s][DOWN] = [(1.0, ns_down, reward, is_done(ns_down))]
P[s][LEFT] = [(1.0, ns_left, reward, is_done(ns_left))]
it.iternext()
# Initial state distribution is uniform
isd = np.ones(nS) / nS
# We expose the model of the environment for educational purposes
# This should not be used in any model-free learning algorithm
self.P = P
super(GridworldEnv, self).__init__(nS, nA, P, isd)
def _render(self, mode='human', close=False):
if close:
return
outfile = StringIO() if mode == 'ansi' else sys.stdout
grid = np.arange(self.nS).reshape(self.shape)
it = np.nditer(grid, flags=['multi_index'])
while not it.finished:
s = it.iterindex
y, x = it.multi_index
if self.s == s:
output = " x "
elif s == 0 or s == self.nS - 1:
output = " T "
else:
output = " o "
if x == 0:
output = output.lstrip()
if x == self.shape[1] - 1:
output = output.rstrip()
outfile.write(output)
if x == self.shape[1] - 1:
outfile.write("\n")
it.iternext()
2 过程代码实现
import numpy as np
from gridworld import GridworldEnv
env = GridworldEnv()
def value_iteration(env, theta=0.0001, discount_factor=1.0):
"""
Value Iteration Algorithm.
Args:
env: OpenAI environment. env.P represents the transition probabilities of the environment.
theta: Stopping threshold. If the value of all states changes less than theta
in one iteration we are done.
discount_factor: lambda time discount factor.
Returns:
A tuple (policy, V) of the optimal policy and the optimal value function.
"""
def one_step_lookahead(state, V):
"""
Helper function to calculate the value for all action in a given state.
Args:
state: The state to consider (int)
V: The value to use as an estimator, Vector of length env.nS
Returns:
A vector of length env.nA containing the expected value of each action.
"""
A = np.zeros(env.nA)
for a in range(env.nA):
for prob, next_state, reward, done in env.P[state][a]:
A[a] += prob * (reward + discount_factor * V[next_state])
return A
V = np.zeros(env.nS)
while True:
# Stopping condition
delta = 0
# Update each state...
for s in range(env.nS):
# Do a one-step lookahead to find the best action
A = one_step_lookahead(s, V)
best_action_value = np.max(A)
# Calculate delta across all states seen so far
delta = max(delta, np.abs(best_action_value - V[s]))
# Update the value function
V[s] = best_action_value
# Check if we can stop
if delta < theta:
break
# Create a deterministic policy using the optimal value function
policy = np.zeros([env.nS, env.nA])
for s in range(env.nS):
# One step lookahead to find the best action for this state
A = one_step_lookahead(s, V)
best_action = np.argmax(A)
# Always take the best action
policy[s, best_action] = 1.0
return policy, V
policy, v = value_iteration(env)
print("Policy Probability Distribution:")
print(policy)
print("")
print("Reshaped Grid Policy (0=up, 1=right, 2=down, 3=left):")
print(np.reshape(np.argmax(policy, axis=1), env.shape))
print("")