import sys
import numpy as np
import gym
import gym.spaces
from gym import wrappers
from io import StringIO
from keras.models import Model, Sequential
from keras.layers import Dense, Flatten, Input, concatenate, Activation
from keras.optimizers import Adam
from rl.agents.dqn import DQNAgent
from rl.policy import BoltzmannQPolicy
from rl.agents import DDPGAgent
from rl.memory import SequentialMemory
from ctypes import windll, wintypes, byref

STD_OUTPUT_HANDLE = -11
ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x0004


class MyEnv(gym.Env):
    def __init__(self):
        # Action space: three discrete actions (slow down, keep speed, speed up).
        self.action_space = gym.spaces.Discrete(3)
        # Observation space (state): 2 dimensions (position and velocity) and their maximum values.
        high = np.array([1.0, 1.0])
        # The minimum is simply the negated maximum.
        self.observation_space = gym.spaces.Box(low=-high, high=high)

    # Called every step.
    # Takes an action and returns the next state, the reward, and whether the episode is done.
    def step(self, action):
        # Use the action to determine the next state.
        dt = 0.1
        acc = (np.random.choice(3, p=action) - 1) * 0.1
        self.vel += acc * dt
        self.vel = max(-1.0, min(self.vel, 1.0))
        self.pos += self.vel * dt
        self.pos = max(-1.0, min(self.pos, 1.0))
        # The episode ends once both position and velocity are small enough in absolute value.
        done = abs(self.pos) < 0.1 and abs(self.vel) < 0.1
        if done:
            # Positive reward on termination.
            reward = 1.0
        else:
            # Negative reward for each time step.
            # Scaling its magnitude down as the agent gets closer to the goal speeds up learning.
            reward = -0.01 * abs(self.pos)
        # Return the next state, the reward, the done flag, and an info dict
        # (empty, since there is nothing extra to report).
        return np.array([self.pos, self.vel]), reward, done, {}

    # Called at the start of each episode; returns the initial state.
    def reset(self):
        # Initial state: random position, zero velocity.
        self.pos = np.random.rand() * 2 - 1
        self.vel = 0.0
        return np.array([self.pos, self.vel])

    def render(self, mode='human', close=False):
        outfile = sys.stdout
        outfile.write('\r' + 'pos:' + str(self.pos) + ' vel:' + str(self.vel))
        return outfile


class MyEnv2(gym.Env):
    metadata = {'render.modes': ['human', 'ansi']}
    FIELD_TYPES = [
        'S',  # 0: start
        'G',  # 1: goal
        '~',  # 2: grass (enemy appears with probability 1/10)
        'w',  # 3: forest (enemy appears with probability 1/2)
        '=',  # 4: poison swamp (1 damage per step, enemy appears with probability 1/2)
        'A',  # 5: mountain (impassable)
        'Y',  # 6: hero
    ]
    MAP = np.array([
        [5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5],  # "AAAAAAAAAAAA"
        [5, 5, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2],  # "AA~~~~~~~~~~"
        [5, 5, 2, 0, 2, 2, 5, 2, 2, 4, 2, 2],  # "AA~S~~A~~=~~"
        [5, 2, 2, 2, 2, 2, 5, 5, 4, 4, 2, 2],  # "A~~~~~AA==~~"
        [2, 2, 3, 3, 3, 3, 5, 5, 2, 2, 3, 3],  # "~~wwwwAA~~ww"
        [2, 3, 3, 3, 3, 5, 2, 2, 1, 2, 2, 3],  # "~wwwwA~~G~~w"
        [2, 2, 2, 2, 2, 2, 4, 4, 2, 2, 2, 2],  # "~~~~~~==~~~~"
    ])
    MAX_STEPS = 100

    def __init__(self):
        super().__init__()
        # Set up action_space, observation_space and reward_range.
        self.action_space = gym.spaces.Discrete(4)  # the four compass directions
        self.observation_space = gym.spaces.Box(
            low=0,
            high=len(self.FIELD_TYPES),
            shape=self.MAP.shape
        )
        self.reward_range = [-1., 100.]
        self.reset()
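    # Note: observations are the full MAP with the hero 'Y' overlaid at the current
    # position, so reset() and step() both return arrays of shape self.MAP.shape == (7, 12).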
    def reset(self):
        # Initialise the episode state.
        self.pos = self._find_pos('S')[0]
        self.goal = self._find_pos('G')[0]
        self.done = False
        self.damage = 0
        self.steps = 0
        return self._observe()

    def step(self, action):
        # Advance the environment by one step. Returns observation, reward,
        # done (whether the game ended) and info (a dict of extra information).
        #acc = np.random.choice(4,p=action) if np.random.rand()>0.01 else np.random.choice(4,p=np.array([0.25,0.25,0.25,0.25]))
        if action == 0:
            next_pos = self.pos + [0, 1]
        elif action == 1:
            next_pos = self.pos + [0, -1]
        elif action == 2:
            next_pos = self.pos + [1, 0]
        elif action == 3:
            next_pos = self.pos + [-1, 0]
        if self._is_movable(next_pos):
            self.pos = next_pos
            moved = True
        else:
            moved = False
        self.steps += 1
        observation = self._observe()
        reward = self._get_reward(self.pos, moved)
        self.damage += self._get_damage(self.pos)
        self.done = self._is_done()
        return observation, reward, self.done, {}

    def render(self, mode='human', close=False):
        # For 'human', write to the console; for 'ansi', return a StringIO.
        outfile = StringIO() if mode == 'ansi' else sys.stdout
        outfile.write('\n'.join(
            ' '.join(self.FIELD_TYPES[elem] for elem in row)
            for row in self._observe()
        ) + '\n')
        outfile.flush()
        outfile.write('\033[7F')
        outfile.write('\033[J')
        return outfile

    def close(self):
        pass

    def seed(self, seed=None):
        pass

    def _get_reward(self, pos, moved):
        # Return the reward. Shaping it is the hard part; here:
        # - reaching the goal is worth 100 points
        # - accumulated damage is deducted in one go at the goal
        # - every step costs -1 point (so the goal should be reached in as few steps as possible)
        if moved and (self.goal == pos).all():
            return max(100 - self.damage, 0)
        else:
            return -1

    def _get_damage(self, pos):
        # Compute the damage taken on the current tile.
        field_type = self.FIELD_TYPES[self.MAP[tuple(pos)]]
        if field_type == 'S':
            return 0
        elif field_type == 'G':
            return 0
        elif field_type == '~':
            return 10 if np.random.random() < 1/10. else 0
        elif field_type == 'w':
            return 10 if np.random.random() < 1/2. else 0
        elif field_type == '=':
            return 11 if np.random.random() < 1/2. else 1
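    # Expected damage per step implied by the probabilities above:
    # grass '~' = 10 * 1/10 = 1, forest 'w' = 10 * 1/2 = 5,
    # poison swamp '=' = 11 * 1/2 + 1 * 1/2 = 6.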
    def _is_movable(self, pos):
        # Inside the map and not on an impassable tile?
        return (
            0 <= pos[0] < self.MAP.shape[0]
            and 0 <= pos[1] < self.MAP.shape[1]
            and self.FIELD_TYPES[self.MAP[tuple(pos)]] != 'A'
        )

    def _observe(self):
        # Return the map with the hero's position overlaid on it.
        observation = self.MAP.copy()
        observation[tuple(self.pos)] = self.FIELD_TYPES.index('Y')
        return observation

    def _is_done(self):
        # An episode lasts at most self.MAX_STEPS steps.
        if (self.pos == self.goal).all():
            return True
        elif self.steps > self.MAX_STEPS:
            return True
        else:
            return False

    def _find_pos(self, field_type):
        return np.array(list(zip(*np.where(
            self.MAP == self.FIELD_TYPES.index(field_type)
        ))))


def build_actor_model(num_action, observation_shape):
    action_input = Input(shape=(1,) + observation_shape)
    x = Flatten()(action_input)
    x = Dense(16, activation="relu")(x)
    x = Dense(16, activation="relu")(x)
    x = Dense(num_action, activation="softmax")(x)
    actor = Model(inputs=action_input, outputs=x)
    return actor


def build_critic_model(num_action, observation_shape):
    action_input = Input(shape=(num_action,))
    observation_input = Input(shape=(1,) + observation_shape)
    flattened_observation = Flatten()(observation_input)
    x = concatenate([action_input, flattened_observation])
    x = Dense(32, activation="relu")(x)
    x = Dense(32, activation="relu")(x)
    x = Dense(1, activation="linear")(x)
    critic = Model(inputs=[action_input, observation_input], outputs=x)
    return (critic, action_input)


def build_agent(num_action, observation_shape):
    actor = build_actor_model(num_action, observation_shape)
    critic, critic_action_input = build_critic_model(num_action, observation_shape)
    memory = SequentialMemory(limit=10**5, window_length=1)
    agent = DDPGAgent(
        num_action, actor, critic, critic_action_input, memory
    )
    return agent


def run():
    env = MyEnv2()
    print("Action Space: %s" % env.action_space)
    print("Observation Space: %s" % env.observation_space)
    agent = build_agent(env.action_space.n, env.observation_space.shape)
    agent.compile(Adam(lr=0.001, clipnorm=1.), metrics=["mae"])
    agent.fit(env, nb_steps=50000, visualize=True, verbose=2, nb_max_episode_steps=200)
    agent.test(env, nb_episodes=5, visualize=True, nb_max_episode_steps=200)
    agent.save_weights('testDDPG_weights.hdf5', True)


if __name__ == "__main__":
    # Enable ANSI escape sequences in the Windows console so render() can redraw in place.
    hOut = windll.kernel32.GetStdHandle(wintypes.HANDLE(STD_OUTPUT_HANDLE))
    dwMode = wintypes.DWORD()
    windll.kernel32.GetConsoleMode(hOut, byref(dwMode))
    dwMode.value |= ENABLE_VIRTUAL_TERMINAL_PROCESSING
    windll.kernel32.SetConsoleMode(hOut, dwMode)
    run()
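
# Sketch (not part of the original script, never called from __main__): the same
# DDPG pipeline can also be pointed at the simpler MyEnv defined above, whose
# step() already expects the actor's softmax output as an action-probability
# vector. The function name run_myenv and the nb_steps value are assumptions
# for illustration, not tuned settings.
def run_myenv():
    env = MyEnv()
    agent = build_agent(env.action_space.n, env.observation_space.shape)
    agent.compile(Adam(lr=0.001, clipnorm=1.), metrics=["mae"])
    agent.fit(env, nb_steps=10000, visualize=True, verbose=2, nb_max_episode_steps=200)
    agent.test(env, nb_episodes=5, visualize=True, nb_max_episode_steps=200)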