6.1 Python接口使用示例
本章节将带您实现索引所示若干功能
构建 & 运行说明
进入到
aimdk 目录,依次执行以下命令:source /opt/ros/humble/setup.bash,colcon build,source install/setup.bash,最后运行 ros2 run py_examples '对应功能名称,如: get_mc_action'
📝 代码说明
完整的代码实现包含完整的错误处理、信号处理、超时处理等机制,确保程序的健壮性。请您在 py_examples 目录下查看/修改
6.1.1 获取机器人模式
通过调用GetMcAction服务获取机器人当前的运行模式,包括模式ID、描述和状态信息。
1#!/usr/bin/env python3
2import rclpy
3from rclpy.node import Node
4from aimdk_msgs.srv import GetMcAction
5from aimdk_msgs.msg import CommonRequest
6
7
class GetMcActionClient(Node):
    """Service client that queries the robot's current MC action (mode)."""

    def __init__(self):
        super().__init__('get_mc_action_client')
        self.client = self.create_client(
            GetMcAction, '/aimdk_5Fmsgs/srv/GetMcAction')
        self.get_logger().info('Get MC Action Client created')
        # Block until the service is up; log once per second while waiting.
        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Service not available, waiting...')

    def send_request(self):
        """Call GetMcAction (with retries) and log the returned mode info."""
        request = GetMcAction.Request()
        request.request = CommonRequest()

        log = self.get_logger()
        log.info('Sending request to get robot mode')

        future = None
        # Retry a few times: the MC module may be busy and drop a request.
        for attempt in range(8):
            future = self.client.call_async(request)
            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
            if future.done():
                break
            # retry as remote peer: mc module can be under heavy load
            log.info(f"trying to register input source... [{attempt}]")

        response = future.result()
        if response is not None:
            log.info(
                'Robot modes (1: default mode, 100: JOINT_DEFAULT, 200: STAND_DEFAULT, 300: LOCOMOTION_DEFAULT)')
            log.info(
                f'Current robot mode: {response.info.current_action.value}')
            log.info(
                f'Mode description: {response.info.action_desc}')
            log.info(
                f'Mode status: {response.info.status.value}')
        else:
            log.error('Service call failed')
44
45
def main(args=None):
    """Entry point: query the current robot mode once, then exit.

    try/finally guarantees the node is destroyed and rclpy shut down
    even when the service call raises.
    """
    rclpy.init(args=args)
    client = GetMcActionClient()
    try:
        client.send_request()
    finally:
        client.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
6.1.2 设置机器人模式
该示例中用到了SetMcAction服务,运行节点程序后终端输入模式对应的字段值,机器人会立即进入相应的运动模式。
切入稳定站立(STAND_DEFAULT)模式前,确保机器人脚已经着地。
1#!/usr/bin/env python3
2import sys
3import rclpy
4from rclpy.node import Node
5
6from aimdk_msgs.srv import SetMcAction
7from aimdk_msgs.msg import RequestHeader, CommonState, McAction, McActionCommand
8
9
class SetMcActionClient(Node):
    """Client for the SetMcAction service: switches the robot's MC mode."""

    def __init__(self):
        super().__init__('set_mc_action_client')
        self.client = self.create_client(
            SetMcAction, '/aimdk_5Fmsgs/srv/SetMcAction'
        )
        self.get_logger().info('✅ SetMcAction client node created.')

        # Wait for the service to become available.
        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('⏳ Service unavailable, waiting...')

    def send_request(self, action_value: int):
        """Ask the MC to enter the mode identified by action_value."""
        req = SetMcAction.Request()
        req.header = RequestHeader()

        command = McActionCommand()
        action = McAction()
        action.value = action_value
        command.action = action
        command.action_desc = f"Set robot mode to: {action_value}"
        req.command = command

        log = self.get_logger()
        log.info(f'Sending request to set robot mode: {action_value}')

        future = None
        for attempt in range(8):
            # Refresh the timestamp on every retry.
            req.header.stamp = self.get_clock().now().to_msg()
            future = self.client.call_async(req)
            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
            if future.done():
                break
            # retry as remote peer: mc module can be under heavy load
            log.info(f"trying to register input source... [{attempt}]")

        if not future.done():
            log.error('❌ Service call not completed or timed out.')
            return

        resp = future.result()
        if resp is None:
            log.error('❌ Service call failed — no response received.')
            return

        if resp.response.status.value == CommonState.SUCCESS:
            log.info('✅ Robot mode set successfully.')
        else:
            log.error(
                f'❌ Failed to set robot mode: {resp.response.message}'
            )
61
62
def main(args=None):
    """Entry point: read the target mode (argv or prompt) and send it.

    Fix: the interactive input() path previously crashed with an
    unhandled ValueError on non-integer input, while the argv path was
    validated — both paths now handle bad input the same way.
    """
    rclpy.init(args=args)
    node = SetMcActionClient()

    # Prefer command-line argument, otherwise prompt for input.
    if len(sys.argv) > 1:
        try:
            motion = int(sys.argv[1])
            node.get_logger().info(f'Using command-line argument: {motion}')
        except ValueError:
            node.get_logger().error('Invalid command-line argument — must be an integer.')
            node.destroy_node()
            rclpy.shutdown()
            return
    else:
        try:
            motion = int(
                input(
                    "Enter robot mode (1: Default, 100: Position Control Stand, "
                    "200: Stable Stand, 300: Walking): "
                )
            )
        except ValueError:
            node.get_logger().error('Invalid input — must be an integer.')
            node.destroy_node()
            rclpy.shutdown()
            return

    try:
        node.send_request(motion)
    except KeyboardInterrupt:
        node.get_logger().info('🟡 Interrupted by user.')

    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
使用说明
# 使用命令行参数设置模式(推荐)
ros2 run py_examples set_mc_action 100 # 站姿预备(位控站立)模式
ros2 run py_examples set_mc_action 300 # 走跑模式
# 或者不带参数运行,程序会提示用户输入
ros2 run py_examples set_mc_action
输出示例
[INFO] [set_mc_action_client]: 设置机器人模式成功
注意事项
切入
stand_default 模式前,确保机器人脚已经着地。模式切换可能需要几秒钟时间完成。
接口参考
服务:
/aimdk_5Fmsgs/srv/SetMcAction
消息:
aimdk_msgs/srv/SetMcAction
6.1.3 设置机器人动作
该示例中用到了preset_motion_client,启动节点程序后,输入相应的字段值可实现左手(右手)的握手(举手、挥手、飞吻)动作。
可供调用的参数见预设动作表
1#!/usr/bin/env python3
2
3import rclpy
4from rclpy.node import Node
5from rclpy.task import Future
6import signal
7import sys
8
9from aimdk_msgs.srv import SetMcPresetMotion
10from aimdk_msgs.msg import McPresetMotion, McControlArea, RequestHeader, CommonState
11
# Module-level handle so the signal handler can reach the running node.
g_node = None


def signal_handler(sig, frame):
    """Destroy the node (if created) and exit on SIGINT/SIGTERM."""
    global g_node
    if g_node is not None:
        g_node.get_logger().info(f'接收到信号 {sig},正在关闭...')
        g_node.destroy_node()
    rclpy.shutdown()
    sys.exit(0)
22
23
class PresetMotionClient(Node):
    """Client for SetMcPresetMotion: triggers built-in arm motions."""

    def __init__(self):
        super().__init__('preset_motion_client')

        self.client = self.create_client(
            SetMcPresetMotion, '/aimdk_5Fmsgs/srv/SetMcPresetMotion'
        )
        self.get_logger().info("预设动作服务客户端已创建")

        # Poll for the service, giving up after roughly 8 seconds.
        timeout_sec = 8.0
        start = self.get_clock().now()
        while not self.client.wait_for_service(timeout_sec=2.0):
            now = self.get_clock().now()
            if (now - start).nanoseconds / 1e9 > timeout_sec:
                self.get_logger().error("等待服务超时")
                rclpy.shutdown()
                return
            self.get_logger().info("等待服务启动中...")

    def send_motion_request(self, area_id: int, motion_id: int) -> bool:
        """Send one preset-motion request; True when accepted or running."""
        request = SetMcPresetMotion.Request()
        request.header = RequestHeader()

        motion = McPresetMotion()
        motion.value = motion_id
        area = McControlArea()
        area.value = area_id

        request.motion = motion
        request.area = area
        request.interrupt = False

        log = self.get_logger()
        log.info(
            f"手臂:(ID={area_id});发送预设动作请求: (ID={motion_id})"
        )

        future = None
        for attempt in range(8):
            request.header.stamp = self.get_clock().now().to_msg()
            future = self.client.call_async(request)
            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
            if future.done():
                break
            # retry as remote peer: mc module can be under heavy load
            log.info(f"trying to register input source... [{attempt}]")

        if not future.done():
            log.error("服务调用失败或超时")
            return False

        try:
            response = future.result()
            state = response.response.state.value
            if state == CommonState.SUCCESS:
                log.info(f"动作执行成功: {response.response.task_id}")
                return True
            if state == CommonState.RUNNING:
                log.info(f"动作正在执行中: {response.response.task_id}")
                return True
            log.warn(f"动作执行失败: {response.response.task_id}")
            return False
        except Exception as e:
            log.error(f"调用服务异常: {str(e)}")
            return False
96
97
def main(args=None):
    """Entry point: prompt for an arm area and preset-motion id, send once.

    Fixes: validate the interactive integer input (previously an
    unhandled ValueError), destroy the node on the failure path, correct
    the failure message (it claimed "挥手"/wave regardless of the motion
    actually requested), and propagate the exit code via sys.exit.
    """
    global g_node
    rclpy.init(args=args)

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    g_node = PresetMotionClient()

    try:
        hand_area = int(input("请输入手臂区域(左手=1,右手=2):"))
        hand_motion = int(input("请输入预设动作ID(举手=1001,挥手=1002,握手=1003,飞吻=1004):"))
    except ValueError:
        g_node.get_logger().error("输入无效,必须为整数")
        g_node.destroy_node()
        rclpy.shutdown()
        return 1

    success = g_node.send_motion_request(hand_area, hand_motion)
    if not success:
        g_node.get_logger().error("发送预设动作请求失败")
        g_node.destroy_node()
        rclpy.shutdown()
        return 1

    g_node.destroy_node()
    rclpy.shutdown()
    return 0


if __name__ == '__main__':
    sys.exit(main())
6.1.4 夹爪控制
该示例中用到了hand_control,通过往话题/aima/hal/joint/hand/command发布消息来控制夹爪的运动
注意
注意⚠️ :运行本示例前需要在机器人运控计算单元(PC1)使用aima em stop-app mc关闭原生运控模块,获取机器人控制权。注意确保机器人安全
1import rclpy
2from rclpy.node import Node
3from aimdk_msgs.msg import HandCommandArray, HandCommand, HandType, MessageHeader
4import time
5
6
class HandControl(Node):
    """Drives both grippers through a cycle of preset positions.

    Publishes HandCommandArray on /aima/hal/joint/hand/command at 50 Hz
    and advances to the next preset pair every 2 seconds.
    """

    def __init__(self):
        super().__init__('hand_control')

        # Preset parameter list: [(left hand, right hand), ...]
        self.position_pairs = [
            (1.0, 1.0),  # fully open
            (0.0, 0.0),  # fully closed
            (0.5, 0.5),  # half open
            (0.2, 0.8),  # left slightly closed, right more open
            (0.7, 0.3),  # left more open, right slightly closed
        ]
        self.current_index = 0
        self.last_switch_time = self.get_clock().now().nanoseconds / 1e9  # seconds

        # Publisher for gripper commands.
        self.publisher_ = self.create_publisher(
            HandCommandArray,
            '/aima/hal/joint/hand/command',
            10
        )

        # 20 ms period -> 50 Hz publish rate.
        self.timer_ = self.create_timer(0.02, self.publish_hand_commands)

        self.get_logger().info("Hand control node started!")

    @staticmethod
    def _build_command(name, position):
        """Create one HandCommand with unit velocity/accel/decel/effort."""
        cmd = HandCommand()
        cmd.name = name
        cmd.position = float(position)
        cmd.velocity = 1.0
        cmd.acceleration = 1.0
        cmd.deceleration = 1.0
        cmd.effort = 1.0
        return cmd

    def publish_hand_commands(self):
        """Timer callback: advance the preset when due, then publish."""
        # Switch to the next preset pair every 2 seconds.
        now_sec = self.get_clock().now().nanoseconds / 1e9
        if now_sec - self.last_switch_time >= 2.0:
            self.current_index = (self.current_index + 1) % len(self.position_pairs)
            self.last_switch_time = now_sec
            self.get_logger().info(
                f"Switched to preset: {self.current_index}, left={self.position_pairs[self.current_index][0]:.2f}, right={self.position_pairs[self.current_index][1]:.2f}"
            )

        # Use current preset.
        left_position, right_position = self.position_pairs[self.current_index]

        msg = HandCommandArray()
        msg.header = MessageHeader()
        msg.left_hand_type = HandType(value=2)  # gripper mode
        msg.right_hand_type = HandType(value=2)
        msg.left_hands = [self._build_command("left_hand", left_position)]
        msg.right_hands = [self._build_command("right_hand", right_position)]

        # Publish; logging happens only on preset switches to limit output.
        self.publisher_.publish(msg)
80
81
def main(args=None):
    """Spin the hand-control node until interrupted, then clean up."""
    rclpy.init(args=args)
    node = HandControl()

    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    finally:
        node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
6.1.5 灵巧手控制 (升级中暂不开放)
该示例中用到了omnihand_control,通过往话题/aima/hal/joint/hand/command发布消息来控制灵巧手的运动
注意
注意⚠️ :运行本示例前需要在机器人运控计算单元(PC1)使用aima em stop-app mc关闭原生运控模块,获取机器人控制权。注意确保机器人安全
1#!/usr/bin/env python3
2import rclpy
3from rclpy.node import Node
4from aimdk_msgs.msg import HandCommand, HandCommandArray, HandType, MessageHeader
5
6
class OmnihandControl(Node):
    """Cycles both dexterous hands through 10 finger gestures, 1 s apart."""

    def __init__(self):
        super().__init__('omnihand_control')

        self.num_fingers = 5
        self.joints_per_finger = 2
        self.total_joints = self.num_fingers * self.joints_per_finger  # 10

        # ===== Gesture sequence =====
        # Each gesture is 20 joint targets (left 10 + right 10); both hands
        # mirror the same pattern.  1.0 = bent, 0.0 = straight.
        # Stages: progressively bend fingers, then release them, then all
        # straight; the counts below are the number of bent joints per hand
        # at each stage (identical values to the original explicit tables).
        bent_joint_counts = [3, 5, 6, 8, 10, 8, 6, 5, 3, 0]
        self.gesture_sequence = [
            ([1.0] * c + [0.0] * (self.total_joints - c)) * 2
            for c in bent_joint_counts
        ]

        self.current_index = 0
        self.last_switch_time = self.get_clock().now()

        self.publisher = self.create_publisher(
            HandCommandArray,
            '/aima/hal/joint/hand/command',
            10
        )

        # Switch gesture every second.
        self.timer = self.create_timer(1.0, self.publish_hand_commands)

        self.get_logger().info("Omnihand demo control node started!")

    def publish_hand_commands(self):
        """Timer callback: maybe advance the gesture, then publish it."""
        now_time = self.get_clock().now()
        if (now_time - self.last_switch_time).nanoseconds / 1e9 >= 1.0:
            self.current_index = (self.current_index + 1) % len(self.gesture_sequence)
            self.last_switch_time = now_time
            self.get_logger().info(
                f'Switched to gesture {self.current_index + 1}/10')

        targets = self.gesture_sequence[self.current_index]

        msg = HandCommandArray()
        msg.header = MessageHeader()

        # First 10 targets drive the left hand, last 10 the right hand.
        for i in range(self.total_joints):
            cmd = HandCommand()
            cmd.name = f"left_hand_joint{i}"
            cmd.position = targets[i]
            cmd.velocity = 1.0
            cmd.acceleration = 1.0
            cmd.deceleration = 1.0
            cmd.effort = 1.0
            msg.left_hands.append(cmd)

        for i in range(self.total_joints):
            cmd = HandCommand()
            cmd.name = f"right_hand_joint{i}"
            cmd.position = targets[self.total_joints + i]
            cmd.velocity = 1.0
            cmd.acceleration = 1.0
            cmd.deceleration = 1.0
            cmd.effort = 1.0
            msg.right_hands.append(cmd)

        msg.left_hand_type = HandType()
        msg.left_hand_type.value = 1  # dexterous hand mode
        msg.right_hand_type = HandType()
        msg.right_hand_type.value = 1

        self.publisher.publish(msg)
