6.1 Python接口使用示例

本章节将带您实现索引所示若干功能

构建 & 运行说明

  • 进入到 aimdk 目录,执行以下命令

    source /opt/ros/humble/setup.bash
    colcon build
    source install/setup.bash
    ros2 run py_examples '对应功能名称如: get_mc_action'
    

📝 代码说明

完整的代码实现包含完整的错误处理、信号处理、超时处理等机制,确保程序的健壮性。请您在 py_examples 目录下查看/修改

小心

ROS的服务(service)机制在跨板通信时存在一些待优化问题, 二次开发时请参考例程添加异常处理、快速重试等保护机制

6.1.1 获取机器人模式

通过调用GetMcAction服务获取机器人当前的运行模式,包括名称和状态信息。

运动模式定义

 1#!/usr/bin/env python3
 2
 3import rclpy
 4import rclpy.logging
 5from rclpy.node import Node
 6
 7from aimdk_msgs.srv import GetMcAction
 8from aimdk_msgs.msg import CommonRequest
 9
10
11class GetMcActionClient(Node):
12    def __init__(self):
13        super().__init__('get_mc_action_client')
14        self.client = self.create_client(
15            GetMcAction, '/aimdk_5Fmsgs/srv/GetMcAction')
16        self.get_logger().info('✅ GetMcAction client node created.')
17
18        # Wait for the service to become available
19        while not self.client.wait_for_service(timeout_sec=2.0):
20            self.get_logger().info('⏳ Service unavailable, waiting...')
21
22        self.get_logger().info('🟢 Service available, ready to send request.')
23
24    def send_request(self):
25        request = GetMcAction.Request()
26        request.request = CommonRequest()
27
28        self.get_logger().info('📨 Sending request to get robot mode')
29        for i in range(8):
30            request.request.header.stamp = self.get_clock().now().to_msg()
31            future = self.client.call_async(request)
32            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
33
34            if future.done():
35                break
36
37            # retry as remote peer is NOT handled well by ROS
38            self.get_logger().info(f'trying ... [{i}]')
39
40        response = future.result()
41        if response is None:
42            self.get_logger().error('❌ Service call failed or timed out.')
43            return
44
45        self.get_logger().info('✅ Robot mode get successfully.')
46        self.get_logger().info(f'Mode name: {response.info.action_desc}')
47        self.get_logger().info(f'Mode status: {response.info.status.value}')
48
49
50def main(args=None):
51    rclpy.init(args=args)
52    node = None
53    try:
54        node = GetMcActionClient()
55        node.send_request()
56    except KeyboardInterrupt:
57        pass
58    except Exception as e:
59        rclpy.logging.get_logger('main').error(
60            f'Program exited with exception: {e}')
61
62    if node:
63        node.destroy_node()
64    if rclpy.ok():
65        rclpy.shutdown()
66
67
68if __name__ == '__main__':
69    main()

6.1.2 设置机器人模式

该示例中用到了SetMcAction服务,运行节点程序后终端输入模式对应的字段值,机器人会尝试进入相应的运动模式
切入稳定站立(STAND_DEFAULT)模式前,确保机器人已立起且双脚着地。
模式切换需遵循模式转换图, 跨模式转换不能成功
走跑模式(LOCOMOTION_DEFAULT)和稳定站立模式一体化自动切换,按模式转换图流转到两者中较近的即可

  1#!/usr/bin/env python3
  2
  3import sys
  4import rclpy
  5import rclpy.logging
  6from rclpy.node import Node
  7
  8from aimdk_msgs.srv import SetMcAction
  9from aimdk_msgs.msg import RequestHeader, CommonState, McAction, McActionCommand
 10
 11
 12class SetMcActionClient(Node):
 13    def __init__(self):
 14        super().__init__('set_mc_action_client')
 15        self.client = self.create_client(
 16            SetMcAction, '/aimdk_5Fmsgs/srv/SetMcAction'
 17        )
 18        self.get_logger().info('✅ SetMcAction client node created.')
 19
 20        # Wait for the service to become available
 21        while not self.client.wait_for_service(timeout_sec=2.0):
 22            self.get_logger().info('⏳ Service unavailable, waiting...')
 23
 24        self.get_logger().info('🟢 Service available, ready to send request.')
 25
 26    def send_request(self, action_name: str):
 27        req = SetMcAction.Request()
 28        req.header = RequestHeader()
 29
 30        cmd = McActionCommand()
 31        cmd.action_desc = action_name
 32        req.command = cmd
 33
 34        self.get_logger().info(
 35            f'📨 Sending request to set robot mode: {action_name}')
 36        for i in range(8):
 37            req.header.stamp = self.get_clock().now().to_msg()
 38            future = self.client.call_async(req)
 39            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
 40
 41            if future.done():
 42                break
 43
 44            # retry as remote peer is NOT handled well by ROS
 45            self.get_logger().info(f'trying ... [{i}]')
 46
 47        response = future.result()
 48        if response is None:
 49            self.get_logger().error('❌ Service call failed or timed out.')
 50            return
 51
 52        if response.response.status.value == CommonState.SUCCESS:
 53            self.get_logger().info('✅ Robot mode set successfully.')
 54        else:
 55            self.get_logger().error(
 56                f'❌ Failed to set robot mode: {response.response.message}'
 57            )
 58
 59
 60def main(args=None):
 61    action_info = {
 62        'PASSIVE_DEFAULT': ('PD', 'joints with zero torque'),
 63        'DAMPING_DEFAULT': ('DD', 'joints in damping mode'),
 64        'JOINT_DEFAULT': ('JD', 'Position Control Stand (joints locked)'),
 65        'STAND_DEFAULT': ('SD', 'Stable Stand (auto-balance)'),
 66        'LOCOMOTION_DEFAULT': ('LD', 'locomotion mode (walk or run)'),
 67    }
 68
 69    choices = {}
 70    for k, v in action_info.items():
 71        choices[v[0]] = k
 72
 73    rclpy.init(args=args)
 74    node = None
 75    try:
 76        # Prefer command-line argument, otherwise prompt for input
 77        if len(sys.argv) > 1:
 78            motion = sys.argv[1]
 79        else:
 80            print('{:<4} - {:<20} : {}'.format('abbr',
 81                  'robot mode', 'description'))
 82            for k, v in action_info.items():
 83                print(f'{v[0]:<4} - {k:<20} : {v[1]}')
 84            motion = input('Enter abbr of robot mode:')
 85
 86        action_name = choices.get(motion)
 87        if not action_name:
 88            raise ValueError(f'Invalid abbr of robot mode: {motion}')
 89
 90        node = SetMcActionClient()
 91        node.send_request(action_name)
 92    except KeyboardInterrupt:
 93        pass
 94    except Exception as e:
 95        rclpy.logging.get_logger('main').error(
 96            f'Program exited with exception: {e}')
 97
 98    if node:
 99        node.destroy_node()
100    if rclpy.ok():
101        rclpy.shutdown()
102
103
104if __name__ == '__main__':
105    main()

使用说明

# 使用命令行参数设置模式(推荐)
ros2 run py_examples set_mc_action JD  # 零力矩模式>>站姿预备(位控站立)
ros2 run py_examples set_mc_action SD  # 机器人已立起且脚着地后方可执行,站姿预备>>稳定站立 
# 稳定站立>>走跑模式 自动切换,无需手动切换

# 或者不带参数运行,程序会提示用户输入双字母缩写代码
ros2 run py_examples set_mc_action

输出示例

...
[INFO] [1764066567.502968540] [set_mc_action_client]: ✅ Robot mode set successfully.

注意事项

  • 切入STAND_DEFAULT模式前,确保机器人已立起且双脚着地

  • 模式切换可能需要几秒钟时间完成

接口参考

  • 服务:/aimdk_5Fmsgs/srv/SetMcAction

  • 消息:aimdk_msgs/srv/SetMcAction

6.1.3 设置机器人动作

该示例中用到了preset_motion_client,进入稳定站立模式后,启动节点程序,输入相应的字段值可实现左手(右手)的握手(举手、挥手、飞吻)动作。

可供调用的参数见预设动作表

 1#!/usr/bin/env python3
 2
 3import rclpy
 4import rclpy.logging
 5from rclpy.node import Node
 6
 7from aimdk_msgs.srv import SetMcPresetMotion
 8from aimdk_msgs.msg import McPresetMotion, McControlArea, RequestHeader, CommonState
 9
10
11class SetMcPresetMotionClient(Node):
12    def __init__(self):
13        super().__init__('preset_motion_client')
14        self.client = self.create_client(
15            SetMcPresetMotion, '/aimdk_5Fmsgs/srv/SetMcPresetMotion')
16        self.get_logger().info('✅ SetMcPresetMotion client node created.')
17
18        # Wait for the service to become available
19        while not self.client.wait_for_service(timeout_sec=2.0):
20            self.get_logger().info('⏳ Service unavailable, waiting...')
21
22        self.get_logger().info('🟢 Service available, ready to send request.')
23
24    def send_request(self, area_id: int, motion_id: int) -> bool:
25        request = SetMcPresetMotion.Request()
26        request.header = RequestHeader()
27
28        motion = McPresetMotion()
29        area = McControlArea()
30
31        motion.value = motion_id
32        area.value = area_id
33
34        request.motion = motion
35        request.area = area
36        request.interrupt = False
37
38        self.get_logger().info(
39            f'📨 Sending request to set preset motion: motion={motion_id}, area={area_id}')
40
41        for i in range(8):
42            request.header.stamp = self.get_clock().now().to_msg()
43            future = self.client.call_async(request)
44            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
45
46            if future.done():
47                break
48
49            # retry as remote peer is NOT handled well by ROS
50            self.get_logger().info(f'trying ... [{i}]')
51
52        response = future.result()
53        if response is None:
54            self.get_logger().error('❌ Service call failed or timed out.')
55            return False
56
57        if response.response.header.code == 0:
58            self.get_logger().info(
59                f'✅ Preset motion set successfully: {response.response.task_id}')
60            return True
61        elif response.response.state.value == CommonState.RUNNING:
62            self.get_logger().info(
63                f'⏳ Preset motion executing: {response.response.task_id}')
64            return True
65        else:
66            self.get_logger().error(
67                f'❌ Failed to set preset motion: {response.response.task_id}'
68            )
69            return False
70
71
72def main(args=None):
73    rclpy.init(args=args)
74    node = None
75    try:
76        area = int(input("Enter arm area ID (1-left, 2-right): "))
77        motion = int(input(
78            "Enter preset motion ID (1001-raise,1002-wave,1003-handshake,1004-airkiss): "))
79
80        node = SetMcPresetMotionClient()
81        node.send_request(area, motion)
82    except KeyboardInterrupt:
83        pass
84    except Exception as e:
85        rclpy.logging.get_logger('main').error(
86            f'Program exited with exception: {e}')
87
88    if node:
89        node.destroy_node()
90    if rclpy.ok():
91        rclpy.shutdown()
92
93
94if __name__ == '__main__':
95    main()

6.1.4 夹爪控制

该示例中用到了hand_control,通过往话题/aima/hal/joint/hand/command发布消息来控制夹爪的运动

注意

注意⚠️ :运行本示例前需要在机器人运控计算单元(PC1)使用aima em stop-app mc关闭原生运控模块,获取机器人控制权。注意确保机器人安全

 1import rclpy
 2from rclpy.node import Node
 3from aimdk_msgs.msg import HandCommandArray, HandCommand, HandType, MessageHeader
 4import time
 5
 6
 7class HandControl(Node):
 8    def __init__(self):
 9        super().__init__('hand_control')
10
11        # Preset parameter list: [(left hand, right hand), ...]
12        self.position_pairs = [
13            (1.0, 1.0),   # fully open
14            (0.0, 0.0),   # fully closed
15            (0.5, 0.5),   # half open
16            (0.2, 0.8),   # left slightly closed, right more open
17            (0.7, 0.3)    # left more open, right slightly closed
18        ]
19        self.current_index = 0
20        self.last_switch_time = self.get_clock().now().nanoseconds / 1e9  # seconds
21
22        # Create publisher
23        self.publisher_ = self.create_publisher(
24            HandCommandArray,
25            '/aima/hal/joint/hand/command',
26            10
27        )
28
29        # 50 Hz timer
30        self.timer_ = self.create_timer(
31            0.02,  # 20 ms = 50 Hz
32            self.publish_hand_commands
33        )
34
35        self.get_logger().info("Hand control node started!")
36
37    def publish_hand_commands(self):
38        # Check time to decide whether to switch to the next preset
39        now_sec = self.get_clock().now().nanoseconds / 1e9
40        if now_sec - self.last_switch_time >= 2.0:
41            self.current_index = (self.current_index +
42                                  1) % len(self.position_pairs)
43            self.last_switch_time = now_sec
44            self.get_logger().info(
45                f"Switched to preset: {self.current_index}, left={self.position_pairs[self.current_index][0]:.2f}, right={self.position_pairs[self.current_index][1]:.2f}"
46            )
47
48        # Use current preset
49        left_position, right_position = self.position_pairs[self.current_index]
50
51        msg = HandCommandArray()
52        msg.header = MessageHeader()
53
54        # Configure left hand
55        left_hand = HandCommand()
56        left_hand.name = "left_hand"
57        left_hand.position = float(left_position)
58        left_hand.velocity = 1.0
59        left_hand.acceleration = 1.0
60        left_hand.deceleration = 1.0
61        left_hand.effort = 1.0
62
63        # Configure right hand
64        right_hand = HandCommand()
65        right_hand.name = "right_hand"
66        right_hand.position = float(right_position)
67        right_hand.velocity = 1.0
68        right_hand.acceleration = 1.0
69        right_hand.deceleration = 1.0
70        right_hand.effort = 1.0
71
72        msg.left_hand_type = HandType(value=2)  # gripper mode
73        msg.right_hand_type = HandType(value=2)
74        msg.left_hands = [left_hand]
75        msg.right_hands = [right_hand]
76
77        # Publish message
78        self.publisher_.publish(msg)
79        # We only log when switching presets to avoid too much log output
80
81
82def main(args=None):
83    rclpy.init(args=args)
84    hand_control_node = HandControl()
85
86    try:
87        rclpy.spin(hand_control_node)
88    except KeyboardInterrupt:
89        pass
90    finally:
91        hand_control_node.destroy_node()
92        rclpy.shutdown()
93
94
95if __name__ == '__main__':
96    main()

6.1.5 灵巧手控制

该示例中用到了omnihand_control,通过往话题/aima/hal/joint/hand/command发布消息来控制灵巧手的运动

注意

