Source code for iclbench.environments.crafter.env

import itertools

import crafter
import gym
import numpy as np
from PIL import Image

from iclbench.environments import Strings

ACTIONS = [
    "Noop",
    "Move West",
    "Move East",
    "Move North",
    "Move South",
    "Do",
    "Sleep",
    "Place Stone",
    "Place Table",
    "Place Furnace",
    "Place Plant",
    "Make Wood Pickaxe",
    "Make Stone Pickaxe",
    "Make Iron Pickaxe",
    "Make Wood Sword",
    "Make Stone Sword",
    "Make Iron Sword",
]

id_to_item = [0] * 19


# wrap in singleton to avoid import errors for doc building
PLAYER_IDX = None
[docs] def get_player_idx(): global PLAYER_IDX if PLAYER_IDX is None: dummyenv = crafter.Env() for name, ind in itertools.chain(dummyenv._world._mat_ids.items(), dummyenv._sem_view._obj_ids.items()): name = ( str(name)[str(name).find("objects.") + len("objects.") : -2].lower() if "objects." in str(name) else str(name) ) id_to_item[ind] = name PLAYER_IDX = id_to_item.index("player") return PLAYER_IDX
vitals = [ "health", "food", "drink", "energy", ] rot = np.array([[0, -1], [1, 0]]) directions = ["front", "right", "back", "left"]
[docs] def describe_inventory(info): result = "" status_str = "Your status:\n{}".format("\n".join(["- {}: {}/9".format(v, info["inventory"][v]) for v in vitals])) result += status_str + "\n\n" inventory_str = "\n".join( ["- {}: {}".format(i, num) for i, num in info["inventory"].items() if i not in vitals and num != 0] ) inventory_str = ( "Your inventory:\n{}".format(inventory_str) if inventory_str else "You have nothing in your inventory." ) result += inventory_str # + "\n\n" return result.strip()
REF = np.array([0, 1])
[docs] def rotation_matrix(v1, v2): dot = np.dot(v1, v2) cross = np.cross(v1, v2) rotation_matrix = np.array([[dot, -cross], [cross, dot]]) return rotation_matrix
[docs] def describe_loc(ref, P): desc = [] if ref[1] > P[1]: desc.append("north") elif ref[1] < P[1]: desc.append("south") if ref[0] > P[0]: desc.append("west") elif ref[0] < P[0]: desc.append("east") return "-".join(desc)
[docs] def describe_env(info): assert info["semantic"][info["player_pos"][0], info["player_pos"][1]] == get_player_idx() semantic = info["semantic"][ info["player_pos"][0] - info["view"][0] // 2 : info["player_pos"][0] + info["view"][0] // 2 + 1, info["player_pos"][1] - info["view"][1] // 2 + 1 : info["player_pos"][1] + info["view"][1] // 2, ] center = np.array([info["view"][0] // 2, info["view"][1] // 2 - 1]) result = "" x = np.arange(semantic.shape[1]) y = np.arange(semantic.shape[0]) x1, y1 = np.meshgrid(x, y) loc = np.stack((y1, x1), axis=-1) dist = np.absolute(center - loc).sum(axis=-1) obj_info_list = [] facing = info["player_facing"] target = (center[0] + facing[0], center[1] + facing[1]) max_y, max_x = semantic.shape target_x = center[0] + facing[0] target_y = center[1] + facing[1] if 0 <= target_x < max_x and 0 <= target_y < max_y: target_id = semantic[int(target_y), int(target_x)] target_item = id_to_item[target_id] obs = "You face {} at your front.".format(target_item) else: obs = "You face nothing at your front." for idx in np.unique(semantic): if idx == get_player_idx(): continue smallest = np.unravel_index(np.argmin(np.where(semantic == idx, dist, np.inf)), semantic.shape) obj_info_list.append((id_to_item[idx], dist[smallest], describe_loc(np.array([0, 0]), smallest - center))) if len(obj_info_list) > 0: status_str = "You see:\n{}".format( "\n".join(["- {} {} steps to your {}".format(name, dist, loc) for name, dist, loc in obj_info_list]) ) else: status_str = "You see nothing away from you." result += status_str + "\n\n" result += obs.strip() return result.strip()
[docs] def describe_act(action): result = "" action_str = action.replace("do_", "interact_") action_str = action_str.replace("move_up", "move_north") action_str = action_str.replace("move_down", "move_south") action_str = action_str.replace("move_left", "move_west") action_str = action_str.replace("move_right", "move_east") act = "You took action {}.".format(action_str) result += act return result.strip()
[docs] def describe_status(info): if info["sleeping"]: return "You are sleeping, and will not be able take actions until energy is full.\n\n" elif info["dead"]: return "You died.\n\n" else: return ""
[docs] def describe_frame(info): try: result = "" result += describe_status(info) result += "\n\n" result += describe_env(info) result += "\n\n" return result.strip(), describe_inventory(info) except Exception as e: breakpoint() return "Error, you are out of the map."
[docs] class CrafterLanguageWrapper(gym.Wrapper): default_iter = 10 default_steps = 10000
[docs] def __init__( self, env, task="", max_episode_steps=2, ): super().__init__(env) self.score_tracker = 0 self.language_action_space = Strings(ACTIONS) self.default_action = "Noop" self.max_steps = max_episode_steps self.achievements = None
[docs] def get_text_action(self, action): return self.language_action_space._values[action]
def _step_impl(self, action): obs, reward, done, info = super().step(action) # extra stuff for language wrapper aug_info = info.copy() aug_info["sleeping"] = self.env._player.sleeping aug_info["player_facing"] = self.env._player.facing aug_info["dead"] = self.env._player.health <= 0 aug_info["unlocked"] = { name for name, count in self.env._player.achievements.items() if count > 0 and name not in self.env._unlocked } aug_info["view"] = self.env._view return obs, reward, done, aug_info
[docs] def reset(self): self.env.reset() obs, reward, done, info = self._step_impl(0) self.score_tracker = 0 self.achievements = None return self.process_obs(obs, info)
[docs] def step(self, action): obs, reward, done, info = self._step_impl(self.language_action_space.map(action)) self.score_tracker = self.update_progress(info) obs = self.process_obs(obs, info) return obs, reward, done, info
[docs] def process_obs(self, obs, info): img = Image.fromarray(self.env.render()).convert("RGB") long_term_context, short_term_context = describe_frame(info) return { "text": {"long_term_context": long_term_context, "short_term_context": short_term_context}, "image": img, "obs": obs, }
[docs] def update_progress(self, info): self.score_tracker = 0 + sum([1.0 for k, v in info["achievements"].items() if v > 0]) self.achievements = info["achievements"] return self.score_tracker
[docs] def get_stats(self): return { "score": self.score_tracker, "progression": float(self.score_tracker) / 22.0, "achievements": self.achievements, }