Source code for iclbench.evaluator

import copy
import json
import logging
import multiprocessing
import os
import random
import traceback
from collections import defaultdict
from pathlib import Path

import numpy as np
from hydra.utils import get_original_cwd
from tqdm import tqdm

from iclbench.agents.icl import ICLAgent
from iclbench.dataset import InContextDataset
from iclbench.environments import make_env


class Evaluator:
    """
    Class to evaluate an agent on a set of tasks in a given environment.

    The `Evaluator` class is responsible for orchestrating the evaluation of agents
    across multiple tasks within a specified environment. It manages the setup of the
    environment, runs episodes, logs results, and can execute evaluations in parallel
    or sequentially.

    Attributes:
        env_name (str): Name of the environment in which the agent operates.
        config (Config): Configuration object containing evaluation parameters.
        tasks (list): List of tasks for the specified environment.
        num_episodes (int): Number of episodes to run for each task.
        num_workers (int): Number of parallel worker processes to use.
        max_steps_per_episode (int): Maximum number of steps per episode.
        dataset (InContextDataset): Dataset object for managing in-context learning tasks.
    """

    def __init__(self, env_name, config, original_cwd=""):
        """
        Initializes the Evaluator with environment name and configuration.

        Args:
            env_name (str): Name of the environment.
            config (Config): Configuration object with evaluation parameters.
            original_cwd (str, optional): Original current working directory. Defaults to "".
        """
        self.env_name = env_name.strip()  # Ensure no leading/trailing whitespace
        self.config = config
        self.tasks = config.tasks[f"{self.env_name}_tasks"]
        self.num_episodes = config.eval.num_episodes[self.env_name]
        self.num_workers = config.eval.num_workers
        self.max_steps_per_episode = config.eval.max_steps_per_episode
        self.dataset = InContextDataset(self.config, self.env_name, original_cwd=original_cwd)
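
    # The configuration fields read in this module look roughly like the sketch
    # below. This is a hedged reconstruction from the attribute accesses in this
    # file, not an authoritative schema; the values are illustrative placeholders.
    #
    #   tasks:
    #     babyai_tasks: [...]              # config.tasks[f"{env_name}_tasks"]
    #   eval:
    #     num_episodes: {babyai: 5}        # per-environment episode count
    #     num_workers: 1
    #     max_steps_per_episode: null      # falls back to env.max_steps when null
    #     icl_episodes: 1                  # only used when the agent is an ICLAgent
    #     save_trajectories: true
    #   envs:
    #     env_kwargs: {seed: 0}
    #   agent:
    #     cache_icl: false
    #   client:
    #     client_name: gemini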

    def load_in_context_learning_episode(self, i, task, agent, episode_log):
        """
        Loads and executes an in-context learning episode for the specified task.

        Args:
            i (int): Index of the in-context learning episode.
            task (str): Name of the task to be evaluated.
            agent (BaseAgent): The agent being evaluated.
            episode_log (dict): Log to record episode results.
        """
        demo_config = copy.deepcopy(self.config)
        demo_task = self.dataset.demo_task(task)
        demo_path = self.dataset.demo_path(i, demo_task, demo_config)
        self.dataset.override_incontext_config(demo_config, demo_path)

        env = make_env(self.env_name, demo_task, demo_config)
        recorded_actions = self.dataset.load_incontext_actions(demo_path)

        seed = demo_config.envs.env_kwargs.seed
        if seed is not None:
            random.seed(seed)
            np.random.seed(seed)
            env.seed(seed=seed)
        obs = env.reset()

        done = False  # Guard against an empty list of recorded actions
        for action in recorded_actions:
            text_action = env.get_text_action(action)
            agent.update_icl_observation(obs)
            agent.update_icl_action(text_action)
            if self.config.eval.save_trajectories:
                episode_log["trajectory"].append((obs["text"]["long_term_context"], text_action))
            episode_log["action_frequency"][text_action] += 1
            obs, reward, done, info = env.step(text_action)
            if done:
                break

        if not done:
            logging.warning("ICL trajectory ended without reaching a terminal state")
        agent.wrap_episode()

    def run_episode(self, task, agent, process_num=None, position=0):
        """
        Executes a single evaluation episode for the specified task.

        Args:
            task (str): Name of the task to be evaluated.
            agent (BaseAgent): The agent being evaluated.
            process_num (int, optional): Process number for logging purposes. Defaults to None.
            position (int, optional): Position for the progress bar. Defaults to 0.

        Returns:
            dict: Log of the episode results including trajectory, action frequency,
                and performance metrics.
        """
        env = make_env(self.env_name, task, self.config)
        agent.reset()

        seed = self.config.envs.env_kwargs.seed
        if seed is not None:
            random.seed(seed)
            np.random.seed(seed)
            env.seed(seed=seed)
        obs = env.reset()

        episode_log = {
            "task": task,
            "trajectory": [],
            "action_frequency": defaultdict(int),
            "input_tokens": 0,
            "output_tokens": 0,
        }

        instructions = None
        if self.env_name == "babyai":
            instructions = obs["mission"]
        agent.prompt_builder.update_instruction_prompt(env.get_instruction_prompt(instructions=instructions))

        episode_return = 0.0
        max_steps_per_episode = env.max_steps if self.max_steps_per_episode is None else self.max_steps_per_episode

        # If the agent class is ICLAgent, load the in-context learning episodes first
        if isinstance(agent, ICLAgent):
            for icl_episode in range(self.config.eval.icl_episodes):
                self.load_in_context_learning_episode(icl_episode, task, agent, episode_log)
            if self.config.agent.cache_icl and self.config.client.client_name == "gemini":
                agent.cache_icl()

        pbar_desc = f"Task: {task}, Proc: {process_num}"
        pbar = tqdm(
            total=max_steps_per_episode,
            desc=pbar_desc,
            position=position,
            leave=True,  # Keep the progress bar after completion
            dynamic_ncols=True,
        )

        action = None
        for step in range(max_steps_per_episode):
            response = agent.act(obs, prev_action=action)
            action = env.check_action_validity(response.completion)

            if self.config.eval.save_trajectories:
                reasoning = response.reasoning if hasattr(response, "reasoning") else None
                episode_log["trajectory"].append(
                    (obs["text"]["long_term_context"], reasoning if reasoning else action)
                )
            episode_log["action_frequency"][action] += 1
            episode_log["input_tokens"] += response.input_tokens
            episode_log["output_tokens"] += response.output_tokens

            obs, reward, done, info = env.step(action)
            episode_return += reward
            pbar.update(1)

            if done:
                logging.info(f"Episode done with reward: {episode_return}")
                episode_log["done"] = True
                if pbar.n < pbar.total:
                    pbar.update(pbar.total - pbar.n)
                pbar.set_postfix_str("DONE")
                break

        if pbar.n < pbar.total:
            pbar.update(pbar.total - pbar.n)
        if "done" not in episode_log:
            pbar.set_postfix_str("DONE")
        pbar.close()

        episode_log["episode_return"] = episode_return
        episode_log["num_steps"] = step + 1
        episode_log["failed_candidates"] = env.failed_candidates
        episode_log.update(env.get_stats())
        return episode_log

    def run(self, agent_factory):
        """
        Executes the evaluation process either sequentially or in parallel.

        Args:
            agent_factory (AgentFactory): Factory to create instances of the agent.

        Returns:
            dict: Summary of the results for all tasks.
        """
        if self.num_workers > 1:
            results = self._run_parallel(agent_factory)
        else:
            results = self._run_sequential(agent_factory)

        summary = self._save_results(results, self.env_name)
        return summary
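
    # A minimal driver sketch, assuming a Hydra-configured entry point and an
    # agent factory exposing ``create_agent()`` (the surrounding names are
    # illustrative, not defined in this module):
    #
    #   evaluator = Evaluator("babyai", config, original_cwd=get_original_cwd())
    #   summary = evaluator.run(agent_factory)
    #   logging.info("Progression: %.1f%%", summary["progression_percentage"])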

    def _run_sequential(self, agent_factory):
        results = defaultdict(list)
        total_episodes = len(self.tasks) * self.num_episodes

        with tqdm(total=total_episodes, desc="Evaluating Episodes") as pbar:
            for task in self.tasks:
                for _ in range(self.num_episodes):
                    agent = agent_factory.create_agent()
                    episode_log = self.run_episode(task, agent)
                    results[task].append(episode_log)
                    pbar.update(1)
        return results

    def _run_parallel(self, agent_factory):
        task_queue = multiprocessing.Queue()
        results_queue = multiprocessing.Queue()

        # Create a multiprocessing context using fork
        ctx = multiprocessing.get_context("fork")

        # Create a list of all tasks to be executed
        all_tasks = [task for task in self.tasks for _ in range(self.num_episodes)]

        # Initially fill the task queue with tasks up to the number of workers
        for task in all_tasks[: self.num_workers]:
            task_queue.put(task)

        # Assign unique positions for progress bars
        positions = list(range(self.num_workers))

        processes = []
        for idx in range(self.num_workers):
            position = positions[idx]
            p = ctx.Process(
                target=self._worker,
                args=(task_queue, results_queue, agent_factory, position),
            )
            processes.append(p)
            p.start()

        results = defaultdict(list)
        tasks_completed = 0
        tasks_queued = self.num_workers
        total_tasks = len(all_tasks)

        with tqdm(total=total_tasks, desc="Evaluating Episodes") as pbar:
            while tasks_completed < total_tasks:
                result = results_queue.get()
                if "error" in result:
                    logging.error(
                        f"Error in task {result['task']} processed by {result['process_num']}: {result['error']}"
                    )
                    logging.error(f"Traceback:\n{result['traceback']}")
                else:
                    results[result["task"]].append(result)
                tasks_completed += 1

                # Update progress bar
                pbar.update(1)
                pbar.set_description(f"Last task: {result['task']}, Process: {result.get('process_num', 'N/A')}")

                # Queue another task if there are any left
                if tasks_queued < len(all_tasks):
                    task_queue.put(all_tasks[tasks_queued])
                    tasks_queued += 1

        # Signal workers to stop
        for _ in range(self.num_workers):
            task_queue.put(None)

        for p in processes:
            p.join()

        return results

    def _worker(self, task_queue, results_queue, agent_factory, position):
        agent = agent_factory.create_agent()
        process_num = multiprocessing.current_process().name

        while True:
            task = task_queue.get()
            if task is None:
                break
            try:
                result = self.run_episode(task, agent, process_num=process_num, position=position + 1)
                result["process_num"] = process_num  # Include process number in result
                results_queue.put(result)
            except Exception as e:
                tb = traceback.format_exc()
                logging.error(f"Error in worker processing task {task}: {e}\n{tb}")
                results_queue.put({"task": task, "error": str(e), "traceback": tb, "process_num": process_num})

    def _save_results(self, results, env_name):
        progression = 0.0
        count = 0
        env_summary = defaultdict(list)

        for task, result in results.items():
            task_folder = os.path.join(env_name, task)
            task_progression = 0.0
            task_count = 0
            for idx, run in enumerate(result):
                progression += run.get("progression", 0.0)
                count += 1
                task_progression += run.get("progression", 0.0)
                task_count += 1

                filename = os.path.join(task_folder, f"{task}_run_{idx:02d}.json")
                Path(filename).parent.mkdir(exist_ok=True, parents=True)
                with open(filename, "w") as file:
                    json.dump(run, file, indent=4)

            env_summary[task] = (
                task_progression / task_count if task_count else 0,
                task_count,
            )

        data = {
            "progression_percentage": 100 * progression / count if count else 0,
            "episodes_played": count,
            "tasks": {
                task: {"progression_percentage": 100 * prog, "episodes_played": cnt}
                for task, (prog, cnt) in env_summary.items()
            },
            "input_tokens": sum(run["input_tokens"] for task_results in results.values() for run in task_results),
            "output_tokens": sum(run["output_tokens"] for task_results in results.values() for run in task_results),
        }

        filename = os.path.join(env_name, f"{env_name}_summary.json")
        with open(filename, "w") as file:
            json.dump(data, file, indent=4)
        logging.info(f"Results saved for {env_name} in {filename}")
        return data
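
# For reference, ``_save_results`` writes one JSON file per run under
# ``{env_name}/{task}/{task}_run_XX.json`` and a ``{env_name}/{env_name}_summary.json``
# shaped roughly as below (the values are illustrative placeholders, not real results):
#
#   {
#       "progression_percentage": 42.0,
#       "episodes_played": 10,
#       "tasks": {
#           "some_task": {"progression_percentage": 42.0, "episodes_played": 5}
#       },
#       "input_tokens": 12345,
#       "output_tokens": 678
#   }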