110
111
def main(args=None):
    """Spin the omnihand demo node until interrupted."""
    rclpy.init(args=args)
    controller = OmnihandControl()
    try:
        rclpy.spin(controller)
    except KeyboardInterrupt:
        pass
    controller.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
6.1.6 注册二开输入源
对于v0.7之后的版本,在实现对MC的控制前,需要先注册输入源。该示例中通过/aimdk_5Fmsgs/srv/SetMcInputSource服务来注册二开的输入源,让MC感知到,只有注册过输入源后才能实现机器人的速度控制
1#!/usr/bin/env python3
2
3import rclpy
4import rclpy.logging
5from rclpy.node import Node
6import signal
7import sys
8
9from aimdk_msgs.srv import SetMcInputSource
10from aimdk_msgs.msg import RequestHeader, McInputAction
11
# Module-level handle so the signal handler can log through the node
# and shut down cleanly.
client_node = None


def signal_handler(signum, frame):
    """Log the incoming signal and shut rclpy down if still running."""
    if client_node is not None:
        client_node.get_logger().info(
            f"Received signal {signum}, shutting down...")
    if rclpy.ok():
        rclpy.shutdown()
21
22
class McInputClient(Node):
    """Registers this process as an MC input source via SetMcInputSource."""

    def __init__(self):
        super().__init__('set_mc_input_source_client')
        self.client = self.create_client(
            SetMcInputSource, '/aimdk_5Fmsgs/srv/SetMcInputSource'
        )
        self.get_logger().info('✅ SetMcInputSource client created.')

        # Poll for the service, giving up after roughly 8 seconds.
        deadline = 8.0
        started = self.get_clock().now().nanoseconds / 1e9
        while not self.client.wait_for_service(timeout_sec=2.0):
            elapsed = self.get_clock().now().nanoseconds / 1e9 - started
            if elapsed > deadline:
                self.get_logger().error('❌ Timed out waiting for service.')
                raise RuntimeError('Service unavailable')
            self.get_logger().info('Waiting for input source service...')

        self.get_logger().info('🟢 Service available, ready to send request.')

    def send_input_request(self) -> bool:
        """Send an input-source registration (action 1001); True on success."""
        try:
            req = SetMcInputSource.Request()

            # header
            req.request.header = RequestHeader()

            # action (e.g. 1001 = register)
            req.action = McInputAction()
            req.action.value = 1001

            # Identity, priority and watchdog timeout of this input source.
            req.input_source.name = 'node'
            req.input_source.priority = 40
            req.input_source.timeout = 1000  # ms

            log = self.get_logger()
            log.info(
                f"📨 Sending input source request: action_id={req.action.value}, "
                f"name={req.input_source.name}, priority={req.input_source.priority}"
            )

            future = None
            for attempt in range(8):
                req.request.header.stamp = self.get_clock().now().to_msg()
                future = self.client.call_async(req)
                rclpy.spin_until_future_complete(
                    self, future, timeout_sec=0.25)
                if future.done():
                    break
                # retry as remote peer: mc module can be under heavy load
                log.info(f"trying to register input source... [{attempt}]")

            if not future.done():
                log.error('❌ Service call failed or timed out.')
                return False

            resp = future.result()
            ret_code = resp.response.header.code
            task_id = resp.response.task_id

            if ret_code != 0:
                log.error(
                    f"❌ Input source set failed. ret_code={ret_code}, task_id={task_id} (duplicated ADD? or MODIFY/ENABLE/DISABLE for unknown source?)"
                )
                return False

            log.info(
                f"✅ Input source set successfully. task_id={task_id}"
            )
            return True

        except Exception as e:
            self.get_logger().error(f"❌ Exception while calling service: {e}")
            return False
100
101
def main(args=None):
    """Entry point: register the input source once, then shut down."""
    global client_node
    rclpy.init(args=args)

    # Register signal handlers early so Ctrl+C during setup is handled.
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        client_node = McInputClient()
        if not client_node.send_input_request():
            client_node.get_logger().error('Input source request failed.')
    except Exception as e:
        rclpy.logging.get_logger('main').error(
            f'Program exited with exception: {e}')
    finally:
        if rclpy.ok():
            rclpy.shutdown()


if __name__ == '__main__':
    main()
6.1.7 获取当前输入源
该示例中用到了GetCurrentInputSource服务,用于获取当前已注册的输入源信息,包括输入源名称、优先级和超时时间等信息。
1#!/usr/bin/env python3
2
3import rclpy
4from rclpy.node import Node
5from aimdk_msgs.srv import GetCurrentInputSource
6from aimdk_msgs.msg import CommonRequest
7
8
class GetCurrentInputSourceClient(Node):
    """Queries the MC for the currently registered input source."""

    def __init__(self):
        super().__init__('get_current_input_source_client')
        self.cli = self.create_client(
            GetCurrentInputSource,
            '/aimdk_5Fmsgs/srv/GetCurrentInputSource'
        )

    def send_request(self):
        """Call the service and log the input source; True on success."""
        log = self.get_logger()

        # Poll for service availability, up to roughly 8 seconds.
        deadline = 8.0
        started = self.get_clock().now().nanoseconds / 1e9
        while not self.cli.wait_for_service(timeout_sec=2.0):
            if self.get_clock().now().nanoseconds / 1e9 - started > deadline:
                log.error("Waiting for service timed out")
                return False
            log.info("Waiting for input source service...")

        # Build the (empty) query request.
        req = GetCurrentInputSource.Request()
        req.request = CommonRequest()

        log.info("Sending request to get current input source")
        future = None
        # Retry a few times in case the MC service is busy.
        for _ in range(8):
            req.request.header.stamp = self.get_clock().now().to_msg()
            future = self.cli.call_async(req)
            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
            if future.done():
                break

        if not future.done():
            log.error("Service call failed or timed out")
            return False

        try:
            response = future.result()
            ret_code = response.response.header.code
            if ret_code != 0:
                log.error(f"Service returned failure, return code: {ret_code}")
                return False
            log.info(f"Current input source: {response.input_source.name}")
            log.info(f"Priority: {response.input_source.priority}")
            log.info(f"Timeout: {response.input_source.timeout}")
            return True
        except Exception as e:
            log.error(f"Exception occurred: {str(e)}")
            return False
65
66
def main(args=None):
    """Run the query once; exit status 0 on success, 1 on failure."""
    rclpy.init(args=args)
    node = GetCurrentInputSourceClient()

    ok = node.send_request()

    node.destroy_node()
    rclpy.shutdown()
    return 0 if ok else 1


if __name__ == '__main__':
    exit(main())
使用说明
# 获取当前输入源信息
ros2 run py_examples get_current_input_source
输出示例
[INFO] [get_current_input_source_client]: 当前输入源: node
[INFO] [get_current_input_source_client]: 优先级: 40
[INFO] [get_current_input_source_client]: 超时时间: 1000
注意事项
确保GetCurrentInputSource服务正常运行
需要在注册输入源之后才能获取到有效信息
状态码为0表示查询成功
6.1.8 控制机器人走跑
该示例中用到了mc_locomotion_velocity,以下示例通过发布/aima/mc/locomotion/velocity话题控制机器人的行走,对于v0.7之后的版本,实现速度控制前需要注册输入源(该示例已实现注册输入源),具体注册步骤可参考代码。
1#!/usr/bin/env python3
2
3import rclpy
4from rclpy.node import Node
5import threading
6import time
7import signal
8import sys
9
10from aimdk_msgs.msg import McLocomotionVelocity, MessageHeader
11from aimdk_msgs.srv import SetMcInputSource
12
13
class DirectVelocityControl(Node):
    """Publishes locomotion velocity commands after registering as an
    MC input source.

    Velocity commands are held in instance attributes and pushed out by
    a 50 Hz timer started via start_publish().
    """

    def __init__(self):
        super().__init__('direct_velocity_control')

        self.publisher = self.create_publisher(
            McLocomotionVelocity, '/aima/mc/locomotion/velocity', 10)
        self.client = self.create_client(
            SetMcInputSource, '/aimdk_5Fmsgs/srv/SetMcInputSource')

        # Current command, published by the timer callback.
        self.forward_velocity = 0.0
        self.lateral_velocity = 0.0
        self.angular_velocity = 0.0

        # Per-axis limits; mc has minimum thresholds to start movement,
        # so magnitudes between 0 and the minimum are rejected.
        self.max_forward_speed = 1.0
        self.min_forward_speed = 0.2

        self.max_lateral_speed = 0.5
        self.min_lateral_speed = 0.2

        self.max_angular_speed = 1.0
        self.min_angular_speed = 0.1

        self.timer = None

        self.get_logger().info("Direct velocity control node started!")

    def start_publish(self):
        """Start the 50 Hz velocity publishing timer (idempotent)."""
        if not self.timer:
            self.timer = self.create_timer(0.02, self.publish_velocity)

    def register_input_source(self):
        """Register this node as MC input source 'node'; True on success."""
        self.get_logger().info("Registering input source...")

        # Poll for the service, giving up after roughly 8 seconds.
        deadline = 8.0
        started = self.get_clock().now().nanoseconds / 1e9
        while not self.client.wait_for_service(timeout_sec=2.0):
            if self.get_clock().now().nanoseconds / 1e9 - started > deadline:
                self.get_logger().error("Waiting for service timed out")
                return False
            self.get_logger().info("Waiting for input source service...")

        req = SetMcInputSource.Request()
        req.action.value = 1001
        req.input_source.name = "node"
        req.input_source.priority = 40
        req.input_source.timeout = 1000

        future = None
        for attempt in range(8):
            req.request.header.stamp = self.get_clock().now().to_msg()
            future = self.client.call_async(req)
            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
            if future.done():
                break
            # retry as remote peer: mc module can be under heavy load
            self.get_logger().info(f"trying to register input source... [{attempt}]")

        if not future.done():
            self.get_logger().error("Service call failed or timed out")
            return False

        try:
            response = future.result()
            state = response.response.state.value
            self.get_logger().info(
                f"Input source set successfully: state={state}, task_id={response.response.task_id}")
            return True
        except Exception as e:
            self.get_logger().error(f"Service call exception: {str(e)}")
            return False

    def publish_velocity(self):
        """Timer callback: publish the current velocity command."""
        msg = McLocomotionVelocity()
        msg.header = MessageHeader()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.source = "node"
        msg.forward_velocity = self.forward_velocity
        msg.lateral_velocity = self.lateral_velocity
        msg.angular_velocity = self.angular_velocity

        self.publisher.publish(msg)

        self.get_logger().info(
            f"Publishing velocity: forward {self.forward_velocity:.2f} m/s, "
            f"lateral {self.lateral_velocity:.2f} m/s, "
            f"angular {self.angular_velocity:.2f} rad/s"
        )

    def _checked(self, value, min_speed, max_speed):
        """Map near-zero input to 0.0; raise ValueError outside [min, max]."""
        if abs(value) < 0.005:
            return 0.0
        if abs(value) > max_speed or abs(value) < min_speed:
            raise ValueError("out of range")
        return value

    def set_forward(self, forward):
        # check value range, mc has thresholds to start movement
        self.forward_velocity = self._checked(
            forward, self.min_forward_speed, self.max_forward_speed)
        return True

    def set_lateral(self, lateral):
        # check value range, mc has thresholds to start movement
        self.lateral_velocity = self._checked(
            lateral, self.min_lateral_speed, self.max_lateral_speed)
        return True

    def set_angular(self, angular):
        # check value range, mc has thresholds to start movement
        self.angular_velocity = self._checked(
            angular, self.min_angular_speed, self.max_angular_speed)
        return True

    def clear_velocity(self):
        """Zero all velocity components (robot stops)."""
        self.forward_velocity = 0.0
        self.lateral_velocity = 0.0
        self.angular_velocity = 0.0
142
143
# Global node instance for signal handling
global_node = None


def signal_handler(sig, frame):
    """Zero the velocity command and shut down on SIGINT/SIGTERM."""
    if global_node is not None:
        global_node.clear_velocity()
        global_node.get_logger().info(
            f"Received signal {sig}, clearing velocity and shutting down")
    rclpy.shutdown()
    sys.exit(0)
156
157
def main():
    """Register input source, read velocities, run for 5 s, then hold zero."""
    global global_node
    rclpy.init()

    node = DirectVelocityControl()
    global_node = node

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    if not node.register_input_source():
        node.get_logger().error("Input source registration failed, exiting")
        rclpy.shutdown()
        return

    # Read and validate the command; note that mc enforces minimum-speed
    # thresholds, so magnitudes between 0 and the minimum are rejected.
    try:
        node.set_forward(float(
            input("Please enter forward velocity 0 or ±(0.2 ~ 1.0) m/s: ")))
        node.set_lateral(float(
            input("Please enter lateral velocity 0 or ±(0.2 ~ 0.5) m/s: ")))
        node.set_angular(float(
            input("Please enter angular velocity 0 or ±(0.1 ~ 1.0) rad/s: ")))
    except Exception as e:
        node.get_logger().error(f"Invalid input: {e}")
        rclpy.shutdown()
        return

    node.get_logger().info("Setting velocity, moving for 5 seconds")
    node.start_publish()

    # Let the timer publish the command for 5 seconds.
    start = node.get_clock().now()
    while (node.get_clock().now() - start).nanoseconds / 1e9 < 5.0:
        rclpy.spin_once(node, timeout_sec=0.1)
        time.sleep(0.001)

    node.clear_velocity()
    node.get_logger().info("5-second motion finished, robot stopped")

    # Keep spinning: the timer keeps publishing zero velocity so the robot
    # holds still until the user interrupts (handled by signal_handler).
    rclpy.spin(node)
    rclpy.shutdown()


if __name__ == '__main__':
    main()
