#!pip install gymnasium
#---#
import gymnasium as gym
#---#
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
import IPython14wk-2: 강화학습 (3) – 4x4 Grid World (AgentGreedy,AgentExplorer)
1. 강의영상
2. Imports
3. 필요한 클래스 및 함수선언
action_to_direction = {
0 : np.array([1, 0]), # row+, down
1 : np.array([0, 1]), # col+, right
2 : np.array([-1 ,0]), # row-, up
3 : np.array([0, -1]) # col-, left
}
action_to_direction2 = {0: 'down', 1: 'right', 2: 'up', 3: 'left'} # 당장쓰진 않지만 하는김에 def show(states):
fig = plt.Figure()
ax = fig.subplots()
ax.matshow(np.zeros([4,4]), cmap='bwr',alpha=0.0)
sc = ax.scatter(0, 0, color='red', s=500)
ax.text(0, 0, 'start', ha='center', va='center')
ax.text(3, 3, 'end', ha='center', va='center')
# Adding grid lines to the plot
ax.set_xticks(np.arange(-.5, 4, 1), minor=True)
ax.set_yticks(np.arange(-.5, 4, 1), minor=True)
ax.grid(which='minor', color='black', linestyle='-', linewidth=2)
state_space = gym.spaces.MultiDiscrete([4,4])
def update(t):
if states[t] in state_space:
s1,s2 = states[t]
states[t] = [s2,s1]
sc.set_offsets(states[t])
else:
s1,s2 = states[t]
s1 = s1 + 0.5 if s1 < 0 else (s1 - 0.5 if s1 > 3 else s1)
s2 = s2 + 0.5 if s2 < 0 else (s2 - 0.5 if s2 > 3 else s2)
states[t] = [s2,s1]
sc.set_offsets(states[t])
ani = FuncAnimation(fig,update,frames=len(states))
display(IPython.display.HTML(ani.to_jshtml()))class GridWorld:
def __init__(self):
self.state_space = gym.spaces.MultiDiscrete([4,4])
self.action_space = gym.spaces.Discrete(4)
self._action_to_direction = {
0 : np.array([1, 0]), # row+, down
1 : np.array([0, 1]), # col+, right
2 : np.array([-1 ,0]), # row-, up
3 : np.array([0, -1]) # col-, left
}
self.reset()
self.state = None
self.reward = None
self.termiated = None
def step(self,action):
direction = self._action_to_direction[action]
self.state = self.state + direction
if np.array_equal(self.state,np.array([3,3])):
self.reward = 100
self.terminated = True
elif self.state not in self.state_space:
self.reward = -10
self.terminated = True
else:
self.reward = -1
return self.state, self.reward, self.terminated
def reset(self):
self.state = np.array([0,0])
self.terminated = False
return self.state class AgentRandom:
def __init__(self,env):
#--# define spaces
self.action_space = env.action_space
self.state_space = env.state_space
#--# replay buffer
self.action = None
self.actions = []
self.current_state = None
self.current_states = []
self.reward = None
self.rewards = []
self.next_state = None
self.next_states = []
self.terminated = None
self.terminations = []
#--# other information
self.n_episodes = 0
self.n_experiences = 0
self.score = 0
self.playtimes = []
self.scores = []
def act(self):
self.action = self.action_space.sample()
def learn(self):
pass
def save_experience(self):
self.current_states.append(self.current_state)
self.actions.append(self.action)
self.rewards.append(self.reward)
self.next_states.append(self.next_state)
self.terminations.append(self.terminated)
#--#
self.n_experiences = self.n_experiences + 1
self.score = self.score + self.reward4. AgentGreedy
A. 환경의 이해
- 랜덤에이전트를 이용해 무작위로 10000판을 진행해보자.
env = GridWorld()
agent = AgentRandom(env)
for _ in range(10000):
# Step1: 에피소드 준비
agent.current_state = env.reset()
agent.terminated = False
agent.score = 0
# Step2: 에피소드 진행
for t in range(1,51):
# step1: 행동
agent.act()
# step2: 보상
agent.next_state, agent.reward, agent.terminated = env.step(agent.action)
# step3: 저장 & 학습
agent.save_experience()
agent.learn() # 사실학습하는 함수는 dummy 함수임..
# step4: 다음 스텝준비
agent.current_state = agent.next_state
if agent.terminated: break
# Step3: 다음에피소드 준비
agent.scores.append(agent.score)
agent.playtimes.append(t)
agent.n_episodes = agent.n_episodes + 1 agent.n_experiences32507
- 데이터관찰
agent.rewards[0], agent.next_states[0](-10, array([ 0, -1]))
print(f"에이전트: 현재상태/행동 = {agent.current_states[0]} / {agent.actions[0],action_to_direction2[agent.actions[0]]}")
print(f"환경: 보상/다음상태 = {agent.rewards[0]} / {agent.next_states[0]}")에이전트: 현재상태/행동 = [0 0] / (3, 'left')
환경: 보상/다음상태 = -10 / [ 0 -1]
print(f"에이전트: 현재상태/행동 = {agent.current_states[1]} / {agent.actions[1],action_to_direction2[agent.actions[1]]}")
print(f"환경: 보상/다음상태 = {agent.rewards[1]} / {agent.next_states[1]}")에이전트: 현재상태/행동 = [0 0] / (2, 'up')
환경: 보상/다음상태 = -10 / [-1 0]
print(f"에이전트: 현재상태/행동 = {agent.current_states[2]} / {agent.actions[2],action_to_direction2[agent.actions[2]]}")
print(f"환경: 보상/다음상태 = {agent.rewards[2]} / {agent.next_states[2]}")에이전트: 현재상태/행동 = [0 0] / (2, 'up')
환경: 보상/다음상태 = -10 / [-1 0]
print(f"에이전트: 현재상태/행동 = {agent.current_states[3]} / {agent.actions[3],action_to_direction2[agent.actions[3]]}")
print(f"환경: 보상/다음상태 = {agent.rewards[3]} / {agent.next_states[3]}")에이전트: 현재상태/행동 = [0 0] / (3, 'left')
환경: 보상/다음상태 = -10 / [ 0 -1]
print(f"에이전트: 현재상태/행동 = {agent.current_states[4]} / {agent.actions[4],action_to_direction2[agent.actions[4]]}")
print(f"환경: 보상/다음상태 = {agent.rewards[4]} / {agent.next_states[4]}")에이전트: 현재상태/행동 = [0 0] / (1, 'right')
환경: 보상/다음상태 = -1 / [0 1]
print(f"에이전트: 현재상태/행동 = {agent.current_states[5]} / {agent.actions[5],action_to_direction2[agent.actions[5]]}")
print(f"환경: 보상/다음상태 = {agent.rewards[5]} / {agent.next_states[5]}")에이전트: 현재상태/행동 = [0 1] / (1, 'right')
환경: 보상/다음상태 = -1 / [0 2]
print(f"에이전트: 현재상태/행동 = {agent.current_states[6]} / {agent.actions[6],action_to_direction2[agent.actions[6]]}")
print(f"환경: 보상/다음상태 = {agent.rewards[6]} / {agent.next_states[6]}")에이전트: 현재상태/행동 = [0 2] / (2, 'up')
환경: 보상/다음상태 = -10 / [-1 2]
- 환경을 이해하기 위한 기록 (1)
q_table = np.zeros([4,4,4])
count = np.zeros([4,4,4])
for i in range(agent.n_experiences):
s1,s2 = agent.current_states[i]
a = agent.actions[i]
r = agent.rewards[i]
q_table[s1,s2,a] = q_table[s1,s2,a] + r
count[s1,s2,a] = count[s1,s2,a] + 1 q_table[0,0,:]array([ -2983., -2960., -29690., -30800.])
count[count==0] = 0.01
q_table = q_table / count for i in range(4):
print(f"action = {i}/{action_to_direction2[i]}")
print(f"action-value function = \n{q_table[:,:,i]}\n")action = 0/down
action-value function =
[[ -1. -1. -1. -1.]
[ -1. -1. -1. -1.]
[ -1. -1. -1. 100.]
[-10. -10. -10. 0.]]
action = 1/right
action-value function =
[[ -1. -1. -1. -10.]
[ -1. -1. -1. -10.]
[ -1. -1. -1. -10.]
[ -1. -1. 100. 0.]]
action = 2/up
action-value function =
[[-10. -10. -10. -10.]
[ -1. -1. -1. -1.]
[ -1. -1. -1. -1.]
[ -1. -1. -1. 0.]]
action = 3/left
action-value function =
[[-10. -1. -1. -1.]
[-10. -1. -1. -1.]
[-10. -1. -1. -1.]
[-10. -1. -1. 0.]]
- 환경을 이해하기 위한 기록 (2)
q_table = np.zeros([4,4,4])
for i in range(agent.n_experiences):
s1,s2 = agent.current_states[i]
a = agent.actions[i]
r = agent.rewards[i]
q_hat = q_table[s1,s2,a] # 우리가 환경을 이해해서 얻은값, 우리가 풀어낸 답
q = r # 실제답
diff = q - q_hat # 실제답과 풀이한값의 차이 = 오차피드백값
q_table[s1,s2,a] = q_hat + 0.05 * diff for i in range(4):
print(f"action = {i}/{action_to_direction2[i]}")
print(f"action-value function = \n{q_table[:,:,i].round(2)}\n")action = 0/down
action-value function =
[[ -1. -1. -1. -1. ]
[ -1. -1. -1. -1. ]
[ -1. -1. -1. 98.72]
[-10. -9.99 -9.91 0. ]]
action = 1/right
action-value function =
[[-1. -1. -1. -9.99]
[-1. -1. -1. -9.99]
[-1. -1. -1. -9.93]
[-1. -1. 98.9 0. ]]
action = 2/up
action-value function =
[[-10. -10. -10. -9.97]
[ -1. -1. -1. -1. ]
[ -1. -1. -1. -0.99]
[ -1. -1. -0.99 0. ]]
action = 3/left
action-value function =
[[-10. -1. -1. -1. ]
[-10. -1. -1. -1. ]
[-10. -1. -1. -0.98]
[ -9.98 -1. -0.99 0. ]]
B. 환경의 깊은 이해
- 분석1: row=3, col=2 상태에서 행동1(right)에 대한 가치
q_table[3,2,1]98.90433632025939
- 상태 (3,2)에서 행동1을 하게되면 100의 보상을 얻으므로
q_table[3,2,1] = 98.904는 합리적임
- 분석2: row=3, col=1 상태에서 행동1에(right)에 대한 가치
q_table[3,1,1]-0.9990658635484849
- 상태 (3,1)에서 행동1을 하게되면 -1 의 보상을 얻으므로
q_table[3,1,1] = - 0.999는 합리적인가??
- 비판: 분석2는 합리적인것 처럼 보이지만 data를 분석한 뒤에는 그다지 합리적이지 못함.
- 상황상상
- 빈 종이를 줌
- 빈 종이에는 0 또는 1을 쓸 수 있음 (action = 0 혹은 1)
- 0을 쓸때와 1을 쓸때 보상이 다름
- 무수히 많은 데이터를 분석해보니, 0을 쓰면 0원을 주고 1을 쓰면 10만원을 보상을 준다는 것을 “알게 되었음”
- 이때 빈 종이의 가치는 5만원인가? 10만원인가? –> 거의 10만원아니야? (9.99만원쯤?)
- 직관: 생각해보니 현재 \(s=(3,1)\) \(a=1\)에서 추정된(esitated) 값은 q_table[3,1,1] \(\approx\) -1 이지만1, 현실적으로는 “실제보상(-1)과 잠재적보상(100)”을 동시에 고려해야 하는게 합리적임
1 즉 next_state가 가지는 잠재적값어치는 고려되어있지 않음
q_hat = q_table[3,1,1]
q_hat-0.9990658635484849
q = (-1) + 0.99 * 100
q98.0
- 여기에서 0.99는 “미래에 받을 보상이 현재에 비해 얼마나 중요한지를 결정하는 가중치” 이다.
- 1에 가까울수록 미래에 받을 보상을 매우 중시한다는 의미 (즉 빈종이 \(\approx\) 십만원 으로 생각한다는 의미)
- 0.99는 보통 \(\gamma\)라는 기호로 표기하며
discount rate이라고 표현한다. (외우세여)
- 즉 \(q(s,a)\)는 모든 \(s\), \(a\)에 대하여
\[q(s,a) \approx \text{reward}(s,a) + 0.99 \times \max_{a}q(s',a)\]
가 성립한다면 \(q(s,a)\)는 타당하게 추정된 것이라 볼 수 있다. 물론 수식을 좀 더 엄밀하게 쓰면 (terminated, not-terminated 로 나누어 쓰면) 아래와 같다.
\[q(s,a) \approx \begin{cases} \text{reward}(s,a) + 0.99 \times \max_{a}q(s',a) & \text{not terminated} \\ \text{reward}(s,a) & \text{terminated} \end{cases}\]
대충 설명하면서 넘어갔지만 이 수식을 벨만방정식이라고 부른다. (외우세여) 위의 식은 강화학습에서 가장 중요한 식이며 원래 버전은 아래와 같다.
\[Q^\star(s,a) = R(s,a) +\gamma\sum_{s'}P(s'|s,a)\max_{a}Q(s',a)\]
여기에서 \(P(s'|s,a)\) 는 상태 \(s \in {\cal S}\)에서 행동 \(a \in {\cal A}\)를 했을때 \(s'\)에 있을 확률이다. 이러한 확률은 “바람,소용돌이” 등의 외부의 확률적인 요소가 있는 환경에서 의미가 있으며 우리의 예제에서는 의미가 없다.
q_table = np.zeros([4,4,4])
for i in range(agent.n_experiences):
s1,s2 = agent.current_states[i]
ss1,ss2 = agent.next_states[i]
a = agent.actions[i]
r = agent.rewards[i]
q_hat = q_table[s1,s2,a] # 우리가 환경을 이해해서 얻은값, 우리가 풀어낸 답
if agent.terminations[i]:
q = r
else:
future_reward = q_table[ss1,ss2,:].max()
q = r + 0.99 * future_reward
diff = q - q_hat # 실제답과 풀이한값의 차이 = 오차피드백값
q_table[s1,s2,a] = q_hat + 0.05 * diff for i in range(4):
print(f"action = {i}/{action_to_direction2[i]}")
print(f"action-value function = \n{q_table[:,:,i].round(2)}\n")action = 0/down
action-value function =
[[ 87.57 89.5 91.05 89.46]
[ 89.25 91.56 93.71 95.09]
[ 84.44 91.63 96.13 98.72]
[-10. -9.99 -9.91 0. ]]
action = 1/right
action-value function =
[[87.55 88.89 86.05 -9.99]
[89.51 91.48 92.34 -9.99]
[91.28 93.68 96. -9.93]
[87.7 94.52 98.9 0. ]]
action = 2/up
action-value function =
[[-10. -10. -10. -9.97]
[ 85.52 87.25 88.12 80.83]
[ 86.99 88.99 90.17 86.96]
[ 85.85 88.51 89.37 0. ]]
action = 3/left
action-value function =
[[-10. 85.5 86.94 84.25]
[-10. 87.39 88.97 89.31]
[-10. 88.83 90.9 86.27]
[ -9.98 80.23 81.86 0. ]]
q_table.max(axis=-1)array([[87.5672123 , 89.49562198, 91.04523609, 89.45787756],
[89.50614803, 91.56172927, 93.70863488, 95.08948559],
[91.27612012, 93.67916052, 96.1257449 , 98.72207181],
[87.70384078, 94.51521902, 98.90433632, 0. ]])
C. 행동 전략 수립
- 상태 (0,0)에 있다고 가정해보자.
print(q_table[0,0,:])
print(action_to_direction2)[ 87.5672123 87.54804715 -10. -10. ]
{0: 'down', 1: 'right', 2: 'up', 3: 'left'}
- 행동 0 혹은 행동 1을 하는게 유리하다. // 행동 2,3을 하면 망한다.
- 상태 (2,3)에 있다고 가정해보자.
print(q_table[2,3,:])
print(action_to_direction2)[98.72207181 -9.92731143 86.96434157 86.27101599]
{0: 'down', 1: 'right', 2: 'up', 3: 'left'}
- 행동 0을 하는게 유리함.
- 상태 (3,2)에 있다고 가정해보자.
print(q_table[3,2,:])
print(action_to_direction2)[-9.90606054 98.90433632 89.37012074 81.86454084]
{0: 'down', 1: 'right', 2: 'up', 3: 'left'}
- 행동1을 하는게 유리함
- 위에서 제시한 각 상태에서 최적은 action은 아래와 같다.
print(q_table[0,0,:].argmax())
print(q_table[2,3,:].argmax())
print(q_table[3,2,:].argmax())0
0
1
- 전략(=정책)을 정리해보자.
(ver1)
q_table.argmax(axis=-1)array([[0, 0, 0, 0],
[1, 0, 0, 0],
[1, 1, 0, 0],
[1, 1, 1, 0]])
(ver2)
policy = np.array(["?????"]*16).reshape(4,4)
policyarray([['?????', '?????', '?????', '?????'],
['?????', '?????', '?????', '?????'],
['?????', '?????', '?????', '?????'],
['?????', '?????', '?????', '?????']], dtype='<U5')
for s1 in range(4):
for s2 in range(4):
policy[s1,s2] = action_to_direction2[q_table[s1,s2,:].argmax()]
policyarray([['down', 'down', 'down', 'down'],
['right', 'down', 'down', 'down'],
['right', 'right', 'down', 'down'],
['right', 'right', 'right', 'down']], dtype='<U5')
D. 에이전트 클래스 설계
q_table[0,0,:]array([ 87.5672123 , 87.54804715, -10. , -10. ])
class AgentGreedy(AgentRandom):
def __init__(self,env):
super().__init__(env)
#--#
self.q_table = np.zeros([4,4,4])
def learn(self): # q_table
s1,s2 = self.current_state
ss1,ss2 = self.next_state
a = self.action
r = self.reward
q_hat = self.q_table[s1,s2,a] # 우리가 환경을 이해해서 얻은값, 우리가 풀어낸 답
if self.terminated:
q = r
else:
future_reward = self.q_table[ss1,ss2,:].max()
q = r + 0.99 * future_reward
diff = q - q_hat
self.q_table[s1,s2,a] = q_hat + 0.05 * diff
def act(self):
if self.n_experiences < 3000:
self.action = self.action_space.sample()
else:
s1,s2 = self.current_state
self.action = self.q_table[s1,s2,:].argmax() # 그리디..E. 환경과 상호작용
env = GridWorld()
agent = AgentGreedy(env)
for _ in range(3000):
# Step1: 에피소드 준비
agent.current_state = env.reset()
agent.terminated = False
agent.score = 0
# Step2: 에피소드 진행
for t in range(1,51):
# step1: 행동
agent.act()
# step2: 보상
agent.next_state, agent.reward, agent.terminated = env.step(agent.action)
# step3: 저장 & 학습
agent.save_experience()
agent.learn()
# step4: 다음 스텝준비
agent.current_state = agent.next_state
if agent.terminated: break
# Step3: 다음에피소드 준비
agent.scores.append(agent.score)
agent.playtimes.append(t)
agent.n_episodes = agent.n_episodes + 1
#---#
logfreq = 300
if (agent.n_episodes % logfreq) == 0:
print(
f"에피소드:{agent.n_episodes}\t"
f"점수(에피소드):{np.mean(agent.scores[-logfreq:]):.2f}\t"
f"게임시간(에피소드):{np.mean(agent.playtimes[-logfreq:]):.2f}\t"
)에피소드:300 점수(에피소드):-7.32 게임시간(에피소드):3.45
에피소드:600 점수(에피소드):-9.92 게임시간(에피소드):3.48
에피소드:900 점수(에피소드):-0.54 게임시간(에피소드):3.64
에피소드:1200 점수(에피소드):95.00 게임시간(에피소드):6.00
에피소드:1500 점수(에피소드):95.00 게임시간(에피소드):6.00
에피소드:1800 점수(에피소드):95.00 게임시간(에피소드):6.00
에피소드:2100 점수(에피소드):95.00 게임시간(에피소드):6.00
에피소드:2400 점수(에피소드):95.00 게임시간(에피소드):6.00
에피소드:2700 점수(에피소드):95.00 게임시간(에피소드):6.00
에피소드:3000 점수(에피소드):95.00 게임시간(에피소드):6.00
F. 상호작용결과 시각화
states = [np.array([0,0])] + agent.next_states[-agent.playtimes[-1]:]
show(states)5. AgentExplorer
A. 클래스 설계
class AgentExplorer(AgentGreedy):
def __init__(self,env):
super().__init__(env)
self.eps = 0 # 이것이 0이라는 의미는 돌발행동을 안한다는 의미. 즉 AgentGreedy 와 같은 행동을 한다는 의미
def act(self):
if np.random.rand() < self.eps:
self.action = self.action_space.sample()
else:
super().act() B. 환경과 상호작용
env = GridWorld()
agent = AgentExplorer(env)
agent.eps = 1 # 돌발행동할 확률이 100퍼
for _ in range(3000):
# Step1: 에피소드 준비
agent.current_state = env.reset()
agent.terminated = False
agent.score = 0
# Step2: 에피소드 진행
for t in range(1,51):
# step1: 행동
agent.act()
# step2: 보상
agent.next_state, agent.reward, agent.terminated = env.step(agent.action)
# step3: 저장 & 학습
agent.save_experience()
agent.learn()
# step4: 다음 스텝준비
agent.current_state = agent.next_state
if agent.terminated: break
# Step3: 다음에피소드 준비
agent.scores.append(agent.score)
agent.playtimes.append(t)
agent.n_episodes = agent.n_episodes + 1
agent.eps = agent.eps * 0.999
#---#
logfreq = 300
if (agent.n_episodes % logfreq) == 0:
print(
f"에피소드:{agent.n_episodes}\t"
f"점수(에피소드):{np.mean(agent.scores[-logfreq:]):.2f}\t"
f"게임시간(에피소드):{np.mean(agent.playtimes[-logfreq:]):.2f}\t"
f"돌발행동(에피소드):{agent.eps:.2f}"
)에피소드:300 점수(에피소드):-11.40 게임시간(에피소드):3.50 돌발행동(에피소드):0.74
에피소드:600 점수(에피소드):-9.67 게임시간(에피소드):3.60 돌발행동(에피소드):0.55
에피소드:900 점수(에피소드):0.70 게임시간(에피소드):4.23 돌발행동(에피소드):0.41
에피소드:1200 점수(에피소드):51.58 게임시간(에피소드):6.15 돌발행동(에피소드):0.30
에피소드:1500 점수(에피소드):65.10 게임시간(에피소드):6.20 돌발행동(에피소드):0.22
에피소드:1800 점수(에피소드):69.62 게임시간(에피소드):6.08 돌발행동(에피소드):0.17
에피소드:2100 점수(에피소드):78.86 게임시간(에피소드):6.01 돌발행동(에피소드):0.12
에피소드:2400 점수(에피소드):83.88 게임시간(에피소드):6.12 돌발행동(에피소드):0.09
에피소드:2700 점수(에피소드):84.49 게임시간(에피소드):5.87 돌발행동(에피소드):0.07
에피소드:3000 점수(에피소드):88.74 게임시간(에피소드):6.03 돌발행동(에피소드):0.05
C. 상호작용 결과 시각화
states = [np.array([0,0])] + agent.next_states[-agent.playtimes[-1]:]
show(states)