Reinforcement Learning Example 6: Policy Iteration

Markov decision process (MDP) definition:
Take the snake-and-ladder game from the previous section as an example: the states are the 100 squares on the board, an action is the choice of which die to throw, the transition model is given by the ladders, the reward signals whether the goal has been reached, and a policy determines the chain of state-action pairs from start to finish: {(s0, a0), (s1, a1), ..., (st, at)}
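In standard textbook notation (the symbols below are the usual ones, chosen here for reference), such a problem can be written as the tuple

$$\mathcal{M} = (S, A, P, R, \gamma)$$

with state set $S$ (the squares), action set $A$ (the dice), transition model $P(s' \mid s, a)$ (which here encodes the ladders), reward $R$, and discount factor $\gamma$ (agent.gamma = 0.8 in the code below).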
The value of a policy is measured with value functions, of which there are two kinds: the state value function and the state-action value function.
Cumulative return:
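Written out with the discount factor $\gamma$ (the standard definition; $\gamma$ corresponds to agent.gamma = 0.8 in the code below):

$$G_t = R_{t+1} + \gamma R_{t+2} + \gamma^2 R_{t+3} + \cdots = \sum_{k=0}^{\infty} \gamma^k R_{t+k+1}$$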
State value function (the expected return when starting from state s):
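This is the standard definition, the expectation of the return when starting in state $s$ and then following policy $\pi$:

$$V^{\pi}(s) = \mathbb{E}_{\pi}\!\left[G_t \mid S_t = s\right] = \mathbb{E}_{\pi}\!\left[\sum_{k=0}^{\infty} \gamma^k R_{t+k+1} \,\Big|\, S_t = s\right]$$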
State-action value function:
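Analogously, conditioning on both the starting state and the first action gives the standard definition:

$$Q^{\pi}(s, a) = \mathbb{E}_{\pi}\!\left[G_t \mid S_t = s, A_t = a\right]$$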
The Bellman equations for the two functions above:
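In the form that matches the code below (where agent.p holds the transition matrix and the reward used is that of the successor state, as in agent.r), the standard Bellman expectation equations are:

$$V^{\pi}(s) = \sum_{a} \pi(a \mid s) \sum_{s'} p(s' \mid s, a)\left[r(s') + \gamma V^{\pi}(s')\right]$$

$$Q^{\pi}(s, a) = \sum_{s'} p(s' \mid s, a)\left[r(s') + \gamma \sum_{a'} \pi(a' \mid s') Q^{\pi}(s', a')\right]$$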
Policy Iteration
Reinforcement learning looks for the optimal policy, the one that maximizes the value of every state; for each state we therefore want to find the action that maximizes its value:
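Concretely, this is the greedy improvement step that policy_improvement implements below: for every state, pick the action with the largest state-action value,

$$\pi'(s) = \arg\max_{a} Q^{\pi}(s, a)$$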
The evaluation step solves for the state values of the current policy; this is done iteratively, in the spirit of Gauss-Seidel iteration.
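One sweep of the iterative backup (the form used by policy_evaluation below, which reads the old values and writes the updated ones into a separate array) is:

$$V_{k+1}(s) = \sum_{s'} p\big(s' \mid s, \pi(s)\big)\left[r(s') + \gamma V_k(s')\right]$$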
Code:
import numpy as np
import gym
from gym.spaces import Discrete
class SnakeEnv(gym.Env):
    SIZE = 100  # number of squares on the board

    def __init__(self, ladder_num, dices):
        self.dices = dices            # max value of each available die
        self.ladder_num = ladder_num  # number of ladders
        # build the ladders: each one links two random squares, in both directions
        self.ladders = dict(np.random.randint(1, self.SIZE, size=(self.ladder_num, 2)))
        temp = dict()
        for k, v in self.ladders.items():
            temp[v] = k
        self.ladders.update(temp)
        self.pos = 1
        self.observation_space = Discrete(self.SIZE + 1)  # state space
        self.action_space = Discrete(len(dices))          # action space

    def reset(self):
        self.pos = 1
        return self.pos

    def step(self, a):
        # a is the index of the chosen die; roll uniformly in [1, dices[a]]
        step = np.random.randint(1, self.dices[a] + 1)
        self.pos += step
        if self.pos == 100:
            return 100, 100, 1, {}
        elif self.pos > 100:
            self.pos = 100 * 2 - self.pos  # overshoot: bounce back from square 100
        if self.pos in self.ladders:  # take the ladder if there is one
            self.pos = self.ladders[self.pos]
        return self.pos, -1, 0, {}

    def reward(self, s):
        if s == 100:  # reached the goal
            return 100
        else:
            return -1

    def render(self):
        pass
# tabular agent
class TableAgent(object):
    def __init__(self, env):
        self.s_len = env.observation_space.n  # size of the state space
        self.a_len = env.action_space.n       # size of the action space
        self.r = [env.reward(s) for s in range(0, self.s_len)]  # reward of every state
        self.pi = np.array([0 for s in range(0, self.s_len)])   # deterministic policy: pi[s] is the action taken in s
        # transition matrix p(s'|s, a)
        self.p = np.zeros([self.a_len, self.s_len, self.s_len], dtype=float)
        ladder_move = np.vectorize(lambda x: env.ladders[x] if x in env.ladders else x)
        for i, dice in enumerate(env.dices):  # one transition matrix per action
            prob = 1.0 / dice
            for src in range(1, 100):
                step = np.arange(dice)
                step += src
                step = np.piecewise(step, [step > 100, step <= 100],
                                    [lambda x: 200 - x, lambda x: x])
                step = ladder_move(step)
                for dst in step:
                    self.p[i, src, dst] += prob
        self.p[:, 100, 100] = 1
        self.value_pi = np.zeros((self.s_len))              # state value function v(s)
        self.value_q = np.zeros((self.s_len, self.a_len))   # state-action value function q(s, a)
        self.gamma = 0.8  # discount factor

    def play(self, state):
        return self.pi[state]
# play one game under a policy and return its total return
def eval_game(env, policy):
    state = env.reset()
    return_val = 0
    while True:
        if isinstance(policy, TableAgent):
            act = policy.play(state)
        elif isinstance(policy, list):
            act = policy[state]
        else:
            raise Exception('Illegal policy')
        state, reward, terminate, _ = env.step(act)
        return_val += reward
        if terminate:
            break
    return return_val
# hand-crafted policy: the 6-sided die up to square 96, then the 3-sided die
policy_ref = [1] * 97 + [0] * 3
policy_0 = [0] * 100  # always the 3-sided die
policy_1 = [1] * 100  # always the 6-sided die
# test: play 10000 games under each policy and compare the average returns
def test_easy():
    np.random.seed(0)
    sum_opt = 0
    sum_0 = 0
    sum_1 = 0
    env = SnakeEnv(0, [3, 6])
    for i in range(10000):
        sum_opt += eval_game(env, policy_ref)
        sum_0 += eval_game(env, policy_0)
        sum_1 += eval_game(env, policy_1)
    print('opt avg={}'.format(sum_opt / 10000.0))
    print('0 avg={}'.format(sum_0 / 10000.0))
    print('1 avg={}'.format(sum_1 / 10000.0))

test_easy()  # the hand-crafted policy achieves the highest average return
Policy Iteration
# policy iteration
class PolicyIteration(object):
    # policy evaluation: iterate the Bellman backup until the values converge
    def policy_evaluation(self, agent, max_iter=-1):
        iteration = 0
        while True:
            iteration += 1
            new_value_pi = agent.value_pi.copy()
            for i in range(1, agent.s_len):
                ac = agent.pi[i]                 # action prescribed by the current policy
                transition = agent.p[ac, i, :]   # transition probabilities under that action
                value_sa = np.dot(transition, agent.r + agent.gamma * agent.value_pi)
                new_value_pi[i] = value_sa
            diff = np.sqrt(np.sum(np.power(agent.value_pi - new_value_pi, 2)))
            if diff < 1e-6:
                break
            else:
                agent.value_pi = new_value_pi
            if iteration == max_iter:
                break
    # policy improvement: act greedily with respect to q(s, a)
    def policy_improvement(self, agent):
        new_policy = np.zeros_like(agent.pi)
        for i in range(1, agent.s_len):
            for j in range(0, agent.a_len):
                agent.value_q[i, j] = np.dot(agent.p[j, i, :],
                                             agent.r + agent.gamma * agent.value_pi)
            max_act = np.argmax(agent.value_q[i, :])
            new_policy[i] = max_act
        if np.all(np.equal(new_policy, agent.pi)):
            return False
        else:
            agent.pi = new_policy
            return True
    # policy iteration: alternate evaluation and improvement until the policy stops changing
    def policy_iteration(self, agent):
        iteration = 0
        while True:
            iteration += 1
            self.policy_evaluation(agent)
            ret = self.policy_improvement(agent)
            if not ret:
                break
        print('Iter {} rounds converge'.format(iteration))
def policy_iteration_demo1():
    env = SnakeEnv(0, [3, 6])  # no ladders, two actions (dice)
    agent = TableAgent(env)
    pi_algo = PolicyIteration()
    pi_algo.policy_iteration(agent)
    print('return_pi={}'.format(eval_game(env, agent)))
    print(agent.pi)

policy_iteration_demo1()
def policy_iteration_demo2():
    env = SnakeEnv(10, [3, 6])  # 10 ladders, two actions (dice)
    agent = TableAgent(env)
    agent.pi[:] = 0
    print('return3={}'.format(eval_game(env, agent)))
    agent.pi[:] = 1
    print('return6={}'.format(eval_game(env, agent)))
    agent.pi[97:100] = 0
    print('return_ensemble={}'.format(eval_game(env, agent)))
    pi_algo = PolicyIteration()
    pi_algo.policy_iteration(agent)
    print('return_pi={}'.format(eval_game(env, agent)))
    print(agent.pi)

policy_iteration_demo2()
The results above show that the policy found by policy iteration achieves the highest return.
