diff --git a/.gitmodules b/.gitmodules
new file mode 100644
index 00000000..bac9786a
--- /dev/null
+++ b/.gitmodules
@@ -0,0 +1,3 @@
+[submodule "gpt_oss/evals/submodules/LiveCodeBench"]
+	path = gpt_oss/evals/submodules/LiveCodeBench
+	url = https://github.com/LiveCodeBench/LiveCodeBench.git
diff --git a/gpt_oss/evals/__main__.py b/gpt_oss/evals/__main__.py
index 40d56c12..bfccc0af 100644
--- a/gpt_oss/evals/__main__.py
+++ b/gpt_oss/evals/__main__.py
@@ -7,11 +7,13 @@
 from .gpqa_eval import GPQAEval
 from .aime_eval import AIME25Eval
 from .healthbench_eval import HealthBenchEval
+from .livecodebench_eval import LiveCodeBenchEval
 from .chat_completions_sampler import (
     OPENAI_SYSTEM_MESSAGE_API,
     ChatCompletionsSampler,
 )
 from .responses_sampler import ResponsesSampler
+from .harmony_sampler import HarmonySampler
 
 
 def main():
@@ -34,9 +36,9 @@ def main():
     parser.add_argument(
         "--sampler",
         type=str,
-        choices=["responses", "chat_completions"],
+        choices=["responses", "chat_completions", "harmony"],
         default="responses",
-        help="Sampler backend to use for models.",
+        help="Sampler backend to use for models. 'harmony' uses openai_harmony tokenization with SGLang /generate endpoint.",
     )
     parser.add_argument(
         "--base-url",
@@ -48,7 +50,7 @@
         "--eval",
         type=str,
         default="gpqa,healthbench,healthbench_hard,healthbench_consensus,aime25",
-        help="Select an eval by name. Accepts a comma-separated list.",
+        help="Select an eval by name. Accepts a comma-separated list. Options: basic, gpqa, healthbench, healthbench_hard, healthbench_consensus, aime25, livecodebench",
     )
     parser.add_argument(
         "--temperature",
@@ -56,6 +58,24 @@
         default=1.0,
         help="Sampling temperature",
     )
+    parser.add_argument(
+        "--top-p",
+        type=float,
+        default=None,
+        help="Top-p (nucleus) sampling parameter",
+    )
+    parser.add_argument(
+        "--top-k",
+        type=int,
+        default=None,
+        help="Top-k sampling parameter (sglang/vLLM specific)",
+    )
+    parser.add_argument(
+        "--max-tokens",
+        type=int,
+        default=32768,
+        help="Maximum number of output tokens",
+    )
     parser.add_argument(
         "--n-threads",
         type=int,
@@ -68,22 +88,65 @@
     parser.add_argument(
         "--examples", type=int, help="Number of examples to use (overrides default)"
     )
+    parser.add_argument(
+        "--n-repeats",
+        type=int,
+        default=None,
+        help="Number of repeats per example (default: 1 in debug mode, 8 otherwise)",
+    )
+    parser.add_argument(
+        "--dump-inputs",
+        type=str,
+        default=None,
+        help="Path to JSONL file to dump input tokens (harmony sampler only)",
+    )
+    parser.add_argument(
+        "--timeout",
+        type=int,
+        default=1800,
+        help="Request timeout in seconds (default: 1800)",
+    )
+    parser.add_argument(
+        "--lcb-workers",
+        type=int,
+        default=64,
+        help="Number of parallel workers for LiveCodeBench code evaluation (default: 64)",
+    )
+    parser.add_argument(
+        "--lcb-version",
+        type=str,
+        default="release_v6",
+        help="LiveCodeBench version tag (default: release_v6). Options: release_v5, release_v6",
+    )
+    )
 
     args = parser.parse_args()
 
-    sampler_cls = ResponsesSampler if args.sampler == "responses" else ChatCompletionsSampler
+    if args.sampler == "responses":
+        sampler_cls = ResponsesSampler
+    elif args.sampler == "chat_completions":
+        sampler_cls = ChatCompletionsSampler
+    else:  # harmony
+        sampler_cls = HarmonySampler
 
     models = {}
     for model_name in args.model.split(","):
         for reasoning_effort in args.reasoning_effort.split(","):
-            models[f"{model_name}-{reasoning_effort}"] = sampler_cls(
+            sampler_kwargs = dict(
                 model=model_name,
                 reasoning_model=True,
                 reasoning_effort=reasoning_effort,
                 temperature=args.temperature,
+                top_p=args.top_p,
+                top_k=args.top_k,
                 base_url=args.base_url,
-                max_tokens=131_072,
+                max_tokens=args.max_tokens,
+                timeout=args.timeout,
             )
+            # Add harmony sampler specific options
+            if args.sampler == "harmony":
+                if args.dump_inputs:
+                    sampler_kwargs["dump_inputs_dir"] = args.dump_inputs
+            models[f"{model_name}-{reasoning_effort}"] = sampler_cls(**sampler_kwargs)
 
     print(f"Running with args {args}")
 
@@ -98,13 +161,18 @@ def get_evals(eval_name, debug_mode):
         num_examples = (
             args.examples if args.examples is not None else (5 if debug_mode else None)
         )
+        # Determine n_repeats: use --n-repeats if provided, else 1 for debug, else 8
+        if args.n_repeats is not None:
+            n_repeats = args.n_repeats
+        else:
+            n_repeats = 1 if debug_mode else 8
         # Set num_examples = None to reproduce full evals
         match eval_name:
             case "basic":
                 return BasicEval()
             case "gpqa":
                 return GPQAEval(
-                    n_repeats=1 if args.debug else 8,
+                    n_repeats=n_repeats,
                     num_examples=num_examples,
                     debug=debug_mode,
                     n_threads=args.n_threads or 1,
@@ -113,7 +181,7 @@
                 return HealthBenchEval(
                     grader_model=grading_sampler,
                     num_examples=10 if debug_mode else num_examples,
-                    n_repeats=1,
+                    n_repeats=n_repeats,
                     n_threads=args.n_threads or 1,
                     subset_name=None,
                 )
@@ -121,7 +189,7 @@
                 return HealthBenchEval(
                     grader_model=grading_sampler,
                     num_examples=10 if debug_mode else num_examples,
-                    n_repeats=1,
+                    n_repeats=n_repeats,
                     n_threads=args.n_threads or 1,
                     subset_name="hard",
                 )
@@ -129,15 +197,23 @@
                 return HealthBenchEval(
                     grader_model=grading_sampler,
                     num_examples=10 if debug_mode else num_examples,
-                    n_repeats=1,
+                    n_repeats=n_repeats,
                     n_threads=args.n_threads or 1,
                     subset_name="consensus",
                 )
             case "aime25":
                 return AIME25Eval(
-                    n_repeats=1 if args.debug else 8,
+                    n_repeats=n_repeats,
+                    num_examples=num_examples,
+                    n_threads=args.n_threads or 1,
+                )
+            case "livecodebench":
+                return LiveCodeBenchEval(
+                    n_repeats=n_repeats,
                     num_examples=num_examples,
                     n_threads=args.n_threads or 1,
+                    lcb_workers=args.lcb_workers,
+                    lcb_version=args.lcb_version,
                 )
             case _:
                 raise Exception(f"Unrecognized eval type: {eval_name}")
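For reference, the same wiring can be exercised outside the CLI. The sketch below is a minimal example, not part of the patch: the model name, base URL, and sample counts are placeholders, the package is assumed importable, and an OpenAI-compatible server is assumed to be running.

```python
from gpt_oss.evals.chat_completions_sampler import ChatCompletionsSampler
from gpt_oss.evals.livecodebench_eval import LiveCodeBenchEval

# Mirrors the kwargs that main() now assembles per model / reasoning-effort pair.
sampler = ChatCompletionsSampler(
    model="my-model",                     # placeholder
    reasoning_model=True,
    reasoning_effort="high",
    temperature=1.0,
    top_p=0.95,                           # optional; forwarded only when not None
    top_k=None,
    base_url="http://localhost:8000/v1",  # placeholder
    max_tokens=32768,
    timeout=1800,
)

# Mirrors the new "livecodebench" branch of get_evals(); constructing the eval
# downloads the HuggingFace dataset on first use.
lcb_eval = LiveCodeBenchEval(
    n_repeats=1,
    num_examples=5,   # small smoke test
    n_threads=4,
    lcb_workers=8,
    lcb_version="release_v6",
)
result = lcb_eval(sampler)  # EvalResult aggregated via report.aggregate_results
```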
diff --git a/gpt_oss/evals/chat_completions_sampler.py b/gpt_oss/evals/chat_completions_sampler.py
index 29c1a0a8..ee08857d 100644
--- a/gpt_oss/evals/chat_completions_sampler.py
+++ b/gpt_oss/evals/chat_completions_sampler.py
@@ -26,8 +26,11 @@ def __init__(
         reasoning_model: bool = False,
         reasoning_effort: str | None = None,
         base_url: str = "http://localhost:8000/v1",
+        top_p: float | None = None,
+        top_k: int | None = None,
+        timeout: int = 1800,
     ):
-        self.client = OpenAI(base_url=base_url, timeout=24 * 60 * 60)
+        self.client = OpenAI(base_url=base_url, timeout=timeout)
         self.model = model
         self.system_message = system_message
         self.temperature = temperature
@@ -35,6 +38,8 @@
         self.reasoning_model = reasoning_model
         self.reasoning_effort = reasoning_effort
         self.image_format = "url"
+        self.top_p = top_p
+        self.top_k = top_k
 
     def _pack_message(self, role: str, content: Any) -> dict[str, Any]:
         return {"role": str(role), "content": content}
@@ -47,6 +52,13 @@ def __call__(self, message_list: MessageList) -> SamplerResponse:
         trial = 0
         while True:
             try:
+                # Build extra kwargs for optional sampling parameters
+                extra_kwargs = {}
+                if self.top_p is not None:
+                    extra_kwargs["top_p"] = self.top_p
+                if self.top_k is not None:
+                    extra_kwargs["extra_body"] = {"top_k": self.top_k}
+
                 if self.reasoning_model:
                     response = self.client.chat.completions.create(
                         model=self.model,
@@ -54,6 +66,7 @@
                         reasoning_effort=self.reasoning_effort,
                         temperature=self.temperature,
                         max_tokens=self.max_tokens,
+                        **extra_kwargs,
                     )
                 else:
                     response = self.client.chat.completions.create(
@@ -61,6 +74,7 @@
                         messages=message_list,
                         temperature=self.temperature,
                         max_tokens=self.max_tokens,
+                        **extra_kwargs,
                     )
 
                 choice = response.choices[0]
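top_p is a standard Chat Completions parameter, while top_k is not, which is why the sampler tunnels it through extra_body for backends such as SGLang or vLLM that understand it. A rough, hand-written equivalent of the request the sampler ends up issuing (model name and URL are placeholders):

```python
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", timeout=1800)
response = client.chat.completions.create(
    model="my-model",              # placeholder
    messages=[{"role": "user", "content": "Say hello."}],
    temperature=1.0,
    max_tokens=256,
    top_p=0.95,                    # standard parameter
    extra_body={"top_k": 50},      # passed through to the serving backend
)
print(response.choices[0].message.content)
```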
diff --git a/gpt_oss/evals/harmony_sampler.py b/gpt_oss/evals/harmony_sampler.py
new file mode 100644
index 00000000..969c1c66
--- /dev/null
+++ b/gpt_oss/evals/harmony_sampler.py
@@ -0,0 +1,251 @@
+"""
+Harmony Sampler - converts chat messages to Harmony tokens and sends to SGLang /generate endpoint.
+"""
+import json
+import os
+import threading
+import time
+from typing import Any
+
+import requests
+from transformers import AutoTokenizer
+from openai_harmony import (
+    load_harmony_encoding,
+    HarmonyEncodingName,
+    Role,
+    Message,
+    Conversation,
+    SystemContent,
+    DeveloperContent,
+    ReasoningEffort,
+)
+
+from .types import MessageList, SamplerBase, SamplerResponse
+
+
+# Map string reasoning effort to enum
+REASONING_EFFORT_MAP = {
+    "low": ReasoningEffort.LOW,
+    "medium": ReasoningEffort.MEDIUM,
+    "high": ReasoningEffort.HIGH,
+}
+
+
+class HarmonySampler(SamplerBase):
+    """
+    Sample from SGLang's /generate endpoint using Harmony tokenization.
+
+    Converts chat messages to Harmony format, tokenizes them, and sends
+    raw tokens to the /generate endpoint.
+    """
+
+    def __init__(
+        self,
+        model: str,
+        temperature: float = 1.0,
+        max_tokens: int = 32768,
+        reasoning_model: bool = False,
+        reasoning_effort: str | None = None,
+        base_url: str = "http://localhost:8080",
+        top_p: float | None = None,
+        top_k: int | None = None,
+        dump_inputs_dir: str | None = None,
+        timeout: int = 1800,
+    ):
+        self.model = model
+        self.temperature = temperature
+        self.max_tokens = max_tokens
+        self.reasoning_model = reasoning_model
+        self.reasoning_effort = reasoning_effort or "high"
+        self.base_url = base_url.rstrip("/")
+        self.top_p = top_p
+        self.top_k = top_k
+        self.image_format = "url"
+        self.dump_inputs_file = dump_inputs_dir  # parameter name kept for compatibility; the value is treated as a file path
+        self.timeout = timeout
+        self._dump_lock = threading.Lock()
+
+        # Load tokenizer for decoding tokens to text (always needed for HTML reports)
+        print(f"Loading tokenizer for model: {model}")
+        self.tokenizer = AutoTokenizer.from_pretrained(model, trust_remote_code=True)
+        print("Tokenizer loaded successfully")
+
+        # Initialize dump file if specified
+        if self.dump_inputs_file:
+            # Create parent directory if needed
+            dump_dir = os.path.dirname(self.dump_inputs_file)
+            if dump_dir:
+                os.makedirs(dump_dir, exist_ok=True)
+            # Clear/create the file
+            with open(self.dump_inputs_file, "w") as f:
+                pass  # Create empty file
+
+        # Load the Harmony encoding for gpt-oss models
+        self.enc = load_harmony_encoding(HarmonyEncodingName.HARMONY_GPT_OSS)
+
+    def _pack_message(self, role: str, content: Any) -> dict[str, Any]:
+        return {"role": role, "content": content}
+
+    def _convert_to_harmony_messages(self, message_list: MessageList) -> list[Message]:
+        """
+        Convert chat messages (role/content dicts) to Harmony Message objects.
+        """
+        harmony_messages = []
+        reasoning_effort_enum = REASONING_EFFORT_MAP.get(
+            self.reasoning_effort.lower(), ReasoningEffort.HIGH
+        )
+
+        # Incoming system messages are not supported; a default system message
+        # carrying the reasoning effort is always constructed instead.
+        has_system = any(msg.get("role") == "system" for msg in message_list)
+        assert not has_system, "System message not supported"
+
+        if not has_system:
+            # Create default system message with reasoning effort
+            system_content = (
+                SystemContent.new()
+                .with_reasoning_effort(reasoning_effort_enum)
+                .with_conversation_start_date("2025-09-30")
+                .with_required_channels(["analysis", "commentary", "final"])
+            )
+            harmony_messages.append(
+                Message.from_role_and_content(Role.SYSTEM, system_content)
+            )
+
+        for msg in message_list:
+            role = msg.get("role", "user")
+            content = msg.get("content", "")
+
+            if role == "developer":
+                developer_content = DeveloperContent.new().with_instructions(content)
+                harmony_messages.append(
+                    Message.from_role_and_content(Role.DEVELOPER, developer_content)
+                )
+            elif role == "user":
+                harmony_messages.append(
+                    Message.from_role_and_content(Role.USER, content)
+                )
+            elif role == "assistant":
+                harmony_messages.append(
+                    Message.from_role_and_content(Role.ASSISTANT, content)
+                )
+            else:
+                # Default to user role for unknown roles
+                harmony_messages.append(
+                    Message.from_role_and_content(Role.USER, content)
+                )
+
+        return harmony_messages
+
+    def __call__(self, message_list: MessageList) -> SamplerResponse:
+        trial = 0
+        while True:
+            try:
+                # Convert chat messages to Harmony format
+                harmony_messages = self._convert_to_harmony_messages(message_list)
+
+                # Create conversation
+                convo = Conversation.from_messages(harmony_messages)
+
+                # Tokenize for completion
+                tokens = self.enc.render_conversation_for_completion(convo, Role.ASSISTANT)
+                tokens_list = tokens.tolist() if hasattr(tokens, 'tolist') else list(tokens)
+
+                # Decode tokens to text for HTML reports
+                text_input = self.tokenizer.decode(tokens_list, skip_special_tokens=False)
+
+                # Dump inputs if file is specified
+                if self.dump_inputs_file:
+                    dump_data = {
+                        "input_tokens": tokens_list,
+                        "num_tokens": len(tokens_list),
+                        "text_input": text_input,
+                        "original_messages": message_list,
+                        "sampling_params": {
+                            "temperature": self.temperature,
+                            "max_new_tokens": self.max_tokens,
+                            "top_p": self.top_p,
+                            "top_k": self.top_k,
+                        },
+                    }
+                    # Thread-safe append to JSONL file
+                    with self._dump_lock:
+                        with open(self.dump_inputs_file, "a") as f:
+                            f.write(json.dumps(dump_data) + "\n")
+
+                # Create de-tokenized message list for HTML reports
+                detokenized_message_list = [
+                    {"role": "user", "content": text_input}
+                ]
+
+                # Build sampling params
+                sampling_params = {
+                    "temperature": self.temperature,
+                    "max_new_tokens": self.max_tokens,
+                }
+                if self.top_p is not None:
+                    sampling_params["top_p"] = self.top_p
+                if self.top_k is not None:
+                    sampling_params["top_k"] = self.top_k
+
+                # Send to SGLang /generate endpoint
+                response = requests.post(
+                    f"{self.base_url}/generate",
+                    json={
+                        "model": self.model,
+                        "input_ids": tokens_list,
+                        "sampling_params": sampling_params,
+                    },
+                    timeout=self.timeout,
+                )
+
+                if response.status_code != 200:
+                    raise ValueError(f"Generate endpoint returned {response.status_code}: {response.text}")
+
+                result = response.json()
+
+                # Always decode output tokens using our tokenizer for consistency
+                if "output_ids" not in result:
+                    raise ValueError(f"Response missing 'output_ids' field. Got keys: {list(result.keys())}")
+                output_ids = result["output_ids"]
+                response_text = self.tokenizer.decode(output_ids, skip_special_tokens=False)
+
+                if not response_text:
+                    raise ValueError("Generate endpoint returned empty response; retrying")
+
+                return SamplerResponse(
+                    response_text=response_text,
+                    response_metadata={
+                        "input_tokens": len(tokens_list),
+                        "output_tokens": result.get("meta_info", {}).get("completion_tokens"),
+                    },
+                    actual_queried_message_list=detokenized_message_list,
+                )
+
+            except requests.exceptions.RequestException as e:
+                exception_backoff = 2 ** trial
+                print(
+                    f"Request exception, wait and retry {trial} after {exception_backoff} sec",
+                    e,
+                )
+                time.sleep(exception_backoff)
+                trial += 1
+                if trial > 10:
+                    return SamplerResponse(
+                        response_text="No response (request failed).",
+                        response_metadata={"error": str(e)},
+                        actual_queried_message_list=message_list,
+                    )
+            except Exception as e:
+                exception_backoff = 2 ** trial
+                print(
+                    f"Exception, wait and retry {trial} after {exception_backoff} sec",
+                    e,
+                )
+                time.sleep(exception_backoff)
+                trial += 1
+                if trial > 10:
+                    return SamplerResponse(
+                        response_text="No response (error).",
+                        response_metadata={"error": str(e)},
+                        actual_queried_message_list=message_list,
+                    )
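A minimal usage sketch of the new sampler, assuming an SGLang server exposing /generate is already running; the model path and URL below are placeholders (the model string is also used to load the matching Hugging Face tokenizer):

```python
from gpt_oss.evals.harmony_sampler import HarmonySampler

sampler = HarmonySampler(
    model="openai/gpt-oss-20b",        # placeholder HF path for the tokenizer
    reasoning_effort="high",
    temperature=1.0,
    max_tokens=4096,
    base_url="http://localhost:8080",  # placeholder SGLang endpoint
)
resp = sampler([{"role": "user", "content": "Write a one-line Python hello world."}])
print(resp.response_text)      # Harmony-formatted completion, special tokens included
print(resp.response_metadata)  # {"input_tokens": ..., "output_tokens": ...}
```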
diff --git a/gpt_oss/evals/livecodebench_eval.py b/gpt_oss/evals/livecodebench_eval.py
new file mode 100644
index 00000000..1bfc306a
--- /dev/null
+++ b/gpt_oss/evals/livecodebench_eval.py
@@ -0,0 +1,457 @@
+"""
+LiveCodeBench: https://huggingface.co/datasets/livecodebench/code_generation_lite
+
+Two-phase evaluation:
+1. Phase 1: Collect all model responses and extract code
+2. Phase 2: Batch evaluate code execution in parallel using ProcessPoolExecutor
+"""
+import argparse
+import os
+import re
+import shutil
+import sys
+import time
+from concurrent.futures import ProcessPoolExecutor, as_completed, TimeoutError
+from contextlib import redirect_stdout, redirect_stderr
+from functools import lru_cache
+from typing import Any, Dict, List, Optional, Tuple
+
+from datasets import load_dataset
+from tqdm import tqdm
+
+from . import report
+from .types import Eval, EvalResult, SamplerBase, SingleEvalResult
+
+
+# HuggingFace dataset configuration
+LCB_HF_DATASET = "livecodebench/code_generation_lite"
+LCB_DEFAULT_VERSION = "release_v6"
+
+LIVECODEBENCH_INSTRUCTIONS = """
+You are a python coding expert that solves problems step-by-step.
+You must provide the reasoning to arriving at your solution and the code to solve the problem.
+Do not try simulating the code execution. The code must be enclosed within ```python delimiters.
+"""
+
+
+def parse_code(text: str) -> Optional[str]:
+    """Parse code from ```python or plain ``` code block.
+
+    Priority:
+    1. Last ```python block
+    2. Last plain ``` block
+    """
+    if not text or not isinstance(text, str):
+        return None
+
+    text = text.strip()
+    if not text:
+        return None
+
+    # Try ```python blocks first (most specific)
+    python_matches = list(re.finditer(r"```python(.*?)```", text, re.DOTALL))
+    if python_matches:
+        return python_matches[-1].group(1).strip()
+
+    # Fall back to plain ``` blocks
+    plain_matches = list(re.finditer(r"```(.*?)```", text, re.DOTALL))
+    if plain_matches:
+        # Get the last match
+        code = plain_matches[-1].group(1).strip()
+        # Remove language tag if present (e.g., ```python\n or ```py\n)
+        code = re.sub(r'^(?:python|py)\s*\n', '', code, flags=re.IGNORECASE)
+        return code
+
+    return None
+
+
+def get_lcb_dir() -> str:
+    """Get the LiveCodeBench submodule directory path."""
+    return os.path.abspath(os.path.join(
+        os.path.dirname(__file__), "submodules", "LiveCodeBench"))
+
+
+@lru_cache(maxsize=4)
+def load_lcb_from_huggingface(version_tag: str = LCB_DEFAULT_VERSION) -> List[Dict[str, Any]]:
+    """Load LiveCodeBench questions from HuggingFace.
+
+    Args:
+        version_tag: Version tag for the dataset (e.g., "release_v5", "release_v6")
+
+    Returns:
+        List of examples with question_id, question_content (prompt), and starter_code.
+    """
+    print(f"Loading LiveCodeBench from HuggingFace: {LCB_HF_DATASET} ({version_tag})...")
+    ds = load_dataset(LCB_HF_DATASET, version_tag=version_tag, split="test")
+
+    examples = []
+    for row in ds:
+        examples.append({
+            "question_id": row["question_id"],
+            "prompt": row["question_content"],  # The problem description
+            "starter_code": row.get("starter_code", ""),  # Starter code if available
+        })
+
+    print(f"Loaded {len(examples)} problems from HuggingFace")
+    return examples
+
+
+def format_prompt_with_starter_code(prompt: str, starter_code: str = "") -> str:
+    """Append the format section with starter code to the prompt.
+
+    This matches the format used in the working harmonize_inputs.py pipeline.
+    """
+    format_section = "\n### Format: You will use the following starter code to write the solution to the problem and enclose your code within delimiters.\n```python\n"
+    if starter_code:
+        format_section += starter_code + "\n"
+    format_section += "```\n"
+    return prompt + format_section
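A quick check of the extraction and prompt helpers above; this is an illustrative snippet only, and the fence string is built programmatically purely to avoid nesting literal backtick fences inside the example:

```python
from gpt_oss.evals.livecodebench_eval import parse_code, format_prompt_with_starter_code

fence = "`" * 3
response = f"Some reasoning first.\n{fence}python\nprint('hello')\n{fence}\nClosing remarks."
assert parse_code(response) == "print('hello')"

prompt = format_prompt_with_starter_code("Print hello.", starter_code="def solve():")
print(prompt)  # problem text followed by the '### Format:' section and the starter code
```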
+
+
+@lru_cache(maxsize=4)
+def load_lcb_benchmark_for_eval(version_tag: str = LCB_DEFAULT_VERSION) -> Dict[str, Any]:
+    """Load LiveCodeBench benchmark from submodule for test execution.
+
+    Args:
+        version_tag: Version tag for the dataset (e.g., "release_v5", "release_v6")
+
+    This is needed because test execution requires the LCB library's
+    instance objects which contain test cases.
+    """
+    lcb_dir = get_lcb_dir()
+
+    if not os.path.isdir(lcb_dir):
+        raise FileNotFoundError(
+            f"LiveCodeBench submodule required at: {lcb_dir}")
+
+    original_cwd = os.getcwd()
+    os.chdir(lcb_dir)
+
+    if lcb_dir not in sys.path:
+        sys.path.insert(0, lcb_dir)
+
+    try:
+        os.environ['TQDM_DISABLE'] = '1'
+
+        from lcb_runner.utils.scenarios import Scenario
+        from lcb_runner.runner.scenario_router import build_prompt_benchmark
+
+        mock_args = argparse.Namespace(
+            scenario=Scenario.codegeneration, release_version=version_tag,
+            subset="code_generation", language="python", not_fast=False,
+            start_date=None, end_date=None, k=[1], num_samples=1,
+            timeout=60, num_workers=1, num_process_evaluate=1,
+            model_name="standalone_eval", output_dir="/tmp",
+            prompt_type="custom", continue_existing=False, evaluate=True
+        )
+
+        full_benchmark, _ = build_prompt_benchmark(mock_args)
+        return {inst.question_id: inst for inst in full_benchmark}
+
+    finally:
+        os.chdir(original_cwd)
+        os.environ.pop('TQDM_DISABLE', None)
+
+
+def evaluate_livecodebench_detailed(
+        code: Optional[str], question_id: str,
+        version_tag: str = LCB_DEFAULT_VERSION) -> Tuple[bool, str]:
+    """Evaluate LiveCodeBench code generation with detailed results.
+
+    Args:
+        code: The code to evaluate
+        question_id: The question ID to look up test cases
+        version_tag: Version tag for the dataset (e.g., "release_v5", "release_v6")
+
+    Returns:
+        Tuple[bool, str]: (passed, detailed_reason)
+    """
+    if not code or not question_id:
+        return False, "No code or question_id provided"
+
+    lcb_dir = get_lcb_dir()
+
+    try:
+        benchmark_map = load_lcb_benchmark_for_eval(version_tag)
+    except Exception as e:
+        return False, f"Failed to load benchmark: {type(e).__name__}: {e}"
+
+    instance = benchmark_map.get(question_id)
+    if not instance:
+        return False, f"Question ID '{question_id}' not found in benchmark"
+
+    original_cwd = os.getcwd()
+    temp_dir = f"/tmp/temp_lcb_eval_{question_id}_{int(time.time())}"
+    os.makedirs(temp_dir, exist_ok=True)
+
+    try:
+        os.chdir(lcb_dir)
+        os.environ['TQDM_DISABLE'] = '1'
+
+        from lcb_runner.utils.scenarios import Scenario
+        from lcb_runner.evaluation import extract_instance_results
+        from lcb_runner.runner.scenario_router import sort_and_extract_save_results, get_metrics
+
+        mock_args = argparse.Namespace(
+            scenario=Scenario.codegeneration, release_version=version_tag,
+            subset="code_generation", language="python", not_fast=False,
+            start_date=None, end_date=None, k=[1], num_samples=1,
+            timeout=60, num_workers=1, num_process_evaluate=1,
+            model_name="standalone_eval", output_dir=temp_dir,
+            prompt_type="custom", continue_existing=False, evaluate=True,
+        )
+
+        batch_benchmark = [instance]
+        batch_custom_outputs = [[code]]
+
+        save_results = [inst.insert_output(output, output)
+                        for inst, output in zip(batch_benchmark, batch_custom_outputs)]
+
+        _, combined_results = sort_and_extract_save_results(
+            mock_args.scenario, save_results)
+        _, instance_results, _ = get_metrics(
+            mock_args.scenario, mock_args, batch_benchmark, combined_results
+        )
+
+        graded = extract_instance_results(instance_results)
+        passed = graded and graded[0] and graded[0][0]
+
+        # Try to extract detailed results
+        detailed_reason = ""
+        try:
+            if combined_results and len(combined_results) > 0:
+                result_info = combined_results[0]
+                if hasattr(result_info, 'result') and result_info.result:
+                    test_results = result_info.result
+                    if isinstance(test_results, dict):
+                        detailed_reason = f"Test results: {test_results}"
+                    elif isinstance(test_results, list):
+                        num_passed = sum(1 for r in test_results if r)
+                        num_total = len(test_results)
+                        detailed_reason = f"Passed {num_passed}/{num_total} test cases"
+                    else:
+                        detailed_reason = f"Result: {test_results}"
+                elif hasattr(result_info, 'status'):
+                    detailed_reason = f"Status: {result_info.status}"
+        except Exception:
+            pass
+
+        if not detailed_reason:
+            if passed:
+                detailed_reason = "All tests passed"
+            else:
+                detailed_reason = "Failed one or more test cases"
+
+        return passed, detailed_reason
+
+    except Exception as e:
+        return False, f"Evaluation error: {type(e).__name__}: {str(e)[:200]}"
+    finally:
+        os.chdir(original_cwd)
+        shutil.rmtree(temp_dir, ignore_errors=True)
+        os.environ.pop('TQDM_DISABLE', None)
+
+
+def evaluate_livecodebench_worker(args: Tuple[int, str, str, str]) -> Tuple[int, bool, str]:
+    """Worker function for parallel LiveCodeBench evaluation.
+
+    Args:
+        args: (index, code, question_id, version_tag)
+
+    Returns:
+        Tuple[int, bool, str]: (index, passed, detailed_reason)
+    """
+    idx, code, question_id, version_tag = args
+
+    # Suppress all stdout/stderr from worker processes to prevent pollution
+    try:
+        with open(os.devnull, 'w') as devnull:
+            with redirect_stdout(devnull), redirect_stderr(devnull):
+                os.environ['TQDM_DISABLE'] = '1'
+                passed, reason = evaluate_livecodebench_detailed(code, question_id, version_tag)
+                return idx, passed, reason
+    except Exception as e:
+        return idx, False, f"Error: {type(e).__name__}: {e}"
+
+
+class LiveCodeBenchEval(Eval):
+    """
+    LiveCodeBench evaluation with two-phase approach:
+    1. Collect all model responses
+    2. Batch evaluate code execution in parallel
+    """
+
+    def __init__(
+        self,
+        n_repeats: int = 1,
+        num_examples: int | None = None,
+        n_threads: int = 1,
+        lcb_workers: int = 64,
+        test_timeout: int = 60,
+        lcb_version: str = LCB_DEFAULT_VERSION,
+    ):
+        """
+        Initialize LiveCodeBench evaluation.
+
+        Args:
+            n_repeats: Number of times to repeat each example
+            num_examples: Limit number of examples (for debugging)
+            n_threads: Number of threads for collecting model responses
+            lcb_workers: Number of parallel workers for code evaluation
+            test_timeout: Timeout for each test execution in seconds
+            lcb_version: LiveCodeBench version tag (e.g., "release_v5", "release_v6")
+        """
+        self.n_repeats = n_repeats
+        self.n_threads = n_threads
+        self.lcb_workers = lcb_workers
+        self.test_timeout = test_timeout
+        self.lcb_version = lcb_version
+
+        # Load questions from HuggingFace
+        examples = load_lcb_from_huggingface(lcb_version)
+
+        # Limit examples if specified
+        if num_examples:
+            examples = examples[:num_examples]
+
+        # Repeat examples
+        examples = examples * n_repeats
+
+        self.examples = examples
+        print(f"Total examples to evaluate: {len(self.examples)}")
+
+    def __call__(self, sampler: SamplerBase) -> EvalResult:
+        """
+        Two-phase evaluation:
+        1. Collect all model responses (using n_threads for parallelism)
+        2. Batch evaluate all code in parallel using ProcessPoolExecutor
+        """
+        # Phase 1: Collect all model responses
+        collected_results: List[Dict[str, Any]] = []
+
+        def collect_response(row: dict) -> Dict[str, Any]:
+            """Collect a single model response."""
+            question_id = row["question_id"]
+
+            # Construct prompt with starter code format section
+            user_prompt = format_prompt_with_starter_code(
+                row["prompt"],
+                row.get("starter_code", "")
+            )
+            # Combine instructions and user prompt into a single user message
+            full_prompt = f"{LIVECODEBENCH_INSTRUCTIONS}\n\n{user_prompt}"
+            prompt_messages = [
+                sampler._pack_message(
+                    content=full_prompt,
+                    role="user"
+                ),
+            ]
+            sampler_response = sampler(prompt_messages)
+            response_text = sampler_response.response_text
+            actual_queried_prompt_messages = sampler_response.actual_queried_message_list
+
+            # Extract code from response
+            extracted_code = parse_code(response_text)
+
+            return {
+                "question_id": question_id,
+                "prompt": row["prompt"],
+                "response_text": response_text,
+                "extracted_code": extracted_code,
+                "actual_queried_prompt_messages": actual_queried_prompt_messages,
+            }
+
+        # Collect responses (can be parallelized with n_threads)
+        collected_results = report.map_with_progress(
+            collect_response, self.examples, num_threads=self.n_threads
+        )
+
+        # Phase 2: Batch evaluate all code in parallel
+        print(f"\nEvaluating {len(collected_results)} code samples with {self.lcb_workers} workers...")
+
+        # Pre-load benchmark in main process before forking (for test execution)
+        try:
+            _ = load_lcb_benchmark_for_eval(self.lcb_version)
+        except Exception as e:
+            print(f"Warning: Failed to pre-load benchmark for evaluation: {e}")
+
+        # Prepare work items
+        work_items = []
+        for idx, result in enumerate(collected_results):
+            if result["extracted_code"]:
+                work_items.append((idx, result["extracted_code"], result["question_id"], self.lcb_version))
+
+        print(f"Extracted code from {len(work_items)} / {len(collected_results)} responses")
+        # Initialize scores to 0
+        scores = [0.0] * len(collected_results)
+        eval_details = ["No code extracted"] * len(collected_results)
+
+        if work_items:
+            max_workers = min(self.lcb_workers, len(work_items))
+            print(f"Submitting {len(work_items)} code samples for evaluation...")
+
+            with ProcessPoolExecutor(max_workers=max_workers) as executor:
+                future_to_idx = {
+                    executor.submit(evaluate_livecodebench_worker, item): item[0]
+                    for item in work_items
+                }
+
+                future_timeout = self.test_timeout * 1.2
+                for future in tqdm(as_completed(future_to_idx, timeout=future_timeout * len(work_items)),
+                                   total=len(future_to_idx),
+                                   desc="Evaluating code"):
+                    idx = future_to_idx[future]
+                    try:
+                        result_idx, passed, reason = future.result(timeout=future_timeout)
+                        scores[result_idx] = 1.0 if passed else 0.0
+                        eval_details[result_idx] = reason
+                    except TimeoutError:
+                        scores[idx] = 0.0
+                        eval_details[idx] = "Timeout: Test execution exceeded time limit"
+                    except Exception as e:
+                        scores[idx] = 0.0
+                        eval_details[idx] = f"Error: {type(e).__name__}: {e}"
+
+        # Generate results
+        print("\nGenerating results...")
+        single_results = []
+
+        for idx, result in enumerate(collected_results):
+            score = scores[idx]
+            detail = eval_details[idx]
+
+            # Generate HTML report
+            html = report.jinja_env.from_string(report.HTML_JINJA).render(
+                prompt_messages=result["actual_queried_prompt_messages"],
+                next_message=dict(content=result["response_text"], role="assistant"),
+                score=score,
+                correct_answer=f"question_id: {result['question_id']}",
+                extracted_answer=f"Code extracted: {'Yes' if result['extracted_code'] else 'No'}, {detail}",
+            )
+
+            convo = result["actual_queried_prompt_messages"] + [
+                dict(content=result["response_text"], role="assistant")
+            ]
+
+            single_results.append(SingleEvalResult(
+                html=html,
+                score=score,
+                convo=convo,
+                metrics={
+                    "chars": len(result["response_text"]),
+                    "code_extracted": 1.0 if result["extracted_code"] else 0.0,
+                }
+            ))
+
+        # Calculate summary stats
+        total = len(single_results)
+        passed = sum(1 for r in single_results if r.score > 0)
+        code_extracted = sum(1 for r in collected_results if r["extracted_code"])
+
+        print(f"\nLiveCodeBench Results:")
+        print(f"  Total samples: {total}")
+        print(f"  Code extracted: {code_extracted}/{total} ({100*code_extracted/total:.1f}%)")
+        print(f"  Tests passed: {passed}/{total} ({100*passed/total:.1f}%)")
+
+        return report.aggregate_results(single_results)
+
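A single candidate solution can also be graded directly through the worker, without running a full eval. This is a sketch under stated assumptions: the LiveCodeBench submodule is checked out, its lcb_runner dependencies are installed, and the candidate code below is deliberately a non-solution.

```python
from gpt_oss.evals.livecodebench_eval import (
    evaluate_livecodebench_worker,
    load_lcb_from_huggingface,
)

examples = load_lcb_from_huggingface("release_v6")  # downloads the HF dataset
question_id = examples[0]["question_id"]
candidate = "print('not a real solution')"

idx, passed, reason = evaluate_livecodebench_worker((0, candidate, question_id, "release_v6"))
print(passed, reason)  # expected: False plus a per-test summary string
```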
diff --git a/gpt_oss/evals/responses_sampler.py b/gpt_oss/evals/responses_sampler.py
index 134303f5..551db6be 100644
--- a/gpt_oss/evals/responses_sampler.py
+++ b/gpt_oss/evals/responses_sampler.py
@@ -21,8 +21,11 @@ def __init__(
         reasoning_model: bool = False,
         reasoning_effort: str | None = None,
         base_url: str = "http://localhost:8000/v1",
+        top_p: float | None = None,
+        top_k: int | None = None,
+        timeout: int = 1800,
     ):
-        self.client = OpenAI(base_url=base_url, timeout=24*60*60)
+        self.client = OpenAI(base_url=base_url, timeout=timeout)
         self.model = model
         self.developer_message = developer_message
         self.temperature = temperature
@@ -30,6 +33,8 @@
         self.image_format = "url"
         self.reasoning_model = reasoning_model
         self.reasoning_effort = reasoning_effort
+        self.top_p = top_p
+        self.top_k = top_k
 
     def _pack_message(self, role: str, content: Any) -> dict[str, Any]:
         return {"role": role, "content": content}
@@ -48,6 +53,10 @@
             "temperature": self.temperature,
             "max_output_tokens": self.max_tokens,
         }
+        if self.top_p is not None:
+            request_kwargs["top_p"] = self.top_p
+        if self.top_k is not None:
+            request_kwargs["extra_body"] = {"top_k": self.top_k}
         if self.reasoning_model:
             request_kwargs["reasoning"] = (
                 {"effort": self.reasoning_effort} if self.reasoning_effort else None
diff --git a/gpt_oss/evals/submodules/LiveCodeBench b/gpt_oss/evals/submodules/LiveCodeBench
new file mode 160000
index 00000000..b1e7cab4
--- /dev/null
+++ b/gpt_oss/evals/submodules/LiveCodeBench
@@ -0,0 +1 @@
+Subproject commit b1e7cab44d610bbc2e10d36d270cd0c89c600492
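With the submodule registered above, the LiveCodeBench harness has to be fetched once before code execution will work, e.g. `git submodule update --init gpt_oss/evals/submodules/LiveCodeBench`. Assuming the pre-existing `--model` and `--reasoning-effort` flags, a typical run of the new pieces would look like `python -m gpt_oss.evals --model <model> --reasoning-effort high --sampler harmony --base-url http://localhost:8080 --eval livecodebench --lcb-version release_v6 --lcb-workers 32`, with `--dump-inputs` available to record the exact Harmony token streams sent to the server.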