Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Update g1_benchmark_task.py #3

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 54 additions & 35 deletions module/spark_task/spark_task/task/g1_benchmark/g1_benchmark_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,33 +9,36 @@ def __init__(self, **kwargs):
self.velocity = kwargs.get("velocity", 1.0)
self.bound = kwargs.get("bound", np.zeros((3,2)))
self.smooth_weight = kwargs.get("smooth_weight", 1.0)
self.last_direction = np.zeros(3)
self.last_direction = kwargs.get("direction", np.array([-1.0,0.0,0.0]))
self.step_counter = 0
self.keep_direction_step = kwargs.get("keep_direction_step", 1)

def move(self, mode="Brownian"):

def move(self, mode):
if mode == "Brownian":
if self.step_counter % self.keep_direction_step == 0:
direction = np.random.normal(loc=0.0, size=3)
direction = self.velocity * direction / np.linalg.norm(direction)
else:
direction = self.last_direction
self.last_frame = self.frame.copy()
update_step = (1 - self.smooth_weight) * self.last_direction + self.smooth_weight * direction
self.frame[:3,3] += update_step

for dim in range(3):
if self.frame[dim,3] < self.bound[dim][0]:
self.frame[dim,3] = self.last_frame[dim,3] - update_step[dim]
elif self.frame[dim,3] > self.bound[dim][1]:
self.frame[dim,3] = self.last_frame[dim,3] - update_step[dim]

self.last_direction = self.frame[:3,3] - self.last_frame[:3,3]
update_step = (1 - self.smooth_weight) * self.last_direction + self.smooth_weight * direction
self.frame[:3, 3] += update_step

self.last_direction = self.frame[:3, 3] - self.last_frame[:3, 3]

elif mode == "Velocity":
update_step = self.velocity * self.last_direction
self.frame[:3, 3] += update_step

# Enforce bounds
for dim in range(3):
if self.frame[dim, 3] < self.bound[dim][0]:
self.frame[dim, 3] = self.bound[dim][0]
elif self.frame[dim, 3] > self.bound[dim][1]:
self.frame[dim, 3] = self.bound[dim][1]

self.step_counter += 1




class G1BenchmarkTask(BaseTask):
def __init__(self, robot_cfg, robot_kinematics, agent, **kwargs):
Expand All @@ -44,6 +47,16 @@ def __init__(self, robot_cfg, robot_kinematics, agent, **kwargs):
self.task_name = kwargs.get("task_name", "G1BenchmarkTask")
self.max_episode_length = kwargs.get("max_episode_length", -1)
self.num_obstacle_task = kwargs.get("num_obstacle", 3) # todo online update
self.mode = kwargs.get("mode", "Brownian") #kwargs.get("mode", "Brownian")
self.obstacle_init = kwargs.get("obstacle_init", np.array([0.2,0.0,1.0]))
self.obstacle_velocity = kwargs.get("obstacle_velocity", 0.001)
self.obstacle_direction = kwargs.get("obstacle_direction", np.array([-1.0,0.0,0.0]))
self.goal_left_init = kwargs.get("goal_left_init", np.array([0.25, 0.25, 0.1]))
self.goal_left_velocity = kwargs.get("goal_left_velocity", 0.001)
self.goal_left_direction = kwargs.get("goal_left_direction", np.array([1.0,0.0,0.0]))
self.goal_right_init = kwargs.get("goal_right_init", np.array([0.25, -0.25, 0.1]))
self.goal_right_velocity = kwargs.get("goal_right_velocity", 0.001)
self.goal_right_direction = kwargs.get("goal_right_direction", np.array([1.0, 0.0, 0.0]))

self.reset()

Expand All @@ -57,42 +70,48 @@ def reset(self):
self.obstacle_task_geom = []

for _ in range(self.num_obstacle_task):
obstacle = TaskObject3D(velocity=0.01,
obstacle = TaskObject3D(velocity=self.obstacle_velocity,
direction=self.obstacle_direction,
keep_direction_step = 500,
bound=np.array([[-0.3, 0.5],
[-0.3, 0.5],
[0.8, 1.0]]))
obstacle.frame[:3,3] = np.array([np.random.uniform(low, high) for low, high in obstacle.bound])
if self.mode == 'Velocity':
obstacle.frame[:3,3] = self.obstacle_init # need to deal with multiple init later
else:
obstacle.frame[:3,3] = np.array([np.random.uniform(low, high) for low, high in obstacle.bound])

self.obstacle_task.append(obstacle)
self.obstacle_task_geom.append(Geometry(type="sphere", radius=0.05, color=VizColor.obstacle_task))

# --------------------------------- init goal -------------------------------- #
self.goal_teleop = {}
self.goal_teleop["left"] = np.array([[1., 0., 0., 0.25],
[0., 1., 0., 0.25],
[0., 0., 1., 0.1],
[0., 0., 0., 1.]])
self.goal_teleop["right"] = np.array([[1., 0., 0., 0.25],
[0., 1., 0., -0.25],
[0., 0., 1., 0.1],
[0., 0., 0., 1.]])

self.robot_goal_left = TaskObject3D(velocity=0.001,

self.robot_goal_left = TaskObject3D(velocity=self.goal_left_velocity,
direction=self.goal_left_direction,
keep_direction_step = 10,
bound=np.array([[0.1, 0.4],
[0.1, 0.4],
[0.0, 0.2]]),
smooth_weight = 0.8)
self.robot_goal_left.frame[:3,3] = np.array([np.random.uniform(low, high) for low, high in self.robot_goal_left.bound])

self.robot_goal_right = TaskObject3D(velocity=0.001,

self.robot_goal_right = TaskObject3D(velocity=self.goal_right_velocity,
direction=self.goal_right_direction,
keep_direction_step = 10,
bound=np.array([[0.1, 0.4],
[-0.4, -0.1],
[0.0, 0.2]]),
smooth_weight = 0.8)
self.robot_goal_right.frame[:3,3] = np.array([np.random.uniform(low, high) for low, high in self.robot_goal_right.bound])

if self.mode == 'Velocity':
self.robot_goal_left.frame[:3,3] = self.goal_left_init
self.robot_goal_right.frame[:3,3] = self.goal_right_init
else:
self.robot_goal_left.frame[:3,3] = np.array([np.random.uniform(low, high) for low, high in self.robot_goal_left.bound])
self.robot_goal_right.frame[:3,3] = np.array([np.random.uniform(low, high) for low, high in self.robot_goal_right.bound])

self.goal_teleop = {}
self.goal_teleop["left"] = self.robot_goal_left.frame
self.goal_teleop["right"] = self.robot_goal_right.frame


# --------------------------------- init info -------------------------------- #
Expand All @@ -105,12 +124,12 @@ def reset(self):
self.info["robot_state"] = {}

def update_robot_goal(self):
self.robot_goal_left.move()
self.robot_goal_right.move()
self.robot_goal_left.move(self.mode)
self.robot_goal_right.move(self.mode)

def update_obstacle_task(self):
for obstacle in self.obstacle_task:
obstacle.move()
obstacle.move(self.mode)

def step(self, feedback):

Expand Down