注意⚠️ :运行本示例前需要在机器人运控计算单元(PC1)使用aima em stop-app mc关闭原生运控模块,获取机器人控制权。注意确保机器人安全

  1import rclpy
  2from rclpy.node import Node
  3from aimdk_msgs.msg import HandCommandArray, HandCommand, HandType, MessageHeader
  4import time
  5
  6
  7class HandControl(Node):
  8    def __init__(self):
  9        super().__init__('hand_control')
 10
 11        # Create publisher
 12        self.publisher_ = self.create_publisher(
 13            HandCommandArray,
 14            '/aima/hal/joint/hand/command',
 15            10
 16        )
 17
 18        self.timer_ = self.create_timer(
 19            0.8,
 20            self.publish_hand_commands
 21        )
 22
 23        # Initialize variables
 24        self.target_finger = 0
 25        self.step = 1
 26        self.increasing = True
 27        self.get_logger().info("Hand control node started!")
 28
 29    def build_hand_cmd(self, name: str) -> HandCommand:
 30        cmd = HandCommand()
 31        cmd.name = name
 32        cmd.position = 0.0
 33        cmd.velocity = 0.1
 34        cmd.acceleration = 0.0
 35        cmd.deceleration = 0.0
 36        cmd.effort = 0.0
 37        return cmd
 38
 39    def publish_hand_commands(self):
 40        msg = HandCommandArray()
 41        msg.header.stamp = self.get_clock().now().to_msg()
 42        msg.header.frame_id = 'hand_command'
 43        msg.left_hand_type.value = 1      # NIMBLE_HANDS
 44        msg.right_hand_type.value = 1     # NIMBLE_HANDS
 45
 46        # left hand
 47        msg.left_hands = [self.build_hand_cmd('') for _ in range(10)]
 48        msg.left_hands[0].name = 'left_thumb'
 49        for i in range(1, 10):
 50            msg.left_hands[i].name = 'left_index'
 51
 52        # right hand
 53        msg.right_hands = [self.build_hand_cmd('') for _ in range(10)]
 54        msg.right_hands[0].name = 'right_thumb'
 55        for i in range(1, 10):
 56            msg.right_hands[i].name = 'right_pinky'
 57
 58        if self.target_finger < 10:
 59            msg.right_hands[self.target_finger].position = 0.8
 60        else:
 61            target_finger_ = self.target_finger - 10
 62            target_position = 0.8
 63            if target_finger_ < 3:
 64                # The three thumb motors on the left hand need their signs inverted to mirror the right hand's motion
 65                target_position = -target_position
 66            msg.left_hands[target_finger_].position = target_position
 67
 68        self.publisher_.publish(msg)
 69        self.get_logger().info(
 70            f'Published hand command with target_finger: {self.target_finger}')
 71        self.update_target_finger()
 72
 73    def update_target_finger(self):
 74        if self.increasing:
 75            self.target_finger += self.step
 76            if self.target_finger >= 19:
 77                self.target_finger = 19
 78                self.increasing = False
 79        else:
 80            self.target_finger -= self.step
 81            if self.target_finger <= 0:
 82                self.target_finger = 0
 83                self.increasing = True
 84
 85
 86def main(args=None):
 87    rclpy.init(args=args)
 88    hand_control_node = HandControl()
 89
 90    try:
 91        rclpy.spin(hand_control_node)
 92    except KeyboardInterrupt:
 93        pass
 94    finally:
 95        hand_control_node.destroy_node()
 96        rclpy.shutdown()
 97
 98
 99if __name__ == '__main__':
100    main()

6.1.6 注册二开输入源

对于v0.7之后的版本,在实现对MC的控制前,需要先注册输入源。该示例中通过/aimdk_5Fmsgs/srv/SetMcInputSource服务来注册二开的输入源,让MC感知到,只有注册过输入源后才能实现机器人的速度控制

  1#!/usr/bin/env python3
  2
  3import rclpy
  4import rclpy.logging
  5from rclpy.node import Node
  6
  7from aimdk_msgs.srv import SetMcInputSource
  8from aimdk_msgs.msg import RequestHeader, McInputAction
  9
 10
 11class McInputClient(Node):
 12    def __init__(self):
 13        super().__init__('set_mc_input_source_client')
 14        self.client = self.create_client(
 15            SetMcInputSource, '/aimdk_5Fmsgs/srv/SetMcInputSource'
 16        )
 17
 18        self.get_logger().info('✅ SetMcInputSource client node created.')
 19
 20        # Wait for the service to become available
 21        while not self.client.wait_for_service(timeout_sec=2.0):
 22            self.get_logger().info('⏳ Service unavailable, waiting...')
 23
 24        self.get_logger().info('🟢 Service available, ready to send request.')
 25
 26    def send_request(self):
 27        req = SetMcInputSource.Request()
 28
 29        # header
 30        req.request.header = RequestHeader()
 31
 32        # action (e.g. 1001 = register)
 33        req.action = McInputAction()
 34        req.action.value = 1001
 35
 36        # input source info
 37        req.input_source.name = 'node'
 38        req.input_source.priority = 40
 39        req.input_source.timeout = 1000  # ms
 40
 41        # Send request and wait for response
 42        self.get_logger().info(
 43            f'📨 Sending input source request: action_id={req.action.value}, '
 44            f'name={req.input_source.name}, priority={req.input_source.priority}'
 45        )
 46        for i in range(8):
 47            req.request.header.stamp = self.get_clock().now().to_msg()
 48            future = self.client.call_async(req)
 49            rclpy.spin_until_future_complete(
 50                self, future, timeout_sec=0.25)
 51
 52            if future.done():
 53                break
 54
 55            # retry as remote peer is NOT handled well by ROS
 56            self.get_logger().info(f'trying ... [{i}]')
 57
 58        if not future.done():
 59            self.get_logger().error('❌ Service call failed or timed out.')
 60            return False
 61
 62        response = future.result()
 63        ret_code = response.response.header.code
 64        task_id = response.response.task_id
 65
 66        if ret_code == 0:
 67            self.get_logger().info(
 68                f'✅ Input source set successfully. task_id={task_id}'
 69            )
 70            return True
 71        else:
 72            self.get_logger().error(
 73                f'❌ Input source set failed. ret_code={ret_code}, task_id={task_id} (duplicated ADD? or MODIFY/ENABLE/DISABLE for unknown source?)'
 74            )
 75            return False
 76
 77
 78def main(args=None):
 79    rclpy.init(args=args)
 80
 81    node = None
 82    try:
 83        node = McInputClient()
 84        ok = node.send_request()
 85        if not ok:
 86            node.get_logger().error('Input source request failed.')
 87    except KeyboardInterrupt:
 88        pass
 89    except Exception as e:
 90        rclpy.logging.get_logger('main').error(
 91            f'Program exited with exception: {e}')
 92
 93    if node:
 94        node.destroy_node()
 95    if rclpy.ok():
 96        rclpy.shutdown()
 97
 98
 99if __name__ == '__main__':
100    main()

6.1.7 获取当前输入源

该示例中用到了GetCurrentInputSource服务,用于获取当前已注册的输入源信息,包括输入源名称、优先级和超时时间等信息。

 1#!/usr/bin/env python3
 2
 3import rclpy
 4from rclpy.node import Node
 5from aimdk_msgs.srv import GetCurrentInputSource
 6from aimdk_msgs.msg import CommonRequest
 7
 8
 9class GetCurrentInputSourceClient(Node):
10    def __init__(self):
11        super().__init__('get_current_input_source_client')
12        self.client = self.create_client(
13            GetCurrentInputSource,
14            '/aimdk_5Fmsgs/srv/GetCurrentInputSource'
15        )
16
17        self.get_logger().info('✅ GetCurrentInputSource client node created.')
18
19        # Wait for the service to become available
20        while not self.client.wait_for_service(timeout_sec=2.0):
21            self.get_logger().info('⏳ Service unavailable, waiting...')
22
23        self.get_logger().info('🟢 Service available, ready to send request.')
24
25    def send_request(self):
26        # Create request
27        req = GetCurrentInputSource.Request()
28        req.request = CommonRequest()
29
30        # Send request and wait for response
31        self.get_logger().info('📨 Sending request to get current input source')
32        for i in range(8):
33            req.request.header.stamp = self.get_clock().now().to_msg()
34            future = self.client.call_async(req)
35            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
36
37            if future.done():
38                break
39
40            # retry as remote peer is NOT handled well by ROS
41            self.get_logger().info(f'trying ... [{i}]')
42
43        if not future.done():
44            self.get_logger().error('❌ Service call failed or timed out.')
45            return False
46
47        response = future.result()
48        ret_code = response.response.header.code
49        if ret_code == 0:
50            self.get_logger().info(
51                f'✅ Current input source get successfully:')
52            self.get_logger().info(
53                f'Name: {response.input_source.name}')
54            self.get_logger().info(
55                f'Priority: {response.input_source.priority}')
56            self.get_logger().info(
57                f'Timeout: {response.input_source.timeout}')
58            return True
59        else:
60            self.get_logger().error(
61                f'❌ Current input source get failed, return code: {ret_code}')
62            return False
63
64
65def main(args=None):
66    rclpy.init(args=args)
67
68    node = None
69    try:
70        node = GetCurrentInputSourceClient()
71        success = node.send_request()
72    except KeyboardInterrupt:
73        pass
74    except Exception as e:
75        rclpy.logging.get_logger('main').error(
76            f'Program exited with exception: {e}')
77
78    if node:
79        node.destroy_node()
80    if rclpy.ok():
81        rclpy.shutdown()
82
83
84if __name__ == '__main__':
85    main()

使用说明

# 获取当前输入源信息
ros2 run py_examples get_current_input_source

输出示例

[INFO] [get_current_input_source_client]: 当前输入源: node
[INFO] [get_current_input_source_client]: 优先级: 40
[INFO] [get_current_input_source_client]: 超时时间: 1000

注意事项

  • 确保GetCurrentInputSource服务正常运行

  • 需要在注册输入源之后才能获取到有效信息

  • 状态码为0表示查询成功

6.1.8 控制机器人走跑

该示例中用到了mc_locomotion_velocity,以下示例通过发布/aima/mc/locomotion/velocity话题控制机器人的行走,对于v0.7之后的版本,实现速度控制前需要注册输入源(该示例已实现注册输入源),具体注册步骤可参考代码。

进入稳定站立模式后执行本程序

  1#!/usr/bin/env python3
  2
  3import rclpy
  4from rclpy.node import Node
  5import time
  6import signal
  7import sys
  8
  9from aimdk_msgs.msg import McLocomotionVelocity, MessageHeader
 10from aimdk_msgs.srv import SetMcInputSource
 11
 12
 13class DirectVelocityControl(Node):
 14    def __init__(self):
 15        super().__init__('direct_velocity_control')
 16
 17        self.publisher = self.create_publisher(
 18            McLocomotionVelocity, '/aima/mc/locomotion/velocity', 10)
 19        self.client = self.create_client(
 20            SetMcInputSource, '/aimdk_5Fmsgs/srv/SetMcInputSource')
 21
 22        self.forward_velocity = 0.0
 23        self.lateral_velocity = 0.0
 24        self.angular_velocity = 0.0
 25
 26        self.max_forward_speed = 1.0
 27        self.min_forward_speed = 0.2
 28
 29        self.max_lateral_speed = 1.0
 30        self.min_lateral_speed = 0.2
 31
 32        self.max_angular_speed = 1.0
 33        self.min_angular_speed = 0.1
 34
 35        self.timer = None
 36
 37        self.get_logger().info("Direct velocity control node started!")
 38
 39    def start_publish(self):
 40        if not self.timer:
 41            self.timer = self.create_timer(0.02, self.publish_velocity)
 42
 43    def register_input_source(self):
 44        self.get_logger().info("Registering input source...")
 45
 46        timeout_sec = 8.0
 47        start = self.get_clock().now().nanoseconds / 1e9
 48
 49        while not self.client.wait_for_service(timeout_sec=2.0):
 50            now = self.get_clock().now().nanoseconds / 1e9
 51            if now - start > timeout_sec:
 52                self.get_logger().error("Waiting for service timed out")
 53                return False
 54            self.get_logger().info("Waiting for input source service...")
 55
 56        req = SetMcInputSource.Request()
 57        req.action.value = 1001
 58        req.input_source.name = "node"
 59        req.input_source.priority = 40
 60        req.input_source.timeout = 1000
 61
 62        for i in range(8):
 63            req.request.header.stamp = self.get_clock().now().to_msg()
 64            future = self.client.call_async(req)
 65            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
 66
 67            if future.done():
 68                break
 69
 70            # retry as remote peer is NOT handled well by ROS
 71            self.get_logger().info(f"trying to register input source... [{i}]")
 72
 73        if future.done():
 74            try:
 75                response = future.result()
 76                state = response.response.state.value
 77                self.get_logger().info(
 78                    f"Input source set successfully: state={state}, task_id={response.response.task_id}")
 79                return True
 80            except Exception as e:
 81                self.get_logger().error(f"Service call exception: {str(e)}")
 82                return False
 83        else:
 84            self.get_logger().error("Service call failed or timed out")
 85            return False
 86
 87    def publish_velocity(self):
 88        msg = McLocomotionVelocity()
 89        msg.header = MessageHeader()
 90        msg.header.stamp = self.get_clock().now().to_msg()
 91        msg.source = "node"
 92        msg.forward_velocity = self.forward_velocity
 93        msg.lateral_velocity = self.lateral_velocity
 94        msg.angular_velocity = self.angular_velocity
 95
 96        self.publisher.publish(msg)
 97
 98        self.get_logger().info(
 99            f"Publishing velocity: forward {self.forward_velocity:.2f} m/s, "
100            f"lateral {self.lateral_velocity:.2f} m/s, "
101            f"angular {self.angular_velocity:.2f} rad/s"
102        )
103
104    def set_forward(self, forward):
105        # check value range, mc has thresholds to start movement
106        if abs(forward) < 0.005:
107            self.forward_velocity = 0.0
108            return True
109        elif abs(forward) > self.max_forward_speed or abs(forward) < self.min_forward_speed:
110            raise ValueError("out of range")
111        else:
112            self.forward_velocity = forward
113            return True
114
115    def set_lateral(self, lateral):
116        # check value range, mc has thresholds to start movement
117        if abs(lateral) < 0.005:
118            self.lateral_velocity = 0.0
119            return True
120        elif abs(lateral) > self.max_lateral_speed or abs(lateral) < self.min_lateral_speed:
121            raise ValueError("out of range")
122        else:
123            self.lateral_velocity = lateral
124            return True
125
126    def set_angular(self, angular):
127        # check value range, mc has thresholds to start movement
128        if abs(angular) < 0.005:
129            self.angular_velocity = 0.0
130            return True
131        elif abs(angular) > self.max_angular_speed or abs(angular) < self.min_angular_speed:
132            raise ValueError("out of range")
133        else:
134            self.angular_velocity = angular
135            return True
136
137    def clear_velocity(self):
138        self.forward_velocity = 0.0
139        self.lateral_velocity = 0.0
140        self.angular_velocity = 0.0
141
142
143# Global node instance for signal handling
144global_node = None
145
146
147def signal_handler(sig, frame):
148    global global_node
149    if global_node is not None:
150        global_node.clear_velocity()
151        global_node.get_logger().info(
152            f"Received signal {sig}, clearing velocity and shutting down")
153    rclpy.shutdown()
154    sys.exit(0)
155
156
157def main():
158    global global_node
159    rclpy.init()
160
161    node = DirectVelocityControl()
162    global_node = node
163
164    signal.signal(signal.SIGINT, signal_handler)
165    signal.signal(signal.SIGTERM, signal_handler)
166
167    if not node.register_input_source():
168        node.get_logger().error("Input source registration failed, exiting")
169        rclpy.shutdown()
170        return
171
172    # get and check control values
173    # notice that mc has thresholds to start movement
174    try:
175        # get input forward
176        forward = float(
177            input("Please enter forward velocity 0 or ±(0.2 ~ 1.0) m/s: "))
178        node.set_forward(forward)
179        # get input lateral
180        lateral = float(
181            input("Please enter lateral velocity 0 or ±(0.2 ~ 1.0) m/s: "))
182        node.set_lateral(lateral)
183        # get input angular
184        angular = float(
185            input("Please enter angular velocity 0 or ±(0.1 ~ 1.0) rad/s: "))
186        node.set_angular(angular)
187    except Exception as e:
188        node.get_logger().error(f"Invalid input: {e}")
189        rclpy.shutdown()
190        return
191
192    node.get_logger().info("Setting velocity, moving for 5 seconds")
193    node.start_publish()
194
195    start = node.get_clock().now()
196    while (node.get_clock().now() - start).nanoseconds / 1e9 < 5.0:
197        rclpy.spin_once(node, timeout_sec=0.1)
198        time.sleep(0.001)
199
200    node.clear_velocity()
201    node.get_logger().info("5-second motion finished, robot stopped")
202
203    rclpy.spin(node)
204    rclpy.shutdown()
205
206
207if __name__ == '__main__':
208    main()

6.1.9 关节电机控制

本示例展示了如何使用ROS2和Ruckig库来控制机器人的关节运动。

注意

注意⚠️ :运行本示例前需要在机器人运控计算单元(PC1)使用aima em stop-app mc关闭原生运控模块,获取机器人控制权。注意确保机器人安全

!该示例代码直接对底层电机也就是HAL层进行控制,在运行程序前请检查代码中对关节的安全限位是否与所用机器人相符, 并确保安全!

机器人关节控制示例

这个示例展示了如何使用ROS2和Ruckig库来控制机器人的关节运动。示例实现了以下功能:

  1. 机器人关节模型定义

  2. 基于Ruckig的轨迹插补

  3. 多关节协同控制

  4. 实时位置、速度和加速度控制

