import numpy as np
import numpy.core.multiarray as multiarray
import torch
import torch.nn as nn
from huggingface_hub import hf_hub_download
from torch.serialization import add_safe_globals

from diffusers import DDPMScheduler, UNet1DModel


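# torch.load with weights_only=True only unpickles an allowlist of "safe" globals, so the
# NumPy types stored inside the pretrained checkpoint are registered with PyTorch up front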
add_safe_globals(
    [
        multiarray._reconstruct,
        np.ndarray,
        np.dtype,
        np.dtype(np.float32).type,
        np.dtype(np.float64).type,
        np.dtype(np.int32).type,
        np.dtype(np.int64).type,
        type(np.dtype(np.float32)),
        type(np.dtype(np.float64)),
        type(np.dtype(np.int32)),
        type(np.dtype(np.int64)),
    ]
)

"""
An example of using Hugging Face's diffusers library to implement a diffusion policy
that generates smooth movement trajectories.

This implements a robot control model for pushing a T-shaped block into a target area.
The model takes in the robot arm position, block position, and block angle,
then outputs a sequence of 16 (x,y) positions for the robot arm to follow.
"""


class ObservationEncoder(nn.Module):
    """
    Converts raw robot observations (positions/angles) into a more compact representation

    state_dim (int): Dimension of the input state vector (default: 5)
        [robot_x, robot_y, block_x, block_y, block_angle]

    - Input shape: (batch_size, state_dim)
    - Output shape: (batch_size, 256)
    """

    def __init__(self, state_dim):
        super().__init__()
        self.net = nn.Sequential(nn.Linear(state_dim, 512), nn.ReLU(), nn.Linear(512, 256))

    def forward(self, x):
        return self.net(x)


class ObservationProjection(nn.Module):
    """
    Takes the encoded observation and transforms it into 32 values that represent the current robot/block situation.
    These values are used as additional contextual information during the diffusion model's trajectory generation.

    - Input: 256-dim vector (padded to 512)
        Shape: (batch_size, 256)
    - Output: 32 contextual information values for the diffusion model
        Shape: (batch_size, 32)
    """

    def __init__(self):
        super().__init__()
        self.weight = nn.Parameter(torch.randn(32, 512))
        self.bias = nn.Parameter(torch.zeros(32))

    def forward(self, x):  # pad 256-dim input to 512-dim with zeros
        if x.size(-1) == 256:
            x = torch.cat([x, torch.zeros(*x.shape[:-1], 256, device=x.device)], dim=-1)
        return nn.functional.linear(x, self.weight, self.bias)


class DiffusionPolicy:
    """
    Implements a diffusion policy for generating robot arm trajectories.
    Uses diffusion to generate sequences of positions for a robot arm, conditioned on
    the current state of the robot and the block it needs to push.

    The model expects observations in pixel coordinates (0-512 range) and block angle in radians.
    It generates trajectories as sequences of (x,y) coordinates also in the 0-512 range.
    """

    def __init__(self, state_dim=5, device="cpu"):
        self.device = device

        # define valid ranges for inputs/outputs
        self.stats = {
            "obs": {"min": torch.zeros(5), "max": torch.tensor([512, 512, 512, 512, 2 * np.pi])},
            "action": {"min": torch.zeros(2), "max": torch.full((2,), 512)},
        }

        self.obs_encoder = ObservationEncoder(state_dim).to(device)
        self.obs_projection = ObservationProjection().to(device)

        # UNet model that performs the denoising process
        # takes in concatenated action (2 channels) and context (32 channels) = 34 channels
        # outputs predicted action (2 channels for x,y coordinates)
        self.model = UNet1DModel(
            sample_size=16,  # length of trajectory sequence
            in_channels=34,
            out_channels=2,
            layers_per_block=2,  # number of layers in each UNet block
            block_out_channels=(128,),  # number of output channels of each UNet block
            down_block_types=("DownBlock1D",),  # reduce the resolution of data
            up_block_types=("UpBlock1D",),  # increase the resolution of data
        ).to(device)
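        # during denoising the model consumes (batch, 34, 16) tensors (the 2 noisy action
        # channels concatenated with the 32 conditioning channels) and returns a
        # (batch, 2, 16) prediction that the scheduler uses to update the trajectory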

        # noise scheduler that controls the denoising process
        self.noise_scheduler = DDPMScheduler(
            num_train_timesteps=100,  # number of denoising steps
            beta_schedule="squaredcos_cap_v2",  # cosine-style noise schedule
        )

        # load pre-trained weights from the Hugging Face Hub
        checkpoint = torch.load(
            hf_hub_download("dorsar/diffusion_policy", "push_tblock.pt"), weights_only=True, map_location=device
        )
        self.model.load_state_dict(checkpoint["model_state_dict"])
        self.obs_encoder.load_state_dict(checkpoint["encoder_state_dict"])
        self.obs_projection.load_state_dict(checkpoint["projection_state_dict"])

    # scales data to [-1, 1] range for neural network processing
    def normalize_data(self, data, stats):
        return ((data - stats["min"]) / (stats["max"] - stats["min"])) * 2 - 1

    # converts normalized data back to original range
    def unnormalize_data(self, ndata, stats):
        return ((ndata + 1) / 2) * (stats["max"] - stats["min"]) + stats["min"]
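    # e.g. a robot x-coordinate of 256 normalizes to (256 - 0) / (512 - 0) * 2 - 1 = 0.0,
    # and unnormalize_data maps 0.0 back to 256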

    @torch.no_grad()
    def predict(self, observation):
        """
        Generates a trajectory of robot arm positions given the current state.

        Args:
            observation (torch.Tensor): Current state [robot_x, robot_y, block_x, block_y, block_angle]
                Shape: (batch_size, 5)

        Returns:
            torch.Tensor: Sequence of (x,y) positions for the robot arm to follow
                Shape: (batch_size, 16, 2) where:
                - 16 is the number of steps in the trajectory
                - 2 is the (x,y) coordinates in pixel space (0-512)

        The function first encodes the observation, then uses it to condition a diffusion
        process that gradually denoises random trajectories into smooth, purposeful movements.
        """
        observation = observation.to(self.device)
        normalized_obs = self.normalize_data(observation, self.stats["obs"])

        # encode the observation into context values for the diffusion model
        cond = self.obs_projection(self.obs_encoder(normalized_obs))
        # reshape from (batch, 32) to (batch, 32, 1), then repeat the conditioning along the
        # trajectory length so each of the 16 steps sees the same context
        cond = cond.view(normalized_obs.shape[0], -1, 1).expand(-1, -1, 16)

        # initialize action with random noise that will be refined into a trajectory
        action = torch.randn((observation.shape[0], 2, 16), device=self.device)

        # denoise
        # at each step `t`, the current noisy trajectory (`action`) and the conditioning info (`cond`)
        # are fed into the model, and self.noise_scheduler.step uses the model's prediction to
        # produce a slightly less noisy `action`

        self.noise_scheduler.set_timesteps(100)
        for t in self.noise_scheduler.timesteps:
            model_output = self.model(torch.cat([action, cond], dim=1), t)
            action = self.noise_scheduler.step(model_output.sample, t, action).prev_sample

        action = action.transpose(1, 2)  # rearrange to (batch, 16, 2)
        action = self.unnormalize_data(action, self.stats["action"])  # scale back to pixel coordinates
        return action


if __name__ == "__main__":
    policy = DiffusionPolicy()

    # sample of a single observation
    # robot arm starts in the center of the screen; block is offset slightly to the left, rotated 90 degrees
    obs = torch.tensor(
        [
            [
                256.0,  # robot arm x position (middle of screen)
                256.0,  # robot arm y position (middle of screen)
                200.0,  # block x position
                300.0,  # block y position
                np.pi / 2,  # block angle (90 degrees)
            ]
        ]
    )

    action = policy.predict(obs)

    print("Action shape:", action.shape)  # should be [1, 16, 2] - one trajectory of 16 x,y positions
    print("\nPredicted trajectory:")
    for i, (x, y) in enumerate(action[0]):
        print(f"Step {i:2d}: x={x:6.1f}, y={y:6.1f}")
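    # to run on a GPU instead (assuming CUDA is available), construct the policy with
    # DiffusionPolicy(device="cuda"); predict() moves the observation to that device itself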