一个通用的 LLM 的评测框架解析

December 28, 2025

主要看了 opencompass 和 lighteval 两个开源框架,整体的逻辑都是比较类似的。主要逻辑 是围绕「模型」和「数据集」来进行的。

LightEval

一个典型的命令:

lighteval accelerate \
    "model_name=openai-community/gpt2,batch_size=1" \
    hellaswag

1. 先选择评测执行的 backend

lighteval-backend

2. 以 accelerate 为例,构建一个评测完整的 Pipeline,

pipeline = Pipeline(
    tasks=tasks,
    pipeline_parameters=pipeline_params,
    evaluation_tracker=evaluation_tracker,
    model_config=model_config,
)

包含:

  • Tasks:提供 inference 需要的信息,如 dataset,model,评测规则等;
  • EvaluationTracker:tracker 信息,和外部 hub 打通,结果格式化等;

3. 数据集构建

在 task 中,会指定相关数据集,以 hellaswag task 为例,hf_xx 一系列的属性即指定了数据集和数据集的划分。 另外,Metrics.exact_match 指定了判断 predictions 和 target 的比较规则,这里是要完全匹配。

hellaswag = LightevalTaskConfig(
    name="hellaswag",
    prompt_function=hellaswag_prompt,
    hf_repo="Rowan/hellaswag",
    hf_subset="default",
    hf_avail_splits=["train", "test", "validation"],
    evaluation_splits=["validation"],
    few_shots_split=None,
    few_shots_select=None,
    generation_size=1,
    metrics=[
        Metrics.exact_match,
    ],
    stop_sequence=["\n"],
    version=0,
)

对应到 Pipeline 中,取出 dataset 里 validation 部分,根据 sampling_method 进行划分, sampling_method(how we query/sample information from the model) 分为三种:

  • GENERATIVE: Sample text from the distribution
  • LOGPROBS: Sample/retrieve probability scores for choices
  • PERPLEXITY: Sample/retrieve probability scores across sequences

4. 模型评估

SamplingMethod.GENERATIVE 为例,

case SamplingMethod.GENERATIVE:
    model_outputs = self.model.greedy_until(docs)
    outputs[sampling_method] = model_outputs

里面也分为几步:

  1. 切分 dataset 成若干个 split(粒度不会太粗,为了观测后续 inference 的一个进度)
  2. 组装 torch 的 DataLoader
  3. 数据预处理,根据不同 Model 的 tokenizer 进行 prompt 预处理
  4. 模型推理,返回 responses

5. 计算 metrics

以上面的 hellaswag 为例,使用的是 exact_match,关键方法:

@abstractmethod
def compute(self, doc: Doc, model_response: ModelResponse, **kwargs):
    raise NotImplementedError
  • doc: 包含了 gold/target 的原文;
  • model_response:prediction 推理的模型输出;

经过 normalization 后进行对比,不同的 Metrics 会有不同的对比方法,收集结果后通过 aggregation_function 将结果进行聚合。exact_match 的具体实现:

def compute(self, doc: Doc, model_response: ModelResponse, **kwargs) -> float:
    """Computes the metric over a list of golds and predictions for one single sample.

    Args:
        doc (Doc): The document containing gold references.
        model_response (ModelResponse): The model's response containing predictions.
        **kwargs: Additional keyword arguments.

    Returns:
        float: Aggregated score over the current sample's items.
    """
    results = []
    # We might need to flatten golds if they are a list of lists
    golds = doc.get_golds()
    for gold in golds:
        for pred in model_response.final_text:
            results.append(self.compute_one_item(gold=gold, pred=pred))
    return self.aggregation_function(results)

OpenCompass

对于 OpenCompass 框架也是类似,用户需要配置「模型」和「数据集」,以这行命令为例,

opencompass --models hf_qwen2_0_5b_instruct_cpu --datasets demo_gsm8k_chat_gen
  • model:使用 qwen 的一个小模型
  • dataset: 使用 gsm8k

1. 切分数据集

partitioner = PARTITIONERS.build(cfg.infer.partitioner)
tasks = partitioner(cfg)

Partitioner 是可以让用户指定,默认是根据 max_number_workers 来进行划分。然后每个 Partitioner 会根据自己的划分结果,进行 task config 的组装,返回一系列 task 配置,例如 NumWorkerPartitioner

for dataset in chunks:
    tasks.append(
        Config({
            'models': [model],
            'datasets': [[dataset]],
            'work_dir': work_dir,
            **add_cfg
        }))

2. 组装 Runner 并执行

runner = RUNNERS.build(cfg.infer.runner)
runner(tasks)

用 Task config 初始化 runner,runner 执行 launch 函数,以 LocalRunner 为例,他这里通过 多进程的方式做了一个并行化(而上面的 lighteval 是没有做的):

pbar = tqdm(total=len(tasks))
lock = Lock()

def submit(task, index):
    task = TASKS.build(dict(cfg=task, type=self.task_cfg['type']))
    num_gpus = task.num_gpus
    assert len(gpus) >= num_gpus

    while True:
        lock.acquire()
        if sum(gpus > 0) >= num_gpus:
            gpu_ids = np.where(gpus)[0][:num_gpus]
            gpus[gpu_ids] -= 1
            lock.release()
            break
        lock.release()
        time.sleep(1)

    res = self._launch(task, gpu_ids, index)
    pbar.update()

    with lock:
        gpus[gpu_ids] += 1

    return res

with ThreadPoolExecutor(
        max_workers=self.max_num_workers) as executor:
    status = executor.map(submit, tasks, range(len(tasks)))

由于是并行,这里对 gpu 资源的记录进行加锁,然后使用 ThreadPoolExecutor 并行提交。

3. Evaluation

OpenCompass 这里的 Evaluation 和 lighteval 不一样的是,

  • OpenCompass 也用 Infer 一样的架构(Runner、Task、..)来做 Eval 的设计,这样有一个好处是, 用户在使用时,可以单独把 Eval 单独跑;
  • lighteval 是把 Eval 集成到 pipeline 里,没法单独跑;

整体流程和 lighteval 差不多,经过 preprocess、socre、save 多个操作。