示例功能说明

  1. 创建了四个控制器节点,分别控制:

    • 腿部x2(12个关节)

    • 腰部x1(3个关节)

    • 手臂x2(14个关节)

    • 头部x1(2个关节)

  2. 演示功能:

    • 每10秒让指定关节在正负0.5弧度之间摆动

    • 使用Ruckig库实现平滑的运动轨迹

    • 实时发布关节控制命令

自定义使用

  1. 添加新的控制逻辑:

    • 添加新的控制回调函数

    • 自由组合关节运动

  2. 调整控制频率:

    • 修改control_timer_的周期(当前为2ms)

  1#!/usr/bin/env python3
  2"""
  3Robot joint control example
  4This script implements a ROS2-based robot joint control system, using the Ruckig trajectory
  5planner to achieve smooth joint motion control.
  6
  7Main features:
  81. Supports controlling multiple joint areas (head, arm, waist, leg)
  92. Uses Ruckig for trajectory planning to ensure smooth motion
 103. Supports real-time control of joint position, velocity, and acceleration
 114. Provides joint limit and PID (stiffness/damping) parameter configuration
 12"""
 13
 14import rclpy
 15from rclpy.node import Node
 16from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
 17from aimdk_msgs.msg import JointCommandArray, JointStateArray, JointCommand
 18from std_msgs.msg import Header
 19import ruckig
 20from enum import Enum
 21from dataclasses import dataclass
 22from typing import List, Dict
 23from threading import Lock
 24
 25# QoS config: define ROS2 Quality of Service parameters
 26# Subscriber QoS: best-effort reliability, keep last 10 messages
 27subscriber_qos = QoSProfile(
 28    reliability=ReliabilityPolicy.BEST_EFFORT,
 29    history=HistoryPolicy.KEEP_LAST,
 30    depth=10,
 31    durability=DurabilityPolicy.VOLATILE
 32)
 33
 34# Publisher QoS: reliable transport, keep last 10 messages
 35publisher_qos = QoSProfile(
 36    reliability=ReliabilityPolicy.RELIABLE,
 37    history=HistoryPolicy.KEEP_LAST,
 38    depth=10,
 39    durability=DurabilityPolicy.VOLATILE
 40)
 41
 42
 43class JointArea(Enum):
 44    HEAD = 'HEAD'    # Head joints
 45    ARM = 'ARM'      # Arm joints
 46    WAIST = 'WAIST'  # Waist joints
 47    LEG = 'LEG'      # Leg joints
 48
 49
 50@dataclass
 51class JointInfo:
 52    # Joint information data class
 53    name: str           # Joint name
 54    lower_limit: float  # Joint lower angle limit
 55    upper_limit: float  # Joint upper angle limit
 56    kp: float           # Position control proportional gain
 57    kd: float           # Velocity control derivative gain
 58
 59
 60# Robot model configuration: define all joint parameters
 61robot_model: Dict[JointArea, List[JointInfo]] = {
 62    # Leg joint configuration
 63    JointArea.LEG: [
 64        # Left leg joints
 65        JointInfo("left_hip_pitch_joint", -2.704, 2.556, 40.0, 4.0),
 66        JointInfo("left_hip_roll_joint", -0.235, 2.906, 40.0, 4.0),
 67        JointInfo("left_hip_yaw_joint", -1.684, 3.430, 30.0, 3.0),
 68        JointInfo("left_knee_joint", 0.0000, 2.4073, 80.0, 8.0),
 69        JointInfo("left_ankle_pitch_joint", -0.803, 0.453, 40.0, 4.0),
 70        JointInfo("left_ankle_roll_joint", -0.2625, 0.2625, 20.0, 2.0),
 71        # Right leg joints
 72        JointInfo("right_hip_pitch_joint", -2.704, 2.556, 40.0, 4.0),
 73        JointInfo("right_hip_roll_joint", -2.906, 0.235, 40.0, 4.0),
 74        JointInfo("right_hip_yaw_joint", -3.430, 1.684, 30.0, 3.0),
 75        JointInfo("right_knee_joint", 0.0000, 2.4073, 80.0, 8.0),
 76        JointInfo("right_ankle_pitch_joint", -0.803, 0.453, 40.0, 4.0),
 77        JointInfo("right_ankle_roll_joint", -0.2625, 0.2625, 20.0, 2.0),
 78    ],
 79    # Waist joint configuration
 80    JointArea.WAIST: [
 81        JointInfo("waist_yaw_joint", -3.43, 2.382, 20.0, 4.0),
 82        JointInfo("waist_pitch_joint", -0.314, 0.314, 20.0, 4.0),
 83        JointInfo("waist_roll_joint", -0.488, 0.488, 20.0, 4.0),
 84    ],
 85    # Arm joint configuration
 86    JointArea.ARM: [
 87        # Left arm
 88        JointInfo("left_shoulder_pitch_joint", -3.08, 2.04, 20.0, 2.0),
 89        JointInfo("left_shoulder_roll_joint", -0.061, 2.993, 20.0, 2.0),
 90        JointInfo("left_shoulder_yaw_joint", -2.556, 2.556, 20.0, 2.0),
 91        JointInfo("left_elbow_joint", -2.3556, 0.0, 20.0, 2.0),
 92        JointInfo("left_wrist_yaw_joint", -2.556, 2.556, 20.0, 2.0),
 93        JointInfo("left_wrist_pitch_joint", -0.558, 0.558, 20.0, 2.0),
 94        JointInfo("left_wrist_roll_joint", -1.571, 0.724, 20.0, 2.0),
 95        # Right arm
 96        JointInfo("right_shoulder_pitch_joint", -3.08, 2.04, 20.0, 2.0),
 97        JointInfo("right_shoulder_roll_joint", -2.993, 0.061, 20.0, 2.0),
 98        JointInfo("right_shoulder_yaw_joint", -2.556, 2.556, 20.0, 2.0),
 99        JointInfo("right_elbow_joint", -2.3556, 0.0000, 20.0, 2.0),
100        JointInfo("right_wrist_yaw_joint", -2.556, 2.556, 20.0, 2.0),
101        JointInfo("right_wrist_pitch_joint", -0.558, 0.558, 20.0, 2.0),
102        JointInfo("right_wrist_roll_joint", -0.724, 1.571, 20.0, 2.0),
103    ],
104    # Head joint configuration
105    JointArea.HEAD: [
106        JointInfo("head_yaw_joint", -0.366, 0.366, 20.0, 2.0),
107        JointInfo("head_pitch_joint", -0.3838, 0.3838, 20.0, 2.0),
108    ],
109}
110
111
112class JointControllerNode(Node):
113    """
114    Joint controller node
115    Responsible for receiving joint states, using Ruckig for trajectory planning,
116    and publishing joint commands.
117    """
118
119    def __init__(self, node_name: str, sub_topic: str, pub_topic: str, area: JointArea, dofs: int):
120        """
121        Initialize joint controller
122        Args:
123            node_name: node name
124            sub_topic: topic name to subscribe (joint states)
125            pub_topic: topic name to publish (joint commands)
126            area: joint area (head/arm/waist/leg)
127            dofs: number of DOFs
128        """
129        super().__init__(node_name)
130        self.lock = Lock()
131        self.joint_info = robot_model[area]
132        self.dofs = dofs
133        self.ruckig = ruckig.Ruckig(dofs, 0.002)  # 2 ms control period
134        self.input = ruckig.InputParameter(dofs)
135        self.output = ruckig.OutputParameter(dofs)
136        self.ruckig_initialized = False
137
138        # Initialize trajectory parameters
139        self.input.current_position = [0.0] * dofs
140        self.input.current_velocity = [0.0] * dofs
141        self.input.current_acceleration = [0.0] * dofs
142
143        # Motion limits
144        self.input.max_velocity = [1.0] * dofs
145        self.input.max_acceleration = [1.0] * dofs
146        self.input.max_jerk = [25.0] * dofs
147
148        # ROS2 subscriber and publisher
149        self.sub = self.create_subscription(
150            JointStateArray,
151            sub_topic,
152            self.joint_state_callback,
153            subscriber_qos
154        )
155        self.pub = self.create_publisher(
156            JointCommandArray,
157            pub_topic,
158            publisher_qos
159        )
160
161    def joint_state_callback(self, msg: JointStateArray):
162        """
163        Joint state callback
164        Receives and processes joint state messages
165        """
166        self.ruckig_initialized = True
167
168    def control_callback(self, joint_idx):
169        """
170        Control callback
171        Uses Ruckig for trajectory planning and publishes control commands
172        Args:
173            joint_idx: target joint index
174        """
175        # Run Ruckig until the target is reached
176        while self.ruckig.update(self.input, self.output) in [ruckig.Result.Working, ruckig.Result.Finished]:
177            # Update current state
178            self.input.current_position = self.output.new_position
179            self.input.current_velocity = self.output.new_velocity
180            self.input.current_acceleration = self.output.new_acceleration
181
182            # Check if target is reached
183            tolerance = 1e-6
184            current_p = self.output.new_position[joint_idx]
185            if abs(current_p - self.input.target_position[joint_idx]) < tolerance:
186                break
187
188            # Create and publish command
189            cmd = JointCommandArray()
190            for i, joint in enumerate(self.joint_info):
191                j = JointCommand()
192                j.name = joint.name
193                j.position = self.output.new_position[i]
194                j.velocity = self.output.new_velocity[i]
195                j.effort = 0.0
196                j.stiffness = joint.kp
197                j.damping = joint.kd
198                cmd.joints.append(j)
199
200            self.pub.publish(cmd)
201
202    def set_target_position(self, joint_name, position):
203        """
204        Set target joint position
205        Args:
206            joint_name: joint name
207            position: target position
208        """
209        p_s = [0.0] * self.dofs
210        joint_idx = 0
211        for i, joint in enumerate(self.joint_info):
212            if joint.name == joint_name:
213                p_s[i] = position
214                joint_idx = i
215        self.input.target_position = p_s
216        self.input.target_velocity = [0.0] * self.dofs
217        self.input.target_acceleration = [0.0] * self.dofs
218        self.control_callback(joint_idx)
219
220
221def main(args=None):
222    """
223    Main function
224    Initialize ROS2 node and start joint controller
225    """
226    rclpy.init(args=args)
227
228    # Create leg controller node
229    leg_node = JointControllerNode(
230        "leg_node",
231        "/aima/hal/joint/leg/state",
232        "/aima/hal/joint/leg/command",
233        JointArea.LEG,
234        12
235    )
236
237    # waist_node = JointControllerNode(
238    #     "waist_node",
239    #     "/aima/hal/joint/waist/state",
240    #     "/aima/hal/joint/waist/command",
241    #     JointArea.WAIST,
242    #     3
243    # )
244
245    # arm_node = JointControllerNode(
246    #     "arm_node",
247    #     "/aima/hal/joint/arm/state",
248    #     "/aima/hal/joint/arm/command",
249    #     JointArea.ARM,
250    #     14
251    # )
252
253    # head_node = JointControllerNode(
254    #     "head_node",
255    #     "/aima/hal/joint/head/state",
256    #     "/aima/hal/joint/head/command",
257    #     JointArea.HEAD,
258    #     2
259    # )
260
261    position = 0.8
262
263    # Only control the left leg joint. If you want to control a specific joint, assign it directly.
264    def timer_callback():
265        """
266        Timer callback
267        Periodically change target position to achieve oscillating motion
268        """
269        nonlocal position
270        position = -position
271        position = 1.3 + position
272        leg_node.set_target_position("left_knee_joint", position)
273
274    #     arm_node.set_target_position("left_shoulder_pitch_joint", position)
275    #     waist_node.set_target_position("waist_yaw_joint", position)
276    #     head_node.set_target_position("head_pitch_joint", position)
277
278    leg_node.create_timer(3.0, timer_callback)
279
280    # Multi-threaded executor
281    executor = rclpy.executors.MultiThreadedExecutor()
282    executor.add_node(leg_node)
283
284    # executor.add_node(waist_node)
285    # executor.add_node(arm_node)
286    # executor.add_node(head_node)
287
288    try:
289        executor.spin()
290    except KeyboardInterrupt:
291        pass
292    finally:
293        leg_node.destroy_node()
294        # waist_node.destroy_node()
295        # arm_node.destroy_node()
296        # head_node.destroy_node()
297        if rclpy.ok():
298            rclpy.shutdown()
299
300
301if __name__ == '__main__':
302    main()

6.1.10 键盘控制机器人

本示例实现了通过PC的键盘输入控制机器人的前进后退转弯的功能。

通过WASD控制机器人行走方向,增加/减少速度(±0.2 m/s),Q/E增加/减少角速度(±0.1 rad/s),ESC退出程序并释放终端资源,Space立即将速度归零,执行急停

小心

注意:运行本示例前需要先使用手柄,将机器人切入稳定站立模式。(位控站立/走跑模式时按R2+X, 其他模式详见模式流转图进行操作),然后在机器人的终端界面使用:aima em stop-app rc关闭遥控器,防止通道占用。

实现键盘控制前需要注册输入源(该示例已实现注册输入源)

  1#!/usr/bin/env python3
  2
  3import rclpy
  4import rclpy.logging
  5from rclpy.node import Node
  6from aimdk_msgs.msg import McLocomotionVelocity, MessageHeader
  7from aimdk_msgs.srv import SetMcInputSource
  8import curses
  9import time
 10from functools import partial
 11
 12
 13class KeyboardVelocityController(Node):
 14    def __init__(self, stdscr):
 15        super().__init__('keyboard_velocity_controller')
 16        self.stdscr = stdscr
 17        self.forward_velocity = 0.0
 18        self.lateral_velocity = 0.0
 19        self.angular_velocity = 0.0
 20        self.step = 0.2
 21        self.angular_step = 0.1
 22
 23        self.publisher = self.create_publisher(
 24            McLocomotionVelocity, '/aima/mc/locomotion/velocity', 10)
 25        self.client = self.create_client(
 26            SetMcInputSource, '/aimdk_5Fmsgs/srv/SetMcInputSource')
 27
 28        if not self.register_input_source():
 29            self.get_logger().error("Input source registration failed, exiting")
 30            raise RuntimeError("Failed to register input source")
 31
 32        # Configure curses
 33        curses.cbreak()
 34        curses.noecho()
 35        self.stdscr.keypad(True)
 36        self.stdscr.nodelay(True)
 37
 38        self.get_logger().info(
 39            "Control started: W/S forward/backward, A/D strafe, Q/E turn, Space stop, Esc exit")
 40
 41        # Timer: check keyboard every 50 ms
 42        self.timer = self.create_timer(0.05, self.check_key_and_publish)
 43
 44    def register_input_source(self):
 45        self.get_logger().info("Registering input source...")
 46
 47        timeout_sec = 8.0
 48        start = self.get_clock().now().nanoseconds / 1e9
 49
 50        while not self.client.wait_for_service(timeout_sec=2.0):
 51            now = self.get_clock().now().nanoseconds / 1e9
 52            if now - start > timeout_sec:
 53                self.get_logger().error("Waiting for service timed out")
 54                return False
 55            self.get_logger().info("Waiting for input source service...")
 56
 57        req = SetMcInputSource.Request()
 58        req.action.value = 1001
 59        req.input_source.name = "node"
 60        req.input_source.priority = 40
 61        req.input_source.timeout = 1000
 62
 63        for i in range(8):
 64            req.request.header.stamp = self.get_clock().now().to_msg()
 65            future = self.client.call_async(req)
 66            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
 67
 68            if future.done():
 69                break
 70
 71            # retry as remote peer is NOT handled well by ROS
 72            self.get_logger().info(f"trying to register input source... [{i}]")
 73
 74        if future.done():
 75            try:
 76                resp = future.result()
 77                state = resp.response.state.value
 78                self.get_logger().info(
 79                    f"Input source set successfully: state={state}, task_id={resp.response.task_id}")
 80                return True
 81            except Exception as e:
 82                self.get_logger().error(f"Service exception: {str(e)}")
 83                return False
 84        else:
 85            self.get_logger().error("Service call failed or timed out")
 86            return False
 87
 88    def check_key_and_publish(self):
 89        try:
 90            ch = self.stdscr.getch()
 91        except Exception:
 92            ch = -1
 93
 94        if ch != -1:
 95            if ch == ord(' '):
 96                self.forward_velocity = 0.0
 97                self.lateral_velocity = 0.0
 98                self.angular_velocity = 0.0
 99            elif ch == ord('w'):