6.1.9 关节电机控制
本示例展示了如何使用ROS2和Ruckig库来控制机器人的关节运动。
注意
注意⚠️ :运行本示例前需要在机器人运控计算单元(PC1)使用aima em stop-app mc关闭原生运控模块,获取机器人控制权。注意确保机器人安全
⚠️ 该示例代码直接对底层电机(即HAL层)进行控制,在运行程序前请检查代码中对关节的安全限位是否与所用机器人相符,并确保安全!
机器人关节控制示例
这个示例展示了如何使用ROS2和Ruckig库来控制机器人的关节运动。示例实现了以下功能:
机器人关节模型定义
基于Ruckig的轨迹插补
多关节协同控制
实时位置、速度和加速度控制
示例功能说明
创建了四个控制器节点,分别控制:
腿部x2(12个关节)
腰部x1(3个关节)
手臂x2(14个关节)
头部x1(2个关节)
演示功能:
每10秒让指定关节在正负0.5弧度之间摆动
使用Ruckig库实现平滑的运动轨迹
实时发布关节控制命令
自定义使用
添加新的控制逻辑:
添加新的控制回调函数
自由组合关节运动
调整控制频率:
修改
control_timer_的周期(当前为2ms)
1#!/usr/bin/env python3
2"""
3Robot joint control example
4This script implements a ROS2-based robot joint control system, using the Ruckig trajectory
5planner to achieve smooth joint motion control.
6
7Main features:
81. Supports controlling multiple joint areas (head, arm, waist, leg)
92. Uses Ruckig for trajectory planning to ensure smooth motion
103. Supports real-time control of joint position, velocity, and acceleration
114. Provides joint limit and PID (stiffness/damping) parameter configuration
12"""
13
14import rclpy
15from rclpy.node import Node
16from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
17from aimdk_msgs.msg import JointCommandArray, JointStateArray, JointCommand
18from std_msgs.msg import Header
19import ruckig
20from enum import Enum
21from dataclasses import dataclass
22from typing import List, Dict
23from threading import Lock
24import time
25
# QoS config: define ROS2 Quality of Service parameters.
# Subscriber QoS: best-effort delivery, keep the last 10 messages.
subscriber_qos = QoSProfile(
    reliability=ReliabilityPolicy.BEST_EFFORT,
    history=HistoryPolicy.KEEP_LAST,
    depth=10,
    durability=DurabilityPolicy.VOLATILE,
)

# Publisher QoS: reliable delivery, keep the last 10 messages.
publisher_qos = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,
    history=HistoryPolicy.KEEP_LAST,
    depth=10,
    durability=DurabilityPolicy.VOLATILE,
)
42
43
class JointArea(Enum):
    """Joint groups the robot model is partitioned into."""
    HEAD = 'HEAD'    # head joints
    ARM = 'ARM'      # arm joints
    WAIST = 'WAIST'  # waist joints
    LEG = 'LEG'      # leg joints
49
50
@dataclass
class JointInfo:
    """Static configuration for a single joint."""
    name: str           # joint name
    lower_limit: float  # lower angle limit
    upper_limit: float  # upper angle limit
    kp: float           # position control proportional gain
    kd: float           # velocity control derivative gain
59
60
# Robot model configuration: per-area joint parameters.
# Each entry: JointInfo(name, lower limit, upper limit, kp, kd).
robot_model: Dict[JointArea, List[JointInfo]] = {
    # Leg joint configuration
    JointArea.LEG: [
        # Left leg joints
        JointInfo("left_hip_pitch_joint", -2.4871, 2.4871, 40.0, 4.0),
        JointInfo("left_hip_roll_joint", -0.12217, 2.9059, 40.0, 4.0),
        JointInfo("left_hip_yaw_joint", -1.6842, 3.4296, 30.0, 3.0),
        JointInfo("left_knee_joint", 0.026179, 2.1206, 80.0, 8.0),
        JointInfo("left_ankle_pitch_joint", -0.80285, 0.45378, 40.0, 4.0),
        JointInfo("left_ankle_roll_joint", -0.2618, 0.2618, 20.0, 2.0),
        # Right leg joints
        JointInfo("right_hip_pitch_joint", -2.4871, 2.4871, 40.0, 4.0),
        JointInfo("right_hip_roll_joint", -2.9059, 0.12217, 40.0, 4.0),
        JointInfo("right_hip_yaw_joint", -3.4296, 1.6842, 30.0, 3.0),
        JointInfo("right_knee_joint", 0.026179, 2.1206, 80.0, 8.0),
        JointInfo("right_ankle_pitch_joint", -0.80285, 0.45378, 40.0, 4.0),
        JointInfo("right_ankle_roll_joint", -0.2618, 0.2618, 20.0, 2.0),
    ],
    # Waist joint configuration
    JointArea.WAIST: [
        JointInfo("waist_yaw_joint", -3.4296, 2.3824, 20.0, 4.0),
        JointInfo("waist_pitch_joint", -0.17453, 0.17453, 20.0, 4.0),
        JointInfo("waist_roll_joint", -0.48869, 0.48869, 20.0, 4.0),
    ],
    # Arm joint configuration
    JointArea.ARM: [
        # Left arm
        JointInfo("left_shoulder_pitch_joint", -2.5569, 2.5569, 20.0, 2.0),
        JointInfo("left_shoulder_roll_joint", -0.06108, 3.3598, 20.0, 2.0),
        JointInfo("left_shoulder_yaw_joint", -2.5569, 2.5569, 20.0, 2.0),
        JointInfo("left_elbow_pitch_joint", -2.4435, 0.0, 20.0, 2.0),
        JointInfo("left_elbow_roll_joint", -2.5569, 2.5569, 20.0, 2.0),
        JointInfo("left_wrist_pitch_joint", -0.5585, 0.5585, 20.0, 2.0),
        JointInfo("left_wrist_yaw_joint", -1.5446, 1.5446, 20.0, 2.0),
        # Right arm
        JointInfo("right_shoulder_pitch_joint", -2.5569, 2.5569, 20.0, 2.0),
        JointInfo("right_shoulder_roll_joint", -3.3598, 0.06108, 20.0, 2.0),
        JointInfo("right_shoulder_yaw_joint", -2.5569, 2.5569, 20.0, 2.0),
        JointInfo("right_elbow_pitch_joint", 0.0, 2.4435, 20.0, 2.0),
        JointInfo("right_elbow_roll_joint", -2.5569, 2.5569, 20.0, 2.0),
        JointInfo("right_wrist_pitch_joint", -0.5585, 0.5585, 20.0, 2.0),
        JointInfo("right_wrist_yaw_joint", -1.5446, 1.5446, 20.0, 2.0),
    ],
    # Head joint configuration
    JointArea.HEAD: [
        JointInfo("head_pitch_joint", -0.61087, 0.61087, 20.0, 2.0),
        JointInfo("head_yaw_joint", -0.61087, 0.61087, 20.0, 2.0),
    ],
}
142
143
class JointControllerNode(Node):
    """
    Joint controller node.

    Subscribes to joint states for one joint area, plans smooth
    point-to-point trajectories with Ruckig (an online trajectory
    generator), and publishes joint commands with per-joint stiffness
    and damping gains taken from ``robot_model``.

    NOTE(review): ``control_callback`` runs Ruckig to completion in a
    blocking loop and publishes only the final waypoint as a single
    command message; intermediate 2 ms waypoints are not streamed.
    Confirm this matches the intended example behavior.
    """

    def __init__(self, node_name: str, sub_topic: str, pub_topic: str, area: JointArea, dofs: int):
        """
        Initialize the joint controller.

        Args:
            node_name: ROS 2 node name.
            sub_topic: topic to subscribe to for joint states.
            pub_topic: topic to publish joint commands on.
            area: joint area (head/arm/waist/leg) used to look up joint
                info (limits and gains) in ``robot_model``.
            dofs: number of degrees of freedom in this area.
        """
        super().__init__(node_name)
        # Currently unused; presumably reserved for guarding shared
        # trajectory state across callbacks — TODO confirm.
        self.lock = Lock()
        self.joint_info = robot_model[area]
        self.dofs = dofs
        self.ruckig = ruckig.Ruckig(dofs, 0.002)  # 2 ms control period
        self.input = ruckig.InputParameter(dofs)
        self.output = ruckig.OutputParameter(dofs)
        # Set True once the first joint-state message arrives.
        self.ruckig_initialized = False

        # Initialize trajectory state: start at rest at the zero pose.
        self.input.current_position = [0.0] * dofs
        self.input.current_velocity = [0.0] * dofs
        self.input.current_acceleration = [0.0] * dofs

        # Per-DOF motion limits for the trajectory generator.
        self.input.max_velocity = [1.0] * dofs
        self.input.max_acceleration = [1.0] * dofs
        self.input.max_jerk = [25.0] * dofs

        # ROS 2 subscriber (joint states) and publisher (joint commands).
        self.sub = self.create_subscription(
            JointStateArray,
            sub_topic,
            self.joint_state_callback,
            subscriber_qos
        )
        self.pub = self.create_publisher(
            JointCommandArray,
            pub_topic,
            publisher_qos
        )

    def joint_state_callback(self, msg: JointStateArray):
        """
        Joint state callback.

        Only marks the controller as initialized; the received state is
        not otherwise used (planning continues from the internally
        tracked Ruckig state, not from measured positions).
        """
        self.ruckig_initialized = True

    def control_callback(self, joint_idx):
        """
        Plan and publish a move to the current target.

        Runs Ruckig updates in a blocking loop until the trajectory
        completes or the tracked joint reaches its target, then publishes
        one command containing the final position/velocity for every
        joint in the area.

        Args:
            joint_idx: index of the joint whose convergence is checked.
        """
        # Step the trajectory until Ruckig reports neither Working nor
        # Finished (i.e. an error), or the tracked joint converges.
        while self.ruckig.update(self.input, self.output) in [ruckig.Result.Working, ruckig.Result.Finished]:
            # Feed the newly planned state back as the next cycle's
            # start state (open-loop integration of the plan).
            self.input.current_position = self.output.new_position
            self.input.current_velocity = self.output.new_velocity
            self.input.current_acceleration = self.output.new_acceleration

            # Early exit once the tracked joint is at its target.
            tolerance = 1e-6
            current_p = self.output.new_position[joint_idx]
            if abs(current_p - self.input.target_position[joint_idx]) < tolerance:
                break

        # Publish a single command carrying the final planned state.
        cmd = JointCommandArray()
        for i, joint in enumerate(self.joint_info):
            j = JointCommand()
            j.name = joint.name
            j.position = self.output.new_position[i]
            j.velocity = self.output.new_velocity[i]
            j.effort = 0.0
            j.stiffness = joint.kp
            j.damping = joint.kd
            cmd.joints.append(j)

        self.pub.publish(cmd)

    def set_target_position(self, joint_name, position):
        """
        Set a target position for one joint and execute the move.

        All other joints in the area receive a target of 0.0.

        Args:
            joint_name: joint name (must exist in this area's
                ``robot_model`` entry).
            position: target position in radians.
        """
        p_s = [0.0] * self.dofs
        # NOTE(review): if joint_name is not found, joint_idx silently
        # stays 0 and convergence is checked against the first joint.
        joint_idx = 0
        for i, joint in enumerate(self.joint_info):
            if joint.name == joint_name:
                p_s[i] = position
                joint_idx = i
        self.input.target_position = p_s
        self.input.target_velocity = [0.0] * self.dofs
        self.input.target_acceleration = [0.0] * self.dofs
        self.control_callback(joint_idx)
251
252
def main(args=None):
    """
    Example entry point.

    Creates the leg controller and toggles the left knee between two
    positions every 3 seconds, spinning on a multi-threaded executor.
    """
    rclpy.init(args=args)

    # Leg controller (12 DOF). The commented-out nodes below show how to
    # drive the other joint areas in exactly the same way.
    leg_node = JointControllerNode(
        "leg_node",
        "/aima/hal/joint/leg/state",
        "/aima/hal/joint/leg/command",
        JointArea.LEG,
        12
    )

    # waist_node = JointControllerNode(
    #     "waist_node",
    #     "/aima/hal/joint/waist/state",
    #     "/aima/hal/joint/waist/command",
    #     JointArea.WAIST,
    #     3
    # )

    # arm_node = JointControllerNode(
    #     "arm_node",
    #     "/aima/hal/joint/arm/state",
    #     "/aima/hal/joint/arm/command",
    #     JointArea.ARM,
    #     14
    # )

    # head_node = JointControllerNode(
    #     "head_node",
    #     "/aima/hal/joint/head/state",
    #     "/aima/hal/joint/head/command",
    #     JointArea.HEAD,
    #     2
    # )

    # Mutable target shared with the timer callback (starts at 0.8 so the
    # first commanded target is 0.5).
    state = {"position": 0.8}

    def timer_callback():
        """Alternate the knee target between 0.5 and 0.8 rad each period."""
        # new = 1.3 - old flips the target between the two values.
        state["position"] = 1.3 - state["position"]
        leg_node.set_target_position("left_knee_joint", state["position"])

        # arm_node.set_target_position("left_shoulder_pitch_joint", state["position"])
        # waist_node.set_target_position("waist_yaw_joint", state["position"])
        # head_node.set_target_position("head_pitch_joint", state["position"])

    leg_node.create_timer(3.0, timer_callback)

    # Multi-threaded executor so the blocking control loop does not
    # starve other callbacks.
    executor = rclpy.executors.MultiThreadedExecutor()
    executor.add_node(leg_node)

    # executor.add_node(waist_node)
    # executor.add_node(arm_node)
    # executor.add_node(head_node)

    try:
        executor.spin()
    except KeyboardInterrupt:
        pass
    finally:
        leg_node.destroy_node()
        # waist_node.destroy_node()
        # arm_node.destroy_node()
        # head_node.destroy_node()
        if rclpy.ok():
            rclpy.shutdown()


if __name__ == '__main__':
    main()
