Dataset
I'll use the simplest dataset, PushT, as the running example (2D videos of a robot pushing an object to a target shape/position).
Batch Dataset
Print the dataset's basic info:
dataset = LeRobotDataset("lerobot/pusht")
print(dataset)
# output
# LeRobotDataset({
# Repository ID: 'lerobot/pusht',
# Number of selected episodes: '206',
# Number of selected samples: '25650',
# Features: '['observation.image', 'observation.state', 'action', 'episode_index', 'frame_index', 'timestamp', 'next.reward', 'next.done', 'next.success', 'index', 'task_index']',
# })
A few key fields:
- episode: a video clip; in this dataset, one task rollout corresponds to one episode
- observation
  - image: a video frame, (H, W, C) = 96x96x3, presumably the robot's viewpoint
  - state: (x, y) coordinates
- action: (2,), also (x, y) coordinates
- reward: (1,), a scalar reward
LeRobotDataset loads the data straight into memory.
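As a quick sanity check, the dataset can be indexed directly and wrapped in a regular DataLoader (a minimal sketch; key names follow the feature list above, exact shapes/layouts may vary by version):
import torch

sample = dataset[0]  # LeRobotDataset supports plain integer indexing
print(sample["observation.image"].shape, sample["observation.state"], sample["action"])

loader = torch.utils.data.DataLoader(dataset, batch_size=32, shuffle=True)
batch = next(iter(loader))
print(batch["action"].shape)  # e.g. torch.Size([32, 2])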
Visualization
python src/lerobot/scripts/lerobot_dataset_viz.py \
--repo-id lerobot/pusht \
--episode-index 1 \
--display-compressed-images true
Result:

I'll dig into more of its options later.
Streaming Dataset
dataset = StreamingLeRobotDataset("lerobot/pusht")
for sample in dataset:
    print(sample['action'])
StreamingLeRobotDataset is built on PyTorch's IterableDataset, so it does not need to load the whole dataset into memory. The interesting part is the __iter__(self) method:
def __iter__(self) -> Iterator[dict[str, torch.Tensor]]:
    # ...
    # indices used to shuffle within the buffer
    buffer_indices_generator = self._iter_random_indices(rng, self.buffer_size)
    # split into multiple shards; backtrack_dataset is related to delta_timestamps,
    # since we may need a few frames before/after the current one
    idx_to_backtrack_dataset = {
        idx: self._make_backtrackable_dataset(safe_shard(self.hf_dataset, idx, self.num_shards))
        for idx in range(self.num_shards)
    }
    frames_buffer = []
    while available_shards := list(idx_to_backtrack_dataset.keys()):
        # pick a random shard and get its backtrack_dataset
        shard_key = next(self._infinite_generator_over_elements(rng, available_shards))
        backtrack_dataset = idx_to_backtrack_dataset[shard_key]  # selects which shard to iterate on
        try:
            # make_frame may produce multiple frames depending on delta_timestamps, more on this below
            for frame in self.make_frame(backtrack_dataset):
                # reservoir-style shuffle buffer: once full, yield the i-th element
                # and put the current frame in its slot
                if len(frames_buffer) == self.buffer_size:
                    i = next(buffer_indices_generator)  # samples an element from the buffer
                    yield frames_buffer[i]
                    frames_buffer[i] = frame
                else:
                    frames_buffer.append(frame)
                break  # random shard sampled, switch shard
        except (
            RuntimeError,
            StopIteration,
        ):  # NOTE: StopIteration inside a generator raises a RuntimeError since Python 3.7
            del idx_to_backtrack_dataset[shard_key]  # remove exhausted shard, move on to another shard
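Setting the shard bookkeeping aside, the buffer logic is the classic shuffle-buffer trick for streams. A self-contained toy version (a hypothetical helper, not LeRobot code):
import random
from typing import Iterable, Iterator

def shuffled_stream(stream: Iterable, buffer_size: int, seed: int = 0) -> Iterator:
    """Yield items from `stream` in a locally shuffled order using a fixed-size buffer."""
    rng = random.Random(seed)
    buffer = []
    for item in stream:
        if len(buffer) == buffer_size:
            i = rng.randrange(buffer_size)
            yield buffer[i]        # emit a random buffered item...
            buffer[i] = item       # ...and replace it with the incoming one
        else:
            buffer.append(item)
    rng.shuffle(buffer)            # flush whatever is left at the end
    yield from buffer

print(list(shuffled_stream(range(10), buffer_size=4)))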
make_frame
The interesting case is when delta_timestamps is set, e.g. [-1, -0.5, -0.20, 0] means "1 s ago, 0.5 s ago, 0.2 s ago, and the current frame".
During processing, each delta is multiplied by the fps: for a 30 fps video, -0.20 * 30 = -6, i.e. the frame 6 steps back. After these delta frame indices are computed, the actual frames are assembled in _get_delta_frames. To reuse frames it has already seen, the backtrack_dataset keeps the following state:
self._source: Iterator[T] = iter(iterable)
self._back_buf: deque[T] = deque(maxlen=history)
self._ahead_buf: deque[T] = deque(maxlen=lookahead) if lookahead > 0 else deque()
self._cursor: int = 0
- Looking back (negative deltas): during normal iteration every consumed frame is pushed into _back_buf, so looking back is just reading from that buffer;
- Looking ahead (positive deltas): keep pulling items from _source into _ahead_buf without moving the cursor (see the toy sketch below).
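To make the back/ahead buffers concrete, here is a toy backtrackable iterator (my own sketch, not the LeRobot implementation):
from collections import deque
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")

class BacktrackableIterator:
    """Toy backtrackable iterator: keeps the last `history` items and can peek `lookahead` items ahead."""

    def __init__(self, iterable: Iterable[T], history: int = 4, lookahead: int = 2):
        self._source: Iterator[T] = iter(iterable)
        self._back_buf: deque[T] = deque(maxlen=history)
        self._ahead_buf: deque[T] = deque(maxlen=lookahead) if lookahead > 0 else deque()

    def __next__(self) -> T:
        # consume from the lookahead buffer first, otherwise from the source
        item = self._ahead_buf.popleft() if self._ahead_buf else next(self._source)
        self._back_buf.append(item)  # remember it so negative deltas can look back
        return item

    def peek_back(self, n: int) -> T:
        # n=1 is the most recently consumed item
        return self._back_buf[-n]

    def peek_ahead(self, n: int) -> T:
        # pull from the source into the ahead buffer without advancing the cursor
        while len(self._ahead_buf) < n:
            self._ahead_buf.append(next(self._source))
        return self._ahead_buf[n - 1]

it = BacktrackableIterator(range(10), history=3, lookahead=2)
print(next(it), next(it))  # 0 1
print(it.peek_back(1))     # 1  (negative delta: look back)
print(it.peek_ahead(1))    # 2  (positive delta: look ahead, cursor unchanged)
print(next(it))            # 2  (consumed from the ahead buffer)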
torch.cdist is then used to match the requested timestamps against the frames actually decoded from the video:
for frame, pts in zip(frames_batch.data, frames_batch.pts_seconds, strict=True):
    loaded_frames.append(frame)
    loaded_ts.append(pts.item())
    if log_loaded_timestamps:
        logging.info(f"Frame loaded at timestamp={pts:.4f}")

query_ts = torch.tensor(timestamps)
loaded_ts = torch.tensor(loaded_ts)
# compute distances between each query timestamp and loaded timestamps
dist = torch.cdist(query_ts[:, None], loaded_ts[:, None], p=1)
min_, argmin_ = dist.min(1)
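A tiny standalone example of this nearest-timestamp matching (the numbers are made up):
import torch

query_ts = torch.tensor([0.0, 0.8, 1.0])            # timestamps we asked for
loaded_ts = torch.tensor([0.00, 0.33, 0.66, 1.00])  # timestamps actually decoded from the video
dist = torch.cdist(query_ts[:, None], loaded_ts[:, None], p=1)  # pairwise |query - loaded|, shape (3, 4)
min_, argmin_ = dist.min(1)
print(argmin_)  # tensor([0, 2, 3]) -> index of the closest decoded frame for each query
print(min_)     # ~tensor([0.00, 0.14, 0.00]) -> residual timestamp error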
Training
An example command for training a policy:
python src/lerobot/scripts/lerobot_train.py \
--dataset.repo_id lerobot/pusht \
--policy.type act \
--policy.push_to_hub False \
--steps 50 \
--num_workers 0 \
--policy.device cpu
The main operations in lerobot_train.py:
# build the pre- and post-processors
preprocessor, postprocessor = make_pre_post_processors(
    policy_cfg=cfg.policy,
    pretrained_path=cfg.policy.pretrained_path,
    **processor_kwargs,
    **postprocessor_kwargs,
)
logging.info(f"preprocessor: {preprocessor}, postprocessor: {postprocessor}")
# ...
# the accelerator (wrapping backends such as DeepSpeed) prepares the key training objects
policy, optimizer, dataloader, lr_scheduler = accelerator.prepare(
    policy, optimizer, dataloader, lr_scheduler
)
print(f"policy: {policy}, optimizer: {optimizer}, dataloader: {dataloader}, lr_scheduler: {lr_scheduler}")
# ...
for _ in range(step, cfg.steps):
    start_time = time.perf_counter()
    # dl_iter is an iterator over the dataloader described above
    batch = next(dl_iter)
    batch = preprocessor(batch)
    train_tracker.dataloading_s = time.perf_counter() - start_time
    train_tracker, output_dict = update_policy(
        train_tracker,
        policy,
        batch,
        optimizer,
        cfg.optimizer.grad_clip_norm,
        accelerator=accelerator,
        lr_scheduler=lr_scheduler,
        rabc_weights_provider=rabc_weights,
    )
PreProcessor and PostProcessor
Each model can define its own processing steps; for example, ACT defines them in processor_act.py according to its model inputs:
input_steps = [
    RenameObservationsProcessorStep(rename_map={}),
    AddBatchDimensionProcessorStep(),
    DeviceProcessorStep(device=config.device),
    NormalizerProcessorStep(
        features={**config.input_features, **config.output_features},
        norm_map=config.normalization_mapping,
        stats=dataset_stats,
        device=config.device,
    ),
]
output_steps = [
    UnnormalizerProcessorStep(
        features=config.output_features, norm_map=config.normalization_mapping, stats=dataset_stats
    ),
    DeviceProcessorStep(device="cpu"),
]
return (
    PolicyProcessorPipeline[dict[str, Any], dict[str, Any]](
        steps=input_steps,
        name=POLICY_PREPROCESSOR_DEFAULT_NAME,
    ),
    PolicyProcessorPipeline[PolicyAction, PolicyAction](
        steps=output_steps,
        name=POLICY_POSTPROCESSOR_DEFAULT_NAME,
        to_transition=policy_action_to_transition,
        to_output=transition_to_policy_action,
    ),
)
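Conceptually a processor pipeline is just an ordered list of steps applied to a batch. A toy version of that idea (not the actual PolicyProcessorPipeline API):
from typing import Any, Callable

class SimplePipeline:
    """Toy processor pipeline: apply an ordered list of steps to a batch dict."""

    def __init__(self, steps: list[Callable[[dict[str, Any]], dict[str, Any]]]):
        self.steps = steps

    def __call__(self, batch: dict[str, Any]) -> dict[str, Any]:
        for step in self.steps:
            batch = step(batch)
        return batch

# usage: a normalize step followed by a (placeholder) device-transfer step
normalize = lambda b: {k: (v - 0.5) / 0.5 for k, v in b.items()}
to_device = lambda b: b  # placeholder for a device-transfer step
pipeline = SimplePipeline(steps=[normalize, to_device])
print(pipeline({"observation.state": 0.75}))  # {'observation.state': 0.5}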
update_policy
This is essentially the same flow as a plain PyTorch training step. A policy itself subclasses nn.Module and bundles both the training forward pass and the inference path that outputs actions.
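Stripped of the tracker and accelerator plumbing, the update is a standard forward/backward/step. A minimal sketch (a hypothetical helper, assuming the policy's training forward returns a loss plus an info dict, as ACT does):
import torch

def update_step(policy, batch, optimizer, grad_clip_norm, lr_scheduler=None):
    policy.train()
    loss, output_dict = policy.forward(batch)  # assumed: training forward returns (loss, info dict)
    optimizer.zero_grad()
    loss.backward()
    grad_norm = torch.nn.utils.clip_grad_norm_(policy.parameters(), grad_clip_norm)
    optimizer.step()
    if lr_scheduler is not None:
        lr_scheduler.step()
    return loss.item(), grad_norm.item(), output_dict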
Taking ACT as the example: for the ACT (Action Chunking Transformer) architecture, see https://arxiv.org/abs/2304.13705. The main highlights:
- Instead of predicting a single action per step, imitation learning predicts a chunk of future actions; the transformer architecture helps keep the predicted sequence smooth.
- Training uses a VAE encoder: the input is compressed into a Gaussian latent space and a latent is sampled from it, while at inference the prior mean (z = 0) is used instead of sampling, which improves generalization (see the sketch below).
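A condensed sketch of that train/inference asymmetry (function and tensor names are illustrative, not ACT's actual code):
import torch

def sample_latent(mu: torch.Tensor, log_sigma_x2: torch.Tensor, training: bool) -> torch.Tensor:
    if training:
        # reparameterization trick: z = mu + sigma * eps, eps ~ N(0, 1)
        sigma = (log_sigma_x2 / 2).exp()
        return mu + sigma * torch.randn_like(sigma)
    # at inference, use the prior mean, i.e. z = 0
    return torch.zeros_like(mu)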
ACT's inputs/outputs during training:
def forward(self, batch: dict[str, Tensor]) -> tuple[Tensor, tuple[Tensor, Tensor] | tuple[None, None]]:
    """A forward pass through the Action Chunking Transformer (with optional VAE encoder).

    `batch` should have the following structure:
    {
        [robot_state_feature] (optional): (B, state_dim) batch of robot states.
        [image_features]: (B, n_cameras, C, H, W) batch of images.
            AND/OR
        [env_state_feature]: (B, env_dim) batch of environment states.
        [action_feature] (optional, only if training with VAE): (B, chunk_size, action dim) batch of actions.
    }

    Returns:
        (B, chunk_size, action_dim) batch of action sequences
        Tuple containing the latent PDF's parameters (mean, log(σ²)) both as (B, L) tensors where L is the
        latent dimension.
    """
ACT's transformer decoder takes an all-zeros tensor as input; the positional embeddings act as the queries for cross-attention over the encoder output.
decoder_in = torch.zeros(
    (self.config.chunk_size, batch_size, self.config.dim_model),
    dtype=encoder_in_pos_embed.dtype,
    device=encoder_in_pos_embed.device,
)
decoder_out = self.decoder(
    decoder_in,
    encoder_out,
    encoder_pos_embed=encoder_in_pos_embed,
    decoder_pos_embed=self.decoder_pos_embed.weight.unsqueeze(1),
)
# ...
# finally, action_head projects the decoder output to the action dimension
# Final action regression head on the output of the transformer's decoder.
self.action_head = nn.Linear(config.dim_model, self.config.action_feature.shape[0])
Other concepts I came across
- AE (AutoEncoder): compresses high-dimensional inputs into a low-dimensional representation
- VAE (Variational AutoEncoder): adds a distribution over the latent, making the low-dimensional embedding space more regular
- VQ-VAE (Vector Quantized VAE): restricts the latent to a finite codebook ({e₁, e₂, ..., e_K})
- KL divergence: measures how far apart two probability distributions are
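For reference, the closed-form KL term used in VAE-style training (between the posterior N(μ, σ²) and a standard normal prior), written against the (mean, log σ²) pair that ACT's forward returns; a small sketch with made-up tensors:
import torch

mu = torch.randn(8, 32)            # (B, L) latent means
log_sigma_x2 = torch.randn(8, 32)  # (B, L) latent log-variances, i.e. log(sigma^2)
# KL( N(mu, sigma^2) || N(0, 1) ) = -0.5 * (1 + log(sigma^2) - mu^2 - sigma^2), summed over latent dims
kld = (-0.5 * (1 + log_sigma_x2 - mu.pow(2) - log_sigma_x2.exp())).sum(-1).mean()
print(kld)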