6.1 Python接口使用示例
本章节将带您实现索引所示若干功能
构建 & 运行说明
进入到
aimdk目录,执行以下命令source /opt/ros/humble/setup.bash colcon build source install/setup.bash ros2 run py_examples '对应功能名称如: get_mc_action'
📝 代码说明
完整的代码实现包含完整的错误处理、信号处理、超时处理等机制,确保程序的健壮性。请您在 py_examples 目录下查看/修改
小心
ROS的服务(service)机制在跨板通信时存在一些待优化问题, 二次开发时请参考例程添加异常处理、快速重试等保护机制
6.1.1 获取机器人模式
通过调用GetMcAction服务获取机器人当前的运行模式,包括名称和状态信息。
1#!/usr/bin/env python3
2
3import rclpy
4import rclpy.logging
5from rclpy.node import Node
6
7from aimdk_msgs.srv import GetMcAction
8from aimdk_msgs.msg import CommonRequest
9
10
11class GetMcActionClient(Node):
12 def __init__(self):
13 super().__init__('get_mc_action_client')
14 self.client = self.create_client(
15 GetMcAction, '/aimdk_5Fmsgs/srv/GetMcAction')
16 self.get_logger().info('✅ GetMcAction client node created.')
17
18 # Wait for the service to become available
19 while not self.client.wait_for_service(timeout_sec=2.0):
20 self.get_logger().info('⏳ Service unavailable, waiting...')
21
22 self.get_logger().info('🟢 Service available, ready to send request.')
23
24 def send_request(self):
25 request = GetMcAction.Request()
26 request.request = CommonRequest()
27
28 self.get_logger().info('📨 Sending request to get robot mode')
29 for i in range(8):
30 request.request.header.stamp = self.get_clock().now().to_msg()
31 future = self.client.call_async(request)
32 rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
33
34 if future.done():
35 break
36
37 # retry as remote peer is NOT handled well by ROS
38 self.get_logger().info(f'trying ... [{i}]')
39
40 response = future.result()
41 if response is None:
42 self.get_logger().error('❌ Service call failed or timed out.')
43 return
44
45 self.get_logger().info('✅ Robot mode get successfully.')
46 self.get_logger().info(f'Mode name: {response.info.action_desc}')
47 self.get_logger().info(f'Mode status: {response.info.status.value}')
48
49
50def main(args=None):
51 rclpy.init(args=args)
52 node = None
53 try:
54 node = GetMcActionClient()
55 node.send_request()
56 except KeyboardInterrupt:
57 pass
58 except Exception as e:
59 rclpy.logging.get_logger('main').error(
60 f'Program exited with exception: {e}')
61
62 if node:
63 node.destroy_node()
64 if rclpy.ok():
65 rclpy.shutdown()
66
67
68if __name__ == '__main__':
69 main()
6.1.2 设置机器人模式
该示例中用到了SetMcAction服务,运行节点程序后终端输入模式对应的字段值,机器人会尝试进入相应的运动模式。
切入稳定站立(STAND_DEFAULT)模式前,确保机器人已立起且双脚着地。
模式切换需遵循模式转换图, 跨模式转换不能成功
走跑模式(LOCOMOTION_DEFAULT)和稳定站立模式一体化自动切换,按模式转换图流转到两者中较近的即可
1#!/usr/bin/env python3
2
3import sys
4import rclpy
5import rclpy.logging
6from rclpy.node import Node
7
8from aimdk_msgs.srv import SetMcAction
9from aimdk_msgs.msg import RequestHeader, CommonState, McAction, McActionCommand
10
11
12class SetMcActionClient(Node):
13 def __init__(self):
14 super().__init__('set_mc_action_client')
15 self.client = self.create_client(
16 SetMcAction, '/aimdk_5Fmsgs/srv/SetMcAction'
17 )
18 self.get_logger().info('✅ SetMcAction client node created.')
19
20 # Wait for the service to become available
21 while not self.client.wait_for_service(timeout_sec=2.0):
22 self.get_logger().info('⏳ Service unavailable, waiting...')
23
24 self.get_logger().info('🟢 Service available, ready to send request.')
25
26 def send_request(self, action_name: str):
27 req = SetMcAction.Request()
28 req.header = RequestHeader()
29
30 cmd = McActionCommand()
31 cmd.action_desc = action_name
32 req.command = cmd
33
34 self.get_logger().info(
35 f'📨 Sending request to set robot mode: {action_name}')
36 for i in range(8):
37 req.header.stamp = self.get_clock().now().to_msg()
38 future = self.client.call_async(req)
39 rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
40
41 if future.done():
42 break
43
44 # retry as remote peer is NOT handled well by ROS
45 self.get_logger().info(f'trying ... [{i}]')
46
47 response = future.result()
48 if response is None:
49 self.get_logger().error('❌ Service call failed or timed out.')
50 return
51
52 if response.response.status.value == CommonState.SUCCESS:
53 self.get_logger().info('✅ Robot mode set successfully.')
54 else:
55 self.get_logger().error(
56 f'❌ Failed to set robot mode: {response.response.message}'
57 )
58
59
60def main(args=None):
61 action_info = {
62 'PASSIVE_DEFAULT': ('PD', 'joints with zero torque'),
63 'DAMPING_DEFAULT': ('DD', 'joints in damping mode'),
64 'JOINT_DEFAULT': ('JD', 'Position Control Stand (joints locked)'),
65 'STAND_DEFAULT': ('SD', 'Stable Stand (auto-balance)'),
66 'LOCOMOTION_DEFAULT': ('LD', 'locomotion mode (walk or run)'),
67 }
68
69 choices = {}
70 for k, v in action_info.items():
71 choices[v[0]] = k
72
73 rclpy.init(args=args)
74 node = None
75 try:
76 # Prefer command-line argument, otherwise prompt for input
77 if len(sys.argv) > 1:
78 motion = sys.argv[1]
79 else:
80 print('{:<4} - {:<20} : {}'.format('abbr',
81 'robot mode', 'description'))
82 for k, v in action_info.items():
83 print(f'{v[0]:<4} - {k:<20} : {v[1]}')
84 motion = input('Enter abbr of robot mode:')
85
86 action_name = choices.get(motion)
87 if not action_name:
88 raise ValueError(f'Invalid abbr of robot mode: {motion}')
89
90 node = SetMcActionClient()
91 node.send_request(action_name)
92 except KeyboardInterrupt:
93 pass
94 except Exception as e:
95 rclpy.logging.get_logger('main').error(
96 f'Program exited with exception: {e}')
97
98 if node:
99 node.destroy_node()
100 if rclpy.ok():
101 rclpy.shutdown()
102
103
104if __name__ == '__main__':
105 main()
使用说明
# 使用命令行参数设置模式(推荐)
ros2 run py_examples set_mc_action JD # 零力矩模式>>站姿预备(位控站立)
ros2 run py_examples set_mc_action SD # 机器人已立起且脚着地后方可执行,站姿预备>>稳定站立
# 稳定站立>>走跑模式 自动切换,无需手动切换
# 或者不带参数运行,程序会提示用户输入双字母缩写代码
ros2 run py_examples set_mc_action
输出示例
...
[INFO] [1764066567.502968540] [set_mc_action_client]: ✅ Robot mode set successfully.
注意事项
切入
STAND_DEFAULT模式前,确保机器人已立起且双脚着地模式切换可能需要几秒钟时间完成
接口参考
服务:
/aimdk_5Fmsgs/srv/SetMcAction消息:
aimdk_msgs/srv/SetMcAction
6.1.3 设置机器人动作
该示例中用到了preset_motion_client,进入稳定站立模式后,启动节点程序,输入相应的字段值可实现左手(右手)的握手(举手、挥手、飞吻)动作。
可供调用的参数见预设动作表
1#!/usr/bin/env python3
2
3import rclpy
4import rclpy.logging
5from rclpy.node import Node
6
7from aimdk_msgs.srv import SetMcPresetMotion
8from aimdk_msgs.msg import McPresetMotion, McControlArea, RequestHeader, CommonState
9
10
11class SetMcPresetMotionClient(Node):
12 def __init__(self):
13 super().__init__('preset_motion_client')
14 self.client = self.create_client(
15 SetMcPresetMotion, '/aimdk_5Fmsgs/srv/SetMcPresetMotion')
16 self.get_logger().info('✅ SetMcPresetMotion client node created.')
17
18 # Wait for the service to become available
19 while not self.client.wait_for_service(timeout_sec=2.0):
20 self.get_logger().info('⏳ Service unavailable, waiting...')
21
22 self.get_logger().info('🟢 Service available, ready to send request.')
23
24 def send_request(self, area_id: int, motion_id: int) -> bool:
25 request = SetMcPresetMotion.Request()
26 request.header = RequestHeader()
27
28 motion = McPresetMotion()
29 area = McControlArea()
30
31 motion.value = motion_id
32 area.value = area_id
33
34 request.motion = motion
35 request.area = area
36 request.interrupt = False
37
38 self.get_logger().info(
39 f'📨 Sending request to set preset motion: motion={motion_id}, area={area_id}')
40
41 for i in range(8):
42 request.header.stamp = self.get_clock().now().to_msg()
43 future = self.client.call_async(request)
44 rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
45
46 if future.done():
47 break
48
49 # retry as remote peer is NOT handled well by ROS
50 self.get_logger().info(f'trying ... [{i}]')
51
52 response = future.result()
53 if response is None:
54 self.get_logger().error('❌ Service call failed or timed out.')
55 return False
56
57 if response.response.state.value == CommonState.SUCCESS:
58 self.get_logger().info(
59 f'✅ Preset motion set successfully: {response.response.task_id}')
60 return True
61 elif response.response.state.value == CommonState.RUNNING:
62 self.get_logger().info(
63 f'⏳ Preset motion executing: {response.response.task_id}')
64 return True
65 else:
66 self.get_logger().error(
67 f'❌ Failed to set preset motion: {response.response.task_id}'
68 )
69 return False
70
71
72def main(args=None):
73 rclpy.init(args=args)
74 node = None
75 try:
76 area = int(input("Enter arm area ID (1-left, 2-right): "))
77 motion = int(input(
78 "Enter preset motion ID (1001-raise,1002-wave,1003-handshake,1004-airkiss): "))
79
80 node = SetMcPresetMotionClient()
81 node.send_request(area, motion)
82 except KeyboardInterrupt:
83 pass
84 except Exception as e:
85 rclpy.logging.get_logger('main').error(
86 f'Program exited with exception: {e}')
87
88 if node:
89 node.destroy_node()
90 if rclpy.ok():
91 rclpy.shutdown()
92
93
94if __name__ == '__main__':
95 main()
6.1.4 夹爪控制
该示例中用到了hand_control,通过往话题/aima/hal/joint/hand/command发布消息来控制夹爪的运动
注意
注意⚠️ :运行本示例前需要在机器人运控计算单元(PC1)使用aima em stop-app mc关闭原生运控模块,获取机器人控制权。注意确保机器人安全
1import rclpy
2from rclpy.node import Node
3from aimdk_msgs.msg import HandCommandArray, HandCommand, HandType, MessageHeader
4import time
5
6
7class HandControl(Node):
8 def __init__(self):
9 super().__init__('hand_control')
10
11 # Preset parameter list: [(left hand, right hand), ...]
12 self.position_pairs = [
13 (1.0, 1.0), # fully open
14 (0.0, 0.0), # fully closed
15 (0.5, 0.5), # half open
16 (0.2, 0.8), # left slightly closed, right more open
17 (0.7, 0.3) # left more open, right slightly closed
18 ]
19 self.current_index = 0
20 self.last_switch_time = self.get_clock().now().nanoseconds / 1e9 # seconds
21
22 # Create publisher
23 self.publisher_ = self.create_publisher(
24 HandCommandArray,
25 '/aima/hal/joint/hand/command',
26 10
27 )
28
29 # 50 Hz timer
30 self.timer_ = self.create_timer(
31 0.02, # 20 ms = 50 Hz
32 self.publish_hand_commands
33 )
34
35 self.get_logger().info("Hand control node started!")
36
37 def publish_hand_commands(self):
38 # Check time to decide whether to switch to the next preset
39 now_sec = self.get_clock().now().nanoseconds / 1e9
40 if now_sec - self.last_switch_time >= 2.0:
41 self.current_index = (self.current_index +
42 1) % len(self.position_pairs)
43 self.last_switch_time = now_sec
44 self.get_logger().info(
45 f"Switched to preset: {self.current_index}, left={self.position_pairs[self.current_index][0]:.2f}, right={self.position_pairs[self.current_index][1]:.2f}"
46 )
47
48 # Use current preset
49 left_position, right_position = self.position_pairs[self.current_index]
50
51 msg = HandCommandArray()
52 msg.header = MessageHeader()
53
54 # Configure left hand
55 left_hand = HandCommand()
56 left_hand.name = "left_hand"
57 left_hand.position = float(left_position)
58 left_hand.velocity = 1.0
59 left_hand.acceleration = 1.0
60 left_hand.deceleration = 1.0
61 left_hand.effort = 1.0
62
63 # Configure right hand
64 right_hand = HandCommand()
65 right_hand.name = "right_hand"
66 right_hand.position = float(right_position)
67 right_hand.velocity = 1.0
68 right_hand.acceleration = 1.0
69 right_hand.deceleration = 1.0
70 right_hand.effort = 1.0
71
72 msg.left_hand_type = HandType(value=2) # gripper mode
73 msg.right_hand_type = HandType(value=2)
74 msg.left_hands = [left_hand]
75 msg.right_hands = [right_hand]
76
77 # Publish message
78 self.publisher_.publish(msg)
79 # We only log when switching presets to avoid too much log output
80
81
82def main(args=None):
83 rclpy.init(args=args)
84 hand_control_node = HandControl()
85
86 try:
87 rclpy.spin(hand_control_node)
88 except KeyboardInterrupt:
89 pass
90 finally:
91 hand_control_node.destroy_node()
92 rclpy.shutdown()
93
94
95if __name__ == '__main__':
96 main()
6.1.5 灵巧手控制 (升级中暂不开放)
该示例中用到了omnihand_control,通过往话题/aima/hal/joint/hand/command发布消息来控制夹爪的运动
注意
注意⚠️ :运行本示例前需要在机器人运控计算单元(PC1)使用aima em stop-app mc关闭原生运控模块,获取机器人控制权。注意确保机器人安全
1#!/usr/bin/env python3
2import rclpy
3from rclpy.node import Node
4from aimdk_msgs.msg import HandCommand, HandCommandArray, HandType, MessageHeader
5
6
7class OmnihandControl(Node):
8 def __init__(self):
9 super().__init__('omnihand_control')
10
11 self.num_fingers = 5
12 self.joints_per_finger = 2
13 self.total_joints = self.num_fingers * self.joints_per_finger # 10
14
15 # ===== Define 10 gesture sets directly as arrays =====
16 # Each set has 20 joint params (left 10 + right 10)
17 # 1 = bent, 0 = straight
18 self.gesture_sequence = [
19 # bend 1 finger
20 [1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
21 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
22 # bend 2 fingers
23 [1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0,
24 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0],
25 # bend 3 fingers
26 [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0,
27 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0],
28 # bend 4 fingers
29 [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0,
30 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0],
31 # bend 5 fingers
32 [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
33 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
34 # release 1 finger
35 [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0,
36 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0],
37 # release 2 fingers
38 [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0,
39 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0],
40 # release 3 fingers
41 [1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0,
42 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0],
43 # release 4 fingers
44 [1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
45 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
46 # all straight
47 [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
48 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
49 ]
50
51 self.current_index = 0
52 self.last_switch_time = self.get_clock().now()
53
54 self.publisher = self.create_publisher(
55 HandCommandArray,
56 '/aima/hal/joint/hand/command',
57 10
58 )
59
60 # Switch gesture every second
61 self.timer = self.create_timer(
62 1.0,
63 self.publish_hand_commands
64 )
65
66 self.get_logger().info("Omnihand demo control node started!")
67
68 def publish_hand_commands(self):
69 now_time = self.get_clock().now()
70 if (now_time - self.last_switch_time).nanoseconds / 1e9 >= 1.0:
71 self.current_index = (self.current_index +
72 1) % len(self.gesture_sequence)
73 self.last_switch_time = now_time
74 self.get_logger().info(
75 f'Switched to gesture {self.current_index + 1}/10')
76
77 joint_array = self.gesture_sequence[self.current_index]
78
79 msg = HandCommandArray()
80 msg.header = MessageHeader()
81
82 # Left hand
83 for i in range(self.total_joints):
84 cmd = HandCommand()
85 cmd.name = f"left_hand_joint{i}"
86 cmd.position = joint_array[i]
87 cmd.velocity = 1.0
88 cmd.acceleration = 1.0
89 cmd.deceleration = 1.0
90 cmd.effort = 1.0
91 msg.left_hands.append(cmd)
92
93 # Right hand
94 for i in range(self.total_joints):
95 cmd = HandCommand()
96 cmd.name = f"right_hand_joint{i}"
97 cmd.position = joint_array[self.total_joints + i]
98 cmd.velocity = 1.0
99 cmd.acceleration = 1.0
100 cmd.deceleration = 1.0
101 cmd.effort = 1.0
102 msg.right_hands.append(cmd)
103
104 msg.left_hand_type = HandType()
105 msg.left_hand_type.value = 1 # dexterous hand mode
106 msg.right_hand_type = HandType()
107 msg.right_hand_type.value = 1
108
109 self.publisher.publish(msg)
110
111
112def main(args=None):
113 rclpy.init(args=args)
114 node = OmnihandControl()
115 try:
116 rclpy.spin(node)
117 except KeyboardInterrupt:
118 pass
119 node.destroy_node()
120 rclpy.shutdown()
121
122
123if __name__ == '__main__':
124 main()
6.1.6 注册二开输入源
对于v0.7之后的版本,在实现对MC的控制前,需要先注册输入源。该示例中通过/aimdk_5Fmsgs/srv/SetMcInputSource服务来注册二开的输入源,让MC感知到,只有注册过输入源后才能实现机器人的速度控制
1#!/usr/bin/env python3
2
3import rclpy
4import rclpy.logging
5from rclpy.node import Node
6
7from aimdk_msgs.srv import SetMcInputSource
8from aimdk_msgs.msg import RequestHeader, McInputAction
9
10
11class McInputClient(Node):
12 def __init__(self):
13 super().__init__('set_mc_input_source_client')
14 self.client = self.create_client(
15 SetMcInputSource, '/aimdk_5Fmsgs/srv/SetMcInputSource'
16 )
17
18 self.get_logger().info('✅ SetMcInputSource client node created.')
19
20 # Wait for the service to become available
21 while not self.client.wait_for_service(timeout_sec=2.0):
22 self.get_logger().info('⏳ Service unavailable, waiting...')
23
24 self.get_logger().info('🟢 Service available, ready to send request.')
25
26 def send_request(self):
27 req = SetMcInputSource.Request()
28
29 # header
30 req.request.header = RequestHeader()
31
32 # action (e.g. 1001 = register)
33 req.action = McInputAction()
34 req.action.value = 1001
35
36 # input source info
37 req.input_source.name = 'node'
38 req.input_source.priority = 40
39 req.input_source.timeout = 1000 # ms
40
41 # Send request and wait for response
42 self.get_logger().info(
43 f'📨 Sending input source request: action_id={req.action.value}, '
44 f'name={req.input_source.name}, priority={req.input_source.priority}'
45 )
46 for i in range(8):
47 req.request.header.stamp = self.get_clock().now().to_msg()
48 future = self.client.call_async(req)
49 rclpy.spin_until_future_complete(
50 self, future, timeout_sec=0.25)
51
52 if future.done():
53 break
54
55 # retry as remote peer is NOT handled well by ROS
56 self.get_logger().info(f'trying ... [{i}]')
57
58 if not future.done():
59 self.get_logger().error('❌ Service call failed or timed out.')
60 return False
61
62 response = future.result()
63 ret_code = response.response.header.code
64 task_id = response.response.task_id
65
66 if ret_code == 0:
67 self.get_logger().info(
68 f'✅ Input source set successfully. task_id={task_id}'
69 )
70 return True
71 else:
72 self.get_logger().error(
73 f'❌ Input source set failed. ret_code={ret_code}, task_id={task_id} (duplicated ADD? or MODIFY/ENABLE/DISABLE for unknown source?)'
74 )
75 return False
76
77
78def main(args=None):
79 rclpy.init(args=args)
80
81 node = None
82 try:
83 node = McInputClient()
84 ok = node.send_request()
85 if not ok:
86 node.get_logger().error('Input source request failed.')
87 except KeyboardInterrupt:
88 pass
89 except Exception as e:
90 rclpy.logging.get_logger('main').error(
91 f'Program exited with exception: {e}')
92
93 if node:
94 node.destroy_node()
95 if rclpy.ok():
96 rclpy.shutdown()
97
98
99if __name__ == '__main__':
100 main()
6.1.7 获取当前输入源
该示例中用到了GetCurrentInputSource服务,用于获取当前已注册的输入源信息,包括输入源名称、优先级和超时时间等信息。
1#!/usr/bin/env python3
2
3import rclpy
4from rclpy.node import Node
5from aimdk_msgs.srv import GetCurrentInputSource
6from aimdk_msgs.msg import CommonRequest
7
8
9class GetCurrentInputSourceClient(Node):
10 def __init__(self):
11 super().__init__('get_current_input_source_client')
12 self.client = self.create_client(
13 GetCurrentInputSource,
14 '/aimdk_5Fmsgs/srv/GetCurrentInputSource'
15 )
16
17 self.get_logger().info('✅ GetCurrentInputSource client node created.')
18
19 # Wait for the service to become available
20 while not self.client.wait_for_service(timeout_sec=2.0):
21 self.get_logger().info('⏳ Service unavailable, waiting...')
22
23 self.get_logger().info('🟢 Service available, ready to send request.')
24
25 def send_request(self):
26 # Create request
27 req = GetCurrentInputSource.Request()
28 req.request = CommonRequest()
29
30 # Send request and wait for response
31 self.get_logger().info('📨 Sending request to get current input source')
32 for i in range(8):
33 req.request.header.stamp = self.get_clock().now().to_msg()
34 future = self.client.call_async(req)
35 rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
36
37 if future.done():
38 break
39
40 # retry as remote peer is NOT handled well by ROS
41 self.get_logger().info(f'trying ... [{i}]')
42
43 if not future.done():
44 self.get_logger().error('❌ Service call failed or timed out.')
45 return False
46
47 response = future.result()
48 ret_code = response.response.header.code
49 if ret_code == 0:
50 self.get_logger().info(
51 f'✅ Current input source get successfully:')
52 self.get_logger().info(
53 f'Name: {response.input_source.name}')
54 self.get_logger().info(
55 f'Priority: {response.input_source.priority}')
56 self.get_logger().info(
57 f'Timeout: {response.input_source.timeout}')
58 return True
59 else:
60 self.get_logger().error(
61 f'❌ Current input source get failed, return code: {ret_code}')
62 return False
63
64
65def main(args=None):
66 rclpy.init(args=args)
67
68 node = None
69 try:
70 node = GetCurrentInputSourceClient()
71 success = node.send_request()
72 except KeyboardInterrupt:
73 pass
74 except Exception as e:
75 rclpy.logging.get_logger('main').error(
76 f'Program exited with exception: {e}')
77
78 if node:
79 node.destroy_node()
80 if rclpy.ok():
81 rclpy.shutdown()
82
83
84if __name__ == '__main__':
85 main()
使用说明
# 获取当前输入源信息
ros2 run py_examples get_current_input_source
输出示例
[INFO] [get_current_input_source_client]: 当前输入源: node
[INFO] [get_current_input_source_client]: 优先级: 40
[INFO] [get_current_input_source_client]: 超时时间: 1000
注意事项
确保GetCurrentInputSource服务正常运行
需要在注册输入源之后才能获取到有效信息
状态码为0表示查询成功
6.1.8 控制机器人走跑
该示例中用到了mc_locomotion_velocity,以下示例通过发布/aima/mc/locomotion/velocity话题控制机器人的行走,对于v0.7之后的版本,实现速度控制前需要注册输入源(该示例已实现注册输入源),具体注册步骤可参考代码。
进入稳定站立模式后执行本程序
1#!/usr/bin/env python3
2
3import rclpy
4from rclpy.node import Node
5import time
6import signal
7import sys
8
9from aimdk_msgs.msg import McLocomotionVelocity, MessageHeader
10from aimdk_msgs.srv import SetMcInputSource
11
12
13class DirectVelocityControl(Node):
14 def __init__(self):
15 super().__init__('direct_velocity_control')
16
17 self.publisher = self.create_publisher(
18 McLocomotionVelocity, '/aima/mc/locomotion/velocity', 10)
19 self.client = self.create_client(
20 SetMcInputSource, '/aimdk_5Fmsgs/srv/SetMcInputSource')
21
22 self.forward_velocity = 0.0
23 self.lateral_velocity = 0.0
24 self.angular_velocity = 0.0
25
26 self.max_forward_speed = 1.0
27 self.min_forward_speed = 0.2
28
29 self.max_lateral_speed = 1.0
30 self.min_lateral_speed = 0.2
31
32 self.max_angular_speed = 1.0
33 self.min_angular_speed = 0.1
34
35 self.timer = None
36
37 self.get_logger().info("Direct velocity control node started!")
38
39 def start_publish(self):
40 if not self.timer:
41 self.timer = self.create_timer(0.02, self.publish_velocity)
42
43 def register_input_source(self):
44 self.get_logger().info("Registering input source...")
45
46 timeout_sec = 8.0
47 start = self.get_clock().now().nanoseconds / 1e9
48
49 while not self.client.wait_for_service(timeout_sec=2.0):
50 now = self.get_clock().now().nanoseconds / 1e9
51 if now - start > timeout_sec:
52 self.get_logger().error("Waiting for service timed out")
53 return False
54 self.get_logger().info("Waiting for input source service...")
55
56 req = SetMcInputSource.Request()
57 req.action.value = 1001
58 req.input_source.name = "node"
59 req.input_source.priority = 40
60 req.input_source.timeout = 1000
61
62 for i in range(8):
63 req.request.header.stamp = self.get_clock().now().to_msg()
64 future = self.client.call_async(req)
65 rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
66
67 if future.done():
68 break
69
70 # retry as remote peer is NOT handled well by ROS
71 self.get_logger().info(f"trying to register input source... [{i}]")
72
73 if future.done():
74 try:
75 response = future.result()
76 state = response.response.state.value
77 self.get_logger().info(
78 f"Input source set successfully: state={state}, task_id={response.response.task_id}")
79 return True
80 except Exception as e:
81 self.get_logger().error(f"Service call exception: {str(e)}")
82 return False
83 else:
84 self.get_logger().error("Service call failed or timed out")
85 return False
86
87 def publish_velocity(self):
88 msg = McLocomotionVelocity()
89 msg.header = MessageHeader()
90 msg.header.stamp = self.get_clock().now().to_msg()
91 msg.source = "node"
92 msg.forward_velocity = self.forward_velocity
93 msg.lateral_velocity = self.lateral_velocity
94 msg.angular_velocity = self.angular_velocity
95
96 self.publisher.publish(msg)
97
98 self.get_logger().info(
99 f"Publishing velocity: forward {self.forward_velocity:.2f} m/s, "
100 f"lateral {self.lateral_velocity:.2f} m/s, "
101 f"angular {self.angular_velocity:.2f} rad/s"
102 )
103
104 def set_forward(self, forward):
105 # check value range, mc has thresholds to start movement
106 if abs(forward) < 0.005:
107 self.forward_velocity = 0.0
108 return True
109 elif abs(forward) > self.max_forward_speed or abs(forward) < self.min_forward_speed:
110 raise ValueError("out of range")
111 else:
112 self.forward_velocity = forward
113 return True
114
115 def set_lateral(self, lateral):
116 # check value range, mc has thresholds to start movement
117 if abs(lateral) < 0.005:
118 self.lateral_velocity = 0.0
119 return True
120 elif abs(lateral) > self.max_lateral_speed or abs(lateral) < self.min_lateral_speed:
121 raise ValueError("out of range")
122 else:
123 self.lateral_velocity = lateral
124 return True
125
126 def set_angular(self, angular):
127 # check value range, mc has thresholds to start movement
128 if abs(angular) < 0.005:
129 self.angular_velocity = 0.0
130 return True
131 elif abs(angular) > self.max_angular_speed or abs(angular) < self.min_angular_speed:
132 raise ValueError("out of range")
133 else:
134 self.angular_velocity = angular
135 return True
136
137 def clear_velocity(self):
138 self.forward_velocity = 0.0
139 self.lateral_velocity = 0.0
140 self.angular_velocity = 0.0
141
142
143# Global node instance for signal handling
144global_node = None
145
146
147def signal_handler(sig, frame):
148 global global_node
149 if global_node is not None:
150 global_node.clear_velocity()
151 global_node.get_logger().info(
152 f"Received signal {sig}, clearing velocity and shutting down")
153 rclpy.shutdown()
154 sys.exit(0)
155
156
157def main():
158 global global_node
159 rclpy.init()
160
161 node = DirectVelocityControl()
162 global_node = node
163
164 signal.signal(signal.SIGINT, signal_handler)
165 signal.signal(signal.SIGTERM, signal_handler)
166
167 if not node.register_input_source():
168 node.get_logger().error("Input source registration failed, exiting")
169 rclpy.shutdown()
170 return
171
172 # get and check control values
173 # notice that mc has thresholds to start movement
174 try:
175 # get input forward
176 forward = float(
177 input("Please enter forward velocity 0 or ±(0.2 ~ 1.0) m/s: "))
178 node.set_forward(forward)
179 # get input lateral
180 lateral = float(
181 input("Please enter lateral velocity 0 or ±(0.2 ~ 1.0) m/s: "))
182 node.set_lateral(lateral)
183 # get input angular
184 angular = float(
185 input("Please enter angular velocity 0 or ±(0.1 ~ 1.0) rad/s: "))
186 node.set_angular(angular)
187 except Exception as e:
188 node.get_logger().error(f"Invalid input: {e}")
189 rclpy.shutdown()
190 return
191
192 node.get_logger().info("Setting velocity, moving for 5 seconds")
193 node.start_publish()
194
195 start = node.get_clock().now()
196 while (node.get_clock().now() - start).nanoseconds / 1e9 < 5.0:
197 rclpy.spin_once(node, timeout_sec=0.1)
198 time.sleep(0.001)
199
200 node.clear_velocity()
201 node.get_logger().info("5-second motion finished, robot stopped")
202
203 rclpy.spin(node)
204 rclpy.shutdown()
205
206
207if __name__ == '__main__':
208 main()
6.1.9 关节电机控制
本示例展示了如何使用ROS2和Ruckig库来控制机器人的关节运动。
注意
注意⚠️ :运行本示例前需要在机器人运控计算单元(PC1)使用aima em stop-app mc关闭原生运控模块,获取机器人控制权。注意确保机器人安全
!该示例代码直接对底层电机也就是HAL层进行控制,在运行程序前请检查代码中对关节的安全限位是否与所用机器人相符, 并确保安全!
机器人关节控制示例
这个示例展示了如何使用ROS2和Ruckig库来控制机器人的关节运动。示例实现了以下功能:
机器人关节模型定义
基于Ruckig的轨迹插补
多关节协同控制
实时位置、速度和加速度控制
示例功能说明
创建了四个控制器节点,分别控制:
腿部x2(12个关节)
腰部x1(3个关节)
手臂x2(14个关节)
头部x1(2个关节)
演示功能:
每10秒让指定关节在正负0.5弧度之间摆动
使用Ruckig库实现平滑的运动轨迹
实时发布关节控制命令
自定义使用
添加新的控制逻辑:
添加新的控制回调函数
自由组合关节运动
调整控制频率:
修改
control_timer_的周期(当前为2ms)
1#!/usr/bin/env python3
2"""
3Robot joint control example
4This script implements a ROS2-based robot joint control system, using the Ruckig trajectory
5planner to achieve smooth joint motion control.
6
7Main features:
81. Supports controlling multiple joint areas (head, arm, waist, leg)
92. Uses Ruckig for trajectory planning to ensure smooth motion
103. Supports real-time control of joint position, velocity, and acceleration
114. Provides joint limit and PID (stiffness/damping) parameter configuration
12"""
13
14import rclpy
15from rclpy.node import Node
16from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
17from aimdk_msgs.msg import JointCommandArray, JointStateArray, JointCommand
18from std_msgs.msg import Header
19import ruckig
20from enum import Enum
21from dataclasses import dataclass
22from typing import List, Dict
23from threading import Lock
24
25# QoS config: define ROS2 Quality of Service parameters
26# Subscriber QoS: best-effort reliability, keep last 10 messages
27subscriber_qos = QoSProfile(
28 reliability=ReliabilityPolicy.BEST_EFFORT,
29 history=HistoryPolicy.KEEP_LAST,
30 depth=10,
31 durability=DurabilityPolicy.VOLATILE
32)
33
34# Publisher QoS: reliable transport, keep last 10 messages
35publisher_qos = QoSProfile(
36 reliability=ReliabilityPolicy.RELIABLE,
37 history=HistoryPolicy.KEEP_LAST,
38 depth=10,
39 durability=DurabilityPolicy.VOLATILE
40)
41
42
43class JointArea(Enum):
44 HEAD = 'HEAD' # Head joints
45 ARM = 'ARM' # Arm joints
46 WAIST = 'WAIST' # Waist joints
47 LEG = 'LEG' # Leg joints
48
49
50@dataclass
51class JointInfo:
52 # Joint information data class
53 name: str # Joint name
54 lower_limit: float # Joint lower angle limit
55 upper_limit: float # Joint upper angle limit
56 kp: float # Position control proportional gain
57 kd: float # Velocity control derivative gain
58
59
60# Robot model configuration: define all joint parameters
61robot_model: Dict[JointArea, List[JointInfo]] = {
62 # Leg joint configuration
63 JointArea.LEG: [
64 # Left leg joints
65 JointInfo("left_hip_pitch_joint", -2.4871,
66 2.4871, 40.0, 4.0), # left hip pitch
67 JointInfo("left_hip_roll_joint", -0.12217,
68 2.9059, 40.0, 4.0), # left hip roll
69 JointInfo("left_hip_yaw_joint", -1.6842,
70 3.4296, 30.0, 3.0), # left hip yaw
71 JointInfo("left_knee_joint", 0.026179,
72 2.1206, 80.0, 8.0), # left knee
73 JointInfo("left_ankle_pitch_joint", -0.80285,
74 0.45378, 40.0, 4.0), # left ankle pitch
75 JointInfo("left_ankle_roll_joint", -0.2618,
76 0.2618, 20.0, 2.0), # left ankle roll
77 # Right leg joints
78 JointInfo("right_hip_pitch_joint", -2.4871,
79 2.4871, 40.0, 4.0), # right hip pitch
80 JointInfo("right_hip_roll_joint", -2.9059,
81 0.12217, 40.0, 4.0), # right hip roll
82 JointInfo("right_hip_yaw_joint", -3.4296,
83 1.6842, 30.0, 3.0), # right hip yaw
84 JointInfo("right_knee_joint", 0.026179,
85 2.1206, 80.0, 8.0), # right knee
86 JointInfo("right_ankle_pitch_joint", -0.80285,
87 0.45378, 40.0, 4.0), # right ankle pitch
88 JointInfo("right_ankle_roll_joint", -0.2618,
89 0.2618, 20.0, 2.0), # right ankle roll
90 ],
91 # Waist joint configuration
92 JointArea.WAIST: [
93 JointInfo("waist_yaw_joint", -3.4296,
94 2.3824, 20.0, 4.0), # waist yaw
95 JointInfo("waist_pitch_joint", -0.17453,
96 0.17453, 20.0, 4.0), # waist pitch
97 JointInfo("waist_roll_joint", -0.48869,
98 0.48869, 20.0, 4.0), # waist roll
99 ],
100 # Arm joint configuration
101 JointArea.ARM: [
102 # Left arm
103 JointInfo("left_shoulder_pitch_joint", -
104 2.5569, 2.5569, 20.0, 2.0), # left shoulder pitch
105 JointInfo("left_shoulder_roll_joint", -
106 0.06108, 3.3598, 20.0, 2.0), # left shoulder roll
107 JointInfo("left_shoulder_yaw_joint", -
108 2.5569, 2.5569, 20.0, 2.0), # left shoulder yaw
109 JointInfo("left_elbow_pitch_joint", -2.4435,
110 0.0, 20.0, 2.0), # left elbow pitch
111 JointInfo("left_elbow_roll_joint", -2.5569,
112 2.5569, 20.0, 2.0), # left elbow roll
113 JointInfo("left_wrist_pitch_joint", -0.5585,
114 0.5585, 20.0, 2.0), # left wrist pitch
115 JointInfo("left_wrist_yaw_joint", -1.5446,
116 1.5446, 20.0, 2.0), # left wrist yaw
117 # Right arm
118 JointInfo("right_shoulder_pitch_joint", -
119 2.5569, 2.5569, 20.0, 2.0), # right shoulder pitch
120 JointInfo("right_shoulder_roll_joint", -
121 3.3598, 0.06108, 20.0, 2.0), # right shoulder roll
122 JointInfo("right_shoulder_yaw_joint", -
123 2.5569, 2.5569, 20.0, 2.0), # right shoulder yaw
124 JointInfo("right_elbow_pitch_joint", 0.0,
125 2.4435, 20.0, 2.0), # right elbow pitch
126 JointInfo("right_elbow_roll_joint", -2.5569,
127 2.5569, 20.0, 2.0), # right elbow roll
128 JointInfo("right_wrist_pitch_joint", -
129 0.5585, 0.5585, 20.0, 2.0), # right wrist pitch
130 JointInfo("right_wrist_yaw_joint", -1.5446,
131 1.5446, 20.0, 2.0), # right wrist yaw
132 ],
133 # Head joint configuration
134 JointArea.HEAD: [
135 JointInfo("head_pitch_joint", -0.61087,
136 0.61087, 20.0, 2.0), # head pitch
137 JointInfo("head_yaw_joint", -0.61087,
138 0.61087, 20.0, 2.0), # head yaw
139 ],
140}
141
142
143class JointControllerNode(Node):
144 """
145 Joint controller node
146 Responsible for receiving joint states, using Ruckig for trajectory planning,
147 and publishing joint commands.
148 """
149
150 def __init__(self, node_name: str, sub_topic: str, pub_topic: str, area: JointArea, dofs: int):
151 """
152 Initialize joint controller
153 Args:
154 node_name: node name
155 sub_topic: topic name to subscribe (joint states)
156 pub_topic: topic name to publish (joint commands)
157 area: joint area (head/arm/waist/leg)
158 dofs: number of DOFs
159 """
160 super().__init__(node_name)
161 self.lock = Lock()
162 self.joint_info = robot_model[area]
163 self.dofs = dofs
164 self.ruckig = ruckig.Ruckig(dofs, 0.002) # 2 ms control period
165 self.input = ruckig.InputParameter(dofs)
166 self.output = ruckig.OutputParameter(dofs)
167 self.ruckig_initialized = False
168
169 # Initialize trajectory parameters
170 self.input.current_position = [0.0] * dofs
171 self.input.current_velocity = [0.0] * dofs
172 self.input.current_acceleration = [0.0] * dofs
173
174 # Motion limits
175 self.input.max_velocity = [1.0] * dofs
176 self.input.max_acceleration = [1.0] * dofs
177 self.input.max_jerk = [25.0] * dofs
178
179 # ROS2 subscriber and publisher
180 self.sub = self.create_subscription(
181 JointStateArray,
182 sub_topic,
183 self.joint_state_callback,
184 subscriber_qos
185 )
186 self.pub = self.create_publisher(
187 JointCommandArray,
188 pub_topic,
189 publisher_qos
190 )
191
192 def joint_state_callback(self, msg: JointStateArray):
193 """
194 Joint state callback
195 Receives and processes joint state messages
196 """
197 self.ruckig_initialized = True
198
199 def control_callback(self, joint_idx):
200 """
201 Control callback
202 Uses Ruckig for trajectory planning and publishes control commands
203 Args:
204 joint_idx: target joint index
205 """
206 # Run Ruckig until the target is reached
207 while self.ruckig.update(self.input, self.output) in [ruckig.Result.Working, ruckig.Result.Finished]:
208 # Update current state
209 self.input.current_position = self.output.new_position
210 self.input.current_velocity = self.output.new_velocity
211 self.input.current_acceleration = self.output.new_acceleration
212
213 # Check if target is reached
214 tolerance = 1e-6
215 current_p = self.output.new_position[joint_idx]
216 if abs(current_p - self.input.target_position[joint_idx]) < tolerance:
217 break
218
219 # Create and publish command
220 cmd = JointCommandArray()
221 for i, joint in enumerate(self.joint_info):
222 j = JointCommand()
223 j.name = joint.name
224 j.position = self.output.new_position[i]
225 j.velocity = self.output.new_velocity[i]
226 j.effort = 0.0
227 j.stiffness = joint.kp
228 j.damping = joint.kd
229 cmd.joints.append(j)
230
231 self.pub.publish(cmd)
232
233 def set_target_position(self, joint_name, position):
234 """
235 Set target joint position
236 Args:
237 joint_name: joint name
238 position: target position
239 """
240 p_s = [0.0] * self.dofs
241 joint_idx = 0
242 for i, joint in enumerate(self.joint_info):
243 if joint.name == joint_name:
244 p_s[i] = position
245 joint_idx = i
246 self.input.target_position = p_s
247 self.input.target_velocity = [0.0] * self.dofs
248 self.input.target_acceleration = [0.0] * self.dofs
249 self.control_callback(joint_idx)
250
251
252def main(args=None):
253 """
254 Main function
255 Initialize ROS2 node and start joint controller
256 """
257 rclpy.init(args=args)
258
259 # Create leg controller node
260 leg_node = JointControllerNode(
261 "leg_node",
262 "/aima/hal/joint/leg/state",
263 "/aima/hal/joint/leg/command",
264 JointArea.LEG,
265 12
266 )
267
268 # waist_node = JointControllerNode(
269 # "waist_node",
270 # "/aima/hal/joint/waist/state",
271 # "/aima/hal/joint/waist/command",
272 # JointArea.WAIST,
273 # 3
274 # )
275
276 # arm_node = JointControllerNode(
277 # "arm_node",
278 # "/aima/hal/joint/arm/state",
279 # "/aima/hal/joint/arm/command",
280 # JointArea.ARM,
281 # 14
282 # )
283
284 # head_node = JointControllerNode(
285 # "head_node",
286 # "/aima/hal/joint/head/state",
287 # "/aima/hal/joint/head/command",
288 # JointArea.HEAD,
289 # 2
290 # )
291
292 position = 0.8
293
294 # Only control the left leg joint. If you want to control a specific joint, assign it directly.
295 def timer_callback():
296 """
297 Timer callback
298 Periodically change target position to achieve oscillating motion
299 """
300 nonlocal position
301 position = -position
302 position = 1.3 + position
303 leg_node.set_target_position("left_knee_joint", position)
304
305 # arm_node.set_target_position("left_shoulder_pitch_joint", position)
306 # waist_node.set_target_position("waist_yaw_joint", position)
307 # head_node.set_target_position("head_pitch_joint", position)
308
309 leg_node.create_timer(3.0, timer_callback)
310
311 # Multi-threaded executor
312 executor = rclpy.executors.MultiThreadedExecutor()
313 executor.add_node(leg_node)
314
315 # executor.add_node(waist_node)
316 # executor.add_node(arm_node)
317 # executor.add_node(head_node)
318
319 try:
320 executor.spin()
321 except KeyboardInterrupt:
322 pass
323 finally:
324 leg_node.destroy_node()
325 # waist_node.destroy_node()
326 # arm_node.destroy_node()
327 # head_node.destroy_node()
328 if rclpy.ok():
329 rclpy.shutdown()
330
331
332if __name__ == '__main__':
333 main()
6.1.10 键盘控制机器人
本示例实现了通过PC的键盘输入控制机器人的前进后退转弯的功能。
通过WASD控制机器人行走方向,增加/减少速度(±0.2 m/s),Q/E增加/减少角速度(±0.1 rad/s),ESC退出程序并释放终端资源,Space立即将速度归零,执行急停。
小心
注意:运行本示例前需要先使用手柄,将机器人切入稳定站立模式。(位控站立/走跑模式时按R2+X, 其他模式详见模式流转图进行操作),然后在机器人的终端界面使用:aima em stop-app rc关闭遥控器,防止通道占用。
实现键盘控制前需要注册输入源(该示例已实现注册输入源)
1#!/usr/bin/env python3
2
3import rclpy
4import rclpy.logging
5from rclpy.node import Node
6from aimdk_msgs.msg import McLocomotionVelocity, MessageHeader
7from aimdk_msgs.srv import SetMcInputSource
8import curses
9import time
10from functools import partial
11
12
13class KeyboardVelocityController(Node):
14 def __init__(self, stdscr):
15 super().__init__('keyboard_velocity_controller')
16 self.stdscr = stdscr
17 self.forward_velocity = 0.0
18 self.lateral_velocity = 0.0
19 self.angular_velocity = 0.0
20 self.step = 0.2
21 self.angular_step = 0.1
22
23 self.publisher = self.create_publisher(
24 McLocomotionVelocity, '/aima/mc/locomotion/velocity', 10)
25 self.client = self.create_client(
26 SetMcInputSource, '/aimdk_5Fmsgs/srv/SetMcInputSource')
27
28 if not self.register_input_source():
29 self.get_logger().error("Input source registration failed, exiting")
30 raise RuntimeError("Failed to register input source")
31
32 # Configure curses
33 curses.cbreak()
34 curses.noecho()
35 self.stdscr.keypad(True)
36 self.stdscr.nodelay(True)
37
38 self.get_logger().info(
39 "Control started: W/S forward/backward, A/D strafe, Q/E turn, Space stop, Esc exit")
40
41 # Timer: check keyboard every 50 ms
42 self.timer = self.create_timer(0.05, self.check_key_and_publish)
43
44 def register_input_source(self):
45 self.get_logger().info("Registering input source...")
46
47 timeout_sec = 8.0
48 start = self.get_clock().now().nanoseconds / 1e9
49
50 while not self.client.wait_for_service(timeout_sec=2.0):
51 now = self.get_clock().now().nanoseconds / 1e9
52 if now - start > timeout_sec:
53 self.get_logger().error("Waiting for service timed out")
54 return False
55 self.get_logger().info("Waiting for input source service...")
56
57 req = SetMcInputSource.Request()
58 req.action.value = 1001
59 req.input_source.name = "node"
60 req.input_source.priority = 40
61 req.input_source.timeout = 1000
62
63 for i in range(8):
64 req.request.header.stamp = self.get_clock().now().to_msg()
65 future = self.client.call_async(req)
66 rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
67
68 if future.done():
69 break
70
71 # retry as remote peer is NOT handled well by ROS
72 self.get_logger().info(f"trying to register input source... [{i}]")
73
74 if future.done():
75 try:
76 resp = future.result()
77 state = resp.response.state.value
78 self.get_logger().info(
79 f"Input source set successfully: state={state}, task_id={resp.response.task_id}")
80 return True
81 except Exception as e:
82 self.get_logger().error(f"Service exception: {str(e)}")
83 return False
84 else:
85 self.get_logger().error("Service call failed or timed out")
86 return False
87
88 def check_key_and_publish(self):
89 try:
90 ch = self.stdscr.getch()
91 except Exception:
92 ch = -1
93
94 if ch != -1:
95 if ch == ord(' '):
96 self.forward_velocity = 0.0
97 self.lateral_velocity = 0.0
98 self.angular_velocity = 0.0
99 elif ch == ord('w'):
100 self.forward_velocity = min(
101 self.forward_velocity + self.step, 1.0)
102 elif ch == ord('s'):
103 self.forward_velocity = max(
104 self.forward_velocity - self.step, -1.0)
105 elif ch == ord('a'):
106 self.lateral_velocity = min(
107 self.lateral_velocity + self.step, 1.0)
108 elif ch == ord('d'):
109 self.lateral_velocity = max(
110 self.lateral_velocity - self.step, -1.0)
111 elif ch == ord('q'):
112 self.angular_velocity = min(
113 self.angular_velocity + self.angular_step, 1.0)
114 elif ch == ord('e'):
115 self.angular_velocity = max(
116 self.angular_velocity - self.angular_step, -1.0)
117 elif ch == 27: # ESC
118 self.get_logger().info("Exiting control")
119 rclpy.shutdown()
120 return
121
122 msg = McLocomotionVelocity()
123 msg.header = MessageHeader()
124 msg.header.stamp = self.get_clock().now().to_msg()
125 msg.source = "node"
126 msg.forward_velocity = self.forward_velocity
127 msg.lateral_velocity = self.lateral_velocity
128 msg.angular_velocity = self.angular_velocity
129
130 self.publisher.publish(msg)
131
132 # Update UI
133 self.stdscr.clear()
134 self.stdscr.addstr(
135 0, 0, "W/S: Forward/Backward | A/D: Strafe | Q/E: Turn | Space: Stop | ESC: Exit")
136 self.stdscr.addstr(2, 0,
137 f"Speed Status: Forward: {self.forward_velocity:.2f} m/s | "
138 f"Lateral: {self.lateral_velocity:.2f} m/s | "
139 f"Angular: {self.angular_velocity:.2f} rad/s")
140 self.stdscr.refresh()
141
142
143def curses_main(stdscr):
144 rclpy.init()
145 try:
146 node = KeyboardVelocityController(stdscr)
147 rclpy.spin(node)
148 except Exception as e:
149 rclpy.logging.get_logger("main").fatal(
150 f"Program exited with exception: {e}")
151 finally:
152 curses.endwin()
153 rclpy.shutdown()
154
155
156def main():
157 curses.wrapper(curses_main)
158
159
160if __name__ == '__main__':
161 main()
6.1.11 拍照
该示例中用到了take_photo,在运行节点程序前,修改需要拍照的相机话题,启动节点程序后,会创建/images/目录,将当前帧图像保存在这个目录下。
1#!/usr/bin/env python3
2import time
3from pathlib import Path
4
5import rclpy
6from rclpy.node import Node
7from rclpy.qos import qos_profile_sensor_data
8from sensor_msgs.msg import Image
9from cv_bridge import CvBridge
10import cv2
11
12
13class SaveOneRawPy(Node):
14 def __init__(self):
15 super().__init__('save_one_image')
16
17 # parameter: image topic
18 self.declare_parameter(
19 'image_topic', '/aima/hal/sensor/stereo_head_front_left/rgb_image'
20 )
21 self.topic = self.get_parameter(
22 'image_topic').get_parameter_value().string_value
23
24 # save directory
25 self.save_dir = Path('images').resolve()
26 self.save_dir.mkdir(parents=True, exist_ok=True)
27
28 # state
29 self._saved = False
30 self._bridge = CvBridge()
31
32 # subscriber (sensor QoS)
33 self.sub = self.create_subscription(
34 Image,
35 self.topic,
36 self.image_cb,
37 qos_profile_sensor_data
38 )
39 self.get_logger().info(f'Subscribing to raw image: {self.topic}')
40 self.get_logger().info(f'Images will be saved to: {self.save_dir}')
41
42 def image_cb(self, msg: Image):
43 # already saved one, ignore later frames
44 if self._saved:
45 return
46
47 try:
48 enc = msg.encoding.lower()
49 self.get_logger().info(f'Received image with encoding: {enc}')
50
51 # convert from ROS Image to cv2
52 img = self._bridge.imgmsg_to_cv2(
53 msg, desired_encoding='passthrough')
54
55 # normalize to BGR for saving
56 if enc == 'rgb8':
57 img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
58 elif enc == 'mono8':
59 img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
60 # if it's bgr8 or other 8-bit bgr that cv2 can save, we just use it
61
62 ts_ms = int(time.time() * 1000)
63 out_path = self.save_dir / f'frame_{ts_ms}.png'
64
65 ok = cv2.imwrite(str(out_path), img)
66 if ok:
67 self.get_logger().info(
68 f'Saved image: {out_path} ({img.shape[1]}x{img.shape[0]})'
69 )
70 self._saved = True
71 # shut down once we got exactly one frame
72 # destroy node first, then shutdown rclpy
73 self.destroy_node()
74 if rclpy.ok():
75 rclpy.shutdown()
76 else:
77 self.get_logger().error(f'cv2.imwrite failed: {out_path}')
78 except Exception as e:
79 self.get_logger().error(f'Failed to decode / save image: {e}')
80
81
82def main():
83 rclpy.init()
84 node = SaveOneRawPy()
85 rclpy.spin(node)
86 # in case the node was already destroyed in the callback
87 if rclpy.ok():
88 try:
89 node.destroy_node()
90 except Exception:
91 pass
92 rclpy.shutdown()
93
94
95if __name__ == '__main__':
96 main()
6.1.12 相机推流示例集
该示例集提供了多种相机数据订阅和处理功能,支持深度相机、双目相机和单目相机的数据流订阅。
这些相机数据订阅 example 并没有实际的业务用途, 仅提供相机数据基础信息的打印;若您比较熟悉 ros2 使用,会发现 ros2 topic echo + ros2 topic hz 也能够实现 example 提供的功能。
您可以选择快速查阅 SDK 接口手册中话题列表直接快速进入自己模块的开发,也可以使用相机 example 作为脚手架加入自己的业务逻辑。
我们发布的传感器数据均为未经预处理(去畸变等)的原始数据,如果您需要查询传感器的详细信息(如分辨率、焦距等),请关注相应的内参(camera_info)话题。
深度相机数据订阅
该示例中用到了echo_camera_rgbd,通过订阅/aima/hal/sensor/rgbd_head_front/话题来接收机器人的深度相机数据,支持深度点云、深度图、RGB图、压缩RGB图和相机内参等多种数据类型。
功能特点:
支持多种数据类型订阅(深度点云、深度图、RGB图、压缩图、相机内参)
实时FPS统计和数据显示
支持RGB图视频录制功能
可配置的topic类型选择
支持的数据类型:
depth_pointcloud: 深度点云数据 (sensor_msgs/PointCloud2)depth_image: 深度图像 (sensor_msgs/Image)rgb_image: RGB图像 (sensor_msgs/Image)rgb_image_compressed: 压缩RGB图像 (sensor_msgs/CompressedImage)camera_info: 相机内参 (sensor_msgs/CameraInfo)
1#!/usr/bin/env python3
2"""
3Head depth camera multi-topic subscription example
4
5Supports selecting the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
6 - depth_pointcloud: Depth point cloud (sensor_msgs/PointCloud2)
7 - depth_image: Depth image (sensor_msgs/Image)
8 - depth_camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)
9 - rgb_image: RGB image (sensor_msgs/Image)
10 - rgb_image_compressed: RGB compressed image (sensor_msgs/CompressedImage)
11 - rgb_camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)
12
13Example:
14 ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=depth_pointcloud
15 ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_image
16 ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_camera_info
17
18Default topic_type is rgb_image
19"""
20
21import rclpy
22from rclpy.node import Node
23from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
24from sensor_msgs.msg import Image, CompressedImage, CameraInfo, PointCloud2
25from collections import deque
26import time
27
28
29class CameraTopicEcho(Node):
30 def __init__(self):
31 super().__init__('camera_topic_echo')
32
33 # Select the topic type to subscribe
34 self.declare_parameter('topic_type', 'rgb_image')
35 self.declare_parameter('dump_video_path', '')
36
37 self.topic_type = self.get_parameter('topic_type').value
38 self.dump_video_path = self.get_parameter('dump_video_path').value
39
40 # SensorDataQoS: BEST_EFFORT + VOLATILE
41 qos = QoSProfile(
42 reliability=QoSReliabilityPolicy.BEST_EFFORT,
43 history=QoSHistoryPolicy.KEEP_LAST,
44 depth=5
45 )
46
47 # Create different subscribers based on topic_type
48 if self.topic_type == "depth_pointcloud":
49 self.topic_name = "/aima/hal/sensor/rgbd_head_front/depth_pointcloud"
50 self.sub_pointcloud = self.create_subscription(
51 PointCloud2, self.topic_name, self.cb_pointcloud, qos)
52 self.get_logger().info(
53 f"✅ Subscribing PointCloud2: {self.topic_name}")
54
55 elif self.topic_type == "depth_image":
56 self.topic_name = "/aima/hal/sensor/rgbd_head_front/depth_image"
57 self.sub_image = self.create_subscription(
58 Image, self.topic_name, self.cb_image, qos)
59 self.get_logger().info(
60 f"✅ Subscribing Depth Image: {self.topic_name}")
61
62 elif self.topic_type == "rgb_image":
63 self.topic_name = "/aima/hal/sensor/rgbd_head_front/rgb_image"
64 self.sub_image = self.create_subscription(
65 Image, self.topic_name, self.cb_image, qos)
66 self.get_logger().info(
67 f"✅ Subscribing RGB Image: {self.topic_name}")
68 if self.dump_video_path:
69 self.get_logger().info(
70 f"📝 Will dump received images to video: {self.dump_video_path}")
71
72 elif self.topic_type == "rgb_image_compressed":
73 self.topic_name = "/aima/hal/sensor/rgbd_head_front/rgb_image/compressed"
74 self.sub_compressed = self.create_subscription(
75 CompressedImage, self.topic_name, self.cb_compressed, qos)
76 self.get_logger().info(
77 f"✅ Subscribing CompressedImage: {self.topic_name}")
78
79 elif self.topic_type == "rgb_camera_info":
80 self.topic_name = "/aima/hal/sensor/rgbd_head_front/rgb_camera_info"
81 # RGB-D CameraInfo is different with other cameras. The best_effort + volatile QoS is enough for 10Hz rgb_camera_info
82 self.sub_camerainfo = self.create_subscription(
83 CameraInfo, self.topic_name, self.cb_camerainfo, qos)
84 self.get_logger().info(
85 f"✅ Subscribing RGB CameraInfo: {self.topic_name}")
86
87 elif self.topic_type == "depth_camera_info":
88 self.topic_name = "/aima/hal/sensor/rgbd_head_front/depth_camera_info"
89 # RGB-D CameraInfo is different with other cameras. The best_effort + volatile QoS is enough for 10Hz depth_camera_info
90 self.sub_camerainfo = self.create_subscription(
91 CameraInfo, self.topic_name, self.cb_camerainfo, qos)
92 self.get_logger().info(
93 f"✅ Subscribing Depth CameraInfo: {self.topic_name}")
94
95 else:
96 self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
97 raise ValueError("Unknown topic_type")
98
99 # Internal state
100 self.last_print = self.get_clock().now()
101 self.arrivals = deque()
102
103 def update_arrivals(self):
104 """Calculate received FPS"""
105 now = self.get_clock().now()
106 self.arrivals.append(now)
107 while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
108 self.arrivals.popleft()
109
110 def get_fps(self):
111 """Get FPS"""
112 return len(self.arrivals)
113
114 def should_print(self):
115 """Control print frequency"""
116 now = self.get_clock().now()
117 if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
118 self.last_print = now
119 return True
120 return False
121
122 def cb_pointcloud(self, msg: PointCloud2):
123 """PointCloud2 callback"""
124 self.update_arrivals()
125
126 if self.should_print():
127 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
128
129 # Format fields information
130 fields_str = " ".join(
131 [f"{f.name}({f.datatype})" for f in msg.fields])
132
133 self.get_logger().info(
134 f"🌫️ PointCloud2 received\n"
135 f" • frame_id: {msg.header.frame_id}\n"
136 f" • stamp (sec): {stamp_sec:.6f}\n"
137 f" • width x height: {msg.width} x {msg.height}\n"
138 f" • point_step: {msg.point_step}\n"
139 f" • row_step: {msg.row_step}\n"
140 f" • fields: {fields_str}\n"
141 f" • is_bigendian: {msg.is_bigendian}\n"
142 f" • is_dense: {msg.is_dense}\n"
143 f" • data size: {len(msg.data)}\n"
144 f" • recv FPS (1s): {self.get_fps():.1f}"
145 )
146
147 def cb_image(self, msg: Image):
148 """Image callback (Depth/RGB image)"""
149 self.update_arrivals()
150
151 if self.should_print():
152 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
153 self.get_logger().info(
154 f"📸 {self.topic_type} received\n"
155 f" • frame_id: {msg.header.frame_id}\n"
156 f" • stamp (sec): {stamp_sec:.6f}\n"
157 f" • encoding: {msg.encoding}\n"
158 f" • size (WxH): {msg.width} x {msg.height}\n"
159 f" • step (bytes/row):{msg.step}\n"
160 f" • is_bigendian: {msg.is_bigendian}\n"
161 f" • recv FPS (1s): {self.get_fps():.1f}"
162 )
163
164 # Only RGB image supports video dump
165 if self.topic_type == "rgb_image" and self.dump_video_path:
166 self.dump_image_to_video(msg)
167
168 def cb_compressed(self, msg: CompressedImage):
169 """CompressedImage callback"""
170 self.update_arrivals()
171
172 if self.should_print():
173 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
174 self.get_logger().info(
175 f"🗜️ CompressedImage received\n"
176 f" • frame_id: {msg.header.frame_id}\n"
177 f" • stamp (sec): {stamp_sec:.6f}\n"
178 f" • format: {msg.format}\n"
179 f" • data size: {len(msg.data)}\n"
180 f" • recv FPS (1s): {self.get_fps():.1f}"
181 )
182
183 def cb_camerainfo(self, msg: CameraInfo):
184 """CameraInfo callback (camera intrinsic parameters)"""
185 # Camera info will only receive one frame, print it directly
186 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
187
188 # Format D array
189 d_str = ", ".join([f"{d:.6f}" for d in msg.d])
190
191 # Format K matrix
192 k_str = ", ".join([f"{k:.6f}" for k in msg.k])
193
194 # Format P matrix
195 p_str = ", ".join([f"{p:.6f}" for p in msg.p])
196
197 self.get_logger().info(
198 f"📷 {self.topic_type} received\n"
199 f" • frame_id: {msg.header.frame_id}\n"
200 f" • stamp (sec): {stamp_sec:.6f}\n"
201 f" • width x height: {msg.width} x {msg.height}\n"
202 f" • distortion_model:{msg.distortion_model}\n"
203 f" • D: [{d_str}]\n"
204 f" • K: [{k_str}]\n"
205 f" • P: [{p_str}]\n"
206 f" • binning_x: {msg.binning_x}\n"
207 f" • binning_y: {msg.binning_y}\n"
208 f" • roi: {{ x_offset: {msg.roi.x_offset}, y_offset: {msg.roi.y_offset}, height: {msg.roi.height}, width: {msg.roi.width}, do_rectify: {msg.roi.do_rectify} }}"
209 )
210
211 def dump_image_to_video(self, msg: Image):
212 """Video dump is only supported for RGB images"""
213 # You can add video recording functionality here
214 # Simplified in the Python version, only logs instead
215 if self.should_print():
216 self.get_logger().info(f"📝 Video dump not implemented in Python version")
217
218
219def main(args=None):
220 rclpy.init(args=args)
221 try:
222 node = CameraTopicEcho()
223 rclpy.spin(node)
224 except KeyboardInterrupt:
225 pass
226 except Exception as e:
227 print(f"Error: {e}")
228 finally:
229 if 'node' in locals():
230 node.destroy_node()
231 rclpy.shutdown()
232
233
234if __name__ == '__main__':
235 main()
使用说明:
订阅深度点云数据:
ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=depth_pointcloud
订阅RGB图像数据:
ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_image
订阅相机内参:
ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_camera_info ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=depth_camera_info
录制RGB视频:
# dump_video_path的值可改为其他路径, 注意提前创建该文件所在目录才能保存 ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_image -p dump_video_path:=$PWD/output.avi
双目相机数据订阅
该示例中用到了echo_camera_stereo,通过订阅/aima/hal/sensor/stereo_head_front_*/话题来接收机器人的双目相机数据,支持左右相机的RGB图、压缩图和相机内参数据。
功能特点:
支持左右相机独立数据订阅
实时FPS统计和数据显示
支持RGB图视频录制功能
可配置的相机选择(左/右)
支持的数据类型:
left_rgb_image: 左相机RGB图像 (sensor_msgs/Image)left_rgb_image_compressed: 左相机压缩RGB图像 (sensor_msgs/CompressedImage)left_camera_info: 左相机内参 (sensor_msgs/CameraInfo)right_rgb_image: 右相机RGB图像 (sensor_msgs/Image)right_rgb_image_compressed: 右相机压缩RGB图像 (sensor_msgs/CompressedImage)right_camera_info: 右相机内参 (sensor_msgs/CameraInfo)
1#!/usr/bin/env python3
2"""
3Head stereo camera multi-topic subscription example
4
5Supports selecting the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
6 - left_rgb_image: Left camera RGB image (sensor_msgs/Image)
7 - left_rgb_image_compressed: Left camera RGB compressed image (sensor_msgs/CompressedImage)
8 - left_camera_info: Left camera intrinsic parameters (sensor_msgs/CameraInfo)
9 - right_rgb_image: Right camera RGB image (sensor_msgs/Image)
10 - right_rgb_image_compressed: Right camera RGB compressed image (sensor_msgs/CompressedImage)
11 - right_camera_info: Right camera intrinsic parameters (sensor_msgs/CameraInfo)
12
13Example:
14 ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_rgb_image
15 ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=right_rgb_image
16 ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_camera_info
17
18Default topic_type is left_rgb_image
19"""
20
21import rclpy
22from rclpy.node import Node
23from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy, QoSHistoryPolicy
24from sensor_msgs.msg import Image, CompressedImage, CameraInfo
25from collections import deque
26import time
27
28
29class StereoCameraTopicEcho(Node):
30 def __init__(self):
31 super().__init__('stereo_camera_topic_echo')
32
33 # Select the topic type to subscribe
34 self.declare_parameter('topic_type', 'left_rgb_image')
35 self.declare_parameter('dump_video_path', '')
36
37 self.topic_type = self.get_parameter('topic_type').value
38 self.dump_video_path = self.get_parameter('dump_video_path').value
39
40 # Set QoS parameters - use sensor data QoS
41 qos = QoSProfile(
42 reliability=QoSReliabilityPolicy.BEST_EFFORT,
43 history=QoSHistoryPolicy.KEEP_LAST,
44 depth=5,
45 durability=QoSDurabilityPolicy.VOLATILE
46 )
47
48 # Create different subscribers based on topic_type
49 if self.topic_type == "left_rgb_image":
50 self.topic_name = "/aima/hal/sensor/stereo_head_front_left/rgb_image"
51 self.sub_image = self.create_subscription(
52 Image, self.topic_name, self.cb_image, qos)
53 self.get_logger().info(
54 f"✅ Subscribing Left RGB Image: {self.topic_name}")
55 if self.dump_video_path:
56 self.get_logger().info(
57 f"📝 Will dump received images to video: {self.dump_video_path}")
58
59 elif self.topic_type == "left_rgb_image_compressed":
60 self.topic_name = "/aima/hal/sensor/stereo_head_front_left/rgb_image/compressed"
61 self.sub_compressed = self.create_subscription(
62 CompressedImage, self.topic_name, self.cb_compressed, qos)
63 self.get_logger().info(
64 f"✅ Subscribing Left CompressedImage: {self.topic_name}")
65
66 elif self.topic_type == "left_camera_info":
67 self.topic_name = "/aima/hal/sensor/stereo_head_front_left/camera_info"
68 # CameraInfo subscription must use reliable + transient_local QoS to receive historical messages (even if only one frame is published)
69 camera_qos = QoSProfile(
70 reliability=QoSReliabilityPolicy.RELIABLE,
71 history=QoSHistoryPolicy.KEEP_LAST,
72 depth=1,
73 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL
74 )
75 self.sub_camerainfo = self.create_subscription(
76 CameraInfo, self.topic_name, self.cb_camerainfo, camera_qos)
77 self.get_logger().info(
78 f"✅ Subscribing Left CameraInfo (with transient_local): {self.topic_name}")
79
80 elif self.topic_type == "right_rgb_image":
81 self.topic_name = "/aima/hal/sensor/stereo_head_front_right/rgb_image"
82 self.sub_image = self.create_subscription(
83 Image, self.topic_name, self.cb_image, qos)
84 self.get_logger().info(
85 f"✅ Subscribing Right RGB Image: {self.topic_name}")
86 if self.dump_video_path:
87 self.get_logger().info(
88 f"📝 Will dump received images to video: {self.dump_video_path}")
89
90 elif self.topic_type == "right_rgb_image_compressed":
91 self.topic_name = "/aima/hal/sensor/stereo_head_front_right/rgb_image/compressed"
92 self.sub_compressed = self.create_subscription(
93 CompressedImage, self.topic_name, self.cb_compressed, qos)
94 self.get_logger().info(
95 f"✅ Subscribing Right CompressedImage: {self.topic_name}")
96
97 elif self.topic_type == "right_camera_info":
98 self.topic_name = "/aima/hal/sensor/stereo_head_front_right/camera_info"
99 # CameraInfo subscription must use reliable + transient_local QoS to receive historical messages (even if only one frame is published)
100 camera_qos = QoSProfile(
101 reliability=QoSReliabilityPolicy.RELIABLE,
102 history=QoSHistoryPolicy.KEEP_LAST,
103 depth=1,
104 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL
105 )
106 self.sub_camerainfo = self.create_subscription(
107 CameraInfo, self.topic_name, self.cb_camerainfo, camera_qos)
108 self.get_logger().info(
109 f"✅ Subscribing Right CameraInfo (with transient_local): {self.topic_name}")
110
111 else:
112 self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
113 raise ValueError("Unknown topic_type")
114
115 # Internal state
116 self.last_print = self.get_clock().now()
117 self.arrivals = deque()
118
119 def update_arrivals(self):
120 """Calculate received FPS"""
121 now = self.get_clock().now()
122 self.arrivals.append(now)
123 while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
124 self.arrivals.popleft()
125
126 def get_fps(self):
127 """Get FPS"""
128 return len(self.arrivals)
129
130 def should_print(self):
131 """Control print frequency"""
132 now = self.get_clock().now()
133 if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
134 self.last_print = now
135 return True
136 return False
137
138 def cb_image(self, msg: Image):
139 """Image callback (Left/Right camera RGB image)"""
140 self.update_arrivals()
141
142 if self.should_print():
143 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
144 self.get_logger().info(
145 f"📸 {self.topic_type} received\n"
146 f" • frame_id: {msg.header.frame_id}\n"
147 f" • stamp (sec): {stamp_sec:.6f}\n"
148 f" • encoding: {msg.encoding}\n"
149 f" • size (WxH): {msg.width} x {msg.height}\n"
150 f" • step (bytes/row):{msg.step}\n"
151 f" • is_bigendian: {msg.is_bigendian}\n"
152 f" • recv FPS (1s): {self.get_fps():.1f}"
153 )
154
155 # Only RGB images support video dump
156 if (self.topic_type in ["left_rgb_image", "right_rgb_image"]) and self.dump_video_path:
157 self.dump_image_to_video(msg)
158
159 def cb_compressed(self, msg: CompressedImage):
160 """CompressedImage callback (Left/Right camera RGB compressed image)"""
161 self.update_arrivals()
162
163 if self.should_print():
164 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
165 self.get_logger().info(
166 f"🗜️ {self.topic_type} received\n"
167 f" • frame_id: {msg.header.frame_id}\n"
168 f" • stamp (sec): {stamp_sec:.6f}\n"
169 f" • format: {msg.format}\n"
170 f" • data size: {len(msg.data)}\n"
171 f" • recv FPS (1s): {self.get_fps():.1f}"
172 )
173
174 def cb_camerainfo(self, msg: CameraInfo):
175 """CameraInfo callback (Left/Right camera intrinsic parameters)"""
176 # Camera info will only receive one frame, print it directly
177 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
178
179 # Format D array
180 d_str = ", ".join([f"{d:.6f}" for d in msg.d])
181
182 # Format K matrix
183 k_str = ", ".join([f"{k:.6f}" for k in msg.k])
184
185 # Format P matrix
186 p_str = ", ".join([f"{p:.6f}" for p in msg.p])
187
188 self.get_logger().info(
189 f"📷 {self.topic_type} received\n"
190 f" • frame_id: {msg.header.frame_id}\n"
191 f" • stamp (sec): {stamp_sec:.6f}\n"
192 f" • width x height: {msg.width} x {msg.height}\n"
193 f" • distortion_model:{msg.distortion_model}\n"
194 f" • D: [{d_str}]\n"
195 f" • K: [{k_str}]\n"
196 f" • P: [{p_str}]\n"
197 f" • binning_x: {msg.binning_x}\n"
198 f" • binning_y: {msg.binning_y}\n"
199 f" • roi: {{ x_offset: {msg.roi.x_offset}, y_offset: {msg.roi.y_offset}, height: {msg.roi.height}, width: {msg.roi.width}, do_rectify: {msg.roi.do_rectify} }}"
200 )
201
202 def dump_image_to_video(self, msg: Image):
203 """Video dump is only supported for RGB images"""
204 # You can add video recording functionality here
205 # Simplified in the Python version, only logs instead
206 if self.should_print():
207 self.get_logger().info(f"📝 Video dump not implemented in Python version")
208
209
210def main(args=None):
211 rclpy.init(args=args)
212 try:
213 node = StereoCameraTopicEcho()
214 rclpy.spin(node)
215 except KeyboardInterrupt:
216 pass
217 except Exception as e:
218 print(f"Error: {e}")
219 finally:
220 if 'node' in locals():
221 node.destroy_node()
222 rclpy.shutdown()
223
224
225if __name__ == '__main__':
226 main()
使用说明:
订阅左相机RGB图像:
ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_rgb_image
订阅右相机RGB图像:
ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=right_rgb_image
订阅左相机内参:
ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_camera_info
录制左相机视频:
# dump_video_path的值可改为其他路径, 注意提前创建该文件所在目录才能保存 ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_rgb_image -p dump_video_path:=$PWD/left_camera.avi
头部后置单目相机数据订阅
该示例中用到了echo_camera_head_rear,通过订阅/aima/hal/sensor/rgb_head_rear/话题来接收机器人的头部后置单目相机数据,支持RGB图、压缩图和相机内参数据。
功能特点:
支持头部后置相机数据订阅
实时FPS统计和数据显示
支持RGB图视频录制功能
可配置的topic类型选择
支持的数据类型:
rgb_image: RGB图像 (sensor_msgs/Image)rgb_image_compressed: 压缩RGB图像 (sensor_msgs/CompressedImage)camera_info: 相机内参 (sensor_msgs/CameraInfo)
1#!/usr/bin/env python3
2"""
3Head rear monocular camera multi-topic subscription example
4
5Supports selecting the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
6 - rgb_image: RGB image (sensor_msgs/Image)
7 - rgb_image_compressed: RGB compressed image (sensor_msgs/CompressedImage)
8 - camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)
9
10Example:
11 ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image
12 ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image_compressed
13 ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=camera_info
14
15Default topic_type is rgb_image
16"""
17
18import rclpy
19from rclpy.node import Node
20from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy, QoSHistoryPolicy
21from sensor_msgs.msg import Image, CompressedImage, CameraInfo
22from collections import deque
23import time
24
25
26class HeadRearCameraTopicEcho(Node):
27 def __init__(self):
28 super().__init__('head_rear_camera_topic_echo')
29
30 # Select the topic type to subscribe
31 self.declare_parameter('topic_type', 'rgb_image')
32 self.declare_parameter('dump_video_path', '')
33
34 self.topic_type = self.get_parameter('topic_type').value
35 self.dump_video_path = self.get_parameter('dump_video_path').value
36
37 # Set QoS parameters - use sensor data QoS
38 qos = QoSProfile(
39 reliability=QoSReliabilityPolicy.BEST_EFFORT,
40 history=QoSHistoryPolicy.KEEP_LAST,
41 depth=5,
42 durability=QoSDurabilityPolicy.VOLATILE
43 )
44
45 # Create different subscribers based on topic_type
46 if self.topic_type == "rgb_image":
47 self.topic_name = "/aima/hal/sensor/rgb_head_rear/rgb_image"
48 self.sub_image = self.create_subscription(
49 Image, self.topic_name, self.cb_image, qos)
50 self.get_logger().info(
51 f"✅ Subscribing RGB Image: {self.topic_name}")
52 if self.dump_video_path:
53 self.get_logger().info(
54 f"📝 Will dump received images to video: {self.dump_video_path}")
55
56 elif self.topic_type == "rgb_image_compressed":
57 self.topic_name = "/aima/hal/sensor/rgb_head_rear/rgb_image/compressed"
58 self.sub_compressed = self.create_subscription(
59 CompressedImage, self.topic_name, self.cb_compressed, qos)
60 self.get_logger().info(
61 f"✅ Subscribing CompressedImage: {self.topic_name}")
62
63 elif self.topic_type == "camera_info":
64 self.topic_name = "/aima/hal/sensor/rgb_head_rear/camera_info"
65 # CameraInfo subscription must use reliable + transient_local QoS to receive historical messages (even if only one frame is published)
66 camera_qos = QoSProfile(
67 reliability=QoSReliabilityPolicy.RELIABLE,
68 history=QoSHistoryPolicy.KEEP_LAST,
69 depth=1,
70 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL
71 )
72 self.sub_camerainfo = self.create_subscription(
73 CameraInfo, self.topic_name, self.cb_camerainfo, camera_qos)
74 self.get_logger().info(
75 f"✅ Subscribing CameraInfo (with transient_local): {self.topic_name}")
76
77 else:
78 self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
79 raise ValueError("Unknown topic_type")
80
81 # Internal state
82 self.last_print = self.get_clock().now()
83 self.arrivals = deque()
84
85 def update_arrivals(self):
86 """Calculate received FPS"""
87 now = self.get_clock().now()
88 self.arrivals.append(now)
89 while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
90 self.arrivals.popleft()
91
92 def get_fps(self):
93 """Get FPS"""
94 return len(self.arrivals)
95
96 def should_print(self):
97 """Control print frequency"""
98 now = self.get_clock().now()
99 if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
100 self.last_print = now
101 return True
102 return False
103
104 def cb_image(self, msg: Image):
105 """Image callback (RGB image)"""
106 self.update_arrivals()
107
108 if self.should_print():
109 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
110 self.get_logger().info(
111 f"📸 {self.topic_type} received\n"
112 f" • frame_id: {msg.header.frame_id}\n"
113 f" • stamp (sec): {stamp_sec:.6f}\n"
114 f" • encoding: {msg.encoding}\n"
115 f" • size (WxH): {msg.width} x {msg.height}\n"
116 f" • step (bytes/row):{msg.step}\n"
117 f" • is_bigendian: {msg.is_bigendian}\n"
118 f" • recv FPS (1s): {self.get_fps():.1f}"
119 )
120
121 # Only RGB image supports video dump
122 if self.topic_type == "rgb_image" and self.dump_video_path:
123 self.dump_image_to_video(msg)
124
125 def cb_compressed(self, msg: CompressedImage):
126 """CompressedImage callback (RGB compressed image)"""
127 self.update_arrivals()
128
129 if self.should_print():
130 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
131 self.get_logger().info(
132 f"🗜️ {self.topic_type} received\n"
133 f" • frame_id: {msg.header.frame_id}\n"
134 f" • stamp (sec): {stamp_sec:.6f}\n"
135 f" • format: {msg.format}\n"
136 f" • data size: {len(msg.data)}\n"
137 f" • recv FPS (1s): {self.get_fps():.1f}"
138 )
139
140 def cb_camerainfo(self, msg: CameraInfo):
141 """CameraInfo callback (camera intrinsic parameters)"""
142 # Camera info will only receive one frame, print it directly
143 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
144
145 # Format D array
146 d_str = ", ".join([f"{d:.6f}" for d in msg.d])
147
148 # Format K matrix
149 k_str = ", ".join([f"{k:.6f}" for k in msg.k])
150
151 # Format P matrix
152 p_str = ", ".join([f"{p:.6f}" for p in msg.p])
153
154 self.get_logger().info(
155 f"📷 {self.topic_type} received\n"
156 f" • frame_id: {msg.header.frame_id}\n"
157 f" • stamp (sec): {stamp_sec:.6f}\n"
158 f" • width x height: {msg.width} x {msg.height}\n"
159 f" • distortion_model:{msg.distortion_model}\n"
160 f" • D: [{d_str}]\n"
161 f" • K: [{k_str}]\n"
162 f" • P: [{p_str}]\n"
163 f" • binning_x: {msg.binning_x}\n"
164 f" • binning_y: {msg.binning_y}\n"
165 f" • roi: {{ x_offset: {msg.roi.x_offset}, y_offset: {msg.roi.y_offset}, height: {msg.roi.height}, width: {msg.roi.width}, do_rectify: {msg.roi.do_rectify} }}"
166 )
167
168 def dump_image_to_video(self, msg: Image):
169 """Video dump is only supported for RGB images"""
170 # You can add video recording functionality here
171 # Simplified in the Python version, only logs instead
172 if self.should_print():
173 self.get_logger().info(f"📝 Video dump not implemented in Python version")
174
175
176def main(args=None):
177 rclpy.init(args=args)
178 try:
179 node = HeadRearCameraTopicEcho()
180 rclpy.spin(node)
181 except KeyboardInterrupt:
182 pass
183 except Exception as e:
184 print(f"Error: {e}")
185 finally:
186 if 'node' in locals():
187 node.destroy_node()
188 rclpy.shutdown()
189
190
191if __name__ == '__main__':
192 main()
使用说明:
订阅RGB图像数据:
ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image
订阅压缩图像数据:
ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image_compressed
订阅相机内参:
ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=camera_info
录制视频:
# dump_video_path的值可改为其他路径, 注意提前创建该文件所在目录才能保存 ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image -p dump_video_path:=$PWD/rear_camera.avi
应用场景:
人脸识别和追踪
目标检测和识别
视觉SLAM
图像处理和计算机视觉算法开发
机器人视觉导航
6.1.13 激光雷达数据订阅
该示例中用到了echo_lidar_data,通过订阅/aima/hal/sensor/lidar_chest_front/话题来接收机器人的激光雷达数据,支持点云数据和IMU数据两种数据类型。
功能特点:
支持激光雷达点云数据订阅
支持激光雷达IMU数据订阅
实时FPS统计和数据显示
可配置的topic类型选择
详细的数据字段信息输出
支持的数据类型:
PointCloud2: 激光雷达点云数据 (sensor_msgs/PointCloud2)Imu: 激光雷达IMU数据 (sensor_msgs/Imu)
技术实现:
使用SensorDataQoS配置(
BEST_EFFORT+VOLATILE)支持点云字段信息解析和显示
支持IMU四元数、角速度和线性加速度数据
提供详细的调试日志输出
应用场景:
激光雷达数据采集和分析
点云数据处理和可视化
机器人导航和定位
SLAM算法开发
环境感知和建图
1#!/usr/bin/env python3
2"""
3Chest LiDAR data subscription example
4
5Supports subscribing to the following topics:
6 1. /aima/hal/sensor/lidar_chest_front/lidar_pointcloud
7 - Data type: sensor_msgs/PointCloud2
8 - frame_id: lidar_chest_front
9 - child_frame_id: /
10 - Content: LiDAR point cloud data
11 2. /aima/hal/sensor/lidar_chest_front/imu
12 - Data type: sensor_msgs/Imu
13 - frame_id: lidar_imu_chest_front
14 - Content: LiDAR IMU data
15
16You can select the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
17 - pointcloud: subscribe to LiDAR point cloud
18 - imu: subscribe to LiDAR IMU
19Default topic_type is pointcloud
20
21Examples:
22 ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=pointcloud
23 ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=imu
24"""
25
26import rclpy
27from rclpy.node import Node
28from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
29from sensor_msgs.msg import PointCloud2, Imu
30from collections import deque
31import time
32
33
34class LidarChestEcho(Node):
35 def __init__(self):
36 super().__init__('lidar_chest_echo')
37
38 # Select the topic type to subscribe
39 self.declare_parameter('topic_type', 'pointcloud')
40 self.topic_type = self.get_parameter('topic_type').value
41
42 # SensorDataQoS: BEST_EFFORT + VOLATILE
43 qos = QoSProfile(
44 reliability=QoSReliabilityPolicy.BEST_EFFORT,
45 history=QoSHistoryPolicy.KEEP_LAST,
46 depth=5
47 )
48
49 # Create different subscribers based on topic_type
50 if self.topic_type == "pointcloud":
51 self.topic_name = "/aima/hal/sensor/lidar_chest_front/lidar_pointcloud"
52 self.sub_pointcloud = self.create_subscription(
53 PointCloud2, self.topic_name, self.cb_pointcloud, qos)
54 self.get_logger().info(
55 f"✅ Subscribing LIDAR PointCloud2: {self.topic_name}")
56
57 elif self.topic_type == "imu":
58 self.topic_name = "/aima/hal/sensor/lidar_chest_front/imu"
59 self.sub_imu = self.create_subscription(
60 Imu, self.topic_name, self.cb_imu, qos)
61 self.get_logger().info(
62 f"✅ Subscribing LIDAR IMU: {self.topic_name}")
63
64 else:
65 self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
66 raise ValueError("Unknown topic_type")
67
68 # Internal state
69 self.last_print = self.get_clock().now()
70 self.arrivals = deque()
71
72 def update_arrivals(self):
73 """Calculate received FPS"""
74 now = self.get_clock().now()
75 self.arrivals.append(now)
76 while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
77 self.arrivals.popleft()
78
79 def get_fps(self):
80 """Get FPS"""
81 return len(self.arrivals)
82
83 def should_print(self):
84 """Control print frequency"""
85 now = self.get_clock().now()
86 if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
87 self.last_print = now
88 return True
89 return False
90
91 def cb_pointcloud(self, msg: PointCloud2):
92 """PointCloud2 callback (LiDAR point cloud)"""
93 self.update_arrivals()
94
95 if self.should_print():
96 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
97
98 # Format fields info
99 fields_str = " ".join(
100 [f"{f.name}({f.datatype})" for f in msg.fields])
101
102 self.get_logger().info(
103 f"🟢 LIDAR PointCloud2 received\n"
104 f" • frame_id: {msg.header.frame_id}\n"
105 f" • stamp (sec): {stamp_sec:.6f}\n"
106 f" • width x height: {msg.width} x {msg.height}\n"
107 f" • point_step: {msg.point_step}\n"
108 f" • row_step: {msg.row_step}\n"
109 f" • fields: {fields_str}\n"
110 f" • is_bigendian: {msg.is_bigendian}\n"
111 f" • is_dense: {msg.is_dense}\n"
112 f" • data size: {len(msg.data)}\n"
113 f" • recv FPS (1s): {self.get_fps():1.1f}"
114 )
115
116 def cb_imu(self, msg: Imu):
117 """IMU callback (LiDAR IMU)"""
118 self.update_arrivals()
119
120 if self.should_print():
121 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
122
123 self.get_logger().info(
124 f"🟢 LIDAR IMU received\n"
125 f" • frame_id: {msg.header.frame_id}\n"
126 f" • stamp (sec): {stamp_sec:.6f}\n"
127 f" • orientation: [{msg.orientation.x:.6f}, {msg.orientation.y:.6f}, {msg.orientation.z:.6f}, {msg.orientation.w:.6f}]\n"
128 f" • angular_velocity:[{msg.angular_velocity.x:.6f}, {msg.angular_velocity.y:.6f}, {msg.angular_velocity.z:.6f}]\n"
129 f" • linear_accel: [{msg.linear_acceleration.x:.6f}, {msg.linear_acceleration.y:.6f}, {msg.linear_acceleration.z:.6f}]\n"
130 f" • recv FPS (1s): {self.get_fps():.1f}"
131 )
132
133
134def main(args=None):
135 rclpy.init(args=args)
136 try:
137 node = LidarChestEcho()
138 rclpy.spin(node)
139 except KeyboardInterrupt:
140 pass
141 except Exception as e:
142 print(f"Error: {e}")
143 finally:
144 if 'node' in locals():
145 node.destroy_node()
146 rclpy.shutdown()
147
148
149if __name__ == '__main__':
150 main()
使用说明:
# 订阅激光雷达点云数据
ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=pointcloud
# 订阅激光雷达IMU数据
ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=imu
输出示例:
[INFO] [lidar_chest_echo]: ✅ Subscribing LIDAR PointCloud2: /aima/hal/sensor/lidar_chest_front/lidar_pointcloud
[INFO] [lidar_chest_echo]: 🟢 LIDAR PointCloud2 received
• frame_id: lidar_chest_front
• stamp (sec): 1234567890.123456
• width x height: 1 x 36000
• point_step: 16
• row_step: 16
• fields: x(7) y(7) z(7) intensity(7)
• is_bigendian: False
• is_dense: True
• data size: 576000
• recv FPS (1s): 10.0
6.1.14 播放视频
该示例中用到了play_video,在运行节点程序前,需要先将视频上传到机器人的**交互计算单元(PC3)**上(用户可在其上创建一个用来存储视频的目录,如/var/tmp/videos/),然后将节点程序中的video_path改为需要播放视频的路径。
注意
⚠️ 请注意!交互计算单元(PC3)独立于二开程序所在的开发计算单元(PC2), 音视频文件务必存入交互计算单元(IP: 10.0.1.42)。
⚠️ 音视频文件夹及该文件夹所有父目录应当为所有用户可访问读取(建议在/var/tmp/下创建子目录存放)
功能说明
通过调用PlayVideo服务,可以让机器人在屏幕上播放指定路径的视频文件。请确保视频文件已上传到交互计算单元,否则播放会失败。
1#!/usr/bin/env python3
2
3import rclpy
4import rclpy.logging
5from rclpy.node import Node
6
7from aimdk_msgs.srv import PlayVideo
8
9
10class PlayVideoClient(Node):
11 def __init__(self):
12 super().__init__('play_video_client')
13 self.client = self.create_client(
14 PlayVideo, '/face_ui_proxy/play_video')
15 self.get_logger().info('✅ PlayVideo client node created.')
16
17 # Wait for the service to become available
18 while not self.client.wait_for_service(timeout_sec=2.0):
19 self.get_logger().info('⏳ Service unavailable, waiting...')
20
21 self.get_logger().info('🟢 Service available, ready to send request.')
22
23 def send_request(self, video_path, mode, priority):
24 req = PlayVideo.Request()
25
26 req.video_path = video_path
27 req.mode = mode
28 req.priority = priority
29
30 # async call
31 self.get_logger().info(
32 f'📨 Sending request to play video: mode={mode} video={video_path}')
33 for i in range(8):
34 req.header.header.stamp = self.get_clock().now().to_msg()
35 future = self.client.call_async(req)
36 rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
37
38 if future.done():
39 break
40
41 # retry as remote peer is NOT handled well by ROS
42 self.get_logger().info(f'trying ... [{i}]')
43
44 resp = future.result()
45 if resp is None:
46 self.get_logger().error('❌ Service call not completed or timed out.')
47 return False
48
49 if resp.success:
50 self.get_logger().info(
51 f'✅ Request to play video recorded successfully: {resp.message}')
52 return True
53 else:
54 self.get_logger().error(
55 f'❌ Failed to record play-video request: {resp.message}')
56 return False
57
58
59def main(args=None):
60 rclpy.init(args=args)
61 node = None
62
63 try:
64 # video path and priority can be customized
65 video_path = "/agibot/data/home/agi/zhiyuan.mp4"
66 priority = 5
67 # input play mode
68 mode = int(input("Enter video play mode (1: play once, 2: loop): "))
69 if mode not in (1, 2):
70 raise ValueError(f'invalid mode {mode}')
71
72 node = PlayVideoClient()
73 node.send_request(video_path, mode, priority)
74 except KeyboardInterrupt:
75 pass
76 except Exception as e:
77 rclpy.logging.get_logger('main').error(
78 f'Program exited with exception: {e}')
79
80 if node:
81 node.destroy_node()
82 if rclpy.ok():
83 rclpy.shutdown()
84
85
86if __name__ == '__main__':
87 main()
6.1.15 媒体文件播放
该示例中用到了play_media,通过该节点可实现播放指定的媒体文件(如音频文件),支持WAV、MP3等格式的音频文件播放。
功能特点:
支持多种音频格式播放(WAV、MP3等)
支持优先级控制,可设置播放优先级
支持打断机制,可中断当前播放
支持自定义文件路径和播放参数
提供完整的错误处理和状态反馈
技术实现:
使用PlayMediaFile服务进行媒体文件播放
支持优先级级别设置(0-99)
支持中断控制(is_interrupted参数)
提供详细的播放状态反馈
兼容不同的响应字段格式
应用场景:
音频文件播放和媒体控制
语音提示和音效播放
多媒体应用开发
机器人交互音频反馈
注意
⚠️ 请注意!交互计算单元(PC3)独立于二开程序所在的开发计算单元(PC2), 音视频文件务必存入交互计算单元(IP: 10.0.1.42)。
⚠️ 音视频文件夹及该文件夹所有父目录应当为所有用户可访问读取(建议在/var/tmp/下创建子目录存放)
1#!/usr/bin/env python3
2
3import sys
4import rclpy
5import rclpy.logging
6from rclpy.node import Node
7
8from aimdk_msgs.srv import PlayMediaFile
9from aimdk_msgs.msg import TtsPriorityLevel
10
11
12class PlayMediaClient(Node):
13 def __init__(self):
14 super().__init__('play_media_client')
15 self.client = self.create_client(
16 PlayMediaFile, '/aimdk_5Fmsgs/srv/PlayMediaFile')
17 self.get_logger().info('✅ PlayMedia client node created.')
18
19 # Wait for the service to become available
20 while not self.client.wait_for_service(timeout_sec=2.0):
21 self.get_logger().info('⏳ Service unavailable, waiting...')
22
23 self.get_logger().info('🟢 Service available, ready to send request.')
24
25 def send_request(self, media_path):
26 req = PlayMediaFile.Request()
27
28 req.media_file_req.file_name = media_path
29 req.media_file_req.domain = 'demo_client' # required: caller domain
30 req.media_file_req.trace_id = 'demo' # optional
31 req.media_file_req.is_interrupted = True # interrupt same-priority
32 req.media_file_req.priority_weight = 0 # optional: 0~99
33 req.media_file_req.priority_level.value = TtsPriorityLevel.INTERACTION_L6
34
35 self.get_logger().info(
36 f'📨 Sending request to play media: {media_path}')
37 for i in range(8):
38 req.header.header.stamp = self.get_clock().now().to_msg()
39 future = self.client.call_async(req)
40 rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
41
42 if future.done():
43 break
44
45 # retry as remote peer is NOT handled well by ROS
46 self.get_logger().info(f'trying ... [{i}]')
47
48 resp = future.result()
49 if resp is None:
50 self.get_logger().error('❌ Service call not completed or timed out.')
51 return False
52
53 if resp.tts_resp.is_success:
54 self.get_logger().info('✅ Request to play media file recorded successfully.')
55 return True
56 else:
57 self.get_logger().error('❌ Failed to record play-media request.')
58 return False
59
60
61def main(args=None):
62 rclpy.init(args=args)
63 node = None
64
65 default_media = '/agibot/data/var/interaction/tts_cache/normal/demo.wav'
66 try:
67 if len(sys.argv) > 1:
68 media_path = sys.argv[1]
69 else:
70 media_path = input(
71 f'Enter media file path to play (default: {default_media}): ').strip()
72 if not media_path:
73 media_path = default_media
74
75 node = PlayMediaClient()
76 node.send_request(media_path)
77 except KeyboardInterrupt:
78 pass
79 except Exception as e:
80 rclpy.logging.get_logger('main').error(
81 f'Program exited with exception: {e}')
82
83 if node:
84 node.destroy_node()
85 if rclpy.ok():
86 rclpy.shutdown()
87
88
89if __name__ == '__main__':
90 main()
使用说明:
# 播放默认音频文件
ros2 run py_examples play_media
# 播放指定音频文件
# 注意替换/path/to/your/audio_file.wav为交互板上实际文件路径
ros2 run py_examples play_media /path/to/your/audio_file.wav
# 播放TTS缓存文件
ros2 run py_examples play_media /agibot/data/var/interaction/tts_cache/normal/demo.wav
输出示例:
[INFO] [play_media_file_client_min]: 等待服务: /aimdk_5Fmsgs/srv/PlayMediaFile
[INFO] [play_media_file_client_min]: ✅ 媒体文件播放请求成功
注意事项:
确保音频文件路径正确且文件存在
支持的文件格式:WAV、MP3等
优先级设置影响播放队列顺序
打断功能可中断当前播放的音频
程序具有完善的异常处理机制
6.1.16 TTS (文字转语音)
该示例中用到了play_tts,通过该节点可实现语音播放输入的文字,用户可根据不同的场景输入相应的文本。
功能特点:
支持命令行参数和交互式输入
完整的服务可用性检查和错误处理
支持优先级控制和打断机制
提供详细的播放状态反馈
核心代码
1#!/usr/bin/env python3
2
3import sys
4import rclpy
5import rclpy.logging
6from rclpy.node import Node
7
8from aimdk_msgs.srv import PlayTts
9from aimdk_msgs.msg import TtsPriorityLevel
10
11
12class PlayTTSClient(Node):
13 def __init__(self):
14 super().__init__('play_tts_client')
15
16 # fill in the actual service name
17 self.client = self.create_client(PlayTts, '/aimdk_5Fmsgs/srv/PlayTts')
18 self.get_logger().info('✅ PlayTts client node created.')
19
20 # Wait for the service to become available
21 while not self.client.wait_for_service(timeout_sec=2.0):
22 self.get_logger().info('⏳ Service unavailable, waiting...')
23
24 self.get_logger().info('🟢 Service available, ready to send request.')
25
26 def send_request(self, text):
27 req = PlayTts.Request()
28
29 req.tts_req.text = text
30 req.tts_req.domain = 'demo_client' # required: caller domain
31 req.tts_req.trace_id = 'demo' # optional: request id
32 req.tts_req.is_interrupted = True # required: interrupt same-priority
33 req.tts_req.priority_weight = 0
34 req.tts_req.priority_level.value = 6
35
36 self.get_logger().info(f'📨 Sending request to play tts: text={text}')
37 for i in range(8):
38 req.header.header.stamp = self.get_clock().now().to_msg()
39 future = self.client.call_async(req)
40 rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
41
42 if future.done():
43 break
44
45 # retry as remote peer is NOT handled well by ROS
46 self.get_logger().info(f'trying ... [{i}]')
47
48 resp = future.result()
49 if resp is None:
50 self.get_logger().error('❌ Service call not completed or timed out.')
51 return False
52
53 if resp.tts_resp.is_success:
54 self.get_logger().info('✅ TTS sent successfully.')
55 return True
56 else:
57 self.get_logger().error('❌ Failed to send TTS.')
58 return False
59
60
61def main(args=None):
62 rclpy.init(args=args)
63 node = None
64
65 try:
66 # get text to speak
67 if len(sys.argv) > 1:
68 text = sys.argv[1]
69 else:
70 text = input('Enter text to speak: ')
71 if not text:
72 text = 'Hello, I am AgiBot X2.'
73
74 node = PlayTTSClient()
75 node.send_request(text)
76 except KeyboardInterrupt:
77 pass
78 except Exception as e:
79 rclpy.logging.get_logger('main').error(
80 f'Program exited with exception: {e}')
81
82 if node:
83 node.destroy_node()
84 if rclpy.ok():
85 rclpy.shutdown()
86
87
88if __name__ == '__main__':
89 main()
使用说明
# 使用命令行参数播报文本(推荐)
ros2 run py_examples play_tts "你好,我是灵犀X2机器人"
# 或者不带参数运行,程序会提示用户输入
ros2 run py_examples play_tts
输出示例
[INFO] [play_tts_client_min]: ✅ 播报请求成功
注意事项
确保TTS服务正常运行
支持中文和英文文本播报
优先级设置影响播放队列顺序
打断功能可中断当前播放的语音
接口参考
服务:
/aimdk_5Fmsgs/srv/PlayTts消息:
aimdk_msgs/srv/PlayTts
6.1.17 麦克风数据接收
该示例中用到了mic_receiver,通过订阅/agent/process_audio_output话题来接收机器人的降噪音频数据,支持内置麦克风和外置麦克风两种音频流,并根据VAD(语音活动检测)状态自动保存完整的语音片段为PCM文件。
功能特点:
支持多音频流同时接收(内置麦克风 stream_id=1,外置麦克风 stream_id=2)
基于VAD状态自动检测语音开始、处理中、结束
自动保存完整语音片段为PCM格式文件
按时间戳和音频流分类存储
支持音频时长计算和统计信息输出
智能缓冲区管理,避免内存泄漏
完善的错误处理和异常管理
详细的调试日志输出
VAD状态说明:
0: 无语音1: 语音开始2: 语音处理中3: 语音结束
技术实现:
支持PCM格式音频文件保存(16kHz, 16位, 单声道)
提供详细的日志输出和状态监控
支持实时音频流处理和文件保存
应用场景:
语音识别和语音处理
音频数据采集和分析
实时语音监控
音频质量检测
多麦克风阵列数据处理
1#!/usr/bin/env python3
2"""
3Microphone data receiving example
4
5This example subscribes to the `/agent/process_audio_output` topic to receive the robot's
6noise-suppressed audio data. It supports both the built-in microphone and the external
7microphone audio streams, and automatically saves complete speech segments as PCM files
8based on the VAD (Voice Activity Detection) state.
9
10Features:
11- Supports receiving multiple audio streams at the same time (built-in mic stream_id=1, external mic stream_id=2)
12- Automatically detects speech start / in-progress / end based on VAD state
13- Automatically saves complete speech segments as PCM files
14- Stores files categorized by timestamp and audio stream
15- Supports audio duration calculation and logging
16
17VAD state description:
18- 0: No speech
19- 1: Speech start
20- 2: Speech in progress
21- 3: Speech end
22"""
23
24import rclpy
25from rclpy.node import Node
26from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
27from aimdk_msgs.msg import ProcessedAudioOutput, AudioVadStateType
28import os
29import time
30from datetime import datetime
31from collections import defaultdict
32from typing import Dict, List
33
34
35class AudioSubscriber(Node):
36 def __init__(self):
37 super().__init__('audio_subscriber')
38
39 # Audio buffers, stored separately by stream_id
40 # stream_id -> buffer
41 self.audio_buffers: Dict[int, List[bytes]] = defaultdict(list)
42 self.recording_state: Dict[int, bool] = defaultdict(bool)
43
44 # Create audio output directory
45 self.audio_output_dir = "audio_recordings"
46 os.makedirs(self.audio_output_dir, exist_ok=True)
47
48 # VAD state name mapping
49 self.vad_state_names = {
50 0: "No speech",
51 1: "Speech start",
52 2: "Speech in progress",
53 3: "Speech end"
54 }
55
56 # Audio stream name mapping
57 self.stream_names = {
58 1: "Built-in microphone",
59 2: "External microphone"
60 }
61
62 # QoS settings
63 qos = QoSProfile(
64 history=QoSHistoryPolicy.KEEP_LAST,
65 depth=10,
66 reliability=QoSReliabilityPolicy.BEST_EFFORT
67 )
68
69 # Create subscriber
70 self.subscription = self.create_subscription(
71 ProcessedAudioOutput,
72 '/agent/process_audio_output',
73 self.audio_callback,
74 qos
75 )
76
77 self.get_logger().info("Start subscribing to noise-suppressed audio data...")
78
79 def audio_callback(self, msg: ProcessedAudioOutput):
80 """Audio data callback"""
81 try:
82 stream_id = msg.stream_id
83 vad_state = msg.audio_vad_state.value
84 audio_data = bytes(msg.audio_data)
85
86 self.get_logger().info(
87 f"Received audio data: stream_id={stream_id}, "
88 f"vad_state={vad_state}({self.vad_state_names.get(vad_state, 'Unknown state')}), "
89 f"audio_size={len(audio_data)} bytes"
90 )
91
92 self.handle_vad_state(stream_id, vad_state, audio_data)
93
94 except Exception as e:
95 self.get_logger().error(
96 f"Error while processing audio message: {str(e)}")
97
98 def handle_vad_state(self, stream_id: int, vad_state: int, audio_data: bytes):
99 """Handle VAD state changes"""
100 stream_name = self.stream_names.get(
101 stream_id, f"Unknown stream {stream_id}")
102 vad_name = self.vad_state_names.get(
103 vad_state, f"Unknown state {vad_state}")
104
105 self.get_logger().info(
106 f"[{stream_name}] VAD state: {vad_name} audio: {len(audio_data)} bytes"
107 )
108
109 # Speech start
110 if vad_state == 1:
111 self.get_logger().info("🎤 Speech start detected")
112 self.audio_buffers[stream_id].clear()
113 self.recording_state[stream_id] = True
114 if audio_data:
115 self.audio_buffers[stream_id].append(audio_data)
116
117 # Speech in progress
118 elif vad_state == 2:
119 self.get_logger().info("🔄 Speech in progress...")
120 if self.recording_state[stream_id] and audio_data:
121 self.audio_buffers[stream_id].append(audio_data)
122
123 # Speech end
124 elif vad_state == 3:
125 self.get_logger().info("✅ Speech end")
126 if self.recording_state[stream_id] and audio_data:
127 self.audio_buffers[stream_id].append(audio_data)
128
129 if self.recording_state[stream_id] and self.audio_buffers[stream_id]:
130 self.save_audio_segment(stream_id)
131 self.recording_state[stream_id] = False
132
133 # No speech
134 elif vad_state == 0:
135 if self.recording_state[stream_id]:
136 self.get_logger().info("⏹️ Reset recording state")
137 self.recording_state[stream_id] = False
138
139 # Print current buffer status
140 buffer_size = sum(len(chunk)
141 for chunk in self.audio_buffers[stream_id])
142 recording = self.recording_state[stream_id]
143 self.get_logger().debug(
144 f"[Stream {stream_id}] Buffer size: {buffer_size} bytes, recording: {recording}"
145 )
146
147 def save_audio_segment(self, stream_id: int):
148 """Save audio segment"""
149 if not self.audio_buffers[stream_id]:
150 return
151
152 # Merge all audio data
153 audio_data = b''.join(self.audio_buffers[stream_id])
154
155 # Get current timestamp
156 now = datetime.now()
157 timestamp = now.strftime("%Y%m%d_%H%M%S_%f")[:-3] # to milliseconds
158
159 # Create subdirectory by stream_id
160 stream_dir = os.path.join(self.audio_output_dir, f"stream_{stream_id}")
161 os.makedirs(stream_dir, exist_ok=True)
162
163 # Generate filename
164 stream_name = "internal_mic" if stream_id == 1 else "external_mic" if stream_id == 2 else f"stream_{stream_id}"
165 filename = f"{stream_name}_{timestamp}.pcm"
166 filepath = os.path.join(stream_dir, filename)
167
168 try:
169 # Save PCM file
170 with open(filepath, 'wb') as f:
171 f.write(audio_data)
172
173 self.get_logger().info(
174 f"Audio segment saved: {filepath} (size: {len(audio_data)} bytes)")
175
176 # Calculate audio duration (assuming 16 kHz, 16-bit, mono)
177 sample_rate = 16000
178 bits_per_sample = 16
179 channels = 1
180 bytes_per_sample = bits_per_sample // 8
181 total_samples = len(audio_data) // (bytes_per_sample * channels)
182 duration_seconds = total_samples / sample_rate
183
184 self.get_logger().info(
185 f"Audio duration: {duration_seconds:.2f} s ({total_samples} samples)")
186
187 except Exception as e:
188 self.get_logger().error(f"Failed to save audio file: {str(e)}")
189
190
191def main(args=None):
192 rclpy.init(args=args)
193 node = AudioSubscriber()
194
195 try:
196 node.get_logger().info("Listening to noise-suppressed audio data, press Ctrl+C to exit...")
197 rclpy.spin(node)
198 except KeyboardInterrupt:
199 node.get_logger().info("Interrupt signal received, exiting...")
200 finally:
201 node.destroy_node()
202 rclpy.shutdown()
203
204
205if __name__ == '__main__':
206 main()
使用说明:
运行程序:
# 构建Python包 colcon build --packages-select py_examples # 运行麦克风接收程序 ros2 run py_examples mic_receiver
目录结构:
运行节点后会自动创建
audio_recordings目录音频文件按stream_id分类存储:
stream_1/: 内置麦克风音频stream_2/: 外置麦克风音频
文件命名格式:
{stream_name}_{timestamp}.pcminternal_mic_20250909_133649_738.pcm(内置麦克风)external_mic_20250909_133650_123.pcm(外置麦克风)
音频格式:16kHz, 16位, 单声道PCM
播放保存的PCM文件:
# 播放内置麦克风录音 aplay -r 16000 -f S16_LE -c 1 audio_recordings/stream_1/internal_mic_20250909_133649_738.pcm # 播放外置麦克风录音 aplay -r 16000 -f S16_LE -c 1 audio_recordings/stream_2/external_mic_20250909_133650_123.pcm
转换为WAV格式(可选):
# 使用ffmpeg转换为WAV格式 ffmpeg -f s16le -ar 16000 -ac 1 -i external_mic_20250909_133649_738.pcm output.wav
程序控制:
按 Ctrl+C 安全退出程序
程序会自动处理音频流的开始和结束
支持同时处理多个音频流
输出示例:
正常启动和运行:
[INFO] 开始订阅降噪音频数据...
[INFO] 收到音频数据: stream_id=1, vad_state=0(无语音), audio_size=0 bytes
[INFO] [内置麦克风] VAD状态: 无语音 音频数据: 0 bytes
[INFO] 收到音频数据: stream_id=2, vad_state=0(无语音), audio_size=0 bytes
[INFO] [外置麦克风] VAD状态: 无语音 音频数据: 0 bytes
检测到语音开始:
[INFO] 收到音频数据: stream_id=2, vad_state=1(语音开始), audio_size=320 bytes
[INFO] [外置麦克风] VAD状态: 语音开始 音频数据: 320 bytes
[INFO] 🎤 检测到语音开始
语音处理过程:
[INFO] 收到音频数据: stream_id=2, vad_state=2(语音处理中), audio_size=320 bytes
[INFO] [外置麦克风] VAD状态: 语音处理中 音频数据: 320 bytes
[INFO] 🔄 语音处理中...
[INFO] 收到音频数据: stream_id=2, vad_state=2(语音处理中), audio_size=320 bytes
[INFO] [外置麦克风] VAD状态: 语音处理中 音频数据: 320 bytes
[INFO] 🔄 语音处理中...
语音结束和保存:
[INFO] 收到音频数据: stream_id=2, vad_state=3(语音结束), audio_size=320 bytes
[INFO] [外置麦克风] VAD状态: 语音结束 音频数据: 320 bytes
[INFO] ✅ 语音结束
[INFO] 音频段已保存: audio_recordings/stream_2/external_mic_20250909_133649_738.pcm (大小: 960 bytes)
[INFO] 音频时长: 0.06 秒 (480 样本)
多音频流同时处理:
[INFO] 收到音频数据: stream_id=1, vad_state=1(语音开始), audio_size=320 bytes
[INFO] [内置麦克风] VAD状态: 语音开始 音频数据: 320 bytes
[INFO] 🎤 检测到语音开始
[INFO] 收到音频数据: stream_id=2, vad_state=1(语音开始), audio_size=320 bytes
[INFO] [外置麦克风] VAD状态: 语音开始 音频数据: 320 bytes
[INFO] 🎤 检测到语音开始
程序退出:
^C[INFO] 接收到中断信号,正在退出...
[INFO] 程序已安全退出
注意事项:
程序支持同时处理多个音频流(内置和外置麦克风)
每个音频流都有独立的缓冲区和录音状态
音频文件会自动按时间戳和音频流分类保存
程序具有完善的错误处理机制,确保稳定运行
6.1.18 表情控制
该示例中用到了play_emoji,通过该节点可实现表达指定的表情,用户可根据已有的表情列表来选择要变化的表情,具体表情列表可参考表情列表
1#!/usr/bin/env python3
2
3import rclpy
4import rclpy.logging
5from rclpy.node import Node
6
7from aimdk_msgs.srv import PlayEmoji
8
9
10class PlayEmojiClient(Node):
11 def __init__(self):
12 super().__init__('play_emoji_client')
13 self.client = self.create_client(
14 PlayEmoji, '/face_ui_proxy/play_emoji')
15 self.get_logger().info('✅ PlayEmoji client node created.')
16
17 # Wait for the service to become available
18 while not self.client.wait_for_service(timeout_sec=2.0):
19 self.get_logger().info('⏳ Service unavailable, waiting...')
20
21 self.get_logger().info('🟢 Service available, ready to send request.')
22
23 def send_request(self, emoji: int, mode: int, priority: int):
24 req = PlayEmoji.Request()
25
26 req.emotion_id = int(emoji)
27 req.mode = int(mode)
28 req.priority = int(priority)
29
30 self.get_logger().info(
31 f'📨 Sending request to play emoji: id={emotion_id}, mode={mode}, priority={priority}')
32 for i in range(8):
33 req.header.header.stamp = self.get_clock().now().to_msg()
34 future = self.client.call_async(req)
35 rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
36
37 if future.done():
38 break
39
40 # retry as remote peer is NOT handled well by ROS
41 self.get_logger().info(f'trying ... [{i}]')
42
43 resp = future.result()
44 if resp is None:
45 self.get_logger().error('❌ Service call not completed or timed out.')
46 return False
47
48 if resp.success:
49 self.get_logger().info(
50 f'✅ Emoji played successfully: {resp.message}')
51 return True
52 else:
53 self.get_logger().error(f'❌ Failed to play emoji: {resp.message}')
54 return False
55
56
57def main(args=None):
58 rclpy.init(args=args)
59 node = None
60
61 # Interactive input, same as the original C++ version
62 try:
63 emotion = int(
64 input("Enter emoji ID: 1-blink, 60-bored, 70-abnormal, 80-sleeping, 90-happy ... 190-double angry, 200-adore: "))
65 mode = int(input("Enter play mode (1: play once, 2: loop): "))
66 if mode not in (1, 2):
67 raise ValueError("invalid mode")
68 priority = 10 # default priority
69
70 node = PlayEmojiClient()
71 node.send_request(emotion, mode, priority)
72 except KeyboardInterrupt:
73 pass
74 except Exception as e:
75 rclpy.logging.get_logger('main').error(
76 f'Program exited with exception: {e}')
77
78 if node:
79 node.destroy_node()
80 if rclpy.ok():
81 rclpy.shutdown()
82
83
84if __name__ == '__main__':
85 main()
6.1.19 LED灯带控制
功能说明:演示如何控制机器人的LED灯带,支持多种显示模式和自定义颜色。
核心代码:
1#!/usr/bin/env python3
2
3import sys
4import rclpy
5import rclpy.logging
6from rclpy.node import Node
7
8from aimdk_msgs.msg import CommonRequest
9from aimdk_msgs.srv import LedStripCommand
10
11
12class PlayLightsClient(Node):
13 def __init__(self):
14 super().__init__('play_lights_client')
15
16 # create service client
17 self.client = self.create_client(
18 LedStripCommand, '/aimdk_5Fmsgs/srv/LedStripCommand')
19
20 self.get_logger().info('✅ PlayLights client node created.')
21
22 # Wait for the service to become available
23 while not self.client.wait_for_service(timeout_sec=2.0):
24 self.get_logger().info('⏳ Service unavailable, waiting...')
25
26 self.get_logger().info('🟢 Service available, ready to send request.')
27
28 def send_request(self, led_mode, r, g, b):
29 """Send LED control request"""
30 # create request
31 request = LedStripCommand.Request()
32 request.led_strip_mode = led_mode
33 request.r = r
34 request.g = g
35 request.b = b
36
37 # send request
38 # Note: LED strip is slow to response (up to ~5s)
39 self.get_logger().info(
40 f'📨 Sending request to control led strip: mode={led_mode}, RGB=({r}, {g}, {b})')
41 for i in range(4):
42 request.request.header.stamp = self.get_clock().now().to_msg()
43 future = self.client.call_async(request)
44 rclpy.spin_until_future_complete(self, future, timeout_sec=5)
45
46 if future.done():
47 break
48
49 # retry as remote peer is NOT handled well by ROS
50 self.get_logger().info(f'trying ... [{i}]')
51
52 response = future.result()
53 if response is None:
54 self.get_logger().error('❌ Service call not completed or timed out.')
55 return False
56
57 if response.status_code == 0:
58 self.get_logger().info('✅ LED strip command sent successfully.')
59 return True
60 else:
61 self.get_logger().error(
62 f'❌ LED strip command failed with status: {response.status_code}')
63 return False
64
65
66def main(args=None):
67 rclpy.init(args=args)
68 node = None
69
70 try:
71 # get command line args
72 if len(sys.argv) > 4:
73 # use CLI args
74 led_mode = int(sys.argv[1])
75 if led_mode not in (0, 1, 2, 3):
76 raise ValueError("invalid mode")
77 r = int(sys.argv[2])
78 if r < 0 or r > 255:
79 raise ValueError("invalid R value")
80 g = int(sys.argv[3])
81 if g < 0 or g > 255:
82 raise ValueError("invalid G value")
83 b = int(sys.argv[4])
84 if b < 0 or b > 255:
85 raise ValueError("invalid B value")
86 else:
87 # interactive input
88 print("=== LED strip control example ===")
89 print("Select LED strip mode:")
90 print("0 - Steady on")
91 print("1 - Breathing (4s cycle, sine brightness)")
92 print("2 - Blinking (1s cycle, 0.5s on, 0.5s off)")
93 print("3 - Flowing (2s cycle, light up from left to right)")
94
95 led_mode = int(input("Enter mode (0-3): "))
96 if led_mode not in (0, 1, 2, 3):
97 raise ValueError("invalid mode")
98
99 print("\nSet RGB color values (0-255):")
100 r = int(input("Red (R): "))
101 if r < 0 or r > 255:
102 raise ValueError("invalid R value")
103 g = int(input("Green (G): "))
104 if g < 0 or g > 255:
105 raise ValueError("invalid G value")
106 b = int(input("Blue (B): "))
107 if b < 0 or b > 255:
108 raise ValueError("invalid B value")
109
110 node = PlayLightsClient()
111 node.send_request(led_mode, r, g, b)
112 except KeyboardInterrupt:
113 pass
114 except Exception as e:
115 rclpy.logging.get_logger('main').error(
116 f'Program exited with exception: {e}')
117
118 if node:
119 node.destroy_node()
120 if rclpy.ok():
121 rclpy.shutdown()
122
123
124if __name__ == '__main__':
125 main()
使用说明:
# 构建
colcon build --packages-select py_examples
# 交互式运行
ros2 run py_examples play_lights
# 命令行参数运行
ros2 run py_examples play_lights 1 255 0 0 # 模式1,红色
输出示例:
=== LED灯带控制示例 ===
请选择灯带模式:
0 - 常亮模式
1 - 呼吸模式 (4s周期,亮度正弦变化)
2 - 闪烁模式 (1s周期,0.5s亮,0.5s灭)
3 - 流水模式 (2s周期,从左到右依次点亮)
请输入模式 (0-3): 1
请设置RGB颜色值 (0-255):
红色分量 (R): 255
绿色分量 (G): 0
蓝色分量 (B): 0
发送LED控制命令...
模式: 1, 颜色: RGB(255, 0, 0)
✅ LED strip command sent successfully
技术特点:
支持4种LED显示模式
RGB颜色自定义
同步服务调用
命令行参数支持
输入参数验证
友好的用户交互界面