100                self.forward_velocity = min(
101                    self.forward_velocity + self.step, 1.0)
102            elif ch == ord('s'):
103                self.forward_velocity = max(
104                    self.forward_velocity - self.step, -1.0)
105            elif ch == ord('a'):
106                self.lateral_velocity = min(
107                    self.lateral_velocity + self.step, 1.0)
108            elif ch == ord('d'):
109                self.lateral_velocity = max(
110                    self.lateral_velocity - self.step, -1.0)
111            elif ch == ord('q'):
112                self.angular_velocity = min(
113                    self.angular_velocity + self.angular_step, 1.0)
114            elif ch == ord('e'):
115                self.angular_velocity = max(
116                    self.angular_velocity - self.angular_step, -1.0)
117            elif ch == 27:  # ESC
118                self.get_logger().info("Exiting control")
119                rclpy.shutdown()
120                return
121
122        msg = McLocomotionVelocity()
123        msg.header = MessageHeader()
124        msg.header.stamp = self.get_clock().now().to_msg()
125        msg.source = "node"
126        msg.forward_velocity = self.forward_velocity
127        msg.lateral_velocity = self.lateral_velocity
128        msg.angular_velocity = self.angular_velocity
129
130        self.publisher.publish(msg)
131
132        # Update UI
133        self.stdscr.clear()
134        self.stdscr.addstr(
135            0, 0, "W/S: Forward/Backward | A/D: Strafe | Q/E: Turn | Space: Stop | ESC: Exit")
136        self.stdscr.addstr(2, 0,
137                           f"Speed Status: Forward: {self.forward_velocity:.2f} m/s | "
138                           f"Lateral: {self.lateral_velocity:.2f} m/s | "
139                           f"Angular: {self.angular_velocity:.2f} rad/s")
140        self.stdscr.refresh()
141
142
143def curses_main(stdscr):
144    rclpy.init()
145    try:
146        node = KeyboardVelocityController(stdscr)
147        rclpy.spin(node)
148    except Exception as e:
149        rclpy.logging.get_logger("main").fatal(
150            f"Program exited with exception: {e}")
151    finally:
152        curses.endwin()
153        rclpy.shutdown()
154
155
156def main():
157    curses.wrapper(curses_main)
158
159
160if __name__ == '__main__':
161    main()

6.1.11 拍照

该示例中用到了take_photo,在运行节点程序前,修改需要拍照的相机话题,启动节点程序后,会创建/images/目录,将当前帧图像保存在这个目录下。

 1#!/usr/bin/env python3
 2import time
 3from pathlib import Path
 4
 5import rclpy
 6from rclpy.node import Node
 7from rclpy.qos import qos_profile_sensor_data
 8from sensor_msgs.msg import Image
 9from cv_bridge import CvBridge
10import cv2
11
12
13class SaveOneRawPy(Node):
14    def __init__(self):
15        super().__init__('save_one_image')
16
17        # parameter: image topic
18        self.declare_parameter(
19            'image_topic', '/aima/hal/sensor/stereo_head_front_left/rgb_image'
20        )
21        self.topic = self.get_parameter(
22            'image_topic').get_parameter_value().string_value
23
24        # save directory
25        self.save_dir = Path('images').resolve()
26        self.save_dir.mkdir(parents=True, exist_ok=True)
27
28        # state
29        self._saved = False
30        self._bridge = CvBridge()
31
32        # subscriber (sensor QoS)
33        self.sub = self.create_subscription(
34            Image,
35            self.topic,
36            self.image_cb,
37            qos_profile_sensor_data
38        )
39        self.get_logger().info(f'Subscribing to raw image: {self.topic}')
40        self.get_logger().info(f'Images will be saved to: {self.save_dir}')
41
42    def image_cb(self, msg: Image):
43        # already saved one, ignore later frames
44        if self._saved:
45            return
46
47        try:
48            enc = msg.encoding.lower()
49            self.get_logger().info(f'Received image with encoding: {enc}')
50
51            # convert from ROS Image to cv2
52            img = self._bridge.imgmsg_to_cv2(
53                msg, desired_encoding='passthrough')
54
55            # normalize to BGR for saving
56            if enc == 'rgb8':
57                img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
58            elif enc == 'mono8':
59                img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
60            # if it's bgr8 or other 8-bit bgr that cv2 can save, we just use it
61
62            ts_ms = int(time.time() * 1000)
63            out_path = self.save_dir / f'frame_{ts_ms}.png'
64
65            ok = cv2.imwrite(str(out_path), img)
66            if ok:
67                self.get_logger().info(
68                    f'Saved image: {out_path}  ({img.shape[1]}x{img.shape[0]})'
69                )
70                self._saved = True
71                # shut down once we got exactly one frame
72                # destroy node first, then shutdown rclpy
73                self.destroy_node()
74                if rclpy.ok():
75                    rclpy.shutdown()
76            else:
77                self.get_logger().error(f'cv2.imwrite failed: {out_path}')
78        except Exception as e:
79            self.get_logger().error(f'Failed to decode / save image: {e}')
80
81
82def main():
83    rclpy.init()
84    node = SaveOneRawPy()
85    rclpy.spin(node)
86    # in case the node was already destroyed in the callback
87    if rclpy.ok():
88        try:
89            node.destroy_node()
90        except Exception:
91            pass
92        rclpy.shutdown()
93
94
95if __name__ == '__main__':
96    main()

6.1.12 相机推流示例集

该示例集提供了多种相机数据订阅和处理功能,支持深度相机、双目相机和单目相机的数据流订阅。
这些相机数据订阅 example 并没有实际的业务用途, 仅提供相机数据基础信息的打印;若您比较熟悉 ros2 使用,会发现 ros2 topic echo + ros2 topic hz 也能够实现 example 提供的功能。 您可以选择快速查阅 SDK 接口手册中话题列表直接快速进入自己模块的开发,也可以使用相机 example 作为脚手架加入自己的业务逻辑。 我们发布的传感器数据均为未经预处理(去畸变等)的原始数据,如果您需要查询传感器的详细信息(如分辨率、焦距等),请关注相应的内参(camera_info)话题。

深度相机数据订阅

该示例中用到了echo_camera_rgbd,通过订阅/aima/hal/sensor/rgbd_head_front/话题来接收机器人的深度相机数据,支持深度点云、深度图、RGB图、压缩RGB图和相机内参等多种数据类型。

功能特点:

  • 支持多种数据类型订阅(深度点云、深度图、RGB图、压缩图、相机内参)

  • 实时FPS统计和数据显示

  • 支持RGB图视频录制功能(TBD, 请参考C++示例)

  • 可配置的topic类型选择

支持的数据类型:

  • depth_pointcloud: 深度点云数据 (sensor_msgs/PointCloud2)

  • depth_image: 深度图像 (sensor_msgs/Image)

  • rgb_image: RGB图像 (sensor_msgs/Image)

  • rgb_image_compressed: 压缩RGB图像 (sensor_msgs/CompressedImage)

  • camera_info: 相机内参 (sensor_msgs/CameraInfo)

  1#!/usr/bin/env python3
  2"""
  3Head depth camera multi-topic subscription example
  4
  5Supports selecting the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
  6  - depth_pointcloud: Depth point cloud (sensor_msgs/PointCloud2)
  7  - depth_image: Depth image (sensor_msgs/Image)
  8  - depth_camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)
  9  - rgb_image: RGB image (sensor_msgs/Image)
 10  - rgb_image_compressed: RGB compressed image (sensor_msgs/CompressedImage)
 11  - rgb_camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)
 12
 13Example:
 14  ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=depth_pointcloud
 15  ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_image
 16  ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_camera_info
 17
 18Default topic_type is rgb_image
 19"""
 20
 21import rclpy
 22from rclpy.node import Node
 23from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
 24from sensor_msgs.msg import Image, CompressedImage, CameraInfo, PointCloud2
 25from collections import deque
 26import time
 27
 28
 29class CameraTopicEcho(Node):
 30    def __init__(self):
 31        super().__init__('camera_topic_echo')
 32
 33        # Select the topic type to subscribe
 34        self.declare_parameter('topic_type', 'rgb_image')
 35        self.declare_parameter('dump_video_path', '')
 36
 37        self.topic_type = self.get_parameter('topic_type').value
 38        self.dump_video_path = self.get_parameter('dump_video_path').value
 39
 40        # SensorDataQoS: BEST_EFFORT + VOLATILE
 41        qos = QoSProfile(
 42            reliability=QoSReliabilityPolicy.BEST_EFFORT,
 43            history=QoSHistoryPolicy.KEEP_LAST,
 44            depth=5
 45        )
 46
 47        # Create different subscribers based on topic_type
 48        if self.topic_type == "depth_pointcloud":
 49            self.topic_name = "/aima/hal/sensor/rgbd_head_front/depth_pointcloud"
 50            self.sub_pointcloud = self.create_subscription(
 51                PointCloud2, self.topic_name, self.cb_pointcloud, qos)
 52            self.get_logger().info(
 53                f"✅ Subscribing PointCloud2: {self.topic_name}")
 54
 55        elif self.topic_type == "depth_image":
 56            self.topic_name = "/aima/hal/sensor/rgbd_head_front/depth_image"
 57            self.sub_image = self.create_subscription(
 58                Image, self.topic_name, self.cb_image, qos)
 59            self.get_logger().info(
 60                f"✅ Subscribing Depth Image: {self.topic_name}")
 61
 62        elif self.topic_type == "rgb_image":
 63            self.topic_name = "/aima/hal/sensor/rgbd_head_front/rgb_image"
 64            self.sub_image = self.create_subscription(
 65                Image, self.topic_name, self.cb_image, qos)
 66            self.get_logger().info(
 67                f"✅ Subscribing RGB Image: {self.topic_name}")
 68            if self.dump_video_path:
 69                self.get_logger().info(
 70                    f"📝 Will dump received images to video: {self.dump_video_path}")
 71
 72        elif self.topic_type == "rgb_image_compressed":
 73            self.topic_name = "/aima/hal/sensor/rgbd_head_front/rgb_image/compressed"
 74            self.sub_compressed = self.create_subscription(
 75                CompressedImage, self.topic_name, self.cb_compressed, qos)
 76            self.get_logger().info(
 77                f"✅ Subscribing CompressedImage: {self.topic_name}")
 78
 79        elif self.topic_type == "rgb_camera_info":
 80            self.topic_name = "/aima/hal/sensor/rgbd_head_front/rgb_camera_info"
 81            # RGB-D CameraInfo is different with other cameras. The best_effort + volatile QoS is enough for 10Hz rgb_camera_info
 82            self.sub_camerainfo = self.create_subscription(
 83                CameraInfo, self.topic_name, self.cb_camerainfo, qos)
 84            self.get_logger().info(
 85                f"✅ Subscribing RGB CameraInfo: {self.topic_name}")
 86
 87        elif self.topic_type == "depth_camera_info":
 88            self.topic_name = "/aima/hal/sensor/rgbd_head_front/depth_camera_info"
 89            # RGB-D CameraInfo is different with other cameras. The best_effort + volatile QoS is enough for 10Hz depth_camera_info
 90            self.sub_camerainfo = self.create_subscription(
 91                CameraInfo, self.topic_name, self.cb_camerainfo, qos)
 92            self.get_logger().info(
 93                f"✅ Subscribing Depth CameraInfo: {self.topic_name}")
 94
 95        else:
 96            self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
 97            raise ValueError("Unknown topic_type")
 98
 99        # Internal state
100        self.last_print = self.get_clock().now()
101        self.print_allowed = False
102        self.arrivals = deque()
103
104    def update_arrivals(self):
105        """Calculate received FPS"""
106        now = self.get_clock().now()
107        self.arrivals.append(now)
108        while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
109            self.arrivals.popleft()
110
111    def get_fps(self):
112        """Get FPS"""
113        return len(self.arrivals)
114
115    def should_print(self, master=True):
116        """Control print frequency"""
117        if not master:
118            return self.print_allowed
119        now = self.get_clock().now()
120        if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
121            self.last_print = now
122            self.print_allowed = True
123        else:
124            self.print_allowed = False
125        return self.print_allowed
126
127    def cb_pointcloud(self, msg: PointCloud2):
128        """PointCloud2 callback"""
129        self.update_arrivals()
130
131        if self.should_print():
132            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
133
134            # Format fields information
135            fields_str = " ".join(
136                [f"{f.name}({f.datatype})" for f in msg.fields])
137
138            self.get_logger().info(
139                f"🌫️ PointCloud2 received\n"
140                f"  • frame_id:        {msg.header.frame_id}\n"
141                f"  • stamp (sec):     {stamp_sec:.6f}\n"
142                f"  • width x height:  {msg.width} x {msg.height}\n"
143                f"  • point_step:      {msg.point_step}\n"
144                f"  • row_step:        {msg.row_step}\n"
145                f"  • fields:          {fields_str}\n"
146                f"  • is_bigendian:    {msg.is_bigendian}\n"
147                f"  • is_dense:        {msg.is_dense}\n"
148                f"  • data size:       {len(msg.data)}\n"
149                f"  • recv FPS (1s):   {self.get_fps():.1f}"
150            )
151
152    def cb_image(self, msg: Image):
153        """Image callback (Depth/RGB image)"""
154        self.update_arrivals()
155
156        if self.should_print():
157            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
158            self.get_logger().info(
159                f"📸 {self.topic_type} received\n"
160                f"  • frame_id:        {msg.header.frame_id}\n"
161                f"  • stamp (sec):     {stamp_sec:.6f}\n"
162                f"  • encoding:        {msg.encoding}\n"
163                f"  • size (WxH):      {msg.width} x {msg.height}\n"
164                f"  • step (bytes/row):{msg.step}\n"
165                f"  • is_bigendian:    {msg.is_bigendian}\n"
166                f"  • recv FPS (1s):   {self.get_fps():.1f}"
167            )
168
169        # Only RGB image supports video dump
170        if self.topic_type == "rgb_image" and self.dump_video_path:
171            self.dump_image_to_video(msg)
172
173    def cb_compressed(self, msg: CompressedImage):
174        """CompressedImage callback"""
175        self.update_arrivals()
176
177        if self.should_print():
178            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
179            self.get_logger().info(
180                f"🗜️  CompressedImage received\n"
181                f"  • frame_id:        {msg.header.frame_id}\n"
182                f"  • stamp (sec):     {stamp_sec:.6f}\n"
183                f"  • format:          {msg.format}\n"
184                f"  • data size:       {len(msg.data)}\n"
185                f"  • recv FPS (1s):   {self.get_fps():.1f}"
186            )
187
188    def cb_camerainfo(self, msg: CameraInfo):
189        """CameraInfo callback (camera intrinsic parameters)"""
190        # Camera info will only receive one frame, print it directly
191        stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
192
193        # Format D array
194        d_str = ", ".join([f"{d:.6f}" for d in msg.d])
195
196        # Format K matrix
197        k_str = ", ".join([f"{k:.6f}" for k in msg.k])
198
199        # Format P matrix
200        p_str = ", ".join([f"{p:.6f}" for p in msg.p])
201
202        self.get_logger().info(
203            f"📷 {self.topic_type} received\n"
204            f"  • frame_id:        {msg.header.frame_id}\n"
205            f"  • stamp (sec):     {stamp_sec:.6f}\n"
206            f"  • width x height:  {msg.width} x {msg.height}\n"
207            f"  • distortion_model:{msg.distortion_model}\n"
208            f"  • D: [{d_str}]\n"
209            f"  • K: [{k_str}]\n"
210            f"  • P: [{p_str}]\n"
211            f"  • binning_x: {msg.binning_x}\n"
212            f"  • binning_y: {msg.binning_y}\n"
213            f"  • roi: {{ x_offset: {msg.roi.x_offset}, y_offset: {msg.roi.y_offset}, height: {msg.roi.height}, width: {msg.roi.width}, do_rectify: {msg.roi.do_rectify} }}"
214        )
215
216    def dump_image_to_video(self, msg: Image):
217        """Video dump is only supported for RGB images"""
218        # You can add video recording functionality here
219        # Simplified in the Python version, only logs instead
220        if self.should_print(master=False):
221            self.get_logger().info(f"📝 Video dump not implemented in Python version")
222
223
224def main(args=None):
225    rclpy.init(args=args)
226    try:
227        node = CameraTopicEcho()
228        rclpy.spin(node)
229    except KeyboardInterrupt:
230        pass
231    except Exception as e:
232        print(f"Error: {e}")
233    finally:
234        if 'node' in locals():
235            node.destroy_node()
236        rclpy.shutdown()
237
238
239if __name__ == '__main__':
240    main()

