import copy
import json
import logging
import multiprocessing
import os
import random
import traceback
from collections import defaultdict
from pathlib import Path
import numpy as np
from hydra.utils import get_original_cwd
from tqdm import tqdm
from iclbench.agents.icl import ICLAgent
from iclbench.dataset import InContextDataset
from iclbench.environments import make_env
class Evaluator:
"""
Class to evaluate an agent on a set of tasks in a given environment.
The `Evaluator` class is responsible for orchestrating the evaluation of agents
across multiple tasks within a specified environment. It manages the setup of
the environment, runs episodes, logs results, and can execute evaluations in
parallel or sequentially.
Attributes:
env_name (str): Name of the environment in which the agent operates.
config (Config): Configuration object containing evaluation parameters.
tasks (list): List of tasks for the specified environment.
num_episodes (int): Number of episodes to run for each task.
num_workers (int): Number of parallel worker processes to use.
max_steps_per_episode (int): Maximum number of steps per episode.
dataset (InContextDataset): Dataset object for managing in-context learning tasks.
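Example:
    A minimal usage sketch (assumes a Hydra-style config and an agent factory
    exposing create_agent(), as passed to run()):
        evaluator = Evaluator("babyai", config)
        summary = evaluator.run(agent_factory)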
"""
def __init__(self, env_name, config, original_cwd=""):
"""
Initializes the Evaluator with environment name and configuration.
Args:
env_name (str): Name of the environment.
config (Config): Configuration object with evaluation parameters.
original_cwd (str, optional): Original current working directory. Defaults to "".
"""
self.env_name = env_name.strip() # Ensure no leading/trailing whitespace
self.config = config
self.tasks = config.tasks[f"{self.env_name}_tasks"]
self.num_episodes = config.eval.num_episodes[self.env_name]
self.num_workers = config.eval.num_workers
self.max_steps_per_episode = config.eval.max_steps_per_episode
self.dataset = InContextDataset(self.config, self.env_name, original_cwd=original_cwd)
def load_in_context_learning_episode(self, i, task, agent, episode_log):
"""
Loads and executes an in-context learning episode for the specified task.
Args:
i (int): Index of the in-context learning episode.
task (str): Name of the task to be evaluated.
agent (BaseAgent): The agent being evaluated.
episode_log (dict): Log to record episode results.
"""
demo_config = copy.deepcopy(self.config)
demo_task = self.dataset.demo_task(task)
demo_path = self.dataset.demo_path(i, demo_task, demo_config)
self.dataset.override_incontext_config(demo_config, demo_path)
env = make_env(self.env_name, demo_task, demo_config)
recorded_actions = self.dataset.load_incontext_actions(demo_path)
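# Seed the global RNGs and the environment so the demonstration replays exactly as recorded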
seed = demo_config.envs.env_kwargs.seed
if seed is not None:
random.seed(seed)
np.random.seed(seed)
env.seed(seed=seed)
obs = env.reset()
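# Replay the recorded actions step by step, feeding each observation/action pair into the agent's in-context history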
for action in recorded_actions:
text_action = env.get_text_action(action)
agent.update_icl_observation(obs)
agent.update_icl_action(text_action)
if self.config.eval.save_trajectories:
episode_log["trajectory"].append((obs["text"]["long_term_context"], text_action))
episode_log["action_frequency"][text_action] += 1
obs, reward, done, info = env.step(text_action)
if done:
break
if not done:
logging.warning("In-context learning trajectory ended before the episode was done")
agent.wrap_episode()
def run_episode(self, task, agent, process_num=None, position=0):
"""
Executes a single evaluation episode for the specified task.
Args:
task (str): Name of the task to be evaluated.
agent (BaseAgent): The agent being evaluated.
process_num (int, optional): Process number for logging purposes. Defaults to None.
position (int, optional): Position for progress bar. Defaults to 0.
Returns:
dict: Log of the episode results including trajectory, action frequency,
and performance metrics.
"""
env = make_env(self.env_name, task, self.config)
agent.reset()
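# Seed Python, NumPy, and the environment for a reproducible evaluation episode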
seed = self.config.envs.env_kwargs.seed
if seed is not None:
random.seed(seed)
np.random.seed(seed)
env.seed(seed=seed)
obs = env.reset()
episode_log = {
"task": task,
"trajectory": [],
"action_frequency": defaultdict(int),
"input_tokens": 0,
"output_tokens": 0,
}
instructions = None
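# BabyAI exposes the goal as obs["mission"]; pass it through to the instruction prompt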
if self.env_name == "babyai":
instructions = obs["mission"]
agent.prompt_builder.update_instruction_prompt(env.get_instruction_prompt(instructions=instructions))
episode_return = 0.0
max_steps_per_episode = env.max_steps if self.max_steps_per_episode is None else self.max_steps_per_episode
# if the agent class is ICLAgent, load the in-context learning episode
if isinstance(agent, ICLAgent):
for icl_episode in range(self.config.eval.icl_episodes):
self.load_in_context_learning_episode(icl_episode, task, agent, episode_log)
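# Optionally cache the loaded in-context examples when using the Gemini client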
if self.config.agent.cache_icl and self.config.client.client_name == "gemini":
agent.cache_icl()
pbar_desc = f"Task: {task}, Proc: {process_num}"
pbar = tqdm(
total=max_steps_per_episode,
desc=pbar_desc,
position=position,
leave=True, # Keep the progress bar after completion
dynamic_ncols=True,
)
action = None
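# Main interaction loop: query the agent, validate its proposed action, log it, and step the environment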
for step in range(max_steps_per_episode):
response = agent.act(obs, prev_action=action)
action = env.check_action_validity(response.completion)
if self.config.eval.save_trajectories:
reasoning = response.reasoning if hasattr(response, "reasoning") else None
episode_log["trajectory"].append((obs["text"]["long_term_context"], reasoning if reasoning else action))
episode_log["action_frequency"][action] += 1
episode_log["input_tokens"] += response.input_tokens
episode_log["output_tokens"] += response.output_tokens
obs, reward, done, info = env.step(action)
episode_return += reward
pbar.update(1)
if done:
logging.info(f"Episode done with reward: {episode_return}")
episode_log["done"] = True
if pbar.n < pbar.total:
pbar.update(pbar.total - pbar.n)
pbar.set_postfix_str("DONE")
break
if pbar.n < pbar.total:
pbar.update(pbar.total - pbar.n)
if "done" not in episode_log:
pbar.set_postfix_str("DONE")
pbar.close()
episode_log["episode_return"] = episode_return
episode_log["num_steps"] = step + 1
episode_log["failed_candidates"] = env.failed_candidates
episode_log.update(env.get_stats())
return episode_log
def run(self, agent_factory):
"""
Executes the evaluation process either sequentially or in parallel.
Args:
agent_factory (AgentFactory): Factory to create instances of the agent.
Returns:
dict: Summary of the results for all tasks.
"""
if self.num_workers > 1:
results = self._run_parallel(agent_factory)
else:
results = self._run_sequential(agent_factory)
summary = self._save_results(results, self.env_name)
return summary
def _run_sequential(self, agent_factory):
results = defaultdict(list)
total_episodes = len(self.tasks) * self.num_episodes
with tqdm(total=total_episodes, desc="Evaluating Episodes") as pbar:
for task in self.tasks:
for _ in range(self.num_episodes):
agent = agent_factory.create_agent()
episode_log = self.run_episode(task, agent)
results[task].append(episode_log)
pbar.update(1)
return results
def _run_parallel(self, agent_factory):
task_queue = multiprocessing.Queue()
results_queue = multiprocessing.Queue()
# Create a multiprocessing context that forks worker processes
ctx = multiprocessing.get_context("fork")
# Create a list of all tasks to be executed
all_tasks = [task for task in self.tasks for _ in range(self.num_episodes)]
# Initially fill the task queue with tasks up to the number of workers
for task in all_tasks[: self.num_workers]:
task_queue.put(task)
# Assign unique positions for progress bars
positions = list(range(self.num_workers))
processes = []
for idx in range(self.num_workers):
position = positions[idx]
p = ctx.Process(
target=self._worker,
args=(task_queue, results_queue, agent_factory, position),
)
processes.append(p)
p.start()
results = defaultdict(list)
tasks_completed = 0
tasks_queued = self.num_workers
total_tasks = len(all_tasks)
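# Collect finished episodes and keep topping up the task queue so every worker stays busy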
with tqdm(total=total_tasks, desc="Evaluating Episodes") as pbar:
while tasks_completed < total_tasks:
result = results_queue.get()
if "error" in result:
logging.error(
f"Error in task {result['task']} processed by {result['process_num']}: {result['error']}"
)
logging.error(f"Traceback:\n{result['traceback']}")
else:
results[result["task"]].append(result)
tasks_completed += 1
# Update progress bar
pbar.update(1)
pbar.set_description(f"Last task: {result['task']}, Process: {result.get('process_num', 'N/A')}")
# Queue another task if there are any left
if tasks_queued < len(all_tasks):
task_queue.put(all_tasks[tasks_queued])
tasks_queued += 1
# Signal workers to stop
for _ in range(self.num_workers):
task_queue.put(None)
for p in processes:
p.join()
return results
def _worker(self, task_queue, results_queue, agent_factory, position):
agent = agent_factory.create_agent()
process_num = multiprocessing.current_process().name
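# Process tasks until the None sentinel arrives, reporting each episode log (or error details) back to the parent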
while True:
task = task_queue.get()
if task is None:
break
try:
result = self.run_episode(task, agent, process_num=process_num, position=position + 1)
result["process_num"] = process_num # Include process number in result
results_queue.put(result)
except Exception as e:
tb = traceback.format_exc()
logging.error(f"Error in worker processing task {task}: {e}\n{tb}")
results_queue.put({"task": task, "error": str(e), "traceback": tb, "process_num": process_num})
def _save_results(self, results, env_name):
progression = 0.0
count = 0
env_summary = defaultdict(list)
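# Write each run to <env_name>/<task>/<task>_run_NN.json while accumulating per-task progression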
for task, result in results.items():
task_folder = os.path.join(env_name, task)
task_progression = 0.0
task_count = 0
for idx, run in enumerate(result):
progression += run.get("progression", 0.0)
count += 1
task_progression += run.get("progression", 0.0)
task_count += 1
filename = os.path.join(task_folder, f"{task}_run_{idx:02d}.json")
Path(filename).parent.mkdir(exist_ok=True, parents=True)
with open(filename, "w") as file:
json.dump(run, file, indent=4)
env_summary[task] = (
task_progression / task_count if task_count else 0,
task_count,
)
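# Build the environment-level summary: overall and per-task progression percentages plus total token usage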
data = {
"progression_percentage": 100 * progression / count if count else 0,
"episodes_played": count,
"tasks": {
task: {"progression_percentage": 100 * prog, "episodes_played": cnt}
for task, (prog, cnt) in env_summary.items()
},
"input_tokens": sum(run["input_tokens"] for task_results in results.values() for run in task_results),
"output_tokens": sum(run["output_tokens"] for task_results in results.values() for run in task_results),
}
os.makedirs(env_name, exist_ok=True)
filename = os.path.join(env_name, f"{env_name}_summary.json")
with open(filename, "w") as file:
json.dump(data, file, indent=4)
logging.info(f"Results saved for {env_name} in {filename}")
return data