A2: Reinforcement Learning (2) – 4x4 Grid
Lecture video
Game2: 4x4 grid
- Problem description: how to train an agent that moves up, down, left, and right in a 4x4 grid world so that it reaches the goal point
imports
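The import cell is not shown in these notes. A minimal set that covers everything used below would look roughly like this (the exact aliases, and whether the course uses gym or gymnasium, are assumptions):

import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import IPython
import gymnasium as gym   # or: import gym, depending on the installed package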
Preliminaries: visualization
def show(states):
    fig = plt.Figure()
    ax = fig.subplots()
    ax.matshow(np.zeros([4,4]), cmap='bwr', alpha=0.0)
    sc = ax.scatter(0, 0, color='red', s=500)
    ax.text(0, 0, 'start', ha='center', va='center')
    ax.text(3, 3, 'end', ha='center', va='center')
    # Adding grid lines to the plot
    ax.set_xticks(np.arange(-.5, 4, 1), minor=True)
    ax.set_yticks(np.arange(-.5, 4, 1), minor=True)
    ax.grid(which='minor', color='black', linestyle='-', linewidth=2)
    def update(t):
        sc.set_offsets(states[t])
    ani = FuncAnimation(fig, update, frames=len(states))
    display(IPython.display.HTML(ani.to_jshtml()))
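For example (a hypothetical call, just to illustrate how show is used), passing a short list of (x, y) positions animates the red marker along that trajectory:

states = [np.array([0,0]), np.array([1,0]), np.array([1,1]), np.array([2,1])]
show(states)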
Implementing the Env class
- GridWorld: a basic simulation environment frequently used as an example in reinforcement learning
  - State: each grid cell is a state, and the agent occupies exactly one of these states.
  - Action: to move from the current state to the next state, the agent can take one of four actions: up, down, left, or right.
  - Reward: the reward obtained when the agent takes a particular action in the current state
  - Terminated: a flag indicating that an episode has ended
class GridWorld:
    def __init__(self):
        self.reset()
        self.state_space = gym.spaces.MultiDiscrete([4,4])
        self.action_space = gym.spaces.Discrete(4)
        self._action_to_direction = {
            0 : np.array([1, 0]),   # x+
            1 : np.array([0, 1]),   # y+
            2 : np.array([-1, 0]),  # x-
            3 : np.array([0, -1])   # y-
        }
    def reset(self):
        self.agent_action = None
        self.agent_state = np.array([0,0])
        return self.agent_state
    def step(self, action):
        direction = self._action_to_direction[action]
        self.agent_state = self.agent_state + direction
        if self.agent_state not in self.state_space:            # the agent stepped outside the 4x4 grid
            reward = -10
            terminated = True
            self.agent_state = self.agent_state - 1/2 * direction  # pull back half a step (for visualization)
        elif np.array_equal(self.agent_state, np.array([3,3])): # the agent reached the goal
            reward = 100
            terminated = True
        else:
            reward = -1
            terminated = False
        return self.agent_state, reward, terminated
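A quick sanity check (hypothetical calls, not part of the original notes): reset the environment and take a couple of steps.

env = GridWorld()
env.reset()     # array([0, 0])
env.step(0)     # x+ step: (array([1, 0]), -1, False)
env.step(3)     # y- step falls off the grid: reward -10, terminated True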
Implementing the Agent1 class + Run
- The functionality we want to implement:
  - .act(): decide an action → here, just a random action
  - .save_experience(): store the data → this is what we focus on for now
  - .learn(): learn from the data → skip for now
- First attempt
class Agent1:
    def __init__(self,env):
        self.action_space = env.action_space
        self.state_space = env.state_space
        self.n_experiences = 0
        self.n_episodes = 0
        self.score = 0
        # episode-wise info
        self.scores = []
        self.playtimes = []
        # time-wise info
        self.current_state = None
        self.action = None
        self.reward = None
        self.next_state = None
        self.terminated = None
        # replay_buffer
        self.actions = []
        self.current_states = []
        self.rewards = []
        self.next_states = []
        self.terminations = []
    def act(self):
        self.action = self.action_space.sample()
    def save_experience(self):
        self.actions.append(self.action)
        self.current_states.append(self.current_state)
        self.rewards.append(self.reward)
        self.next_states.append(self.next_state)
        self.terminations.append(self.terminated)
        self.n_experiences += 1
        self.score = self.score + self.reward
    def learn(self):
        pass
env = GridWorld()
agent = Agent1(env)
for _ in range(20):
    ## the essential code
    agent.current_state = env.reset()
    agent.terminated = False
    agent.score = 0
    for t in range(50):
        # step1: agent >> env
        agent.act()
        env.agent_action = agent.action
        # step2: agent << env
        agent.next_state, agent.reward, agent.terminated = env.step(env.agent_action)
        agent.save_experience()
        # step3: learn
        # agent.learn()
        # step4: state update
        agent.current_state = agent.next_state
        # step5: check termination
        if agent.terminated: break
    agent.scores.append(agent.score)
    agent.playtimes.append(t+1)
    agent.n_episodes = agent.n_episodes + 1
    ## the less essential code
    print(
        f"Episode: {agent.n_episodes} \t"
        f"Score: {agent.scores[-1]} \t"
        f"Playtime: {agent.playtimes[-1]}"
    )
Episode: 1 Score: -21 Playtime: 12
Episode: 2 Score: -10 Playtime: 1
Episode: 3 Score: -11 Playtime: 2
Episode: 4 Score: -10 Playtime: 1
Episode: 5 Score: -10 Playtime: 1
Episode: 6 Score: -11 Playtime: 2
Episode: 7 Score: -18 Playtime: 9
Episode: 8 Score: 93 Playtime: 8
Episode: 9 Score: -13 Playtime: 4
Episode: 10 Score: -13 Playtime: 4
Episode: 11 Score: -18 Playtime: 9
Episode: 12 Score: -10 Playtime: 1
Episode: 13 Score: -10 Playtime: 1
Episode: 14 Score: -10 Playtime: 1
Episode: 15 Score: -10 Playtime: 1
Episode: 16 Score: -16 Playtime: 7
Episode: 17 Score: -10 Playtime: 1
Episode: 18 Score: -24 Playtime: 15
Episode: 19 Score: -13 Playtime: 4
Episode: 20 Score: -10 Playtime: 1
- Episode 8 is a case where the agent happened to reach the goal by chance.
Understanding the environment (a one-dimensional understanding)
- Let's play 10,000 episodes with random actions.
env = GridWorld()
agent = Agent1(env)
for _ in range(10000):
    ## the essential code
    agent.current_state = env.reset()
    agent.terminated = False
    agent.score = 0
    for t in range(50):
        # step1: agent >> env
        agent.act()
        env.agent_action = agent.action
        # step2: agent << env
        agent.next_state, agent.reward, agent.terminated = env.step(env.agent_action)
        agent.save_experience()
        # step3: learn
        # agent.learn()
        # step4: state update
        agent.current_state = agent.next_state
        # step5: check termination
        if agent.terminated: break
    agent.scores.append(agent.score)
    agent.playtimes.append(t+1)
    agent.n_episodes = agent.n_episodes + 1
- Observing the data
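The cell that produced the output below is not shown; a plausible reconstruction simply prints the first few stored experiences as (current_state, action, reward, next_state) tuples:

for i in range(5):
    print((agent.current_states[i], agent.actions[i], agent.rewards[i], agent.next_states[i]))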
(array([0, 0]), 0, -1, array([1, 0]))
(array([1, 0]), 1, -1, array([1, 1]))
(array([1, 1]), 1, -1, array([1, 2]))
(array([1, 2]), 2, -1, array([0, 2]))
(array([0, 2]), 1, -1, array([0, 3]))
- A record for understanding the environment (1)
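The code for this first record is missing from the notes. A minimal sketch that is consistent with the tables below keeps a 4x4x4 array q and simply overwrites q[x,y,a] with the most recently observed reward for state (x,y) and action a; the printing loop here is an assumption, not the original code. (The single 4x4 array shown first appears to be one slice of this record; it matches the action = 3 table.)

q = np.zeros([4,4,4])
for i in range(agent.n_experiences):
    x,y = agent.current_states[i]
    a = agent.actions[i]
    q[x,y,a] = agent.rewards[i]   # remember the most recent reward observed for (state, action)

for a in range(4):
    print(f"action = {a}")
    print("action-value function =")
    print(q[:,:,a])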
array([[-10., -1., -1., -1.],
[-10., -1., -1., -1.],
[-10., -1., -1., -1.],
[-10., -1., -1., 0.]])
action = 0
action-value function =
[[ -1. -1. -1. -1.]
[ -1. -1. -1. -1.]
[ -1. -1. -1. 100.]
[-10. -10. -10. 0.]]
action = 1
action-value function =
[[ -1. -1. -1. -10.]
[ -1. -1. -1. -10.]
[ -1. -1. -1. -10.]
[ -1. -1. 100. 0.]]
action = 2
action-value function =
[[-10. -10. -10. -10.]
[ -1. -1. -1. -1.]
[ -1. -1. -1. -1.]
[ -1. -1. -1. 0.]]
action = 3
action-value function =
[[-10. -1. -1. -1.]
[-10. -1. -1. -1.]
[-10. -1. -1. -1.]
[-10. -1. -1. 0.]]
- A record for understanding the environment (2)
q = np.zeros([4,4,4])
for i in range(agent.n_experiences):
    x,y = agent.current_states[i]
    a = agent.actions[i]
    q_estimated = q[x,y,a]             # our current understanding of the environment, i.e., our answer so far
    q_realistic = agent.rewards[i]     # the actual answer
    diff = q_realistic - q_estimated   # difference between the actual answer and ours = error feedback
    q[x,y,a] = q_estimated + 0.05 * diff   ## new answer = old answer + (a small step of) the error feedback
action = 0
action-value function =
[[-1. -1. -1. -0.99866234]
[-1. -1. -1. -0.99851783]
[-0.99999999 -1. -0.99999593 98.43103943]
[-9.97394217 -9.99697776 -9.93439857 0. ]]
action = 1
action-value function =
[[-1. -1. -1. -9.98591939]
[-1. -1. -0.99999996 -9.99588862]
[-1. -0.99999999 -0.99999593 -9.92731143]
[-0.99915694 -0.99971289 98.50948746 0. ]]
action = 2
action-value function =
[[-10. -10. -9.99999999 -9.99065864]
[ -1. -1. -0.99999999 -0.99923914]
[ -1. -1. -0.99999321 -0.9884667 ]
[ -0.99946866 -0.99981905 -0.99465672 0. ]]
action = 3
action-value function =
[[-10. -1. -1. -0.99919909]
[-10. -1. -1. -0.99866234]
[ -9.99999999 -1. -0.99999285 -0.99541881]
[ -9.99347658 -0.99987363 -0.99776587 0. ]]
A deeper understanding of the environment (a more multi-dimensional understanding)
- The value of each state (= expected reward) when action = 1
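The array below is presumably just the action = 1 slice of the table learned above (an assumption; the original cell is not shown):

q[:,:,1]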
array([[-1. , -1. , -1. , -9.98591939],
[-1. , -1. , -0.99999996, -9.99588862],
[-1. , -0.99999999, -0.99999593, -9.92731143],
[-0.99915694, -0.99971289, 98.50948746, 0. ]])
- Analysis 1
  - From state (3,2), taking action 1 yields a reward of 100, so an estimated value near 100 is reasonable.
- Analysis 2
  - From state (3,1), taking action 1 yields a reward of -1, so the estimated value is near -1 → but is that reasonable??
- Critique: Analysis 2 looks reasonable at first, but after analyzing the data it turns out not to be.
- A thought experiment
  - You are handed a blank sheet of paper.
  - On the sheet you may write either 0 or 1 (action = 0 or action = 1).
  - The reward differs depending on whether you write 0 or 1.
  - After analyzing a huge amount of data, you "learn" that writing 0 pays 0 won and writing 1 pays 100,000 won.
  - So is the blank sheet worth 50,000 won or 100,000 won? → Surely 100,000 won?
- Intuition: the value currently estimated at \(s=(3,1)\), \(a=1\) is q[3,1,1] = -0.9997128867462345,1 but realistically it makes sense to consider "the immediate reward (-1) and the potential reward (100)" at the same time.
  1 That is, the potential value carried by the next_state is not taken into account.
  - Here, 0.99 is a weight that determines how important future rewards are compared to the present.
  - The closer it is to 1, the more heavily future rewards are valued (i.e., the blank sheet is regarded as worth 100,000 won).
- That is, if for every \(s\) and \(a\)
\[q(s,a) \approx \text{reward}(s,a) + 0.99 \times \max_{a'}q(s',a')\]
holds, then \(q(s,a)\) can be regarded as a sound estimate. Written a bit more rigorously:
\[q(s,a) \approx \begin{cases} \text{reward}(s,a) & \text{terminated} \\ \text{reward}(s,a) + 0.99 \times \max_{a'}q(s',a') & \text{not terminated}\end{cases}\]
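As a quick numerical check (using the learned values printed further below, so the numbers are only approximate): at \(s=(3,1)\), \(a=1\) the next state is \(s'=(3,2)\), where the best action value is about 98.5, so a sensible target is
\[q((3,1),1) \approx -1 + 0.99 \times 98.5 \approx 96.5,\]
which is far from the -1 suggested by the immediate reward alone.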
q = np.zeros([4,4,4])
for i in range(agent.n_experiences):
    x,y = agent.current_states[i]
    xx,yy = agent.next_states[i]
    a = agent.actions[i]
    q_estimated = q[x,y,a]
    if agent.terminations[i]:
        q_realistic = agent.rewards[i]                     # terminal step: the reward is all there is
    else:
        q_future = q[xx,yy,:].max()                        # best value attainable from the next state
        q_realistic = agent.rewards[i] + 0.99 * q_future   # immediate reward + discounted future value
    diff = q_realistic - q_estimated
    q[x,y,a] = q_estimated + 0.05 * diff
action = 0
action-value function =
[[87.02554961 88.94759484 90.75390245 88.54847007]
[88.4709728 91.06852327 93.18709107 94.21998722]
[84.98258538 91.44091272 95.48024593 98.43103943]
[-9.97394217 -9.99697776 -9.93439857 0. ]]
action = 1
action-value function =
[[87.01670813 88.59888111 85.52951661 -9.98591939]
[88.98190464 91.03081993 91.50379877 -9.99588862]
[90.76721433 93.24316728 95.65715857 -9.92731143]
[89.20612688 94.47295823 98.50948746 0. ]]
action = 2
action-value function =
[[-10. -10. -9.99999999 -9.99065864]
[ 84.96179325 86.84873675 88.0518007 80.10750712]
[ 86.40784936 88.69218405 89.83203868 83.06339754]
[ 86.40852121 89.09508079 89.87262647 0. ]]
action = 3
action-value function =
[[-10. 84.96186287 86.49128928 84.57992176]
[-10. 86.73523202 88.56505447 86.7154156 ]
[ -9.99999999 88.3058275 90.27264766 87.96618484]
[ -9.99347658 80.88548565 86.63274331 0. ]]
Formulating an action strategy
- Suppose the agent is in state (0,0).
  - Taking action 0 or action 1 is advantageous. // Taking action 2 or 3 is a disaster.
- Suppose the agent is in state (2,3).
  - Taking action 0 is advantageous.
- Suppose the agent is in state (3,2).
  - Taking action 1 is advantageous.
- The optimal action in each state is therefore as shown below.
- Let's summarize this strategy (= policy).
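The cell that produced the two arrays below is not shown. A plausible reconstruction, assuming the action-to-label mapping {0: 'down', 1: 'right', 2: 'up', 3: 'left'} (chosen to match the printed result), first builds a placeholder policy array and then fills in the greedy action for each state:

action_to_name = {0: 'down', 1: 'right', 2: 'up', 3: 'left'}   # assumed labels for the four actions
policy = np.array([['?????']*4]*4)      # placeholder policy (first array below)
for x in range(4):
    for y in range(4):
        policy[x,y] = action_to_name[q[x,y,:].argmax()]
policy                                  # greedy policy (second array below)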
array([['?????', '?????', '?????', '?????'],
['?????', '?????', '?????', '?????'],
['?????', '?????', '?????', '?????'],
['?????', '?????', '?????', '?????']], dtype='<U5')
array([['down', 'down', 'down', 'down'],
['right', 'down', 'down', 'down'],
['right', 'right', 'right', 'down'],
['right', 'right', 'right', 'down']], dtype='<U5')
Implementing the Agent2 class + Run
class Agent2(Agent1):
    def __init__(self,env):
        super().__init__(env)
        self.q = np.zeros([4,4,4])
    def learn(self):
        x,y = self.current_state
        xx,yy = self.next_state
        a = self.action
        q_estimated = self.q[x,y,a]
        if self.terminated:
            q_realistic = self.reward
        else:
            q_future = self.q[xx,yy,:].max()
            q_realistic = self.reward + 0.99 * q_future
        diff = q_realistic - q_estimated
        self.q[x,y,a] = q_estimated + 0.05 * diff
    def act(self):
        if self.n_experiences < 3000:
            self.action = self.action_space.sample()   # explore: random actions for the first 3000 experiences
        else:
            x,y = self.current_state
            self.action = self.q[x,y,:].argmax()       # exploit: greedy action afterwards
env = GridWorld()
agent = Agent2(env)
for _ in range(2000):
    ## the essential code
    agent.current_state = env.reset()
    agent.terminated = False
    agent.score = 0
    for t in range(50):
        # step1: agent >> env
        agent.act()
        env.agent_action = agent.action
        # step2: agent << env
        agent.next_state, agent.reward, agent.terminated = env.step(env.agent_action)
        agent.save_experience()
        # step3: learn
        agent.learn()
        # step4: state update
        agent.current_state = agent.next_state
        # step5: check termination
        if agent.terminated: break
    agent.scores.append(agent.score)
    agent.playtimes.append(t+1)
    agent.n_episodes = agent.n_episodes + 1
    ## the less essential code
    if (agent.n_episodes % 100) == 0:
        print(
            f"Episode: {agent.n_episodes} \t"
            f"Score: {np.mean(agent.scores[-100:])} \t"
            f"Playtime: {np.mean(agent.playtimes[-100:])}"
        )
Episode: 100 Score: -10.36 Playtime: 3.56
Episode: 200 Score: -10.9 Playtime: 3.0
Episode: 300 Score: -11.02 Playtime: 3.12
Episode: 400 Score: -6.64 Playtime: 4.24
Episode: 500 Score: -11.08 Playtime: 3.18
Episode: 600 Score: -10.53 Playtime: 3.73
Episode: 700 Score: -9.96 Playtime: 3.16
Episode: 800 Score: -8.6 Playtime: 2.9
Episode: 900 Score: -13.6 Playtime: 7.61
Episode: 1000 Score: -50.0 Playtime: 50.0
Episode: 1100 Score: -50.0 Playtime: 50.0
Episode: 1200 Score: -50.0 Playtime: 50.0
Episode: 1300 Score: -50.0 Playtime: 50.0
Episode: 1400 Score: -50.0 Playtime: 50.0
Episode: 1500 Score: -50.0 Playtime: 50.0
Episode: 1600 Score: -50.0 Playtime: 50.0
Episode: 1700 Score: -50.0 Playtime: 50.0
Episode: 1800 Score: -50.0 Playtime: 50.0
Episode: 1900 Score: -50.0 Playtime: 50.0
Episode: 2000 Score: -50.0 Playtime: 50.0
Implementing the Agent3 class + Run
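Note what happened above: once n_experiences passes 3000, Agent2 acts purely greedily on a still-inaccurate q table and never reaches the goal again, exhausting the 50-step limit every episode (score -50). The definition of Agent3 is missing from these notes; a minimal sketch consistent with the run below (which sets agent.eps and decays it by 0.999 each episode) is an epsilon-greedy agent that explores with probability eps and otherwise acts greedily:

class Agent3(Agent2):
    def __init__(self, env):
        super().__init__(env)
        self.eps = 0                                   # exploration probability; set and decayed in the run loop below
    def act(self):
        if np.random.rand() < self.eps:
            self.action = self.action_space.sample()   # explore
        else:
            x,y = self.current_state
            self.action = self.q[x,y,:].argmax()       # exploit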
env = GridWorld()
agent = Agent3(env)
agent.eps = 1
for _ in range(5000):
    ## the essential code
    agent.current_state = env.reset()
    agent.terminated = False
    agent.score = 0
    for t in range(50):
        # step1: agent >> env
        agent.act()
        env.agent_action = agent.action
        # step2: agent << env
        agent.next_state, agent.reward, agent.terminated = env.step(env.agent_action)
        agent.save_experience()
        # step3: learn
        agent.learn()
        # step4: state update
        agent.current_state = agent.next_state
        # step5: check termination
        if agent.terminated: break
    agent.scores.append(agent.score)
    agent.playtimes.append(t+1)
    agent.n_episodes = agent.n_episodes + 1
    agent.eps = agent.eps * 0.999
    ## the less essential code
    if (agent.n_episodes % 200) == 0:
        print(
            f"Episode: {agent.n_episodes} \t"
            f"Score: {np.mean(agent.scores[-100:])} \t"
            f"Playtime: {np.mean(agent.playtimes[-100:])}\t"
            f"Epsilon: {agent.eps : .2f}"
        )
Episode: 200 Score: -8.49 Playtime: 3.89 Epsilon: 0.82
Episode: 400 Score: -9.83 Playtime: 4.13 Epsilon: 0.67
Episode: 600 Score: -10.72 Playtime: 6.12 Epsilon: 0.55
Episode: 800 Score: -7.08 Playtime: 7.98 Epsilon: 0.45
Episode: 1000 Score: -1.87 Playtime: 10.65 Epsilon: 0.37
Episode: 1200 Score: 28.23 Playtime: 10.16 Epsilon: 0.30
Episode: 1400 Score: 61.38 Playtime: 6.62 Epsilon: 0.25
Episode: 1600 Score: 66.42 Playtime: 5.98 Epsilon: 0.20
Episode: 1800 Score: 74.94 Playtime: 6.26 Epsilon: 0.17
Episode: 2000 Score: 75.29 Playtime: 5.91 Epsilon: 0.14
Episode: 2200 Score: 77.24 Playtime: 6.16 Epsilon: 0.11
Episode: 2400 Score: 86.1 Playtime: 6.1 Epsilon: 0.09
Episode: 2600 Score: 83.81 Playtime: 6.19 Epsilon: 0.07
Episode: 2800 Score: 87.27 Playtime: 6.03 Epsilon: 0.06
Episode: 3000 Score: 86.1 Playtime: 6.1 Epsilon: 0.05
Episode: 3200 Score: 87.37 Playtime: 5.93 Epsilon: 0.04
Episode: 3400 Score: 93.68 Playtime: 6.22 Epsilon: 0.03
Episode: 3600 Score: 90.58 Playtime: 6.02 Epsilon: 0.03
Episode: 3800 Score: 92.77 Playtime: 6.03 Epsilon: 0.02
Episode: 4000 Score: 93.79 Playtime: 6.11 Epsilon: 0.02
Episode: 4200 Score: 94.88 Playtime: 6.12 Epsilon: 0.01
Episode: 4400 Score: 92.85 Playtime: 5.95 Epsilon: 0.01
Episode: 4600 Score: 94.96 Playtime: 6.04 Epsilon: 0.01
Episode: 4800 Score: 94.92 Playtime: 6.08 Epsilon: 0.01
Episode: 5000 Score: 93.9 Playtime: 6.0 Epsilon: 0.01