使用说明:

  1. 订阅深度点云数据:

    ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=depth_pointcloud
    
  2. 订阅RGB图像数据:

    ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_image
    
  3. 订阅相机内参:

    ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_camera_info
    ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=depth_camera_info
    
  4. 录制RGB视频:

    # dump_video_path的值可改为其他路径, 注意提前创建该文件所在目录才能保存
    ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_image -p dump_video_path:=$PWD/output.avi
    

双目相机数据订阅

该示例中用到了echo_camera_stereo,通过订阅/aima/hal/sensor/stereo_head_front_*/话题来接收机器人的双目相机数据,支持左右相机的RGB图、压缩图和相机内参数据。

功能特点:

  • 支持左右相机独立数据订阅

  • 实时FPS统计和数据显示

  • 支持RGB图视频录制功能(TBD, 请参考C++示例)

  • 可配置的相机选择(左/右)

支持的数据类型:

  • left_rgb_image: 左相机RGB图像 (sensor_msgs/Image)

  • left_rgb_image_compressed: 左相机压缩RGB图像 (sensor_msgs/CompressedImage)

  • left_camera_info: 左相机内参 (sensor_msgs/CameraInfo)

  • right_rgb_image: 右相机RGB图像 (sensor_msgs/Image)

  • right_rgb_image_compressed: 右相机压缩RGB图像 (sensor_msgs/CompressedImage)

  • right_camera_info: 右相机内参 (sensor_msgs/CameraInfo)

  1#!/usr/bin/env python3
  2"""
  3Head stereo camera multi-topic subscription example
  4
  5Supports selecting the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
  6  - left_rgb_image: Left camera RGB image (sensor_msgs/Image)
  7  - left_rgb_image_compressed: Left camera RGB compressed image (sensor_msgs/CompressedImage)
  8  - left_camera_info: Left camera intrinsic parameters (sensor_msgs/CameraInfo)
  9  - right_rgb_image: Right camera RGB image (sensor_msgs/Image)
 10  - right_rgb_image_compressed: Right camera RGB compressed image (sensor_msgs/CompressedImage)
 11  - right_camera_info: Right camera intrinsic parameters (sensor_msgs/CameraInfo)
 12
 13Example:
 14  ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_rgb_image
 15  ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=right_rgb_image
 16  ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_camera_info
 17
 18Default topic_type is left_rgb_image
 19"""
 20
 21import rclpy
 22from rclpy.node import Node
 23from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy, QoSHistoryPolicy
 24from sensor_msgs.msg import Image, CompressedImage, CameraInfo
 25from collections import deque
 26import time
 27
 28
 29class StereoCameraTopicEcho(Node):
 30    def __init__(self):
 31        super().__init__('stereo_camera_topic_echo')
 32
 33        # Select the topic type to subscribe
 34        self.declare_parameter('topic_type', 'left_rgb_image')
 35        self.declare_parameter('dump_video_path', '')
 36
 37        self.topic_type = self.get_parameter('topic_type').value
 38        self.dump_video_path = self.get_parameter('dump_video_path').value
 39
 40        # Set QoS parameters - use sensor data QoS
 41        qos = QoSProfile(
 42            reliability=QoSReliabilityPolicy.BEST_EFFORT,
 43            history=QoSHistoryPolicy.KEEP_LAST,
 44            depth=5,
 45            durability=QoSDurabilityPolicy.VOLATILE
 46        )
 47
 48        # Create different subscribers based on topic_type
 49        if self.topic_type == "left_rgb_image":
 50            self.topic_name = "/aima/hal/sensor/stereo_head_front_left/rgb_image"
 51            self.sub_image = self.create_subscription(
 52                Image, self.topic_name, self.cb_image, qos)
 53            self.get_logger().info(
 54                f"✅ Subscribing Left RGB Image: {self.topic_name}")
 55            if self.dump_video_path:
 56                self.get_logger().info(
 57                    f"📝 Will dump received images to video: {self.dump_video_path}")
 58
 59        elif self.topic_type == "left_rgb_image_compressed":
 60            self.topic_name = "/aima/hal/sensor/stereo_head_front_left/rgb_image/compressed"
 61            self.sub_compressed = self.create_subscription(
 62                CompressedImage, self.topic_name, self.cb_compressed, qos)
 63            self.get_logger().info(
 64                f"✅ Subscribing Left CompressedImage: {self.topic_name}")
 65
 66        elif self.topic_type == "left_camera_info":
 67            self.topic_name = "/aima/hal/sensor/stereo_head_front_left/camera_info"
 68            # CameraInfo subscription must use reliable + transient_local QoS to receive historical messages (even if only one frame is published)
 69            camera_qos = QoSProfile(
 70                reliability=QoSReliabilityPolicy.RELIABLE,
 71                history=QoSHistoryPolicy.KEEP_LAST,
 72                depth=1,
 73                durability=QoSDurabilityPolicy.TRANSIENT_LOCAL
 74            )
 75            self.sub_camerainfo = self.create_subscription(
 76                CameraInfo, self.topic_name, self.cb_camerainfo, camera_qos)
 77            self.get_logger().info(
 78                f"✅ Subscribing Left CameraInfo (with transient_local): {self.topic_name}")
 79
 80        elif self.topic_type == "right_rgb_image":
 81            self.topic_name = "/aima/hal/sensor/stereo_head_front_right/rgb_image"
 82            self.sub_image = self.create_subscription(
 83                Image, self.topic_name, self.cb_image, qos)
 84            self.get_logger().info(
 85                f"✅ Subscribing Right RGB Image: {self.topic_name}")
 86            if self.dump_video_path:
 87                self.get_logger().info(
 88                    f"📝 Will dump received images to video: {self.dump_video_path}")
 89
 90        elif self.topic_type == "right_rgb_image_compressed":
 91            self.topic_name = "/aima/hal/sensor/stereo_head_front_right/rgb_image/compressed"
 92            self.sub_compressed = self.create_subscription(
 93                CompressedImage, self.topic_name, self.cb_compressed, qos)
 94            self.get_logger().info(
 95                f"✅ Subscribing Right CompressedImage: {self.topic_name}")
 96
 97        elif self.topic_type == "right_camera_info":
 98            self.topic_name = "/aima/hal/sensor/stereo_head_front_right/camera_info"
 99            # CameraInfo subscription must use reliable + transient_local QoS to receive historical messages (even if only one frame is published)
100            camera_qos = QoSProfile(
101                reliability=QoSReliabilityPolicy.RELIABLE,
102                history=QoSHistoryPolicy.KEEP_LAST,
103                depth=1,
104                durability=QoSDurabilityPolicy.TRANSIENT_LOCAL
105            )
106            self.sub_camerainfo = self.create_subscription(
107                CameraInfo, self.topic_name, self.cb_camerainfo, camera_qos)
108            self.get_logger().info(
109                f"✅ Subscribing Right CameraInfo (with transient_local): {self.topic_name}")
110
111        else:
112            self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
113            raise ValueError("Unknown topic_type")
114
115        # Internal state
116        self.last_print = self.get_clock().now()
117        self.print_allowed = False
118        self.arrivals = deque()
119
120    def update_arrivals(self):
121        """Calculate received FPS"""
122        now = self.get_clock().now()
123        self.arrivals.append(now)
124        while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
125            self.arrivals.popleft()
126
127    def get_fps(self):
128        """Get FPS"""
129        return len(self.arrivals)
130
131    def should_print(self, master=True):
132        """Control print frequency"""
133        if not master:
134            return self.print_allowed
135        now = self.get_clock().now()
136        if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
137            self.last_print = now
138            self.print_allowed = True
139        else:
140            self.print_allowed = False
141        return self.print_allowed
142
143    def cb_image(self, msg: Image):
144        """Image callback (Left/Right camera RGB image)"""
145        self.update_arrivals()
146
147        if self.should_print():
148            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
149            self.get_logger().info(
150                f"📸 {self.topic_type} received\n"
151                f"  • frame_id:        {msg.header.frame_id}\n"
152                f"  • stamp (sec):     {stamp_sec:.6f}\n"
153                f"  • encoding:        {msg.encoding}\n"
154                f"  • size (WxH):      {msg.width} x {msg.height}\n"
155                f"  • step (bytes/row):{msg.step}\n"
156                f"  • is_bigendian:    {msg.is_bigendian}\n"
157                f"  • recv FPS (1s):   {self.get_fps():.1f}"
158            )
159
160        # Only RGB images support video dump
161        if (self.topic_type in ["left_rgb_image", "right_rgb_image"]) and self.dump_video_path:
162            self.dump_image_to_video(msg)
163
164    def cb_compressed(self, msg: CompressedImage):
165        """CompressedImage callback (Left/Right camera RGB compressed image)"""
166        self.update_arrivals()
167
168        if self.should_print():
169            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
170            self.get_logger().info(
171                f"🗜️  {self.topic_type} received\n"
172                f"  • frame_id:        {msg.header.frame_id}\n"
173                f"  • stamp (sec):     {stamp_sec:.6f}\n"
174                f"  • format:          {msg.format}\n"
175                f"  • data size:       {len(msg.data)}\n"
176                f"  • recv FPS (1s):   {self.get_fps():.1f}"
177            )
178
179    def cb_camerainfo(self, msg: CameraInfo):
180        """CameraInfo callback (Left/Right camera intrinsic parameters)"""
181        # Camera info will only receive one frame, print it directly
182        stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
183
184        # Format D array
185        d_str = ", ".join([f"{d:.6f}" for d in msg.d])
186
187        # Format K matrix
188        k_str = ", ".join([f"{k:.6f}" for k in msg.k])
189
190        # Format P matrix
191        p_str = ", ".join([f"{p:.6f}" for p in msg.p])
192
193        self.get_logger().info(
194            f"📷 {self.topic_type} received\n"
195            f"  • frame_id:        {msg.header.frame_id}\n"
196            f"  • stamp (sec):     {stamp_sec:.6f}\n"
197            f"  • width x height:  {msg.width} x {msg.height}\n"
198            f"  • distortion_model:{msg.distortion_model}\n"
199            f"  • D: [{d_str}]\n"
200            f"  • K: [{k_str}]\n"
201            f"  • P: [{p_str}]\n"
202            f"  • binning_x: {msg.binning_x}\n"
203            f"  • binning_y: {msg.binning_y}\n"
204            f"  • roi: {{ x_offset: {msg.roi.x_offset}, y_offset: {msg.roi.y_offset}, height: {msg.roi.height}, width: {msg.roi.width}, do_rectify: {msg.roi.do_rectify} }}"
205        )
206
207    def dump_image_to_video(self, msg: Image):
208        """Video dump is only supported for RGB images"""
209        # You can add video recording functionality here
210        # Simplified in the Python version, only logs instead
211        if self.should_print(master=False):
212            self.get_logger().info(f"📝 Video dump not implemented in Python version")
213
214
215def main(args=None):
216    rclpy.init(args=args)
217    try:
218        node = StereoCameraTopicEcho()
219        rclpy.spin(node)
220    except KeyboardInterrupt:
221        pass
222    except Exception as e:
223        print(f"Error: {e}")
224    finally:
225        if 'node' in locals():
226            node.destroy_node()
227        rclpy.shutdown()
228
229
230if __name__ == '__main__':
231    main()

使用说明:

  1. 订阅左相机RGB图像:

    ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_rgb_image
    
  2. 订阅右相机RGB图像:

    ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=right_rgb_image
    
  3. 订阅左相机内参:

    ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_camera_info
    
  4. 录制左相机视频:

    # dump_video_path的值可改为其他路径, 注意提前创建该文件所在目录才能保存
    ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_rgb_image -p dump_video_path:=$PWD/left_camera.avi
    

头部后置单目相机数据订阅

该示例中用到了echo_camera_head_rear,通过订阅/aima/hal/sensor/rgb_head_rear/话题来接收机器人的头部后置单目相机数据,支持RGB图、压缩图和相机内参数据。

功能特点:

  • 支持头部后置相机数据订阅

  • 实时FPS统计和数据显示

  • 支持RGB图视频录制功能及把手遮挡区域mask处理(TBD, 请参考C++示例)

  • 可配置的topic类型选择

支持的数据类型:

  • rgb_image: RGB图像 (sensor_msgs/Image)

  • rgb_image_compressed: 压缩RGB图像 (sensor_msgs/CompressedImage)

  • camera_info: 相机内参 (sensor_msgs/CameraInfo)

  1#!/usr/bin/env python3
  2"""
  3Head rear monocular camera multi-topic subscription example
  4
  5Supports selecting the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
  6  - rgb_image: RGB image (sensor_msgs/Image)
  7  - rgb_image_compressed: RGB compressed image (sensor_msgs/CompressedImage)
  8  - camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)
  9
 10Example:
 11  ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image
 12  ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image_compressed
 13  ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=camera_info
 14
 15Default topic_type is rgb_image
 16"""
 17
 18import rclpy
 19from rclpy.node import Node
 20from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy, QoSHistoryPolicy
 21from sensor_msgs.msg import Image, CompressedImage, CameraInfo
 22from collections import deque
 23import time
 24import os
 25import cv2
 26
 27
 28class HeadRearCameraTopicEcho(Node):
 29    def __init__(self):
 30        super().__init__('head_rear_camera_topic_echo')
 31
 32        # Select the topic type to subscribe
 33        self.declare_parameter('topic_type', 'rgb_image')
 34        self.declare_parameter('dump_video_path', '')
 35        self.declare_parameter('with_mask', False)
 36
 37        self.topic_type = self.get_parameter('topic_type').value
 38        self.dump_video_path = self.get_parameter('dump_video_path').value
 39        self.with_mask = self.get_parameter('with_mask').value
 40        self.mask_image = None
 41
 42        # Set QoS parameters - use sensor data QoS
 43        qos = QoSProfile(
 44            reliability=QoSReliabilityPolicy.BEST_EFFORT,
 45            history=QoSHistoryPolicy.KEEP_LAST,
 46            depth=5,
 47            durability=QoSDurabilityPolicy.VOLATILE
 48        )
 49
 50        if self.with_mask and self.dump_video_path:
 51            mask_path = os.path.join(os.path.dirname(
 52                __file__), 'data', 'rgb_head_rear_mask.png')
 53            self.mask_image = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
 54            if self.mask_image is None:
 55                self.get_logger().error(
 56                    f"Failed to load mask file: {mask_path}")
 57                raise ValueError("Failed to load mask file")
 58
 59        # Create different subscribers based on topic_type
 60        if self.topic_type == "rgb_image":
 61            self.topic_name = "/aima/hal/sensor/rgb_head_rear/rgb_image"
 62            self.sub_image = self.create_subscription(
 63                Image, self.topic_name, self.cb_image, qos)
 64            self.get_logger().info(
 65                f"✅ Subscribing RGB Image: {self.topic_name}")
 66            if self.dump_video_path:
 67                mask_state = "with mask" if self.with_mask else "without mask"
 68                self.get_logger().info(
 69                    f"📝 Will dump received images {mask_state} to video: {self.dump_video_path}")
 70
 71        elif self.topic_type == "rgb_image_compressed":
 72            self.topic_name = "/aima/hal/sensor/rgb_head_rear/rgb_image/compressed"
 73            self.sub_compressed = self.create_subscription(
 74                CompressedImage, self.topic_name, self.cb_compressed, qos)
 75            self.get_logger().info(
 76                f"✅ Subscribing CompressedImage: {self.topic_name}")
 77
 78        elif self.topic_type == "camera_info":
 79            self.topic_name = "/aima/hal/sensor/rgb_head_rear/camera_info"
 80            # CameraInfo subscription must use reliable + transient_local QoS to receive historical messages (even if only one frame is published)
 81            camera_qos = QoSProfile(
 82                reliability=QoSReliabilityPolicy.RELIABLE,
 83                history=QoSHistoryPolicy.KEEP_LAST,
 84                depth=1,
 85                durability=QoSDurabilityPolicy.TRANSIENT_LOCAL
 86            )
 87            self.sub_camerainfo = self.create_subscription(
 88                CameraInfo, self.topic_name, self.cb_camerainfo, camera_qos)
 89            self.get_logger().info(
 90                f"✅ Subscribing CameraInfo (with transient_local): {self.topic_name}")
 91
 92        else:
 93            self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
 94            raise ValueError("Unknown topic_type")
 95
 96        # Internal state
 97        self.last_print = self.get_clock().now()
 98        self.print_allowed = False
 99        self.arrivals = deque()
