diff --git a/check_logits_hidden_layers.ipynb b/check_logits_hidden_layers.ipynb new file mode 100644 index 00000000..a9c55b2f --- /dev/null +++ b/check_logits_hidden_layers.ipynb @@ -0,0 +1,400 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [], + "source": [ + "from fast_llm.data.dataset.gpt.memmap import GPTMemmapDataset\n", + "from pathlib import Path\n", + "import numpy as np\n", + "from transformers import AutoTokenizer\n", + "import torch\n", + "import pickle\n", + "\n", + "import matplotlib.pyplot as plt" + ] + }, + { + "cell_type": "code", + "execution_count": 139, + "metadata": {}, + "outputs": [], + "source": [ + "files_root = Path(\"/mnt/datasets/tests/denis/tensors_f32/\")\n", + "#files_root = Path(\"/mnt/datasets/tests/denis/tensors/\")" + ] + }, + { + "cell_type": "code", + "execution_count": 140, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1000" + ] + }, + "execution_count": 140, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "fm_files = {int(file.stem.split(\"tensor\")[1]): file for file in (files_root / \"fast_llm/logits/\").glob(\"tensor*.pt\")}\n", + "hf_files = {int(file.stem.split(\"tensor\")[1]): file for file in (files_root / \"hf/logits\").glob(\"tensor*.pt\")}\n", + "assert len(fm_files) == len(hf_files)\n", + "len(fm_files)" + ] + }, + { + "cell_type": "code", + "execution_count": 141, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/tmp/ipykernel_14929/1685567046.py:7: FutureWarning: You are using `torch.load` with `weights_only=False` (the current default value), which uses the default pickle module implicitly. It is possible to construct malicious pickle data which will execute arbitrary code during unpickling (See https://github.com/pytorch/pytorch/blob/main/SECURITY.md#untrusted-models for more details). In a future release, the default value for `weights_only` will be flipped to `True`. This limits the functions that could be executed during unpickling. Arbitrary objects will no longer be allowed to be loaded via this mode unless they are explicitly allowlisted by the user via `torch.serialization.add_safe_globals`. We recommend you start setting `weights_only=True` for any use case where you don't have full control of the loaded file. Please open an issue on GitHub for any issues related to this experimental feature.\n", + " fm_data = torch.load(fm_files[i])\n", + "/tmp/ipykernel_14929/1685567046.py:8: FutureWarning: You are using `torch.load` with `weights_only=False` (the current default value), which uses the default pickle module implicitly. It is possible to construct malicious pickle data which will execute arbitrary code during unpickling (See https://github.com/pytorch/pytorch/blob/main/SECURITY.md#untrusted-models for more details). In a future release, the default value for `weights_only` will be flipped to `True`. This limits the functions that could be executed during unpickling. Arbitrary objects will no longer be allowed to be loaded via this mode unless they are explicitly allowlisted by the user via `torch.serialization.add_safe_globals`. We recommend you start setting `weights_only=True` for any use case where you don't have full control of the loaded file. Please open an issue on GitHub for any issues related to this experimental feature.\n", + " hf_data = torch.load(hf_files[i])\n" + ] + }, + { + "data": { + "text/plain": [ + "True" + ] + }, + "execution_count": 141, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "hf_tokens = []\n", + "fm_tokens = []\n", + "max_adiff = []\n", + "mean_adiff = []\n", + "sum_adiff = []\n", + "for i in range(len(fm_files)):\n", + " fm_data = torch.load(fm_files[i])\n", + " hf_data = torch.load(hf_files[i])\n", + " \n", + " hf_tokens.append(hf_data[0, -1, :].argmax().item())\n", + " fm_tokens.append(fm_data[0, -1, :].argmax().item())\n", + "\n", + " adiff = torch.abs(hf_data[0, -1, :] - fm_data[0, -1, :])\n", + " max_adiff.append(adiff.max().item())\n", + " mean_adiff.append(adiff.mean().item())\n", + " sum_adiff.append(adiff.sum().item())\n", + " \n", + "all(a == b for a, b in zip(hf_tokens, fm_tokens))" + ] + }, + { + "cell_type": "code", + "execution_count": 129, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "107" + ] + }, + "execution_count": 129, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "min(len(hf_tokens)+1 if ab[0] == ab[1] else i for i, ab in enumerate(zip(hf_tokens, fm_tokens)))" + ] + }, + { + "cell_type": "code", + "execution_count": 130, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "fig, axes = plt.subplots(1, 2, figsize=(12, 5), sharex=True)\n", + "\n", + "# Left plot: max and mean absolute differences\n", + "axes[0].plot(max_adiff, label='max')\n", + "axes[0].plot(mean_adiff, label='mean')\n", + "axes[0].set_title('Max and Mean Absolute Difference')\n", + "axes[0].set_xlabel('Token Position Index')\n", + "axes[0].set_ylabel('Absolute Difference')\n", + "axes[0].legend()\n", + "axes[0].grid(True)\n", + "\n", + "# Right plot: sum absolute difference\n", + "axes[1].plot(sum_adiff, label='sum', color='tab:orange')\n", + "axes[1].set_title('Sum Absolute Difference')\n", + "axes[1].set_xlabel('Token Position Index')\n", + "axes[1].set_ylabel('Absolute Difference')\n", + "axes[1].legend()\n", + "axes[1].grid(True)\n", + "\n", + "plt.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": 131, + "metadata": {}, + "outputs": [], + "source": [ + "fm_hidden_files = {int(file.stem.split(\"data\")[1]): file for file in (files_root / \"fast_llm/hidden_states/\").glob(\"data*.pickle\")}\n", + "hf_hidden_files = {int(file.stem.split(\"data\")[1]): file for file in (files_root / \"hf/hidden_states\").glob(\"data*.pickle\")}" + ] + }, + { + "cell_type": "code", + "execution_count": 132, + "metadata": {}, + "outputs": [], + "source": [ + "def mad(new_token_index, fm_hidden_files, hf_hidden_files):\n", + " with fm_hidden_files[new_token_index].open(\"rb\") as f:\n", + " fm_data = pickle.load(f)\n", + " with hf_hidden_files[new_token_index].open(\"rb\") as f:\n", + " hf_data = pickle.load(f)\n", + " max_adiffs_hidden_layers = []\n", + " for i in range(len(hf_data)):\n", + " max_adiff = torch.abs(hf_data[i][0,-1,:]-fm_data[i]['tensor'][0,-1,:]).max().item()\n", + " max_adiffs_hidden_layers.append(max_adiff)\n", + " return max_adiffs_hidden_layers\n", + " " + ] + }, + { + "cell_type": "code", + "execution_count": 133, + "metadata": {}, + "outputs": [], + "source": [ + "new_token_index = 107\n", + "new_token_index1 = 108\n", + "max_adiffs_hidden_layers = mad(0, fm_hidden_files, hf_hidden_files)\n", + "max_adiffs_hidden_layers2 = mad(new_token_index, fm_hidden_files, hf_hidden_files)\n", + "max_adiffs_hidden_layers3 = mad(new_token_index1, fm_hidden_files, hf_hidden_files)" + ] + }, + { + "cell_type": "code", + "execution_count": 134, + "metadata": {}, + "outputs": [ + { + "data": { + "image/png": "", + "text/plain": [ + "
" + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "fig, axes = plt.subplots(1, 2, figsize=(12, 5), sharex=True)\n", + "\n", + "axes[0].plot(max_adiffs_hidden_layers, label='new_token_0', color='blue')\n", + "axes[0].plot(max_adiffs_hidden_layers2, label=f'new_token_{new_token_index}', color='green')\n", + "axes[0].set_title('Max and Mean Absolute Difference')\n", + "axes[0].set_xlabel('Hidden Layer Index')\n", + "axes[0].set_ylabel('Max Absolute Difference')\n", + "axes[0].legend()\n", + "axes[0].grid(True)\n", + "\n", + "axes[1].plot(max_adiffs_hidden_layers, label='new_token_0', color='blue')\n", + "axes[1].plot(max_adiffs_hidden_layers3, label=f'new_token_{new_token_index1}', color='green')\n", + "axes[1].set_title('Max and Mean Absolute Difference')\n", + "axes[1].set_xlabel('Hidden Layer Index')\n", + "axes[1].set_ylabel('Max Absolute Difference')\n", + "axes[1].legend()\n", + "axes[1].grid(True)\n", + "\n", + "\n", + "\n", + "plt.title('Per-layer Max Absolute Differences')\n", + "plt.tight_layout()\n", + "plt.show()" + ] + }, + { + "cell_type": "code", + "execution_count": 144, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[2389, 28, 527, 26648, 357, 2258, 260, 3712, 282, 260, 635, 4062, 12903, 30]\n", + "[2389, 284, 260, 1439, 357, 3593, 30, 378, 540, 6207, 260, 1569, 28, 260]\n" + ] + } + ], + "source": [ + "print(hf_tokens_bf16[106:120])\n", + "print(fm_tokens_b16[106:120])" + ] + }, + { + "cell_type": "code", + "execution_count": 145, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "[2389, 284, 260, 1439, 357, 3593, 30, 378, 540, 6207, 260, 1569, 28, 260]\n", + "[2389, 284, 260, 1439, 357, 3593, 30, 378, 540, 6207, 260, 1569, 28, 260]\n" + ] + } + ], + "source": [ + "print(hf_tokens[106:120])\n", + "print(fm_tokens[106:120])" + ] + }, + { + "cell_type": "code", + "execution_count": 136, + "metadata": {}, + "outputs": [], + "source": [ + "hf_tokens_bf16 = hf_tokens\n", + "fm_tokens_b16 = fm_tokens" + ] + }, + { + "cell_type": "code", + "execution_count": 152, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1001" + ] + }, + "execution_count": 152, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "min(len(hf_tokens)+1 if ab[0] == ab[1] else i for i, ab in enumerate(zip(hf_tokens, fm_tokens)))" + ] + }, + { + "cell_type": "code", + "execution_count": 153, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "107" + ] + }, + "execution_count": 153, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "min(len(hf_tokens)+1 if ab[0] == ab[1] else i for i, ab in enumerate(zip(hf_tokens, hf_tokens_bf16)))" + ] + }, + { + "cell_type": "code", + "execution_count": 154, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "174" + ] + }, + "execution_count": 154, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "min(len(hf_tokens)+1 if ab[0] == ab[1] else i for i, ab in enumerate(zip(fm_tokens, fm_tokens_b16)))" + ] + }, + { + "cell_type": "code", + "execution_count": 151, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "1001" + ] + }, + "execution_count": 151, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "min(len(hf_tokens)+1 if ab[0] == ab[1] else i for i, ab in enumerate(zip(hf_tokens, fm_tokens)))" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "fastllm", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.9" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/classes_fast_llm.jpg b/classes_fast_llm.jpg new file mode 100644 index 00000000..ea418943 Binary files /dev/null and b/classes_fast_llm.jpg differ diff --git a/examples/qwen_evaluate.yaml b/examples/qwen_evaluate.yaml new file mode 100644 index 00000000..c1890ad0 --- /dev/null +++ b/examples/qwen_evaluate.yaml @@ -0,0 +1,77 @@ +training: + train_iters: 100_000 + logs: + interval: 10 + evaluations: + gsm8k: + type: lm_eval + cli_args: + - --tasks + - gsm8k + - --output_path + - /mnt/checkpoints/test/denis/qwen_eval_experiment/lm_eval + # stack_3b: + # iterations: 10 + # interval: 10 + # fineweb: + # iterations: 10 + # interval: 10 + checkpoint: + interval: 1000 + keep: 5 + test_iters: 0 + export: # (1)! + format: llama + interval: 20_000 +batch: + micro_batch_size: 16 + sequence_length: 4096 + batch_size: 16 +data: + tokenizer: + path: /mnt/checkpoints/pretrained_models/Qwen2-1.5B-Instruct + bos_token: "<|endoftext|>" + datasets: + # Bad dataset they are tokenized with different tokenizer, then llama + training: + type: file + path: /mnt/datasets/test/denis/fineweb_the_stack_3b.yaml + stack_3b: + type: memmap + path: /mnt/datasets/data_collections/the_stack_3b/tokens/stack_3b/default/train/99 + fineweb: + type: memmap + path: /mnt/datasets/data_collections/standalone_datasets/tokens/HuggingFaceFW/fineweb/default/train/9_1000 +optimizer: + weight_decay: 0.1 + beta_1: 0.9 + beta_2: 0.95 + learning_rate: + base: 1.0e-04 # (3)! + minimum: 1.0e-05 + decay_style: cosine + decay_iterations: 100_000 + warmup_iterations: 2000 +pretrained: # (4)! + format: qwen2 + path: /mnt/checkpoints/pretrained_models/Qwen2-1.5B-Instruct + model_weights: yes # (5)! +model: + base_model: + transformer: + use_flash_attention: yes + cross_entropy_impl: fused + multi_stage: + zero_stage: 2 + distributed: + training_dtype: bf16 + +run: + experiment_dir: "/mnt/checkpoints/test/denis/qwen_eval_experiment" + +# training: +# logs: +# interval: 10 +# wandb: +# project_name: ${job.project_name} +# group_name: ${job.project_version} \ No newline at end of file diff --git a/examples/smol_evaluate.yaml b/examples/smol_evaluate.yaml new file mode 100644 index 00000000..7c06ffed --- /dev/null +++ b/examples/smol_evaluate.yaml @@ -0,0 +1,77 @@ +training: + train_iters: 100_000 + logs: + interval: 10 + evaluations: + gsm8k: + type: lm_eval + cli_args: + - --tasks + - gsm8k + - --output_path + - /mnt/checkpoints/test/denis/smol_eval_experiment/lm_eval + # stack_3b: + # type: loss + # iterations: 10 + # interval: 10 + # fineweb: + # iterations: 10 + # interval: 10 + checkpoint: + interval: 1000 + keep: 5 + test_iters: 0 + export: # (1)! + format: llama + interval: 20_000 +batch: + micro_batch_size: 16 + sequence_length: 4096 + batch_size: 16 +data: + tokenizer: + path: /mnt/checkpoints/pretrained_models/SmolLM2-135M-Instruct + datasets: + # Bad dataset they are tokenized with different tokenizer, then llama + training: + type: file + path: /mnt/datasets/test/denis/fineweb_the_stack_3b.yaml + stack_3b: + type: memmap + path: /mnt/datasets/data_collections/the_stack_3b/tokens/stack_3b/default/train/99 + fineweb: + type: memmap + path: /mnt/datasets/data_collections/standalone_datasets/tokens/HuggingFaceFW/fineweb/default/train/9_1000 +optimizer: + weight_decay: 0.1 + beta_1: 0.9 + beta_2: 0.95 + learning_rate: + base: 1.0e-04 # (3)! + minimum: 1.0e-05 + decay_style: cosine + decay_iterations: 100_000 + warmup_iterations: 2000 +pretrained: # (4)! + format: llama + path: /mnt/checkpoints/pretrained_models/SmolLM2-135M-Instruct/ + model_weights: yes # (5)! +model: + base_model: + transformer: + use_flash_attention: yes + cross_entropy_impl: fused + multi_stage: + zero_stage: 2 + distributed: + training_dtype: bf16 + +run: + experiment_dir: "/mnt/checkpoints/test/denis/smol_eval_experiment" + +# training: +# logs: +# interval: 10 +# wandb: +# project_name: ${job.project_name} +# group_name: ${job.project_version} \ No newline at end of file diff --git a/fast_llm/data/config.py b/fast_llm/data/config.py index 1586d370..4c041945 100644 --- a/fast_llm/data/config.py +++ b/fast_llm/data/config.py @@ -34,3 +34,8 @@ class TokenizerConfig(Config): desc="Path to the tokenizer file.", hint=FieldHint.core, ) + bos_token: str | None = Field( + default=None, + desc="BOS token to use if the tokenizer doesn't define one; must be an existing token.", + hint=FieldHint.core, + ) diff --git a/fast_llm/data/tokenizer.py b/fast_llm/data/tokenizer.py index 28e105ee..bc801ed0 100644 --- a/fast_llm/data/tokenizer.py +++ b/fast_llm/data/tokenizer.py @@ -1,6 +1,6 @@ import numpy as np import torch -from transformers import PreTrainedTokenizerFast +from transformers import PreTrainedTokenizerFast, AutoTokenizer from fast_llm.data.config import TokenizerConfig from fast_llm.engine.config_utils.run import log_main_rank @@ -13,9 +13,18 @@ class Tokenizer: def __init__(self, config: TokenizerConfig): log_main_rank(f"> loading tokenizer from {config.path} ...") - self.tokenizer: PreTrainedTokenizerFast = PreTrainedTokenizerFast.from_pretrained( - pretrained_model_name_or_path=config.path, errors="replace", max_len=None + # self.tokenizer: PreTrainedTokenizerFast = PreTrainedTokenizerFast.from_pretrained( + # pretrained_model_name_or_path=config.path, errors="replace", max_len=None + # ) + self.tokenizer = AutoTokenizer.from_pretrained( + pretrained_model_name_or_path=config.path, + errors="replace", + max_len=None, + trust_remote_code=True, + use_fast=True, # This is the flag you're asking about ) + if config.bos_token is not None: + self.tokenizer.bos_token = config.bos_token if self.tokenizer.eos_token_id is None: raise ValueError("Tokenizer does not have an EOS token.") if self.tokenizer.bos_token_id is None: diff --git a/fast_llm/engine/inference/config.py b/fast_llm/engine/inference/config.py index d4b46bcc..c18daa48 100644 --- a/fast_llm/engine/inference/config.py +++ b/fast_llm/engine/inference/config.py @@ -91,7 +91,8 @@ def __eq__(self, other) -> bool: def to_dict(self) -> dict[str, typing.Any]: out = super().to_dict() - out["fast_llm_config"] = self.fast_llm_config.to_dict(verbose=FieldVerboseLevel.everything) + if self.fast_llm_config is not None: + out["fast_llm_config"] = self.fast_llm_config.to_dict(verbose=FieldVerboseLevel.everything) return out def to_diff_dict(self) -> dict[str, typing.Any]: diff --git a/fast_llm/engine/inference/huggingface.py b/fast_llm/engine/inference/huggingface.py index 196310b4..a39345a3 100644 --- a/fast_llm/engine/inference/huggingface.py +++ b/fast_llm/engine/inference/huggingface.py @@ -2,16 +2,22 @@ import pathlib import typing +import torch import transformers.modeling_outputs +import transformers.generation.utils from fast_llm.engine.checkpoint.config import CheckpointLoadConfig, FastLLMCheckpointFormat from fast_llm.engine.inference.config import HuggingfaceModelConfig from fast_llm.engine.inference.runner import InferenceRunner from fast_llm.engine.multi_stage.config import StageMode from fast_llm.engine.multi_stage.fast_llm_model import FastLLMModel +from fast_llm.engine.schedule.config import BatchConfig, ScheduleConfig +from fast_llm.engine.schedule.runner import ScheduleRunner +from fast_llm.engine.schedule.schedule import Schedule +from fast_llm.engine.training.config import TrainerConfig -class HuggingfacePreTrainedModel(transformers.PreTrainedModel): +class HuggingfaceBaseModelForCausalLM(transformers.PreTrainedModel, transformers.generation.utils.GenerationMixin): config_class: typing.ClassVar[type[HuggingfaceModelConfig]] = HuggingfaceModelConfig runner_class: typing.ClassVar[type[InferenceRunner]] = InferenceRunner config: HuggingfaceModelConfig @@ -20,31 +26,84 @@ class HuggingfacePreTrainedModel(transformers.PreTrainedModel): # _supports_cache_class = False # _tied_weights_keys = [] - def __init__(self, config: HuggingfaceModelConfig, fast_llm_model: FastLLMModel, **kwargs): + def __init__( + self, + config: HuggingfaceModelConfig, + fast_llm_model: FastLLMModel, + trainer_config: TrainerConfig | None = None, + runner: ScheduleRunner | None = None, + **kwargs, + ): + """ + Initializes the HuggingfaceBaseModelForCausalLM either in standalone mode (single GPU inference) + or integrated training mode (with runner from training loop). + + - If `trainer_config` and `runner` are both provided → assumes training mode. + - If both are omitted → assumes standalone mode with default configs. + - Any other combination will raise. + """ assert self.runner_class.model_class.config_class is config.model_config_class assert config.fast_llm_config is fast_llm_model.config assert isinstance(config, self.config_class) + # The HF constructor performs a deep copy of the config, + # but config.fast_llm_config may contain non-picklable items like process groups. + # Temporarily remove it before the call and restore it afterward. + fast_llm_config = config.fast_llm_config + config.fast_llm_config = None super().__init__(config, **kwargs) + config.fast_llm_config = fast_llm_config + + self._inference_runner = self.runner_class(fast_llm_model, trainer_config, runner) - self._inference_runner = self.runner_class(fast_llm_model) - if not fast_llm_model.is_setup: - fast_llm_model.setup(mode=StageMode.inference) + # A model can be created from pretrained which setup it in the current HF wrapper api + # or set from training loop and also is setup, so, do not accept not setup model + assert fast_llm_model.is_setup + # if not fast_llm_model.is_setup: + # fast_llm_model.setup(distributed=distributed, mode=StageMode.inference) self._inference_runner.setup() + # Transformers needs to be able to inspect the base model. self.fast_llm_base_model = fast_llm_model.base_model - # TODO: Support distributed models? - assert fast_llm_model.config.distributed.world_size == 1 + # # TODO: Support distributed models? + # assert fast_llm_model.config.distributed.world_size == 1 with transformers.modeling_utils.no_init_weights(): self.post_init() + def forward( + self, + input_ids: torch.Tensor | None = None, + attention_mask: torch.Tensor | None = None, + position_ids: torch.Tensor | None = None, + past_key_values=None, + inputs_embeds: torch.FloatTensor | None = None, + labels: torch.LongTensor | None = None, + use_cache: bool | None = None, + output_attentions: bool | None = None, + output_hidden_states: bool | None = None, + return_dict: bool | None = None, + ) -> tuple | transformers.modeling_outputs.CausalLMOutputWithPast: + # Meant to be overridden in derived classes + raise NotImplementedError() + + @classmethod + def from_fast_llm_model_in_training( + cls, fast_llm_model: FastLLMModel, trainer_config: TrainerConfig, runner: ScheduleRunner, **kwargs + ): + config = cls.config_class(fast_llm_model.config) + return cls(config, fast_llm_model, trainer_config=trainer_config, runner=runner, **kwargs) + @classmethod def from_pretrained( cls, pretrained_model_name_or_path: str | os.PathLike | CheckpointLoadConfig, - *, - mode: StageMode = StageMode.inference, + *updates: dict[str | tuple[str, ...], typing.Any], + optimizer_state_names: tuple[str, ...] | None = None, + # setup: bool = True, + mode: StageMode = StageMode.training, + use_cpu: bool = False, + stage_filter: set | None = None, **kwargs, ) -> typing.Self: # Pretrained config. @@ -54,18 +113,23 @@ def from_pretrained( format=FastLLMCheckpointFormat, ) - updates = {} - torch_dtype = kwargs.pop("torch_dtype", None) - if torch_dtype is not None: - updates[("distributed", "training_dtype")] = torch_dtype - # Create the model + # always set up model and crate distributed instance internally for now fast_llm_model = cls.runner_class.model_class.from_pretrained( - pretrained_model_name_or_path, updates, mode=mode + pretrained_model_name_or_path, + *updates, + optimizer_state_names=optimizer_state_names, + # setup=setup, + mode=mode, + use_cpu=use_cpu, + stage_filter=stage_filter, ) - config = cls.config_class(fast_llm_model.config) + config = cls.config_class(fast_llm_model.config) return cls(config, fast_llm_model, **kwargs) def _init_weights(self, module) -> None: raise NotImplementedError(module) + + def can_generate(self): + return True diff --git a/fast_llm/engine/inference/runner.py b/fast_llm/engine/inference/runner.py index 30f836b7..7e608e23 100644 --- a/fast_llm/engine/inference/runner.py +++ b/fast_llm/engine/inference/runner.py @@ -7,27 +7,46 @@ from fast_llm.engine.schedule.config import BatchConfig, ScheduleConfig from fast_llm.engine.schedule.runner import ScheduleRunner from fast_llm.engine.schedule.schedule import Schedule +from fast_llm.engine.training.config import TrainerConfig class InferenceRunner(abc.ABC): model_class: typing.ClassVar[type[FastLLMModel]] = FastLLMModel batch_config_class: typing.ClassVar[type[BatchConfig]] = BatchConfig - def __init__(self, fast_llm_model: FastLLMModel): + def __init__( + self, + fast_llm_model: FastLLMModel, + trainer_config: TrainerConfig | None = None, + runner: ScheduleRunner | None = None, + ): + has_training_args = trainer_config is not None and runner is not None + has_partial_args = (trainer_config is None) != (runner is None) + if has_partial_args: + raise ValueError("Both trainer_config and runner must be provided together or not at all.") + assert isinstance(fast_llm_model, self.model_class) self._fast_llm_model = fast_llm_model - # We only need a basic schedule and don't care about dimensions. - self._schedule_config = ScheduleConfig() - # TODO: Sort things out. - with NoAutoValidate(): - self._batch_config = self.batch_config_class() - self._batch_config.setup(self._fast_llm_model.config.distributed) - self._batch_config.validate() - self._runner = ScheduleRunner( - config=self._schedule_config, - multi_stage=self._fast_llm_model, - distributed_config=self._fast_llm_model.config.distributed, - ) + if has_training_args: + self._trainer_config = trainer_config + self._schedule_config = self._trainer_config.schedule + self._batch_config = self._trainer_config.batch + self._runner = runner + # External runner from training loop must be already setup + assert runner._is_setup + else: + # We only need a basic schedule and don't care about dimensions. + self._schedule_config = ScheduleConfig() + # TODO: Sort things out. + with NoAutoValidate(): + self._batch_config = self.batch_config_class() + self._batch_config.setup(self._fast_llm_model.config.distributed) + self._batch_config.validate() + self._runner = ScheduleRunner( + config=self._schedule_config, + multi_stage=self._fast_llm_model, + distributed_config=self._fast_llm_model.config.distributed, + ) # TODO: Random state? (Distributed.set_step) self._schedule = Schedule( multi_stage=self._fast_llm_model, @@ -42,7 +61,8 @@ def fast_llm_model(self) -> FastLLMModel: return self._fast_llm_model def setup(self): - self._runner.setup(self._fast_llm_model.distributed) + if not self._runner._is_setup: + self._runner.setup(self._fast_llm_model.distributed) def forward( self, input_, kwargs: dict, *, iteration: int = 1, return_metrics: bool = False diff --git a/fast_llm/engine/multi_stage/config.py b/fast_llm/engine/multi_stage/config.py index 69bf3695..174f4a56 100644 --- a/fast_llm/engine/multi_stage/config.py +++ b/fast_llm/engine/multi_stage/config.py @@ -30,7 +30,7 @@ from fast_llm.utils import Assert if typing.TYPE_CHECKING: - from fast_llm.engine.inference.model import HuggingfacePreTrainedModel + from fast_llm.engine.inference.model import HuggingfaceBaseModelForCausalLM from fast_llm.engine.multi_stage.fast_llm_model import FastLLMModel logger = logging.getLogger(__name__) @@ -247,7 +247,7 @@ def get_model_class(cls) -> type["FastLLMModel"]: raise NotImplementedError @classmethod - def get_huggingface_model_class(cls) -> type["HuggingfacePreTrainedModel"]: + def get_huggingface_model_for_causal_lm_class(cls) -> type["HuggingfaceBaseModelForCausalLM"]: raise NotImplementedError @classmethod diff --git a/fast_llm/engine/multi_stage/stage.py b/fast_llm/engine/multi_stage/stage.py index 675e878b..eb37c292 100644 --- a/fast_llm/engine/multi_stage/stage.py +++ b/fast_llm/engine/multi_stage/stage.py @@ -13,6 +13,9 @@ from fast_llm.tensor import ParameterMeta, TensorMeta, accumulate_gradient from fast_llm.utils import Assert +if typing.TYPE_CHECKING: + from fast_llm.core.distributed import ProcessGroup + logger = logging.getLogger(__name__) @@ -111,6 +114,15 @@ def forward( metrics, ) self._log_layer_forward(output, kwargs, i) + + # TODO: very slow and memory consuming, only use for debugging for now + # TODO: decide if and how we want to return + # HF transformer style details from forward properly + if "output_hidden_states" in kwargs and kwargs["output_hidden_states"]: + kwargs["hidden_states"][self._layer_range[i]] = { + "layer_type": type(layer).__name__, + "tensor": self._get_global_output_tensor(i, output), + } return None if output is None else output.detach(), (input_, output) def backward( @@ -185,6 +197,16 @@ def invalidate_buffer(self) -> None: for fsdp in self._fsdps: fsdp.invalidate_buffer() + @torch._dynamo.disable # noqa + def _get_global_output_tensor( + self, + i: int, + tensor: torch.Tensor, + ) -> typing.Tuple[torch.Tensor, bool]: + meta = self._meta_outputs[i] + tensor, _ = meta.local_to_global(tensor, distributed=self._distributed) + return tensor + def _log_layer_forward(self, output: torch.Tensor, kwargs: dict[str, typing.Any], i: int) -> None: if ( self._config.debug_tensor_parallel diff --git a/fast_llm/engine/schedule/runner.py b/fast_llm/engine/schedule/runner.py index 8eca4559..94991915 100644 --- a/fast_llm/engine/schedule/runner.py +++ b/fast_llm/engine/schedule/runner.py @@ -396,8 +396,11 @@ def _recv(self, context: BatchContext, step: Step) -> None: self._record_event(context, EventType.compute_wait_pipe, step) def _forward(self, context: BatchContext, step: Step) -> None: + input = self._get_forward_input(context, step) + if not "hidden_states" in context.batch[step.data_index]: + context.batch[step.data_index]["hidden_states"] = {} output, grad_context = self._stages[step.stage].forward( - self._get_forward_input(context, step), + input, context.batch[step.data_index], losses=context.losses, metrics=context.metrics, diff --git a/fast_llm/engine/training/config.py b/fast_llm/engine/training/config.py index 8b4cadc3..8373397e 100644 --- a/fast_llm/engine/training/config.py +++ b/fast_llm/engine/training/config.py @@ -28,11 +28,12 @@ from fast_llm.engine.optimizer.config import OptimizerConfig from fast_llm.engine.schedule.config import BatchConfig, ScheduleConfig from fast_llm.profile import ProfilingConfig -from fast_llm.utils import Assert +from fast_llm.utils import Assert, Registry if typing.TYPE_CHECKING: from fast_llm.engine.inference.runner import InferenceRunner from fast_llm.engine.training.trainer import Trainer + from fast_llm.engine.training.evaluator import Evaluation, EvaluationLoss, EvaluationHarness @config_class() @@ -154,6 +155,75 @@ class WandbConfig(Config): @config_class() class EvaluationConfig(IntervalConfig): + _abstract: typing.ClassVar[bool] = True + # TODO: Generalize dynamic types? + _registry: typing.ClassVar[Registry[str, type["EvaluationConfig"]]] = Registry[str, type["EvaluationConfig"]]( + "evaluation_class", {} + ) + type_: typing.ClassVar[str | None] = None + type: str | None = Field( + default=None, + desc="The type of evaluation.", + hint=FieldHint.core, + ) + + @classmethod + def get_evaluation_class(cls) -> "Evaluation": + raise NotImplementedError + + def _validate(self) -> None: + if self.type is None: + self.type = self.type_ + # Should be handled in `from_dict`, but can fail if instantiating directly. + Assert.eq(self.type, self.__class__.type_) + super()._validate() + + @classmethod + def _from_dict( + cls, + default: dict[str, typing.Any], + strict: bool = True, + flat: bool = False, + ) -> typing.Self: + type_ = default.get("type") + if type_ is None: + # TODO: Remove in version 0.* — this is for backward compatibility. + # If 'type' is not provided, it falls back to 'loss'. + type_ = "loss" + default["type"] = type_ + actual_cls = EvaluationLossConfig + # actual_cls = cls + else: + if type_ not in cls._registry: + raise ValueError( + f"Unknown {cls._registry.name} type {type_}." f" Available types: {list(cls._registry.keys())}" + ) + actual_cls = cls._registry[type_] + Assert.custom(issubclass, actual_cls, cls) + if actual_cls == cls: + return super()._from_dict(default, strict=strict, flat=flat) + else: + return actual_cls._from_dict(default, strict=strict, flat=flat) + + def __init_subclass__(cls) -> None: + if cls._abstract and cls.type_ is not None: + # Abstract classes should not have a `type_` + raise ValueError(f"Abstract class {cls.__name__} has type = {cls.type_}, expected None.") + if cls.type_ is not None: + if cls.type_ in cls._registry: + raise ValueError( + f"Registry {cls._registry.name} already contains type {cls.type_}." + f" Make sure all classes either have a unique or `None` type." + ) + EvaluationConfig._registry[cls.type_] = cls + super().__init_subclass__() + + +@config_class() +class EvaluationLossConfig(EvaluationConfig): + _abstract: typing.ClassVar[bool] = False + type_: typing.ClassVar[str | None] = "loss" + interval = FieldUpdate( desc="The number of training iterations between each evaluation phase." " Setting to None will disable evaluation." @@ -170,6 +240,59 @@ def get_iteration_count(self, training_iterations: int, extra_evaluations: int = # Number of completed validation iterations return (self.get_count(training_iterations) + extra_evaluations) * self.iterations if self.enabled() else 0 + @classmethod + def get_evaluation_class(cls) -> type["EvaluationLoss"]: + from fast_llm.engine.training.evaluator import EvaluationLoss + + return EvaluationLoss + + +@config_class() +class EvaluationHarnessConfig(EvaluationConfig): + _abstract: typing.ClassVar[bool] = False + type_: typing.ClassVar[str | None] = "lm_eval" + + interval = FieldUpdate( + desc="The number of training iterations between each evaluation phase." + " Setting to None will disable evaluation." + ) + offset = FieldUpdate(desc="Offset for the first evaluation phase.") + + cli_args: list[str] = Field( + default_factory=lambda: [], + desc="lm_eval CLI arguments, excluding those related to model, wandb, batch sizes, and device.", + ) + + truncation: bool = Field( + default=False, + desc="Whether to use truncation during tokenization (useful when inputs exceed model's max length);" + " passed to the Fast-LLM lm_eval model wrapper.", + ) + + logits_cache: bool = Field( + default=True, + desc="Whether to enable logits caching for speedup and avoiding recomputation during repeated evaluations;" + " passed to the Fast-LLM lm_eval model wrapper.", + ) + + add_bos_token: bool = Field( + default=False, + desc="Whether to prepend a beginning-of-sequence (BOS) token, required for some models like LLaMA;" + " passed to the Fast-LLM lm_eval model wrapper.", + ) + + prefix_token_id: int | None = Field( + default=None, + desc="Token ID to use as a prefix to the input (e.g., for control codes or prompts);" + " passed to the Fast-LLM lm_eval model wrapper.", + ) + + @classmethod + def get_evaluation_class(cls) -> type["EvaluationHarness"]: + from fast_llm.engine.training.evaluator import EvaluationHarness + + return EvaluationHarness + @config_class() class TrainingCheckpointBaseConfig(IntervalConfig): diff --git a/fast_llm/engine/training/evaluator.py b/fast_llm/engine/training/evaluator.py new file mode 100644 index 00000000..0abf2115 --- /dev/null +++ b/fast_llm/engine/training/evaluator.py @@ -0,0 +1,419 @@ +import abc +import logging +import math +import pathlib +import shutil +import time +import typing + +import torch + +from fast_llm.config import Configurable +from fast_llm.core.distributed import safe_barrier +from fast_llm.data.data.abstract import Data +from fast_llm.engine.config_utils.run import Run, is_main_rank, log_main_rank, log_pipeline_parallel_main_rank +from fast_llm.engine.distributed.config import PhaseType +from fast_llm.engine.distributed.distributed import Distributed +from fast_llm.engine.multi_stage.fast_llm_model import FastLLMModel + +from fast_llm.engine.schedule.runner import ScheduleRunner +from fast_llm.engine.schedule.schedule import Schedule +from fast_llm.engine.training.config import ( + TrainerConfig, + EvaluationConfig, + EvaluationLossConfig, + EvaluationHarnessConfig, +) +from fast_llm.engine.training.wandb import Wandb +from fast_llm.logging import format_metrics, get_memory_usage_mib, log_memory_usage +from fast_llm.utils import Assert +from fast_llm.engine.training.lm_eval.fast_llm_wrapper import FastLLMWrapper +from fast_llm.engine.training.lm_eval.utils import prepare_lm_eval_simple_eval_params, process_lm_eval_results + +# from fast_llm.engine.training.lm_eval.evaluator import simple_evaluate as lm_eval_simple_evaluate +from lm_eval.evaluator import simple_evaluate as lm_eval_simple_evaluate + +logger = logging.getLogger(__name__) + + +class Evaluation[ConfigType: EvaluationConfig](Configurable[ConfigType], abc.ABC): + config_class: typing.ClassVar[type[EvaluationConfig]] = EvaluationConfig + + _is_setup: bool = False + + @classmethod + def build( + cls, + name: str, + eval_config: EvaluationLossConfig, + trainer_config: TrainerConfig, + get_tflops_func: callable, + ) -> "Evaluation": + return cls( + name=name, + eval_config=eval_config, + trainer_config=trainer_config, + get_tflops_func=get_tflops_func, + ) + + def setup( + self, + distributed: Distributed, + run: Run, + multi_stage: FastLLMModel, + runner: ScheduleRunner, + data: Data, + ) -> None: + # TODO: check if objects passed are actually set up themselves, if appropriate + self._distributed = distributed + self._run = run + self._runner = runner + self._multi_stage = multi_stage + self._data = data + + @abc.abstractmethod + def run( + self, + done: bool, + completed_steps: int, + consumed_samples: int, + consumed_tokens: int, + ) -> tuple[dict[str, any], str | None]: ... + + @abc.abstractmethod + def get_dataset_samples(self) -> tuple[str, int] | None: + """ + Returns the name and number of required samples in a dataset, + or None if the evaluation does not rely on Fast-LLM data or + if the evaluation is skipped for this run. + """ + + +class EvaluationLoss[ConfigType: EvaluationLossConfig](Evaluation[ConfigType]): + config_class: typing.ClassVar[type[EvaluationLossConfig]] = EvaluationLossConfig + + def __init__( + self, + name: str, + eval_config: EvaluationLossConfig, + trainer_config: TrainerConfig, + get_tflops_func: callable, + ): + self._name = name + self._eval_config = eval_config + self._trainer_config = trainer_config + self._get_tflops_func = get_tflops_func + + steps = self._eval_config.get_iteration_count( + self._trainer_config.training.train_iters, + # There may be an extra evaluation after the last training step. + not self._eval_config.enabled(self._trainer_config.training.train_iters), + ) + + self._samples = self._trainer_config.batch.batch_size * steps if steps > 0 else None + + self._evaluation_iterator = None + + def setup( + self, + distributed: Distributed, + run: Run, + multi_stage: FastLLMModel, + runner: ScheduleRunner, + data: Data, + ) -> None: + super().setup(distributed, run, multi_stage, runner, data) + self._loss_defs = self._multi_stage.base_model.loss_defs + # Setup the schedule + self._schedule = Schedule( + multi_stage=self._multi_stage, + batch_config=self._trainer_config.batch, + schedule_config=self._trainer_config.schedule, + distributed_config=self._trainer_config.model.distributed, + phase=PhaseType.validation, + ) + + self._is_setup = True + + def get_dataset_samples(self) -> tuple[str, int] | None: + if self._samples is None: + return None + return self._name, self._samples + + def run( + self, + done: bool, + completed_steps: int, + consumed_samples: int, + consumed_tokens: int, + ) -> tuple[dict[str, any], str | None]: + assert self._is_setup + metrics = {} + formatted_metrics = None + if self._samples is not None and (done or self._eval_config.enabled(completed_steps)): + + if self._evaluation_iterator is None: + self._evaluation_iterator = self._get_data_iterator( + self._get_completed_evaluation_steps(completed_steps) + ) + # TODO: formatting metric category as Validation.evaluation_dataset_name + # maybe format each metric with evaluation_dataset_name prefix instead? + # TODO: setting performance metrics per evaluation dataset + # maybe to set aggregate performance metrics for all evaluations datasets? + metric_key = f"{PhaseType.validation.value}.{self._name}" + metrics[metric_key] = self._evaluate_loss( + data_iterator=self._evaluation_iterator, + phase=PhaseType.validation, + num_iters=self._eval_config.iterations, + begin_iter=self._get_completed_evaluation_steps(completed_steps), + completed_steps=completed_steps, + consumed_samples=consumed_samples, + consumed_tokens=consumed_tokens, + ) + formatted_metrics = format_metrics( + metrics[metric_key], + self._loss_defs, + PhaseType.validation, + dataset_name=self._name, + ) + + return metrics, formatted_metrics + + def _evaluate_loss( + self, + *, + data_iterator: typing.Iterator, + phase: PhaseType, + num_iters: int, + completed_steps: int, + consumed_samples: int, + consumed_tokens: int, + begin_iter: int = 0, + ) -> dict[str, float | int]: + full_phase_name = f"{phase.value}_{self._name}" + safe_barrier(self._distributed.world_group, f"{full_phase_name} begin") + begin_time = time.perf_counter() + total_losses = {loss_def.name: 0.0 for loss_def in self._loss_defs} + for iter_ in range(num_iters): + iter_losses, _, _ = self._runner.run_step(data_iterator, self._schedule, iteration=begin_iter + iter_) + for name, value in iter_losses.items(): + total_losses[name] += value + self._run.save_logged_tensors(f"{full_phase_name}_{completed_steps}_{iter_}") + + safe_barrier( + self._distributed.world_group, + f"{full_phase_name} end", + ) + end_time = time.perf_counter() + time_per_iteration = (end_time - begin_time) / num_iters + model_tflops, hardware_tflops = self._get_tflops_func(phase, time_per_iteration) + # TODO add other relevant eval metrics + metrics = { + "train_iters": self._trainer_config.training.train_iters, + "batch_size": self._trainer_config.batch.batch_size, + "iteration": completed_steps, + **{name: (value / num_iters) for name, value in total_losses.items()}, + "consumed_samples": consumed_samples, + "consumed_tokens": consumed_tokens, + "step_time_ms": time_per_iteration * 1000, + "model_tflops": model_tflops, + "hardware_tflops": hardware_tflops, + "tokens_per_sec_per_gpu": ( + (self._trainer_config.batch.sequence_length * self._trainer_config.batch.batch_size) + / self._trainer_config.model.distributed.world_size + / time_per_iteration + ), + **get_memory_usage_mib(), + } + + return metrics + + def _get_completed_evaluation_steps(self, completed_steps: int) -> int: + # Number of evaluations steps performed before the current step + return self._eval_config.get_iteration_count(completed_steps - 1) + + def _get_data_iterator( + self, completed_steps: int = 0, prefetch_factor: int | None = None + ) -> typing.Iterator[typing.Any]: + return self._data.get_iterator( + self._trainer_config.batch, + self._name, + consumed_samples=completed_steps * self._trainer_config.batch.batch_size, + num_workers=self._trainer_config.training.num_workers, + prefetch_factor=prefetch_factor, + ) + + +class EvaluationHarness[ConfigType: EvaluationHarnessConfig](Evaluation[ConfigType]): + config_class: typing.ClassVar[type[EvaluationHarnessConfig]] = EvaluationHarnessConfig + + def __init__( + self, + name: str, + eval_config: EvaluationHarnessConfig, + trainer_config: TrainerConfig, + get_tflops_func: callable, + ): + self._name = name + self._eval_config = eval_config + self._trainer_config = trainer_config + self._get_tflops_func = get_tflops_func + + def setup( + self, + distributed: Distributed, + run: Run, + multi_stage: FastLLMModel, + runner: ScheduleRunner, + data: Data, + ) -> None: + super().setup(distributed, run, multi_stage, runner, data) + + # TODO: pass mini and batch size of the same length for lm_eval not to crash during training + # or implement min batch sequential awareness in fas_llm_wrapper for lm_eval + self._hf_model = ( + self._multi_stage.config_class.get_huggingface_model_for_causal_lm_class().from_fast_llm_model_in_training( + self._multi_stage, self._trainer_config, self._runner + ) + ) + + # For reporting purposes, just to indicate it is from Fast-LLM + # as lm_eval.simple_evaluate will take it for results['config']['model'] + self._hf_model.config.name_or_path = type(self._hf_model).__name__ + + self._flm_wrapper = FastLLMWrapper( + model=self._hf_model, + tokenizer=self._data.tokenizer.tokenizer, + truncation=self._eval_config.truncation, + logits_cache=self._eval_config.logits_cache, + add_bos_token=self._eval_config.add_bos_token, + prefix_token_id=self._eval_config.prefix_token_id, + ) + self._is_setup = True + + def run( + self, + done: bool, + completed_steps: int, + consumed_samples: int, + consumed_tokens: int, + ) -> tuple[dict[str, any], str | None]: + assert self._is_setup + if not (done or self._eval_config.enabled(completed_steps)): + return {}, None + + # completed_steps is added to output_path like output_path/runs/run_index/completed_steps/ + + if self._run.is_main_rank: + args, simple_eval_kwargs = prepare_lm_eval_simple_eval_params( + self._eval_config.cli_args, completed_steps, self._run.index + ) + simple_eval_kwargs["model"] = self._flm_wrapper + + # Needed for reporting as batch_size is set from args not lm for reporting in evaluate + simple_eval_kwargs["batch_size"] = self._flm_wrapper.batch_size + simple_eval_kwargs["max_batch_size"] = self._flm_wrapper.max_batch_size + + # As of lm_eval commit 758c5ed891b1ca48acd8d3a0d309a827215796b7 + # Expected to be a string even if empty and not None in simple_evaluate + simple_eval_kwargs["model_args"] = "" + + results = lm_eval_simple_evaluate(**simple_eval_kwargs) + self._flm_wrapper.stop_workers() + + # Evaluation_tracker save expects model to be either string, but if model is passed + # LM wrapper needs to be deep copyable and json serializable + simple_eval_kwargs["evaluation_tracker"].general_config_tracker.model_source = ( + self._hf_model.config.name_or_path + ) + + if results is not None: + process_lm_eval_results( + args, + results, + simple_eval_kwargs["evaluation_tracker"], + completed_steps, + consumed_samples, + consumed_tokens, + ) + else: + self._flm_wrapper.worker_model_invoke() + + # TODO: do we need it here as self._flm_wrapper.stop_workers() and self._flm_wrapper.worker_model_invoke() + # already have barrier + safe_barrier(self._distributed.world_group, f"Evaluation Harness Run end") + + # lm_eval logs to disc, wandb and prints to screen itself + return {}, None + + def get_dataset_samples(self) -> tuple[str, int] | None: + return None + + +# NOTE: This is not a standalone runnable; it's a submodule of Trainer used for code encapsulation. +class Evaluator: + _is_setup: bool = False + + def __init__( + self, + config: TrainerConfig, + get_tflops_func: callable, + ): + self._config = config + self._evaluations = [ + eval_config.get_evaluation_class().build( + name=name, + eval_config=eval_config, + trainer_config=config, + get_tflops_func=get_tflops_func, + ) + for name, eval_config in config.training.evaluations.items() + ] + + def setup( + self, + distributed: Distributed, + run: Run, + multi_stage: FastLLMModel, + runner: ScheduleRunner, + data: Data, + wandb: Wandb, + ) -> None: + self._wandb = wandb + for evaluation in self._evaluations: + evaluation.setup(distributed, run, multi_stage, runner, data) + self._is_setup = True + + def get_datasets_samples(self) -> dict[str:int]: + return { + el[0]: el[1] + for el in (evaluation.get_dataset_samples() for evaluation in self._evaluations) + if el is not None + } + + def run( + self, + metrics: dict[str:any], + done: bool, + completed_steps: int, + consumed_samples: int, + consumed_tokens: int, + ): + assert self._is_setup + formatted_metrics = [] + for evaluation in self._evaluations: + this_metrics, this_formatted_metrics = evaluation.run( + done, completed_steps, consumed_samples, consumed_tokens + ) + if len(this_metrics) == 0: + continue + for k, v in this_metrics.items(): + metrics[k] = v + if this_formatted_metrics is not None: + formatted_metrics.append(this_formatted_metrics) + + if len(formatted_metrics) > 0: + formatted_metrics = "\n".join(formatted_metrics) + log_main_rank(formatted_metrics) + if self._config.training.wandb.alert.enabled(completed_steps): + self._wandb.alert("Validation results", formatted_metrics, "INFO") diff --git a/fast_llm/engine/training/lm_eval/evaluator.py b/fast_llm/engine/training/lm_eval/evaluator.py new file mode 100644 index 00000000..d1312b28 --- /dev/null +++ b/fast_llm/engine/training/lm_eval/evaluator.py @@ -0,0 +1,765 @@ +import itertools +import json +import logging +import random +import time +from collections import defaultdict +from typing import TYPE_CHECKING, List, Optional, Union + +import numpy as np +import torch + +import lm_eval.api.metrics +import lm_eval.api.registry +import lm_eval.api.task +import lm_eval.models +from lm_eval.caching.cache import delete_cache +from lm_eval.evaluator_utils import ( + consolidate_group_results, + consolidate_results, + get_sample_size, + get_subtask_list, + get_task_list, + prepare_print_tasks, + print_writeout, + run_task_tests, +) +from lm_eval.loggers import EvaluationTracker +from lm_eval.loggers.utils import add_env_info, add_tokenizer_info, get_git_commit_hash +from lm_eval.tasks import TaskManager, get_task_dict +from lm_eval.utils import ( + handle_non_serializable, + hash_string, + positional_deprecated, + setup_logging, + simple_parse_args_string, +) + + +if TYPE_CHECKING: + from lm_eval.api.model import LM + from lm_eval.api.task import Task + +eval_logger = logging.getLogger(__name__) + + +@positional_deprecated +def simple_evaluate( + model, + model_args: Optional[Union[str, dict]] = None, + tasks: Optional[List[Union[str, dict, object]]] = None, + num_fewshot: Optional[int] = None, + batch_size: Optional[Union[int, str]] = None, + max_batch_size: Optional[int] = None, + device: Optional[str] = None, + use_cache: Optional[str] = None, + cache_requests: bool = False, + rewrite_requests_cache: bool = False, + delete_requests_cache: bool = False, + limit: Optional[Union[int, float]] = None, + samples: Optional[dict] = None, + bootstrap_iters: int = 100000, + check_integrity: bool = False, + write_out: bool = False, + log_samples: bool = True, + evaluation_tracker: Optional[EvaluationTracker] = None, + system_instruction: Optional[str] = None, + apply_chat_template: Union[bool, str] = False, + fewshot_as_multiturn: bool = False, + gen_kwargs: Union[str, dict, None] = None, + task_manager: Optional[TaskManager] = None, + verbosity=None, + predict_only: bool = False, + random_seed: int = 0, + numpy_random_seed: int = 1234, + torch_random_seed: int = 1234, + fewshot_random_seed: int = 1234, + confirm_run_unsafe_code: bool = False, + metadata: Optional[dict] = None, +): + """Instantiate and evaluate a model on a list of tasks. + + :param model: Union[str, LM] + Name of model or LM object, see lm_eval.models.get_model + :param model_args: Optional[str, dict] + String or dict arguments for each model class, see LM.create_from_arg_string and LM.create_from_arg_object. + Ignored if `model` argument is a LM object. + :param tasks: list[Union[str, dict, Task]] + List of task names or Task objects. Task objects will be taken to have name task.EVAL_HARNESS_NAME if defined and type(task).__name__ otherwise. + :param num_fewshot: int + Number of examples in few-shot context + :param batch_size: int or str, optional + Batch size for model + :param max_batch_size: int, optional + Maximal batch size to try with automatic batch size detection + :param device: str, optional + PyTorch device (e.g. "cpu" or "cuda:0") for running models + :param use_cache: str, optional + A path to a sqlite db file for caching model responses. `None` if not caching. + :param cache_requests: bool, optional + Speed up evaluation by caching the building of dataset requests. `None` if not caching. + :param rewrite_requests_cache: bool, optional + Rewrites all the request cache if set to `True`. `None` if not desired. + :param delete_requests_cache: bool, optional + Deletes all the request cache if set to `True`. `None` if not desired. + :param limit: int or float, optional + Limit the number of examples per task (only use this for testing), If <1, limit is a percentage of the total number of examples. + :param samples: dictionary, optional + Dictionary indicating which examples should be tested in each task, e.g., {"mmlu_astronomy":[0,3,6],"mmlu_anatomy":[1,4,7,10]}. + :param bootstrap_iters: + Number of iterations for bootstrap statistics, used when calculating stderrs. set to 0 for no stderr calculations to be performed. + :param check_integrity: bool + Whether to run the relevant part of the test suite for the tasks + :param write_out: bool + If True, write out an example document and model input for checking task integrity + :param log_samples: bool + If True, write out all model outputs and documents for per-sample measurement and post-hoc analysis + :param system_instruction: str + System instruction to be applied to the prompt + :param apply_chat_template: Union[bool, str] + Specifies whether to apply a chat template to the prompt. + - If set to True, the default chat template is applied. + - If set to a string, applies the specified chat template by name. + Defaults to False (no chat template applied). + :param fewshot_as_multiturn: bool + Whether to provide the fewshot examples as a multiturn conversation or a single user turn. + :param gen_kwargs: dict or comma-separated string + Arguments for model generation + Ignored for all tasks with loglikelihood output_type + :param verbosity: str + Verbosity level for logging + :param predict_only: bool + If true only model outputs will be generated and returned. Metrics will not be evaluated + :param random_seed: int + Random seed for python's random module. If set to None, the seed will not be set. + :param numpy_random_seed: int + Random seed for numpy. If set to None, the seed will not be set. + :param torch_random_seed: int + Random seed for torch. If set to None, the seed will not be set. + :param fewshot_random_seed: int + Random seed for fewshot sampler random generator. If set to None, the seed of generator will be set to None. + :param metadata: dict + Additional metadata to be added to the task manager. Will get passed to the download function of the task. + + return + Dictionary of results + """ + if verbosity is not None: + setup_logging(verbosity=verbosity) + start_date = time.time() + + if limit is not None and samples is not None: + raise ValueError( + "Either 'limit' or 'samples' must be None, but both are not None." + ) + + if isinstance(model_args, str) and ( + "instruct" in model_args and not apply_chat_template + ): + eval_logger.warning( + "Instruct model detected, but chat template not applied. Recommend setting `apply_chat_template` (optionally `fewshot_as_multiturn`)." + ) + + if delete_requests_cache: + eval_logger.info("Deleting requests cache...") + delete_cache() + + seed_message = [] + if random_seed is not None: + # See https://github.com/EleutherAI/lm-evaluation-harness/pull/1412 + seed_message.append(f"Setting random seed to {random_seed}") + random.seed(random_seed) + + if numpy_random_seed is not None: + seed_message.append(f"Setting numpy seed to {numpy_random_seed}") + np.random.seed(numpy_random_seed) + + if torch_random_seed is not None: + seed_message.append(f"Setting torch manual seed to {torch_random_seed}") + torch.manual_seed(torch_random_seed) + + if fewshot_random_seed is not None: + seed_message.append(f"Setting fewshot manual seed to {fewshot_random_seed}") + + if seed_message: + eval_logger.info(" | ".join(seed_message)) + + if tasks is None: + tasks = [] + if len(tasks) == 0: + raise ValueError( + "No tasks specified, or no tasks found. Please verify the task names." + ) + + if gen_kwargs is not None: + if isinstance(gen_kwargs, str): + gen_kwargs = simple_parse_args_string(gen_kwargs) + eval_logger.warning( + f"generation_kwargs: {gen_kwargs} specified through cli, these settings will update set parameters in yaml tasks. " + "Ensure 'do_sample=True' for non-greedy decoding!" + ) + if not gen_kwargs: + gen_kwargs = None + + if isinstance(model, str): + if model_args is None: + eval_logger.warning("model_args not specified. Using defaults.") + model_args = "" + + if isinstance(model_args, dict): + eval_logger.info( + f"Initializing {model} model, with arguments: {model_args}" + ) + lm = lm_eval.api.registry.get_model(model).create_from_arg_obj( + model_args, + { + "batch_size": batch_size, + "max_batch_size": max_batch_size, + "device": device, + }, + ) + + else: + eval_logger.info( + f"Initializing {model} model, with arguments: {simple_parse_args_string(model_args)}" + ) + lm = lm_eval.api.registry.get_model(model).create_from_arg_string( + model_args, + { + "batch_size": batch_size, + "max_batch_size": max_batch_size, + "device": device, + }, + ) + else: + if not isinstance(model, lm_eval.api.model.LM): + raise TypeError( + f"The value of `model` passed to simple_evaluate() was of type {type(model)}, but is required to be a subclass of lm_eval.api.model.LM . This may be because you are passing an initialized Hugging Face PreTrainedModel without having wrapped it in `lm_eval.models.huggingface.HFLM(pretrained=my_model)` first." + ) + eval_logger.info("Using pre-initialized model") + lm = model + + if use_cache is not None: + eval_logger.info(f"Using cache at {use_cache + '_rank' + str(lm.rank) + '.db'}") + lm = lm_eval.api.model.CachingLM( + lm, + use_cache + # each rank receives a different cache db. + # necessary to avoid multiple writes to cache at once + + "_rank" + + str(lm.rank) + + ".db", + ) + + if task_manager is None: + metadata = ( + simple_parse_args_string(model_args) + if isinstance(model_args, str) + else model_args + if isinstance(model_args, dict) + else {} + ) | (metadata or {}) + task_manager = TaskManager(metadata=metadata) + + task_dict = get_task_dict( + tasks, + task_manager, + ) + + # helper function to recursively apply config overrides to leaf subtasks, skipping their constituent groups. + # (setting of num_fewshot ; bypassing metric calculation ; setting fewshot seed) + def _adjust_config(task_dict): + adjusted_task_dict = {} + for task_name, task_obj in task_dict.items(): + if isinstance(task_obj, dict): + adjusted_task_dict = { + **adjusted_task_dict, + **{task_name: _adjust_config(task_obj)}, + } + + else: + if task_obj.get_config("output_type") == "generate_until": + if gen_kwargs is not None: + task_obj.set_config( + key="generation_kwargs", value=gen_kwargs, update=True + ) + eval_logger.info( + f"{task_obj.config.task}: Using gen_kwargs: {task_obj.config.generation_kwargs}" + ) + + if predict_only: + eval_logger.info( + f"Processing {task_name} in output-only mode. Metrics will not be calculated!" + ) + # we have to change the class properties post-hoc. This is pretty hacky. + task_obj.override_metric(metric_name="bypass") + + # override tasks' fewshot values to the provided num_fewshot arg value + # except if tasks have it set to 0 manually in their configs--then we should never overwrite that + if num_fewshot is not None: + if (default_num_fewshot := task_obj.get_config("num_fewshot")) == 0: + eval_logger.info( + f"num_fewshot has been set to 0 for {task_name} in its config. Manual configuration will be ignored." + ) + else: + eval_logger.warning( + f"Overwriting default num_fewshot of {task_name} from {default_num_fewshot} to {num_fewshot}" + ) + task_obj.set_config(key="num_fewshot", value=num_fewshot) + else: + # if num_fewshot not provided, and the task does not define a default one, default to 0 + if ( + default_num_fewshot := task_obj.get_config("num_fewshot") + ) is None: + task_obj.set_config(key="num_fewshot", value=0) + # fewshot_random_seed set for tasks, even with a default num_fewshot (e.g. in the YAML file) + task_obj.set_fewshot_seed(seed=fewshot_random_seed) + + adjusted_task_dict[task_name] = task_obj + + return adjusted_task_dict + + task_dict = _adjust_config(task_dict) + + if check_integrity: + run_task_tests(task_list=tasks) + + if evaluation_tracker is not None: + evaluation_tracker.general_config_tracker.log_experiment_args( + model_source=model, + model_args=model_args, + system_instruction=system_instruction, + chat_template=lm.chat_template(apply_chat_template) + if apply_chat_template + else None, + fewshot_as_multiturn=fewshot_as_multiturn, + ) + + results = evaluate( + lm=lm, + task_dict=task_dict, + limit=limit, + samples=samples, + cache_requests=cache_requests, + rewrite_requests_cache=rewrite_requests_cache, + bootstrap_iters=bootstrap_iters, + write_out=write_out, + log_samples=True if predict_only else log_samples, + system_instruction=system_instruction, + apply_chat_template=apply_chat_template, + fewshot_as_multiturn=fewshot_as_multiturn, + verbosity=verbosity, + confirm_run_unsafe_code=confirm_run_unsafe_code, + ) + if verbosity is not None: + setup_logging(verbosity=verbosity) + + if lm.rank == 0: + if isinstance(model, str): + model_name = model + elif hasattr(model, "config") and hasattr(model.config, "_name_or_path"): + model_name = model.config._name_or_path + else: + model_name = type(model).__name__ + + # add info about the model and few shot config + results["config"] = { + "model": model_name, + "model_args": model_args, + } + # add more detailed model info if available + if isinstance(lm, lm_eval.models.huggingface.HFLM): + results["config"].update(lm.get_model_info()) + # add info about execution + results["config"].update( + { + "batch_size": batch_size, + "batch_sizes": ( + list(lm.batch_sizes.values()) if hasattr(lm, "batch_sizes") else [] + ), + "device": device, + "use_cache": use_cache, + "limit": limit, + "bootstrap_iters": bootstrap_iters, + "gen_kwargs": gen_kwargs, + "random_seed": random_seed, + "numpy_seed": numpy_random_seed, + "torch_seed": torch_random_seed, + "fewshot_seed": fewshot_random_seed, + } + ) + results["git_hash"] = get_git_commit_hash() + results["date"] = start_date + add_env_info(results) # additional environment info to results + add_tokenizer_info(results, lm) # additional info about tokenizer + return results + else: + return None + + +@positional_deprecated +def evaluate( + lm: "LM", + task_dict, + limit: Optional[int] = None, + samples: Optional[dict] = None, + cache_requests: bool = False, + rewrite_requests_cache: bool = False, + bootstrap_iters: Optional[int] = 100000, + write_out: bool = False, + log_samples: bool = True, + system_instruction: Optional[str] = None, + apply_chat_template: Union[bool, str] = False, + fewshot_as_multiturn: bool = False, + verbosity: str = "INFO", + confirm_run_unsafe_code: bool = False, +): + """Instantiate and evaluate a model on a list of tasks. + + :param lm: obj + Language Model + :param task_dict: dict[str, Task] + Dictionary of tasks. Tasks will be taken to have name type(task).config.task . + :param limit: int, optional + Limit the number of examples per task (only use this for testing) + :param samples: dictionary, optional + Dictionary indicating which examples should be tested in each task, e.g., {"mmlu_astronomy":[0,3,6],"mmlu_anatomy":[1,4,7,10]}. + :param cache_requests: bool, optional + Speed up evaluation by caching the building of dataset requests. + :param rewrite_requests_cache: bool, optional + Rewrites all the request cache if set to `True`. + :param bootstrap_iters: + Number of iterations for bootstrap statistics, used when calculating stderr. Set to 0 for skipping all stderr calculations. + :param write_out: bool + If True, write out an example document and model input for checking task integrity + :param log_samples: bool + If True, write out all model outputs and documents for per-sample measurement and post-hoc analysis + :param system_instruction: str + System instruction to be applied to the prompt + :param apply_chat_template: Union[bool, str] + Specifies whether to apply a chat template to the prompt. + - If set to True, the default chat template is applied. + - If set to a string, applies the specified chat template by name. + Defaults to False (no chat template applied). + :param fewshot_as_multiturn: bool + Whether to provide the fewshot examples as a multiturn conversation or a single user turn. + :param verbosity: str + Verbosity level for logging + :param confirm_run_unsafe_code: bool + Whether to confirm running tasks marked as unsafe. + :return + Dictionary of results + """ + + if limit is not None and samples is not None: + raise ValueError( + "Either 'limit' or 'samples' must be None, but both are not None." + ) + if samples is not None: + eval_logger.info(f"Evaluating examples for tasks {list(samples.keys())}") + if apply_chat_template: + eval_logger.warning( + "Chat template formatting change affects loglikelihood and multiple-choice tasks. See docs/chat-template-readme.md for details." + ) + # tracks all Instances/requests a model must generate output on. + requests = defaultdict(list) + # stores the amount to pad out reqs per req. type so that + # number of fwd passes per distributed rank is equal + padding_requests = defaultdict(int) + + # get lists of group hierarchy and each type of request + eval_tasks = get_task_list(task_dict) + if not log_samples: + if not all( + "bypass" not in getattr(task_output.task, "_metric_fn_list", {}).keys() + for task_output in eval_tasks + ): + raise ValueError("log_samples must be True for 'bypass' metric-only tasks") + + # validation checks: + # 1.are we running multimodal task <-> non-multimodal model class, or vice-versa. + # 2.are we running code that is marked as unsafe. + incompatible_tasks = [] + for task_output in eval_tasks: + task: Task = task_output.task + + if getattr(lm, "MULTIMODAL", False) != getattr(task, "MULTIMODAL", False): + incompatible_tasks.append(task_output.task_name) + elif getattr(task, "UNSAFE_CODE", False) and not confirm_run_unsafe_code: + raise ValueError( + f"Attempted to run task: {task_output.task_name} which is marked as unsafe. Set confirm_run_unsafe_code=True to run this task." + ) + if len(incompatible_tasks) > 0: + if not getattr(lm, "MULTIMODAL", False): + raise ValueError( + f"Attempted to run tasks: {incompatible_tasks} which require multimodal input, but the selected model type does not currently implement this. Multimodal support is currently restricted to the ['hf-multimodal', 'vllm-vlm'] model type." + ) + else: + raise ValueError( + f"Attempted to run tasks: {incompatible_tasks} which are text-only, but used a model type which only currently supports multimodal tasks." + ) + # end validation check + + # Cache the limit arg. + limit_arg = limit + limits = [] + for task_output in eval_tasks: + task: Task = task_output.task + + limit = get_sample_size(task, limit_arg) + limits.append(limit) + task.build_all_requests( + limit=limit, + samples=samples.get(task_output.task_name, None) + if samples is not None + else samples, + rank=lm.rank, + world_size=lm.world_size, + cache_requests=cache_requests, + rewrite_requests_cache=rewrite_requests_cache, + system_instruction=system_instruction, + apply_chat_template=bool(apply_chat_template), + fewshot_as_multiturn=fewshot_as_multiturn, + chat_template=getattr(lm, "apply_chat_template") + if apply_chat_template + else None, + tokenizer_name=getattr(lm, "tokenizer_name", "") + if apply_chat_template + else "", + ) + eval_logger.debug( + f"Task: {task_output.task_name}; number of requests on this rank: {len(task.instances)}" + ) + if write_out: + print_writeout(task) + # aggregate Instances by LM method requested to get output. + for instance in task.instances: + reqtype = instance.request_type + requests[reqtype].append(instance) + + if lm.world_size > 1: + instances_rnk = torch.tensor(len(task._instances), device=lm.device) + gathered_item = ( + lm.accelerator.gather(instances_rnk).cpu().detach().numpy().tolist() + ) + # "multiple_choice" task types dispatch (several) "loglikelihood" request types + reqtype = ( + "loglikelihood" + if task.OUTPUT_TYPE == "multiple_choice" + else task.OUTPUT_TYPE + ) + # compute number of pseudo-batches to pad with (FSDP/DDP require even batches among ranks) + numpad = max(gathered_item) - gathered_item[lm.rank] + # todo: may not account for padding in cases like SquadV2 which has multiple req types + padding_requests[reqtype] += numpad + + ### Run LM on inputs, get all outputs ### + # execute each type of request + for reqtype, reqs in requests.items(): + eval_logger.info(f"Running {reqtype} requests") + # create `K` copies of each request `req` based off `K = req.repeats` + cloned_reqs = [] + for req in reqs: + cloned_reqs.extend([req] * req.repeats) + + if (lm.world_size > 1) and (padding_requests[reqtype] > 0): + for _ in range(padding_requests[reqtype]): + cloned_reqs.extend([req] * req.repeats) + + # run requests through model + resps = getattr(lm, reqtype)(cloned_reqs) + + # put responses from model into a list of length K for each request. + for x, req in zip(resps, cloned_reqs): + req.resps.append(x) + + if lm.world_size > 1: + lm.accelerator.wait_for_everyone() + + RANK = lm.rank + WORLD_SIZE = lm.world_size + ### Postprocess outputs ### + # TODO: del model here, maybe (idea: allow user to specify device of e.g. reward model separately) + for task_output, limit in zip(eval_tasks, limits): + task = task_output.task + task.apply_filters() + + ### Collect values of metrics on all datapoints ### + # # unpack results and sort back in order and return control to Task + # TODO: make it possible to use a different metric per filter + # Pre-process task.instances to group by doc_id + instances_by_doc_id = defaultdict(list) + for instance in task.instances: + instances_by_doc_id[instance.doc_id].append(instance) + # Sort instances within each group + for instances in instances_by_doc_id.values(): + instances.sort(key=lambda x: x.idx) + # iterate over different filters used + for filter_key in task.instances[0].filtered_resps.keys(): + indices = ( + samples.get(task_output.task_name, None) + if samples is not None + else None + ) + doc_iterator = task.doc_iterator( + rank=RANK, + limit=limit, + world_size=WORLD_SIZE, + samples=indices, + ) + for doc_id, doc in doc_iterator: + if indices: + doc_id_true = indices[doc_id] + else: + doc_id_true = doc_id + requests = instances_by_doc_id[doc_id] + metrics = task.process_results( + doc, [req.filtered_resps[filter_key] for req in requests] + ) + if log_samples: + target = task.doc_to_target(doc) + example = { + "doc_id": doc_id_true, + "doc": doc, + "target": target, + "arguments": [req.args for req in requests], + "resps": [req.resps for req in requests], + "filtered_resps": [ + req.filtered_resps[filter_key] for req in requests + ], + "filter": filter_key, + "metrics": list(metrics.keys()), + "doc_hash": hash_string( + json.dumps( + requests[0].doc, + indent=2, + default=handle_non_serializable, + ensure_ascii=False, + ) + ), + "prompt_hash": hash_string(requests[0].arguments[0]), + "target_hash": hash_string(str(target)), + } + example.update(metrics) + task_output.logged_samples.append(example) + for metric, value in metrics.items(): + task_output.sample_metrics[(metric, filter_key)].append(value) + + if WORLD_SIZE > 1: + # if multigpu, then gather data across all ranks to rank 0 + # first gather logged samples across all ranks + for task_output in eval_tasks: + if log_samples: + # for task_name, task_samples in list(samples.items()): + full_samples = [None] * WORLD_SIZE if RANK == 0 else None + torch.distributed.gather_object( + obj=task_output.logged_samples, + object_gather_list=full_samples, + dst=0, + ) + + if RANK == 0: + task_output.logged_samples = list( + itertools.chain.from_iterable(full_samples) + ) + + # then collect metrics across all ranks + for metrics in task_output.sample_metrics: + metric_list = [None] * WORLD_SIZE if RANK == 0 else None + torch.distributed.gather_object( + obj=task_output.sample_metrics[metrics], + object_gather_list=metric_list, + dst=0, + ) + if RANK == 0: + task_output.sample_metrics[metrics] = list( + itertools.chain.from_iterable(metric_list) + ) + + if RANK == 0: + ### Aggregate results over all datapoints ### + # aggregate results ; run bootstrap CIs + for task_output in eval_tasks: + task_output.calculate_aggregate_metric(bootstrap_iters=bootstrap_iters) + ( + results, + samples, + configs, + versions, + num_fewshot, + higher_is_better, + ) = consolidate_results(eval_tasks) + + ### Calculate group metrics ### + if bool(results): + results, versions, show_group_table, *_ = consolidate_group_results( + results, versions, task_dict + ) + + results_agg, group_agg = prepare_print_tasks(task_dict, results) + subtask_list = get_subtask_list(task_dict) + + # collect all higher_is_better values for metrics + # in the group's subtasks. + # TODO: clean this up ; unify with the below metric_list loop? + _higher_is_better = {} + for group, task_list in subtask_list.items(): + if ( + len(task_list) != 0 + ): # subtask list will list "task_name": [] for solo tasks + for task in task_list: + for m, h in higher_is_better[task].items(): + if m not in _higher_is_better.keys(): + _higher_is_better[m] = h + + if ( + m in _higher_is_better + and _higher_is_better[m] is not None + and _higher_is_better[m] != h + ): + eval_logger.warning( + f"Higher_is_better values for metric {m} in group {group} are not consistent. Defaulting to None." + ) + _higher_is_better[m] = None + higher_is_better[group] = _higher_is_better + + results_dict = { + "results": dict(results_agg.items()), + **( + {"groups": dict(group_agg.items())} + if (bool(group_agg) & show_group_table) + else {} + ), + "group_subtasks": dict(reversed(subtask_list.items())), + "configs": dict(sorted(configs.items())), + "versions": dict(sorted(versions.items())), + "n-shot": dict(sorted(num_fewshot.items())), + "higher_is_better": dict(sorted(higher_is_better.items())), + "n-samples": { + task_output.task_name: { + "original": len(task_output.task.eval_docs), + "effective": min( + limit if limit else len(task_output.task.eval_docs), + len(task_output.task.eval_docs), + ), + } + for task_output, limit in zip(eval_tasks, limits) + }, + } + if log_samples: + results_dict["samples"] = dict(samples) + + return results_dict + + else: + return None + + +def request_caching_arg_to_dict(cache_requests: str) -> dict: + request_caching_args = { + "cache_requests": cache_requests in {"true", "refresh"}, + "rewrite_requests_cache": cache_requests == "refresh", + "delete_requests_cache": cache_requests == "delete", + } + + return request_caching_args diff --git a/fast_llm/engine/training/lm_eval/fast_llm_wrapper.py b/fast_llm/engine/training/lm_eval/fast_llm_wrapper.py new file mode 100644 index 00000000..43204206 --- /dev/null +++ b/fast_llm/engine/training/lm_eval/fast_llm_wrapper.py @@ -0,0 +1,896 @@ +import logging +import pathlib +import copy +import jinja2 + +from typing import Optional, Union, Any, List, Tuple, Dict + +import transformers +from tqdm.auto import tqdm +import torch +import torch.nn.functional as F +import torch.distributed as dist + + +# make lazy +from lm_eval import utils +from lm_eval.api.model import TemplateLM +from lm_eval.api.instance import Instance +from lm_eval.api.model import CacheHook +from lm_eval.models.utils import ( + Collator, + configure_pad_token, + handle_stop_sequences, + pad_and_concat, + stop_sequences_criteria, +) + +from fast_llm.core.distributed import safe_barrier +from fast_llm.engine.checkpoint.config import CheckpointLoadConfig +from fast_llm.engine.multi_stage.config import FastLLMModelConfig +from fast_llm.models.auto import model_registry +from fast_llm.engine.inference.huggingface import HuggingfaceBaseModelForCausalLM +from fast_llm.engine.distributed.distributed import Distributed +from fast_llm.engine.distributed.config import DistributedConfig + + +eval_logger = logging.getLogger(__name__) + + +# move to fast_llm +class FastLLMWrapper(TemplateLM): + _DEFAULT_MAX_LENGTH = 2048 + + def __init__( + self, + model: HuggingfaceBaseModelForCausalLM, + tokenizer: transformers.PreTrainedTokenizer | transformers.PreTrainedTokenizerFast, + truncation: Optional[bool] = False, + logits_cache: bool = True, + add_bos_token: Optional[bool] = False, + prefix_token_id: Optional[int] = None, + ): + super().__init__() + # This is for lm_eval sake, we always run lm_eval on one main rank + self._rank = 0 + self._world_size = 1 + + self._distributed: Distributed = model._inference_runner._fast_llm_model.distributed + dist_config: DistributedConfig = self._distributed.config + # get batch_data_parallel group leaders + if dist_config.sequence_data_rank == 0 and dist_config.pipeline_rank == 0 and dist_config.tensor_rank == 0: + self.group = self._distributed.batch_data_group + else: + self.group = dist.GroupMember.NON_GROUP_MEMBER + + # TODO: clean code which does not used parts from HFLM + backend = "causal" + revision = "main" + gguf_file = None + delta = None + peft = None + + # set some inputs which are expected in HFLM but are set by our model config + # TODO: do _batch_config public read only property + max_length = model._inference_runner._batch_config.sequence_length + # batch_size = model._batch_config.micro_batch_size + batch_size = model._inference_runner._batch_config.batch_size + max_batch_size = batch_size + + self.backend = backend + + # set tokenizer object + assert isinstance(tokenizer, transformers.PreTrainedTokenizer) or isinstance( + tokenizer, transformers.PreTrainedTokenizerFast + ) + self.tokenizer = tokenizer + + # initialize model fields + self._model = model + self._device = self._model.device + self._config = self._model.config + + # access self._model through self.model property outside this method + if isinstance(self.model, torch.nn.Module): + self.model.eval() + self.model.tie_weights() + + self.truncation = truncation + self.logits_cache = logits_cache + self.vocab_size = self.tokenizer.vocab_size + # select (or create) a pad token to use + self.tokenizer = configure_pad_token(self.tokenizer, model_config=self.config) + + self.add_bos_token = add_bos_token + # TODO: do we support gemma models? + if "gemma" in getattr(self.config, "model_type", ""): + self.add_bos_token = True + eval_logger.info( + f"Model type is '{self.config.model_type}', part of the Gemma family--a BOS" + " token will be used as Gemma underperforms without it." + ) + + self._max_length = max_length + self.pretrained = model + self.delta = delta + self.peft = peft + self.revision = revision + self.batch_schedule = 1 + self.batch_sizes = {} + self.max_batch_size = max_batch_size + + if str(batch_size).startswith("auto"): + batch_size = batch_size.split(":") + self.batch_size_per_gpu = batch_size[0] + self.batch_schedule = float(batch_size[1]) if len(batch_size) > 1 else 1 + else: + self.batch_size_per_gpu = int(batch_size) + + self.custom_prefix_token_id = prefix_token_id + if prefix_token_id is not None: + eval_logger.info(f"Loglikelihood prefix token id used in evaluation: {self.prefix_token_id}") + + def _model_invoke( + self, inputs, attn_mask, labels, max_length, stop, generate: bool, continue_generate: bool, **generation_kwargs + ): + if self.group is None or (world_size := self.group.size()) == 1: + # Must not be called with continue_generate false on one process + assert continue_generate + return self._model_invoke_inner(inputs, attn_mask, labels, max_length, stop, generate, **generation_kwargs) + + rank = self.group.rank() + assert rank == 0 + + if continue_generate: + assert inputs is not None + if generate: + assert max_length is not None and stop is not None + + step = len(inputs) // world_size + + inputs = [inputs[i * step : (i + 1) * step] for i in range(world_size)] + attn_mask = [ + attn_mask[i * step : (i + 1) * step] if attn_mask is not None else None for i in range(world_size) + ] + labels = [labels[i * step : (i + 1) * step] if labels is not None else None for i in range(world_size)] + + scatter_list = [ + [inputs[i], attn_mask[i], labels[i], max_length, stop, generate, continue_generate, generation_kwargs] + for i in range(world_size) + ] + else: + scatter_list = [[None, None, None, None, None, None, False, None] for _ in range(world_size)] + + obj_list = [None] + dist.scatter_object_list( + obj_list, + scatter_list, + # TODO: figure out how to get proper global rank as Fast-llm groups crash here with + # Group is not registered, please create group with torch.distributed.new_group API + # src=dist.get_global_rank(self.group, 0), + src=0, + group=self.group, + ) + inputs, attn_mask, labels, max_length, stop, generate, continue_generate, generation_kwargs = tuple( + obj_list[0] + ) + + if continue_generate == False: + return + + res = self._model_invoke_inner(inputs, attn_mask, labels, max_length, stop, generate, **generation_kwargs) + + gather_list = [None] * world_size + dist.gather_object( + res, + gather_list, + # TODO: make proper rank mapping + # dst=dist.get_global_rank(self.group, 0), + dst=0, + group=self.group, + ) + + return gather_list + + def worker_model_invoke(self): + assert self.group is not None + print(self.group) + # if isinstance(self.group, dist.ProcessGroup): + if not isinstance(self.group, int): + assert self.group.size() > 1 and self.group.rank() != 0 + # on worker ranks the function need to wait to be called multiple times + while True: + scatter_list = None + obj_list = [None] + dist.scatter_object_list( + obj_list, + scatter_list, + # TODO: figure out how to get proper global rank as Fast-llm groups crash here with + # Group is not registered, please create group with torch.distributed.new_group API + # src=dist.get_global_rank(self.group, 0), + src=0, + group=self.group, + ) + inputs, attn_mask, labels, max_length, stop, generate, continue_generate, generation_kwargs = tuple( + obj_list[0] + ) + + if continue_generate == False: + break + + res = self._model_invoke_inner( + inputs, attn_mask, labels, max_length, stop, generate, **generation_kwargs + ) + + gather_list = None + dist.gather_object( + res, + gather_list, + # TODO: make proper rank mapping + # dst=dist.get_global_rank(self.group, 0), + dst=0, + group=self.group, + ) + else: + # TODO: implement distributed model support + assert self.group == dist.GroupMember.NON_GROUP_MEMBER + safe_barrier(self._distributed.world_group, "lm_eval_end") + + def stop_workers(self): + if self.group is None or (world_size := self.group.size()) == 1: + return + self._model_invoke(None, None, None, None, None, None, continue_generate=False) + safe_barrier(self._distributed.world_group, "lm_eval_end") + + def _model_invoke_inner(self, inputs, attn_mask, labels, max_length, stop, generate: bool, **generation_kwargs): + if generate: + return self._model_generate_inner(inputs, max_length, stop, **generation_kwargs) + else: + return self._model_call_inner(inputs, attn_mask, labels) + + def _model_call(self, inps, attn_mask=None, labels=None): + return self._model_invoke(inps, attn_mask, labels, None, None, generate=False, continue_generate=True) + + def _model_generate(self, context, max_length, stop, **generation_kwargs): + return self._model_invoke( + context, None, None, max_length, stop, generate=True, continue_generate=True, **generation_kwargs + ) + + def _model_call_inner(self, inps, attn_mask=None, labels=None): + """ + :param inps: torch.Tensor + A torch tensor of shape [batch, (sequence_ctx + sequence_cont)] or of shape + [batch, sequence_ctx]. the size of sequence may vary from call to call + :param attn_mask: torch.Tensor, optional + A torch tensor of shape [batch, (sequence_ctx + sequence_cont)]. Only passed + (and must be passed) if self.AUTO_MODEL_CLASS is transformers.AutoModelForSeq2SeqLM + :param labels: torch.Tensor, optional + A torch tensor of shape [batch, (sequence_ctx + sequence_cont)]. Only passed + (and must be passed) if self.AUTO_MODEL_CLASS is transformers.AutoModelForSeq2SeqLM + :return + A torch tensor of shape [batch, sequence, vocab] with the + logits returned from the model's decoder + """ + # TODO: do we need no_grad for our model? + with torch.no_grad(): + if attn_mask is not None or labels is not None: + assert attn_mask is not None and labels is not None + return self.model( + input_ids=inps, + attention_mask=attn_mask, + labels=labels, + position_ids=None, + past_key_values=None, + inputs_embeds=None, + use_cache=False, + output_attentions=False, + output_hidden_states=False, + return_dict=True, + ).logits + else: + return self.model( + input_ids=inps, + attention_mask=None, + position_ids=None, + past_key_values=None, + inputs_embeds=None, + labels=None, + use_cache=False, + output_attentions=False, + output_hidden_states=False, + return_dict=True, + ).logits + + def _model_generate_inner(self, context, max_length, stop, **generation_kwargs): + # temperature = 0.0 if not set + # if do_sample is false and temp==0.0: + # remove temperature, as do_sample=False takes care of this + # and we don't want a warning from HF + generation_kwargs["temperature"] = generation_kwargs.get("temperature", 0.0) + do_sample = generation_kwargs.get("do_sample", None) + + # The temperature has to be a strictly positive float -- if it is 0.0, use greedy decoding strategies + if generation_kwargs.get("temperature") == 0.0 and do_sample is None: + generation_kwargs["do_sample"] = do_sample = False + + if do_sample is False and generation_kwargs.get("temperature") == 0.0: + generation_kwargs.pop("temperature") + # build stopping criteria + stopping_criteria = stop_sequences_criteria(self.tokenizer, stop, context.shape[1], context.shape[0]) + return self.model.generate( + input_ids=context, + max_length=max_length, + stopping_criteria=stopping_criteria, + pad_token_id=self.tokenizer.pad_token_id, + use_cache=False, + **generation_kwargs, + ) + + @property + def config(self): + # return the associated transformers.AutoConfig for the given pretrained model. + return self._config + + @property + def model(self): + # returns the model, unwrapping it if using Accelerate + if hasattr(self, "accelerator"): + return self.accelerator.unwrap_model(self._model) + else: + return self._model + + @property + def eot_token_id(self): + # we use EOT because end of *text* is more accurate for what we're doing than end of *sentence* + return self.tokenizer.eos_token_id + + @property + def prefix_token_id(self): + # it is used as prefix for loglikelihood + if self.custom_prefix_token_id is not None: + return self.custom_prefix_token_id + if self.tokenizer.bos_token_id is not None: + return self.tokenizer.bos_token_id + return self.tokenizer.eos_token_id + + @property + def max_length(self): + if self._max_length: # if max length manually set, return it + return self._max_length + seqlen_config_attrs = ("n_positions", "max_position_embeddings", "n_ctx") + for attr in seqlen_config_attrs: + if hasattr(self.model.config, attr): + return getattr(self.model.config, attr) + if hasattr(self.tokenizer, "model_max_length"): + if self.tokenizer.model_max_length == 1000000000000000019884624838656: + return self._DEFAULT_MAX_LENGTH + return self.tokenizer.model_max_length + return self._DEFAULT_MAX_LENGTH + + @property + def max_gen_toks(self) -> int: + return 256 + + @property + def batch_size(self): + return self.batch_size_per_gpu + + @property + def device(self): + return self._device + + @property + def rank(self): + return self._rank + + @property + def world_size(self): + return self._world_size + + @property + def tokenizer_name(self) -> str: + return self.tokenizer.name_or_path.replace("/", "__") + + def tok_encode(self, string: str, left_truncate_len=None, add_special_tokens=None) -> List[int]: + """ """ + # default for None - empty dict, use predefined tokenizer param + # used for all models except for CausalLM or predefined value + special_tokens_kwargs = {} + + # by default for CausalLM - false or self.add_bos_token is set + if add_special_tokens is None: + if self.backend == "causal": + special_tokens_kwargs = {"add_special_tokens": False or self.add_bos_token} + # otherwise the method explicitly defines the value + else: + special_tokens_kwargs = {"add_special_tokens": add_special_tokens} + + encoding = self.tokenizer.encode(string, **special_tokens_kwargs) + + # left-truncate the encoded context to be at most `left_truncate_len` tokens long + if left_truncate_len: + encoding = encoding[-left_truncate_len:] + + return encoding + + def tok_batch_encode( + self, + strings: List[str], + padding_side: str = "left", + left_truncate_len: int = None, + truncation: bool = False, + ) -> Tuple[torch.Tensor, torch.Tensor]: + # encode a batch of strings. converts to tensors and pads automatically, unlike tok_encode. + old_padding_side = self.tokenizer.padding_side + self.tokenizer.padding_side = padding_side + + add_special_tokens = {} + if self.backend == "causal": + add_special_tokens = {"add_special_tokens": False or self.add_bos_token} + + encoding = self.tokenizer( + strings, + truncation=truncation, + padding="longest", + return_tensors="pt", + **add_special_tokens, + ) + if left_truncate_len: + original_lengths = encoding["input_ids"].size(1) + if original_lengths > left_truncate_len: + eval_logger.warn( + f"Left truncation applied. Original sequence length was {original_lengths}, " + f"truncating to last {left_truncate_len} tokens. Some content will be lost.", + ) + encoding["input_ids"] = encoding["input_ids"][:, -left_truncate_len:] + encoding["attention_mask"] = encoding["attention_mask"][:, -left_truncate_len:] + self.tokenizer.padding_side = old_padding_side + + return encoding["input_ids"], encoding["attention_mask"] + + def tok_decode(self, tokens, skip_special_tokens=True): + return self.tokenizer.decode(tokens, skip_special_tokens=skip_special_tokens) + + def _select_cont_toks(self, logits: torch.Tensor, contlen: int = None, inplen: int = None) -> torch.Tensor: + if self.backend == "causal": + assert contlen and inplen, "Must pass input len and cont. len to select scored logits for causal LM" + # discard right-padding. + # also discard the input/context tokens. we'll only score continuations. + logits = logits[inplen - contlen : inplen] + elif self.backend == "seq2seq": + assert contlen and not inplen, "Selecting scored logits for Seq2SeqLM requires only cont. len" + # only discard right-padding. + # the logits input to this fn only contain decoder-side tokens. + logits = logits[:contlen] + + return logits + + def loglikelihood_rolling(self, requests: List[Instance], disable_tqdm: bool = False) -> List[float]: + adaptive_batch_size = None + if self.batch_size == "auto": + # using rolling window with maximum context + print("Passed argument batch_size = auto. Detecting largest batch size") + batch_size = self._detect_batch_size() + print(f"Determined Largest batch size: {batch_size}") + adaptive_batch_size = batch_size + + # First, collect all windows from all requests + all_windows = [] # List of (request_idx, window) tuples + request_window_counts = [] # Track number of windows per request + + for req_idx, (string,) in enumerate( + tqdm( + [req.args for req in requests], + disable=(disable_tqdm or (self.rank != 0)), + ) + ): + rolling_token_windows: List[Tuple[List[int], List[int]]] = list( + map( + utils.make_disjoint_window, + utils.get_rolling_token_windows( + token_list=self.tok_encode(string), + prefix_token=self.prefix_token_id, + max_seq_len=self.max_length, + context_len=1, + ), + ) + ) + + # TODO: Right now, we pass single EOT token to the Encoder and the full context to the decoder, in seq2seq case + windows = [(None,) + x for x in rolling_token_windows] + + # Store windows with their request index + all_windows.extend((req_idx, window) for window in windows) + request_window_counts.append(len(windows)) + + # Handle distributed case padding + pad_amnt = 0 + if self.world_size > 1: + mytensor = torch.tensor(len(all_windows), device=self.device) + gathered = self.accelerator.gather(mytensor).cpu().detach().numpy().tolist() + pad_amnt = max(gathered) - gathered[self.rank] + if pad_amnt > 0: + all_windows += pad_amnt * [all_windows[0]] + + all_nlls = [] + batch_size = adaptive_batch_size or self.batch_size + for i in range(0, len(all_windows), batch_size): + batch = all_windows[i : i + batch_size] + # Extract just the windows for processing, keeping track of request indices + batch_indices, batch_windows = zip(*batch) + + batch_nlls = self._loglikelihood_tokens( + requests=batch_windows, + disable_tqdm=False, + override_bs=len(batch_windows), + ) + # Store results with their request indices + all_nlls.extend(zip(batch_indices, batch_nlls)) + + # Remove padding if necessary + if (self.world_size > 1) and (pad_amnt > 0): + all_nlls = all_nlls[:-pad_amnt] + + # Reconstruct per-request loglikelihoods + loglikelihoods = [] + current_idx = 0 + for window_count in request_window_counts: + # Get all nlls for this request + request_nlls = all_nlls[current_idx : current_idx + window_count] + # Sum up the nlls for this request (discarding is_greedy) + request_total = sum(nll[0] for _, nll in request_nlls) + loglikelihoods.append(request_total) + current_idx += window_count + + string = requests[len(loglikelihoods) - 1].args[0] + self.cache_hook.add_partial("loglikelihood_rolling", (string,), request_total) + + return loglikelihoods + + def _batch_scheduler(self, pos, n_reordered_requests): + sched = pos // int(len(n_reordered_requests) / self.batch_schedule) + if sched in self.batch_sizes: + return self.batch_sizes[sched] + if (len(self.batch_sizes) > 1) and (self.batch_sizes[sched - 1] == self.max_batch_size): + # if previous batch size is already maximal, skip recomputation + self.batch_sizes[sched] = self.max_batch_size + return self.batch_sizes[sched] + print(f"Passed argument batch_size = auto:{self.batch_schedule}. Detecting largest batch size") + self.batch_sizes[sched] = self._detect_batch_size(n_reordered_requests, pos) + print(f"Determined largest batch size: {self.batch_sizes[sched]}") + return self.batch_sizes[sched] + + def _loglikelihood_tokens( + self, + requests: List[Tuple[Tuple[str, str], List[int], List[int]]], + disable_tqdm: bool = False, + override_bs: int = None, + ) -> List[Tuple[float, bool]]: + # TODO: implement some kind of efficient-request-middleware that lumps together requests with the same context + res = [] + + def _collate(req: Tuple[Tuple[str, str], List[int], List[int]]): + """Defines the key for the sorted method""" + # the negative sign on len(toks) sorts descending - this has a few advantages: + # - time estimates will always be over not underestimates, which is more useful for planning + # - to know the size of a batch when going through the list, you know the first one is always the batch + # padded context length. this is useful to simplify the batching logic and more importantly to make + # automatic adaptive batches much much easier to implement + # - any OOMs will happen right away rather than near the end + + toks = req[1] + req[2] + return -len(toks), tuple(toks) + + def _lookup_one_token_cont(req: Tuple[Tuple[str, str], List[int], List[int]]): + """Defines the key to group and lookup one-token continuations""" + # Use with group_by="contexts" (optional)" + # allows for the creation of a lookup, so we can reuse logits in case of one-token continuations. + # speeds up some multiple-choice tasks proportionally to the number of choices. + # groups requests by context+continuation[:-1] and infer on one request/group. + return req[-2] + req[-1][:-1] + + re_ord = Collator( + requests, + sort_fn=_collate, + group_by="contexts" if self.backend == "causal" and self.logits_cache else None, + group_fn=_lookup_one_token_cont, + ) + + # automatic (variable) batch size detection for vectorization + # pull longest context sample from request + n_reordered_requests = len(re_ord) + batch_size = self.batch_size if self.batch_size != "auto" else override_bs if override_bs is not None else 0 + batch_fn = ( + self._batch_scheduler + if self.batch_size == "auto" and n_reordered_requests > 0 and not override_bs + else None + ) + + chunks = re_ord.get_batched(n=batch_size, batch_fn=batch_fn) + pbar = tqdm( + total=len(requests), + disable=(disable_tqdm or (self.rank != 0)), + desc="Running loglikelihood requests", + ) + for chunk in chunks: + inps = [] + cont_toks_list = [] + inplens = [] + + conts = [] + encoder_attns = [] + + padding_len_inp = None + padding_len_cont = None + # because vectorizing is annoying, we first convert each (context, continuation) pair to padded + # tensors, then we pack them together into a batch, call the model, and then pick it all apart + # again because vectorizing is annoying + + for _, context_enc, continuation_enc in chunk: + # sanity check + assert len(context_enc) > 0 + assert len(continuation_enc) > 0 + assert len(continuation_enc) <= self.max_length + + # how this all works (illustrated on a causal decoder-only setup): + # CTX CONT + # inp 0 1 2 3|4 5 6 7 8 9 <- last token is deleted by inp[:, :-1] + # model \ \ + # logits 1 2 3|4 5 6 7 8 9 <- the ctx half gets tossed out by the + # cont_toks 4 5 6 7 8 9 [:, -len(continuation_enc):, :self.vocab_size] slice + + # when too long to fit in context, truncate from the left + if self.backend == "causal": + total_length = len(context_enc) + len(continuation_enc) + if total_length > self.max_length + 1: + eval_logger.warn( + f"Combined length of context ({len(context_enc)}) and continuation ({len(continuation_enc)}) " + f"exceeds model's maximum length ({self.max_length}). " + f"Truncating {total_length - self.max_length + 1} tokens from the left." + ) + inp = torch.tensor( + (context_enc + continuation_enc)[-(self.max_length + 1) :][:-1], + dtype=torch.long, + device=self.device, + ) + (inplen,) = inp.shape + elif self.backend == "seq2seq": + inp = torch.tensor( + (context_enc)[-self.max_length :], + dtype=torch.long, + device=self.device, + ) + (inplen,) = inp.shape + + # build encoder attn masks + encoder_attns.append(torch.ones_like(inp)) + + cont = torch.tensor( + (continuation_enc)[-self.max_length :], + # TODO: left-shift these? + # TODO: our code assumes we never end up truncating conts for either model type + dtype=torch.long, + device=self.device, + ) + (contlen,) = cont.shape + + conts.append(cont) + + padding_len_cont = max(padding_len_cont, contlen) if padding_len_cont is not None else contlen + + padding_len_inp = max(padding_len_inp, inplen) if padding_len_inp is not None else inplen + + inps.append(inp) # [1, inp_length] + cont_toks_list.append(continuation_enc) + inplens.append(inplen) + + # create encoder attn mask and batched conts, if seq2seq + call_kwargs = {} + if self.backend == "causal": + batched_inps = pad_and_concat(padding_len_inp, inps, padding_side="right") # [batch, padding_len_inp] + elif self.backend == "seq2seq": + # TODO: left-pad encoder inps and mask? + batched_inps = pad_and_concat(padding_len_inp, inps) # [batch, padding_len_inp] + batched_conts = pad_and_concat(padding_len_cont, conts) # [batch, padding_len_cont] + batched_encoder_mask = pad_and_concat(padding_len_inp, encoder_attns) # [batch, padding_len_inp] + call_kwargs = { + "attn_mask": batched_encoder_mask, + "labels": batched_conts, + } + + multi_logits = F.log_softmax( + self._model_call(batched_inps, **call_kwargs), dim=-1 + ) # [batch, padding_length (inp or cont), vocab] + + for (request_str, ctx_tokens, _), logits, inplen, cont_toks in zip( + chunk, multi_logits, inplens, cont_toks_list + ): + # Slice to original seq length + contlen = len(cont_toks) + # take only logits in the continuation + # (discard context toks if decoder-only ; discard right-padding) + # also discards + checks for "virtual tokens" in the causal LM's input window + # from prompt/prefix tuning tokens, if applicable + ctx_len = inplen + (logits.shape[0] - padding_len_inp) if self.backend == "causal" else None + logits = self._select_cont_toks(logits, contlen=contlen, inplen=ctx_len) + logits = logits.unsqueeze(0) # [1, seq, vocab] + + # Check if per-token argmax is exactly equal to continuation + greedy_tokens = logits.argmax(dim=-1) + + # check for one-token continuation cache hits. + # noop in case group_by != "contexts" or no cache hit and returns the + # original args. Otherwise, expands the logits batch dimension and yields each + # batch along with matching continuation tokens and prompt strings. + # logits -> [1, seq, vocab] + for request_str, cont_toks, logits in re_ord.get_cache( + req_str=request_str, + cxt_toks=ctx_tokens, + cont_toks=cont_toks, + logits=logits, + ): + cont_toks = torch.tensor(cont_toks, dtype=torch.long, device=self.device).unsqueeze(0) # [1, seq] + max_equal = (greedy_tokens == cont_toks).all() + + # Obtain log-probs at the corresponding continuation token indices + # last_token_slice = logits[:, -1, :].squeeze(0).tolist() + logits = torch.gather(logits, 2, cont_toks.unsqueeze(-1)).squeeze(-1) # [1, seq] + + # Answer: (log prob, is-exact-match) + answer = (float(logits.sum()), bool(max_equal)) + + res.append(answer) + + if request_str is not None: + # special case: loglikelihood_rolling produces a number of loglikelihood requests + # all with cache key None. instead do add_partial on the per-example level + # in the loglikelihood_rolling() function for those. + self.cache_hook.add_partial("loglikelihood", request_str, answer) + pbar.update(1) + + pbar.close() + + return re_ord.get_original(res) + + def generate_until(self, requests: List[Instance], disable_tqdm: bool = False) -> List[str]: + res = [] + + def _collate(req: Tuple[str, dict]): + """Defines the key for the sorted method""" + # the negative sign on len(toks) sorts descending - this has a few advantages: + # - time estimates will always be over not underestimates, which is more useful for planning + # - to know the size of a batch when going through the list, you know the first one is always the batch + # padded context length. this is useful to simplify the batching logic and more importantly to make + # automatic adaptive batches much much easier to implement + # - any OOMs will happen right away rather than near the end + toks = self.tok_encode(req[0]) + return -len(toks), req[0] + + pbar = tqdm( + total=len(requests), + disable=(disable_tqdm or (self.rank != 0)), + desc="Running generate_until requests", + ) + adaptive_batch_size = None + if self.batch_size == "auto": + # using rolling window with maximum context + print("Passed argument batch_size = auto. Detecting largest batch size") + batch_size = self._detect_batch_size() + print(f"Determined Largest batch size: {batch_size}") + adaptive_batch_size = batch_size + # for each different set of kwargs, we execute all requests, by batch. + batch_size = ( + self.batch_size + if self.batch_size != "auto" + else adaptive_batch_size if adaptive_batch_size is not None else 0 + ) + batch_fn = self._batch_scheduler if self.batch_size == "auto" and not adaptive_batch_size else None + + # we group requests by their generation_kwargs, + # so that we don't try to execute e.g. greedy sampling and temp=0.8 sampling + # in the same batch. + # group_fn=lambda x: x[1] -> x=(context, gen_kwargs) + re_ords = Collator( + [reg.args for reg in requests], + sort_fn=_collate, + group_by="gen_kwargs", + group_fn=lambda x: x[1], + ) + chunks = re_ords.get_batched(n=batch_size, batch_fn=batch_fn) + eos = self.tok_decode(self.eot_token_id, skip_special_tokens=False) + for chunk in chunks: + contexts, all_gen_kwargs = zip(*chunk) + # we assume all gen kwargs in the batch are the same + # this is safe to assume because the `grouper` object ensures it. + gen_kwargs = all_gen_kwargs[0] + # unpack our keyword arguments. + if isinstance(gen_kwargs, dict): + kwargs = copy.deepcopy(gen_kwargs) # edge case for repeats > 1 + # add EOS token to stop sequences + until = handle_stop_sequences(kwargs.pop("until", None), eos=eos) + else: + raise ValueError(f"Expected `kwargs` to be of type `dict` but got {type(gen_kwargs)}") + if "max_gen_toks" in kwargs.keys(): + max_gen_toks = kwargs.pop("max_gen_toks") + else: + max_gen_toks = self.max_gen_toks + + # set the max length in tokens of inputs ("context_enc") + if self.backend == "causal": + # max len for inputs = max length, minus room to generate the max new tokens + max_ctx_len = self.max_length - max_gen_toks + assert ( + max_ctx_len > 0 + ), f"Invalid configuration: requested max tokens to generate ({max_gen_toks}) must be less than model's maximum sequence length ({self.max_length})." + elif self.backend == "seq2seq": + # max len for inputs = encoder's whole max_length + max_ctx_len = self.max_length + + # encode, pad, and truncate contexts for this batch + context_enc, attn_masks = self.tok_batch_encode( + contexts, + left_truncate_len=max_ctx_len, + truncation=self.truncation, + ) + context_enc = context_enc.to(self.device) + attn_masks = attn_masks.to(self.device) + + if "max_length" not in kwargs: + kwargs["max_length"] = context_enc.shape[1] + max_gen_toks + + # perform batched generation + cont = self._model_generate( + context=context_enc, + attention_mask=attn_masks, + stop=until, + **kwargs, + ) + + cont_toks_list = cont.tolist() + for cont_toks, context in zip(cont_toks_list, contexts): + # discard context + left-padding toks if using causal decoder-only LM + if self.backend == "causal": + cont_toks = cont_toks[context_enc.shape[1] :] + + s = self.tok_decode(cont_toks) + + # use secondary stop seqs to cut off should-have-been-stopped content post-hoc + for term in until: + if len(term) > 0: + # ignore '' separator, + # for seq2seq case where self.tok_decode(self.eot_token_id) = '' + s = s.split(term)[0] + + res.append(s) + + self.cache_hook.add_partial("generate_until", (context, gen_kwargs), s) + pbar.update(1) + # reorder this group of results back to original unsorted form + res = re_ords.get_original(res) + + pbar.close() + + return res + + def apply_chat_template(self, chat_history: List[Dict[str, str]], add_generation_prompt: bool = True) -> str: + """ + Method to apply a chat template to a list of chat history between user and model. + """ + try: + chat_templated = self.tokenizer.apply_chat_template( + chat_history, + tokenize=False, + add_generation_prompt=add_generation_prompt, + continue_final_message=not add_generation_prompt, + ) + except jinja2.exceptions.TemplateError: + eval_logger.warning("Failed to apply chat template. removing the system role in chat history.") + chat_history = [msg for msg in chat_history if msg["role"] != "system"] + chat_templated = self.tokenizer.apply_chat_template( + chat_history, + tokenize=False, + add_generation_prompt=add_generation_prompt, + continue_final_message=not add_generation_prompt, + ) + + return chat_templated diff --git a/fast_llm/engine/training/lm_eval/utils.py b/fast_llm/engine/training/lm_eval/utils.py new file mode 100644 index 00000000..c9366189 --- /dev/null +++ b/fast_llm/engine/training/lm_eval/utils.py @@ -0,0 +1,544 @@ +import argparse +import json +import logging +import os +import pathlib +import sys +from functools import partial +from pathlib import Path +from typing import Union, Optional + +from lm_eval import utils +from lm_eval.evaluator import request_caching_arg_to_dict +from lm_eval.loggers import EvaluationTracker, WandbLogger +from lm_eval.tasks import TaskManager +from lm_eval.utils import handle_non_serializable, make_table, simple_parse_args_string + + +from lm_eval.api.model import LM + +eval_logger = logging.getLogger(__name__) + + +def try_parse_json(value: str) -> Union[str, dict, None]: + if value is None: + return None + try: + return json.loads(value) + except json.JSONDecodeError: + if "{" in value: + raise argparse.ArgumentTypeError(f"Invalid JSON: {value}. Hint: Use double quotes for JSON strings.") + return value + + +def _int_or_none_list_arg_type(min_len: int, max_len: int, defaults: str, value: str, split_char: str = ","): + def parse_value(item): + item = item.strip().lower() + if item == "none": + return None + try: + return int(item) + except ValueError: + raise argparse.ArgumentTypeError(f"{item} is not an integer or None") + + items = [parse_value(v) for v in value.split(split_char)] + num_items = len(items) + + if num_items == 1: + # Makes downstream handling the same for single and multiple values + items = items * max_len + elif num_items < min_len or num_items > max_len: + raise argparse.ArgumentTypeError(f"Argument requires {max_len} integers or None, separated by '{split_char}'") + elif num_items != max_len: + logging.warning( + f"Argument requires {max_len} integers or None, separated by '{split_char}'. " + "Missing values will be filled with defaults." + ) + default_items = [parse_value(v) for v in defaults.split(split_char)] + items.extend(default_items[num_items:]) # extend items list with missing defaults + + return items + + +def check_argument_types(parser: argparse.ArgumentParser): + """ + Check to make sure all CLI args are typed, raises error if not + """ + for action in parser._actions: + if action.dest != "help" and not action.const: + if action.type is None: + raise ValueError(f"Argument '{action.dest}' doesn't have a type specified.") + else: + continue + + +def setup_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser(formatter_class=argparse.RawTextHelpFormatter) + parser.add_argument("--model", "-m", type=str, default="hf", help="Name of model e.g. `hf`") + parser.add_argument( + "--tasks", + "-t", + default=None, + type=str, + metavar="task1,task2", + help="Comma-separated list of task names or task groupings to evaluate on.\nTo get full list of tasks, use one of the commands `lm-eval --tasks {{list_groups,list_subtasks,list_tags,list}}` to list out all available names for task groupings; only (sub)tasks; tags; or all of the above", + ) + parser.add_argument( + "--model_args", + "-a", + default="", + type=try_parse_json, + help="""Comma separated string or JSON formatted arguments for model, e.g. `pretrained=EleutherAI/pythia-160m,dtype=float32` or '{"pretrained":"EleutherAI/pythia-160m","dtype":"float32"}'""", + ) + parser.add_argument( + "--num_fewshot", + "-f", + type=int, + default=None, + metavar="N", + help="Number of examples in few-shot context", + ) + parser.add_argument( + "--batch_size", + "-b", + type=str, + default=1, + metavar="auto|auto:N|N", + help="Acceptable values are 'auto', 'auto:N' or N, where N is an integer. Default 1.", + ) + parser.add_argument( + "--max_batch_size", + type=int, + default=None, + metavar="N", + help="Maximal batch size to try with --batch_size auto.", + ) + parser.add_argument( + "--device", + type=str, + default=None, + help="Device to use (e.g. cuda, cuda:0, cpu).", + ) + parser.add_argument( + "--output_path", + "-o", + default=None, + type=str, + metavar="DIR|DIR/file.json", + help="The path to the output file where the result metrics will be saved. If the path is a directory and log_samples is true, the results will be saved in the directory. Else the parent directory will be used.", + ) + parser.add_argument( + "--limit", + "-L", + type=float, + default=None, + metavar="N|0 argparse.Namespace: + check_argument_types(parser) + return parser.parse_args(args) + + +def prepare_lm_eval_simple_eval_params( + cli_args: list[str], + completed_steps: int, + run_index: int, +) -> tuple[argparse.Namespace, dict[str, any]]: + """ + Parses CLI arguments for an LM evaluation run and prepares keyword arguments + for the `evaluate` function. + + This function wraps argument parsing, environment configuration, task resolution, + and metadata setup needed for evaluation with Fast-LLM's `lm_eval` wrapper. It also + handles special cases like hub token injection, dynamic sample loading, and task + listing commands. + + Args: + cli_args (list[str]): Command-line arguments, excluding the program name. + completed_steps (int): Current number of completed training steps, used to + uniquely tag evaluation output paths. + run_index (int): index of the current run of Fast-LLM experiment + + Returns: + tuple: + - argparse.Namespace: Parsed CLI arguments. + - dict: Keyword arguments to pass into `simple_evaluate`, including task list, + tracker, cache settings, random seeds, and generation parameters. + + Raises: + ValueError: If required fields like `--tasks` or `--output_path` are missing + when needed, or if misconfigured combinations are detected. + SystemExit: If special task listing flags are used. + """ + parser = setup_parser() + args = parse_eval_args(parser, cli_args) + + # NOTE: all this args are set by fast_llm on the model directly or not used here + assert not args.wandb_args # default empty string + assert not args.wandb_config_args # default empty string + assert args.model == "hf" # default value of 'hf' + assert not args.model_args # default empty string + assert args.batch_size == 1 # default value of 1 + assert args.max_batch_size is None + assert args.device is None + # if args.wandb_args: + # wandb_args_dict = simple_parse_args_string(args.wandb_args) + # wandb_config_args_dict = simple_parse_args_string(args.wandb_config_args) + # wandb_logger = WandbLogger(wandb_args_dict, wandb_config_args_dict) + + # TODO: change logging levels from fast_llm to lm_eval and then back? + # utils.setup_logging(args.verbosity) + # eval_logger = logging.getLogger(__name__) + + os.environ["TOKENIZERS_PARALLELISM"] = "false" + + # update the evaluation tracker args with the output path and the HF token + if args.output_path: + args.output_path = str(pathlib.Path(args.output_path) / f"runs/{run_index}/{completed_steps}") + args.hf_hub_log_args += f",output_path={args.output_path}" + if os.environ.get("HF_TOKEN", None): + args.hf_hub_log_args += f",token={os.environ.get('HF_TOKEN')}" + evaluation_tracker_args = simple_parse_args_string(args.hf_hub_log_args) + evaluation_tracker = EvaluationTracker(**evaluation_tracker_args) + + if args.predict_only: + args.log_samples = True + if (args.log_samples or args.predict_only) and not args.output_path: + raise ValueError("Specify --output_path if providing --log_samples or --predict_only") + + if args.fewshot_as_multiturn and args.apply_chat_template is False: + raise ValueError( + "When `fewshot_as_multiturn` is selected, `apply_chat_template` must be set (either to `True` or to the chosen template name)." + ) + + if args.include_path is not None: + eval_logger.info(f"Including path: {args.include_path}") + metadata = ( + simple_parse_args_string(args.model_args) + if isinstance(args.model_args, str) + else args.model_args if isinstance(args.model_args, dict) else {} + ) | (args.metadata if isinstance(args.metadata, dict) else simple_parse_args_string(args.metadata)) + + task_manager = TaskManager(include_path=args.include_path, metadata=metadata) + + if "push_samples_to_hub" in evaluation_tracker_args and not args.log_samples: + eval_logger.warning( + "Pushing samples to the Hub requires --log_samples to be set. Samples will not be pushed to the Hub." + ) + + if args.limit: + eval_logger.warning( + " --limit SHOULD ONLY BE USED FOR TESTING." "REAL METRICS SHOULD NOT BE COMPUTED USING LIMIT." + ) + if args.samples: + assert args.limit is None, "If --samples is not None, then --limit must be None." + if (samples := Path(args.samples)).is_file(): + args.samples = json.loads(samples.read_text()) + else: + args.samples = json.loads(args.samples) + + if args.tasks is None: + eval_logger.error("Need to specify task to evaluate.") + sys.exit() + elif args.tasks == "list": + print(task_manager.list_all_tasks()) + sys.exit() + elif args.tasks == "list_groups": + print(task_manager.list_all_tasks(list_subtasks=False, list_tags=False)) + sys.exit() + elif args.tasks == "list_tags": + print(task_manager.list_all_tasks(list_groups=False, list_subtasks=False)) + sys.exit() + elif args.tasks == "list_subtasks": + print(task_manager.list_all_tasks(list_groups=False, list_tags=False)) + sys.exit() + else: + if os.path.isdir(args.tasks): + import glob + + task_names = [] + yaml_path = os.path.join(args.tasks, "*.yaml") + for yaml_file in glob.glob(yaml_path): + config = utils.load_yaml_config(yaml_file) + task_names.append(config) + else: + task_list = args.tasks.split(",") + task_names = task_manager.match_tasks(task_list) + for task in [task for task in task_list if task not in task_names]: + if os.path.isfile(task): + config = utils.load_yaml_config(task) + task_names.append(config) + task_missing = [ + task for task in task_list if task not in task_names and "*" not in task + ] # we don't want errors if a wildcard ("*") task name was used + + if task_missing: + missing = ", ".join(task_missing) + eval_logger.error( + f"Tasks were not found: {missing}\n" + f"{utils.SPACING}Try `lm-eval --tasks list` for list of available tasks", + ) + raise ValueError( + f"Tasks not found: {missing}. Try `lm-eval --tasks {{list_groups,list_subtasks,list_tags,list}}` to list out all" + " available names for task groupings; only (sub)tasks; tags; or all of the above, or pass '--verbosity DEBUG'" + " to troubleshoot task registration issues." + ) + + # Respect user's value passed in via CLI, otherwise default to True and add to comma-separated model args + if args.trust_remote_code: + eval_logger.info( + "Passed `--trust_remote_code`, setting environment variable `HF_DATASETS_TRUST_REMOTE_CODE=true`" + ) + # HACK: import datasets and override its HF_DATASETS_TRUST_REMOTE_CODE value internally, + # because it's already been determined based on the prior env var before launching our + # script--`datasets` gets imported by lm_eval internally before these lines can update the env. + import datasets + + datasets.config.HF_DATASETS_TRUST_REMOTE_CODE = True + + args.model_args = args.model_args + ",trust_remote_code=True" + ( + eval_logger.info(f"Selected Tasks: {task_names}") + if eval_logger.getEffectiveLevel() >= logging.INFO + else print(f"Selected Tasks: {task_names}") + ) + + request_caching_args = request_caching_arg_to_dict(cache_requests=args.cache_requests) + + eval_kwargs = dict( + tasks=task_names, + num_fewshot=args.num_fewshot, + # batch_size=args.batch_size, + # max_batch_size=args.max_batch_size, + # device=args.device, + use_cache=args.use_cache, + limit=args.limit, + samples=args.samples, + check_integrity=args.check_integrity, + write_out=args.write_out, + log_samples=args.log_samples, + evaluation_tracker=evaluation_tracker, + system_instruction=args.system_instruction, + apply_chat_template=args.apply_chat_template, + fewshot_as_multiturn=args.fewshot_as_multiturn, + gen_kwargs=args.gen_kwargs, + task_manager=task_manager, + predict_only=args.predict_only, + random_seed=args.seed[0], + numpy_random_seed=args.seed[1], + torch_random_seed=args.seed[2], + fewshot_random_seed=args.seed[3], + confirm_run_unsafe_code=args.confirm_run_unsafe_code, + metadata=metadata, + **request_caching_args, + ) + + return args, eval_kwargs + + +def process_lm_eval_results( + args: argparse.Namespace, + results: dict[str, any], + evaluation_tracker: EvaluationTracker, + completed_steps: int, + consumed_samples: int, + consumed_tokens: int, +) -> None: + if results is not None: + import wandb + + if args.log_samples: + samples = results.pop("samples") + dumped = json.dumps(results, indent=2, default=handle_non_serializable, ensure_ascii=False) + if args.show_config: + print(dumped) + + batch_sizes = ",".join(map(str, results["config"]["batch_sizes"])) + + # Add W&B logging if we have the run to log to + # we expect the rest of the fast_llm code will finish the run. + if wandb.run is not None: + try: + wandb_logger = WandbLogger(init_args={"step": completed_steps}) + wandb_logger.post_init(results) + wandb_logger.log_eval_result() + if args.log_samples: + wandb_logger.log_eval_samples(samples) + except Exception as e: + eval_logger.info(f"Logging to Weights and Biases failed due to {e}") + + evaluation_tracker.save_results_aggregated(results=results, samples=samples if args.log_samples else None) + + if args.log_samples: + for task_name, config in results["configs"].items(): + evaluation_tracker.save_results_samples(task_name=task_name, samples=samples[task_name]) + + if evaluation_tracker.push_results_to_hub or evaluation_tracker.push_samples_to_hub: + evaluation_tracker.recreate_metadata_card() + + # TODO: convert to logging entries instead? + print( + f"{results["config"]["model"]}, gen_kwargs: ({args.gen_kwargs}), limit: {args.limit}, num_fewshot: {args.num_fewshot}, " + f"batch_size: {results["config"]["batch_size"]}{f' ({batch_sizes})' if batch_sizes else ''}" + ) + print(make_table(results)) + if "groups" in results: + print(make_table(results, "groups")) diff --git a/fast_llm/engine/training/trainer.py b/fast_llm/engine/training/trainer.py index 66f1ad86..a984a681 100644 --- a/fast_llm/engine/training/trainer.py +++ b/fast_llm/engine/training/trainer.py @@ -18,11 +18,13 @@ from fast_llm.engine.distributed.distributed import Distributed from fast_llm.engine.inference.runner import InferenceRunner from fast_llm.engine.multi_stage.config import StageMode +from fast_llm.engine.multi_stage.fast_llm_model import FastLLMModel from fast_llm.engine.optimizer.config import ParamGroup from fast_llm.engine.optimizer.optimizer import Optimizer from fast_llm.engine.schedule.runner import ScheduleRunner from fast_llm.engine.schedule.schedule import Schedule from fast_llm.engine.training.config import TrainerConfig, TrainingCheckpointBaseConfig, TrainingCheckpointConfig +from fast_llm.engine.training.evaluator import Evaluator from fast_llm.engine.training.wandb import Wandb from fast_llm.logging import format_metrics, get_memory_usage_mib, log_memory_usage from fast_llm.utils import Assert @@ -41,13 +43,20 @@ class Trainer[ConfigType: TrainerConfig](Configurable[ConfigType], abc.ABC): _completed_steps: int + _is_evaluation_only: bool + + _evaluator: Evaluator + def __init__(self, config: TrainerConfig): super().__init__(config) + + self._is_evaluation_only = config.training.train_iters == 0 + self._data = self._get_data() log_main_rank("Creating model...") self._multi_stage = self._config.model.get_model_class()( self._config.model, - optimizer_state_names=self._config.optimizer.state_names(), + optimizer_state_names=self._config.optimizer.state_names() if not self._is_evaluation_only else (), ) self._reference_models = {} for name, reference_config in self._config.reference_models.items(): @@ -65,45 +74,46 @@ def __init__(self, config: TrainerConfig): multi_stage=self._multi_stage, distributed_config=self._config.model.distributed, ) - steps_per_split = { - PhaseType.training: {PhaseType.training.value.lower(): self._config.training.train_iters}, - PhaseType.validation: { - dataset_name: self._config.training.evaluations[dataset_name].get_iteration_count( - self._config.training.train_iters, - # There may be an extra evaluation after the last training step. - not self._config.training.evaluations[dataset_name].enabled(self._config.training.train_iters), - ) - for dataset_name in self._config.training.evaluations.keys() - }, - PhaseType.test: {PhaseType.test.value.lower(): self._config.training.test_iters}, - } - self._samples_per_split = { - phase: { - dataset_name: self._config.batch.batch_size * steps - for dataset_name, steps in datasets.items() - if steps > 0 - } - for phase, datasets in steps_per_split.items() - } - # Prune empty phases. - self._samples_per_split = {k: v for k, v in self._samples_per_split.items() if len(v) > 0} - self._loss_defs = self._multi_stage.base_model.loss_defs - # Setup the schedules - self._schedule = { - phase: { - dataset_name: Schedule( - multi_stage=self._multi_stage, - batch_config=self._config.batch, - schedule_config=self._config.schedule, - distributed_config=self._config.model.distributed, - phase=phase, - ) - for dataset_name in datasets + if not self._is_evaluation_only: + steps_per_split = { + PhaseType.training: {PhaseType.training.value.lower(): self._config.training.train_iters}, + PhaseType.test: {PhaseType.test.value.lower(): self._config.training.test_iters}, } - for phase, datasets in self._samples_per_split.items() - } + + self._samples_per_split = { + phase: { + dataset_name: self._config.batch.batch_size * steps + for dataset_name, steps in datasets.items() + if steps > 0 + } + for phase, datasets in steps_per_split.items() + } + # Prune empty phases. + self._samples_per_split = {k: v for k, v in self._samples_per_split.items() if len(v) > 0} + + # Setup the schedules + self._schedule = { + phase: { + dataset_name: Schedule( + multi_stage=self._multi_stage, + batch_config=self._config.batch, + schedule_config=self._config.schedule, + distributed_config=self._config.model.distributed, + phase=phase, + ) + for dataset_name in datasets + } + for phase, datasets in self._samples_per_split.items() + } + else: + self._samples_per_split = {} + + self._evaluator = Evaluator( + config=self._config, + get_tflops_func=self.get_tflops, + ) def setup(self, distributed: Distributed, run: Run) -> None: assert distributed.config is self._config.model.distributed @@ -121,18 +131,23 @@ def setup(self, distributed: Distributed, run: Run) -> None: reference_model.fast_llm_model.setup(distributed, StageMode.inference) reference_model.setup() + # TODO: Check with Joel if this will be enought not to allocate grad buffers. # Setup the optimizer. - param_groups, grads_for_norm = self._multi_stage.get_param_groups(ParamGroup) - self._optimizer = self._config.optimizer.optimizer_cls( - self._config.optimizer, - param_groups=param_groups, - grads_for_norm=grads_for_norm, - distributed=self._distributed, - ) + if self._is_evaluation_only: + self._optimizer = None + else: + param_groups, grads_for_norm = self._multi_stage.get_param_groups(ParamGroup) + self._optimizer = self._config.optimizer.optimizer_cls( + self._config.optimizer, + param_groups=param_groups, + grads_for_norm=grads_for_norm, + distributed=self._distributed, + ) # Setup the schedules. with torch.no_grad(): self._runner.setup(distributed, self._optimizer) + # Setup the datasets. log_main_rank("Preparing datasets...") self._data.setup( @@ -141,10 +156,25 @@ def setup(self, distributed: Distributed, run: Run) -> None: dataset_name: self._get_sampling_parameters({"num_samples": samples}) for datasets in self._samples_per_split.values() for dataset_name, samples in datasets.items() + } + | { + dataset_name: self._get_sampling_parameters({"num_samples": samples}) + for dataset_name, samples in self._evaluator.get_datasets_samples().items() }, None if run.experiment_directory is None else run.experiment_directory / "dataset_cache", timeout=self._config.training.timeout, ) + + # Must be called with all arguments set up + self._evaluator.setup( + distributed=self._distributed, + run=self._run, + multi_stage=self._multi_stage, + runner=self._runner, + data=self._data, + wandb=self._wandb, + ) + self._is_setup = True @abc.abstractmethod @@ -166,21 +196,21 @@ def _consumed_tokens(self) -> int: assert self._is_setup return self._consumed_samples * self._config.batch.sequence_length - def _get_completed_evaluation_steps(self, dataset_name) -> int: - # Number of evaluations steps performed before the current step - return self._config.training.evaluations[dataset_name].get_iteration_count(self._completed_steps - 1) - def run(self) -> None: assert self._is_setup with self._wandb: self._run_training() def _run_training(self) -> None: - self._prepare_training_state() + self._prepare_model_state() + log_main_rank("done with setup ...") log_pipeline_parallel_main_rank(lambda: log_memory_usage(f"After initial setup", str)) self._run.save_logged_tensors("init") + if self._is_evaluation_only: + assert len(self._samples_per_split) == 0 + if PhaseType.training in self._samples_per_split: done = self._completed_steps >= self._config.training.train_iters if done: @@ -189,13 +219,21 @@ def _run_training(self) -> None: else: done, metrics = self._train() else: - done, metrics = True, {} + metrics = {} + done = True + self._evaluator.run( + metrics=metrics, + done=done, + completed_steps=0, + consumed_samples=0, + consumed_tokens=0, + ) if done and PhaseType.test in self._samples_per_split: log_main_rank(lambda: f"Running test phase ...") test_iterator = self._get_data_iterator(PhaseType.test.value.lower()) metrics_key = PhaseType.test.value - metrics[metrics_key] = self._evaluate( + metrics[metrics_key] = self._evaluate_loss( data_iterator=test_iterator, phase=PhaseType.test, num_iters=self._config.training.test_iters, @@ -223,7 +261,6 @@ def _train(self) -> tuple[bool, dict[PhaseType, dict[str, typing.Any]]]: self._completed_steps, self._config.training.prefetch_factor, ) - evaluation_iterators = {name: None for name in self._config.training.evaluations.keys()} log_main_rank("Training ...") @@ -323,49 +360,16 @@ def _train(self) -> tuple[bool, dict[PhaseType, dict[str, typing.Any]]]: done = self._completed_steps >= self._config.training.train_iters # TODO: Signal-based stop. stop = done or self._config.training.shutdown.enabled(self._completed_steps) + # Evaluation # TODO: Adjust valid iterator length. - if PhaseType.validation in self._samples_per_split and ( - done - or any( - evaluation_conf.enabled(self._completed_steps) - for evaluation_conf in self._config.training.evaluations.values() - ) - ): - formatted_metrics = [] - for dataset_name, evaluation_conf in self._config.training.evaluations.items(): - if not evaluation_conf.enabled(self._completed_steps): - continue - if evaluation_iterators[dataset_name] is None: - evaluation_iterators[dataset_name] = self._get_data_iterator( - dataset_name, self._get_completed_evaluation_steps(dataset_name) - ) - # TODO: formatting metric category as Validation.evaluation_dataset_name - # maybe format each metric with evaluation_dataset_name prefix instead? - # TODO: setting performance metrics per evaluation dataset - # maybe to set aggregate performance metrics for all evaluations datasets? - metric_key = f"{PhaseType.validation.value}.{dataset_name}" - metrics[metric_key] = self._evaluate( - data_iterator=evaluation_iterators[dataset_name], - phase=PhaseType.validation, - num_iters=evaluation_conf.iterations, - begin_iter=self._get_completed_evaluation_steps(dataset_name), - dataset_name=dataset_name, - ) - formatted_metrics.append( - format_metrics( - metrics[metric_key], - self._loss_defs, - PhaseType.validation, - dataset_name=dataset_name, - ) - ) - - if len(formatted_metrics) > 0: - formatted_metrics = "\n".join(formatted_metrics) - log_main_rank(formatted_metrics) - if self._config.training.wandb.alert.enabled(self._completed_steps): - self._wandb.alert("Validation results", formatted_metrics, "INFO") + self._evaluator.run( + metrics=metrics, + done=done, + completed_steps=self._completed_steps, + consumed_samples=self._consumed_samples, + consumed_tokens=self._consumed_tokens, + ) if is_main_rank() and metrics: self._wandb.log_metrics(self._completed_steps, metrics) @@ -379,55 +383,6 @@ def _train(self) -> tuple[bool, dict[PhaseType, dict[str, typing.Any]]]: profiler.step() return done, metrics - def _evaluate( - self, - *, - data_iterator: typing.Iterator, - phase: PhaseType, - num_iters: int, - begin_iter: int = 0, - dataset_name: str | None = None, - ) -> dict[str, float | int]: - full_phase_name = phase.value if dataset_name is None else f"{phase.value}_{dataset_name}" - safe_barrier(self._distributed.world_group, f"{full_phase_name} begin") - begin_time = time.perf_counter() - total_losses = {loss_def.name: 0.0 for loss_def in self._loss_defs} - for iter_ in range(num_iters): - iter_losses, _, _ = self._runner.run_step( - data_iterator, self._schedule[phase][dataset_name], iteration=begin_iter + iter_ - ) - for name, value in iter_losses.items(): - total_losses[name] += value - self._run.save_logged_tensors(f"{full_phase_name}_{self._completed_steps}_{iter_}") - - safe_barrier( - self._distributed.world_group, - f"{full_phase_name} end", - ) - end_time = time.perf_counter() - time_per_iteration = (end_time - begin_time) / num_iters - model_tflops, hardware_tflops = self.get_tflops(phase, time_per_iteration) - # TODO add other relevant eval metrics - metrics = { - "train_iters": self._config.training.train_iters, - "batch_size": self._config.batch.batch_size, - "iteration": self._completed_steps, - **{name: (value / num_iters) for name, value in total_losses.items()}, - "consumed_samples": self._consumed_samples, - "consumed_tokens": self._consumed_tokens, - "step_time_ms": time_per_iteration * 1000, - "model_tflops": model_tflops, - "hardware_tflops": hardware_tflops, - "tokens_per_sec_per_gpu": ( - (self._config.batch.sequence_length * self._config.batch.batch_size) - / self._config.model.distributed.world_size - / time_per_iteration - ), - **get_memory_usage_mib(), - } - - return metrics - def _get_data_iterator( self, dataset_name, completed_steps: int = 0, prefetch_factor: int | None = None ) -> typing.Iterator[typing.Any]: @@ -440,7 +395,7 @@ def _get_data_iterator( timeout=self._config.training.timeout, ) - def _prepare_training_state(self) -> None: + def _prepare_model_state(self) -> None: # Setup the training state. if (last_iteration := self._get_last_checkpoint()) is None: if (path := self._config.pretrained.path) is not None and self._config.pretrained.model_weights: @@ -451,9 +406,15 @@ def _prepare_training_state(self) -> None: ) self._multi_stage.load_checkpoint(self._config.pretrained) else: + if self._is_evaluation_only: + raise ValueError( + "Evaluation mode, model need to be trained first or pretrained checkpoint is provided for loading" + ) log_main_rank(f"Initializing training state from scratch...") self._multi_stage.initialize_weights() - self._optimizer.reset_state() + + if not self._is_evaluation_only: + self._optimizer.reset_state() self._completed_steps = 0 else: log_main_rank(lambda: f"Loading checkpoint from iteration {last_iteration}...") diff --git a/fast_llm/layers/language_model/head.py b/fast_llm/layers/language_model/head.py index 3b476f6a..5dd6e7b6 100644 --- a/fast_llm/layers/language_model/head.py +++ b/fast_llm/layers/language_model/head.py @@ -5,7 +5,7 @@ from torch.distributed import all_reduce from fast_llm.config import Configurable -from fast_llm.core.ops import split_op +from fast_llm.core.ops import gather_op, split_op from fast_llm.engine.base_model.base_model import Layer from fast_llm.engine.config_utils.tensor_space import DefaultDimNames, TensorDim, TensorSpace from fast_llm.engine.distributed.config import DistributedDimNames @@ -168,6 +168,14 @@ def _forward_backward( with torch.enable_grad(): ln_output = self.final_norm(input_) + if "output_hidden_states" in kwargs and kwargs["output_hidden_states"]: + # The last hidden layer output is returned normalized in the HF Transformers-style output, at least for LLama style models. + # So, if needed, we gather the data after normalization and set it as the output of the previous layer. + group = self._tensor_space.distributed.tensor_group if self._parallel_embeddings else None + sequence_parallel = self._sequence_parallel and self._parallel_embeddings + hidden_state = gather_op(ln_output.detach(), group, dim=0) if sequence_parallel else ln_output.detach() + kwargs["hidden_states"][len(kwargs["hidden_states"]) - 1]["tensor"] = hidden_state + grad_output = kwargs[TransformerKwargs.grad_output] / ( self._group_size if self._sequence_parallel_logits else 1 ) diff --git a/fast_llm/layers/transformer/preprocessing.py b/fast_llm/layers/transformer/preprocessing.py index 2415a2f9..1e07c1c1 100644 --- a/fast_llm/layers/transformer/preprocessing.py +++ b/fast_llm/layers/transformer/preprocessing.py @@ -239,7 +239,7 @@ def preprocess(self, batch, kwargs: dict[str, typing.Any]) -> None: ] if (sequence_lengths := kwargs.get(TransformerKwargs.sequence_lengths, None)) is not None: seq_ids = torch.stack( - [torch.cat([torch.arange(x) for x in sample_lens]) for sample_lens in sequence_lengths] + [torch.cat([torch.full((x,), i) for i, x in enumerate(sample_lens)]) for sample_lens in sequence_lengths] ) document_mask = (seq_ids[:, None, :] == seq_ids[:, :, None]).to(self._tensor_space.distributed.device) kwargs[TransformerKwargs.attention_mask] = ( diff --git a/fast_llm/models/custom/config.py b/fast_llm/models/custom/config.py index 1c82b59e..5c87a4ca 100644 --- a/fast_llm/models/custom/config.py +++ b/fast_llm/models/custom/config.py @@ -47,7 +47,7 @@ def get_model_class(cls) -> type["CustomModel"]: return CustomModel @classmethod - def get_huggingface_model_class(cls) -> type["HuggingfaceCustomModelForCausalLM"]: + def get_huggingface_model_for_causal_lm_class(cls) -> type["HuggingfaceCustomModelForCausalLM"]: from fast_llm.models.custom.huggingface import HuggingfaceCustomModelForCausalLM return HuggingfaceCustomModelForCausalLM diff --git a/fast_llm/models/gpt/config.py b/fast_llm/models/gpt/config.py index 18809419..30f3a4ff 100644 --- a/fast_llm/models/gpt/config.py +++ b/fast_llm/models/gpt/config.py @@ -161,7 +161,7 @@ def get_model_class(cls) -> type["GPTModel"]: return GPTModel @classmethod - def get_huggingface_model_class(cls) -> type["HuggingfaceGPTModelForCausalLM"]: + def get_huggingface_model_for_causal_lm_class(cls) -> type["HuggingfaceGPTModelForCausalLM"]: from fast_llm.models.gpt.huggingface import HuggingfaceGPTModelForCausalLM return HuggingfaceGPTModelForCausalLM diff --git a/fast_llm/models/gpt/huggingface.py b/fast_llm/models/gpt/huggingface.py index 0da4acbb..3f7e43ca 100644 --- a/fast_llm/models/gpt/huggingface.py +++ b/fast_llm/models/gpt/huggingface.py @@ -5,10 +5,11 @@ import torch import transformers.modeling_outputs + from fast_llm.data.data.gpt.data import GPTBatch from fast_llm.engine.distributed.config import PhaseType from fast_llm.engine.inference.config import HuggingfaceModelConfig -from fast_llm.engine.inference.huggingface import HuggingfacePreTrainedModel +from fast_llm.engine.inference.huggingface import HuggingfaceBaseModelForCausalLM from fast_llm.layers.transformer.config import TransformerKwargs from fast_llm.models.gpt.config import GPTModelConfig from fast_llm.models.gpt.model import GPTBaseModel, GPTInferenceRunner @@ -22,7 +23,7 @@ class HuggingfaceGPTModelConfig(HuggingfaceModelConfig): fast_llm_config: GPTModelConfig -class HuggingfaceGPTModelForCausalLM(HuggingfacePreTrainedModel): +class HuggingfaceGPTModelForCausalLM(HuggingfaceBaseModelForCausalLM): config_class = HuggingfaceGPTModelConfig config: HuggingfaceGPTModelConfig runner_class: typing.ClassVar[type[GPTInferenceRunner]] = GPTInferenceRunner @@ -55,21 +56,33 @@ def forward( if output_attentions: raise NotImplementedError() - if output_hidden_states: - raise NotImplementedError() - if attention_mask is not None: - raise NotImplementedError() - if position_ids is not None: - raise NotImplementedError() if inputs_embeds is not None: raise NotImplementedError() if labels is not None: raise NotImplementedError() + # NOTE: We are ignoring position_ids as we reconstruct them from attention_mask via sequence_lenghts. + if attention_mask is not None: + + # First non zero indexes or zero index if the row is all zeros (invalid row) + first_non_zero_indexes = attention_mask.argmax(dim=1) + + # Check if the sequence is left-padded and if the remaining ones are continuous 1-ns + assert (attention_mask.sum(axis=1) == (attention_mask.shape[1] - first_non_zero_indexes)).all() + + sequence_lenghts = [ + torch.tensor( + [attention_mask.shape[1]] if el == 0 else [el, attention_mask.shape[1] - el], dtype=torch.int64 + ) + for el in first_non_zero_indexes.tolist() + ] + else: + sequence_lenghts = None + # Iteration serves as a random seed, using random module because it's not seeded by Fast LLM iteration = random.randint(0, 2**32) batch = self.fast_llm_base_model.preprocess( - GPTBatch(input_ids), phase=PhaseType.inference, iteration=iteration + GPTBatch(input_ids, sequence_lengths=sequence_lenghts), phase=PhaseType.inference, iteration=iteration ) ((input_, kwargs),) = batch @@ -82,23 +95,39 @@ def forward( # The transformers will save the present keys and values to this list. kwargs[TransformerKwargs.presents] = [] + if output_hidden_states: + kwargs["output_hidden_states"] = True + else: + kwargs["output_hidden_states"] = False + self._inference_runner.forward(input_, kwargs, iteration=iteration) # TODO: Make a proper way of returning the model output. logits = kwargs["logits"] + # TODO: convert hidden state form dict to list to be the same as with HFs + hidden_states = None + if output_hidden_states: + hidden_states = kwargs["hidden_states"] + if not return_dict: - outputs = (logits,) + # TODO: check hidden state go before past in the tuple + if output_hidden_states: + outputs = (logits, hidden_states) + else: + outputs = (logits,) + if use_cache: outputs += (kwargs[TransformerKwargs.presents],) return outputs return transformers.modeling_outputs.CausalLMOutputWithPast( logits=logits, + hidden_states=hidden_states, past_key_values=kwargs[TransformerKwargs.presents], ) - def prepare_inputs_for_generation( - self, input_ids, past_key_values=None, attention_mask=None, inputs_embeds=None, **kwargs - ): - raise NotImplementedError() + # def prepare_inputs_for_generation( + # self, input_ids, past_key_values=None, attention_mask=None, inputs_embeds=None, **kwargs + # ): + # raise NotImplementedError() diff --git a/fast_llm/tools/cli.py b/fast_llm/tools/cli.py index 0cc02f42..b36a294d 100644 --- a/fast_llm/tools/cli.py +++ b/fast_llm/tools/cli.py @@ -15,11 +15,13 @@ def fast_llm(args=None): # (Pre-)configure logging configure_logging() parser = argparse.ArgumentParser(add_help=False) - parser.add_argument("subcommand", choices=["train", "convert", "prepare"]) + parser.add_argument("subcommand", choices=["train", "evaluate", "convert", "prepare"]) parsed, unparsed = parser.parse_known_args(args) try: if parsed.subcommand == "train": from fast_llm.tools.train import CliTrainingConfig as Runnable + elif parsed.subcommand == "evaluate": + from fast_llm.tools.evaluate import CliEvaluationConfig as Runnable elif parsed.subcommand == "convert": from fast_llm.tools.convert import ConversionConfig as Runnable elif parsed.subcommand == "prepare": diff --git a/fast_llm/tools/evaluate.py b/fast_llm/tools/evaluate.py new file mode 100644 index 00000000..26a9aa9c --- /dev/null +++ b/fast_llm/tools/evaluate.py @@ -0,0 +1,25 @@ +import argparse + +from fast_llm.engine.config_utils.runnable import RunnableConfig +from fast_llm.models.auto import trainer_registry + + +class CliEvaluationConfig(RunnableConfig): + @classmethod + def _get_parser(cls): + parser = super()._get_parser() + parser.add_argument( + "model_type", + choices=trainer_registry.keys(), + help="The Fast-LLM model type to use. Must be defined in the trainer registry in `fast_llm.models.auto`.", + ) + return parser + + @classmethod + def _from_parsed_args(cls, parsed: argparse.Namespace, unparsed: list[str]): + unparsed += ['training.train_iters=0'] + return trainer_registry[parsed.model_type]._from_parsed_args(parsed, unparsed) + + +if __name__ == "__main__": + CliEvaluationConfig.parse_and_run() diff --git a/test.py b/test.py new file mode 100644 index 00000000..e02fb32d --- /dev/null +++ b/test.py @@ -0,0 +1,216 @@ +import torch + +from pathlib import Path +import shutil +import cloudpickle + +from transformers import AutoTokenizer, AutoModelForCausalLM +from transformers.modeling_outputs import CausalLMOutputWithPast +from fast_llm.models.gpt.huggingface import HuggingfaceGPTModelForCausalLM +from fast_llm.engine.checkpoint.config import CheckpointLoadConfig +from fast_llm.models.gpt.config import LlamaGPTHuggingfaceCheckpointFormat + +import torch + + +def generate(model, input_ids, attention_mask, max_new_tokens, tensors_save_path: Path | None = None): + + if tensors_save_path is not None: + if tensors_save_path.is_dir(): + shutil.rmtree(tensors_save_path, ignore_errors=True) + logits_save_path = tensors_save_path / "logits" + hs_save_path = tensors_save_path / "hidden_states" + logits_save_path.mkdir(exist_ok=True, parents=True) + hs_save_path.mkdir(exist_ok=True, parents=True) + + # assume attention mask is left padded with zeroes if any + mask_step = torch.ones((attention_mask.shape[0], 1), dtype=torch.int64).to(attention_mask.device) + for i in range(max_new_tokens): + output: CausalLMOutputWithPast = model.forward( + input_ids=input_ids, + attention_mask=attention_mask, + position_ids=None, + past_key_values=None, + inputs_embeds=None, + labels=None, + use_cache=False, + output_attentions=False, + output_hidden_states=True, + return_dict=True, + ) + current_ids = output.logits[:, -1, :].argmax(dim=1, keepdim=True) + input_ids = torch.cat([input_ids, current_ids], dim=1) + attention_mask = torch.cat([attention_mask, mask_step], dim=1) + + if tensors_save_path is not None: + logits_file = logits_save_path / f"tensor{i}.pt" + torch.save(output.logits, logits_file) + + hidden_states_file = hs_save_path / f"data{i}.pickle" + with hidden_states_file.open("wb") as f: + cloudpickle.dump(output.hidden_states, f) + + return input_ids + + +def diff_flm_hf(tokenizer, flm_tokens, hf_tokens): + print("+++++++++++++++fast_llm:+++++++++++++++++++++++++++++++++++++++++++++++++") + fllm_str = tokenizer.decode(flm_tokens) + print(fllm_str) + print("---------------hugging_face:---------------------------------------------") + hf_str = tokenizer.decode(hf_tokens) + print(hf_str) + print( + f"==============================({"Same" if fllm_str==hf_str else "Different"})=====================================" + ) + + +def run_test_fast_llm( + attn_implementation, + torch_dtype, + is_batch_size2, + reverse_samples, + tensors_save_path, + num_new_tokens, +): + checkpoint = "/mnt/checkpoints/pretrained_models/SmolLM2-135M-Instruct" + + device = "cuda" # for GPU usage or "cpu" for CPU usage + tokenizer = AutoTokenizer.from_pretrained(checkpoint) + messages = [ + # {"role": "user", "content": "What is gravity?"}, + {"role": "user", "content": "What is gravity?"}, + {"role": "user", "content": "Who is the president of EU?"}, + # {"role": "user", "content": "Who is the president of EU?"}, + ] + if reverse_samples: + messages = list(reversed(messages)) + if not is_batch_size2: + messages = messages[0:1] + + input_text = [tokenizer.apply_chat_template([el], tokenize=False) for el in messages] + + tokenizer.padding_side = "left" + inputs = tokenizer(input_text, padding="longest", return_tensors="pt").to(device) + + fm_kwards = {} + if attn_implementation is not None and attn_implementation == "flash_attention_2": + fm_kwards["attn_implementation"] = "flash_attention_2" + else: + fm_kwards["attn_implementation"] = "fuse" + if torch_dtype is not None and torch_dtype == torch.bfloat16: + fm_kwards["torch_dtype"] = "bf16" + + print("fm_kwards", fm_kwards) + + model_fm = HuggingfaceGPTModelForCausalLM.from_pretrained( + CheckpointLoadConfig( + path=checkpoint, + format=LlamaGPTHuggingfaceCheckpointFormat, + ), + **fm_kwards, + ) + + # outputs_fm = model_fm.generate(**inputs, max_new_tokens=50, use_cache=False) + outputs_fm = generate( + model_fm, **inputs, max_new_tokens=num_new_tokens, tensors_save_path=tensors_save_path / "fast_llm" + ) + + print(tokenizer.decode(outputs_fm[0][inputs["input_ids"].shape[1] :])) + if len(outputs_fm) > 1: + print("--------------------------------------------------------------") + print(tokenizer.decode(outputs_fm[1][inputs["input_ids"].shape[1] :])) + + +def run_test( + attn_implementation, + torch_dtype, + is_batch_size2, + reverse_samples, + tensors_save_path, + num_new_tokens, +): + checkpoint = "/mnt/checkpoints/pretrained_models/SmolLM2-135M-Instruct" + + device = "cuda" # for GPU usage or "cpu" for CPU usage + tokenizer = AutoTokenizer.from_pretrained(checkpoint) + # for multiple GPUs install accelerate and do `model = AutoModelForCausalLM.from_pretrained(checkpoint, device_map="auto")` + hf_kwards = {} + if attn_implementation is not None and attn_implementation == "flash_attention_2": + hf_kwards["attn_implementation"] = "flash_attention_2" + if torch_dtype is not None: + hf_kwards["torch_dtype"] = torch_dtype + + print("hf_kwards", hf_kwards) + model_hf = AutoModelForCausalLM.from_pretrained(checkpoint, **hf_kwards).to(device) + + messages = [ + # {"role": "user", "content": "What is gravity?"}, + {"role": "user", "content": "Who is the president of EU?"}, + {"role": "user", "content": "Who is the president of EU?"}, + ] + if reverse_samples: + messages = list(reversed(messages)) + if not is_batch_size2: + messages = messages[0:1] + + input_text = [tokenizer.apply_chat_template([el], tokenize=False) for el in messages] + + tokenizer.padding_side = "left" + inputs = tokenizer(input_text, padding="longest", return_tensors="pt").to(device) + + # outputs_hf = model_hf.generate(**inputs, max_new_tokens=50, use_cache=False) + outputs_hf = generate( + model_hf, **inputs, max_new_tokens=num_new_tokens, tensors_save_path=tensors_save_path / "hf" + ) + # print(tokenizer.decode(outputs_hf[0])) + + fm_kwards = {} + if attn_implementation is not None and attn_implementation == "flash_attention_2": + fm_kwards["attn_implementation"] = "flash_attention_2" + else: + fm_kwards["attn_implementation"] = "fuse" + if torch_dtype is not None and torch_dtype == torch.bfloat16: + fm_kwards["torch_dtype"] = "bf16" + + print("fm_kwards", fm_kwards) + + model_fm = HuggingfaceGPTModelForCausalLM.from_pretrained( + CheckpointLoadConfig( + path=checkpoint, + format=LlamaGPTHuggingfaceCheckpointFormat, + ), + **fm_kwards, + ) + + # outputs_fm = model_fm.generate(**inputs, max_new_tokens=50, use_cache=False) + outputs_fm = generate( + model_fm, **inputs, max_new_tokens=num_new_tokens, tensors_save_path=tensors_save_path / "fast_llm" + ) + + diff_flm_hf( + tokenizer, outputs_fm[0][inputs["input_ids"].shape[1] :], outputs_hf[0][inputs["input_ids"].shape[1] :] + ) + if len(outputs_fm) > 1: + diff_flm_hf( + tokenizer, outputs_fm[1][inputs["input_ids"].shape[1] :], outputs_hf[1][inputs["input_ids"].shape[1] :] + ) + + +def main(): + run_test_fast_llm( + # run_test( + attn_implementation="flash_attention_2", + # attn_implementation=None, + torch_dtype=torch.bfloat16, + # torch_dtype=None, + is_batch_size2=True, + reverse_samples=False, + # tensors_save_path=Path("/mnt/datasets/tests/denis/tensors_bf16_flash_attention_2_batch_size2/"), + tensors_save_path=Path("/mnt/datasets/tests/denis/tmp/"), + num_new_tokens=100, + ) + + +if __name__ == "__main__": + main() diff --git a/test_distributed.py b/test_distributed.py new file mode 100644 index 00000000..35b84677 --- /dev/null +++ b/test_distributed.py @@ -0,0 +1,55 @@ +# distributed_example.py +import os +import torch +import torch.distributed as dist + +from transformers import AutoTokenizer +from transformers.modeling_outputs import CausalLMOutputWithPast + +from fast_llm.engine.checkpoint.config import CheckpointLoadConfig +from fast_llm.engine.distributed.config import DistributedConfig +from fast_llm.engine.distributed.distributed import Distributed +from fast_llm.models.gpt.config import LlamaGPTHuggingfaceCheckpointFormat, Qwen2GPTHuggingfaceCheckpointFormat +from fast_llm.models.gpt.huggingface import HuggingfaceGPTModelForCausalLM + + +def run( + attn_implementation="flash_attention_2", + torch_dtype=torch.bfloat16, + checkpoint="/mnt/checkpoints/pretrained_models/Qwen2-1.5B-Instruct/", +): + tokenizer = AutoTokenizer.from_pretrained(checkpoint) + + updates = { + ("base_model", "transformer", "use_flash_attention"): attn_implementation is not None + and attn_implementation == "flash_attention_2", + ("distributed", "tensor_parallel"): 2, + ("distributed", "pipeline_parallel"): 1, + ("distributed", "sequence_data_parallel"): 1, + } + + if torch_dtype is not None and torch_dtype == torch.bfloat16: + updates[("distributed", "training_dtype")] = "bf16" + + print("aupdatesgs", updates) + + model_fm = HuggingfaceGPTModelForCausalLM.from_pretrained( + CheckpointLoadConfig( + path=checkpoint, + format=Qwen2GPTHuggingfaceCheckpointFormat, + ), + updates, + ) + + input_ids = torch.randint(1, tokenizer.vocab_size, (10, 100), dtype=torch.int64, generator=torch.Generator().manual_seed(42)) + + res = model_fm.forward(input_ids, use_cache=False) + print(res.logits.shape, res.logits.sum().item()) + + +def main(): + run() + + +if __name__ == "__main__": + main() diff --git a/tests/test_checkpoint.py b/tests/test_checkpoint.py index 8ae30ee4..3ce38c38 100644 --- a/tests/test_checkpoint.py +++ b/tests/test_checkpoint.py @@ -32,7 +32,7 @@ from tests.compare_tensor_logs import CompareConfig, compare_logged_tensor TEST_MODEL_CONFIG_CLS = model_registry[TEST_MODEL_TYPE] -TEST_MODEL_HF_CLS = TEST_MODEL_CONFIG_CLS.get_huggingface_model_class() +TEST_MODEL_HF_CLS = TEST_MODEL_CONFIG_CLS.get_huggingface_model_for_causal_lm_class() TEST_MODEL_CLS = TEST_MODEL_CONFIG_CLS.get_model_class() TEST_BASE_MODEL_CONFIG_CLS = TEST_MODEL_CONFIG_CLS.get_base_model_config_class() TEST_ARCHITECTURE_CONFIG_CLS = TEST_BASE_MODEL_CONFIG_CLS.architecture_class