6.1.10 键盘控制机器人
本示例实现了通过PC的键盘输入控制机器人的前进后退转弯的功能。
W/S 控制前进/后退速度(每次 ±0.2 m/s),A/D 控制左右平移速度(每次 ±0.2 m/s),Q/E 增加/减少角速度(每次 ±0.1 rad/s),ESC 退出程序并释放终端资源,Space 立即将各方向速度归零,执行急停。
小心
注意:运行本示例前需要先使用手柄,将机器人切入稳定站立模式。(位控站立/走跑模式时按R2+X, 其他模式详见模式流转图进行操作),然后在机器人的终端界面使用:aima em stop-app rc关闭遥控器,防止通道占用。
实现键盘控制前需要注册输入源(该示例已实现注册输入源)
1#!/usr/bin/env python3
2
3import rclpy
4import rclpy.logging
5from rclpy.node import Node
6from aimdk_msgs.msg import McLocomotionVelocity, MessageHeader
7from aimdk_msgs.srv import SetMcInputSource
8import curses
9import time
10from functools import partial
11
12
class KeyboardVelocityController(Node):
    """Teleop node: map curses key presses to locomotion velocity commands."""

    def __init__(self, stdscr):
        super().__init__('keyboard_velocity_controller')
        # curses screen handle used for non-blocking key reads and the UI.
        self.stdscr = stdscr
        # Commanded velocities, updated by key presses.
        self.forward_velocity = 0.0
        self.lateral_velocity = 0.0
        self.angular_velocity = 0.0
        # Increment per key press: linear (m/s) and angular (rad/s).
        self.step = 0.2
        self.angular_step = 0.1

        self.publisher = self.create_publisher(
            McLocomotionVelocity, '/aima/mc/locomotion/velocity', 10)
        self.client = self.create_client(
            SetMcInputSource, '/aimdk_5Fmsgs/srv/SetMcInputSource')

        # Registration must succeed before we reconfigure the terminal.
        if not self.register_input_source():
            self.get_logger().error("Input source registration failed, exiting")
            raise RuntimeError("Failed to register input source")

        # Put the terminal into non-echoing, non-blocking keypad mode.
        curses.cbreak()
        curses.noecho()
        self.stdscr.keypad(True)
        self.stdscr.nodelay(True)

        self.get_logger().info(
            "Control started: W/S forward/backward, A/D strafe, Q/E turn, Space stop, Esc exit")

        # Poll the keyboard and publish at 20 Hz.
        self.timer = self.create_timer(0.05, self.check_key_and_publish)

    def register_input_source(self):
        """Register this node as an MC input source; return True on success."""
        self.get_logger().info("Registering input source...")

        # Overall deadline (seconds, node clock) for service discovery.
        deadline = self.get_clock().now().nanoseconds / 1e9 + 8.0

        while not self.client.wait_for_service(timeout_sec=2.0):
            if self.get_clock().now().nanoseconds / 1e9 > deadline:
                self.get_logger().error("Waiting for service timed out")
                return False
            self.get_logger().info("Waiting for input source service...")

        req = SetMcInputSource.Request()
        req.action.value = 1001
        req.input_source.name = "node"
        req.input_source.priority = 40
        req.input_source.timeout = 1000

        future = None
        for attempt in range(8):
            req.request.header.stamp = self.get_clock().now().to_msg()
            future = self.client.call_async(req)
            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)

            if future.done():
                break

            # Retry: the MC module can be under heavy load.
            self.get_logger().info(f"trying to register input source... [{attempt}]")

        if not future.done():
            self.get_logger().error("Service call failed or timed out")
            return False

        try:
            resp = future.result()
            state = resp.response.state.value
            self.get_logger().info(
                f"Input source set successfully: state={state}, task_id={resp.response.task_id}")
            return True
        except Exception as e:
            self.get_logger().error(f"Service exception: {str(e)}")
            return False

    def check_key_and_publish(self):
        """Timer callback: apply one key press, then publish and redraw."""
        try:
            key = self.stdscr.getch()
        except Exception:
            key = -1  # treat read errors the same as "no key pressed"

        if key == 27:  # ESC: stop controlling and shut down
            self.get_logger().info("Exiting control")
            rclpy.shutdown()
            return

        def clamp(value):
            # Commanded velocities are kept within [-1, 1].
            return max(-1.0, min(1.0, value))

        if key == ord(' '):
            # Emergency stop: zero every axis at once.
            self.forward_velocity = 0.0
            self.lateral_velocity = 0.0
            self.angular_velocity = 0.0
        elif key == ord('w'):
            self.forward_velocity = clamp(self.forward_velocity + self.step)
        elif key == ord('s'):
            self.forward_velocity = clamp(self.forward_velocity - self.step)
        elif key == ord('a'):
            self.lateral_velocity = clamp(self.lateral_velocity + self.step)
        elif key == ord('d'):
            self.lateral_velocity = clamp(self.lateral_velocity - self.step)
        elif key == ord('q'):
            self.angular_velocity = clamp(self.angular_velocity + self.angular_step)
        elif key == ord('e'):
            self.angular_velocity = clamp(self.angular_velocity - self.angular_step)

        # Publish the current command on every tick, key press or not.
        msg = McLocomotionVelocity()
        msg.header = MessageHeader()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.source = "node"
        msg.forward_velocity = self.forward_velocity
        msg.lateral_velocity = self.lateral_velocity
        msg.angular_velocity = self.angular_velocity

        self.publisher.publish(msg)

        # Redraw the help banner and live velocity readout.
        self.stdscr.clear()
        self.stdscr.addstr(
            0, 0, "W/S: Forward/Backward | A/D: Strafe | Q/E: Turn | Space: Stop | ESC: Exit")
        self.stdscr.addstr(2, 0,
                           f"Speed Status: Forward: {self.forward_velocity:.2f} m/s | "
                           f"Lateral: {self.lateral_velocity:.2f} m/s | "
                           f"Angular: {self.angular_velocity:.2f} rad/s")
        self.stdscr.refresh()
141
142
def curses_main(stdscr):
    """
    Entry point executed under curses.wrapper.

    Initializes rclpy, runs the controller node until shutdown, and
    restores the terminal state on exit.
    """
    rclpy.init()
    try:
        node = KeyboardVelocityController(stdscr)
        rclpy.spin(node)
    except Exception as e:
        rclpy.logging.get_logger("main").fatal(
            f"Program exited with exception: {e}")
    finally:
        curses.endwin()
        # Guard against double shutdown: the ESC handler inside the node
        # already calls rclpy.shutdown(), and shutting an already-shut-down
        # context down again raises RuntimeError.
        if rclpy.ok():
            rclpy.shutdown()
154
155
def main():
    """Program entry point: run the controller inside a curses session."""
    # curses.wrapper restores the terminal even if curses_main raises.
    curses.wrapper(curses_main)


if __name__ == '__main__':
    main()
6.1.11 拍照
该示例中用到了take_photo,在运行节点程序前,修改需要拍照的相机话题,启动节点程序后,会创建/images/目录,将当前帧图像保存在这个目录下。
1#!/usr/bin/env python3
2import time
3from pathlib import Path
4
5import rclpy
6from rclpy.node import Node
7from rclpy.qos import qos_profile_sensor_data
8from sensor_msgs.msg import Image
9from cv_bridge import CvBridge
10import cv2
11
12
class SaveOneRawPy(Node):
    """Subscribe to a raw image topic and save exactly one frame as PNG."""

    def __init__(self):
        super().__init__('save_one_image')

        # Parameter: which image topic to grab a frame from.
        self.declare_parameter(
            'image_topic', '/aima/hal/sensor/stereo_head_front_left/rgb_image'
        )
        self.topic = self.get_parameter(
            'image_topic').get_parameter_value().string_value

        # Output directory, created if missing.
        self.save_dir = Path('images').resolve()
        self.save_dir.mkdir(parents=True, exist_ok=True)

        # True once a frame has been written; later frames are ignored.
        self._saved = False
        self._bridge = CvBridge()

        # Sensor-data QoS (best-effort) matches the camera publisher.
        self.sub = self.create_subscription(
            Image,
            self.topic,
            self.image_cb,
            qos_profile_sensor_data
        )
        self.get_logger().info(f'Subscribing to raw image: {self.topic}')
        self.get_logger().info(f'Images will be saved to: {self.save_dir}')

    def image_cb(self, msg: Image):
        """Convert the first received frame to BGR, save it, request shutdown."""
        # Already saved one frame — ignore everything that follows.
        if self._saved:
            return

        try:
            enc = msg.encoding.lower()
            self.get_logger().info(f'Received image with encoding: {enc}')

            # Convert from ROS Image to an OpenCV array without re-encoding.
            img = self._bridge.imgmsg_to_cv2(
                msg, desired_encoding='passthrough')

            # Normalize to BGR so cv2.imwrite produces correct colors.
            if enc == 'rgb8':
                img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
            elif enc == 'mono8':
                img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
            # bgr8 (and other 8-bit formats cv2 can save) are used as-is.

            ts_ms = int(time.time() * 1000)
            out_path = self.save_dir / f'frame_{ts_ms}.png'

            ok = cv2.imwrite(str(out_path), img)
            if ok:
                self.get_logger().info(
                    f'Saved image: {out_path} ({img.shape[1]}x{img.shape[0]})'
                )
                self._saved = True
                # Request shutdown so spin() in main() returns. Do NOT call
                # self.destroy_node() here: destroying a node from inside its
                # own subscription callback, while the executor is still using
                # it, is unsafe. main() performs the cleanup instead.
                if rclpy.ok():
                    rclpy.shutdown()
            else:
                self.get_logger().error(f'cv2.imwrite failed: {out_path}')
        except Exception as e:
            self.get_logger().error(f'Failed to decode / save image: {e}')
80
81
def main():
    """Spin until one frame is saved (the node requests shutdown) or Ctrl-C."""
    rclpy.init()
    node = SaveOneRawPy()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        # Allow a clean exit if no frame ever arrives on the topic.
        pass
    finally:
        try:
            node.destroy_node()
        except Exception:
            # The node may already have been torn down during shutdown.
            pass
        if rclpy.ok():
            rclpy.shutdown()


if __name__ == '__main__':
    main()
6.1.12 相机推流示例集
该示例集提供了多种相机数据订阅和处理功能,支持深度相机、双目相机和单目相机的数据流订阅。
这些相机数据订阅 example 并没有实际的业务用途, 仅提供相机数据基础信息的打印;若您比较熟悉 ros2 使用,会发现 ros2 topic echo + ros2 topic hz 也能够实现 example 提供的功能。
您可以选择快速查阅 SDK 接口手册中话题列表直接快速进入自己模块的开发,也可以使用相机 example 作为脚手架加入自己的业务逻辑。
我们发布的传感器数据均为未经预处理(去畸变等)的原始数据,如果您需要查询传感器的详细信息(如分辨率、焦距等),请关注相应的内参(camera_info)话题。
深度相机数据订阅
该示例中用到了echo_camera_rgbd,通过订阅/aima/hal/sensor/rgbd_head_front/话题来接收机器人的深度相机数据,支持深度点云、深度图、RGB图、压缩RGB图和相机内参等多种数据类型。
功能特点:
支持多种数据类型订阅(深度点云、深度图、RGB图、压缩图、相机内参)
实时FPS统计和数据显示
支持RGB图视频录制功能
可配置的topic类型选择
支持的数据类型:
depth_pointcloud: 深度点云数据 (sensor_msgs/PointCloud2)
depth_image: 深度图像 (sensor_msgs/Image)
rgb_image: RGB图像 (sensor_msgs/Image)
rgb_image_compressed: 压缩RGB图像 (sensor_msgs/CompressedImage)
camera_info: 相机内参 (sensor_msgs/CameraInfo)
1#!/usr/bin/env python3
2"""
3Head depth camera multi-topic subscription example
4
5Supports selecting the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
6 - depth_pointcloud: Depth point cloud (sensor_msgs/PointCloud2)
7 - depth_image: Depth image (sensor_msgs/Image)
8 - depth_camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)
9 - rgb_image: RGB image (sensor_msgs/Image)
10 - rgb_image_compressed: RGB compressed image (sensor_msgs/CompressedImage)
11 - rgb_camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)
12
13Example:
14 ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=depth_pointcloud
15 ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_image
16 ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_camera_info
17
18Default topic_type is rgb_image
19"""
20
21import rclpy
22from rclpy.node import Node
23from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
24from sensor_msgs.msg import Image, CompressedImage, CameraInfo, PointCloud2
25from collections import deque
26import time
27
28
class CameraTopicEcho(Node):
    """Echo one RGB-D camera topic, printing message metadata and receive FPS.

    The topic to subscribe to is selected via the ``topic_type`` parameter
    (see the module docstring for the accepted values).
    """

    def __init__(self):
        super().__init__('camera_topic_echo')

        # Select the topic type to subscribe
        self.declare_parameter('topic_type', 'rgb_image')
        self.declare_parameter('dump_video_path', '')

        self.topic_type = self.get_parameter('topic_type').value
        self.dump_video_path = self.get_parameter('dump_video_path').value

        # Internal state used by the callbacks. Initialized BEFORE any
        # subscription is created so a callback can never observe a
        # missing attribute.
        self.last_print = self.get_clock().now()
        self.arrivals = deque()

        # SensorDataQoS: BEST_EFFORT + VOLATILE
        qos = QoSProfile(
            reliability=QoSReliabilityPolicy.BEST_EFFORT,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=5
        )

        # Create different subscribers based on topic_type
        if self.topic_type == "depth_pointcloud":
            self.topic_name = "/aima/hal/sensor/rgbd_head_front/depth_pointcloud"
            self.sub_pointcloud = self.create_subscription(
                PointCloud2, self.topic_name, self.cb_pointcloud, qos)
            self.get_logger().info(
                f"✅ Subscribing PointCloud2: {self.topic_name}")

        elif self.topic_type == "depth_image":
            self.topic_name = "/aima/hal/sensor/rgbd_head_front/depth_image"
            self.sub_image = self.create_subscription(
                Image, self.topic_name, self.cb_image, qos)
            self.get_logger().info(
                f"✅ Subscribing Depth Image: {self.topic_name}")

        elif self.topic_type == "rgb_image":
            self.topic_name = "/aima/hal/sensor/rgbd_head_front/rgb_image"
            self.sub_image = self.create_subscription(
                Image, self.topic_name, self.cb_image, qos)
            self.get_logger().info(
                f"✅ Subscribing RGB Image: {self.topic_name}")
            if self.dump_video_path:
                self.get_logger().info(
                    f"📝 Will dump received images to video: {self.dump_video_path}")

        elif self.topic_type == "rgb_image_compressed":
            self.topic_name = "/aima/hal/sensor/rgbd_head_front/rgb_image/compressed"
            self.sub_compressed = self.create_subscription(
                CompressedImage, self.topic_name, self.cb_compressed, qos)
            self.get_logger().info(
                f"✅ Subscribing CompressedImage: {self.topic_name}")

        elif self.topic_type == "rgb_camera_info":
            self.topic_name = "/aima/hal/sensor/rgbd_head_front/rgb_camera_info"
            # RGB-D CameraInfo differs from other cameras: best_effort +
            # volatile QoS is enough for the 10 Hz rgb_camera_info stream.
            self.sub_camerainfo = self.create_subscription(
                CameraInfo, self.topic_name, self.cb_camerainfo, qos)
            self.get_logger().info(
                f"✅ Subscribing RGB CameraInfo: {self.topic_name}")

        elif self.topic_type == "depth_camera_info":
            self.topic_name = "/aima/hal/sensor/rgbd_head_front/depth_camera_info"
            # RGB-D CameraInfo differs from other cameras: best_effort +
            # volatile QoS is enough for the 10 Hz depth_camera_info stream.
            self.sub_camerainfo = self.create_subscription(
                CameraInfo, self.topic_name, self.cb_camerainfo, qos)
            self.get_logger().info(
                f"✅ Subscribing Depth CameraInfo: {self.topic_name}")

        else:
            self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
            raise ValueError("Unknown topic_type")

    def update_arrivals(self):
        """Record an arrival and drop timestamps older than one second."""
        now = self.get_clock().now()
        self.arrivals.append(now)
        while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
            self.arrivals.popleft()

    def get_fps(self):
        """Return the number of messages received in the last second."""
        return len(self.arrivals)

    def should_print(self):
        """Rate-limit printing to at most once per second."""
        now = self.get_clock().now()
        if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
            self.last_print = now
            return True
        return False

    def cb_pointcloud(self, msg: PointCloud2):
        """PointCloud2 callback: print layout metadata and receive FPS."""
        self.update_arrivals()

        if self.should_print():
            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9

            # Format fields information
            fields_str = " ".join(
                [f"{f.name}({f.datatype})" for f in msg.fields])

            self.get_logger().info(
                f"🌫️ PointCloud2 received\n"
                f"  • frame_id:        {msg.header.frame_id}\n"
                f"  • stamp (sec):     {stamp_sec:.6f}\n"
                f"  • width x height:  {msg.width} x {msg.height}\n"
                f"  • point_step:      {msg.point_step}\n"
                f"  • row_step:        {msg.row_step}\n"
                f"  • fields:          {fields_str}\n"
                f"  • is_bigendian:    {msg.is_bigendian}\n"
                f"  • is_dense:        {msg.is_dense}\n"
                f"  • data size:       {len(msg.data)}\n"
                f"  • recv FPS (1s):   {self.get_fps():.1f}"
            )

    def cb_image(self, msg: Image):
        """Image callback (depth/RGB image): print metadata and receive FPS."""
        self.update_arrivals()

        if self.should_print():
            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
            self.get_logger().info(
                f"📸 {self.topic_type} received\n"
                f"  • frame_id:        {msg.header.frame_id}\n"
                f"  • stamp (sec):     {stamp_sec:.6f}\n"
                f"  • encoding:        {msg.encoding}\n"
                f"  • size (WxH):      {msg.width} x {msg.height}\n"
                f"  • step (bytes/row):{msg.step}\n"
                f"  • is_bigendian:    {msg.is_bigendian}\n"
                f"  • recv FPS (1s):   {self.get_fps():.1f}"
            )

        # Only the RGB image stream supports video dump
        if self.topic_type == "rgb_image" and self.dump_video_path:
            self.dump_image_to_video(msg)

    def cb_compressed(self, msg: CompressedImage):
        """CompressedImage callback: print metadata and receive FPS."""
        self.update_arrivals()

        if self.should_print():
            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
            self.get_logger().info(
                f"🗜️ CompressedImage received\n"
                f"  • frame_id:        {msg.header.frame_id}\n"
                f"  • stamp (sec):     {stamp_sec:.6f}\n"
                f"  • format:          {msg.format}\n"
                f"  • data size:       {len(msg.data)}\n"
                f"  • recv FPS (1s):   {self.get_fps():.1f}"
            )

    def cb_camerainfo(self, msg: CameraInfo):
        """CameraInfo callback: print the camera intrinsic parameters."""
        # Camera info typically arrives once, so print immediately.
        stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9

        # Format D array
        d_str = ", ".join([f"{d:.6f}" for d in msg.d])

        # Format K matrix
        k_str = ", ".join([f"{k:.6f}" for k in msg.k])

        # Format P matrix
        p_str = ", ".join([f"{p:.6f}" for p in msg.p])

        self.get_logger().info(
            f"📷 {self.topic_type} received\n"
            f"  • frame_id:        {msg.header.frame_id}\n"
            f"  • stamp (sec):     {stamp_sec:.6f}\n"
            f"  • width x height:  {msg.width} x {msg.height}\n"
            f"  • distortion_model:{msg.distortion_model}\n"
            f"  • D: [{d_str}]\n"
            f"  • K: [{k_str}]\n"
            f"  • P: [{p_str}]\n"
            f"  • binning_x:       {msg.binning_x}\n"
            f"  • binning_y:       {msg.binning_y}\n"
            f"  • roi: {{ x_offset: {msg.roi.x_offset}, y_offset: {msg.roi.y_offset}, height: {msg.roi.height}, width: {msg.roi.width}, do_rectify: {msg.roi.do_rectify} }}"
        )

    def dump_image_to_video(self, msg: Image):
        """Placeholder for RGB video recording (not implemented in Python)."""
        # Real video recording could be added here; the Python version of
        # this example only logs (rate-limited) that it is unimplemented.
        if self.should_print():
            self.get_logger().info("📝 Video dump not implemented in Python version")