100
101    def update_arrivals(self):
102        """Calculate received FPS"""
103        now = self.get_clock().now()
104        self.arrivals.append(now)
105        while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
106            self.arrivals.popleft()
107
108    def get_fps(self):
109        """Get FPS"""
110        return len(self.arrivals)
111
112    def should_print(self, master=True):
113        """Control print frequency"""
114        if not master:
115            return self.print_allowed
116        now = self.get_clock().now()
117        if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
118            self.last_print = now
119            self.print_allowed = True
120        else:
121            self.print_allowed = False
122        return self.print_allowed
123
124    def cb_image(self, msg: Image):
125        """Image callback (RGB image)"""
126        self.update_arrivals()
127
128        if self.should_print():
129            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
130            self.get_logger().info(
131                f"📸 {self.topic_type} received\n"
132                f"  • frame_id:        {msg.header.frame_id}\n"
133                f"  • stamp (sec):     {stamp_sec:.6f}\n"
134                f"  • encoding:        {msg.encoding}\n"
135                f"  • size (WxH):      {msg.width} x {msg.height}\n"
136                f"  • step (bytes/row):{msg.step}\n"
137                f"  • is_bigendian:    {msg.is_bigendian}\n"
138                f"  • recv FPS (1s):   {self.get_fps():.1f}"
139            )
140
141        # Only RGB image supports video dump
142        if self.topic_type == "rgb_image" and self.dump_video_path:
143            self.dump_image_to_video(msg)
144
145    def cb_compressed(self, msg: CompressedImage):
146        """CompressedImage callback (RGB compressed image)"""
147        self.update_arrivals()
148
149        if self.should_print():
150            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
151            self.get_logger().info(
152                f"🗜️  {self.topic_type} received\n"
153                f"  • frame_id:        {msg.header.frame_id}\n"
154                f"  • stamp (sec):     {stamp_sec:.6f}\n"
155                f"  • format:          {msg.format}\n"
156                f"  • data size:       {len(msg.data)}\n"
157                f"  • recv FPS (1s):   {self.get_fps():.1f}"
158            )
159
160    def cb_camerainfo(self, msg: CameraInfo):
161        """CameraInfo callback (camera intrinsic parameters)"""
162        # Camera info will only receive one frame, print it directly
163        stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
164
165        # Format D array
166        d_str = ", ".join([f"{d:.6f}" for d in msg.d])
167
168        # Format K matrix
169        k_str = ", ".join([f"{k:.6f}" for k in msg.k])
170
171        # Format P matrix
172        p_str = ", ".join([f"{p:.6f}" for p in msg.p])
173
174        self.get_logger().info(
175            f"📷 {self.topic_type} received\n"
176            f"  • frame_id:        {msg.header.frame_id}\n"
177            f"  • stamp (sec):     {stamp_sec:.6f}\n"
178            f"  • width x height:  {msg.width} x {msg.height}\n"
179            f"  • distortion_model:{msg.distortion_model}\n"
180            f"  • D: [{d_str}]\n"
181            f"  • K: [{k_str}]\n"
182            f"  • P: [{p_str}]\n"
183            f"  • binning_x: {msg.binning_x}\n"
184            f"  • binning_y: {msg.binning_y}\n"
185            f"  • roi: {{ x_offset: {msg.roi.x_offset}, y_offset: {msg.roi.y_offset}, height: {msg.roi.height}, width: {msg.roi.width}, do_rectify: {msg.roi.do_rectify} }}"
186        )
187
188    def dump_image_to_video(self, msg: Image):
189        """Video dump is only supported for RGB images"""
190        # You can add video recording functionality here
191        # Simplified in the Python version, only logs instead
192        # Note: Refer to cpp implementation, get cv images by cv_bridge first,
193        # then you can use 'image[self.mask_image == 0] = 0' to mask them and
194        # finally use VideoWriter to save them as video
195        if self.should_print(master=False):
196            self.get_logger().info(f"📝 Video dump not implemented in Python version")
197
198
199def main(args=None):
200    rclpy.init(args=args)
201    try:
202        node = HeadRearCameraTopicEcho()
203        rclpy.spin(node)
204    except KeyboardInterrupt:
205        pass
206    except Exception as e:
207        print(f"Error: {e}")
208    finally:
209        if 'node' in locals():
210            node.destroy_node()
211        rclpy.shutdown()
212
213
214if __name__ == '__main__':
215    main()

使用说明:

  1. 订阅RGB图像数据:

    ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image
    
  2. 订阅压缩图像数据:

    ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image_compressed
    
  3. 订阅相机内参:

    ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=camera_info
    
  4. 录制视频:

    # dump_video_path的值可改为其他路径, 注意提前创建该文件所在目录才能保存
    ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image -p dump_video_path:=$PWD/rear_camera.avi
    

应用场景:

  • 人脸识别和追踪

  • 目标检测和识别

  • 视觉SLAM

  • 图像处理和计算机视觉算法开发

  • 机器人视觉导航

6.1.13 头部触摸传感器数据订阅

该示例中用到了 echo_head_touch_sensor,通过订阅/aima/hal/sensor/touch_head话题来接收机器人的头部触摸传感器的反馈数据。

功能特点:

  • 订阅了头部传感器的反馈数据,当头部被触摸时候,输出会从 IDLE->TOUCH

 1#!/usr/bin/env python3
 2"""
 3Head touch state subscription example
 4"""
 5
 6import rclpy
 7from rclpy.node import Node
 8from aimdk_msgs.msg import TouchState
 9
10
11class TouchStateSubscriber(Node):
12    def __init__(self):
13        super().__init__('touch_state_subscriber')
14
15        # touch event types
16        self.event_type_map = {
17            TouchState.UNKNOWN: "UNKNOWN",
18            TouchState.IDLE: "IDLE",
19            TouchState.TOUCH: "TOUCH",
20            TouchState.SLIDE: "SLIDE",
21            TouchState.PAT_ONCE: "PAT_ONCE",
22            TouchState.PAT_TWICE: "PAT_TWICE",
23            TouchState.PAT_TRIPLE: "PAT_TRIPLE"
24        }
25
26        # create subscriber
27        self.subscription = self.create_subscription(
28            TouchState,
29            '/aima/hal/sensor/touch_head',
30            self.touch_callback,
31            10
32        )
33
34        self.get_logger().info(
35            'TouchState subscriber started, listening to /aima/hal/sensor/touch_head')
36
37    def touch_callback(self, msg):
38        event_str = self.event_type_map.get(
39            msg.event_type, f"INVALID({msg.event_type})")
40
41        self.get_logger().info(f'Timestamp: {msg.header.stamp.sec}.{msg.header.stamp.nanosec:09d}, '
42                               f'Event: {event_str} ({msg.event_type})')
43
44
45def main(args=None):
46    rclpy.init(args=args)
47    node = TouchStateSubscriber()
48    rclpy.spin(node)
49    node.destroy_node()
50    rclpy.shutdown()
51
52
53if __name__ == '__main__':
54    main()

使用说明:

ros2 run py_examples echo_head_touch_sensor

输出示例:

[INFO] [1769420383.315173538] [touch_state_subscriber]: Timestamp: 1769420394.129927670, Event: IDLE (1)
[INFO] [1769420383.324978563] [touch_state_subscriber]: Timestamp: 1769420394.139941215, Event: IDLE (1)
[INFO] [1769420383.335265681] [touch_state_subscriber]: Timestamp: 1769420394.149990634, Event: TOUCH (2)
[INFO] [1769420383.344826732] [touch_state_subscriber]: Timestamp: 1769420394.159926892, Event: TOUCH (2)

6.1.14 激光雷达数据订阅

该示例中用到了echo_lidar_data,通过订阅/aima/hal/sensor/lidar_chest_front/话题来接收机器人的激光雷达数据,支持点云数据和IMU数据两种数据类型。

功能特点:

  • 支持激光雷达点云数据订阅

  • 支持激光雷达IMU数据订阅

  • 实时FPS统计和数据显示

  • 可配置的topic类型选择

  • 详细的数据字段信息输出

支持的数据类型:

  • PointCloud2: 激光雷达点云数据 (sensor_msgs/PointCloud2)

  • Imu: 激光雷达IMU数据 (sensor_msgs/Imu)

