LeRobot Notes (Basic Workflow)

January 24, 2026

Dataset

I'll use the simplest dataset, PushT, as the running example (2D video of a robot pushing an object onto a target shape/position).

Batch Dataset

Print basic info about the dataset:

from lerobot.datasets.lerobot_dataset import LeRobotDataset  # import path may vary across lerobot versions

dataset = LeRobotDataset("lerobot/pusht")
print(dataset)

# output
# LeRobotDataset({
#     Repository ID: 'lerobot/pusht',
#     Number of selected episodes: '206',
#     Number of selected samples: '25650',
#     Features: '['observation.image', 'observation.state', 'action', 'episode_index', 'frame_index', 'timestamp', 'next.reward', 'next.done', 'next.success', 'index', 'task_index']',
# })

The key fields:

  • episode: a video segment; here one episode corresponds to one run of the task
  • observation
    • image: video frame, (H,W,C), 96x96x3 — presumably the robot's viewpoint
    • state: (x,y) coordinates
  • action: (2,), also (x,y) coordinates
  • reward: (1,), a scalar reward

LeRobotDataset loads the data straight into memory.
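
A quick way to poke at one sample (field names come from the Features list above; the exact shapes are my assumption):

sample = dataset[0]  # a dict of torch tensors, keyed by the feature names above

print(sample["observation.image"].shape)  # image tensor; channel-first after decoding (shape is an assumption)
print(sample["observation.state"])        # (2,) agent (x, y)
print(sample["action"])                   # (2,) target (x, y)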

Visualization

python src/lerobot/scripts/lerobot_dataset_viz.py \
    --repo-id lerobot/pusht \
    --episode-index 1 \
    --display-compressed-images true

The result:

(screenshot of the rerun viewer)

More usage patterns to dig into later.

Streaming Dataset

from lerobot.datasets.streaming_dataset import StreamingLeRobotDataset  # import path is a guess; may differ by version

dataset = StreamingLeRobotDataset("lerobot/pusht")
for sample in dataset:
    print(sample['action'])

The Streaming Dataset is built on PyTorch's IterableDataset, so there's no need to load all the data into memory. The main thing to look at is the __iter__(self) method:

def __iter__(self) -> Iterator[dict[str, torch.Tensor]]:
    # ...

    # shuffle within the buffer
    buffer_indices_generator = self._iter_random_indices(rng, self.buffer_size)

    # split into shards; backtrack_dataset relates to delta_timestamps, which need access to a few frames before/after the current one
    idx_to_backtrack_dataset = {
        idx: self._make_backtrackable_dataset(safe_shard(self.hf_dataset, idx, self.num_shards))
        for idx in range(self.num_shards)
    }

    frames_buffer = []
    while available_shards := list(idx_to_backtrack_dataset.keys()):
        # pick a random shard and grab its backtrackable dataset
        shard_key = next(self._infinite_generator_over_elements(rng, available_shards))
        backtrack_dataset = idx_to_backtrack_dataset[shard_key]  # selects which shard to iterate on

        try:
            # make_frame may yield multiple frames, depending on delta_timestamps (details below)
            for frame in self.make_frame(backtrack_dataset):
                # reservoir-style sampling: if the buffer is full, yield the i-th element and put the current frame in its slot
                if len(frames_buffer) == self.buffer_size:
                    i = next(buffer_indices_generator)  # samples an element from the buffer
                    yield frames_buffer[i]
                    frames_buffer[i] = frame
                else:
                    frames_buffer.append(frame)
                break  # random shard sampled, switch shard
        except (
            RuntimeError,
            StopIteration,
        ):  # NOTE: StopIteration inside a generator throws a RuntimeError since python 3.7
            del idx_to_backtrack_dataset[shard_key]  # Remove exhausted shard, onto another shard
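
To isolate the buffering idea, here's a self-contained sketch of the same shuffle-buffer pattern over a plain iterator (names are mine, not LeRobot's):

import random

def shuffle_buffer(source, buffer_size, seed=0):
    """Yield items from `source` in shuffled order using a fixed-size buffer."""
    rng = random.Random(seed)
    buf = []
    for item in source:
        if len(buf) == buffer_size:
            i = rng.randrange(buffer_size)  # pick a random slot: yield it, then reuse it
            yield buf[i]
            buf[i] = item
        else:
            buf.append(item)
    rng.shuffle(buf)  # drain the remainder
    yield from buf

print(list(shuffle_buffer(range(10), buffer_size=4)))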

make_frame

The main case to handle is delta_timestamps: e.g. [-1, -0.5, -0.20, 0] means the frames from 1 s, 0.5 s, and 0.2 s in the past, plus the current frame.
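
During processing, each delta is multiplied by the fps to get a frame offset: for a 30 fps video, -0.2 * 30 = -6, i.e. look 6 frames back. A throwaway sketch of the conversion (not library code):

fps = 30
delta_timestamps = [-1, -0.5, -0.2, 0]

# convert time deltas to frame offsets relative to the current frame
offsets = [round(dt * fps) for dt in delta_timestamps]
print(offsets)  # [-30, -15, -6, 0]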

Once these delta frame indices are computed, the actual results are assembled in _get_delta_frames. To reuse frame data, the backtrackable dataset keeps the following state:

self._source: Iterator[T] = iter(iterable)
self._back_buf: deque[T] = deque(maxlen=history)
self._ahead_buf: deque[T] = deque(maxlen=lookahead) if lookahead > 0 else deque()
self._cursor: int = 0

  • Looking back (negative delta): during normal iteration, every visited frame is pushed into _back_buf, so looking back is just a buffer read;
  • Looking ahead (positive delta): keep pulling from _source into _ahead_buf without moving the cursor; see the sketch below.
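
A minimal sketch of the backtrackable-iterator idea (my own simplification with made-up names; LeRobot's real version handles more edge cases):

from collections import deque

class Backtrackable:
    """Wrap an iterator so we can peek a few items back and a few items ahead."""

    def __init__(self, iterable, history=2, lookahead=2):
        self._source = iter(iterable)
        self._back_buf = deque(maxlen=history)     # already-consumed items
        self._ahead_buf = deque(maxlen=lookahead)  # prefetched items; the cursor hasn't moved
        self._current = None

    def __next__(self):
        if self._current is not None:
            self._back_buf.append(self._current)
        # prefer items already prefetched into the lookahead buffer
        self._current = self._ahead_buf.popleft() if self._ahead_buf else next(self._source)
        return self._current

    def peek_back(self, n):
        return self._back_buf[-n]  # n=1 is the previous item

    def peek_ahead(self, n):
        assert n <= self._ahead_buf.maxlen
        while len(self._ahead_buf) < n:  # fetch lazily; the cursor does not move
            self._ahead_buf.append(next(self._source))
        return self._ahead_buf[n - 1]

it = Backtrackable(range(10))
next(it); next(it); next(it)              # cursor is now at 2
print(it.peek_back(1), it.peek_ahead(1))  # -> 1 3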

torch.cdist is then used to match the query timestamps against the timestamps of the frames actually loaded from the video:

for frame, pts in zip(frames_batch.data, frames_batch.pts_seconds, strict=True):
    loaded_frames.append(frame)
    loaded_ts.append(pts.item())
    if log_loaded_timestamps:
        logging.info(f"Frame loaded at timestamp={pts:.4f}")

query_ts = torch.tensor(timestamps)
loaded_ts = torch.tensor(loaded_ts)

# compute distances between each query timestamp and loaded timestamps
dist = torch.cdist(query_ts[:, None], loaded_ts[:, None], p=1)
min_, argmin_ = dist.min(1)
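
A tiny runnable illustration of this nearest-timestamp matching, with made-up timestamps:

import torch

query_ts = torch.tensor([0.00, 0.80])               # timestamps we asked for
loaded_ts = torch.tensor([0.00, 0.33, 0.66, 1.00])  # timestamps actually decoded

dist = torch.cdist(query_ts[:, None], loaded_ts[:, None], p=1)  # pairwise |q - l|, shape (2, 4)
min_, argmin_ = dist.min(1)
print(argmin_)  # tensor([0, 2]): 0.80 is closest to the frame at 0.66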

Training

An example command for training a policy:

python src/lerobot/scripts/lerobot_train.py \
  --dataset.repo_id lerobot/pusht \
  --policy.type act \
  --policy.push_to_hub False \
  --steps 50 \
  --num_workers 0 \
  --policy.device cpu

The key operations inside lerobot_train.py:

# build the preprocessor and postprocessor
preprocessor, postprocessor = make_pre_post_processors(
    policy_cfg=cfg.policy,
    pretrained_path=cfg.policy.pretrained_path,
    **processor_kwargs,
    **postprocessor_kwargs,
)
logging.info(f"preprocessor: {preprocessor}, postprocessor: {postprocessor}")

# ...

# accelerator wraps these according to the accelerator backend (e.g. DeepSpeed) and initializes the key objects
policy, optimizer, dataloader, lr_scheduler = accelerator.prepare(
    policy, optimizer, dataloader, lr_scheduler
)
print(f"policy: {policy}, optimizer: {optimizer}, dataloader: {dataloader}, lr_scheduler: {lr_scheduler}")

# ...

for _ in range(step, cfg.steps):
    start_time = time.perf_counter()
    # dl_iter is the iterator over the dataloader mentioned above
    batch = next(dl_iter)

    batch = preprocessor(batch)
    train_tracker.dataloading_s = time.perf_counter() - start_time

    train_tracker, output_dict = update_policy(
        train_tracker,
        policy,
        batch,
        optimizer,
        cfg.optimizer.grad_clip_norm,
        accelerator=accelerator,
        lr_scheduler=lr_scheduler,
        rabc_weights_provider=rabc_weights,
    )

PreProcessor and PostProcessor

Each model can define its own processors; ACT, for example, defines them in processor_act.py according to its model inputs:

input_steps = [
    RenameObservationsProcessorStep(rename_map={}),
    AddBatchDimensionProcessorStep(),
    DeviceProcessorStep(device=config.device),
    NormalizerProcessorStep(
        features={**config.input_features, **config.output_features},
        norm_map=config.normalization_mapping,
        stats=dataset_stats,
        device=config.device,
    ),
]
output_steps = [
    UnnormalizerProcessorStep(
        features=config.output_features, norm_map=config.normalization_mapping, stats=dataset_stats
    ),
    DeviceProcessorStep(device="cpu"),
]

return (
    PolicyProcessorPipeline[dict[str, Any], dict[str, Any]](
        steps=input_steps,
        name=POLICY_PREPROCESSOR_DEFAULT_NAME,
    ),
    PolicyProcessorPipeline[PolicyAction, PolicyAction](
        steps=output_steps,
        name=POLICY_POSTPROCESSOR_DEFAULT_NAME,
        to_transition=policy_action_to_transition,
        to_output=transition_to_policy_action,
    ),
)
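
Roughly how the two pipelines wrap a policy at inference time (a sketch; select_action is the usual lerobot policy entry point, but treat the exact wiring here as an assumption):

batch = preprocessor(raw_batch)       # rename -> add batch dim -> to device -> normalize
action = policy.select_action(batch)  # policy inference (assumed API)
action = postprocessor(action)        # unnormalize -> back to CPU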

update_policy

This is close to a plain torch training loop: the policy subclasses nn.Module, and a single policy covers both training and eval, including the action-output path. A generic sketch of the update step is below; ACT is used as the example afterwards.
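
For orientation, a minimal sketch of what such an update step boils down to (a generic torch pattern reusing the loop variables above, not LeRobot's exact update_policy):

loss, output_dict = policy.forward(batch)  # training forward returns the loss (assumed signature)
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(policy.parameters(), max_norm=grad_clip_norm)  # grad_clip_norm from cfg
optimizer.step()
if lr_scheduler is not None:
    lr_scheduler.step()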

For the model architecture of ACT (Action Chunking Transformer), see https://arxiv.org/abs/2304.13705. The overall highlights:

  • Imitation learning with multi-step (chunked) prediction instead of single-step prediction; the transformer architecture keeps the action chunk smooth.
  • Training uses a VAE to encode the observation/state, while inference uses the prior mean (z=0). The VAE compresses the input into a normally distributed latent space and samples from it during training; using the distribution's mean at inference improves generalization (see the sketch after this list).
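
A toy sketch of that train/inference asymmetry via the reparameterization trick (my code, not ACT's):

import torch

def sample_latent(mu, log_var, training):
    if training:
        eps = torch.randn_like(mu)               # z = mu + sigma * eps, eps ~ N(0, I)
        return mu + (0.5 * log_var).exp() * eps
    return torch.zeros_like(mu)                  # inference: the prior mean, z = 0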

ACT's input/output at training time:

def forward(self, batch: dict[str, Tensor]) -> tuple[Tensor, tuple[Tensor, Tensor] | tuple[None, None]]:
    """A forward pass through the Action Chunking Transformer (with optional VAE encoder).

    `batch` should have the following structure:
    {
        [robot_state_feature] (optional): (B, state_dim) batch of robot states.

        [image_features]: (B, n_cameras, C, H, W) batch of images.
            AND/OR
        [env_state_feature]: (B, env_dim) batch of environment states.

        [action_feature] (optional, only if training with VAE): (B, chunk_size, action dim) batch of actions.
    }

    Returns:
        (B, chunk_size, action_dim) batch of action sequences
        Tuple containing the latent PDF's parameters (mean, log(σ²)) both as (B, L) tensors where L is the
        latent dimension.
    """

ACT's transformer decoder input is an all-zeros matrix; cross-attention is driven by the position-encoded embeddings:

decoder_in = torch.zeros(
    (self.config.chunk_size, batch_size, self.config.dim_model),
    dtype=encoder_in_pos_embed.dtype,
    device=encoder_in_pos_embed.device,
)
decoder_out = self.decoder(
    decoder_in,
    encoder_out,
    encoder_pos_embed=encoder_in_pos_embed,
    decoder_pos_embed=self.decoder_pos_embed.weight.unsqueeze(1),
)

#...

# finally, action_head maps the model output to the finite-dimensional action space
# Final action regression head on the output of the transformer's decoder.
self.action_head = nn.Linear(config.dim_model, self.config.action_feature.shape[0])

Other things I came across

  • AE (AutoEncoder): maps high-dimensional data into a low-dimensional space
  • VAE (Variational AutoEncoder): adds a distribution so the low-dimensional embeddings are more regular
  • VQ-VAE (Vector Quantized VAE): uses a finite codebook ({e₁, e₂, ..., e_K})
  • KL divergence: measures how far apart two probability distributions are (formulas below)
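
For reference, the definition, plus the closed form used when training a VAE against a unit-Gaussian prior (textbook formulas; they match the (mean, log(σ²)) output mentioned earlier):

D_{\mathrm{KL}}(P \,\|\, Q) = \sum_x P(x) \log \frac{P(x)}{Q(x)}

D_{\mathrm{KL}}\big(\mathcal{N}(\mu, \sigma^2) \,\|\, \mathcal{N}(0, 1)\big) = \tfrac{1}{2}\left(\mu^2 + \sigma^2 - \log \sigma^2 - 1\right)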