217
218
def main(args=None):
    """Create the echo node and spin until interrupted."""
    rclpy.init(args=args)
    node = None
    try:
        node = CameraTopicEcho()
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    except Exception as e:
        # Covers construction failures such as an unknown topic_type.
        print(f"Error: {e}")
    finally:
        if node is not None:
            node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
使用说明:
订阅深度点云数据:
ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=depth_pointcloud
订阅RGB图像数据:
ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_image
订阅相机内参:
ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_camera_info
ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=depth_camera_info
录制RGB视频:
# dump_video_path的值可改为其他路径, 注意提前创建该文件所在目录才能保存
ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_image -p dump_video_path:=$PWD/output.avi
双目相机数据订阅
该示例中用到了echo_camera_stereo,通过订阅/aima/hal/sensor/stereo_head_front_*/话题来接收机器人的双目相机数据,支持左右相机的RGB图、压缩图和相机内参数据。
功能特点:
支持左右相机独立数据订阅
实时FPS统计和数据显示
支持RGB图视频录制功能
可配置的相机选择(左/右)
支持的数据类型:
left_rgb_image: 左相机RGB图像 (sensor_msgs/Image)
left_rgb_image_compressed: 左相机压缩RGB图像 (sensor_msgs/CompressedImage)
left_camera_info: 左相机内参 (sensor_msgs/CameraInfo)
right_rgb_image: 右相机RGB图像 (sensor_msgs/Image)
right_rgb_image_compressed: 右相机压缩RGB图像 (sensor_msgs/CompressedImage)
right_camera_info: 右相机内参 (sensor_msgs/CameraInfo)
1#!/usr/bin/env python3
2"""
3Head stereo camera multi-topic subscription example
4
5Supports selecting the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
6 - left_rgb_image: Left camera RGB image (sensor_msgs/Image)
7 - left_rgb_image_compressed: Left camera RGB compressed image (sensor_msgs/CompressedImage)
8 - left_camera_info: Left camera intrinsic parameters (sensor_msgs/CameraInfo)
9 - right_rgb_image: Right camera RGB image (sensor_msgs/Image)
10 - right_rgb_image_compressed: Right camera RGB compressed image (sensor_msgs/CompressedImage)
11 - right_camera_info: Right camera intrinsic parameters (sensor_msgs/CameraInfo)
12
13Example:
14 ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_rgb_image
15 ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=right_rgb_image
16 ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_camera_info
17
18Default topic_type is left_rgb_image
19"""
20
21import rclpy
22from rclpy.node import Node
23from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy, QoSHistoryPolicy
24from sensor_msgs.msg import Image, CompressedImage, CameraInfo
25from collections import deque
26import time
27
28
class StereoCameraTopicEcho(Node):
    """Echo one stereo-camera topic, printing message metadata and receive FPS.

    The topic to subscribe to is selected via the ``topic_type`` parameter
    (see the module docstring for the accepted values).
    """

    def __init__(self):
        super().__init__('stereo_camera_topic_echo')

        # Select the topic type to subscribe
        self.declare_parameter('topic_type', 'left_rgb_image')
        self.declare_parameter('dump_video_path', '')

        self.topic_type = self.get_parameter('topic_type').value
        self.dump_video_path = self.get_parameter('dump_video_path').value

        # Internal state used by the callbacks. Initialized BEFORE any
        # subscription is created so a callback can never observe a
        # missing attribute.
        self.last_print = self.get_clock().now()
        self.arrivals = deque()

        # Set QoS parameters - use sensor data QoS
        qos = QoSProfile(
            reliability=QoSReliabilityPolicy.BEST_EFFORT,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=5,
            durability=QoSDurabilityPolicy.VOLATILE
        )

        # Create different subscribers based on topic_type
        if self.topic_type == "left_rgb_image":
            self.topic_name = "/aima/hal/sensor/stereo_head_front_left/rgb_image"
            self.sub_image = self.create_subscription(
                Image, self.topic_name, self.cb_image, qos)
            self.get_logger().info(
                f"✅ Subscribing Left RGB Image: {self.topic_name}")
            if self.dump_video_path:
                self.get_logger().info(
                    f"📝 Will dump received images to video: {self.dump_video_path}")

        elif self.topic_type == "left_rgb_image_compressed":
            self.topic_name = "/aima/hal/sensor/stereo_head_front_left/rgb_image/compressed"
            self.sub_compressed = self.create_subscription(
                CompressedImage, self.topic_name, self.cb_compressed, qos)
            self.get_logger().info(
                f"✅ Subscribing Left CompressedImage: {self.topic_name}")

        elif self.topic_type == "left_camera_info":
            self.topic_name = "/aima/hal/sensor/stereo_head_front_left/camera_info"
            # CameraInfo subscription must use reliable + transient_local QoS
            # to receive historical messages (even if only one frame is
            # published).
            camera_qos = QoSProfile(
                reliability=QoSReliabilityPolicy.RELIABLE,
                history=QoSHistoryPolicy.KEEP_LAST,
                depth=1,
                durability=QoSDurabilityPolicy.TRANSIENT_LOCAL
            )
            self.sub_camerainfo = self.create_subscription(
                CameraInfo, self.topic_name, self.cb_camerainfo, camera_qos)
            self.get_logger().info(
                f"✅ Subscribing Left CameraInfo (with transient_local): {self.topic_name}")

        elif self.topic_type == "right_rgb_image":
            self.topic_name = "/aima/hal/sensor/stereo_head_front_right/rgb_image"
            self.sub_image = self.create_subscription(
                Image, self.topic_name, self.cb_image, qos)
            self.get_logger().info(
                f"✅ Subscribing Right RGB Image: {self.topic_name}")
            if self.dump_video_path:
                self.get_logger().info(
                    f"📝 Will dump received images to video: {self.dump_video_path}")

        elif self.topic_type == "right_rgb_image_compressed":
            self.topic_name = "/aima/hal/sensor/stereo_head_front_right/rgb_image/compressed"
            self.sub_compressed = self.create_subscription(
                CompressedImage, self.topic_name, self.cb_compressed, qos)
            self.get_logger().info(
                f"✅ Subscribing Right CompressedImage: {self.topic_name}")

        elif self.topic_type == "right_camera_info":
            self.topic_name = "/aima/hal/sensor/stereo_head_front_right/camera_info"
            # CameraInfo subscription must use reliable + transient_local QoS
            # to receive historical messages (even if only one frame is
            # published).
            camera_qos = QoSProfile(
                reliability=QoSReliabilityPolicy.RELIABLE,
                history=QoSHistoryPolicy.KEEP_LAST,
                depth=1,
                durability=QoSDurabilityPolicy.TRANSIENT_LOCAL
            )
            self.sub_camerainfo = self.create_subscription(
                CameraInfo, self.topic_name, self.cb_camerainfo, camera_qos)
            self.get_logger().info(
                f"✅ Subscribing Right CameraInfo (with transient_local): {self.topic_name}")

        else:
            self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
            raise ValueError("Unknown topic_type")

    def update_arrivals(self):
        """Record an arrival and drop timestamps older than one second."""
        now = self.get_clock().now()
        self.arrivals.append(now)
        while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
            self.arrivals.popleft()

    def get_fps(self):
        """Return the number of messages received in the last second."""
        return len(self.arrivals)

    def should_print(self):
        """Rate-limit printing to at most once per second."""
        now = self.get_clock().now()
        if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
            self.last_print = now
            return True
        return False

    def cb_image(self, msg: Image):
        """Image callback (left/right RGB): print metadata and receive FPS."""
        self.update_arrivals()

        if self.should_print():
            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
            self.get_logger().info(
                f"📸 {self.topic_type} received\n"
                f"  • frame_id:        {msg.header.frame_id}\n"
                f"  • stamp (sec):     {stamp_sec:.6f}\n"
                f"  • encoding:        {msg.encoding}\n"
                f"  • size (WxH):      {msg.width} x {msg.height}\n"
                f"  • step (bytes/row):{msg.step}\n"
                f"  • is_bigendian:    {msg.is_bigendian}\n"
                f"  • recv FPS (1s):   {self.get_fps():.1f}"
            )

        # Only the RGB image streams support video dump
        if (self.topic_type in ["left_rgb_image", "right_rgb_image"]) and self.dump_video_path:
            self.dump_image_to_video(msg)

    def cb_compressed(self, msg: CompressedImage):
        """CompressedImage callback: print metadata and receive FPS."""
        self.update_arrivals()

        if self.should_print():
            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
            self.get_logger().info(
                f"🗜️ {self.topic_type} received\n"
                f"  • frame_id:        {msg.header.frame_id}\n"
                f"  • stamp (sec):     {stamp_sec:.6f}\n"
                f"  • format:          {msg.format}\n"
                f"  • data size:       {len(msg.data)}\n"
                f"  • recv FPS (1s):   {self.get_fps():.1f}"
            )

    def cb_camerainfo(self, msg: CameraInfo):
        """CameraInfo callback: print the camera intrinsic parameters."""
        # Camera info typically arrives once, so print immediately.
        stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9

        # Format D array
        d_str = ", ".join([f"{d:.6f}" for d in msg.d])

        # Format K matrix
        k_str = ", ".join([f"{k:.6f}" for k in msg.k])

        # Format P matrix
        p_str = ", ".join([f"{p:.6f}" for p in msg.p])

        self.get_logger().info(
            f"📷 {self.topic_type} received\n"
            f"  • frame_id:        {msg.header.frame_id}\n"
            f"  • stamp (sec):     {stamp_sec:.6f}\n"
            f"  • width x height:  {msg.width} x {msg.height}\n"
            f"  • distortion_model:{msg.distortion_model}\n"
            f"  • D: [{d_str}]\n"
            f"  • K: [{k_str}]\n"
            f"  • P: [{p_str}]\n"
            f"  • binning_x:       {msg.binning_x}\n"
            f"  • binning_y:       {msg.binning_y}\n"
            f"  • roi: {{ x_offset: {msg.roi.x_offset}, y_offset: {msg.roi.y_offset}, height: {msg.roi.height}, width: {msg.roi.width}, do_rectify: {msg.roi.do_rectify} }}"
        )

    def dump_image_to_video(self, msg: Image):
        """Placeholder for RGB video recording (not implemented in Python)."""
        # Real video recording could be added here; the Python version of
        # this example only logs (rate-limited) that it is unimplemented.
        if self.should_print():
            self.get_logger().info("📝 Video dump not implemented in Python version")
208
209
def main(args=None):
    """Entry point: spin the stereo-camera echo node until interrupted."""
    rclpy.init(args=args)
    node = None
    try:
        node = StereoCameraTopicEcho()
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    except Exception as exc:
        print(f"Error: {exc}")
    finally:
        # A None sentinel covers the case where construction itself failed.
        if node is not None:
            node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
