6.1 Python接口使用示例

本章节将带您实现索引所示若干功能

构建 & 运行说明

  • 进入到 aimdk 目录,执行以下命令

    source /opt/ros/humble/setup.bash
    colcon build
    source install/setup.bash
    ros2 run py_examples '对应功能名称如: get_mc_action'
    

📝 代码说明

完整的代码实现包含完整的错误处理、信号处理、超时处理等机制,确保程序的健壮性。请您在 py_examples 目录下查看/修改

6.1.1 获取机器人模式

通过调用GetMcAction服务获取机器人当前的运行模式,包括模式ID、描述和状态信息。

运动模式定义

 1#!/usr/bin/env python3
 2import rclpy
 3from rclpy.node import Node
 4from aimdk_msgs.srv import GetMcAction
 5from aimdk_msgs.msg import CommonRequest
 6
 7
 8class GetMcActionClient(Node):
 9    def __init__(self):
10        super().__init__('get_mc_action_client')
11        self.client = self.create_client(
12            GetMcAction, '/aimdk_5Fmsgs/srv/GetMcAction')
13        self.get_logger().info('Get MC Action Client created')
14        while not self.client.wait_for_service(timeout_sec=1.0):
15            self.get_logger().info('Service not available, waiting...')
16
17    def send_request(self):
18        request = GetMcAction.Request()
19        request.request = CommonRequest()
20
21        self.get_logger().info('Sending request to get robot mode')
22        for i in range(8):
23            future = self.client.call_async(request)
24            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
25
26            if future.done():
27                break
28
29            # retry as remote peer: mc module can be under heavy load
30            self.get_logger().info(f"trying to register input source... [{i}]")
31
32        if future.result() is not None:
33            response = future.result()
34            self.get_logger().info(
35                'Robot modes (1: default mode, 100: JOINT_DEFAULT, 200: STAND_DEFAULT, 300: LOCOMOTION_DEFAULT)')
36            self.get_logger().info(
37                f'Current robot mode: {response.info.current_action.value}')
38            self.get_logger().info(
39                f'Mode description: {response.info.action_desc}')
40            self.get_logger().info(
41                f'Mode status: {response.info.status.value}')
42        else:
43            self.get_logger().error('Service call failed')
44
45
46def main(args=None):
47    rclpy.init(args=args)
48    client = GetMcActionClient()
49    client.send_request()
50    client.destroy_node()
51    rclpy.shutdown()
52
53
54if __name__ == '__main__':
55    main()

6.1.2 设置机器人模式

该示例中用到了SetMcAction服务,运行节点程序后终端输入模式对应的字段值,机器人会立即进入相应的运动模式
切入稳定站立(STAND_DEFAULT)模式前,确保机器人脚已经着地。

 1#!/usr/bin/env python3
 2import sys
 3import rclpy
 4from rclpy.node import Node
 5
 6from aimdk_msgs.srv import SetMcAction
 7from aimdk_msgs.msg import RequestHeader, CommonState, McAction, McActionCommand
 8
 9
10class SetMcActionClient(Node):
11    def __init__(self):
12        super().__init__('set_mc_action_client')
13        self.client = self.create_client(
14            SetMcAction, '/aimdk_5Fmsgs/srv/SetMcAction'
15        )
16        self.get_logger().info('✅ SetMcAction client node created.')
17
18        # Wait for the service to become available
19        while not self.client.wait_for_service(timeout_sec=1.0):
20            self.get_logger().info('⏳ Service unavailable, waiting...')
21
22    def send_request(self, action_value: int):
23        req = SetMcAction.Request()
24        req.header = RequestHeader()
25
26        cmd = McActionCommand()
27        act = McAction()
28        act.value = action_value
29        cmd.action = act
30        cmd.action_desc = f"Set robot mode to: {action_value}"
31        req.command = cmd
32
33        self.get_logger().info(
34            f'Sending request to set robot mode: {action_value}')
35        for i in range(8):
36            req.header.stamp = self.get_clock().now().to_msg()
37            future = self.client.call_async(req)
38            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
39
40            if future.done():
41                break
42
43            # retry as remote peer: mc module can be under heavy load
44            self.get_logger().info(f"trying to register input source... [{i}]")
45
46        if not future.done():
47            self.get_logger().error('❌ Service call not completed or timed out.')
48            return
49
50        resp = future.result()
51        if resp is None:
52            self.get_logger().error('❌ Service call failed — no response received.')
53            return
54
55        if resp.response.status.value == CommonState.SUCCESS:
56            self.get_logger().info('✅ Robot mode set successfully.')
57        else:
58            self.get_logger().error(
59                f'❌ Failed to set robot mode: {resp.response.message}'
60            )
61
62
63def main(args=None):
64    rclpy.init(args=args)
65    node = SetMcActionClient()
66
67    # Prefer command-line argument, otherwise prompt for input
68    if len(sys.argv) > 1:
69        try:
70            motion = int(sys.argv[1])
71            node.get_logger().info(f'Using command-line argument: {motion}')
72        except ValueError:
73            node.get_logger().error('Invalid command-line argument — must be an integer.')
74            node.destroy_node()
75            rclpy.shutdown()
76            return
77    else:
78        motion = int(
79            input(
80                "Enter robot mode (1: Default, 100: Position Control Stand, "
81                "200: Stable Stand, 300: Walking): "
82            )
83        )
84
85    try:
86        node.send_request(motion)
87    except KeyboardInterrupt:
88        node.get_logger().info('🟡 Interrupted by user.')
89
90    node.destroy_node()
91    rclpy.shutdown()
92
93
94if __name__ == '__main__':
95    main()

使用说明

# 使用命令行参数设置模式(推荐)
ros2 run py_examples set_mc_action 100  # 站姿预备(位控站立)模式
ros2 run py_examples set_mc_action 300  # 走跑模式

# 或者不带参数运行,程序会提示用户输入
ros2 run py_examples set_mc_action

输出示例

[INFO] [set_mc_action_client]: 设置机器人模式成功

注意事项

  • 切入stand_default模式前,确保机器人脚已经着地

  • 模式切换可能需要几秒钟时间完成

接口参考

  • 服务:/aimdk_5Fmsgs/srv/SetMcAction

  • 消息:aimdk_msgs/srv/SetMcAction

6.1.3 设置机器人动作

该示例中用到了preset_motion_client,启动节点程序后,输入相应的字段值可实现左手(右手)的握手(举手、挥手、飞吻)动作。

可供调用的参数见预设动作表

  1#!/usr/bin/env python3
  2
  3import rclpy
  4from rclpy.node import Node
  5from rclpy.task import Future
  6import signal
  7import sys
  8
  9from aimdk_msgs.srv import SetMcPresetMotion
 10from aimdk_msgs.msg import McPresetMotion, McControlArea, RequestHeader, CommonState
 11
 12g_node = None
 13
 14
 15def signal_handler(sig, frame):
 16    global g_node
 17    if g_node is not None:
 18        g_node.get_logger().info(f'接收到信号 {sig},正在关闭...')
 19        g_node.destroy_node()
 20    rclpy.shutdown()
 21    sys.exit(0)
 22
 23
 24class PresetMotionClient(Node):
 25
 26    def __init__(self):
 27        super().__init__('preset_motion_client')
 28
 29        self.client = self.create_client(
 30            SetMcPresetMotion, '/aimdk_5Fmsgs/srv/SetMcPresetMotion'
 31        )
 32
 33        self.get_logger().info("预设动作服务客户端已创建")
 34
 35        timeout_sec = 8.0
 36        start = self.get_clock().now()
 37        while not self.client.wait_for_service(timeout_sec=2.0):
 38            now = self.get_clock().now()
 39            if (now - start).nanoseconds / 1e9 > timeout_sec:
 40                self.get_logger().error("等待服务超时")
 41                rclpy.shutdown()
 42                return
 43            self.get_logger().info("等待服务启动中...")
 44
 45    def send_motion_request(self, area_id: int, motion_id: int) -> bool:
 46        request = SetMcPresetMotion.Request()
 47
 48        request.header = RequestHeader()
 49
 50        motion = McPresetMotion()
 51        area = McControlArea()
 52
 53        motion.value = motion_id
 54        area.value = area_id
 55
 56        request.motion = motion
 57        request.area = area
 58        request.interrupt = False
 59
 60        self.get_logger().info(
 61            f"手臂:(ID={area_id});发送预设动作请求:  (ID={motion_id})"
 62        )
 63
 64        for i in range(8):
 65            request.header.stamp = self.get_clock().now().to_msg()
 66            future = self.client.call_async(request)
 67            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
 68
 69            if future.done():
 70                break
 71
 72            # retry as remote peer: mc module can be under heavy load
 73            self.get_logger().info(f"trying to register input source... [{i}]")
 74
 75        if future.done():
 76            try:
 77                response = future.result()
 78                if response.response.state.value == CommonState.SUCCESS:
 79                    self.get_logger().info(
 80                        f"动作执行成功: {response.response.task_id}")
 81                    return True
 82                elif response.response.state.value == CommonState.RUNNING:
 83                    self.get_logger().info(
 84                        f"动作正在执行中: {response.response.task_id}")
 85                    return True
 86                else:
 87                    self.get_logger().warn(
 88                        f"动作执行失败: {response.response.task_id}")
 89                    return False
 90            except Exception as e:
 91                self.get_logger().error(f"调用服务异常: {str(e)}")
 92                return False
 93        else:
 94            self.get_logger().error("服务调用失败或超时")
 95            return False
 96
 97
 98def main(args=None):
 99    global g_node
100    rclpy.init(args=args)
101
102    signal.signal(signal.SIGINT, signal_handler)
103    signal.signal(signal.SIGTERM, signal_handler)
104
105    g_node = PresetMotionClient()
106
107    Hand_area = int(input("请输入手臂区域(左手=1,右手=2):"))
108    Hand_motion = int(input("请输入预设动作ID(举手=1001,挥手=1002,握手=1003,飞吻=1004):"))
109
110    success = g_node.send_motion_request(Hand_area, Hand_motion)
111    if not success:
112        g_node.get_logger().error("发送挥手请求失败")
113        rclpy.shutdown()
114        return 1
115
116    g_node.destroy_node()
117    rclpy.shutdown()
118
119
120if __name__ == '__main__':
121    main()

6.1.4 夹爪控制

该示例中用到了hand_control,通过往话题/aima/hal/joint/hand/command发布消息来控制夹爪的运动

注意

注意⚠️ :运行本示例前需要在机器人运控计算单元(PC1)使用aima em stop-app mc关闭原生运控模块,获取机器人控制权。注意确保机器人安全

 1import rclpy
 2from rclpy.node import Node
 3from aimdk_msgs.msg import HandCommandArray, HandCommand, HandType, MessageHeader
 4import time
 5
 6
 7class HandControl(Node):
 8    def __init__(self):
 9        super().__init__('hand_control')
10
11        # Preset parameter list: [(left hand, right hand), ...]
12        self.position_pairs = [
13            (1.0, 1.0),   # fully open
14            (0.0, 0.0),   # fully closed
15            (0.5, 0.5),   # half open
16            (0.2, 0.8),   # left slightly closed, right more open
17            (0.7, 0.3)    # left more open, right slightly closed
18        ]
19        self.current_index = 0
20        self.last_switch_time = self.get_clock().now().nanoseconds / 1e9  # seconds
21
22        # Create publisher
23        self.publisher_ = self.create_publisher(
24            HandCommandArray,
25            '/aima/hal/joint/hand/command',
26            10
27        )
28
29        # 50 Hz timer
30        self.timer_ = self.create_timer(
31            0.02,  # 20 ms = 50 Hz
32            self.publish_hand_commands
33        )
34
35        self.get_logger().info("Hand control node started!")
36
37    def publish_hand_commands(self):
38        # Check time to decide whether to switch to the next preset
39        now_sec = self.get_clock().now().nanoseconds / 1e9
40        if now_sec - self.last_switch_time >= 2.0:
41            self.current_index = (self.current_index +
42                                  1) % len(self.position_pairs)
43            self.last_switch_time = now_sec
44            self.get_logger().info(
45                f"Switched to preset: {self.current_index}, left={self.position_pairs[self.current_index][0]:.2f}, right={self.position_pairs[self.current_index][1]:.2f}"
46            )
47
48        # Use current preset
49        left_position, right_position = self.position_pairs[self.current_index]
50
51        msg = HandCommandArray()
52        msg.header = MessageHeader()
53
54        # Configure left hand
55        left_hand = HandCommand()
56        left_hand.name = "left_hand"
57        left_hand.position = float(left_position)
58        left_hand.velocity = 1.0
59        left_hand.acceleration = 1.0
60        left_hand.deceleration = 1.0
61        left_hand.effort = 1.0
62
63        # Configure right hand
64        right_hand = HandCommand()
65        right_hand.name = "right_hand"
66        right_hand.position = float(right_position)
67        right_hand.velocity = 1.0
68        right_hand.acceleration = 1.0
69        right_hand.deceleration = 1.0
70        right_hand.effort = 1.0
71
72        msg.left_hand_type = HandType(value=2)  # gripper mode
73        msg.right_hand_type = HandType(value=2)
74        msg.left_hands = [left_hand]
75        msg.right_hands = [right_hand]
76
77        # Publish message
78        self.publisher_.publish(msg)
79        # We only log when switching presets to avoid too much log output
80
81
82def main(args=None):
83    rclpy.init(args=args)
84    hand_control_node = HandControl()
85
86    try:
87        rclpy.spin(hand_control_node)
88    except KeyboardInterrupt:
89        pass
90    finally:
91        hand_control_node.destroy_node()
92        rclpy.shutdown()
93
94
95if __name__ == '__main__':
96    main()

6.1.5 灵巧手控制 (升级中暂不开放)

该示例中用到了omnihand_control,通过往话题/aima/hal/joint/hand/command发布消息来控制夹爪的运动

注意

