How to use curobo to use the voxel map generated by nvblox as a voxel collision reference in MPC cases? #451
Unanswered
KKXiaoKang
asked this question in
Software Q&A
Replies: 3 comments 3 replies
-
Do you have a minimal example of how you queried the voxel map? |
Beta Was this translation helpful? Give feedback.
1 reply
-
|
Beta Was this translation helpful? Give feedback.
2 replies
-
from os import path
import time
import rospy
import math
from curobo.geom.sdf.world import CollisionCheckerType
from curobo.geom.types import Cuboid
from curobo.geom.types import Cylinder
from curobo.geom.types import Mesh
from curobo.geom.types import Sphere
from curobo.geom.types import VoxelGrid as CuVoxelGrid
from curobo.geom.types import WorldConfig
from curobo.types.base import TensorDeviceType
from curobo.types.math import Pose
from curobo.types.state import JointState as CuJointState # Prevent naming conflict with ROS built-in JointState
from curobo.util.logger import setup_curobo_logger
from curobo.util_file import get_robot_configs_path
from curobo.util_file import join_path
from curobo.util_file import load_yaml
from curobo.rollout.rollout_base import Goal
from curobo.wrap.reacher.mpc import MpcSolver, MpcSolverConfig
from curobo.wrap.reacher.ik_solver import IKSolver, IKSolverConfig
from curobo.wrap.reacher.motion_gen import MotionGen
from curobo.wrap.reacher.motion_gen import MotionGenConfig
from curobo.wrap.reacher.motion_gen import MotionGenPlanConfig
from curobo.wrap.reacher.motion_gen import MotionGenStatus
from geometry_msgs.msg import PoseStamped
from geometry_msgs.msg import Point
from geometry_msgs.msg import Vector3
from geometry_msgs.msg import Quaternion
from moveit_msgs.msg import CollisionObject
from moveit_msgs.msg import MoveItErrorCodes
from moveit_msgs.msg import RobotTrajectory
from nvblox_msgs.srv import EsdfAndGradients, EsdfAndGradientsRequest, EsdfAndGradientsResponse
import numpy as np
from sensor_msgs.msg import JointState
from shape_msgs.msg import SolidPrimitive
from std_msgs.msg import ColorRGBA
from std_msgs.msg import Header
from std_msgs.msg import Bool
import torch
from trajectory_msgs.msg import JointTrajectory
from trajectory_msgs.msg import JointTrajectoryPoint
from trajectory_msgs.msg import MultiDOFJointTrajectory
from visualization_msgs.msg import Marker
from tf2_ros import Buffer, TransformListener, LookupException, ExtrapolationException
from geometry_msgs.msg import PointStamped
from tf2_geometry_msgs import do_transform_point
from curobo_trajectory_server.srv import cuRoMoveGroup, cuRoMoveGroupResponse, cuRoMoveGroupRequest
from curobo_trajectory_server.srv import cuRoMpcSetting, cuRoMpcSettingResponse, cuRoMpcSettingRequest
from kuavo_msgs.msg import sensorsData
import threading
# from curobo.types.robot import JointState
# from isaac_ros_cumotion.xrdf_utils import convert_xrdf_to_curobo
# from moveit_msgs.action import MoveGroup
# from ament_index_python.packages import get_package_share_directory
# from kuavo_robot_interfaces.msg import CuRobotArmStatus
# from nvblox_msgs.srv import EsdfAndGradients
USE_ESDF_NVBLOX_MAP = True # Whether to use esdf map
global_esdf_grid = None # Global esdf map | Used to store the map for current planning
origin_esdf_grid = None # Real-time background updated esdf map
tsdf_response = None # tsdf_response result
IF_MPC_FIRST_SOLVE_FLAG = True # Whether MPC first solve is successful
if USE_ESDF_NVBLOX_MAP:
if_first_map_init_flag = False # Whether the first esdf map has been updated
"""
# Robot status | True for 'go' state | False for 'back' state
"""
global_robot_arm_plan_status = True
"""
# Global processing status | True for idle | False for processing
"""
global_if_process_action_flag = True
"""
# Robot trajectory status | 1 for Go state trajectory publisher | 0 for Back state trajectory publisher
JointTrajectory
"""
TRAJ_COUNTER = 1
"""
# Global storage for robot trajectories
"""
JOINT_TRAJ_GO = None
JOINT_TRAJ_BACK = None
"""
# Whether in map processing state
"""
IF_PROCESS_VOXEL_MAP_SATUS = False
DEBUG_MODE_FLAG = True
DEBUG_MODE = True # Debug mode
JOINT_NAME_LIST = [ "zarm_l1_link", "zarm_l2_link", "zarm_l3_link", "zarm_l4_link", "zarm_l5_link", "zarm_l6_link", "zarm_l7_link",
"zarm_r1_link", "zarm_r2_link", "zarm_r3_link", "zarm_r4_link", "zarm_r5_link", "zarm_r6_link", "zarm_r7_link"]
class CumotionActionServer:
def __init__(self):
rospy.init_node('trajectory_server_node', anonymous=True)
# Declare and initialize parameters
self.tensor_args = TensorDeviceType()
self.nvblox_tensor_args = TensorDeviceType()
# self.robot_config = rospy.get_param('~robot', '/home/lab/GenDexGrasp/Gendexgrasp_ros_ok/kuavo_assets/biped_s40/urdf/biped_s40.yml')
self.robot_config = rospy.get_param('~robot', '/home/lab/GenDexGrasp/Gendexgrasp_ros_ok/kuavo_assets/biped_s42/urdf/biped_s42.yml')
self.time_dilation_factor = rospy.get_param('~time_dilation_factor', 1.0)
self.collision_cache_mesh = rospy.get_param('~collision_cache_mesh', 20)
self.collision_cache_cuboid = rospy.get_param('~collision_cache_cuboid', 20)
self.interpolation_dt = rospy.get_param('~interpolation_dt', 0.02)
self.__voxel_dims = rospy.get_param('~voxel_dims', [2.0, 2.0, 2.0])
self.__voxel_size = rospy.get_param('~voxel_size', 0.02)
# rospy.loginfo(f"Voxel dimensions: {self.__voxel_dims}, Voxel size: {self.__voxel_size}")
self.publish_voxel_size = rospy.get_param('~publish_voxel_size', 0.01)
self.max_publish_voxels = rospy.get_param('~max_publish_voxels', 50000)
self.joint_states_topic = rospy.get_param('~joint_states_topic', '/joint_states')
self.tool_frame = rospy.get_param('~tool_frame', None)
self.grid_position = rospy.get_param('~grid_position', [0.0, 0.0, 0.0])
self.esdf_service_name = rospy.get_param('~esdf_service_name', '/nvblox_node/get_esdf_and_gradient')
# self.urdf_path = rospy.get_param('~urdf_path', '/home/lab/GenDexGrasp/Gendexgrasp_ros_ok/kuavo_assets/biped_s40/urdf/biped_s40_left_arm.urdf')
self.urdf_path = rospy.get_param('~urdf_path', '/home/lab/GenDexGrasp/Gendexgrasp_ros_ok/kuavo_assets/biped_s42/urdf/biped_s42_v4_left_arm.urdf')
self.enable_debug_mode = rospy.get_param('~enable_curobo_debug_mode', False)
self.add_ground_plane = rospy.get_param('~add_ground_plane', False)
self.override_moveit_scaling_factors = rospy.get_param('~override_moveit_scaling_factors', True)
self.read_esdf_world = rospy.get_param('~read_esdf_world', True)
self.publish_curobo_world_as_voxels = rospy.get_param('~publish_curobo_world_as_voxels', True)
# Configure logging
if self.enable_debug_mode:
setup_curobo_logger('info')
else:
setup_curobo_logger('warning')
self.goal = None
self.cube_pose = PoseStamped()
self.cube_pose.header.frame_id = 'base_link'
self.cube_pose.pose.position.x = 0.3
self.cube_pose.pose.position.y = 0.3
self.cube_pose.pose.position.z = 0.2
self.cube_pose.pose.orientation.x = 0.0
self.cube_pose.pose.orientation.y = -0.70710677
self.cube_pose.pose.orientation.z = 0.0
self.cube_pose.pose.orientation.w = 0.70710677
rospy.Timer(rospy.Duration(0.5), self.publish_marker)
self.mpc_l_arm_marker_pub = rospy.Publisher('/mpc_l_arm_goal/marker', Marker, queue_size=10)
self._mpc_cmd_goal_sub = rospy.Subscriber('/mpc_cmd_result', PoseStamped, self.mpc_cmd_goal_callback)
self.past_position = None
self.past_orientation = None
# Publishers
self.voxel_pub = rospy.Publisher('/curobo/voxels', Marker, queue_size=10)
self.go_joint_traj_pub = rospy.Publisher('/cumotion_go_joint_trajectory_topic', JointTrajectory, queue_size=10)
self.back_joint_traj_pub = rospy.Publisher('/cumotion_back_joint_trajectory_topic', JointTrajectory, queue_size=10)
self.global_if_process_flag_pub = rospy.Publisher('/global_if_process_action_flag_topic', Bool, queue_size=10)
self.flag_publisher_ = rospy.Publisher('/global_success_action_flag_topic', Bool, queue_size=10)
# Timers for periodic publishing
rospy.Timer(rospy.Duration(0.2), self.timer_callback)
self.arm_status_sub = rospy.Subscriber('/robot_arm_plan_status', Bool, self.arm_status_callback)
self.curobo_joint_state = None
self.current_curobo_joint_space = None
self._joint_names = ['zarm_l1_joint', 'zarm_l2_joint', 'zarm_l3_joint', 'zarm_l4_joint', 'zarm_l5_joint', 'zarm_l6_joint', 'zarm_l7_joint']
self.current_robot_joint_space = JointState(
header=Header(stamp=rospy.Time(0), frame_id='torso'),
name=self._joint_names,
position=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
velocity=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
effort=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
)
self.ocs2_robot_arm_status_sub = rospy.Subscriber('/sensors_data_raw', sensorsData, self.ocs2_robot_arm_status_callback)
self.robot_arm_joint_cmd_pub = rospy.Publisher('/kuavo_arm_traj', JointState, queue_size=10)
self._action_server = rospy.Service('/cumotion/move_group', cuRoMoveGroup, self.execute_callback)
self._mpc_action_server = rospy.Service('/cumotion/mpc_set_goal', cuRoMpcSetting, self.mpc_goal_callback)
# TF2 Listener
self.tf_buffer = Buffer()
self.tf_listener = TransformListener(self.tf_buffer)
self.source_frame = 'camera_link'
self.target_frame = 'base_link'
self.mpc_result = None
# Initialize additional attributes
self.__esdf_client = None
self.__esdf_req = None
self.__read_esdf_grid = self.read_esdf_world
# warm up
self.warmup()
self.__query_count = 0
self.subscription = rospy.Subscriber(self.joint_states_topic, JointState, self.js_callback)
self.__js_buffer = None
# Create client
if self.__read_esdf_grid:
# Wait
rospy.loginfo(f"Waiting for {self.esdf_service_name} service...")
rospy.wait_for_service(self.esdf_service_name)
# Assign
self.__esdf_req = EsdfAndGradientsRequest()
# Create esdf client
self.__esdf_client = rospy.ServiceProxy(self.esdf_service_name, EsdfAndGradients)
rospy.loginfo(f"Connected to {self.esdf_service_name} service")
def mpc_cmd_goal_callback(self, msg):
try:
self.cube_pose = msg
# rospy.loginfo(f"Received mpc_cmd_result message: {self.cube_pose}")
except Exception as e:
rospy.logerr(f"Error in mpc_cmd_goal_callback: {e}")
def mpc_goal_callback(self, request):
try:
# Extract Pose data and assign to self.cube_pose
self.cube_pose = request.pose
# Return success result
return cuRoMpcSettingResponse(result=True)
except Exception as e:
rospy.logerr(f"Error in mpc_goal_callback: {e}")
return cuRoMpcSettingResponse(result=False)
def control_robot_arm_cmd(self, interpolated_positions):
joint_state_msg = JointState()
joint_state_msg.name = JOINT_NAME_LIST
joint_state_msg.position = list(interpolated_positions) # Fill the 7 positions of the left arm
joint_state_msg.position += [0.0] * 7 # Fill the 7 joint positions of the right arm with zero
joint_state_msg.position = [angle * (180 / math.pi) for angle in joint_state_msg.position]
self.robot_arm_joint_cmd_pub.publish(joint_state_msg)
def ocs2_robot_arm_status_callback(self, msg):
"""Callback function for handling robot arm status"""
# sensors_msg/JointState 维护
if len(msg.joint_data.joint_q) >= 19: # Ensure data length is sufficient
self.current_robot_joint_space.position = msg.joint_data.joint_q[12:19]
# rospy.loginfo(f"Received /sensors_data_raw message: {self.current_robot_joint_space.position}")
else:
rospy.logwarn("Received joint_data.joint_q does not contain enough elements!")
def publish_marker(self, event):
"""Timer callback function, used to publish Marker"""
# Create Marker message
marker = Marker()
# Set Marker basic properties
marker.header.frame_id = "base_link" # Set parent coordinate system
marker.header.stamp = rospy.Time.now()
marker.ns = "cube_namespace" # Namespace
marker.id = 0 # Marker ID
marker.type = Marker.CUBE # Set Marker shape to CUBE
marker.action = Marker.ADD # Add Marker
# Set Marker position and orientation
marker.pose.position.x = self.cube_pose.pose.position.x
marker.pose.position.y = self.cube_pose.pose.position.y
marker.pose.position.z = self.cube_pose.pose.position.z
marker.pose.orientation.x = self.cube_pose.pose.orientation.x
marker.pose.orientation.y = self.cube_pose.pose.orientation.y
marker.pose.orientation.z = self.cube_pose.pose.orientation.z
marker.pose.orientation.w = self.cube_pose.pose.orientation.w
# Set Marker size (e.g., cube side length)
marker.scale.x = 0.05 # Length
marker.scale.y = 0.05 # Width
marker.scale.z = 0.05 # Height
# Set Marker color (e.g., red)
marker.color.r = 1.0
marker.color.g = 0.0
marker.color.b = 0.0
marker.color.a = 1.0 # alpha transparency
# Publish Marker message
self.mpc_l_arm_marker_pub.publish(marker)
def timer_callback(self, event):
try:
global global_if_process_action_flag
msg = Bool()
msg.data = global_if_process_action_flag
self.global_if_process_flag_pub.publish(msg)
except Exception as e:
rospy.logerr(f"Exception in timer_callback: {e}")
def arm_status_callback(self, msg):
"""Callback function for handling robot arm planning status"""
global global_robot_arm_plan_status
global_robot_arm_plan_status = msg.data
def js_callback(self, msg):
self.__js_buffer = {
'joint_names': msg.name,
'position': msg.position,
'velocity': msg.velocity,
}
def warmup(self):
rospy.loginfo("cuMotion_MPC_Server Now is running")
# Load robot config
self.robot_config_path = join_path(get_robot_configs_path(), self.robot_config)
# Retrieve parameters from ROS parameter server
collision_cache_cuboid = rospy.get_param('~collision_cache_cuboid', 20)
collision_cache_mesh = rospy.get_param('~collision_cache_mesh', 20)
# collision_cache_cuboid = rospy.get_param('~collision_cache_cuboid', 30)
# collision_cache_mesh = rospy.get_param('~collision_cache_mesh', 10)
interpolation_dt = rospy.get_param('~interpolation_dt', 0.02)
# Define world configuration
world_file = WorldConfig.from_dict(
{
'cuboid': {
'table': {
'pose': [0, 0, -0.05, 1, 0, 0, 0],
'dims': [2.0, 2.0, 0.1],
}
},
'voxel': {
'world_voxel': {
'dims': self.__voxel_dims,
'pose': [0, 0, 0, 1, 0, 0, 0],
'voxel_size': self.__voxel_size,
'feature_dtype': torch.bfloat16,
},
},
}
)
# Load the robot configuration
robot_dict = load_yaml(self.robot_config_path)
rospy.loginfo(f"Loaded robot config: {self.robot_config_path}")
# Update robot configuration for URDF path if needed
if self.urdf_path:
robot_dict['robot_cfg']['kinematics']['urdf_path'] = self.urdf_path
robot_dict = robot_dict['robot_cfg']
# ik_config
self.ik_tensor_args = TensorDeviceType()
ik_config = IKSolverConfig.load_from_robot_config(
robot_dict,
world_file,
rotation_threshold=0.05,
position_threshold=0.005,
num_seeds=20,
self_collision_check=True,
self_collision_opt=True,
tensor_args= self.ik_tensor_args,
use_cuda_graph=True,
# use_fixed_samples=True,
)
ik_solver = IKSolver(ik_config)
self.ik_solver = ik_solver
# MPC - Setting
"""
mpc_config = MpcSolverConfig.load_from_robot_config(
robot_cfg,
world_cfg,
use_cuda_graph=True,
use_cuda_graph_metrics=True,
use_cuda_graph_full_step=False,
self_collision_check=True,
collision_checker_type=CollisionCheckerType.BLOX,
use_mppi=True,
use_lbfgs=False,
use_es=False,
store_rollouts=True,
step_dt=0.02,
)
"""
mpc_config = MpcSolverConfig.load_from_robot_config(
robot_dict,
world_file,
collision_cache={
'obb': collision_cache_cuboid,
'mesh': collision_cache_mesh,
},
# collision_checker_type=CollisionCheckerType.VOXEL,
collision_checker_type=CollisionCheckerType.BLOX,
use_cuda_graph=True,
use_cuda_graph_metrics=True,
use_cuda_graph_full_step=False,
self_collision_check=True,
use_mppi=True,
use_lbfgs=False,
use_es=False,
store_rollouts=True,
step_dt=0.02,
)
mpc = MpcSolver(mpc_config)
self.mpc = mpc
self.tensor_args = self.mpc.tensor_args
self.__robot_base_frame = mpc.kinematics.base_link
rospy.loginfo(f"Robot base frame: {self.__robot_base_frame}")
# Initial solve | Wait for /sensors_data_raw topic to publish
rospy.loginfo("Waiting for /sensors_data_raw topic to publish...")
sensors_data = rospy.wait_for_message('/sensors_data_raw', sensorsData) # Replace SensorMessageType with actual message type
rospy.loginfo(f"Received /sensors_data_raw message: {sensors_data.joint_data.joint_q}")
self.__world_collision = mpc.world_coll_checker # Collision world check declaration
self.__world_model = mpc.world_collision # World collision
rospy.loginfo('cuMotion_MPC_Server is ready for planning queries!')
# Control description
self.retract_cfg_increment = -0.2
def update_voxel_grid(self):
global global_robot_arm_plan_status
global global_esdf_grid
global origin_esdf_grid
global IF_PROCESS_VOXEL_MAP_SATUS
global tsdf_response
global DEBUG_MODE
# Get ESDF map and update
origin_esdf_grid = self.get_esdf_voxel_grid(tsdf_response)
IF_PROCESS_VOXEL_MAP_SATUS = True # Processing voxel map
rospy.loginfo('Calling ESDF service')
# Determine the state of the robot arm
if global_robot_arm_plan_status:
rospy.loginfo('Updated go state ESDF grid')
esdf_grid = origin_esdf_grid # Use the current origin实时更新的地图
else:
rospy.loginfo('Updated back state ESDF grid')
esdf_grid = global_esdf_grid # Use the last global map
# ESDF map description
if torch.max(esdf_grid.feature_tensor) <= (-1000.0 + 0.5 * self.__voxel_size + 1e-5):
rospy.logerr('ESDF data is empty, try again after few seconds.')
return False
if DEBUG_MODE:
rospy.loginfo(f"ESDF grid max: {torch.max(esdf_grid.feature_tensor)}, min: {torch.min(esdf_grid.feature_tensor)}")
rospy.loginfo(f"ESDF grid shape: {esdf_grid.feature_tensor.shape}")
# Update voxel map, assuming __world_collision is the class that handles collision detection
self.__world_collision.update_voxel_data(esdf_grid)
## Update the features of the voxels of the voxel map
# self.__world_collision.update_voxel_features(esdf_grid.feature_tensor, name="world_voxel", env_idx=0)
rospy.loginfo('Updated ESDF grid')
# Voxel map updated
IF_PROCESS_VOXEL_MAP_SATUS = False
return True
def send_request(self, aabb_min_m, aabb_size_m):
self.__esdf_req.aabb_min_m = aabb_min_m
self.__esdf_req.aabb_size_m = aabb_size_m
# esdf_future = self.__esdf_client.call_async(self.__esdf_req) # Asynchronous client call service
# return esdf_future
try:
# Use synchronous call instead of asynchronous call
esdf_response = self.__esdf_client(self.__esdf_req)
return esdf_response
except rospy.ServiceException as e:
rospy.logerr(f"Service call failed: {e}")
return None
def get_esdf_voxel_grid(self, esdf_data):
global DEBUG_MODE
esdf_array = esdf_data.esdf_and_gradients # Extract esdf grid and gradient information
array_shape = [ # Read the three-dimensional shape [x, y, z] of the voxel grid from the layout information of esdf_array, representing the number of voxels in each dimension
esdf_array.layout.dim[0].size,
esdf_array.layout.dim[1].size,
esdf_array.layout.dim[2].size,
]
if DEBUG_MODE:
rospy.loginfo("-------------- ESDF DEBUG --------------")
rospy.loginfo(f"ESDF array shape: {array_shape}")
# TODO:
array_data = np.array(esdf_array.data)
array_data = self.tensor_args.to_device(array_data)
if DEBUG_MODE:
rospy.loginfo(f"ESDF array data: {array_data}")
# Array data is reshaped to x y z channels
array_data = array_data.view(array_shape[0], array_shape[1], array_shape[2]).contiguous()
# Array is squeezed to 1 dimension
array_data = array_data.reshape(-1, 1)
# array_data[:] = -9 #
# nvblox uses negative distance inside obstacles, cuRobo needs the opposite:
array_data = -1 * array_data
# nvblox assigns a value of -1000.0 for unobserved voxels, making
array_data[array_data >= 1000.0] = -1000.0
# nvblox distance are at origin of each voxel, cuRobo's esdf needs it to be at faces
array_data = array_data + 0.5 * self.__voxel_size
"""
Create a CuVoxelGrid object esdf_grid using the generated data
Set the grid name, dimensions, pose, voxel size, data type, and feature data. The position pose includes position coordinates [x, y, z] and quaternion representation [qw, qx, qy, qz]
"""
esdf_grid = CuVoxelGrid(
name='world_voxel',
dims=self.__voxel_dims,
pose=[
self.grid_position[0],
self.grid_position[1],
self.grid_position[2],
1,
0.0,
0.0,
0,
], # x, y, z, qw, qx, qy, qz
voxel_size=self.__voxel_size,
feature_dtype=torch.float32,
feature_tensor=array_data,
)
return esdf_grid # Return the generated CuVoxelGrid object esdf_grid
def get_cumotion_collision_object(self, mv_object: CollisionObject):
objs = []
pose = mv_object.pose
world_pose = [
pose.position.x,
pose.position.y,
pose.position.z,
pose.orientation.w,
pose.orientation.x,
pose.orientation.y,
pose.orientation.z,
]
world_pose = Pose.from_list(world_pose)
supported_objects = True
if len(mv_object.primitives) > 0:
for k in range(len(mv_object.primitives)):
pose = mv_object.primitive_poses[k]
primitive_pose = [
pose.position.x,
pose.position.y,
pose.position.z,
pose.orientation.w,
pose.orientation.x,
pose.orientation.y,
pose.orientation.z,
]
object_pose = world_pose.multiply(Pose.from_list(primitive_pose)).tolist()
if mv_object.primitives[k].type == SolidPrimitive.BOX:
# cuboid:
dims = mv_object.primitives[k].dimensions
obj = Cuboid(
name=str(mv_object.id) + '_' + str(k) + '_cuboid',
pose=object_pose,
dims=dims,
)
objs.append(obj)
elif mv_object.primitives[k].type == SolidPrimitive.SPHERE:
# sphere:
radius = mv_object.primitives[k].dimensions[
mv_object.primitives[k].SPHERE_RADIUS
]
obj = Sphere(
name=str(mv_object.id) + '_' + str(k) + '_sphere',
pose=object_pose,
radius=radius,
)
objs.append(obj)
elif mv_object.primitives[k].type == SolidPrimitive.CYLINDER:
# cylinder:
cyl_height = mv_object.primitives[k].dimensions[
mv_object.primitives[k].CYLINDER_HEIGHT
]
cyl_radius = mv_object.primitives[k].dimensions[
mv_object.primitives[k].CYLINDER_RADIUS
]
obj = Cylinder(
name=str(mv_object.id) + '_' + str(k) + '_cylinder',
pose=object_pose,
height=cyl_height,
radius=cyl_radius,
)
objs.append(obj)
elif mv_object.primitives[k].type == SolidPrimitive.CONE:
rospy.logerr('Cone primitive is not supported')
supported_objects = False
else:
rospy.logerr('Unknown primitive type')
supported_objects = False
if len(mv_object.meshes) > 0:
for k in range(len(mv_object.meshes)):
pose = mv_object.mesh_poses[k]
mesh_pose = [
pose.position.x,
pose.position.y,
pose.position.z,
pose.orientation.w,
pose.orientation.x,
pose.orientation.y,
pose.orientation.z,
]
object_pose = world_pose.multiply(Pose.from_list(mesh_pose)).tolist()
verts = mv_object.meshes[k].vertices
verts = [[v.x, v.y, v.z] for v in verts]
tris = [
[v.vertex_indices[0], v.vertex_indices[1], v.vertex_indices[2]]
for v in mv_object.meshes[k].triangles
]
obj = Mesh(
name=str(mv_object.id) + '_' + str(len(objs)) + '_mesh',
pose=object_pose,
vertices=verts,
faces=tris,
)
objs.append(obj)
return objs, supported_objects
def update_world_objects(self, moveit_objects):
rospy.loginfo("Calling update_world_objects function.")
world_update_status = True
if len(moveit_objects) > 0:
cuboid_list = []
sphere_list = []
cylinder_list = []
mesh_list = []
for i, obj in enumerate(moveit_objects):
cumotion_objects, world_update_status = self.get_cumotion_collision_object(obj)
for cumotion_object in cumotion_objects:
if isinstance(cumotion_object, Cuboid):
cuboid_list.append(cumotion_object)
elif isinstance(cumotion_object, Cylinder):
cylinder_list.append(cumotion_object)
elif isinstance(cumotion_object, Sphere):
sphere_list.append(cumotion_object)
elif isinstance(cumotion_object, Mesh):
mesh_list.append(cumotion_object)
world_model = WorldConfig(
cuboid=cuboid_list,
cylinder=cylinder_list,
sphere=sphere_list,
mesh=mesh_list,
).get_collision_check_world()
rospy.loginfo("Calling mpc.update_world(world_model) function.") # When esdf service query is enabled, this service is not called
self.mpc.update_world(world_model)
if self.__read_esdf_grid: # Read current world collision distance from esdf
rospy.loginfo("Calling world_update_status = self.update_voxel_grid() function.")
world_update_status = self.update_voxel_grid() # Whether to take the previous map for obstacle avoidance based on the global state of go or back
if self.publish_curobo_world_as_voxels: # Whether to publish voxel world
rospy.loginfo("Calling world_update_status = self.publish_voxels(xyzr_tensor) function.")
voxels = self.__world_collision.get_esdf_in_bounding_box(
Cuboid(
name='test',
pose=[0.0, 0.0, 0.0, 1, 0, 0, 0], # x, y, z, qw, qx, qy, qz
dims=self.__voxel_dims,
),
voxel_size=self.publish_voxel_size,
)
xyzr_tensor = voxels.xyzr_tensor.clone()
xyzr_tensor[..., 3] = voxels.feature_tensor
self.publish_voxels(xyzr_tensor)
rospy.loginfo(f" get_esdf_in_bounding_box voxels: {voxels}")
# voxels = self.__world_collision.get_voxels_in_bounding_box(
# Cuboid(
# name='test',
# pose=[0.0, 0.0, 0.0, 1, 0, 0, 0],
# dims=self.__voxel_dims,
# ),
# voxel_size=self.publish_voxel_size,
# )
# self.publish_voxels_from_bounding_box(voxels)
# rospy.loginfo(f" get_voxels_in_bounding_box voxels: {voxels}")
return world_update_status
def execute_callback(self, goal_handle):
"""
TODO: Used to update the ik_goal of the mpc tracking (simply update the cube_pose PoseStamped data type)
"""
rospy.logwarn("You are using cuMotion_MPC_Server, this service is for cuMotion_Motion_Server . it's not recommended to use it for planning service")
def update_mpc_goal(self):
"""
Update target state (e.g., update target pose or position)
"""
global IF_MPC_FIRST_SOLVE_FLAG
self.curobo_joint_state = torch.tensor([self.current_robot_joint_space.position[0],
self.current_robot_joint_space.position[1],
self.current_robot_joint_space.position[2],
self.current_robot_joint_space.position[3],
self.current_robot_joint_space.position[4],
self.current_robot_joint_space.position[5],
self.current_robot_joint_space.position[6]
]).cuda() # Transfer to CUDA
self.current_curobo_joint_space = CuJointState.from_position(position=self.tensor_args.to_device(self.curobo_joint_state),
joint_names=self._joint_names)
if self.cube_pose:
# Get target position and orientation
cube_position = torch.tensor(
[self.cube_pose.pose.position.x, self.cube_pose.pose.position.y, self.cube_pose.pose.position.z]
).cuda()
cube_orientation = torch.tensor(
[self.cube_pose.pose.orientation.w, self.cube_pose.pose.orientation.x,
self.cube_pose.pose.orientation.y, self.cube_pose.pose.orientation.z]
).cuda()
# TODO: Update tracking point by changing Pose as MPC tracking point
# if self.past_position is None or torch.norm(cube_position - self.past_position) > 1e-3: # Update trigger condition
if self.past_position is None or torch.norm(cube_position - self.past_position) > 1e-3 or not torch.allclose(cube_orientation, self.past_orientation, atol=1e-3): # Update trigger condition
if IF_MPC_FIRST_SOLVE_FLAG:
# TODO: Use Goal as global point
self.past_position = cube_position
self.past_orientation = cube_orientation
ik_goal = Pose(position=self.tensor_args.to_device(cube_position),
quaternion=self.tensor_args.to_device(cube_orientation))
result = self.ik_solver.solve_batch(ik_goal)
# rospy.loginfo(f"ik_solver result: {result}")
ik_position = result.js_solution.position
retract_cfg = self.mpc.rollout_fn.dynamics_model.retract_config.clone().unsqueeze(0)
if ik_position.shape[-1] != retract_cfg.shape[-1]:
rospy.logerr("Dimension mismatch: ik_position and retract_cfg have different lengths!")
else:
# Update retract_cfg value
retract_cfg[0] = ik_position[0]
# rospy.loginfo(f"MPC update_mpc_goal retract_cfg: {retract_cfg}") # Get initial position
joint_names = self.mpc.rollout_fn.joint_names
state = self.mpc.rollout_fn.compute_kinematics(
CuJointState.from_position(retract_cfg, joint_names=joint_names)
)
self.current_state = CuJointState.from_position(retract_cfg, joint_names=joint_names)
retract_pose = Pose(state.ee_pos_seq, quaternion=state.ee_quat_seq)
# rospy.loginfo(f"state.ee_pos_seq: {state.ee_pos_seq}") # Get end effector position
# rospy.loginfo(f"state.ee_quat_seq: {state.ee_quat_seq}") # Get end effector orientation
self.goal = Goal(
current_state=self.current_state,
goal_state=CuJointState.from_position(retract_cfg, joint_names=joint_names),
goal_pose=retract_pose,
)
self.goal_buffer = self.mpc.setup_solve_single(self.goal, 1)
self.mpc.update_goal(self.goal_buffer)
# rospy.loginfo("--------- MPC First Solve -------") # Get initial position
# IF_MPC_FIRST_SOLVE_FLAG = False
else:
# TODO: Use Goal as global point | Note: Tensor dimension mismatch warning
self.past_position = cube_position
self.past_orientation = cube_orientation
ik_goal = Pose(position=self.tensor_args.to_device(cube_position),
quaternion=self.tensor_args.to_device(cube_orientation))
# TODO: Normal approach
self.goal_buffer.goal_pose.copy_(ik_goal)
self.mpc.update_goal(self.goal_buffer)
def run(self):
if USE_ESDF_NVBLOX_MAP:
rospy.loginfo("...USE_ESDF_NVBLOX_MAP...")
global if_first_map_init_flag
while not if_first_map_init_flag:
rospy.loginfo("Waiting for first map initialization...")
time.sleep(1) # Wait for esdf map initialization to complete
rospy.loginfo("ESDF Map initialization is complete...")
# Update world description
world_objects = []
world_update_status = self.update_world_objects(world_objects) # Update world description | Adjust map acquisition based on go or back state for planning
while not rospy.is_shutdown():
# Test
self.mpc.get_visual_rollouts()
# TODO: Update target
self.update_mpc_goal()
# TODO: Update next state
self.mpc_result = self.mpc.step(self.current_curobo_joint_space, max_attempts=2)
# TODO: Send CMD command
if self.mpc_result and hasattr(self.mpc_result, 'js_action') and self.mpc_result.js_action:
joint_positions = self.mpc_result.js_action.position
# Convert to list or numpy array for publishing
joint_positions_list = joint_positions.squeeze().cpu().numpy().tolist() # Remove singleton dimensions and move to CPU
# rospy.loginfo(f"Control command joint positions: {joint_positions_list}")
# control-CMD
self.control_robot_arm_cmd(joint_positions_list)
def publish_voxels_from_bounding_box(self, voxels):
global DEBUG_MODE
vox_size = self.publish_voxel_size
# create marker:
marker = Marker()
marker.header.frame_id = self.__robot_base_frame
marker.id = 1 # Unique ID for this marker type
marker.type = 6 # Cube list
marker.ns = 'curobo_world_bounding_box'
marker.action = 0
marker.pose.orientation.w = 1.0
marker.lifetime = rospy.Duration(1000.0)
marker.frame_locked = False
marker.scale.x = vox_size
marker.scale.y = vox_size
marker.scale.z = vox_size
# Convert tensor to numpy for easier processing
vox = voxels.cpu().numpy()
marker.points = []
# Map values to colors and points
for i in range(min(len(vox), self.max_publish_voxels)):
pt = Point()
pt.x = float(vox[i, 0])
pt.y = float(vox[i, 1])
pt.z = float(vox[i, 2])
color = ColorRGBA()
value = vox[i, 3]
# Map `value` to a color gradient (e.g., red to green)
rgba = [
max(0.0, 1.0 - value), # Red decreases with value
min(1.0, value), # Green increases with value
0.0, # Blue is fixed
1.0 # Alpha (fully opaque)
]
color.r = rgba[0]
color.g = rgba[1]
color.b = rgba[2]
color.a = rgba[3]
marker.colors.append(color)
marker.points.append(pt)
# Add timestamp and publish marker
marker.header.stamp = rospy.Time.now()
# DEBUG
if DEBUG_MODE:
rospy.loginfo(f"Bounding Box Voxels stats: max={torch.max(voxels[:, 3])}, min={torch.min(voxels[:, 3])}")
rospy.loginfo(f"Bounding Box Voxels shape: {voxels.shape}")
# Publish marker
self.voxel_pub.publish(marker)
def publish_voxels(self, voxels):
global DEBUG_MODE
vox_size = self.publish_voxel_size
# create marker:
marker = Marker()
marker.header.frame_id = self.__robot_base_frame
marker.id = 0
marker.type = 6 # cube list
marker.ns = 'curobo_world'
marker.action = 0
marker.pose.orientation.w = 1.0
marker.lifetime = rospy.Duration(1000.0) #
marker.frame_locked = False
marker.scale.x = vox_size
marker.scale.y = vox_size
marker.scale.z = vox_size
# get only voxels that are inside surfaces:
voxels = voxels[voxels[:, 3] >= 0.0]
vox = voxels.view(-1, 4).cpu().numpy()
marker.points = []
for i in range(min(len(vox), self.max_publish_voxels)):
pt = Point()
pt.x = float(vox[i, 0])
pt.y = float(vox[i, 1])
pt.z = float(vox[i, 2])
color = ColorRGBA()
d = vox[i, 3]
rgba = [min(1.0, 1.0 - float(d)), 0.0, 0.0, 1.0]
color.r = rgba[0]
color.g = rgba[1]
color.b = rgba[2]
color.a = rgba[3]
marker.colors.append(color)
marker.points.append(pt)
# publish voxels:
marker.header.stamp = rospy.Time.now()
# DEBUG
if DEBUG_MODE:
rospy.loginfo(f"Voxels stats: max={torch.max(voxels[:, 3])}, min={torch.min(voxels[:, 3])}")
rospy.loginfo(f"Voxels shape: {voxels.shape}")
# publish
self.voxel_pub.publish(marker)
def map_update_thread(cumotion_action_server):
"""Map update logic running in thread"""
global IF_PROCESS_VOXEL_MAP_SATUS
global origin_esdf_grid
global tsdf_response
global USE_ESDF_NVBLOX_MAP
origin_voxel_dims = [2.0, 2.0, 2.0]
while not rospy.is_shutdown():
if not IF_PROCESS_VOXEL_MAP_SATUS:
# This is half of x,y and z dims
aabb_min = Point()
aabb_min.x = -1 * origin_voxel_dims[0] / 2
aabb_min.y = -1 * origin_voxel_dims[1] / 2
aabb_min.z = -1 * origin_voxel_dims[2] / 2
# This is a voxel size.
voxel_dims = Vector3()
voxel_dims.x = origin_voxel_dims[0]
voxel_dims.y = origin_voxel_dims[1]
voxel_dims.z = origin_voxel_dims[2]
#
esdf_response = cumotion_action_server.send_request(aabb_min, voxel_dims)
if esdf_response is not None:
# rospy.loginfo("ESDF map updated successfully")
tsdf_response = esdf_response
if USE_ESDF_NVBLOX_MAP:
global if_first_map_init_flag
if_first_map_init_flag = True # First esdf map update completed
else:
rospy.logwarn("Failed to update ESDF map")
else:
rospy.loginfo("Voxel map processing status is active, skipping update")
rospy.sleep(1.0) # Control thread update frequency to avoid too frequent calls
if __name__ == '__main__':
curoBoServer = CumotionActionServer()
planning_thread = threading.Thread(target=map_update_thread, args=(curoBoServer,))
planning_thread.start()
try:
curoBoServer.run()
except rospy.ROSInterruptException as e:
rospy.logerr(e)
rospy.shutdown() |
Beta Was this translation helpful? Give feedback.
0 replies
# for free
to join this conversation on GitHub.
Already have an account?
# to comment
-
@balakumar-s
Beta Was this translation helpful? Give feedback.
All reactions