使用说明:
订阅左相机RGB图像:
ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_rgb_image
订阅右相机RGB图像:
ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=right_rgb_image
订阅左相机内参:
ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_camera_info
录制左相机视频:
# dump_video_path的值可改为其他路径, 注意提前创建该文件所在目录才能保存 ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_rgb_image -p dump_video_path:=$PWD/left_camera.avi
头部后置单目相机数据订阅
该示例中用到了echo_camera_head_rear,通过订阅/aima/hal/sensor/rgb_head_rear/话题来接收机器人的头部后置单目相机数据,支持RGB图、压缩图和相机内参数据。
功能特点:
支持头部后置相机数据订阅
实时FPS统计和数据显示
支持RGB图视频录制功能
可配置的topic类型选择
支持的数据类型:
rgb_image: RGB图像 (sensor_msgs/Image)
rgb_image_compressed: 压缩RGB图像 (sensor_msgs/CompressedImage)
camera_info: 相机内参 (sensor_msgs/CameraInfo)
1#!/usr/bin/env python3
2"""
3Head rear monocular camera multi-topic subscription example
4
5Supports selecting the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
6 - rgb_image: RGB image (sensor_msgs/Image)
7 - rgb_image_compressed: RGB compressed image (sensor_msgs/CompressedImage)
8 - camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)
9
10Example:
11 ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image
12 ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image_compressed
13 ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=camera_info
14
15Default topic_type is rgb_image
16"""
17
18import rclpy
19from rclpy.node import Node
20from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy, QoSHistoryPolicy
21from sensor_msgs.msg import Image, CompressedImage, CameraInfo
22from collections import deque
23import time
24
25
class HeadRearCameraTopicEcho(Node):
    """Echo node for the head-rear monocular camera.

    The ``topic_type`` parameter selects which topic is subscribed
    (``rgb_image`` / ``rgb_image_compressed`` / ``camera_info``); raw RGB
    frames may additionally be dumped when ``dump_video_path`` is set.
    """

    def __init__(self):
        super().__init__('head_rear_camera_topic_echo')

        # Runtime configuration.
        self.declare_parameter('topic_type', 'rgb_image')
        self.declare_parameter('dump_video_path', '')
        self.topic_type = self.get_parameter('topic_type').value
        self.dump_video_path = self.get_parameter('dump_video_path').value

        # Sensor-data QoS: lossy but low latency, keep only recent frames.
        sensor_qos = QoSProfile(
            reliability=QoSReliabilityPolicy.BEST_EFFORT,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=5,
            durability=QoSDurabilityPolicy.VOLATILE
        )

        # Dispatch to the matching subscription setup helper.
        setup = {
            "rgb_image": self._subscribe_rgb,
            "rgb_image_compressed": self._subscribe_compressed,
            "camera_info": self._subscribe_camera_info,
        }.get(self.topic_type)
        if setup is None:
            self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
            raise ValueError("Unknown topic_type")
        setup(sensor_qos)

        # Bookkeeping for throttled printing and FPS estimation.
        self.last_print = self.get_clock().now()
        self.arrivals = deque()

    def _subscribe_rgb(self, qos):
        """Subscribe to the raw RGB image topic."""
        self.topic_name = "/aima/hal/sensor/rgb_head_rear/rgb_image"
        self.sub_image = self.create_subscription(
            Image, self.topic_name, self.cb_image, qos)
        self.get_logger().info(
            f"✅ Subscribing RGB Image: {self.topic_name}")
        if self.dump_video_path:
            self.get_logger().info(
                f"📝 Will dump received images to video: {self.dump_video_path}")

    def _subscribe_compressed(self, qos):
        """Subscribe to the compressed RGB image topic."""
        self.topic_name = "/aima/hal/sensor/rgb_head_rear/rgb_image/compressed"
        self.sub_compressed = self.create_subscription(
            CompressedImage, self.topic_name, self.cb_compressed, qos)
        self.get_logger().info(
            f"✅ Subscribing CompressedImage: {self.topic_name}")

    def _subscribe_camera_info(self, qos):
        """Subscribe to the camera-info topic with its own latched QoS."""
        del qos  # camera_info needs reliable + transient_local instead
        self.topic_name = "/aima/hal/sensor/rgb_head_rear/camera_info"
        # Reliable + transient_local so the single (possibly historical)
        # camera-info message is still delivered to late joiners.
        latched_qos = QoSProfile(
            reliability=QoSReliabilityPolicy.RELIABLE,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=1,
            durability=QoSDurabilityPolicy.TRANSIENT_LOCAL
        )
        self.sub_camerainfo = self.create_subscription(
            CameraInfo, self.topic_name, self.cb_camerainfo, latched_qos)
        self.get_logger().info(
            f"✅ Subscribing CameraInfo (with transient_local): {self.topic_name}")

    def update_arrivals(self):
        """Record one arrival and drop entries older than one second."""
        now = self.get_clock().now()
        self.arrivals.append(now)
        while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
            self.arrivals.popleft()

    def get_fps(self):
        """Messages received during the last rolling second."""
        return len(self.arrivals)

    def should_print(self):
        """Return True at most once per second (log throttling)."""
        now = self.get_clock().now()
        if (now - self.last_print).nanoseconds * 1e-9 < 1.0:
            return False
        self.last_print = now
        return True

    def cb_image(self, msg: Image):
        """Raw RGB image callback."""
        self.update_arrivals()

        if self.should_print():
            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
            self.get_logger().info("\n".join([
                f"📸 {self.topic_type} received",
                f" • frame_id: {msg.header.frame_id}",
                f" • stamp (sec): {stamp_sec:.6f}",
                f" • encoding: {msg.encoding}",
                f" • size (WxH): {msg.width} x {msg.height}",
                f" • step (bytes/row):{msg.step}",
                f" • is_bigendian: {msg.is_bigendian}",
                f" • recv FPS (1s): {self.get_fps():.1f}",
            ]))

        # Only raw RGB frames may be dumped to video.
        if self.topic_type == "rgb_image" and self.dump_video_path:
            self.dump_image_to_video(msg)

    def cb_compressed(self, msg: CompressedImage):
        """Compressed RGB image callback."""
        self.update_arrivals()

        if not self.should_print():
            return

        stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
        self.get_logger().info("\n".join([
            f"🗜️ {self.topic_type} received",
            f" • frame_id: {msg.header.frame_id}",
            f" • stamp (sec): {stamp_sec:.6f}",
            f" • format: {msg.format}",
            f" • data size: {len(msg.data)}",
            f" • recv FPS (1s): {self.get_fps():.1f}",
        ]))

    def cb_camerainfo(self, msg: CameraInfo):
        """Camera intrinsics callback; typically fires once, so no throttle."""
        stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9

        # Flatten distortion vector and K / P matrices for logging.
        d_str = ", ".join(f"{value:.6f}" for value in msg.d)
        k_str = ", ".join(f"{value:.6f}" for value in msg.k)
        p_str = ", ".join(f"{value:.6f}" for value in msg.p)

        self.get_logger().info("\n".join([
            f"📷 {self.topic_type} received",
            f" • frame_id: {msg.header.frame_id}",
            f" • stamp (sec): {stamp_sec:.6f}",
            f" • width x height: {msg.width} x {msg.height}",
            f" • distortion_model:{msg.distortion_model}",
            f" • D: [{d_str}]",
            f" • K: [{k_str}]",
            f" • P: [{p_str}]",
            f" • binning_x: {msg.binning_x}",
            f" • binning_y: {msg.binning_y}",
            f" • roi: {{ x_offset: {msg.roi.x_offset}, y_offset: {msg.roi.y_offset}, height: {msg.roi.height}, width: {msg.roi.width}, do_rectify: {msg.roi.do_rectify} }}",
        ]))

    def dump_image_to_video(self, msg: Image):
        """Placeholder for video dumping (RGB images only)."""
        # Recording is not implemented in the Python example; log instead.
        if not self.should_print():
            return
        self.get_logger().info("📝 Video dump not implemented in Python version")
174
175
def main(args=None):
    """Entry point: spin the head-rear camera echo node until interrupted."""
    rclpy.init(args=args)
    node = None
    try:
        node = HeadRearCameraTopicEcho()
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    except Exception as exc:
        print(f"Error: {exc}")
    finally:
        # A None sentinel covers the case where construction itself failed.
        if node is not None:
            node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
使用说明:
订阅RGB图像数据:
ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image
订阅压缩图像数据:
ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image_compressed
订阅相机内参:
ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=camera_info
录制视频:
# dump_video_path的值可改为其他路径, 注意提前创建该文件所在目录才能保存 ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image -p dump_video_path:=$PWD/rear_camera.avi
应用场景:
人脸识别和追踪
目标检测和识别
视觉SLAM
图像处理和计算机视觉算法开发
机器人视觉导航
6.1.13 激光雷达数据订阅
该示例中用到了echo_lidar_data,通过订阅/aima/hal/sensor/lidar_chest_front/话题来接收机器人的激光雷达数据,支持点云数据和IMU数据两种数据类型。
功能特点:
支持激光雷达点云数据订阅
支持激光雷达IMU数据订阅
实时FPS统计和数据显示
可配置的topic类型选择
详细的数据字段信息输出
支持的数据类型:
PointCloud2: 激光雷达点云数据 (sensor_msgs/PointCloud2)
Imu: 激光雷达IMU数据 (sensor_msgs/Imu)
技术实现:
使用SensorDataQoS配置(BEST_EFFORT + VOLATILE)
支持点云字段信息解析和显示
支持IMU四元数、角速度和线性加速度数据
提供详细的调试日志输出
应用场景:
激光雷达数据采集和分析
点云数据处理和可视化
机器人导航和定位
SLAM算法开发
环境感知和建图
1#!/usr/bin/env python3
2"""
3Chest LiDAR data subscription example
4
5Supports subscribing to the following topics:
6 1. /aima/hal/sensor/lidar_chest_front/lidar_pointcloud
7 - Data type: sensor_msgs/PointCloud2
8 - frame_id: lidar_chest_front
9 - child_frame_id: /
10 - Content: LiDAR point cloud data
11 2. /aima/hal/sensor/lidar_chest_front/imu
12 - Data type: sensor_msgs/Imu
13 - frame_id: lidar_imu_chest_front
14 - Content: LiDAR IMU data
15
16You can select the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
17 - pointcloud: subscribe to LiDAR point cloud
18 - imu: subscribe to LiDAR IMU
19Default topic_type is pointcloud
20
21Examples:
22 ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=pointcloud
23 ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=imu
24"""
25
26import rclpy
27from rclpy.node import Node
28from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
29from sensor_msgs.msg import PointCloud2, Imu
30from collections import deque
31import time
32
33
class LidarChestEcho(Node):
    """Echo node for the chest-front LiDAR.

    The ``topic_type`` parameter selects the subscribed topic:
    ``pointcloud`` (sensor_msgs/PointCloud2) or ``imu`` (sensor_msgs/Imu).
    Raises ValueError on an unknown topic_type.
    """

    def __init__(self):
        super().__init__('lidar_chest_echo')

        # Select the topic type to subscribe.
        self.declare_parameter('topic_type', 'pointcloud')
        self.topic_type = self.get_parameter('topic_type').value

        # SensorDataQoS-style profile: BEST_EFFORT; durability is left at
        # its default (VOLATILE).
        qos = QoSProfile(
            reliability=QoSReliabilityPolicy.BEST_EFFORT,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=5
        )

        # Create the subscriber matching topic_type.
        if self.topic_type == "pointcloud":
            self.topic_name = "/aima/hal/sensor/lidar_chest_front/lidar_pointcloud"
            self.sub_pointcloud = self.create_subscription(
                PointCloud2, self.topic_name, self.cb_pointcloud, qos)
            self.get_logger().info(
                f"✅ Subscribing LIDAR PointCloud2: {self.topic_name}")

        elif self.topic_type == "imu":
            self.topic_name = "/aima/hal/sensor/lidar_chest_front/imu"
            self.sub_imu = self.create_subscription(
                Imu, self.topic_name, self.cb_imu, qos)
            self.get_logger().info(
                f"✅ Subscribing LIDAR IMU: {self.topic_name}")

        else:
            self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
            raise ValueError("Unknown topic_type")

        # Bookkeeping for throttled printing and FPS estimation.
        self.last_print = self.get_clock().now()
        self.arrivals = deque()

    def update_arrivals(self):
        """Record one arrival and drop entries older than one second."""
        now = self.get_clock().now()
        self.arrivals.append(now)
        while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
            self.arrivals.popleft()

    def get_fps(self):
        """Messages received during the last rolling second."""
        return len(self.arrivals)

    def should_print(self):
        """Return True at most once per second (log throttling)."""
        now = self.get_clock().now()
        if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
            self.last_print = now
            return True
        return False

    def cb_pointcloud(self, msg: PointCloud2):
        """PointCloud2 callback: log a throttled summary of the cloud."""
        self.update_arrivals()

        if self.should_print():
            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9

            # Field names with their numeric datatype codes, e.g. "x(7)".
            fields_str = " ".join(
                [f"{f.name}({f.datatype})" for f in msg.fields])

            self.get_logger().info(
                f"🟢 LIDAR PointCloud2 received\n"
                f" • frame_id: {msg.header.frame_id}\n"
                f" • stamp (sec): {stamp_sec:.6f}\n"
                f" • width x height: {msg.width} x {msg.height}\n"
                f" • point_step: {msg.point_step}\n"
                f" • row_step: {msg.row_step}\n"
                f" • fields: {fields_str}\n"
                f" • is_bigendian: {msg.is_bigendian}\n"
                f" • is_dense: {msg.is_dense}\n"
                f" • data size: {len(msg.data)}\n"
                # Fixed format spec: was ':1.1f', now ':.1f' to match every
                # other FPS printout in these examples.
                f" • recv FPS (1s): {self.get_fps():.1f}"
            )

    def cb_imu(self, msg: Imu):
        """IMU callback: log a throttled summary of the LiDAR IMU sample."""
        self.update_arrivals()

        if self.should_print():
            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9

            self.get_logger().info(
                f"🟢 LIDAR IMU received\n"
                f" • frame_id: {msg.header.frame_id}\n"
                f" • stamp (sec): {stamp_sec:.6f}\n"
                f" • orientation: [{msg.orientation.x:.6f}, {msg.orientation.y:.6f}, {msg.orientation.z:.6f}, {msg.orientation.w:.6f}]\n"
                f" • angular_velocity:[{msg.angular_velocity.x:.6f}, {msg.angular_velocity.y:.6f}, {msg.angular_velocity.z:.6f}]\n"
                f" • linear_accel: [{msg.linear_acceleration.x:.6f}, {msg.linear_acceleration.y:.6f}, {msg.linear_acceleration.z:.6f}]\n"
                f" • recv FPS (1s): {self.get_fps():.1f}"
            )
132
133
def main(args=None):
    """Entry point: spin the LiDAR echo node until interrupted."""
    rclpy.init(args=args)
    node = None
    try:
        node = LidarChestEcho()
        rclpy.spin(node)
    except KeyboardInterrupt:
        pass
    except Exception as exc:
        print(f"Error: {exc}")
    finally:
        # A None sentinel covers the case where construction itself failed.
        if node is not None:
            node.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
使用说明:
# 订阅激光雷达点云数据
ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=pointcloud
# 订阅激光雷达IMU数据
ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=imu
输出示例:
[INFO] [lidar_chest_echo]: ✅ Subscribing LIDAR PointCloud2: /aima/hal/sensor/lidar_chest_front/lidar_pointcloud
[INFO] [lidar_chest_echo]: 🟢 LIDAR PointCloud2 received
• frame_id: lidar_chest_front
• stamp (sec): 1234567890.123456
• width x height: 1 x 36000
• point_step: 16
• row_step: 16
• fields: x(7) y(7) z(7) intensity(7)
• is_bigendian: False
• is_dense: True
• data size: 576000
• recv FPS (1s): 10.0
6.1.14 播放视频
该示例中用到了play_video,在运行节点程序前,需要先将视频上传到机器人的**交互计算单元(PC3)**上(用户可在其上创建一个用来存储视频的目录),然后将节点程序中的video_path改为需要播放视频的路径。
注意
⚠️ 请注意!交互计算单元(PC3)独立于二开程序所在的开发计算单元(PC2), 音视频文件务必存入交互计算单元(IP: 10.0.1.42)。
功能说明
通过调用PlayVideo服务,可以让机器人在屏幕上播放指定路径的视频文件。请确保视频文件已上传到交互计算单元,否则播放会失败。
1#!/usr/bin/env python3
2
3import rclpy
4from rclpy.node import Node
5from aimdk_msgs.srv import PlayVideo
6import threading
7
8
class PlayVideoClient(Node):
    """Async client for the /face_ui_proxy/play_video service."""

    def __init__(self):
        super().__init__('play_video_client')
        self.client = self.create_client(
            PlayVideo, '/face_ui_proxy/play_video')
        self.finished = threading.Event()  # used to exit main loop

    def send_request(self, video_path, mode, priority):
        """Send one PlayVideo request; the reply is handled asynchronously."""
        # Block until the service shows up (or rclpy is shut down).
        while not self.client.wait_for_service(timeout_sec=1.0) and rclpy.ok():
            self.get_logger().warn('Waiting for PlayVideo service to start...')
        if not rclpy.ok():
            return

        request = PlayVideo.Request()
        request.header.header.stamp = self.get_clock().now().to_msg()
        request.video_path = video_path
        request.mode = mode
        request.priority = priority

        # Fire the call and let self.callback report the outcome.
        self.client.call_async(request).add_done_callback(self.callback)

    def callback(self, future):
        """Log the service outcome, then release the main loop."""
        try:
            response = future.result()
            detail = response.message
            if response.success:
                self.get_logger().info(
                    f'✅ Video played successfully: {detail}')
            else:
                self.get_logger().error(
                    f'❌ Video play failed: {detail}')
        except Exception as exc:
            self.get_logger().error(f'Service call error: {exc}')
        self.finished.set()  # notify main loop to exit