技术实现:

  • 使用SensorDataQoS配置(BEST_EFFORT + VOLATILE

  • 支持点云字段信息解析和显示

  • 支持IMU四元数、角速度和线性加速度数据

  • 提供详细的调试日志输出

应用场景:

  • 激光雷达数据采集和分析

  • 点云数据处理和可视化

  • 机器人导航和定位

  • SLAM算法开发

  • 环境感知和建图

  1#!/usr/bin/env python3
  2"""
  3Chest LiDAR data subscription example
  4
  5Supports subscribing to the following topics:
  6  1. /aima/hal/sensor/lidar_chest_front/lidar_pointcloud
  7     - Data type: sensor_msgs/PointCloud2
  8     - frame_id: lidar_chest_front
  9     - child_frame_id: /
 10     - Content: LiDAR point cloud data
 11  2. /aima/hal/sensor/lidar_chest_front/imu
 12     - Data type: sensor_msgs/Imu
 13     - frame_id: lidar_imu_chest_front
 14     - Content: LiDAR IMU data
 15
 16You can select the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
 17  - pointcloud: subscribe to LiDAR point cloud
 18  - imu: subscribe to LiDAR IMU
 19Default topic_type is pointcloud
 20
 21Examples:
 22  ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=pointcloud
 23  ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=imu
 24"""
 25
 26import rclpy
 27from rclpy.node import Node
 28from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
 29from sensor_msgs.msg import PointCloud2, Imu
 30from collections import deque
 31import time
 32
 33
 34class LidarChestEcho(Node):
 35    def __init__(self):
 36        super().__init__('lidar_chest_echo')
 37
 38        # Select the topic type to subscribe
 39        self.declare_parameter('topic_type', 'pointcloud')
 40        self.topic_type = self.get_parameter('topic_type').value
 41
 42        # SensorDataQoS: BEST_EFFORT + VOLATILE
 43        qos = QoSProfile(
 44            reliability=QoSReliabilityPolicy.BEST_EFFORT,
 45            history=QoSHistoryPolicy.KEEP_LAST,
 46            depth=5
 47        )
 48
 49        # Create different subscribers based on topic_type
 50        if self.topic_type == "pointcloud":
 51            self.topic_name = "/aima/hal/sensor/lidar_chest_front/lidar_pointcloud"
 52            self.sub_pointcloud = self.create_subscription(
 53                PointCloud2, self.topic_name, self.cb_pointcloud, qos)
 54            self.get_logger().info(
 55                f"✅ Subscribing LIDAR PointCloud2: {self.topic_name}")
 56
 57        elif self.topic_type == "imu":
 58            self.topic_name = "/aima/hal/sensor/lidar_chest_front/imu"
 59            self.sub_imu = self.create_subscription(
 60                Imu, self.topic_name, self.cb_imu, qos)
 61            self.get_logger().info(
 62                f"✅ Subscribing LIDAR IMU: {self.topic_name}")
 63
 64        else:
 65            self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
 66            raise ValueError("Unknown topic_type")
 67
 68        # Internal state
 69        self.last_print = self.get_clock().now()
 70        self.arrivals = deque()
 71
 72    def update_arrivals(self):
 73        """Calculate received FPS"""
 74        now = self.get_clock().now()
 75        self.arrivals.append(now)
 76        while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
 77            self.arrivals.popleft()
 78
 79    def get_fps(self):
 80        """Get FPS"""
 81        return len(self.arrivals)
 82
 83    def should_print(self):
 84        """Control print frequency"""
 85        now = self.get_clock().now()
 86        if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
 87            self.last_print = now
 88            return True
 89        return False
 90
 91    def cb_pointcloud(self, msg: PointCloud2):
 92        """PointCloud2 callback (LiDAR point cloud)"""
 93        self.update_arrivals()
 94
 95        if self.should_print():
 96            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
 97
 98            # Format fields info
 99            fields_str = " ".join(
100                [f"{f.name}({f.datatype})" for f in msg.fields])
101
102            self.get_logger().info(
103                f"🟢 LIDAR PointCloud2 received\n"
104                f"  • frame_id:        {msg.header.frame_id}\n"
105                f"  • stamp (sec):     {stamp_sec:.6f}\n"
106                f"  • width x height:  {msg.width} x {msg.height}\n"
107                f"  • point_step:      {msg.point_step}\n"
108                f"  • row_step:        {msg.row_step}\n"
109                f"  • fields:          {fields_str}\n"
110                f"  • is_bigendian:    {msg.is_bigendian}\n"
111                f"  • is_dense:        {msg.is_dense}\n"
112                f"  • data size:       {len(msg.data)}\n"
113                f"  • recv FPS (1s):   {self.get_fps():1.1f}"
114            )
115
116    def cb_imu(self, msg: Imu):
117        """IMU callback (LiDAR IMU)"""
118        self.update_arrivals()
119
120        if self.should_print():
121            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
122
123            self.get_logger().info(
124                f"🟢 LIDAR IMU received\n"
125                f"  • frame_id:        {msg.header.frame_id}\n"
126                f"  • stamp (sec):     {stamp_sec:.6f}\n"
127                f"  • orientation:     [{msg.orientation.x:.6f}, {msg.orientation.y:.6f}, {msg.orientation.z:.6f}, {msg.orientation.w:.6f}]\n"
128                f"  • angular_velocity:[{msg.angular_velocity.x:.6f}, {msg.angular_velocity.y:.6f}, {msg.angular_velocity.z:.6f}]\n"
129                f"  • linear_accel:    [{msg.linear_acceleration.x:.6f}, {msg.linear_acceleration.y:.6f}, {msg.linear_acceleration.z:.6f}]\n"
130                f"  • recv FPS (1s):   {self.get_fps():.1f}"
131            )
132
133
134def main(args=None):
135    rclpy.init(args=args)
136    try:
137        node = LidarChestEcho()
138        rclpy.spin(node)
139    except KeyboardInterrupt:
140        pass
141    except Exception as e:
142        print(f"Error: {e}")
143    finally:
144        if 'node' in locals():
145            node.destroy_node()
146        rclpy.shutdown()
147
148
149if __name__ == '__main__':
150    main()

使用说明:

# 订阅激光雷达点云数据
ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=pointcloud

# 订阅激光雷达IMU数据
ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=imu

输出示例:

[INFO] [lidar_chest_echo]: ✅ Subscribing LIDAR PointCloud2: /aima/hal/sensor/lidar_chest_front/lidar_pointcloud
[INFO] [lidar_chest_echo]: 🟢 LIDAR PointCloud2 received
  • frame_id:        lidar_chest_front
  • stamp (sec):     1234567890.123456
  • width x height:  1 x 36000
  • point_step:      16
  • row_step:        16
  • fields:          x(7) y(7) z(7) intensity(7)
  • is_bigendian:    False
  • is_dense:        True
  • data size:       576000
  • recv FPS (1s):   10.0

6.1.15 播放视频

该示例中用到了play_video,在运行节点程序前,需要先将视频上传到机器人的**交互计算单元(PC3)**上(用户可在其上创建一个用来存储视频的目录,如/var/tmp/videos/),然后将节点程序中的video_path改为需要播放视频的路径。

注意

⚠️ 请注意!交互计算单元(PC3)独立于二开程序所在的开发计算单元(PC2), 音视频文件务必存入交互计算单元(IP: 10.0.1.42)。
⚠️ 音视频文件夹及该文件夹所有父目录应当为所有用户可访问读取(建议在/var/tmp/下创建子目录存放)

功能说明
通过调用PlayVideo服务,可以让机器人在屏幕上播放指定路径的视频文件。请确保视频文件已上传到交互计算单元,否则播放会失败。

 1#!/usr/bin/env python3
 2
 3import rclpy
 4import rclpy.logging
 5from rclpy.node import Node
 6
 7from aimdk_msgs.srv import PlayVideo
 8
 9
10class PlayVideoClient(Node):
11    def __init__(self):
12        super().__init__('play_video_client')
13        self.client = self.create_client(
14            PlayVideo, '/face_ui_proxy/play_video')
15        self.get_logger().info('✅ PlayVideo client node created.')
16
17        # Wait for the service to become available
18        while not self.client.wait_for_service(timeout_sec=2.0):
19            self.get_logger().info('⏳ Service unavailable, waiting...')
20
21        self.get_logger().info('🟢 Service available, ready to send request.')
22
23    def send_request(self, video_path, mode, priority):
24        req = PlayVideo.Request()
25
26        req.video_path = video_path
27        req.mode = mode
28        req.priority = priority
29
30        # async call
31        self.get_logger().info(
32            f'📨 Sending request to play video: mode={mode} video={video_path}')
33        for i in range(8):
34            req.header.header.stamp = self.get_clock().now().to_msg()
35            future = self.client.call_async(req)
36            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
37
38            if future.done():
39                break
40
41            # retry as remote peer is NOT handled well by ROS
42            self.get_logger().info(f'trying ... [{i}]')
43
44        resp = future.result()
45        if resp is None:
46            self.get_logger().error('❌ Service call not completed or timed out.')
47            return False
48
49        if resp.success:
50            self.get_logger().info(
51                f'✅ Request to play video recorded successfully: {resp.message}')
52            return True
53        else:
54            self.get_logger().error(
55                f'❌ Failed to record play-video request: {resp.message}')
56            return False
57
58
59def main(args=None):
60    rclpy.init(args=args)
61    node = None
62
63    try:
64        # video path and priority can be customized
65        video_path = "/agibot/data/home/agi/zhiyuan.mp4"
66        priority = 5
67        # input play mode
68        mode = int(input("Enter video play mode (1: play once, 2: loop): "))
69        if mode not in (1, 2):
70            raise ValueError(f'invalid mode {mode}')
71
72        node = PlayVideoClient()
73        node.send_request(video_path, mode, priority)
74    except KeyboardInterrupt:
75        pass
76    except Exception as e:
77        rclpy.logging.get_logger('main').error(
78            f'Program exited with exception: {e}')
79
80    if node:
81        node.destroy_node()
82    if rclpy.ok():
83        rclpy.shutdown()
84
85
86if __name__ == '__main__':
87    main()

6.1.16 媒体文件播放

该示例中用到了play_media,通过该节点可实现播放指定的媒体文件(如音频文件),支持WAV、MP3等格式的音频文件播放。

功能特点:

  • 支持多种音频格式播放(WAV、MP3等)

  • 支持优先级控制,可设置播放优先级

  • 支持打断机制,可中断当前播放

  • 支持自定义文件路径和播放参数

  • 提供完整的错误处理和状态反馈

技术实现:

  • 使用PlayMediaFile服务进行媒体文件播放

  • 支持优先级级别设置(0-99)

  • 支持中断控制(is_interrupted参数)

  • 提供详细的播放状态反馈

  • 兼容不同的响应字段格式

应用场景:

  • 音频文件播放和媒体控制

  • 语音提示和音效播放

  • 多媒体应用开发

  • 机器人交互音频反馈

注意

⚠️ 请注意!交互计算单元(PC3)独立于二开程序所在的开发计算单元(PC2), 音视频文件务必存入交互计算单元(IP: 10.0.1.42)。
⚠️ 音视频文件夹及该文件夹所有父目录应当为所有用户可访问读取(建议在/var/tmp/下创建子目录存放)

 1#!/usr/bin/env python3
 2
 3import sys
 4import rclpy
 5import rclpy.logging
 6from rclpy.node import Node
 7
 8from aimdk_msgs.srv import PlayMediaFile
 9from aimdk_msgs.msg import TtsPriorityLevel
10
11
12class PlayMediaClient(Node):
13    def __init__(self):
14        super().__init__('play_media_client')
15        self.client = self.create_client(
16            PlayMediaFile, '/aimdk_5Fmsgs/srv/PlayMediaFile')
17        self.get_logger().info('✅ PlayMedia client node created.')
18
19        # Wait for the service to become available
20        while not self.client.wait_for_service(timeout_sec=2.0):
21            self.get_logger().info('⏳ Service unavailable, waiting...')
22
23        self.get_logger().info('🟢 Service available, ready to send request.')
24
25    def send_request(self, media_path):
26        req = PlayMediaFile.Request()
27
28        req.media_file_req.file_name = media_path
29        req.media_file_req.domain = 'demo_client'       # required: caller domain
30        req.media_file_req.trace_id = 'demo'            # optional
31        req.media_file_req.is_interrupted = True        # interrupt same-priority
32        req.media_file_req.priority_weight = 0          # optional: 0~99
33        req.media_file_req.priority_level.value = TtsPriorityLevel.INTERACTION_L6
34
35        self.get_logger().info(
36            f'📨 Sending request to play media: {media_path}')
37        for i in range(8):
38            req.header.header.stamp = self.get_clock().now().to_msg()
39            future = self.client.call_async(req)
40            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
41
42            if future.done():
43                break
44
45            # retry as remote peer is NOT handled well by ROS
46            self.get_logger().info(f'trying ... [{i}]')
47
48        resp = future.result()
49        if resp is None:
50            self.get_logger().error('❌ Service call not completed or timed out.')
51            return False
52
53        if resp.tts_resp.is_success:
54            self.get_logger().info('✅ Request to play media file recorded successfully.')
55            return True
56        else:
57            self.get_logger().error('❌ Failed to record play-media request.')
58            return False
59
60
61def main(args=None):
62    rclpy.init(args=args)
63    node = None
64
65    default_media = '/agibot/data/var/interaction/tts_cache/normal/demo.wav'
66    try:
67        if len(sys.argv) > 1:
68            media_path = sys.argv[1]
69        else:
70            media_path = input(
71                f'Enter media file path to play (default: {default_media}): ').strip()
72            if not media_path:
73                media_path = default_media
74
75        node = PlayMediaClient()
76        node.send_request(media_path)
77    except KeyboardInterrupt:
78        pass
79    except Exception as e:
80        rclpy.logging.get_logger('main').error(
81            f'Program exited with exception: {e}')
82
83    if node:
84        node.destroy_node()
85    if rclpy.ok():
86        rclpy.shutdown()
87
88
89if __name__ == '__main__':
90    main()

使用说明:

# 播放默认音频文件
ros2 run py_examples play_media

# 播放指定音频文件
# 注意替换/path/to/your/audio_file.wav为交互板上实际文件路径
ros2 run py_examples play_media /path/to/your/audio_file.wav

# 播放TTS缓存文件
ros2 run py_examples play_media /agibot/data/var/interaction/tts_cache/normal/demo.wav

输出示例:

[INFO] [play_media_file_client_min]: 等待服务: /aimdk_5Fmsgs/srv/PlayMediaFile
[INFO] [play_media_file_client_min]: ✅ 媒体文件播放请求成功

注意事项:

  • 确保音频文件路径正确且文件存在

  • 支持的文件格式:WAV、MP3等

  • 优先级设置影响播放队列顺序

  • 打断功能可中断当前播放的音频

  • 程序具有完善的异常处理机制

6.1.17 TTS (文字转语音)

该示例中用到了play_tts,通过该节点可实现语音播放输入的文字,用户可根据不同的场景输入相应的文本。

功能特点:

  • 支持命令行参数和交互式输入

  • 完整的服务可用性检查和错误处理

  • 支持优先级控制和打断机制

  • 提供详细的播放状态反馈

核心代码

 1#!/usr/bin/env python3
 2
 3import sys
 4import rclpy
 5import rclpy.logging
 6from rclpy.node import Node
 7
 8from aimdk_msgs.srv import PlayTts
 9from aimdk_msgs.msg import TtsPriorityLevel
10
11
12class PlayTTSClient(Node):
13    def __init__(self):
14        super().__init__('play_tts_client')
15
16        # fill in the actual service name
17        self.client = self.create_client(PlayTts, '/aimdk_5Fmsgs/srv/PlayTts')
18        self.get_logger().info('✅ PlayTts client node created.')
19
20        # Wait for the service to become available
21        while not self.client.wait_for_service(timeout_sec=2.0):
22            self.get_logger().info('⏳ Service unavailable, waiting...')
23
24        self.get_logger().info('🟢 Service available, ready to send request.')
25
26    def send_request(self, text):
27        req = PlayTts.Request()
28
29        req.tts_req.text = text
30        req.tts_req.domain = 'demo_client'   # required: caller domain
31        req.tts_req.trace_id = 'demo'        # optional: request id
32        req.tts_req.is_interrupted = True    # required: interrupt same-priority
33        req.tts_req.priority_weight = 0
34        req.tts_req.priority_level.value = 6
35
36        self.get_logger().info(f'📨 Sending request to play tts: text={text}')
37        for i in range(8):
38            req.header.header.stamp = self.get_clock().now().to_msg()
39            future = self.client.call_async(req)
40            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
41
42            if future.done():
43                break
44
45            # retry as remote peer is NOT handled well by ROS
46            self.get_logger().info(f'trying ... [{i}]')
47
48        resp = future.result()
49        if resp is None:
50            self.get_logger().error('❌ Service call not completed or timed out.')
51            return False
52
53        if resp.tts_resp.is_success:
54            self.get_logger().info('✅ TTS sent successfully.')
55            return True
56        else:
57            self.get_logger().error('❌ Failed to send TTS.')
58            return False
59
60
61def main(args=None):
62    rclpy.init(args=args)
63    node = None
64
65    try:
66        # get text to speak
67        if len(sys.argv) > 1:
68            text = sys.argv[1]
69        else:
70            text = input('Enter text to speak: ')
71            if not text:
72                text = 'Hello, I am AgiBot X2.'
73
74        node = PlayTTSClient()
75        node.send_request(text)
76    except KeyboardInterrupt:
77        pass
78    except Exception as e:
79        rclpy.logging.get_logger('main').error(
80            f'Program exited with exception: {e}')
81
82    if node:
83        node.destroy_node()
84    if rclpy.ok():
85        rclpy.shutdown()
86
87
88if __name__ == '__main__':
89    main()

使用说明

# 使用命令行参数播报文本(推荐)
ros2 run py_examples play_tts "你好,我是灵犀X2机器人"

# 或者不带参数运行,程序会提示用户输入
ros2 run py_examples play_tts

输出示例

[INFO] [play_tts_client_min]: ✅ 播报请求成功

注意事项

  • 确保TTS服务正常运行

  • 支持中文和英文文本播报

  • 优先级设置影响播放队列顺序

  • 打断功能可中断当前播放的语音

接口参考

  • 服务:/aimdk_5Fmsgs/srv/PlayTts

  • 消息:aimdk_msgs/srv/PlayTts

6.1.18 麦克风数据接收

该示例中用到了mic_receiver,通过订阅/agent/process_audio_output话题来接收机器人的降噪音频数据,支持内置麦克风和外置麦克风两种音频流,并根据VAD(语音活动检测)状态自动保存完整的语音片段为PCM文件。

功能特点:

  • 支持多音频流同时接收(内置麦克风 stream_id=1,外置麦克风 stream_id=2)

  • 基于VAD状态自动检测语音开始、处理中、结束

  • 自动保存完整语音片段为PCM格式文件

  • 按时间戳和音频流分类存储

  • 支持音频时长计算和统计信息输出

  • 智能缓冲区管理,避免内存泄漏

  • 完善的错误处理和异常管理

  • 详细的调试日志输出

VAD状态说明:

  • 0: 无语音

  • 1: 语音开始

  • 2: 语音处理中

  • 3: 语音结束

技术实现:

  • 支持PCM格式音频文件保存(16kHz, 16位, 单声道)

  • 提供详细的日志输出和状态监控

  • 支持实时音频流处理和文件保存

应用场景:

  • 语音识别和语音处理

  • 音频数据采集和分析

  • 实时语音监控

  • 音频质量检测

  • 多麦克风阵列数据处理

  1#!/usr/bin/env python3
  2"""
  3Microphone data receiving example
  4
  5This example subscribes to the `/agent/process_audio_output` topic to receive the robot's
  6noise-suppressed audio data. It supports both the built-in microphone and the external
  7microphone audio streams, and automatically saves complete speech segments as PCM files
  8based on the VAD (Voice Activity Detection) state.
  9
 10Features:
 11- Supports receiving multiple audio streams at the same time (built-in mic stream_id=1, external mic stream_id=2)
 12- Automatically detects speech start / in-progress / end based on VAD state
 13- Automatically saves complete speech segments as PCM files
 14- Stores files categorized by timestamp and audio stream
 15- Supports audio duration calculation and logging
 16
 17VAD state description:
 18- 0: No speech
 19- 1: Speech start
 20- 2: Speech in progress
 21- 3: Speech end
 22"""
 23
 24import rclpy
 25from rclpy.node import Node
 26from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
 27from aimdk_msgs.msg import ProcessedAudioOutput, AudioVadStateType
 28import os
 29import time
 30from datetime import datetime
 31from collections import defaultdict
 32from typing import Dict, List
 33
 34
 35class AudioSubscriber(Node):
 36    def __init__(self):
 37        super().__init__('audio_subscriber')
 38
 39        # Audio buffers, stored separately by stream_id
 40        # stream_id -> buffer
 41        self.audio_buffers: Dict[int, List[bytes]] = defaultdict(list)
 42        self.recording_state: Dict[int, bool] = defaultdict(bool)
 43
 44        # Create audio output directory
 45        self.audio_output_dir = "audio_recordings"
 46        os.makedirs(self.audio_output_dir, exist_ok=True)
 47
 48        # VAD state name mapping
 49        self.vad_state_names = {
 50            0: "No speech",
 51            1: "Speech start",
 52            2: "Speech in progress",
 53            3: "Speech end"
 54        }
 55
 56        # Audio stream name mapping
 57        self.stream_names = {
 58            1: "Built-in microphone",
 59            2: "External microphone"
 60        }
 61
 62        # QoS settings
 63        # Note: deep queue to avoid missing data in a burst at start of VAD.
 64        qos = QoSProfile(
 65            history=QoSHistoryPolicy.KEEP_LAST,
 66            depth=500,
 67            reliability=QoSReliabilityPolicy.BEST_EFFORT
 68        )
 69
 70        # Create subscriber
 71        self.subscription = self.create_subscription(
 72            ProcessedAudioOutput,
 73            '/agent/process_audio_output',
 74            self.audio_callback,
 75            qos
 76        )
 77
 78        self.get_logger().info("Start subscribing to noise-suppressed audio data...")
 79
 80    def audio_callback(self, msg: ProcessedAudioOutput):
 81        """Audio data callback"""
 82        try:
 83            stream_id = msg.stream_id
 84            vad_state = msg.audio_vad_state.value
 85            audio_data = bytes(msg.audio_data)
 86
 87            self.get_logger().info(
 88                f"Received audio data: stream_id={stream_id}, "
 89                f"vad_state={vad_state}({self.vad_state_names.get(vad_state, 'Unknown state')}), "
 90                f"audio_size={len(audio_data)} bytes"
 91            )
 92
 93            self.handle_vad_state(stream_id, vad_state, audio_data)
 94
 95        except Exception as e:
 96            self.get_logger().error(
 97                f"Error while processing audio message: {str(e)}")
 98
 99    def handle_vad_state(self, stream_id: int, vad_state: int, audio_data: bytes):
100        """Handle VAD state changes"""
101        stream_name = self.stream_names.get(
102            stream_id, f"Unknown stream {stream_id}")
103        vad_name = self.vad_state_names.get(
104            vad_state, f"Unknown state {vad_state}")
105
106        self.get_logger().info(
107            f"[{stream_name}] VAD state: {vad_name} audio: {len(audio_data)} bytes"
108        )
109
110        # Speech start
111        if vad_state == 1:
112            self.get_logger().info("🎤 Speech start detected")
113            if not self.recording_state[stream_id]:
114                self.audio_buffers[stream_id].clear()
115                self.recording_state[stream_id] = True
116            if audio_data:
117                self.audio_buffers[stream_id].append(audio_data)
118
119        # Speech in progress
120        elif vad_state == 2:
121            self.get_logger().info("🔄 Speech in progress...")
122            if self.recording_state[stream_id] and audio_data:
123                self.audio_buffers[stream_id].append(audio_data)
124
125        # Speech end
126        elif vad_state == 3:
127            self.get_logger().info("✅ Speech end")
128            if self.recording_state[stream_id] and audio_data:
129                self.audio_buffers[stream_id].append(audio_data)
130
131            if self.recording_state[stream_id] and self.audio_buffers[stream_id]:
132                self.save_audio_segment(stream_id)
133            self.recording_state[stream_id] = False
134
135        # No speech
136        elif vad_state == 0:
137            if self.recording_state[stream_id]:
138                self.get_logger().info("⏹️ Reset recording state")
139                self.recording_state[stream_id] = False
140
141        # Print current buffer status
142        buffer_size = sum(len(chunk)
143                          for chunk in self.audio_buffers[stream_id])
144        recording = self.recording_state[stream_id]
145        self.get_logger().debug(
146            f"[Stream {stream_id}] Buffer size: {buffer_size} bytes, recording: {recording}"
147        )
148
149    def save_audio_segment(self, stream_id: int):
150        """Save audio segment"""
151        if not self.audio_buffers[stream_id]:
152            return
153
154        # Merge all audio data
155        audio_data = b''.join(self.audio_buffers[stream_id])
156
157        # Get current timestamp
158        now = datetime.now()
159        timestamp = now.strftime("%Y%m%d_%H%M%S_%f")[:-3]  # to milliseconds
160
161        # Create subdirectory by stream_id
162        stream_dir = os.path.join(self.audio_output_dir, f"stream_{stream_id}")
163        os.makedirs(stream_dir, exist_ok=True)
164
165        # Generate filename
166        stream_name = "internal_mic" if stream_id == 1 else "external_mic" if stream_id == 2 else f"stream_{stream_id}"
167        filename = f"{stream_name}_{timestamp}.pcm"
168        filepath = os.path.join(stream_dir, filename)
169
170        try:
171            # Save PCM file
172            with open(filepath, 'wb') as f:
173                f.write(audio_data)
174
175            self.get_logger().info(
176                f"Audio segment saved: {filepath} (size: {len(audio_data)} bytes)")
177
178            # Calculate audio duration (assuming 16 kHz, 16-bit, mono)
179            sample_rate = 16000
180            bits_per_sample = 16
181            channels = 1
182            bytes_per_sample = bits_per_sample // 8
183            total_samples = len(audio_data) // (bytes_per_sample * channels)
184            duration_seconds = total_samples / sample_rate
185
186            self.get_logger().info(
187                f"Audio duration: {duration_seconds:.2f} s ({total_samples} samples)")
188
189        except Exception as e:
190            self.get_logger().error(f"Failed to save audio file: {str(e)}")
191
192
193def main(args=None):
194    rclpy.init(args=args)
195    node = AudioSubscriber()
196
197    try:
198        node.get_logger().info("Listening to noise-suppressed audio data, press Ctrl+C to exit...")
199        rclpy.spin(node)
200    except KeyboardInterrupt:
201        node.get_logger().info("Interrupt signal received, exiting...")
202    finally:
203        node.destroy_node()
204        rclpy.shutdown()
205
206
207if __name__ == '__main__':
208    main()

使用说明:

  1. 运行程序

    # 构建Python包
    colcon build --packages-select py_examples
    
    # 运行麦克风接收程序, 配合唤醒词触发VAD
    ros2 run py_examples mic_receiver
    
  2. 目录结构

    • 运行节点后会自动创建audio_recordings目录

    • 音频文件按stream_id分类存储:

      • stream_1/: 内置麦克风音频

      • stream_2/: 外置麦克风音频

  3. 文件命名格式{stream_name}_{timestamp}.pcm

    • internal_mic_20250909_133649_738.pcm (内置麦克风)

    • external_mic_20250909_133650_123.pcm (外置麦克风)

  4. 音频格式:16kHz, 16位, 单声道PCM

  5. 播放保存的PCM文件

    # 播放内置麦克风录音
    aplay -r 16000 -f S16_LE -c 1 audio_recordings/stream_1/internal_mic_20250909_133649_738.pcm
    
    # 播放外置麦克风录音
    aplay -r 16000 -f S16_LE -c 1 audio_recordings/stream_2/external_mic_20250909_133650_123.pcm
    
  6. 转换为WAV格式(可选):

    # 使用ffmpeg转换为WAV格式
    ffmpeg -f s16le -ar 16000 -ac 1 -i external_mic_20250909_133649_738.pcm output.wav
    
  7. 程序控制

    • Ctrl+C 安全退出程序

    • 程序会自动处理音频流的开始和结束

    • 支持同时处理多个音频流

输出示例:

正常启动和运行:

[INFO] 开始订阅降噪音频数据...
[INFO] 收到音频数据: stream_id=1, vad_state=0(无语音), audio_size=0 bytes
[INFO] [内置麦克风] VAD状态: 无语音 音频数据: 0 bytes
[INFO] 收到音频数据: stream_id=2, vad_state=0(无语音), audio_size=0 bytes
[INFO] [外置麦克风] VAD状态: 无语音 音频数据: 0 bytes

检测到语音开始:

[INFO] 收到音频数据: stream_id=2, vad_state=1(语音开始), audio_size=320 bytes
[INFO] [外置麦克风] VAD状态: 语音开始 音频数据: 320 bytes
[INFO] 🎤 检测到语音开始

语音处理过程:

[INFO] 收到音频数据: stream_id=2, vad_state=2(语音处理中), audio_size=320 bytes
[INFO] [外置麦克风] VAD状态: 语音处理中 音频数据: 320 bytes
[INFO] 🔄 语音处理中...
[INFO] 收到音频数据: stream_id=2, vad_state=2(语音处理中), audio_size=320 bytes
[INFO] [外置麦克风] VAD状态: 语音处理中 音频数据: 320 bytes
[INFO] 🔄 语音处理中...

语音结束和保存:

[INFO] 收到音频数据: stream_id=2, vad_state=3(语音结束), audio_size=320 bytes
[INFO] [外置麦克风] VAD状态: 语音结束 音频数据: 320 bytes
[INFO] ✅ 语音结束
[INFO] 音频段已保存: audio_recordings/stream_2/external_mic_20250909_133649_738.pcm (大小: 960 bytes)
[INFO] 音频时长: 0.06 秒 (480 样本)

多音频流同时处理:

[INFO] 收到音频数据: stream_id=1, vad_state=1(语音开始), audio_size=320 bytes
[INFO] [内置麦克风] VAD状态: 语音开始 音频数据: 320 bytes
[INFO] 🎤 检测到语音开始
[INFO] 收到音频数据: stream_id=2, vad_state=1(语音开始), audio_size=320 bytes
[INFO] [外置麦克风] VAD状态: 语音开始 音频数据: 320 bytes
[INFO] 🎤 检测到语音开始

程序退出:

^C[INFO] 接收到中断信号,正在退出...
[INFO] 程序已安全退出

注意事项:

  • 程序支持同时处理多个音频流(内置和外置麦克风)

  • 每个音频流都有独立的缓冲区和录音状态

  • 音频文件会自动按时间戳和音频流分类保存

  • 程序具有完善的错误处理机制,确保稳定运行

6.1.19 表情控制

该示例中用到了play_emoji,通过该节点可实现表达指定的表情,用户可根据已有的表情列表来选择要变化的表情,具体表情列表可参考表情列表

 1#!/usr/bin/env python3
 2
 3import rclpy
 4import rclpy.logging
 5from rclpy.node import Node
 6
 7from aimdk_msgs.srv import PlayEmoji
 8
 9
10class PlayEmojiClient(Node):
11    def __init__(self):
12        super().__init__('play_emoji_client')
13        self.client = self.create_client(
14            PlayEmoji, '/face_ui_proxy/play_emoji')
15        self.get_logger().info('✅ PlayEmoji client node created.')
16
17        # Wait for the service to become available
18        while not self.client.wait_for_service(timeout_sec=2.0):
19            self.get_logger().info('⏳ Service unavailable, waiting...')
20
21        self.get_logger().info('🟢 Service available, ready to send request.')
22
23    def send_request(self, emoji: int, mode: int, priority: int):
24        req = PlayEmoji.Request()
25
26        req.emotion_id = int(emoji)
27        req.mode = int(mode)
28        req.priority = int(priority)
29
30        self.get_logger().info(
31            f'📨 Sending request to play emoji: id={emoji}, mode={mode}, priority={priority}')
32        for i in range(8):
33            req.header.header.stamp = self.get_clock().now().to_msg()
34            future = self.client.call_async(req)
35            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
36
37            if future.done():
38                break
39
40            # retry as remote peer is NOT handled well by ROS
41            self.get_logger().info(f'trying ... [{i}]')
42
43        resp = future.result()
44        if resp is None:
45            self.get_logger().error('❌ Service call not completed or timed out.')
46            return False
47
48        if resp.success:
49            self.get_logger().info(
50                f'✅ Emoji played successfully: {resp.message}')
51            return True
52        else:
53            self.get_logger().error(f'❌ Failed to play emoji: {resp.message}')
54            return False
55
56
57def main(args=None):
58    rclpy.init(args=args)
59    node = None
60
61    # Interactive input, same as the original C++ version
62    try:
63        emotion = int(
64            input("Enter emoji ID: 1-blink, 60-bored, 70-abnormal, 80-sleeping, 90-happy ... 190-double angry, 200-adore: "))
65        mode = int(input("Enter play mode (1: play once, 2: loop): "))
66        if mode not in (1, 2):
67            raise ValueError("invalid mode")
68        priority = 10  # default priority
69
70        node = PlayEmojiClient()
71        node.send_request(emotion, mode, priority)
72    except KeyboardInterrupt:
73        pass
74    except Exception as e:
75        rclpy.logging.get_logger('main').error(
76            f'Program exited with exception: {e}')
77
78    if node:
79        node.destroy_node()
80    if rclpy.ok():
81        rclpy.shutdown()
82
83
84if __name__ == '__main__':
85    main()

6.1.20 LED灯带控制

功能说明:演示如何控制机器人的LED灯带,支持多种显示模式和自定义颜色。

核心代码

  1#!/usr/bin/env python3
  2
  3import sys
  4import rclpy
  5import rclpy.logging
  6from rclpy.node import Node
  7
  8from aimdk_msgs.msg import CommonRequest
  9from aimdk_msgs.srv import LedStripCommand
 10
 11
 12class PlayLightsClient(Node):
 13    def __init__(self):
 14        super().__init__('play_lights_client')
 15
 16        # create service client
 17        self.client = self.create_client(
 18            LedStripCommand, '/aimdk_5Fmsgs/srv/LedStripCommand')
 19
 20        self.get_logger().info('✅ PlayLights client node created.')
 21
 22        # Wait for the service to become available
 23        while not self.client.wait_for_service(timeout_sec=2.0):
 24            self.get_logger().info('⏳ Service unavailable, waiting...')
 25
 26        self.get_logger().info('🟢 Service available, ready to send request.')
 27
 28    def send_request(self, led_mode, r, g, b):
 29        """Send LED control request"""
 30        # create request
 31        request = LedStripCommand.Request()
 32        request.led_strip_mode = led_mode
 33        request.r = r
 34        request.g = g
 35        request.b = b
 36
 37        # send request
 38        # Note: LED strip is slow to response (up to ~5s)
 39        self.get_logger().info(
 40            f'📨 Sending request to control led strip: mode={led_mode}, RGB=({r}, {g}, {b})')
 41        for i in range(4):
 42            request.request.header.stamp = self.get_clock().now().to_msg()
 43            future = self.client.call_async(request)
 44            rclpy.spin_until_future_complete(self, future, timeout_sec=5)
 45
 46            if future.done():
 47                break
 48
 49            # retry as remote peer is NOT handled well by ROS
 50            self.get_logger().info(f'trying ... [{i}]')
 51
 52        response = future.result()
 53        if response is None:
 54            self.get_logger().error('❌ Service call not completed or timed out.')
 55            return False
 56
 57        if response.status_code == 0:
 58            self.get_logger().info('✅ LED strip command sent successfully.')
 59            return True
 60        else:
 61            self.get_logger().error(
 62                f'❌ LED strip command failed with status: {response.status_code}')
 63            return False
 64
 65
 66def main(args=None):
 67    rclpy.init(args=args)
 68    node = None
 69
 70    try:
 71        # get command line args
 72        if len(sys.argv) > 4:
 73            # use CLI args
 74            led_mode = int(sys.argv[1])
 75            if led_mode not in (0, 1, 2, 3):
 76                raise ValueError("invalid mode")
 77            r = int(sys.argv[2])
 78            if r < 0 or r > 255:
 79                raise ValueError("invalid R value")
 80            g = int(sys.argv[3])
 81            if g < 0 or g > 255:
 82                raise ValueError("invalid G value")
 83            b = int(sys.argv[4])
 84            if b < 0 or b > 255:
 85                raise ValueError("invalid B value")
 86        else:
 87            # interactive input
 88            print("=== LED strip control example ===")
 89            print("Select LED strip mode:")
 90            print("0 - Steady on")
 91            print("1 - Breathing (4s cycle, sine brightness)")
 92            print("2 - Blinking (1s cycle, 0.5s on, 0.5s off)")
 93            print("3 - Flowing (2s cycle, light up from left to right)")
 94
 95            led_mode = int(input("Enter mode (0-3): "))
 96            if led_mode not in (0, 1, 2, 3):
 97                raise ValueError("invalid mode")
 98
 99            print("\nSet RGB color values (0-255):")
100            r = int(input("Red (R): "))
101            if r < 0 or r > 255:
102                raise ValueError("invalid R value")
103            g = int(input("Green (G): "))
104            if g < 0 or g > 255:
105                raise ValueError("invalid G value")
106            b = int(input("Blue (B): "))
107            if b < 0 or b > 255:
108                raise ValueError("invalid B value")
109
110        node = PlayLightsClient()
111        node.send_request(led_mode, r, g, b)
112    except KeyboardInterrupt:
113        pass
114    except Exception as e:
115        rclpy.logging.get_logger('main').error(
116            f'Program exited with exception: {e}')
117
118    if node:
119        node.destroy_node()
120    if rclpy.ok():
121        rclpy.shutdown()
122
123
124if __name__ == '__main__':
125    main()

使用说明

# 构建
colcon build --packages-select py_examples

# 交互式运行
ros2 run py_examples play_lights

# 命令行参数运行
ros2 run py_examples play_lights 1 255 0 0  # 模式1,红色

输出示例

=== LED灯带控制示例 ===
请选择灯带模式:
0 - 常亮模式
1 - 呼吸模式 (4s周期,亮度正弦变化)
2 - 闪烁模式 (1s周期,0.5s亮,0.5s灭)
3 - 流水模式 (2s周期,从左到右依次点亮)
请输入模式 (0-3): 1

请设置RGB颜色值 (0-255):
红色分量 (R): 255
绿色分量 (G): 0
蓝色分量 (B): 0

发送LED控制命令...
模式: 1, 颜色: RGB(255, 0, 0)
✅ LED strip command sent successfully

技术特点

  • 支持4种LED显示模式

  • RGB颜色自定义

  • 同步服务调用

  • 命令行参数支持

  • 输入参数验证

  • 友好的用户交互界面