注意⚠️ :运行本示例前需要在机器人运控计算单元(PC1)使用aima em stop-app mc关闭原生运控模块,获取机器人控制权。注意确保机器人安全

  1#!/usr/bin/env python3
  2import rclpy
  3from rclpy.node import Node
  4from aimdk_msgs.msg import HandCommand, HandCommandArray, HandType, MessageHeader
  5
  6
  7class OmnihandControl(Node):
  8    def __init__(self):
  9        super().__init__('omnihand_control')
 10
 11        self.num_fingers = 5
 12        self.joints_per_finger = 2
 13        self.total_joints = self.num_fingers * self.joints_per_finger  # 10
 14
 15        # ===== Define 10 gesture sets directly as arrays =====
 16        # Each set has 20 joint params (left 10 + right 10)
 17        # 1 = bent, 0 = straight
 18        self.gesture_sequence = [
 19            # bend 1 finger
 20            [1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
 21             1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
 22            # bend 2 fingers
 23            [1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0,
 24             1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0],
 25            # bend 3 fingers
 26            [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0,
 27             1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0],
 28            # bend 4 fingers
 29            [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0,
 30             1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0],
 31            # bend 5 fingers
 32            [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
 33             1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
 34            # release 1 finger
 35            [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0,
 36             1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0],
 37            # release 2 fingers
 38            [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0,
 39             1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0],
 40            # release 3 fingers
 41            [1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0,
 42             1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0],
 43            # release 4 fingers
 44            [1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
 45             1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
 46            # all straight
 47            [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
 48             0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
 49        ]
 50
 51        self.current_index = 0
 52        self.last_switch_time = self.get_clock().now()
 53
 54        self.publisher = self.create_publisher(
 55            HandCommandArray,
 56            '/aima/hal/joint/hand/command',
 57            10
 58        )
 59
 60        # Switch gesture every second
 61        self.timer = self.create_timer(
 62            1.0,
 63            self.publish_hand_commands
 64        )
 65
 66        self.get_logger().info("Omnihand demo control node started!")
 67
 68    def publish_hand_commands(self):
 69        now_time = self.get_clock().now()
 70        if (now_time - self.last_switch_time).nanoseconds / 1e9 >= 1.0:
 71            self.current_index = (self.current_index +
 72                                  1) % len(self.gesture_sequence)
 73            self.last_switch_time = now_time
 74            self.get_logger().info(
 75                f'Switched to gesture {self.current_index + 1}/10')
 76
 77        joint_array = self.gesture_sequence[self.current_index]
 78
 79        msg = HandCommandArray()
 80        msg.header = MessageHeader()
 81
 82        # Left hand
 83        for i in range(self.total_joints):
 84            cmd = HandCommand()
 85            cmd.name = f"left_hand_joint{i}"
 86            cmd.position = joint_array[i]
 87            cmd.velocity = 1.0
 88            cmd.acceleration = 1.0
 89            cmd.deceleration = 1.0
 90            cmd.effort = 1.0
 91            msg.left_hands.append(cmd)
 92
 93        # Right hand
 94        for i in range(self.total_joints):
 95            cmd = HandCommand()
 96            cmd.name = f"right_hand_joint{i}"
 97            cmd.position = joint_array[self.total_joints + i]
 98            cmd.velocity = 1.0
 99            cmd.acceleration = 1.0
100            cmd.deceleration = 1.0
101            cmd.effort = 1.0
102            msg.right_hands.append(cmd)
103
104        msg.left_hand_type = HandType()
105        msg.left_hand_type.value = 1  # dexterous hand mode
106        msg.right_hand_type = HandType()
107        msg.right_hand_type.value = 1
108
109        self.publisher.publish(msg)
110
111
112def main(args=None):
113    rclpy.init(args=args)
114    node = OmnihandControl()
115    try:
116        rclpy.spin(node)
117    except KeyboardInterrupt:
118        pass
119    node.destroy_node()
120    rclpy.shutdown()
121
122
123if __name__ == '__main__':
124    main()

6.1.6 注册二开输入源

对于v0.7之后的版本,在实现对MC的控制前,需要先注册输入源。该示例中通过/aimdk_5Fmsgs/srv/SetMcInputSource服务来注册二开的输入源,让MC感知到,只有注册过输入源后才能实现机器人的速度控制

  1#!/usr/bin/env python3
  2
  3import rclpy
  4import rclpy.logging
  5from rclpy.node import Node
  6import signal
  7import sys
  8
  9from aimdk_msgs.srv import SetMcInputSource
 10from aimdk_msgs.msg import RequestHeader, McInputAction
 11
 12client_node = None  # for shutting down cleanly in signal handler
 13
 14
 15def signal_handler(signum, frame):
 16    if client_node is not None:
 17        client_node.get_logger().info(
 18            f"Received signal {signum}, shutting down...")
 19    if rclpy.ok():
 20        rclpy.shutdown()
 21
 22
 23class McInputClient(Node):
 24    def __init__(self):
 25        super().__init__('set_mc_input_source_client')
 26        self.client = self.create_client(
 27            SetMcInputSource, '/aimdk_5Fmsgs/srv/SetMcInputSource'
 28        )
 29
 30        self.get_logger().info('✅ SetMcInputSource client created.')
 31
 32        timeout_sec = 8.0
 33        start = self.get_clock().now().nanoseconds / 1e9
 34
 35        while not self.client.wait_for_service(timeout_sec=2.0):
 36            now = self.get_clock().now().nanoseconds / 1e9
 37            if now - start > timeout_sec:
 38                self.get_logger().error('❌ Timed out waiting for service.')
 39                raise RuntimeError('Service unavailable')
 40            self.get_logger().info('Waiting for input source service...')
 41
 42        self.get_logger().info('🟢 Service available, ready to send request.')
 43
 44    def send_input_request(self) -> bool:
 45        try:
 46            req = SetMcInputSource.Request()
 47
 48            # header
 49            req.request.header = RequestHeader()
 50
 51            # action (e.g. 1001 = register)
 52            req.action = McInputAction()
 53            req.action.value = 1001
 54
 55            # input source info
 56            req.input_source.name = 'node'
 57            req.input_source.priority = 40
 58            req.input_source.timeout = 1000  # ms
 59
 60            self.get_logger().info(
 61                f"📨 Sending input source request: action_id={req.action.value}, "
 62                f"name={req.input_source.name}, priority={req.input_source.priority}"
 63            )
 64
 65            for i in range(8):
 66                req.request.header.stamp = self.get_clock().now().to_msg()
 67                future = self.client.call_async(req)
 68                rclpy.spin_until_future_complete(
 69                    self, future, timeout_sec=0.25)
 70
 71                if future.done():
 72                    break
 73
 74                # retry as remote peer: mc module can be under heavy load
 75                self.get_logger().info(
 76                    f"trying to register input source... [{i}]")
 77
 78            if not future.done():
 79                self.get_logger().error('❌ Service call failed or timed out.')
 80                return False
 81
 82            resp = future.result()
 83            ret_code = resp.response.header.code
 84            task_id = resp.response.task_id
 85
 86            if ret_code == 0:
 87                self.get_logger().info(
 88                    f"✅ Input source set successfully. task_id={task_id}"
 89                )
 90                return True
 91            else:
 92                self.get_logger().error(
 93                    f"❌ Input source set failed. ret_code={ret_code}, task_id={task_id} (duplicated ADD? or MODIFY/ENABLE/DISABLE for unknown source?)"
 94                )
 95                return False
 96
 97        except Exception as e:
 98            self.get_logger().error(f"❌ Exception while calling service: {e}")
 99            return False
100
101
102def main(args=None):
103    global client_node
104    rclpy.init(args=args)
105
106    # register signal handlers early
107    signal.signal(signal.SIGINT, signal_handler)
108    signal.signal(signal.SIGTERM, signal_handler)
109
110    try:
111        client_node = McInputClient()
112        ok = client_node.send_input_request()
113        if not ok:
114            client_node.get_logger().error('Input source request failed.')
115    except Exception as e:
116        rclpy.logging.get_logger('main').error(
117            f'Program exited with exception: {e}')
118    finally:
119        if rclpy.ok():
120            rclpy.shutdown()
121
122
123if __name__ == '__main__':
124    main()

6.1.7 获取当前输入源

该示例中用到了GetCurrentInputSource服务,用于获取当前已注册的输入源信息,包括输入源名称、优先级和超时时间等信息。

 1#!/usr/bin/env python3
 2
 3import rclpy
 4from rclpy.node import Node
 5from aimdk_msgs.srv import GetCurrentInputSource
 6from aimdk_msgs.msg import CommonRequest
 7
 8
 9class GetCurrentInputSourceClient(Node):
10    def __init__(self):
11        super().__init__('get_current_input_source_client')
12        self.cli = self.create_client(
13            GetCurrentInputSource,
14            '/aimdk_5Fmsgs/srv/GetCurrentInputSource'
15        )
16
17    def send_request(self):
18        # Wait for the service to be available
19        timeout_sec = 8.0
20        start = self.get_clock().now().nanoseconds / 1e9
21
22        while not self.cli.wait_for_service(timeout_sec=2.0):
23            now = self.get_clock().now().nanoseconds / 1e9
24            if now - start > timeout_sec:
25                self.get_logger().error("Waiting for service timed out")
26                return False
27            self.get_logger().info("Waiting for input source service...")
28
29        # Create request
30        req = GetCurrentInputSource.Request()
31        req.request = CommonRequest()
32
33        # Send request and wait for response
34        self.get_logger().info("Sending request to get current input source")
35        for i in range(8):
36            req.request.header.stamp = self.get_clock().now().to_msg()
37            future = self.cli.call_async(req)
38            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
39
40            if future.done():
41                break
42
43        if future.done():
44            try:
45                response = future.result()
46                ret_code = response.response.header.code
47                if ret_code == 0:
48                    self.get_logger().info(
49                        f"Current input source: {response.input_source.name}")
50                    self.get_logger().info(
51                        f"Priority: {response.input_source.priority}")
52                    self.get_logger().info(
53                        f"Timeout: {response.input_source.timeout}")
54                    return True
55                else:
56                    self.get_logger().error(
57                        f"Service returned failure, return code: {ret_code}")
58                    return False
59            except Exception as e:
60                self.get_logger().error(f"Exception occurred: {str(e)}")
61                return False
62        else:
63            self.get_logger().error("Service call failed or timed out")
64            return False
65
66
67def main(args=None):
68    rclpy.init(args=args)
69
70    node = GetCurrentInputSourceClient()
71
72    success = node.send_request()
73
74    node.destroy_node()
75    rclpy.shutdown()
76
77    return 0 if success else 1
78
79
80if __name__ == '__main__':
81    exit(main())

使用说明

# 获取当前输入源信息
ros2 run py_examples get_current_input_source

输出示例

[INFO] [get_current_input_source_client]: 当前输入源: node
[INFO] [get_current_input_source_client]: 优先级: 40
[INFO] [get_current_input_source_client]: 超时时间: 1000

注意事项

  • 确保GetCurrentInputSource服务正常运行

  • 需要在注册输入源之后才能获取到有效信息

  • 状态码为0表示查询成功

6.1.8 控制机器人走跑

该示例中用到了mc_locomotion_velocity,以下示例通过发布/aima/mc/locomotion/velocity话题控制机器人的行走,对于v0.7之后的版本,实现速度控制前需要注册输入源(该示例已实现注册输入源),具体注册步骤可参考代码。

  1#!/usr/bin/env python3
  2
  3import rclpy
  4from rclpy.node import Node
  5import threading
  6import time
  7import signal
  8import sys
  9
 10from aimdk_msgs.msg import McLocomotionVelocity, MessageHeader
 11from aimdk_msgs.srv import SetMcInputSource
 12
 13
 14class DirectVelocityControl(Node):
 15    def __init__(self):
 16        super().__init__('direct_velocity_control')
 17
 18        self.publisher = self.create_publisher(
 19            McLocomotionVelocity, '/aima/mc/locomotion/velocity', 10)
 20        self.client = self.create_client(
 21            SetMcInputSource, '/aimdk_5Fmsgs/srv/SetMcInputSource')
 22
 23        self.forward_velocity = 0.0
 24        self.lateral_velocity = 0.0
 25        self.angular_velocity = 0.0
 26
 27        self.max_forward_speed = 1.0
 28        self.min_forward_speed = 0.2
 29
 30        self.max_lateral_speed = 0.5
 31        self.min_lateral_speed = 0.2
 32
 33        self.max_angular_speed = 1.0
 34        self.min_angular_speed = 0.1
 35
 36        self.timer = None
 37
 38        self.get_logger().info("Direct velocity control node started!")
 39
 40    def start_publish(self):
 41        if not self.timer:
 42            self.timer = self.create_timer(0.02, self.publish_velocity)
 43
 44    def register_input_source(self):
 45        self.get_logger().info("Registering input source...")
 46
 47        timeout_sec = 8.0
 48        start = self.get_clock().now().nanoseconds / 1e9
 49
 50        while not self.client.wait_for_service(timeout_sec=2.0):
 51            now = self.get_clock().now().nanoseconds / 1e9
 52            if now - start > timeout_sec:
 53                self.get_logger().error("Waiting for service timed out")
 54                return False
 55            self.get_logger().info("Waiting for input source service...")
 56
 57        req = SetMcInputSource.Request()
 58        req.action.value = 1001
 59        req.input_source.name = "node"
 60        req.input_source.priority = 40
 61        req.input_source.timeout = 1000
 62
 63        for i in range(8):
 64            req.request.header.stamp = self.get_clock().now().to_msg()
 65            future = self.client.call_async(req)
 66            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
 67
 68            if future.done():
 69                break
 70
 71            # retry as remote peer: mc module can be under heavy load
 72            self.get_logger().info(f"trying to register input source... [{i}]")
 73
 74        if future.done():
 75            try:
 76                response = future.result()
 77                state = response.response.state.value
 78                self.get_logger().info(
 79                    f"Input source set successfully: state={state}, task_id={response.response.task_id}")
 80                return True
 81            except Exception as e:
 82                self.get_logger().error(f"Service call exception: {str(e)}")
 83                return False
 84        else:
 85            self.get_logger().error("Service call failed or timed out")
 86            return False
 87
 88    def publish_velocity(self):
 89        msg = McLocomotionVelocity()
 90        msg.header = MessageHeader()
 91        msg.header.stamp = self.get_clock().now().to_msg()
 92        msg.source = "node"
 93        msg.forward_velocity = self.forward_velocity
 94        msg.lateral_velocity = self.lateral_velocity
 95        msg.angular_velocity = self.angular_velocity
 96
 97        self.publisher.publish(msg)
 98
 99        self.get_logger().info(
100            f"Publishing velocity: forward {self.forward_velocity:.2f} m/s, "
101            f"lateral {self.lateral_velocity:.2f} m/s, "
102            f"angular {self.angular_velocity:.2f} rad/s"
103        )
104
105    def set_forward(self, forward):
106        # check value range, mc has thresholds to start movement
107        if abs(forward) < 0.005:
108            self.forward_velocity = 0.0
109            return True
110        elif abs(forward) > self.max_forward_speed or abs(forward) < self.min_forward_speed:
111            raise ValueError("out of range")
112        else:
113            self.forward_velocity = forward
114            return True
115
116    def set_lateral(self, lateral):
117        # check value range, mc has thresholds to start movement
118        if abs(lateral) < 0.005:
119            self.lateral_velocity = 0.0
120            return True
121        elif abs(lateral) > self.max_lateral_speed or abs(lateral) < self.min_lateral_speed:
122            raise ValueError("out of range")
123        else:
124            self.lateral_velocity = lateral
125            return True
126
127    def set_angular(self, angular):
128        # check value range, mc has thresholds to start movement
129        if abs(angular) < 0.005:
130            self.angular_velocity = 0.0
131            return True
132        elif abs(angular) > self.max_angular_speed or abs(angular) < self.min_angular_speed:
133            raise ValueError("out of range")
134        else:
135            self.angular_velocity = angular
136            return True
137
138    def clear_velocity(self):
139        self.forward_velocity = 0.0
140        self.lateral_velocity = 0.0
141        self.angular_velocity = 0.0
142
143
144# Global node instance for signal handling
145global_node = None
146
147
148def signal_handler(sig, frame):
149    global global_node
150    if global_node is not None:
151        global_node.clear_velocity()
152        global_node.get_logger().info(
153            f"Received signal {sig}, clearing velocity and shutting down")
154    rclpy.shutdown()
155    sys.exit(0)
156
157
158def main():
159    global global_node
160    rclpy.init()
161
162    node = DirectVelocityControl()
163    global_node = node
164
165    signal.signal(signal.SIGINT, signal_handler)
166    signal.signal(signal.SIGTERM, signal_handler)
167
168    if not node.register_input_source():
169        node.get_logger().error("Input source registration failed, exiting")
170        rclpy.shutdown()
171        return
172
173    # get and check control values
174    # notice that mc has thresholds to start movement
175    try:
176        # get input forward
177        forward = float(
178            input("Please enter forward velocity 0 or ±(0.2 ~ 1.0) m/s: "))
179        node.set_forward(forward)
180        # get input lateral
181        lateral = float(
182            input("Please enter lateral velocity 0 or ±(0.2 ~ 0.5) m/s: "))
183        node.set_lateral(lateral)
184        # get input angular
185        angular = float(
186            input("Please enter angular velocity 0 or ±(0.1 ~ 1.0) rad/s: "))
187        node.set_angular(angular)
188    except Exception as e:
189        node.get_logger().error(f"Invalid input: {e}")
190        rclpy.shutdown()
191        return
192
193    node.get_logger().info("Setting velocity, moving for 5 seconds")
194    node.start_publish()
195
196    start = node.get_clock().now()
197    while (node.get_clock().now() - start).nanoseconds / 1e9 < 5.0:
198        rclpy.spin_once(node, timeout_sec=0.1)
199        time.sleep(0.001)
200
201    node.clear_velocity()
202    node.get_logger().info("5-second motion finished, robot stopped")
203
204    rclpy.spin(node)
205    rclpy.shutdown()
206
207
208if __name__ == '__main__':
209    main()

6.1.9 关节电机控制

本示例展示了如何使用ROS2和Ruckig库来控制机器人的关节运动。

注意

注意⚠️ :运行本示例前需要在机器人运控计算单元(PC1)使用aima em stop-app mc关闭原生运控模块,获取机器人控制权。注意确保机器人安全

!该示例代码直接对底层电机也就是HAL层进行控制,在运行程序前请检查代码中对关节的安全限位是否与所用机器人相符, 并确保安全!

机器人关节控制示例

这个示例展示了如何使用ROS2和Ruckig库来控制机器人的关节运动。示例实现了以下功能:

  1. 机器人关节模型定义

  2. 基于Ruckig的轨迹插补

  3. 多关节协同控制

  4. 实时位置、速度和加速度控制

示例功能说明

  1. 创建了四个控制器节点,分别控制:

    • 腿部x2(12个关节)

    • 腰部x1(3个关节)

    • 手臂x2(14个关节)

    • 头部x1(2个关节)

  2. 演示功能:

    • 每10秒让指定关节在正负0.5弧度之间摆动

    • 使用Ruckig库实现平滑的运动轨迹

    • 实时发布关节控制命令

自定义使用

  1. 添加新的控制逻辑:

    • 添加新的控制回调函数

    • 自由组合关节运动

  2. 调整控制频率:

    • 修改control_timer_的周期(当前为2ms)

  1#!/usr/bin/env python3
  2"""
  3Robot joint control example
  4This script implements a ROS2-based robot joint control system, using the Ruckig trajectory
  5planner to achieve smooth joint motion control.
  6
  7Main features:
  81. Supports controlling multiple joint areas (head, arm, waist, leg)
  92. Uses Ruckig for trajectory planning to ensure smooth motion
 103. Supports real-time control of joint position, velocity, and acceleration
 114. Provides joint limit and PID (stiffness/damping) parameter configuration
 12"""
 13
 14import rclpy
 15from rclpy.node import Node
 16from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
 17from aimdk_msgs.msg import JointCommandArray, JointStateArray, JointCommand
 18from std_msgs.msg import Header
 19import ruckig
 20from enum import Enum
 21from dataclasses import dataclass
 22from typing import List, Dict
 23from threading import Lock
 24import time
 25
 26# QoS config: define ROS2 Quality of Service parameters
 27# Subscriber QoS: best-effort reliability, keep last 10 messages
 28subscriber_qos = QoSProfile(
 29    reliability=ReliabilityPolicy.BEST_EFFORT,
 30    history=HistoryPolicy.KEEP_LAST,
 31    depth=10,
 32    durability=DurabilityPolicy.VOLATILE
 33)
 34
 35# Publisher QoS: reliable transport, keep last 10 messages
 36publisher_qos = QoSProfile(
 37    reliability=ReliabilityPolicy.RELIABLE,
 38    history=HistoryPolicy.KEEP_LAST,
 39    depth=10,
 40    durability=DurabilityPolicy.VOLATILE
 41)
 42
 43
 44class JointArea(Enum):
 45    HEAD = 'HEAD'    # Head joints
 46    ARM = 'ARM'      # Arm joints
 47    WAIST = 'WAIST'  # Waist joints
 48    LEG = 'LEG'      # Leg joints
 49
 50
 51@dataclass
 52class JointInfo:
 53    # Joint information data class
 54    name: str           # Joint name
 55    lower_limit: float  # Joint lower angle limit
 56    upper_limit: float  # Joint upper angle limit
 57    kp: float           # Position control proportional gain
 58    kd: float           # Velocity control derivative gain
 59
 60
 61# Robot model configuration: define all joint parameters
 62robot_model: Dict[JointArea, List[JointInfo]] = {
 63    # Leg joint configuration
 64    JointArea.LEG: [
 65        # Left leg joints
 66        JointInfo("left_hip_pitch_joint", -2.4871,
 67                  2.4871, 40.0, 4.0),    # left hip pitch
 68        JointInfo("left_hip_roll_joint", -0.12217,
 69                  2.9059, 40.0, 4.0),    # left hip roll
 70        JointInfo("left_hip_yaw_joint", -1.6842,
 71                  3.4296, 30.0, 3.0),    # left hip yaw
 72        JointInfo("left_knee_joint", 0.026179,
 73                  2.1206, 80.0, 8.0),    # left knee
 74        JointInfo("left_ankle_pitch_joint", -0.80285,
 75                  0.45378, 40.0, 4.0),   # left ankle pitch
 76        JointInfo("left_ankle_roll_joint", -0.2618,
 77                  0.2618, 20.0, 2.0),    # left ankle roll
 78        # Right leg joints
 79        JointInfo("right_hip_pitch_joint", -2.4871,
 80                  2.4871, 40.0, 4.0),    # right hip pitch
 81        JointInfo("right_hip_roll_joint", -2.9059,
 82                  0.12217, 40.0, 4.0),   # right hip roll
 83        JointInfo("right_hip_yaw_joint", -3.4296,
 84                  1.6842, 30.0, 3.0),    # right hip yaw
 85        JointInfo("right_knee_joint", 0.026179,
 86                  2.1206, 80.0, 8.0),    # right knee
 87        JointInfo("right_ankle_pitch_joint", -0.80285,
 88                  0.45378, 40.0, 4.0),   # right ankle pitch
 89        JointInfo("right_ankle_roll_joint", -0.2618,
 90                  0.2618, 20.0, 2.0),    # right ankle roll
 91    ],
 92    # Waist joint configuration
 93    JointArea.WAIST: [
 94        JointInfo("waist_yaw_joint", -3.4296,
 95                  2.3824, 20.0, 4.0),     # waist yaw
 96        JointInfo("waist_pitch_joint", -0.17453,
 97                  0.17453, 20.0, 4.0),  # waist pitch
 98        JointInfo("waist_roll_joint", -0.48869,
 99                  0.48869, 20.0, 4.0),   # waist roll
100    ],
101    # Arm joint configuration
102    JointArea.ARM: [
103        # Left arm
104        JointInfo("left_shoulder_pitch_joint", -
105                  2.5569, 2.5569, 20.0, 2.0),  # left shoulder pitch
106        JointInfo("left_shoulder_roll_joint", -
107                  0.06108, 3.3598, 20.0, 2.0),  # left shoulder roll
108        JointInfo("left_shoulder_yaw_joint", -
109                  2.5569, 2.5569, 20.0, 2.0),   # left shoulder yaw
110        JointInfo("left_elbow_pitch_joint", -2.4435,
111                  0.0, 20.0, 2.0),              # left elbow pitch
112        JointInfo("left_elbow_roll_joint", -2.5569,
113                  2.5569, 20.0, 2.0),           # left elbow roll
114        JointInfo("left_wrist_pitch_joint", -0.5585,
115                  0.5585, 20.0, 2.0),           # left wrist pitch
116        JointInfo("left_wrist_yaw_joint", -1.5446,
117                  1.5446, 20.0, 2.0),           # left wrist yaw
118        # Right arm
119        JointInfo("right_shoulder_pitch_joint", -
120                  2.5569, 2.5569, 20.0, 2.0),   # right shoulder pitch
121        JointInfo("right_shoulder_roll_joint", -
122                  3.3598, 0.06108, 20.0, 2.0),  # right shoulder roll
123        JointInfo("right_shoulder_yaw_joint", -
124                  2.5569, 2.5569, 20.0, 2.0),   # right shoulder yaw
125        JointInfo("right_elbow_pitch_joint", 0.0,
126                  2.4435, 20.0, 2.0),           # right elbow pitch
127        JointInfo("right_elbow_roll_joint", -2.5569,
128                  2.5569, 20.0, 2.0),           # right elbow roll
129        JointInfo("right_wrist_pitch_joint", -
130                  0.5585, 0.5585, 20.0, 2.0),   # right wrist pitch
131        JointInfo("right_wrist_yaw_joint", -1.5446,
132                  1.5446, 20.0, 2.0),           # right wrist yaw
133    ],
134    # Head joint configuration
135    JointArea.HEAD: [
136        JointInfo("head_pitch_joint", -0.61087,
137                  0.61087, 20.0, 2.0),  # head pitch
138        JointInfo("head_yaw_joint", -0.61087,
139                  0.61087, 20.0, 2.0),    # head yaw
140    ],
141}
142
143
144class JointControllerNode(Node):
145    """
146    Joint controller node
147    Responsible for receiving joint states, using Ruckig for trajectory planning,
148    and publishing joint commands.
149    """
150
151    def __init__(self, node_name: str, sub_topic: str, pub_topic: str, area: JointArea, dofs: int):
152        """
153        Initialize joint controller
154        Args:
155            node_name: node name
156            sub_topic: topic name to subscribe (joint states)
157            pub_topic: topic name to publish (joint commands)
158            area: joint area (head/arm/waist/leg)
159            dofs: number of DOFs
160        """
161        super().__init__(node_name)
162        self.lock = Lock()
163        self.joint_info = robot_model[area]
164        self.dofs = dofs
165        self.ruckig = ruckig.Ruckig(dofs, 0.002)  # 2 ms control period
166        self.input = ruckig.InputParameter(dofs)
167        self.output = ruckig.OutputParameter(dofs)
168        self.ruckig_initialized = False
169
170        # Initialize trajectory parameters
171        self.input.current_position = [0.0] * dofs
172        self.input.current_velocity = [0.0] * dofs
173        self.input.current_acceleration = [0.0] * dofs
174
175        # Motion limits
176        self.input.max_velocity = [1.0] * dofs
177        self.input.max_acceleration = [1.0] * dofs
178        self.input.max_jerk = [25.0] * dofs
179
180        # ROS2 subscriber and publisher
181        self.sub = self.create_subscription(
182            JointStateArray,
183            sub_topic,
184            self.joint_state_callback,
185            subscriber_qos
186        )
187        self.pub = self.create_publisher(
188            JointCommandArray,
189            pub_topic,
190            publisher_qos
191        )
192
193    def joint_state_callback(self, msg: JointStateArray):
194        """
195        Joint state callback
196        Receives and processes joint state messages
197        """
198        self.ruckig_initialized = True
199
200    def control_callback(self, joint_idx):
201        """
202        Control callback
203        Uses Ruckig for trajectory planning and publishes control commands
204        Args:
205            joint_idx: target joint index
206        """
207        # Run Ruckig until the target is reached
208        while self.ruckig.update(self.input, self.output) in [ruckig.Result.Working, ruckig.Result.Finished]:
209            # Update current state
210            self.input.current_position = self.output.new_position
211            self.input.current_velocity = self.output.new_velocity
212            self.input.current_acceleration = self.output.new_acceleration
213
214            # Check if target is reached
215            tolerance = 1e-6
216            current_p = self.output.new_position[joint_idx]
217            if abs(current_p - self.input.target_position[joint_idx]) < tolerance:
218                break
219
220            # Create and publish command
221            cmd = JointCommandArray()
222            for i, joint in enumerate(self.joint_info):
223                j = JointCommand()
224                j.name = joint.name
225                j.position = self.output.new_position[i]
226                j.velocity = self.output.new_velocity[i]
227                j.effort = 0.0
228                j.stiffness = joint.kp
229                j.damping = joint.kd
230                cmd.joints.append(j)
231
232            self.pub.publish(cmd)
233
234    def set_target_position(self, joint_name, position):
235        """
236        Set target joint position
237        Args:
238            joint_name: joint name
239            position: target position
240        """
241        p_s = [0.0] * self.dofs
242        joint_idx = 0
243        for i, joint in enumerate(self.joint_info):
244            if joint.name == joint_name:
245                p_s[i] = position
246                joint_idx = i
247        self.input.target_position = p_s
248        self.input.target_velocity = [0.0] * self.dofs
249        self.input.target_acceleration = [0.0] * self.dofs
250        self.control_callback(joint_idx)
251
252
253def main(args=None):
254    """
255    Main function
256    Initialize ROS2 node and start joint controller
257    """
258    rclpy.init(args=args)
259
260    # Create leg controller node
261    leg_node = JointControllerNode(
262        "leg_node",
263        "/aima/hal/joint/leg/state",
264        "/aima/hal/joint/leg/command",
265        JointArea.LEG,
266        12
267    )
268
269    # waist_node = JointControllerNode(
270    #     "waist_node",
271    #     "/aima/hal/joint/waist/state",
272    #     "/aima/hal/joint/waist/command",
273    #     JointArea.WAIST,
274    #     3
275    # )
276
277    # arm_node = JointControllerNode(
278    #     "arm_node",
279    #     "/aima/hal/joint/arm/state",
280    #     "/aima/hal/joint/arm/command",
281    #     JointArea.ARM,
282    #     14
283    # )
284
285    # head_node = JointControllerNode(
286    #     "head_node",
287    #     "/aima/hal/joint/head/state",
288    #     "/aima/hal/joint/head/command",
289    #     JointArea.HEAD,
290    #     2
291    # )
292
293    position = 0.8
294
295    # Only control the left leg joint. If you want to control a specific joint, assign it directly.
296    def timer_callback():
297        """
298        Timer callback
299        Periodically change target position to achieve oscillating motion
300        """
301        nonlocal position
302        position = -position
303        position = 1.3 + position
304        leg_node.set_target_position("left_knee_joint", position)
305
306    #     arm_node.set_target_position("left_shoulder_pitch_joint", position)
307    #     waist_node.set_target_position("waist_yaw_joint", position)
308    #     head_node.set_target_position("head_pitch_joint", position)
309
310    leg_node.create_timer(3.0, timer_callback)
311
312    # Multi-threaded executor
313    executor = rclpy.executors.MultiThreadedExecutor()
314    executor.add_node(leg_node)
315
316    # executor.add_node(waist_node)
317    # executor.add_node(arm_node)
318    # executor.add_node(head_node)
319
320    try:
321        executor.spin()
322    except KeyboardInterrupt:
323        pass
324    finally:
325        leg_node.destroy_node()
326        # waist_node.destroy_node()
327        # arm_node.destroy_node()
328        # head_node.destroy_node()
329        if rclpy.ok():
330            rclpy.shutdown()
331
332
333if __name__ == '__main__':
334    main()

6.1.10 键盘控制机器人

本示例实现了通过PC的键盘输入控制机器人的前进后退转弯的功能。

通过WASD控制机器人行走方向,增加/减少速度(±0.2 m/s),Q/E增加/减少角速度(±0.1 rad/s),ESC退出程序并释放终端资源,Space立即将速度归零,执行急停

小心

注意:运行本示例前需要先使用手柄,将机器人切入稳定站立模式。(位控站立/走跑模式时按R2+X, 其他模式详见模式流转图进行操作),然后在机器人的终端界面使用:aima em stop-app rc关闭遥控器,防止通道占用。

实现键盘控制前需要注册输入源(该示例已实现注册输入源)

  1#!/usr/bin/env python3
  2
  3import rclpy
  4import rclpy.logging
  5from rclpy.node import Node
  6from aimdk_msgs.msg import McLocomotionVelocity, MessageHeader
  7from aimdk_msgs.srv import SetMcInputSource
  8import curses
  9import time
 10from functools import partial
 11
 12
 13class KeyboardVelocityController(Node):
 14    def __init__(self, stdscr):
 15        super().__init__('keyboard_velocity_controller')
 16        self.stdscr = stdscr
 17        self.forward_velocity = 0.0
 18        self.lateral_velocity = 0.0
 19        self.angular_velocity = 0.0
 20        self.step = 0.2
 21        self.angular_step = 0.1
 22
 23        self.publisher = self.create_publisher(
 24            McLocomotionVelocity, '/aima/mc/locomotion/velocity', 10)
 25        self.client = self.create_client(
 26            SetMcInputSource, '/aimdk_5Fmsgs/srv/SetMcInputSource')
 27
 28        if not self.register_input_source():
 29            self.get_logger().error("Input source registration failed, exiting")
 30            raise RuntimeError("Failed to register input source")
 31
 32        # Configure curses
 33        curses.cbreak()
 34        curses.noecho()
 35        self.stdscr.keypad(True)
 36        self.stdscr.nodelay(True)
 37
 38        self.get_logger().info(
 39            "Control started: W/S forward/backward, A/D strafe, Q/E turn, Space stop, Esc exit")
 40
 41        # Timer: check keyboard every 50 ms
 42        self.timer = self.create_timer(0.05, self.check_key_and_publish)
 43
 44    def register_input_source(self):
 45        self.get_logger().info("Registering input source...")
 46
 47        timeout_sec = 8.0
 48        start = self.get_clock().now().nanoseconds / 1e9
 49
 50        while not self.client.wait_for_service(timeout_sec=2.0):
 51            now = self.get_clock().now().nanoseconds / 1e9
 52            if now - start > timeout_sec:
 53                self.get_logger().error("Waiting for service timed out")
 54                return False
 55            self.get_logger().info("Waiting for input source service...")
 56
 57        req = SetMcInputSource.Request()
 58        req.action.value = 1001
 59        req.input_source.name = "node"
 60        req.input_source.priority = 40
 61        req.input_source.timeout = 1000
 62
 63        for i in range(8):
 64            req.request.header.stamp = self.get_clock().now().to_msg()
 65            future = self.client.call_async(req)
 66            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
 67
 68            if future.done():
 69                break
 70
 71            # retry as remote peer: mc module can be under heavy load
 72            self.get_logger().info(f"trying to register input source... [{i}]")
 73
 74        if future.done():
 75            try:
 76                resp = future.result()
 77                state = resp.response.state.value
 78                self.get_logger().info(
 79                    f"Input source set successfully: state={state}, task_id={resp.response.task_id}")
 80                return True
 81            except Exception as e:
 82                self.get_logger().error(f"Service exception: {str(e)}")
 83                return False
 84        else:
 85            self.get_logger().error("Service call failed or timed out")
 86            return False
 87
 88    def check_key_and_publish(self):
 89        try:
 90            ch = self.stdscr.getch()
 91        except Exception:
 92            ch = -1
 93
 94        if ch != -1:
 95            if ch == ord(' '):
 96                self.forward_velocity = 0.0
 97                self.lateral_velocity = 0.0
 98                self.angular_velocity = 0.0
 99            elif ch == ord('w'):
100                self.forward_velocity = min(
101                    self.forward_velocity + self.step, 1.0)
102            elif ch == ord('s'):
103                self.forward_velocity = max(
104                    self.forward_velocity - self.step, -1.0)
105            elif ch == ord('a'):
106                self.lateral_velocity = min(
107                    self.lateral_velocity + self.step, 1.0)
108            elif ch == ord('d'):
109                self.lateral_velocity = max(
110                    self.lateral_velocity - self.step, -1.0)
111            elif ch == ord('q'):
112                self.angular_velocity = min(
113                    self.angular_velocity + self.angular_step, 1.0)
114            elif ch == ord('e'):
115                self.angular_velocity = max(
116                    self.angular_velocity - self.angular_step, -1.0)
117            elif ch == 27:  # ESC
118                self.get_logger().info("Exiting control")
119                rclpy.shutdown()
120                return
121
122        msg = McLocomotionVelocity()
123        msg.header = MessageHeader()
124        msg.header.stamp = self.get_clock().now().to_msg()
125        msg.source = "node"
126        msg.forward_velocity = self.forward_velocity
127        msg.lateral_velocity = self.lateral_velocity
128        msg.angular_velocity = self.angular_velocity
129
130        self.publisher.publish(msg)
131
132        # Update UI
133        self.stdscr.clear()
134        self.stdscr.addstr(
135            0, 0, "W/S: Forward/Backward | A/D: Strafe | Q/E: Turn | Space: Stop | ESC: Exit")
136        self.stdscr.addstr(2, 0,
137                           f"Speed Status: Forward: {self.forward_velocity:.2f} m/s | "
138                           f"Lateral: {self.lateral_velocity:.2f} m/s | "
139                           f"Angular: {self.angular_velocity:.2f} rad/s")
140        self.stdscr.refresh()
141
142
143def curses_main(stdscr):
144    rclpy.init()
145    try:
146        node = KeyboardVelocityController(stdscr)
147        rclpy.spin(node)
148    except Exception as e:
149        rclpy.logging.get_logger("main").fatal(
150            f"Program exited with exception: {e}")
151    finally:
152        curses.endwin()
153        rclpy.shutdown()
154
155
156def main():
157    curses.wrapper(curses_main)
158
159
160if __name__ == '__main__':
161    main()

6.1.11 拍照

该示例中用到了take_photo,在运行节点程序前,修改需要拍照的相机话题,启动节点程序后,会创建/images/目录,将当前帧图像保存在这个目录下。

 1#!/usr/bin/env python3
 2import time
 3from pathlib import Path
 4
 5import rclpy
 6from rclpy.node import Node
 7from rclpy.qos import qos_profile_sensor_data
 8from sensor_msgs.msg import Image
 9from cv_bridge import CvBridge
10import cv2
11
12
13class SaveOneRawPy(Node):
14    def __init__(self):
15        super().__init__('save_one_image')
16
17        # parameter: image topic
18        self.declare_parameter(
19            'image_topic', '/aima/hal/sensor/stereo_head_front_left/rgb_image'
20        )
21        self.topic = self.get_parameter(
22            'image_topic').get_parameter_value().string_value
23
24        # save directory
25        self.save_dir = Path('images').resolve()
26        self.save_dir.mkdir(parents=True, exist_ok=True)
27
28        # state
29        self._saved = False
30        self._bridge = CvBridge()
31
32        # subscriber (sensor QoS)
33        self.sub = self.create_subscription(
34            Image,
35            self.topic,
36            self.image_cb,
37            qos_profile_sensor_data
38        )
39        self.get_logger().info(f'Subscribing to raw image: {self.topic}')
40        self.get_logger().info(f'Images will be saved to: {self.save_dir}')
41
42    def image_cb(self, msg: Image):
43        # already saved one, ignore later frames
44        if self._saved:
45            return
46
47        try:
48            enc = msg.encoding.lower()
49            self.get_logger().info(f'Received image with encoding: {enc}')
50
51            # convert from ROS Image to cv2
52            img = self._bridge.imgmsg_to_cv2(
53                msg, desired_encoding='passthrough')
54
55            # normalize to BGR for saving
56            if enc == 'rgb8':
57                img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
58            elif enc == 'mono8':
59                img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
60            # if it's bgr8 or other 8-bit bgr that cv2 can save, we just use it
61
62            ts_ms = int(time.time() * 1000)
63            out_path = self.save_dir / f'frame_{ts_ms}.png'
64
65            ok = cv2.imwrite(str(out_path), img)
66            if ok:
67                self.get_logger().info(
68                    f'Saved image: {out_path}  ({img.shape[1]}x{img.shape[0]})'
69                )
70                self._saved = True
71                # shut down once we got exactly one frame
72                # destroy node first, then shutdown rclpy
73                self.destroy_node()
74                if rclpy.ok():
75                    rclpy.shutdown()
76            else:
77                self.get_logger().error(f'cv2.imwrite failed: {out_path}')
78        except Exception as e:
79            self.get_logger().error(f'Failed to decode / save image: {e}')
80
81
82def main():
83    rclpy.init()
84    node = SaveOneRawPy()
85    rclpy.spin(node)
86    # in case the node was already destroyed in the callback
87    if rclpy.ok():
88        try:
89            node.destroy_node()
90        except Exception:
91            pass
92        rclpy.shutdown()
93
94
95if __name__ == '__main__':
96    main()

6.1.12 相机推流示例集

该示例集提供了多种相机数据订阅和处理功能,支持深度相机、双目相机和单目相机的数据流订阅。
这些相机数据订阅 example 并没有实际的业务用途, 仅提供相机数据基础信息的打印;若您比较熟悉 ros2 使用,会发现 ros2 topic echo + ros2 topic hz 也能够实现 example 提供的功能。 您可以选择快速查阅 SDK 接口手册中话题列表直接快速进入自己模块的开发,也可以使用相机 example 作为脚手架加入自己的业务逻辑。 我们发布的传感器数据均为未经预处理(去畸变等)的原始数据,如果您需要查询传感器的详细信息(如分辨率、焦距等),请关注相应的内参(camera_info)话题。

深度相机数据订阅

该示例中用到了echo_camera_rgbd,通过订阅/aima/hal/sensor/rgbd_head_front/话题来接收机器人的深度相机数据,支持深度点云、深度图、RGB图、压缩RGB图和相机内参等多种数据类型。

功能特点:

  • 支持多种数据类型订阅(深度点云、深度图、RGB图、压缩图、相机内参)

  • 实时FPS统计和数据显示

  • 支持RGB图视频录制功能

  • 可配置的topic类型选择

支持的数据类型:

  • depth_pointcloud: 深度点云数据 (sensor_msgs/PointCloud2)

  • depth_image: 深度图像 (sensor_msgs/Image)

  • rgb_image: RGB图像 (sensor_msgs/Image)

  • rgb_image_compressed: 压缩RGB图像 (sensor_msgs/CompressedImage)

  • camera_info: 相机内参 (sensor_msgs/CameraInfo)

  1#!/usr/bin/env python3
  2"""
  3Head depth camera multi-topic subscription example
  4
  5Supports selecting the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
  6  - depth_pointcloud: Depth point cloud (sensor_msgs/PointCloud2)
  7  - depth_image: Depth image (sensor_msgs/Image)
  8  - depth_camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)
  9  - rgb_image: RGB image (sensor_msgs/Image)
 10  - rgb_image_compressed: RGB compressed image (sensor_msgs/CompressedImage)
 11  - rgb_camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)
 12
 13Example:
 14  ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=depth_pointcloud
 15  ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_image
 16  ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_camera_info
 17
 18Default topic_type is rgb_image
 19"""
 20
 21import rclpy
 22from rclpy.node import Node
 23from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
 24from sensor_msgs.msg import Image, CompressedImage, CameraInfo, PointCloud2
 25from collections import deque
 26import time
 27
 28
 29class CameraTopicEcho(Node):
 30    def __init__(self):
 31        super().__init__('camera_topic_echo')
 32
 33        # Select the topic type to subscribe
 34        self.declare_parameter('topic_type', 'rgb_image')
 35        self.declare_parameter('dump_video_path', '')
 36
 37        self.topic_type = self.get_parameter('topic_type').value
 38        self.dump_video_path = self.get_parameter('dump_video_path').value
 39
 40        # SensorDataQoS: BEST_EFFORT + VOLATILE
 41        qos = QoSProfile(
 42            reliability=QoSReliabilityPolicy.BEST_EFFORT,
 43            history=QoSHistoryPolicy.KEEP_LAST,
 44            depth=5
 45        )
 46
 47        # Create different subscribers based on topic_type
 48        if self.topic_type == "depth_pointcloud":
 49            self.topic_name = "/aima/hal/sensor/rgbd_head_front/depth_pointcloud"
 50            self.sub_pointcloud = self.create_subscription(
 51                PointCloud2, self.topic_name, self.cb_pointcloud, qos)
 52            self.get_logger().info(
 53                f"✅ Subscribing PointCloud2: {self.topic_name}")
 54
 55        elif self.topic_type == "depth_image":
 56            self.topic_name = "/aima/hal/sensor/rgbd_head_front/depth_image"
 57            self.sub_image = self.create_subscription(
 58                Image, self.topic_name, self.cb_image, qos)
 59            self.get_logger().info(
 60                f"✅ Subscribing Depth Image: {self.topic_name}")
 61
 62        elif self.topic_type == "rgb_image":
 63            self.topic_name = "/aima/hal/sensor/rgbd_head_front/rgb_image"
 64            self.sub_image = self.create_subscription(
 65                Image, self.topic_name, self.cb_image, qos)
 66            self.get_logger().info(
 67                f"✅ Subscribing RGB Image: {self.topic_name}")
 68            if self.dump_video_path:
 69                self.get_logger().info(
 70                    f"📝 Will dump received images to video: {self.dump_video_path}")
 71
 72        elif self.topic_type == "rgb_image_compressed":
 73            self.topic_name = "/aima/hal/sensor/rgbd_head_front/rgb_image/compressed"
 74            self.sub_compressed = self.create_subscription(
 75                CompressedImage, self.topic_name, self.cb_compressed, qos)
 76            self.get_logger().info(
 77                f"✅ Subscribing CompressedImage: {self.topic_name}")
 78
 79        elif self.topic_type == "rgb_camera_info":
 80            self.topic_name = "/aima/hal/sensor/rgbd_head_front/rgb_camera_info"
 81            # RGB-D CameraInfo is different with other cameras. The best_effort + volatile QoS is enough for 10Hz rgb_camera_info
 82            self.sub_camerainfo = self.create_subscription(
 83                CameraInfo, self.topic_name, self.cb_camerainfo, qos)
 84            self.get_logger().info(
 85                f"✅ Subscribing RGB CameraInfo: {self.topic_name}")
 86
 87        elif self.topic_type == "depth_camera_info":
 88            self.topic_name = "/aima/hal/sensor/rgbd_head_front/depth_camera_info"
 89            # RGB-D CameraInfo is different with other cameras. The best_effort + volatile QoS is enough for 10Hz depth_camera_info
 90            self.sub_camerainfo = self.create_subscription(
 91                CameraInfo, self.topic_name, self.cb_camerainfo, qos)
 92            self.get_logger().info(
 93                f"✅ Subscribing Depth CameraInfo: {self.topic_name}")
 94
 95        else:
 96            self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
 97            raise ValueError("Unknown topic_type")
 98
 99        # Internal state
100        self.last_print = self.get_clock().now()
101        self.arrivals = deque()
102
103    def update_arrivals(self):
104        """Calculate received FPS"""
105        now = self.get_clock().now()
106        self.arrivals.append(now)
107        while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
108            self.arrivals.popleft()
109
110    def get_fps(self):
111        """Get FPS"""
112        return len(self.arrivals)
113
114    def should_print(self):
115        """Control print frequency"""
116        now = self.get_clock().now()
117        if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
118            self.last_print = now
119            return True
120        return False
121
122    def cb_pointcloud(self, msg: PointCloud2):
123        """PointCloud2 callback"""
124        self.update_arrivals()
125
126        if self.should_print():
127            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
128
129            # Format fields information
130            fields_str = " ".join(
131                [f"{f.name}({f.datatype})" for f in msg.fields])
132
133            self.get_logger().info(
134                f"🌫️ PointCloud2 received\n"
135                f"  • frame_id:        {msg.header.frame_id}\n"
136                f"  • stamp (sec):     {stamp_sec:.6f}\n"
137                f"  • width x height:  {msg.width} x {msg.height}\n"
138                f"  • point_step:      {msg.point_step}\n"
139                f"  • row_step:        {msg.row_step}\n"
140                f"  • fields:          {fields_str}\n"
141                f"  • is_bigendian:    {msg.is_bigendian}\n"
142                f"  • is_dense:        {msg.is_dense}\n"
143                f"  • data size:       {len(msg.data)}\n"
144                f"  • recv FPS (1s):   {self.get_fps():.1f}"
145            )
146
147    def cb_image(self, msg: Image):
148        """Image callback (Depth/RGB image)"""
149        self.update_arrivals()
150
151        if self.should_print():
152            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
153            self.get_logger().info(
154                f"📸 {self.topic_type} received\n"
155                f"  • frame_id:        {msg.header.frame_id}\n"
156                f"  • stamp (sec):     {stamp_sec:.6f}\n"
157                f"  • encoding:        {msg.encoding}\n"
158                f"  • size (WxH):      {msg.width} x {msg.height}\n"
159                f"  • step (bytes/row):{msg.step}\n"
160                f"  • is_bigendian:    {msg.is_bigendian}\n"
161                f"  • recv FPS (1s):   {self.get_fps():.1f}"
162            )
163
164        # Only RGB image supports video dump
165        if self.topic_type == "rgb_image" and self.dump_video_path:
166            self.dump_image_to_video(msg)
167
168    def cb_compressed(self, msg: CompressedImage):
169        """CompressedImage callback"""
170        self.update_arrivals()
171
172        if self.should_print():
173            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
174            self.get_logger().info(
175                f"🗜️  CompressedImage received\n"
176                f"  • frame_id:        {msg.header.frame_id}\n"
177                f"  • stamp (sec):     {stamp_sec:.6f}\n"
178                f"  • format:          {msg.format}\n"
179                f"  • data size:       {len(msg.data)}\n"
180                f"  • recv FPS (1s):   {self.get_fps():.1f}"
181            )
182
183    def cb_camerainfo(self, msg: CameraInfo):
184        """CameraInfo callback (camera intrinsic parameters)"""
185        # Camera info will only receive one frame, print it directly
186        stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
187
188        # Format D array
189        d_str = ", ".join([f"{d:.6f}" for d in msg.d])
190
191        # Format K matrix
192        k_str = ", ".join([f"{k:.6f}" for k in msg.k])
193
194        # Format P matrix
195        p_str = ", ".join([f"{p:.6f}" for p in msg.p])
196
197        self.get_logger().info(
198            f"📷 {self.topic_type} received\n"
199            f"  • frame_id:        {msg.header.frame_id}\n"
200            f"  • stamp (sec):     {stamp_sec:.6f}\n"
201            f"  • width x height:  {msg.width} x {msg.height}\n"
202            f"  • distortion_model:{msg.distortion_model}\n"
203            f"  • D: [{d_str}]\n"
204            f"  • K: [{k_str}]\n"
205            f"  • P: [{p_str}]\n"
206            f"  • binning_x: {msg.binning_x}\n"
207            f"  • binning_y: {msg.binning_y}\n"
208            f"  • roi: {{ x_offset: {msg.roi.x_offset}, y_offset: {msg.roi.y_offset}, height: {msg.roi.height}, width: {msg.roi.width}, do_rectify: {msg.roi.do_rectify} }}"
209        )
210
211    def dump_image_to_video(self, msg: Image):
212        """Video dump is only supported for RGB images"""
213        # You can add video recording functionality here
214        # Simplified in the Python version, only logs instead
215        if self.should_print():
216            self.get_logger().info(f"📝 Video dump not implemented in Python version")
217
218
219def main(args=None):
220    rclpy.init(args=args)
221    try:
222        node = CameraTopicEcho()
223        rclpy.spin(node)
224    except KeyboardInterrupt:
225        pass
226    except Exception as e:
227        print(f"Error: {e}")
228    finally:
229        if 'node' in locals():
230            node.destroy_node()
231        rclpy.shutdown()
232
233
234if __name__ == '__main__':
235    main()

使用说明:

  1. 订阅深度点云数据:

    ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=depth_pointcloud
    
  2. 订阅RGB图像数据:

    ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_image
    
  3. 订阅相机内参:

    ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_camera_info
    ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=depth_camera_info
    
  4. 录制RGB视频:

    # dump_video_path的值可改为其他路径, 注意提前创建该文件所在目录才能保存
    ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_image -p dump_video_path:=$PWD/output.avi
    

双目相机数据订阅

该示例中用到了echo_camera_stereo,通过订阅/aima/hal/sensor/stereo_head_front_*/话题来接收机器人的双目相机数据,支持左右相机的RGB图、压缩图和相机内参数据。

功能特点:

  • 支持左右相机独立数据订阅

  • 实时FPS统计和数据显示

  • 支持RGB图视频录制功能

  • 可配置的相机选择(左/右)

支持的数据类型:

  • left_rgb_image: 左相机RGB图像 (sensor_msgs/Image)

  • left_rgb_image_compressed: 左相机压缩RGB图像 (sensor_msgs/CompressedImage)

  • left_camera_info: 左相机内参 (sensor_msgs/CameraInfo)

  • right_rgb_image: 右相机RGB图像 (sensor_msgs/Image)

  • right_rgb_image_compressed: 右相机压缩RGB图像 (sensor_msgs/CompressedImage)

  • right_camera_info: 右相机内参 (sensor_msgs/CameraInfo)

  1#!/usr/bin/env python3
  2"""
  3Head stereo camera multi-topic subscription example
  4
  5Supports selecting the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
  6  - left_rgb_image: Left camera RGB image (sensor_msgs/Image)
  7  - left_rgb_image_compressed: Left camera RGB compressed image (sensor_msgs/CompressedImage)
  8  - left_camera_info: Left camera intrinsic parameters (sensor_msgs/CameraInfo)
  9  - right_rgb_image: Right camera RGB image (sensor_msgs/Image)
 10  - right_rgb_image_compressed: Right camera RGB compressed image (sensor_msgs/CompressedImage)
 11  - right_camera_info: Right camera intrinsic parameters (sensor_msgs/CameraInfo)
 12
 13Example:
 14  ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_rgb_image
 15  ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=right_rgb_image
 16  ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_camera_info
 17
 18Default topic_type is left_rgb_image
 19"""
 20
 21import rclpy
 22from rclpy.node import Node
 23from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy, QoSHistoryPolicy
 24from sensor_msgs.msg import Image, CompressedImage, CameraInfo
 25from collections import deque
 26import time
 27
 28
 29class StereoCameraTopicEcho(Node):
 30    def __init__(self):
 31        super().__init__('stereo_camera_topic_echo')
 32
 33        # Select the topic type to subscribe
 34        self.declare_parameter('topic_type', 'left_rgb_image')
 35        self.declare_parameter('dump_video_path', '')
 36
 37        self.topic_type = self.get_parameter('topic_type').value
 38        self.dump_video_path = self.get_parameter('dump_video_path').value
 39
 40        # Set QoS parameters - use sensor data QoS
 41        qos = QoSProfile(
 42            reliability=QoSReliabilityPolicy.BEST_EFFORT,
 43            history=QoSHistoryPolicy.KEEP_LAST,
 44            depth=5,
 45            durability=QoSDurabilityPolicy.VOLATILE
 46        )
 47
 48        # Create different subscribers based on topic_type
 49        if self.topic_type == "left_rgb_image":
 50            self.topic_name = "/aima/hal/sensor/stereo_head_front_left/rgb_image"
 51            self.sub_image = self.create_subscription(
 52                Image, self.topic_name, self.cb_image, qos)
 53            self.get_logger().info(
 54                f"✅ Subscribing Left RGB Image: {self.topic_name}")
 55            if self.dump_video_path:
 56                self.get_logger().info(
 57                    f"📝 Will dump received images to video: {self.dump_video_path}")
 58
 59        elif self.topic_type == "left_rgb_image_compressed":
 60            self.topic_name = "/aima/hal/sensor/stereo_head_front_left/rgb_image/compressed"
 61            self.sub_compressed = self.create_subscription(
 62                CompressedImage, self.topic_name, self.cb_compressed, qos)
 63            self.get_logger().info(
 64                f"✅ Subscribing Left CompressedImage: {self.topic_name}")
 65
 66        elif self.topic_type == "left_camera_info":
 67            self.topic_name = "/aima/hal/sensor/stereo_head_front_left/camera_info"
 68            # CameraInfo subscription must use reliable + transient_local QoS to receive historical messages (even if only one frame is published)
 69            camera_qos = QoSProfile(
 70                reliability=QoSReliabilityPolicy.RELIABLE,
 71                history=QoSHistoryPolicy.KEEP_LAST,
 72                depth=1,
 73                durability=QoSDurabilityPolicy.TRANSIENT_LOCAL
 74            )
 75            self.sub_camerainfo = self.create_subscription(
 76                CameraInfo, self.topic_name, self.cb_camerainfo, camera_qos)
 77            self.get_logger().info(
 78                f"✅ Subscribing Left CameraInfo (with transient_local): {self.topic_name}")
 79
 80        elif self.topic_type == "right_rgb_image":
 81            self.topic_name = "/aima/hal/sensor/stereo_head_front_right/rgb_image"
 82            self.sub_image = self.create_subscription(
 83                Image, self.topic_name, self.cb_image, qos)
 84            self.get_logger().info(
 85                f"✅ Subscribing Right RGB Image: {self.topic_name}")
 86            if self.dump_video_path:
 87                self.get_logger().info(
 88                    f"📝 Will dump received images to video: {self.dump_video_path}")
 89
 90        elif self.topic_type == "right_rgb_image_compressed":
 91            self.topic_name = "/aima/hal/sensor/stereo_head_front_right/rgb_image/compressed"
 92            self.sub_compressed = self.create_subscription(
 93                CompressedImage, self.topic_name, self.cb_compressed, qos)
 94            self.get_logger().info(
 95                f"✅ Subscribing Right CompressedImage: {self.topic_name}")
 96
 97        elif self.topic_type == "right_camera_info":
 98            self.topic_name = "/aima/hal/sensor/stereo_head_front_right/camera_info"
 99            # CameraInfo subscription must use reliable + transient_local QoS to receive historical messages (even if only one frame is published)
100            camera_qos = QoSProfile(
101                reliability=QoSReliabilityPolicy.RELIABLE,
102                history=QoSHistoryPolicy.KEEP_LAST,
103                depth=1,
104                durability=QoSDurabilityPolicy.TRANSIENT_LOCAL
105            )
106            self.sub_camerainfo = self.create_subscription(
107                CameraInfo, self.topic_name, self.cb_camerainfo, camera_qos)
108            self.get_logger().info(
109                f"✅ Subscribing Right CameraInfo (with transient_local): {self.topic_name}")
110
111        else:
112            self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
113            raise ValueError("Unknown topic_type")
114
115        # Internal state
116        self.last_print = self.get_clock().now()
117        self.arrivals = deque()
118
119    def update_arrivals(self):
120        """Calculate received FPS"""
121        now = self.get_clock().now()
122        self.arrivals.append(now)
123        while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
124            self.arrivals.popleft()
125
126    def get_fps(self):
127        """Get FPS"""
128        return len(self.arrivals)
129
130    def should_print(self):
131        """Control print frequency"""
132        now = self.get_clock().now()
133        if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
134            self.last_print = now
135            return True
136        return False
137
138    def cb_image(self, msg: Image):
139        """Image callback (Left/Right camera RGB image)"""
140        self.update_arrivals()
141
142        if self.should_print():
143            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
144            self.get_logger().info(
145                f"📸 {self.topic_type} received\n"
146                f"  • frame_id:        {msg.header.frame_id}\n"
147                f"  • stamp (sec):     {stamp_sec:.6f}\n"
148                f"  • encoding:        {msg.encoding}\n"
149                f"  • size (WxH):      {msg.width} x {msg.height}\n"
150                f"  • step (bytes/row):{msg.step}\n"
151                f"  • is_bigendian:    {msg.is_bigendian}\n"
152                f"  • recv FPS (1s):   {self.get_fps():.1f}"
153            )
154
155        # Only RGB images support video dump
156        if (self.topic_type in ["left_rgb_image", "right_rgb_image"]) and self.dump_video_path:
157            self.dump_image_to_video(msg)
158
159    def cb_compressed(self, msg: CompressedImage):
160        """CompressedImage callback (Left/Right camera RGB compressed image)"""
161        self.update_arrivals()
162
163        if self.should_print():
164            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
165            self.get_logger().info(
166                f"🗜️  {self.topic_type} received\n"
167                f"  • frame_id:        {msg.header.frame_id}\n"
168                f"  • stamp (sec):     {stamp_sec:.6f}\n"
169                f"  • format:          {msg.format}\n"
170                f"  • data size:       {len(msg.data)}\n"
171                f"  • recv FPS (1s):   {self.get_fps():.1f}"
172            )
173
174    def cb_camerainfo(self, msg: CameraInfo):
175        """CameraInfo callback (Left/Right camera intrinsic parameters)"""
176        # Camera info will only receive one frame, print it directly
177        stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
178
179        # Format D array
180        d_str = ", ".join([f"{d:.6f}" for d in msg.d])
181
182        # Format K matrix
183        k_str = ", ".join([f"{k:.6f}" for k in msg.k])
184
185        # Format P matrix
186        p_str = ", ".join([f"{p:.6f}" for p in msg.p])
187
188        self.get_logger().info(
189            f"📷 {self.topic_type} received\n"
190            f"  • frame_id:        {msg.header.frame_id}\n"
191            f"  • stamp (sec):     {stamp_sec:.6f}\n"
192            f"  • width x height:  {msg.width} x {msg.height}\n"
193            f"  • distortion_model:{msg.distortion_model}\n"
194            f"  • D: [{d_str}]\n"
195            f"  • K: [{k_str}]\n"
196            f"  • P: [{p_str}]\n"
197            f"  • binning_x: {msg.binning_x}\n"
198            f"  • binning_y: {msg.binning_y}\n"
199            f"  • roi: {{ x_offset: {msg.roi.x_offset}, y_offset: {msg.roi.y_offset}, height: {msg.roi.height}, width: {msg.roi.width}, do_rectify: {msg.roi.do_rectify} }}"
200        )
201
202    def dump_image_to_video(self, msg: Image):
203        """Video dump is only supported for RGB images"""
204        # You can add video recording functionality here
205        # Simplified in the Python version, only logs instead
206        if self.should_print():
207            self.get_logger().info(f"📝 Video dump not implemented in Python version")
208
209
210def main(args=None):
211    rclpy.init(args=args)
212    try:
213        node = StereoCameraTopicEcho()
214        rclpy.spin(node)
215    except KeyboardInterrupt:
216        pass
217    except Exception as e:
218        print(f"Error: {e}")
219    finally:
220        if 'node' in locals():
221            node.destroy_node()
222        rclpy.shutdown()
223
224
225if __name__ == '__main__':
226    main()

使用说明:

  1. 订阅左相机RGB图像:

    ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_rgb_image
    
  2. 订阅右相机RGB图像:

    ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=right_rgb_image
    
  3. 订阅左相机内参:

    ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_camera_info
    
  4. 录制左相机视频:

    # dump_video_path的值可改为其他路径, 注意提前创建该文件所在目录才能保存
    ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_rgb_image -p dump_video_path:=$PWD/left_camera.avi
    

头部后置单目相机数据订阅

该示例中用到了echo_camera_head_rear,通过订阅/aima/hal/sensor/rgb_head_rear/话题来接收机器人的头部后置单目相机数据,支持RGB图、压缩图和相机内参数据。

功能特点:

  • 支持头部后置相机数据订阅

  • 实时FPS统计和数据显示

  • 支持RGB图视频录制功能

  • 可配置的topic类型选择

支持的数据类型:

  • rgb_image: RGB图像 (sensor_msgs/Image)

  • rgb_image_compressed: 压缩RGB图像 (sensor_msgs/CompressedImage)

  • camera_info: 相机内参 (sensor_msgs/CameraInfo)

  1#!/usr/bin/env python3
  2"""
  3Head rear monocular camera multi-topic subscription example
  4
  5Supports selecting the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
  6  - rgb_image: RGB image (sensor_msgs/Image)
  7  - rgb_image_compressed: RGB compressed image (sensor_msgs/CompressedImage)
  8  - camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)
  9
 10Example:
 11  ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image
 12  ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image_compressed
 13  ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=camera_info
 14
 15Default topic_type is rgb_image
 16"""
 17
 18import rclpy
 19from rclpy.node import Node
 20from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy, QoSHistoryPolicy
 21from sensor_msgs.msg import Image, CompressedImage, CameraInfo
 22from collections import deque
 23import time
 24
 25
 26class HeadRearCameraTopicEcho(Node):
 27    def __init__(self):
 28        super().__init__('head_rear_camera_topic_echo')
 29
 30        # Select the topic type to subscribe
 31        self.declare_parameter('topic_type', 'rgb_image')
 32        self.declare_parameter('dump_video_path', '')
 33
 34        self.topic_type = self.get_parameter('topic_type').value
 35        self.dump_video_path = self.get_parameter('dump_video_path').value
 36
 37        # Set QoS parameters - use sensor data QoS
 38        qos = QoSProfile(
 39            reliability=QoSReliabilityPolicy.BEST_EFFORT,
 40            history=QoSHistoryPolicy.KEEP_LAST,
 41            depth=5,
 42            durability=QoSDurabilityPolicy.VOLATILE
 43        )
 44
 45        # Create different subscribers based on topic_type
 46        if self.topic_type == "rgb_image":
 47            self.topic_name = "/aima/hal/sensor/rgb_head_rear/rgb_image"
 48            self.sub_image = self.create_subscription(
 49                Image, self.topic_name, self.cb_image, qos)
 50            self.get_logger().info(
 51                f"✅ Subscribing RGB Image: {self.topic_name}")
 52            if self.dump_video_path:
 53                self.get_logger().info(
 54                    f"📝 Will dump received images to video: {self.dump_video_path}")
 55
 56        elif self.topic_type == "rgb_image_compressed":
 57            self.topic_name = "/aima/hal/sensor/rgb_head_rear/rgb_image/compressed"
 58            self.sub_compressed = self.create_subscription(
 59                CompressedImage, self.topic_name, self.cb_compressed, qos)
 60            self.get_logger().info(
 61                f"✅ Subscribing CompressedImage: {self.topic_name}")
 62
 63        elif self.topic_type == "camera_info":
 64            self.topic_name = "/aima/hal/sensor/rgb_head_rear/camera_info"
 65            # CameraInfo subscription must use reliable + transient_local QoS to receive historical messages (even if only one frame is published)
 66            camera_qos = QoSProfile(
 67                reliability=QoSReliabilityPolicy.RELIABLE,
 68                history=QoSHistoryPolicy.KEEP_LAST,
 69                depth=1,
 70                durability=QoSDurabilityPolicy.TRANSIENT_LOCAL
 71            )
 72            self.sub_camerainfo = self.create_subscription(
 73                CameraInfo, self.topic_name, self.cb_camerainfo, camera_qos)
 74            self.get_logger().info(
 75                f"✅ Subscribing CameraInfo (with transient_local): {self.topic_name}")
 76
 77        else:
 78            self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
 79            raise ValueError("Unknown topic_type")
 80
 81        # Internal state
 82        self.last_print = self.get_clock().now()
 83        self.arrivals = deque()
 84
 85    def update_arrivals(self):
 86        """Calculate received FPS"""
 87        now = self.get_clock().now()
 88        self.arrivals.append(now)
 89        while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
 90            self.arrivals.popleft()
 91
 92    def get_fps(self):
 93        """Get FPS"""
 94        return len(self.arrivals)
 95
 96    def should_print(self):
 97        """Control print frequency"""
 98        now = self.get_clock().now()
 99        if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
100            self.last_print = now
101            return True
102        return False
103
104    def cb_image(self, msg: Image):
105        """Image callback (RGB image)"""
106        self.update_arrivals()
107
108        if self.should_print():
109            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
110            self.get_logger().info(
111                f"📸 {self.topic_type} received\n"
112                f"  • frame_id:        {msg.header.frame_id}\n"
113                f"  • stamp (sec):     {stamp_sec:.6f}\n"
114                f"  • encoding:        {msg.encoding}\n"
115                f"  • size (WxH):      {msg.width} x {msg.height}\n"
116                f"  • step (bytes/row):{msg.step}\n"
117                f"  • is_bigendian:    {msg.is_bigendian}\n"
118                f"  • recv FPS (1s):   {self.get_fps():.1f}"
119            )
120
121        # Only RGB image supports video dump
122        if self.topic_type == "rgb_image" and self.dump_video_path:
123            self.dump_image_to_video(msg)
124
125    def cb_compressed(self, msg: CompressedImage):
126        """CompressedImage callback (RGB compressed image)"""
127        self.update_arrivals()
128
129        if self.should_print():
130            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
131            self.get_logger().info(
132                f"🗜️  {self.topic_type} received\n"
133                f"  • frame_id:        {msg.header.frame_id}\n"
134                f"  • stamp (sec):     {stamp_sec:.6f}\n"
135                f"  • format:          {msg.format}\n"
136                f"  • data size:       {len(msg.data)}\n"
137                f"  • recv FPS (1s):   {self.get_fps():.1f}"
138            )
139
140    def cb_camerainfo(self, msg: CameraInfo):
141        """CameraInfo callback (camera intrinsic parameters)"""
142        # Camera info will only receive one frame, print it directly
143        stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
144
145        # Format D array
146        d_str = ", ".join([f"{d:.6f}" for d in msg.d])
147
148        # Format K matrix
149        k_str = ", ".join([f"{k:.6f}" for k in msg.k])
150
151        # Format P matrix
152        p_str = ", ".join([f"{p:.6f}" for p in msg.p])
153
154        self.get_logger().info(
155            f"📷 {self.topic_type} received\n"
156            f"  • frame_id:        {msg.header.frame_id}\n"
157            f"  • stamp (sec):     {stamp_sec:.6f}\n"
158            f"  • width x height:  {msg.width} x {msg.height}\n"
159            f"  • distortion_model:{msg.distortion_model}\n"
160            f"  • D: [{d_str}]\n"
161            f"  • K: [{k_str}]\n"
162            f"  • P: [{p_str}]\n"
163            f"  • binning_x: {msg.binning_x}\n"
164            f"  • binning_y: {msg.binning_y}\n"
165            f"  • roi: {{ x_offset: {msg.roi.x_offset}, y_offset: {msg.roi.y_offset}, height: {msg.roi.height}, width: {msg.roi.width}, do_rectify: {msg.roi.do_rectify} }}"
166        )
167
168    def dump_image_to_video(self, msg: Image):
169        """Video dump is only supported for RGB images"""
170        # You can add video recording functionality here
171        # Simplified in the Python version, only logs instead
172        if self.should_print():
173            self.get_logger().info(f"📝 Video dump not implemented in Python version")
174
175
176def main(args=None):
177    rclpy.init(args=args)
178    try:
179        node = HeadRearCameraTopicEcho()
180        rclpy.spin(node)
181    except KeyboardInterrupt:
182        pass
183    except Exception as e:
184        print(f"Error: {e}")
185    finally:
186        if 'node' in locals():
187            node.destroy_node()
188        rclpy.shutdown()
189
190
191if __name__ == '__main__':
192    main()

使用说明:

  1. 订阅RGB图像数据:

    ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image
    
  2. 订阅压缩图像数据:

    ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image_compressed
    
  3. 订阅相机内参:

    ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=camera_info
    
  4. 录制视频:

    # dump_video_path的值可改为其他路径, 注意提前创建该文件所在目录才能保存
    ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image -p dump_video_path:=$PWD/rear_camera.avi
    

应用场景:

  • 人脸识别和追踪

  • 目标检测和识别

  • 视觉SLAM

  • 图像处理和计算机视觉算法开发

  • 机器人视觉导航

6.1.13 激光雷达数据订阅

该示例中用到了echo_lidar_data,通过订阅/aima/hal/sensor/lidar_chest_front/话题来接收机器人的激光雷达数据,支持点云数据和IMU数据两种数据类型。

功能特点:

  • 支持激光雷达点云数据订阅

  • 支持激光雷达IMU数据订阅

  • 实时FPS统计和数据显示

  • 可配置的topic类型选择

  • 详细的数据字段信息输出

支持的数据类型:

  • PointCloud2: 激光雷达点云数据 (sensor_msgs/PointCloud2)

  • Imu: 激光雷达IMU数据 (sensor_msgs/Imu)

技术实现:

  • 使用SensorDataQoS配置(BEST_EFFORT + VOLATILE

  • 支持点云字段信息解析和显示

  • 支持IMU四元数、角速度和线性加速度数据

  • 提供详细的调试日志输出

应用场景:

  • 激光雷达数据采集和分析

  • 点云数据处理和可视化

  • 机器人导航和定位

  • SLAM算法开发

  • 环境感知和建图

  1#!/usr/bin/env python3
  2"""
  3Chest LiDAR data subscription example
  4
  5Supports subscribing to the following topics:
  6  1. /aima/hal/sensor/lidar_chest_front/lidar_pointcloud
  7     - Data type: sensor_msgs/PointCloud2
  8     - frame_id: lidar_chest_front
  9     - child_frame_id: /
 10     - Content: LiDAR point cloud data
 11  2. /aima/hal/sensor/lidar_chest_front/imu
 12     - Data type: sensor_msgs/Imu
 13     - frame_id: lidar_imu_chest_front
 14     - Content: LiDAR IMU data
 15
 16You can select the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
 17  - pointcloud: subscribe to LiDAR point cloud
 18  - imu: subscribe to LiDAR IMU
 19Default topic_type is pointcloud
 20
 21Examples:
 22  ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=pointcloud
 23  ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=imu
 24"""
 25
 26import rclpy
 27from rclpy.node import Node
 28from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
 29from sensor_msgs.msg import PointCloud2, Imu
 30from collections import deque
 31import time
 32
 33
 34class LidarChestEcho(Node):
 35    def __init__(self):
 36        super().__init__('lidar_chest_echo')
 37
 38        # Select the topic type to subscribe
 39        self.declare_parameter('topic_type', 'pointcloud')
 40        self.topic_type = self.get_parameter('topic_type').value
 41
 42        # SensorDataQoS: BEST_EFFORT + VOLATILE
 43        qos = QoSProfile(
 44            reliability=QoSReliabilityPolicy.BEST_EFFORT,
 45            history=QoSHistoryPolicy.KEEP_LAST,
 46            depth=5
 47        )
 48
 49        # Create different subscribers based on topic_type
 50        if self.topic_type == "pointcloud":
 51            self.topic_name = "/aima/hal/sensor/lidar_chest_front/lidar_pointcloud"
 52            self.sub_pointcloud = self.create_subscription(
 53                PointCloud2, self.topic_name, self.cb_pointcloud, qos)
 54            self.get_logger().info(
 55                f"✅ Subscribing LIDAR PointCloud2: {self.topic_name}")
 56
 57        elif self.topic_type == "imu":
 58            self.topic_name = "/aima/hal/sensor/lidar_chest_front/imu"
 59            self.sub_imu = self.create_subscription(
 60                Imu, self.topic_name, self.cb_imu, qos)
 61            self.get_logger().info(
 62                f"✅ Subscribing LIDAR IMU: {self.topic_name}")
 63
 64        else:
 65            self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
 66            raise ValueError("Unknown topic_type")
 67
 68        # Internal state
 69        self.last_print = self.get_clock().now()
 70        self.arrivals = deque()
 71
 72    def update_arrivals(self):
 73        """Calculate received FPS"""
 74        now = self.get_clock().now()
 75        self.arrivals.append(now)
 76        while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
 77            self.arrivals.popleft()
 78
 79    def get_fps(self):
 80        """Get FPS"""
 81        return len(self.arrivals)
 82
 83    def should_print(self):
 84        """Control print frequency"""
 85        now = self.get_clock().now()
 86        if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
 87            self.last_print = now
 88            return True
 89        return False
 90
 91    def cb_pointcloud(self, msg: PointCloud2):
 92        """PointCloud2 callback (LiDAR point cloud)"""
 93        self.update_arrivals()
 94
 95        if self.should_print():
 96            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
 97
 98            # Format fields info
 99            fields_str = " ".join(
100                [f"{f.name}({f.datatype})" for f in msg.fields])
101
102            self.get_logger().info(
103                f"🟢 LIDAR PointCloud2 received\n"
104                f"  • frame_id:        {msg.header.frame_id}\n"
105                f"  • stamp (sec):     {stamp_sec:.6f}\n"
106                f"  • width x height:  {msg.width} x {msg.height}\n"
107                f"  • point_step:      {msg.point_step}\n"
108                f"  • row_step:        {msg.row_step}\n"
109                f"  • fields:          {fields_str}\n"
110                f"  • is_bigendian:    {msg.is_bigendian}\n"
111                f"  • is_dense:        {msg.is_dense}\n"
112                f"  • data size:       {len(msg.data)}\n"
113                f"  • recv FPS (1s):   {self.get_fps():1.1f}"
114            )
115
116    def cb_imu(self, msg: Imu):
117        """IMU callback (LiDAR IMU)"""
118        self.update_arrivals()
119
120        if self.should_print():
121            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
122
123            self.get_logger().info(
124                f"🟢 LIDAR IMU received\n"
125                f"  • frame_id:        {msg.header.frame_id}\n"
126                f"  • stamp (sec):     {stamp_sec:.6f}\n"
127                f"  • orientation:     [{msg.orientation.x:.6f}, {msg.orientation.y:.6f}, {msg.orientation.z:.6f}, {msg.orientation.w:.6f}]\n"
128                f"  • angular_velocity:[{msg.angular_velocity.x:.6f}, {msg.angular_velocity.y:.6f}, {msg.angular_velocity.z:.6f}]\n"
129                f"  • linear_accel:    [{msg.linear_acceleration.x:.6f}, {msg.linear_acceleration.y:.6f}, {msg.linear_acceleration.z:.6f}]\n"
130                f"  • recv FPS (1s):   {self.get_fps():.1f}"
131            )
132
133
134def main(args=None):
135    rclpy.init(args=args)
136    try:
137        node = LidarChestEcho()
138        rclpy.spin(node)
139    except KeyboardInterrupt:
140        pass
141    except Exception as e:
142        print(f"Error: {e}")
143    finally:
144        if 'node' in locals():
145            node.destroy_node()
146        rclpy.shutdown()
147
148
149if __name__ == '__main__':
150    main()

使用说明:

# 订阅激光雷达点云数据
ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=pointcloud

# 订阅激光雷达IMU数据
ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=imu

输出示例:

[INFO] [lidar_chest_echo]: ✅ Subscribing LIDAR PointCloud2: /aima/hal/sensor/lidar_chest_front/lidar_pointcloud
[INFO] [lidar_chest_echo]: 🟢 LIDAR PointCloud2 received
  • frame_id:        lidar_chest_front
  • stamp (sec):     1234567890.123456
  • width x height:  1 x 36000
  • point_step:      16
  • row_step:        16
  • fields:          x(7) y(7) z(7) intensity(7)
  • is_bigendian:    False
  • is_dense:        True
  • data size:       576000
  • recv FPS (1s):   10.0

6.1.14 播放视频

该示例中用到了play_video,在运行节点程序前,需要先将视频上传到机器人的**交互计算单元(PC3)**上(用户可在其上创建一个用来存储视频的目录),然后将节点程序中的video_path改为需要播放视频的路径。

注意

⚠️ 请注意!交互计算单元(PC3)独立于二开程序所在的开发计算单元(PC2), 音视频文件务必存入交互计算单元(IP: 10.0.1.42)。

功能说明
通过调用PlayVideo服务,可以让机器人在屏幕上播放指定路径的视频文件。请确保视频文件已上传到交互计算单元,否则播放会失败。

 1#!/usr/bin/env python3
 2
 3import rclpy
 4from rclpy.node import Node
 5from aimdk_msgs.srv import PlayVideo
 6import threading
 7
 8
 9class PlayVideoClient(Node):
10    def __init__(self):
11        super().__init__('play_video_client')
12        self.client = self.create_client(
13            PlayVideo, '/face_ui_proxy/play_video')
14        self.finished = threading.Event()  # used to exit main loop
15
16    def send_request(self, video_path, mode, priority):
17        # wait for service to be available
18        while not self.client.wait_for_service(timeout_sec=1.0) and rclpy.ok():
19            self.get_logger().warn('Waiting for PlayVideo service to start...')
20        if not rclpy.ok():
21            return
22
23        req = PlayVideo.Request()
24        req.header.header.stamp = self.get_clock().now().to_msg()
25        req.video_path = video_path
26        req.mode = mode
27        req.priority = priority
28
29        # async call
30        future = self.client.call_async(req)
31        future.add_done_callback(self.callback)
32
33    def callback(self, future):
34        try:
35            response = future.result()
36            if response.success:
37                self.get_logger().info(
38                    f'✅ Video played successfully: {response.message}')
39            else:
40                self.get_logger().error(
41                    f'❌ Video play failed: {response.message}')
42        except Exception as e:
43            self.get_logger().error(f'Service call error: {e}')
44        self.finished.set()  # notify main loop to exit
45
46
47def main(args=None):
48    rclpy.init(args=args)
49    node = PlayVideoClient()
50
51    # video path and priority can be customized
52    video_path = "/agibot/data/home/agi/zhiyuan.mp4"
53    priority = 5
54    # input play mode
55    try:
56        mode = int(input("Enter video play mode (1: play once, 2: loop): "))
57    except Exception:
58        mode = 2
59
60    node.send_request(video_path, mode, priority)
61
62    # spin_once + event to wait for exit, supports Ctrl+C
63    try:
64        while rclpy.ok() and not node.finished.is_set():
65            rclpy.spin_once(node, timeout_sec=0.1)
66    except KeyboardInterrupt:
67        pass
68
69    node.destroy_node()
70    rclpy.shutdown()
71
72
73if __name__ == '__main__':
74    main()

6.1.15 媒体文件播放

该示例中用到了play_media,通过该节点可实现播放指定的媒体文件(如音频文件),支持WAV、MP3等格式的音频文件播放。

功能特点:

  • 支持多种音频格式播放(WAV、MP3等)

  • 支持优先级控制,可设置播放优先级

  • 支持打断机制,可中断当前播放

  • 支持自定义文件路径和播放参数

  • 提供完整的错误处理和状态反馈

技术实现:

  • 使用PlayMediaFile服务进行媒体文件播放

  • 支持优先级级别设置(0-99)

  • 支持中断控制(is_interrupted参数)

  • 提供详细的播放状态反馈

  • 兼容不同的响应字段格式

应用场景:

  • 音频文件播放和媒体控制

  • 语音提示和音效播放

  • 多媒体应用开发

  • 机器人交互音频反馈

注意

⚠️ 请注意!交互计算单元(PC3)独立于二开程序所在的开发计算单元(PC2), 音视频文件务必存入交互计算单元(IP: 10.0.1.42)。

 1#!/usr/bin/env python3
 2import sys
 3import rclpy
 4from rclpy.node import Node
 5
 6from aimdk_msgs.srv import PlayMediaFile
 7from aimdk_msgs.msg import TtsPriorityLevel
 8
 9
10def main(args=None):
11    rclpy.init(args=args)
12    node = Node('play_media_file_client_min')
13
14    # 1) service name
15    service_name = '/aimdk_5Fmsgs/srv/PlayMediaFile'
16    client = node.create_client(PlayMediaFile, service_name)
17
18    # 2) input file path (if no arg, ask user)
19    default_file = '/agibot/data/var/interaction/tts_cache/normal/demo.wav'
20    if len(sys.argv) > 1:
21        file_name = sys.argv[1]
22    else:
23        file_name = input(
24            f'Enter media file path to play (default: {default_file}): ').strip()
25        if not file_name:
26            file_name = default_file
27
28    # 3) build request
29    req = PlayMediaFile.Request()
30    req.media_file_req.file_name = file_name
31    req.media_file_req.domain = 'demo_client'       # required: caller domain
32    req.media_file_req.trace_id = 'demo'            # optional
33    req.media_file_req.is_interrupted = True        # interrupt same-priority
34    req.media_file_req.priority_weight = 0          # optional: 0~99
35    req.media_file_req.priority_level.value = TtsPriorityLevel.INTERACTION_L6
36
37    # 4) wait for service and call
38    node.get_logger().info(f'Waiting for service: {service_name}')
39    if not client.wait_for_service(timeout_sec=5.0):
40        node.get_logger().error(f'Service unavailable: {service_name}')
41        node.destroy_node()
42        rclpy.shutdown()
43        sys.exit(1)
44
45    future = client.call_async(req)
46    try:
47        rclpy.spin_until_future_complete(node, future, timeout_sec=10.0)
48    except KeyboardInterrupt:
49        node.get_logger().warn('Interrupted while waiting')
50        node.destroy_node()
51        rclpy.shutdown()
52        sys.exit(1)
53
54    if not future.done():
55        node.get_logger().error('Call timed out or not completed')
56        node.destroy_node()
57        rclpy.shutdown()
58        sys.exit(1)
59
60    # 5) handle response (success is in tts_resp)
61    try:
62        resp = future.result()
63        success = resp.tts_resp.is_success
64
65        if success:
66            node.get_logger().info(
67                f'✅ Media file play request succeeded: {file_name}')
68        else:
69            node.get_logger().error(
70                f'❌ Media file play request failed: {file_name}')
71    except Exception as e:
72        node.get_logger().error(f'Call exception: {e}')
73
74    node.destroy_node()
75    rclpy.shutdown()
76
77
78if __name__ == '__main__':
79    main()

使用说明:

# 播放默认音频文件
ros2 run py_examples play_media

# 播放指定音频文件
# 注意替换/path/to/your/audio_file.wav为交互板上实际文件路径
ros2 run py_examples play_media /path/to/your/audio_file.wav

# 播放TTS缓存文件
ros2 run py_examples play_media /agibot/data/var/interaction/tts_cache/normal/demo.wav

输出示例:

[INFO] [play_media_file_client_min]: 等待服务: /aimdk_5Fmsgs/srv/PlayMediaFile
[INFO] [play_media_file_client_min]: ✅ 媒体文件播放请求成功

注意事项:

  • 确保音频文件路径正确且文件存在

  • 支持的文件格式:WAV、MP3等

  • 优先级设置影响播放队列顺序

  • 打断功能可中断当前播放的音频

  • 程序具有完善的异常处理机制

6.1.16 TTS (文字转语音)

该示例中用到了play_tts,通过该节点可实现语音播放输入的文字,用户可根据不同的场景输入相应的文本。

功能特点:

  • 支持命令行参数和交互式输入

  • 完整的服务可用性检查和错误处理

  • 支持优先级控制和打断机制

  • 提供详细的播放状态反馈

核心代码

 1#!/usr/bin/env python3
 2
 3import sys
 4import rclpy
 5from rclpy.node import Node
 6from rclpy.task import Future
 7from aimdk_msgs.srv import PlayTts
 8from aimdk_msgs.msg import TtsPriorityLevel
 9
10
11class PlayTTSClient(Node):
12    def __init__(self):
13        super().__init__('play_tts_client_min')
14
15        # fill in the actual service name
16        self.service_name = '/aimdk_5Fmsgs/srv/PlayTts'
17        self.client = self.create_client(PlayTts, self.service_name)
18
19    def send_request(self, text):
20        # wait for service, 8s timeout
21        if not self.client.wait_for_service(timeout_sec=8.0):
22            self.get_logger().error(
23                f'Service unavailable: {self.service_name}')
24            return False
25
26        req = PlayTts.Request()
27        req.header.header.stamp = self.get_clock().now().to_msg()
28
29        req.tts_req.text = text
30        req.tts_req.domain = 'demo_client'   # required: caller domain
31        req.tts_req.trace_id = 'demo'        # optional: request id
32        req.tts_req.is_interrupted = True    # required: interrupt same-priority
33        req.tts_req.priority_weight = 0
34        req.tts_req.priority_level.value = 6
35
36        for i in range(8):
37            req.header.header.stamp = self.get_clock().now().to_msg()
38            future = self.client.call_async(req)
39            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
40
41            if future.done():
42                break
43
44            # retry if remote peer module can be under heavy load
45            self.get_logger().info(f"trying to send play tts request... [{i}]")
46
47        if not future.done():
48            self.get_logger().error('Call timed out')
49            return False
50
51        resp = future.result()
52        if resp is not None:
53            if resp.tts_resp.is_success:
54                self.get_logger().info('✅ TTS request succeeded')
55                return True
56            else:
57                self.get_logger().error('❌ TTS request failed')
58                return False
59        else:
60            self.get_logger().error('Call failed, future.result() returned None')
61            return False
62
63
64def main(args=None):
65    rclpy.init(args=args)
66    node = PlayTTSClient()
67
68    # get text to speak
69    if len(sys.argv) > 1:
70        text = sys.argv[1]
71    else:
72        text = input('Enter text to speak: ')
73        if not text:
74            text = 'Hello, I am AgiBot X2.'
75
76    success = node.send_request(text)
77    node.destroy_node()
78    rclpy.shutdown()
79
80    return 0 if success else 1
81
82
83if __name__ == '__main__':
84    exit(main())

使用说明

# 使用命令行参数播报文本(推荐)
ros2 run py_examples play_tts "你好,我是灵犀X2机器人"

# 或者不带参数运行,程序会提示用户输入
ros2 run py_examples play_tts

输出示例

[INFO] [play_tts_client_min]: ✅ 播报请求成功

注意事项

  • 确保TTS服务正常运行

  • 支持中文和英文文本播报

  • 优先级设置影响播放队列顺序

  • 打断功能可中断当前播放的语音

接口参考

  • 服务:/aimdk_5Fmsgs/srv/PlayTts

  • 消息:aimdk_msgs/srv/PlayTts

6.1.17 麦克风数据接收

该示例中用到了mic_receiver,通过订阅/agent/process_audio_output话题来接收机器人的降噪音频数据,支持内置麦克风和外置麦克风两种音频流,并根据VAD(语音活动检测)状态自动保存完整的语音片段为PCM文件。

功能特点:

  • 支持多音频流同时接收(内置麦克风 stream_id=1,外置麦克风 stream_id=2)

  • 基于VAD状态自动检测语音开始、处理中、结束

  • 自动保存完整语音片段为PCM格式文件

  • 按时间戳和音频流分类存储

  • 支持音频时长计算和统计信息输出

  • 智能缓冲区管理,避免内存泄漏

  • 完善的错误处理和异常管理

  • 详细的调试日志输出

VAD状态说明:

  • 0: 无语音

  • 1: 语音开始

  • 2: 语音处理中

  • 3: 语音结束

技术实现:

  • 支持PCM格式音频文件保存(16kHz, 16位, 单声道)

  • 提供详细的日志输出和状态监控

  • 支持实时音频流处理和文件保存

应用场景:

  • 语音识别和语音处理

  • 音频数据采集和分析

  • 实时语音监控

  • 音频质量检测

  • 多麦克风阵列数据处理

  1#!/usr/bin/env python3
  2"""
  3Microphone data receiving example
  4
  5This example subscribes to the `/agent/process_audio_output` topic to receive the robot's
  6noise-suppressed audio data. It supports both the built-in microphone and the external
  7microphone audio streams, and automatically saves complete speech segments as PCM files
  8based on the VAD (Voice Activity Detection) state.
  9
 10Features:
 11- Supports receiving multiple audio streams at the same time (built-in mic stream_id=1, external mic stream_id=2)
 12- Automatically detects speech start / in-progress / end based on VAD state
 13- Automatically saves complete speech segments as PCM files
 14- Stores files categorized by timestamp and audio stream
 15- Supports audio duration calculation and logging
 16
 17VAD state description:
 18- 0: No speech
 19- 1: Speech start
 20- 2: Speech in progress
 21- 3: Speech end
 22"""
 23
 24import rclpy
 25from rclpy.node import Node
 26from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
 27from aimdk_msgs.msg import ProcessedAudioOutput, AudioVadStateType
 28import os
 29import time
 30from datetime import datetime
 31from collections import defaultdict
 32from typing import Dict, List
 33
 34
 35class AudioSubscriber(Node):
 36    def __init__(self):
 37        super().__init__('audio_subscriber')
 38
 39        # Audio buffers, stored separately by stream_id
 40        # stream_id -> buffer
 41        self.audio_buffers: Dict[int, List[bytes]] = defaultdict(list)
 42        self.recording_state: Dict[int, bool] = defaultdict(bool)
 43
 44        # Create audio output directory
 45        self.audio_output_dir = "audio_recordings"
 46        os.makedirs(self.audio_output_dir, exist_ok=True)
 47
 48        # VAD state name mapping
 49        self.vad_state_names = {
 50            0: "No speech",
 51            1: "Speech start",
 52            2: "Speech in progress",
 53            3: "Speech end"
 54        }
 55
 56        # Audio stream name mapping
 57        self.stream_names = {
 58            1: "Built-in microphone",
 59            2: "External microphone"
 60        }
 61
 62        # QoS settings
 63        qos = QoSProfile(
 64            history=QoSHistoryPolicy.KEEP_LAST,
 65            depth=10,
 66            reliability=QoSReliabilityPolicy.BEST_EFFORT
 67        )
 68
 69        # Create subscriber
 70        self.subscription = self.create_subscription(
 71            ProcessedAudioOutput,
 72            '/agent/process_audio_output',
 73            self.audio_callback,
 74            qos
 75        )
 76
 77        self.get_logger().info("Start subscribing to noise-suppressed audio data...")
 78
 79    def audio_callback(self, msg: ProcessedAudioOutput):
 80        """Audio data callback"""
 81        try:
 82            stream_id = msg.stream_id
 83            vad_state = msg.audio_vad_state.value
 84            audio_data = bytes(msg.audio_data)
 85
 86            self.get_logger().info(
 87                f"Received audio data: stream_id={stream_id}, "
 88                f"vad_state={vad_state}({self.vad_state_names.get(vad_state, 'Unknown state')}), "
 89                f"audio_size={len(audio_data)} bytes"
 90            )
 91
 92            self.handle_vad_state(stream_id, vad_state, audio_data)
 93
 94        except Exception as e:
 95            self.get_logger().error(
 96                f"Error while processing audio message: {str(e)}")
 97
 98    def handle_vad_state(self, stream_id: int, vad_state: int, audio_data: bytes):
 99        """Handle VAD state changes"""
100        stream_name = self.stream_names.get(
101            stream_id, f"Unknown stream {stream_id}")
102        vad_name = self.vad_state_names.get(
103            vad_state, f"Unknown state {vad_state}")
104
105        self.get_logger().info(
106            f"[{stream_name}] VAD state: {vad_name} audio: {len(audio_data)} bytes"
107        )
108
109        # Speech start
110        if vad_state == 1:
111            self.get_logger().info("🎤 Speech start detected")
112            self.audio_buffers[stream_id].clear()
113            self.recording_state[stream_id] = True
114            if audio_data:
115                self.audio_buffers[stream_id].append(audio_data)
116
117        # Speech in progress
118        elif vad_state == 2:
119            self.get_logger().info("🔄 Speech in progress...")
120            if self.recording_state[stream_id] and audio_data:
121                self.audio_buffers[stream_id].append(audio_data)
122
123        # Speech end
124        elif vad_state == 3:
125            self.get_logger().info("✅ Speech end")
126            if self.recording_state[stream_id] and audio_data:
127                self.audio_buffers[stream_id].append(audio_data)
128
129            if self.recording_state[stream_id] and self.audio_buffers[stream_id]:
130                self.save_audio_segment(stream_id)
131            self.recording_state[stream_id] = False
132
133        # No speech
134        elif vad_state == 0:
135            if self.recording_state[stream_id]:
136                self.get_logger().info("⏹️ Reset recording state")
137                self.recording_state[stream_id] = False
138
139        # Print current buffer status
140        buffer_size = sum(len(chunk)
141                          for chunk in self.audio_buffers[stream_id])
142        recording = self.recording_state[stream_id]
143        self.get_logger().debug(
144            f"[Stream {stream_id}] Buffer size: {buffer_size} bytes, recording: {recording}"
145        )
146
147    def save_audio_segment(self, stream_id: int):
148        """Save audio segment"""
149        if not self.audio_buffers[stream_id]:
150            return
151
152        # Merge all audio data
153        audio_data = b''.join(self.audio_buffers[stream_id])
154
155        # Get current timestamp
156        now = datetime.now()
157        timestamp = now.strftime("%Y%m%d_%H%M%S_%f")[:-3]  # to milliseconds
158
159        # Create subdirectory by stream_id
160        stream_dir = os.path.join(self.audio_output_dir, f"stream_{stream_id}")
161        os.makedirs(stream_dir, exist_ok=True)
162
163        # Generate filename
164        stream_name = "internal_mic" if stream_id == 1 else "external_mic" if stream_id == 2 else f"stream_{stream_id}"
165        filename = f"{stream_name}_{timestamp}.pcm"
166        filepath = os.path.join(stream_dir, filename)
167
168        try:
169            # Save PCM file
170            with open(filepath, 'wb') as f:
171                f.write(audio_data)
172
173            self.get_logger().info(
174                f"Audio segment saved: {filepath} (size: {len(audio_data)} bytes)")
175
176            # Calculate audio duration (assuming 16 kHz, 16-bit, mono)
177            sample_rate = 16000
178            bits_per_sample = 16
179            channels = 1
180            bytes_per_sample = bits_per_sample // 8
181            total_samples = len(audio_data) // (bytes_per_sample * channels)
182            duration_seconds = total_samples / sample_rate
183
184            self.get_logger().info(
185                f"Audio duration: {duration_seconds:.2f} s ({total_samples} samples)")
186
187        except Exception as e:
188            self.get_logger().error(f"Failed to save audio file: {str(e)}")
189
190
191def main(args=None):
192    rclpy.init(args=args)
193    node = AudioSubscriber()
194
195    try:
196        node.get_logger().info("Listening to noise-suppressed audio data, press Ctrl+C to exit...")
197        rclpy.spin(node)
198    except KeyboardInterrupt:
199        node.get_logger().info("Interrupt signal received, exiting...")
200    finally:
201        node.destroy_node()
202        rclpy.shutdown()
203
204
205if __name__ == '__main__':
206    main()

使用说明:

  1. 运行程序

    # 构建Python包
    colcon build --packages-select py_examples
    
    # 运行麦克风接收程序
    ros2 run py_examples mic_receiver
    
  2. 目录结构

    • 运行节点后会自动创建audio_recordings目录

    • 音频文件按stream_id分类存储:

      • stream_1/: 内置麦克风音频

      • stream_2/: 外置麦克风音频

  3. 文件命名格式{stream_name}_{timestamp}.pcm

    • internal_mic_20250909_133649_738.pcm (内置麦克风)

    • external_mic_20250909_133650_123.pcm (外置麦克风)

  4. 音频格式:16kHz, 16位, 单声道PCM

  5. 播放保存的PCM文件

    # 播放内置麦克风录音
    aplay -r 16000 -f S16_LE -c 1 audio_recordings/stream_1/internal_mic_20250909_133649_738.pcm
    
    # 播放外置麦克风录音
    aplay -r 16000 -f S16_LE -c 1 audio_recordings/stream_2/external_mic_20250909_133650_123.pcm
    
  6. 转换为WAV格式(可选):

    # 使用ffmpeg转换为WAV格式
    ffmpeg -f s16le -ar 16000 -ac 1 -i external_mic_20250909_133649_738.pcm output.wav
    
  7. 程序控制

    • Ctrl+C 安全退出程序

    • 程序会自动处理音频流的开始和结束

    • 支持同时处理多个音频流

输出示例:

正常启动和运行:

[INFO] 开始订阅降噪音频数据...
[INFO] 收到音频数据: stream_id=1, vad_state=0(无语音), audio_size=0 bytes
[INFO] [内置麦克风] VAD状态: 无语音 音频数据: 0 bytes
[INFO] 收到音频数据: stream_id=2, vad_state=0(无语音), audio_size=0 bytes
[INFO] [外置麦克风] VAD状态: 无语音 音频数据: 0 bytes

检测到语音开始:

[INFO] 收到音频数据: stream_id=2, vad_state=1(语音开始), audio_size=320 bytes
[INFO] [外置麦克风] VAD状态: 语音开始 音频数据: 320 bytes
[INFO] 🎤 检测到语音开始

语音处理过程:

[INFO] 收到音频数据: stream_id=2, vad_state=2(语音处理中), audio_size=320 bytes
[INFO] [外置麦克风] VAD状态: 语音处理中 音频数据: 320 bytes
[INFO] 🔄 语音处理中...
[INFO] 收到音频数据: stream_id=2, vad_state=2(语音处理中), audio_size=320 bytes
[INFO] [外置麦克风] VAD状态: 语音处理中 音频数据: 320 bytes
[INFO] 🔄 语音处理中...

语音结束和保存:

[INFO] 收到音频数据: stream_id=2, vad_state=3(语音结束), audio_size=320 bytes
[INFO] [外置麦克风] VAD状态: 语音结束 音频数据: 320 bytes
[INFO] ✅ 语音结束
[INFO] 音频段已保存: audio_recordings/stream_2/external_mic_20250909_133649_738.pcm (大小: 960 bytes)
[INFO] 音频时长: 0.06 秒 (480 样本)

多音频流同时处理:

[INFO] 收到音频数据: stream_id=1, vad_state=1(语音开始), audio_size=320 bytes
[INFO] [内置麦克风] VAD状态: 语音开始 音频数据: 320 bytes
[INFO] 🎤 检测到语音开始
[INFO] 收到音频数据: stream_id=2, vad_state=1(语音开始), audio_size=320 bytes
[INFO] [外置麦克风] VAD状态: 语音开始 音频数据: 320 bytes
[INFO] 🎤 检测到语音开始

程序退出:

^C[INFO] 接收到中断信号,正在退出...
[INFO] 程序已安全退出

注意事项:

  • 程序支持同时处理多个音频流(内置和外置麦克风)

  • 每个音频流都有独立的缓冲区和录音状态

  • 音频文件会自动按时间戳和音频流分类保存

  • 程序具有完善的错误处理机制,确保稳定运行

6.1.18 表情控制

该示例中用到了play_emoji,通过该节点可实现表达指定的表情,用户可根据已有的表情列表来选择要变化的表情,具体表情列表可参考表情列表

 1#!/usr/bin/env python3
 2import rclpy
 3from rclpy.node import Node
 4from aimdk_msgs.srv import PlayEmoji
 5import threading
 6import time
 7
 8
 9class PlayEmojiClient(Node):
10    def __init__(self):
11        super().__init__('play_emoji_client')
12        self.client = self.create_client(
13            PlayEmoji, '/face_ui_proxy/play_emoji')
14        self.finished = threading.Event()
15        self.get_logger().info('PlayEmoji client node started.')
16
17    def send_request(self, emoji: int, mode: int, priority: int):
18        # Wait for service to be available
19        while not self.client.wait_for_service(timeout_sec=2.0) and rclpy.ok():
20            self.get_logger().warn('Waiting for PlayEmoji service...')
21
22        if not rclpy.ok():
23            return
24
25        req = PlayEmoji.Request()
26        # Equivalent to C++: request->header.header.stamp = this->now();
27        req.header.header.stamp = self.get_clock().now().to_msg()
28        req.emotion_id = int(emoji)
29        req.mode = int(mode)
30        req.priority = int(priority)
31
32        future = self.client.call_async(req)
33        future.add_done_callback(self._on_response)
34
35    def _on_response(self, future):
36        try:
37            resp = future.result()
38            if resp.success:
39                self.get_logger().info(
40                    f'✅ Emoji played successfully: {resp.message}')
41            else:
42                self.get_logger().error(f'❌ Emoji play failed: {resp.message}')
43        except Exception as e:
44            self.get_logger().error(f'Call failed: {e}')
45        finally:
46            self.finished.set()
47
48
49def main(args=None):
50    rclpy.init(args=args)
51    node = PlayEmojiClient()
52
53    # Interactive input, same as the original C++ version
54    try:
55        emotion = int(
56            input("Enter emoji ID: 1-blink, 60-bored, 70-abnormal, 80-sleeping, 90-happy ... 190-double angry, 200-adore: "))
57    except Exception:
58        emotion = 1
59    try:
60        mode = int(input("Enter play mode (1: play once, 2: loop): "))
61    except Exception:
62        mode = 1
63    priority = 10  # default priority
64
65    node.send_request(emotion, mode, priority)
66
67    # Polling-style spin, supports immediate exit with Ctrl+C
68    try:
69        while rclpy.ok() and not node.finished.is_set():
70            rclpy.spin_once(node, timeout_sec=0.1)
71            time.sleep(0.05)
72    except KeyboardInterrupt:
73        pass
74
75    node.destroy_node()
76    rclpy.shutdown()
77
78
79if __name__ == '__main__':
80    main()

6.1.19 LED灯带控制

功能说明:演示如何控制机器人的LED灯带,支持多种显示模式和自定义颜色。

核心代码

  1#!/usr/bin/env python3
  2
  3import rclpy
  4import rclpy.logging
  5from rclpy.node import Node
  6import signal
  7import sys
  8
  9from aimdk_msgs.msg import CommonRequest
 10from aimdk_msgs.srv import LedStripCommand
 11
 12client_node = None  # for shutting down cleanly in signal handler
 13
 14
 15def signal_handler(signum, frame):
 16    if client_node is not None:
 17        client_node.get_logger().info(
 18            f"Received signal {signum}, shutting down...")
 19    if rclpy.ok():
 20        rclpy.shutdown()
 21
 22
 23class PlayLightsClient(Node):
 24    def __init__(self):
 25        super().__init__('play_lights_client')
 26
 27        # create service client
 28        self.client = self.create_client(
 29            LedStripCommand, '/aimdk_5Fmsgs/srv/LedStripCommand')
 30
 31        self.get_logger().info('PlayLights client node started.')
 32
 33        timeout_sec = 8.0
 34        start = self.get_clock().now().nanoseconds / 1e9
 35
 36        while not self.client.wait_for_service(timeout_sec=2.0):
 37            now = self.get_clock().now().nanoseconds / 1e9
 38            if now - start > timeout_sec:
 39                self.get_logger().error('❌ Timed out waiting for service.')
 40                raise RuntimeError('Service unavailable')
 41            self.get_logger().info('Waiting for LedStripCommand service...')
 42
 43        self.get_logger().info('🟢 Service available, ready to send request.')
 44
 45    def send_request(self, led_mode, r, g, b):
 46        """Send LED control request"""
 47        try:
 48            # create request
 49            request = LedStripCommand.Request()
 50            request.led_strip_mode = led_mode
 51            request.r = r
 52            request.g = g
 53            request.b = b
 54
 55            # send request
 56            # Note: LED strip is slow to response (up to ~5s)
 57            for i in range(4):
 58                request.request.header.stamp = self.get_clock().now().to_msg()
 59                future = self.client.call_async(request)
 60                rclpy.spin_until_future_complete(
 61                    self, future, timeout_sec=5)
 62
 63                if future.done():
 64                    break
 65
 66                # retry as remote peer can be under heavy load
 67                self.get_logger().info(f"trying to control led strip... [{i}]")
 68
 69            if not future.done():
 70                self.get_logger().error('❌ Service call failed or timed out.')
 71                return False
 72
 73            response = future.result()
 74
 75            if response.status_code == 0:
 76                self.get_logger().info("✅ LED strip command sent successfully")
 77                return True
 78            else:
 79                self.get_logger().error(
 80                    f"❌ LED strip command failed with status: {response.status_code}")
 81                return False
 82
 83        except Exception as e:
 84            self.get_logger().error(f"❌ Exception while calling service: {e}")
 85            return False
 86
 87
 88def main(args=None):
 89    global client_node
 90    rclpy.init(args=args)
 91
 92    # register signal handlers early
 93    signal.signal(signal.SIGINT, signal_handler)
 94    signal.signal(signal.SIGTERM, signal_handler)
 95
 96    try:
 97        # get command line args
 98        if len(sys.argv) > 4:
 99            # use CLI args
100            led_mode = int(sys.argv[1])
101            r = int(sys.argv[2])
102            g = int(sys.argv[3])
103            b = int(sys.argv[4])
104            rclpy.logging.get_logger('main').info(
105                f'Using CLI args: mode={led_mode}, RGB=({r},{g},{b})')
106        else:
107            # interactive input
108            print("=== LED strip control example ===")
109            print("Select LED strip mode:")
110            print("0 - Steady on")
111            print("1 - Breathing (4s cycle, sine brightness)")
112            print("2 - Blinking (1s cycle, 0.5s on, 0.5s off)")
113            print("3 - Flowing (2s cycle, light up from left to right)")
114
115            led_mode = int(input("Enter mode (0-3): "))
116
117            print("\nSet RGB color values (0-255):")
118            r = int(input("Red (R): "))
119            g = int(input("Green (G): "))
120            b = int(input("Blue (B): "))
121
122        # clamp input
123        led_mode = max(0, min(3, led_mode))
124        r = max(0, min(255, r))
125        g = max(0, min(255, g))
126        b = max(0, min(255, b))
127
128        client_node = PlayLightsClient()
129
130        client_node.get_logger().info(f"Sending LED control command...")
131        client_node.get_logger().info(
132            f"Mode: {led_mode}, Color: RGB({r}, {g}, {b})")
133
134        # send request
135        ok = client_node.send_request(led_mode, r, g, b)
136        if not ok:
137            client_node.get_logger().error('LED strip control request failed.')
138
139    except Exception as e:
140        rclpy.logging.get_logger('main').error(f'Error occurred: {e}')
141    finally:
142        if rclpy.ok():
143            rclpy.shutdown()
144
145
146if __name__ == '__main__':
147    main()

使用说明

# 构建
colcon build --packages-select py_examples

# 交互式运行
ros2 run py_examples play_lights

# 命令行参数运行
ros2 run py_examples play_lights 1 255 0 0  # 模式1,红色

输出示例

=== LED灯带控制示例 ===
请选择灯带模式:
0 - 常亮模式
1 - 呼吸模式 (4s周期,亮度正弦变化)
2 - 闪烁模式 (1s周期,0.5s亮,0.5s灭)
3 - 流水模式 (2s周期,从左到右依次点亮)
请输入模式 (0-3): 1

请设置RGB颜色值 (0-255):
红色分量 (R): 255
绿色分量 (G): 0
蓝色分量 (B): 0

发送LED控制命令...
模式: 1, 颜色: RGB(255, 0, 0)
✅ LED strip command sent successfully

技术特点

  • 支持4种LED显示模式

  • RGB颜色自定义

  • 同步服务调用

  • 命令行参数支持

  • 输入参数验证

  • 友好的用户交互界面