45
46
def main(args=None):
    """Prompt for a play mode, send one PlayVideo request, and wait."""
    rclpy.init(args=args)
    client = PlayVideoClient()

    # Video path and priority can be customized.
    video_path = "/agibot/data/home/agi/zhiyuan.mp4"
    priority = 5

    # Read the play mode from the user; default to looping on bad input.
    try:
        mode = int(input("Enter video play mode (1: play once, 2: loop): "))
    except Exception:
        mode = 2

    client.send_request(video_path, mode, priority)

    # spin_once + event so Ctrl+C can interrupt the wait at any time.
    try:
        while not client.finished.is_set() and rclpy.ok():
            rclpy.spin_once(client, timeout_sec=0.1)
    except KeyboardInterrupt:
        pass

    client.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
6.1.15 媒体文件播放
该示例中用到了play_media,通过该节点可实现播放指定的媒体文件(如音频文件),支持WAV、MP3等格式的音频文件播放。
功能特点:
支持多种音频格式播放(WAV、MP3等)
支持优先级控制,可设置播放优先级
支持打断机制,可中断当前播放
支持自定义文件路径和播放参数
提供完整的错误处理和状态反馈
技术实现:
使用PlayMediaFile服务进行媒体文件播放
支持优先级级别设置(0-99)
支持中断控制(is_interrupted参数)
提供详细的播放状态反馈
兼容不同的响应字段格式
应用场景:
音频文件播放和媒体控制
语音提示和音效播放
多媒体应用开发
机器人交互音频反馈
注意
⚠️ 请注意!交互计算单元(PC3)独立于二开程序所在的开发计算单元(PC2), 音视频文件务必存入交互计算单元(IP: 10.0.1.42)。
1#!/usr/bin/env python3
2import sys
3import rclpy
4from rclpy.node import Node
5
6from aimdk_msgs.srv import PlayMediaFile
7from aimdk_msgs.msg import TtsPriorityLevel
8
9
def main(args=None):
    """Minimal PlayMediaFile client.

    Resolves the media file path from argv[1] or an interactive prompt,
    calls the /aimdk_5Fmsgs/srv/PlayMediaFile service, and logs whether the
    request succeeded. Exits with status 1 on service unavailability,
    timeout, or interrupt — same exit codes as before, but with a single
    cleanup path instead of four duplicated shutdown blocks.
    """
    rclpy.init(args=args)
    node = Node('play_media_file_client_min')

    exit_code = 0
    try:
        # 1) service name
        service_name = '/aimdk_5Fmsgs/srv/PlayMediaFile'
        client = node.create_client(PlayMediaFile, service_name)

        # 2) input file path (argv wins; otherwise prompt with a default)
        default_file = '/agibot/data/var/interaction/tts_cache/normal/demo.wav'
        if len(sys.argv) > 1:
            file_name = sys.argv[1]
        else:
            file_name = input(
                f'Enter media file path to play (default: {default_file}): ').strip()
            if not file_name:
                file_name = default_file

        # 3) build request
        req = PlayMediaFile.Request()
        req.media_file_req.file_name = file_name
        req.media_file_req.domain = 'demo_client'  # required: caller domain
        req.media_file_req.trace_id = 'demo'       # optional
        req.media_file_req.is_interrupted = True   # interrupt same-priority
        req.media_file_req.priority_weight = 0     # optional: 0~99
        req.media_file_req.priority_level.value = TtsPriorityLevel.INTERACTION_L6

        # 4) wait for service and call
        node.get_logger().info(f'Waiting for service: {service_name}')
        if not client.wait_for_service(timeout_sec=5.0):
            node.get_logger().error(f'Service unavailable: {service_name}')
            exit_code = 1
            return

        future = client.call_async(req)
        try:
            rclpy.spin_until_future_complete(node, future, timeout_sec=10.0)
        except KeyboardInterrupt:
            node.get_logger().warn('Interrupted while waiting')
            exit_code = 1
            return

        if not future.done():
            node.get_logger().error('Call timed out or not completed')
            exit_code = 1
            return

        # 5) handle response (success flag lives in tts_resp)
        try:
            resp = future.result()
            if resp.tts_resp.is_success:
                node.get_logger().info(
                    f'✅ Media file play request succeeded: {file_name}')
            else:
                node.get_logger().error(
                    f'❌ Media file play request failed: {file_name}')
        except Exception as e:
            node.get_logger().error(f'Call exception: {e}')
    finally:
        # Single shared cleanup path; sys.exit here preserves the original
        # non-zero exit status for the failure branches above.
        node.destroy_node()
        rclpy.shutdown()
        if exit_code:
            sys.exit(exit_code)


if __name__ == '__main__':
    main()
使用说明:
# 播放默认音频文件
ros2 run py_examples play_media
# 播放指定音频文件
# 注意替换/path/to/your/audio_file.wav为交互板上实际文件路径
ros2 run py_examples play_media /path/to/your/audio_file.wav
# 播放TTS缓存文件
ros2 run py_examples play_media /agibot/data/var/interaction/tts_cache/normal/demo.wav
输出示例:
[INFO] [play_media_file_client_min]: 等待服务: /aimdk_5Fmsgs/srv/PlayMediaFile
[INFO] [play_media_file_client_min]: ✅ 媒体文件播放请求成功
注意事项:
确保音频文件路径正确且文件存在
支持的文件格式:WAV、MP3等
优先级设置影响播放队列顺序
打断功能可中断当前播放的音频
程序具有完善的异常处理机制
6.1.16 TTS (文字转语音)
该示例中用到了play_tts,通过该节点可实现语音播放输入的文字,用户可根据不同的场景输入相应的文本。
功能特点:
支持命令行参数和交互式输入
完整的服务可用性检查和错误处理
支持优先级控制和打断机制
提供详细的播放状态反馈
核心代码
1#!/usr/bin/env python3
2
3import sys
4import rclpy
5from rclpy.node import Node
6from rclpy.task import Future
7from aimdk_msgs.srv import PlayTts
8from aimdk_msgs.msg import TtsPriorityLevel
9
10
class PlayTTSClient(Node):
    """Minimal client for the PlayTts service."""

    def __init__(self):
        super().__init__('play_tts_client_min')

        # fill in the actual service name
        self.service_name = '/aimdk_5Fmsgs/srv/PlayTts'
        self.client = self.create_client(PlayTts, self.service_name)

    def send_request(self, text):
        """Send one TTS request; return True when playback was accepted.

        The async call is retried up to 8 times with a short spin timeout
        because the remote TTS module may be under heavy load.
        """
        # wait for service, 8s timeout
        if not self.client.wait_for_service(timeout_sec=8.0):
            self.get_logger().error(
                f'Service unavailable: {self.service_name}')
            return False

        req = PlayTts.Request()
        req.header.header.stamp = self.get_clock().now().to_msg()

        req.tts_req.text = text
        req.tts_req.domain = 'demo_client'  # required: caller domain
        req.tts_req.trace_id = 'demo'       # optional: request id
        req.tts_req.is_interrupted = True   # required: interrupt same-priority
        req.tts_req.priority_weight = 0
        # Use the named constant (previously the magic number 6), matching
        # the PlayMediaFile example's use of TtsPriorityLevel.
        req.tts_req.priority_level.value = TtsPriorityLevel.INTERACTION_L6

        for i in range(8):
            # Refresh the stamp on every retry attempt.
            req.header.header.stamp = self.get_clock().now().to_msg()
            future = self.client.call_async(req)
            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)

            if future.done():
                break

            # retry if remote peer module can be under heavy load
            self.get_logger().info(f"trying to send play tts request... [{i}]")

        if not future.done():
            self.get_logger().error('Call timed out')
            return False

        resp = future.result()
        if resp is None:
            self.get_logger().error('Call failed, future.result() returned None')
            return False
        if resp.tts_resp.is_success:
            self.get_logger().info('✅ TTS request succeeded')
            return True
        self.get_logger().error('❌ TTS request failed')
        return False
62
63
def main(args=None):
    """CLI entry: speak argv[1] or prompted text; exit 0 on success."""
    rclpy.init(args=args)
    node = PlayTTSClient()

    # Text comes from the command line when given, otherwise prompt
    # (falling back to a default phrase on empty input).
    if len(sys.argv) > 1:
        text = sys.argv[1]
    else:
        text = input('Enter text to speak: ')
        if not text:
            text = 'Hello, I am AgiBot X2.'

    ok = node.send_request(text)
    node.destroy_node()
    rclpy.shutdown()

    return 0 if ok else 1


if __name__ == '__main__':
    exit(main())
使用说明
# 使用命令行参数播报文本(推荐)
ros2 run py_examples play_tts "你好,我是灵犀X2机器人"
# 或者不带参数运行,程序会提示用户输入
ros2 run py_examples play_tts
输出示例
[INFO] [play_tts_client_min]: ✅ 播报请求成功
注意事项
确保TTS服务正常运行
支持中文和英文文本播报
优先级设置影响播放队列顺序
打断功能可中断当前播放的语音
接口参考
服务:
/aimdk_5Fmsgs/srv/PlayTts
消息:
aimdk_msgs/srv/PlayTts
6.1.17 麦克风数据接收
该示例中用到了mic_receiver,通过订阅/agent/process_audio_output话题来接收机器人的降噪音频数据,支持内置麦克风和外置麦克风两种音频流,并根据VAD(语音活动检测)状态自动保存完整的语音片段为PCM文件。
功能特点:
支持多音频流同时接收(内置麦克风 stream_id=1,外置麦克风 stream_id=2)
基于VAD状态自动检测语音开始、处理中、结束
自动保存完整语音片段为PCM格式文件
按时间戳和音频流分类存储
支持音频时长计算和统计信息输出
智能缓冲区管理,避免内存泄漏
完善的错误处理和异常管理
详细的调试日志输出
VAD状态说明:
0: 无语音
1: 语音开始
2: 语音处理中
3: 语音结束
技术实现:
支持PCM格式音频文件保存(16kHz, 16位, 单声道)
提供详细的日志输出和状态监控
支持实时音频流处理和文件保存
应用场景:
语音识别和语音处理
音频数据采集和分析
实时语音监控
音频质量检测
多麦克风阵列数据处理
1#!/usr/bin/env python3
2"""
3Microphone data receiving example
4
5This example subscribes to the `/agent/process_audio_output` topic to receive the robot's
6noise-suppressed audio data. It supports both the built-in microphone and the external
7microphone audio streams, and automatically saves complete speech segments as PCM files
8based on the VAD (Voice Activity Detection) state.
9
10Features:
11- Supports receiving multiple audio streams at the same time (built-in mic stream_id=1, external mic stream_id=2)
12- Automatically detects speech start / in-progress / end based on VAD state
13- Automatically saves complete speech segments as PCM files
14- Stores files categorized by timestamp and audio stream
15- Supports audio duration calculation and logging
16
17VAD state description:
18- 0: No speech
19- 1: Speech start
20- 2: Speech in progress
21- 3: Speech end
22"""
23
24import rclpy
25from rclpy.node import Node
26from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
27from aimdk_msgs.msg import ProcessedAudioOutput, AudioVadStateType
28import os
29import time
30from datetime import datetime
31from collections import defaultdict
32from typing import Dict, List
33
34
class AudioSubscriber(Node):
    """Subscribe to noise-suppressed audio and save speech segments as PCM.

    Audio chunks arrive on /agent/process_audio_output tagged with a
    stream_id (1 = built-in microphone, 2 = external microphone) and a VAD
    state. Chunks between "speech start" (1) and "speech end" (3) are
    buffered per stream and written to audio_recordings/stream_<id>/ when
    the segment ends.
    """

    def __init__(self):
        super().__init__('audio_subscriber')

        # Audio buffers, stored separately by stream_id
        # stream_id -> list of raw PCM chunks for the segment in progress
        self.audio_buffers: Dict[int, List[bytes]] = defaultdict(list)
        # stream_id -> True while a speech segment is being captured
        self.recording_state: Dict[int, bool] = defaultdict(bool)

        # Create audio output directory
        self.audio_output_dir = "audio_recordings"
        os.makedirs(self.audio_output_dir, exist_ok=True)

        # VAD state name mapping (for human-readable logs)
        self.vad_state_names = {
            0: "No speech",
            1: "Speech start",
            2: "Speech in progress",
            3: "Speech end"
        }

        # Audio stream name mapping
        self.stream_names = {
            1: "Built-in microphone",
            2: "External microphone"
        }

        # QoS settings: lossy sensor-style delivery, keep recent chunks.
        qos = QoSProfile(
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=10,
            reliability=QoSReliabilityPolicy.BEST_EFFORT
        )

        # Create subscriber
        self.subscription = self.create_subscription(
            ProcessedAudioOutput,
            '/agent/process_audio_output',
            self.audio_callback,
            qos
        )

        self.get_logger().info("Start subscribing to noise-suppressed audio data...")

    def audio_callback(self, msg: ProcessedAudioOutput):
        """Audio data callback: unpack the message and dispatch on VAD state."""
        try:
            stream_id = msg.stream_id
            vad_state = msg.audio_vad_state.value
            audio_data = bytes(msg.audio_data)

            self.get_logger().info(
                f"Received audio data: stream_id={stream_id}, "
                f"vad_state={vad_state}({self.vad_state_names.get(vad_state, 'Unknown state')}), "
                f"audio_size={len(audio_data)} bytes"
            )

            self.handle_vad_state(stream_id, vad_state, audio_data)

        except Exception as e:
            self.get_logger().error(
                f"Error while processing audio message: {str(e)}")

    def handle_vad_state(self, stream_id: int, vad_state: int, audio_data: bytes):
        """Drive the per-stream recording state machine from the VAD state.

        0 = no speech, 1 = speech start, 2 = speech in progress,
        3 = speech end (triggers saving the buffered segment).
        """
        stream_name = self.stream_names.get(
            stream_id, f"Unknown stream {stream_id}")
        vad_name = self.vad_state_names.get(
            vad_state, f"Unknown state {vad_state}")

        self.get_logger().info(
            f"[{stream_name}] VAD state: {vad_name} audio: {len(audio_data)} bytes"
        )

        # Speech start: reset the buffer and begin recording.
        if vad_state == 1:
            self.get_logger().info("🎤 Speech start detected")
            self.audio_buffers[stream_id].clear()
            self.recording_state[stream_id] = True
            if audio_data:
                self.audio_buffers[stream_id].append(audio_data)

        # Speech in progress: keep appending while recording.
        elif vad_state == 2:
            self.get_logger().info("🔄 Speech in progress...")
            if self.recording_state[stream_id] and audio_data:
                self.audio_buffers[stream_id].append(audio_data)

        # Speech end: append the final chunk, save, then release the buffer.
        elif vad_state == 3:
            self.get_logger().info("✅ Speech end")
            if self.recording_state[stream_id] and audio_data:
                self.audio_buffers[stream_id].append(audio_data)

            if self.recording_state[stream_id] and self.audio_buffers[stream_id]:
                self.save_audio_segment(stream_id)
                # Fix: free the saved segment immediately instead of keeping
                # it in memory until the next speech start clears it.
                self.audio_buffers[stream_id].clear()
                self.recording_state[stream_id] = False

        # No speech: make sure a dangling recording flag is reset.
        elif vad_state == 0:
            if self.recording_state[stream_id]:
                self.get_logger().info("⏹️ Reset recording state")
                self.recording_state[stream_id] = False

        # Print current buffer status
        buffer_size = sum(len(chunk)
                          for chunk in self.audio_buffers[stream_id])
        recording = self.recording_state[stream_id]
        self.get_logger().debug(
            f"[Stream {stream_id}] Buffer size: {buffer_size} bytes, recording: {recording}"
        )

    def save_audio_segment(self, stream_id: int):
        """Write the buffered chunks of one stream to a timestamped PCM file."""
        if not self.audio_buffers[stream_id]:
            return

        # Merge all audio data
        audio_data = b''.join(self.audio_buffers[stream_id])

        # Timestamp with millisecond precision gives unique file names.
        now = datetime.now()
        timestamp = now.strftime("%Y%m%d_%H%M%S_%f")[:-3]  # to milliseconds

        # Create subdirectory by stream_id
        stream_dir = os.path.join(self.audio_output_dir, f"stream_{stream_id}")
        os.makedirs(stream_dir, exist_ok=True)

        # Generate filename
        stream_name = "internal_mic" if stream_id == 1 else "external_mic" if stream_id == 2 else f"stream_{stream_id}"
        filename = f"{stream_name}_{timestamp}.pcm"
        filepath = os.path.join(stream_dir, filename)

        try:
            # Save PCM file
            with open(filepath, 'wb') as f:
                f.write(audio_data)

            self.get_logger().info(
                f"Audio segment saved: {filepath} (size: {len(audio_data)} bytes)")

            # Duration assumes 16 kHz, 16-bit, mono PCM — the format the
            # examples document for this topic; confirm if the source changes.
            sample_rate = 16000
            bits_per_sample = 16
            channels = 1
            bytes_per_sample = bits_per_sample // 8
            total_samples = len(audio_data) // (bytes_per_sample * channels)
            duration_seconds = total_samples / sample_rate

            self.get_logger().info(
                f"Audio duration: {duration_seconds:.2f} s ({total_samples} samples)")

        except Exception as e:
            self.get_logger().error(f"Failed to save audio file: {str(e)}")
189
190
def main(args=None):
    """Run the microphone subscriber until Ctrl+C, then clean up."""
    rclpy.init(args=args)
    subscriber = AudioSubscriber()

    try:
        subscriber.get_logger().info("Listening to noise-suppressed audio data, press Ctrl+C to exit...")
        rclpy.spin(subscriber)
    except KeyboardInterrupt:
        subscriber.get_logger().info("Interrupt signal received, exiting...")
    finally:
        subscriber.destroy_node()
        rclpy.shutdown()


if __name__ == '__main__':
    main()
使用说明:
运行程序:
# 构建Python包 colcon build --packages-select py_examples # 运行麦克风接收程序 ros2 run py_examples mic_receiver
目录结构:
运行节点后会自动创建 audio_recordings 目录
音频文件按stream_id分类存储:
stream_1/: 内置麦克风音频
stream_2/: 外置麦克风音频
文件命名格式:
{stream_name}_{timestamp}.pcm
internal_mic_20250909_133649_738.pcm(内置麦克风)
external_mic_20250909_133650_123.pcm(外置麦克风)
音频格式:16kHz, 16位, 单声道PCM
播放保存的PCM文件:
# 播放内置麦克风录音 aplay -r 16000 -f S16_LE -c 1 audio_recordings/stream_1/internal_mic_20250909_133649_738.pcm # 播放外置麦克风录音 aplay -r 16000 -f S16_LE -c 1 audio_recordings/stream_2/external_mic_20250909_133650_123.pcm
转换为WAV格式(可选):
# 使用ffmpeg转换为WAV格式 ffmpeg -f s16le -ar 16000 -ac 1 -i external_mic_20250909_133649_738.pcm output.wav
程序控制:
按 Ctrl+C 安全退出程序
程序会自动处理音频流的开始和结束
支持同时处理多个音频流
输出示例:
正常启动和运行:
[INFO] 开始订阅降噪音频数据...
[INFO] 收到音频数据: stream_id=1, vad_state=0(无语音), audio_size=0 bytes
[INFO] [内置麦克风] VAD状态: 无语音 音频数据: 0 bytes
[INFO] 收到音频数据: stream_id=2, vad_state=0(无语音), audio_size=0 bytes
[INFO] [外置麦克风] VAD状态: 无语音 音频数据: 0 bytes
检测到语音开始:
[INFO] 收到音频数据: stream_id=2, vad_state=1(语音开始), audio_size=320 bytes
[INFO] [外置麦克风] VAD状态: 语音开始 音频数据: 320 bytes
[INFO] 🎤 检测到语音开始
语音处理过程:
[INFO] 收到音频数据: stream_id=2, vad_state=2(语音处理中), audio_size=320 bytes
[INFO] [外置麦克风] VAD状态: 语音处理中 音频数据: 320 bytes
[INFO] 🔄 语音处理中...
[INFO] 收到音频数据: stream_id=2, vad_state=2(语音处理中), audio_size=320 bytes
[INFO] [外置麦克风] VAD状态: 语音处理中 音频数据: 320 bytes
[INFO] 🔄 语音处理中...
语音结束和保存:
[INFO] 收到音频数据: stream_id=2, vad_state=3(语音结束), audio_size=320 bytes
[INFO] [外置麦克风] VAD状态: 语音结束 音频数据: 320 bytes
[INFO] ✅ 语音结束
[INFO] 音频段已保存: audio_recordings/stream_2/external_mic_20250909_133649_738.pcm (大小: 960 bytes)
[INFO] 音频时长: 0.06 秒 (480 样本)
多音频流同时处理:
[INFO] 收到音频数据: stream_id=1, vad_state=1(语音开始), audio_size=320 bytes
[INFO] [内置麦克风] VAD状态: 语音开始 音频数据: 320 bytes
[INFO] 🎤 检测到语音开始
[INFO] 收到音频数据: stream_id=2, vad_state=1(语音开始), audio_size=320 bytes
[INFO] [外置麦克风] VAD状态: 语音开始 音频数据: 320 bytes
[INFO] 🎤 检测到语音开始
程序退出:
^C[INFO] 接收到中断信号,正在退出...
[INFO] 程序已安全退出
注意事项:
程序支持同时处理多个音频流(内置和外置麦克风)
每个音频流都有独立的缓冲区和录音状态
音频文件会自动按时间戳和音频流分类保存
程序具有完善的错误处理机制,确保稳定运行
6.1.18 表情控制
该示例中用到了play_emoji,通过该节点可实现表达指定的表情,用户可根据已有的表情列表来选择要变化的表情,具体表情列表可参考表情列表
1#!/usr/bin/env python3
2import rclpy
3from rclpy.node import Node
4from aimdk_msgs.srv import PlayEmoji
5import threading
6import time
7
8
class PlayEmojiClient(Node):
    """Async client for the /face_ui_proxy/play_emoji service."""

    def __init__(self):
        super().__init__('play_emoji_client')
        self.client = self.create_client(
            PlayEmoji, '/face_ui_proxy/play_emoji')
        self.finished = threading.Event()  # signals the main loop to stop
        self.get_logger().info('PlayEmoji client node started.')

    def send_request(self, emoji: int, mode: int, priority: int):
        """Build and send one PlayEmoji request asynchronously."""
        # Block until the service shows up (or rclpy is shut down).
        while not self.client.wait_for_service(timeout_sec=2.0) and rclpy.ok():
            self.get_logger().warn('Waiting for PlayEmoji service...')

        if not rclpy.ok():
            return

        request = PlayEmoji.Request()
        # Equivalent to C++: request->header.header.stamp = this->now();
        request.header.header.stamp = self.get_clock().now().to_msg()
        request.emotion_id = int(emoji)
        request.mode = int(mode)
        request.priority = int(priority)

        self.client.call_async(request).add_done_callback(self._on_response)

    def _on_response(self, future):
        """Log the service outcome and unblock the main loop."""
        try:
            result = future.result()
            if result.success:
                self.get_logger().info(
                    f'✅ Emoji played successfully: {result.message}')
            else:
                self.get_logger().error(f'❌ Emoji play failed: {result.message}')
        except Exception as exc:
            self.get_logger().error(f'Call failed: {exc}')
        finally:
            self.finished.set()
47
48
def _prompt_int(prompt: str, fallback: int) -> int:
    """Read an integer from stdin, returning *fallback* on bad/absent input."""
    try:
        return int(input(prompt))
    except Exception:
        return fallback


def main(args=None):
    """Entry point: prompt for emoji/mode, send the request, spin until done."""
    rclpy.init(args=args)
    node = PlayEmojiClient()

    # Interactive input, same as the original C++ version
    emotion = _prompt_int(
        "Enter emoji ID: 1-blink, 60-bored, 70-abnormal, 80-sleeping, 90-happy ... 190-double angry, 200-adore: ",
        1)
    mode = _prompt_int("Enter play mode (1: play once, 2: loop): ", 1)
    priority = 10  # default priority

    node.send_request(emotion, mode, priority)

    # Polling-style spin, supports immediate exit with Ctrl+C
    try:
        while rclpy.ok() and not node.finished.is_set():
            rclpy.spin_once(node, timeout_sec=0.1)
            time.sleep(0.05)
    except KeyboardInterrupt:
        pass

    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
6.1.19 LED灯带控制
功能说明:演示如何控制机器人的LED灯带,支持多种显示模式和自定义颜色。
核心代码:
1#!/usr/bin/env python3
2
3import rclpy
4import rclpy.logging
5from rclpy.node import Node
6import signal
7import sys
8
9from aimdk_msgs.msg import CommonRequest
10from aimdk_msgs.srv import LedStripCommand
11
client_node = None  # set in main(); lets the signal handler shut down cleanly


def signal_handler(signum, frame):
    """Handle SIGINT/SIGTERM: announce shutdown, then tear down the ROS context."""
    node = client_node
    if node is not None:
        node.get_logger().info(
            f"Received signal {signum}, shutting down...")
    if rclpy.ok():
        rclpy.shutdown()
22
class PlayLightsClient(Node):
    """Client node for the robot's LED-strip command service."""

    def __init__(self):
        super().__init__('play_lights_client')

        # create service client
        self.client = self.create_client(
            LedStripCommand, '/aimdk_5Fmsgs/srv/LedStripCommand')

        self.get_logger().info('PlayLights client node started.')

        # Give the service up to 8s total to appear, probing every 2s.
        deadline = self.get_clock().now().nanoseconds / 1e9 + 8.0
        while not self.client.wait_for_service(timeout_sec=2.0):
            if self.get_clock().now().nanoseconds / 1e9 > deadline:
                self.get_logger().error('❌ Timed out waiting for service.')
                raise RuntimeError('Service unavailable')
            self.get_logger().info('Waiting for LedStripCommand service...')

        self.get_logger().info('🟢 Service available, ready to send request.')

    def send_request(self, led_mode, r, g, b):
        """Send LED control request"""
        try:
            request = LedStripCommand.Request()
            request.led_strip_mode = led_mode
            request.r = r
            request.g = g
            request.b = b

            # Note: LED strip is slow to response (up to ~5s),
            # so attempt the call up to 4 times.
            future = None
            for attempt in range(4):
                request.request.header.stamp = self.get_clock().now().to_msg()
                future = self.client.call_async(request)
                rclpy.spin_until_future_complete(
                    self, future, timeout_sec=5)

                if future.done():
                    break

                # retry as remote peer can be under heavy load
                self.get_logger().info(
                    f"trying to control led strip... [{attempt}]")

            if not future.done():
                self.get_logger().error('❌ Service call failed or timed out.')
                return False

            response = future.result()

            # status_code == 0 means the command was accepted.
            if response.status_code != 0:
                self.get_logger().error(
                    f"❌ LED strip command failed with status: {response.status_code}")
                return False

            self.get_logger().info("✅ LED strip command sent successfully")
            return True

        except Exception as e:
            self.get_logger().error(f"❌ Exception while calling service: {e}")
            return False
87
def main(args=None):
    """Entry point: read mode/RGB from CLI args or stdin and send one LED command.

    Usage: ``play_lights [mode r g b]``; without args the values are prompted for.
    """
    global client_node
    rclpy.init(args=args)

    # register signal handlers early
    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    try:
        # get command line args
        if len(sys.argv) > 4:
            # use CLI args
            led_mode = int(sys.argv[1])
            r = int(sys.argv[2])
            g = int(sys.argv[3])
            b = int(sys.argv[4])
            rclpy.logging.get_logger('main').info(
                f'Using CLI args: mode={led_mode}, RGB=({r},{g},{b})')
        else:
            # interactive input
            print("=== LED strip control example ===")
            print("Select LED strip mode:")
            print("0 - Steady on")
            print("1 - Breathing (4s cycle, sine brightness)")
            print("2 - Blinking (1s cycle, 0.5s on, 0.5s off)")
            print("3 - Flowing (2s cycle, light up from left to right)")

            led_mode = int(input("Enter mode (0-3): "))

            print("\nSet RGB color values (0-255):")
            r = int(input("Red (R): "))
            g = int(input("Green (G): "))
            b = int(input("Blue (B): "))

        # clamp input to the ranges the service expects
        led_mode = max(0, min(3, led_mode))
        r = max(0, min(255, r))
        g = max(0, min(255, g))
        b = max(0, min(255, b))

        client_node = PlayLightsClient()

        # fix: was an f-string with no placeholders (same text, plain literal)
        client_node.get_logger().info("Sending LED control command...")
        client_node.get_logger().info(
            f"Mode: {led_mode}, Color: RGB({r}, {g}, {b})")

        # send request
        ok = client_node.send_request(led_mode, r, g, b)
        if not ok:
            client_node.get_logger().error('LED strip control request failed.')

    except Exception as e:
        rclpy.logging.get_logger('main').error(f'Error occurred: {e}')
    finally:
        # fix: release the node's resources before shutting the context down;
        # the original leaked the node on every exit path.
        if client_node is not None:
            client_node.destroy_node()
        if rclpy.ok():
            rclpy.shutdown()


if __name__ == '__main__':
    main()
使用说明:
# 构建
colcon build --packages-select py_examples
# 交互式运行
ros2 run py_examples play_lights
# 命令行参数运行
ros2 run py_examples play_lights 1 255 0 0 # 模式1,红色
输出示例:
=== LED灯带控制示例 ===
请选择灯带模式:
0 - 常亮模式
1 - 呼吸模式 (4s周期,亮度正弦变化)
2 - 闪烁模式 (1s周期,0.5s亮,0.5s灭)
3 - 流水模式 (2s周期,从左到右依次点亮)
请输入模式 (0-3): 1
请设置RGB颜色值 (0-255):
红色分量 (R): 255
绿色分量 (G): 0
蓝色分量 (B): 0
发送LED控制命令...
模式: 1, 颜色: RGB(255, 0, 0)
✅ LED strip command sent successfully
技术特点:
支持4种LED显示模式
RGB颜色自定义
同步服务调用
命令行参数支持
输入参数验证
友好的用户交互界面