6.1 Python interface usage example

This chapter will guide you through implementing several features shown in the index

Build & Run Instructions

  • Change into the aimdk directory and run the following commands

    source /opt/ros/humble/setup.bash
    colcon build
    source install/setup.bash
    ros2 run py_examples '<corresponding feature name e.g.: get_mc_action>'
    

📝 Code explanation

The complete code implementation includes full error handling, signal handling, timeout handling, and other mechanisms to ensure program robustness. Please view or modify the code in the py_examples directory

Caution

As standard ROS DO NOT handle cross-host service (request-response) well, please refer to SDK examples to use open interfaces in a robust way (with protection mechanisms e.g. exception safety and retransmission)

6.1.1 Developer Mode

Invoke the 'GetSystemState' service to obtain the robot’s current system status, and use 'MigrateSystemState' to access the corresponding developer mode.

Definition of Developer Mode

  1#!/usr/bin/env python3
  2"""
  3System State Migrator - Synchronous Python version
  4"""
  5
  6import sys
  7import time
  8import rclpy
  9from rclpy.node import Node
 10import aimdk_msgs.srv as aimdk_srv
 11import aimdk_msgs.msg as aimdk_msg
 12
 13
 14class SyncSystemStateMigrator(Node):
 15    """Synchronous version of system state migrator"""
 16
 17    def __init__(self, target_state: str):
 18        super().__init__('sync_system_state_migrator')
 19        self.target_state = target_state
 20
 21        # Create service clients
 22        self.migrate_client = self.create_client(
 23            aimdk_srv.MigrateSystemState,
 24            '/aimdk_5Fmsgs/srv/MigrateSystemState'
 25        )
 26
 27        self.get_state_client = self.create_client(
 28            aimdk_srv.GetSystemState,
 29            '/aimdk_5Fmsgs/srv/GetSystemState'
 30        )
 31
 32        self.get_logger().info(f'Target state: {self.target_state}')
 33
 34    def wait_for_services(self, timeout_sec: float = 30.0) -> bool:
 35        """Wait for services to become available"""
 36        self.get_logger().info('Waiting for services...')
 37
 38        start_time = time.time()
 39        while time.time() - start_time < timeout_sec and rclpy.ok():
 40            if (self.migrate_client.wait_for_service(timeout_sec=1.0) and
 41                    self.get_state_client.wait_for_service(timeout_sec=1.0)):
 42                self.get_logger().info('Services are available')
 43                return True
 44
 45            self.get_logger().info('Services not available, waiting...')
 46            time.sleep(0.5)
 47
 48        self.get_logger().error(
 49            f'Timeout waiting for services after {timeout_sec} seconds')
 50        return False
 51
 52    def migrate_state_sync(self, max_retries: int = 5, timeout_sec: float = 30.0) -> bool:
 53        """Send migration request synchronously with retry"""
 54        request = aimdk_srv.MigrateSystemState.Request()
 55        now = self.get_clock().now()
 56        request.header.header.stamp.sec = now.nanoseconds // 1_000_000_000
 57        request.header.header.stamp.nanosec = now.nanoseconds % 1_000_000_000
 58        request.state = self.target_state
 59
 60        for attempt in range(1, max_retries + 1):
 61            self.get_logger().info(
 62                f'Sending migration request to {self.target_state} '
 63                f'(attempt {attempt}/{max_retries})...'
 64            )
 65
 66            future = None
 67            try:
 68                future = self.migrate_client.call_async(request)
 69
 70                start_time = time.time()
 71                while rclpy.ok() and time.time() - start_time < timeout_sec:
 72                    rclpy.spin_once(self, timeout_sec=0.1)
 73                    if future.done():
 74                        response = future.result()
 75                        if response.header.status.value == aimdk_msg.CommonState.SUCCESS:
 76                            self.get_logger().info('Migration request accepted')
 77                            return True
 78                        else:
 79                            self.get_logger().error(
 80                                f'Migration failed: {response.header.message}'
 81                            )
 82                            return False
 83
 84                self.get_logger().warning(
 85                    f'Timeout waiting for migration response '
 86                    f'(attempt {attempt}/{max_retries})'
 87                )
 88
 89            except Exception as e:
 90                self.get_logger().error(
 91                    f'Migration service call failed (attempt {attempt}/{max_retries}): {e}'
 92                )
 93            finally:
 94                if future is not None and not future.done():
 95                    future.cancel()
 96
 97            if attempt < max_retries:
 98                self.get_logger().info('Retrying migration request in 2.0 seconds...')
 99                time.sleep(2.0)
100
101        self.get_logger().error(
102            f'Migration request failed after {max_retries} attempts'
103        )
104        return False
105
106    def get_current_state_sync(self, max_retries: int = 3, timeout_sec: float = 10.0):
107        """Get current system state synchronously with retry"""
108        request = aimdk_srv.GetSystemState.Request()
109        now = self.get_clock().now()
110        request.header.header.stamp.sec = now.nanoseconds // 1_000_000_000
111        request.header.header.stamp.nanosec = now.nanoseconds % 1_000_000_000
112
113        for attempt in range(1, max_retries + 1):
114            future = None
115            try:
116                future = self.get_state_client.call_async(request)
117
118                start_time = time.time()
119                while rclpy.ok() and time.time() - start_time < timeout_sec:
120                    rclpy.spin_once(self, timeout_sec=0.1)
121                    if future.done():
122                        return future.result()
123
124                self.get_logger().warning(
125                    f'Timeout getting system state (attempt {attempt}/{max_retries})'
126                )
127
128            except Exception as e:
129                self.get_logger().error(
130                    f'GetSystemState service call failed (attempt {attempt}/{max_retries}): {e}'
131                )
132            finally:
133                if future is not None and not future.done():
134                    future.cancel()
135
136            if attempt < max_retries:
137                time.sleep(1.0)
138
139        self.get_logger().error(
140            f'Failed to get system state after {max_retries} attempts'
141        )
142        return None
143
144    def monitor_migration_sync(self, max_checks: int = 300) -> bool:
145        """Monitor migration progress synchronously"""
146        self.get_logger().info('Monitoring migration progress...')
147
148        check_count = 0
149        start_time = time.time()
150
151        while check_count < max_checks and rclpy.ok():
152            # Wait between checks
153            time.sleep(1.0)
154
155            # Check current state
156            response = self.get_current_state_sync()
157            if response is None:
158                self.get_logger().warning('Failed to get system state, will retry...')
159                continue
160
161            check_count += 1
162
163            # Check if migration is complete
164            current_state_lower = response.cur_state.lower()
165            target_state_lower = self.target_state.lower()
166
167            state_match = (current_state_lower == target_state_lower)
168            status_match = (response.curr_status.value ==
169                            aimdk_msg.SystemStatus.IN_READY)
170
171            self.get_logger().info(
172                f'Check {check_count}: State="{response.cur_state}" '
173                f'(match={state_match}), Status={response.curr_status.value} (match={status_match})'
174            )
175
176            if state_match and status_match:
177                self.get_logger().info(
178                    f'Migration to {self.target_state} completed successfully!')
179                return True
180
181            # Check timeout
182            if time.time() - start_time > 300.0:  # 5 minutes total timeout
183                self.get_logger().error('Migration timeout after 5 minutes')
184                return False
185
186        self.get_logger().error(f'Migration failed after {max_checks} checks')
187        return False
188
189    def run_migration(self) -> bool:
190        """Run complete migration process"""
191        # Step 1: Wait for services
192        if not self.wait_for_services():
193            return False
194
195        # Step 2: Send migration request
196        if not self.migrate_state_sync():
197            return False
198
199        # Step 3: Monitor migration progress
200        return self.monitor_migration_sync()
201
202
203def main():
204    """
205    Main function
206    """
207    # Check command line arguments
208    if len(sys.argv) != 2:
209        print(f'Usage: {sys.argv[0]} <target_state>')
210        print('\nAvailable target states: Ready, Develop_Nav, Develop_Audio_Linux, Develop_Audio_ROS, Develop_MC')
211        return 1
212
213    # Get target state from command line argument
214    target_state = sys.argv[1]
215
216    # Validate target state
217    if not target_state:
218        print('Error: Target state cannot be empty', file=sys.stderr)
219        return 1
220
221    # Initialize ROS2
222    rclpy.init()
223
224    result = 1
225    node = None
226
227    try:
228        # Create migrator node
229        node = SyncSystemStateMigrator(target_state)
230
231        # Run migration
232        if node.run_migration():
233            print(f'\nMigration to {target_state} completed successfully!')
234            result = 0
235        else:
236            print(f'\nMigration to {target_state} failed')
237            result = 1
238
239    except KeyboardInterrupt:
240        print('\nProgram interrupted by user')
241        result = 1
242    except Exception as e:
243        print(f'Error: {e}', file=sys.stderr)
244        result = 1
245    finally:
246        # Cleanup
247        if node:
248            node.destroy_node()
249        if rclpy.ok():
250            rclpy.shutdown()
251
252    return result
253
254
255if __name__ == '__main__':
256    sys.exit(main())

Usage

ros2 run py_examples migrate_system_state Ready

Example output

[INFO] [1770125663.176574907] [sync_system_state_migrator]: Target state: Ready
[INFO] [1770125663.176758843] [sync_system_state_migrator]: Waiting for services...
[INFO] [1770125663.678726368] [sync_system_state_migrator]: Services are available
[INFO] [1770125663.679642487] [sync_system_state_migrator]: Sending migration request to Ready...
[INFO] [1770125663.692394889] [sync_system_state_migrator]: Migration request accepted
[INFO] [1770125663.693117341] [sync_system_state_migrator]: Monitoring migration progress...
[INFO] [1770125664.728735142] [sync_system_state_migrator]: Check 1: State="Business" (match=False), Status=2 (match=False)
[INFO] [1770125665.733183333] [sync_system_state_migrator]: Check 2: State="Business" (match=False), Status=2 (match=False)
[INFO] [1770125666.736488851] [sync_system_state_migrator]: Check 3: State="Ready" (match=True), Status=1 (match=True)
[INFO] [1770125666.736982424] [sync_system_state_migrator]: Migration to Ready completed successfully!

6.1.2 Get robot mode

Retrieve the robot’s current operating mode by calling the GetMcAction service, including the description, and status information.

Motion Mode Definitions

 1#!/usr/bin/env python3
 2
 3import rclpy
 4import rclpy.logging
 5from rclpy.node import Node
 6
 7from aimdk_msgs.srv import GetMcAction
 8from aimdk_msgs.msg import CommonRequest
 9
10
11class GetMcActionClient(Node):
12    def __init__(self):
13        super().__init__('get_mc_action_client')
14        self.client = self.create_client(
15            GetMcAction, '/aimdk_5Fmsgs/srv/GetMcAction')
16        self.get_logger().info('✅ GetMcAction client node created.')
17
18        # Wait for the service to become available
19        while not self.client.wait_for_service(timeout_sec=2.0):
20            self.get_logger().info('⏳ Service unavailable, waiting...')
21
22        self.get_logger().info('🟢 Service available, ready to send request.')
23
24    def send_request(self):
25        request = GetMcAction.Request()
26        request.request = CommonRequest()
27
28        self.get_logger().info('📨 Sending request to get robot mode')
29        for i in range(8):
30            request.request.header.stamp = self.get_clock().now().to_msg()
31            future = self.client.call_async(request)
32            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
33
34            if future.done():
35                break
36
37            # retry as remote peer is NOT handled well by ROS
38            self.get_logger().info(f'trying ... [{i}]')
39
40        response = future.result()
41        if response is None:
42            self.get_logger().error('❌ Service call failed or timed out.')
43            return
44
45        self.get_logger().info('✅ Robot mode get successfully.')
46        self.get_logger().info(f'Mode name: {response.info.action_desc}')
47        self.get_logger().info(f'Mode status: {response.info.status.value}')
48
49
50def main(args=None):
51    rclpy.init(args=args)
52    node = None
53    try:
54        node = GetMcActionClient()
55        node.send_request()
56    except KeyboardInterrupt:
57        pass
58    except Exception as e:
59        rclpy.logging.get_logger('main').error(
60            f'Program exited with exception: {e}')
61
62    if node:
63        node.destroy_node()
64    if rclpy.ok():
65        rclpy.shutdown()
66
67
68if __name__ == '__main__':
69    main()

6.1.3 Set robot mode

This example uses the SetMcAction service. After running the node, enter the corresponding field value of the mode in the terminal, and the robot will immediately switch to the appropriate motion mode.
Before switching to the Stable Standing mode (STAND_DEFAULT), ensure the robot is standing and its feet are already on the ground.
The motion mode switching must follow its state transition digram, other transitions would be rejected
Locomotion Mode(LOCOMOTION_DEFAULT) and Stable Standing Mode(STAND_DEFAULT) are unified and will auto switch internally, so switching manually to the nearer one is enough

  1#!/usr/bin/env python3
  2
  3import sys
  4import rclpy
  5import rclpy.logging
  6from rclpy.node import Node
  7
  8from aimdk_msgs.srv import SetMcAction
  9from aimdk_msgs.msg import RequestHeader, CommonState, McAction, McActionCommand
 10
 11
 12class SetMcActionClient(Node):
 13    def __init__(self):
 14        super().__init__('set_mc_action_client')
 15        self.client = self.create_client(
 16            SetMcAction, '/aimdk_5Fmsgs/srv/SetMcAction'
 17        )
 18        self.get_logger().info('✅ SetMcAction client node created.')
 19
 20        # Wait for the service to become available
 21        while not self.client.wait_for_service(timeout_sec=2.0):
 22            self.get_logger().info('⏳ Service unavailable, waiting...')
 23
 24        self.get_logger().info('🟢 Service available, ready to send request.')
 25
 26    def send_request(self, action_name: str):
 27        req = SetMcAction.Request()
 28        req.header = RequestHeader()
 29        req.source = 'node'
 30        #
 31        cmd = McActionCommand()
 32        cmd.action_desc = action_name
 33        req.command = cmd
 34
 35        self.get_logger().info(
 36            f'📨 Sending request to set robot mode: {action_name}')
 37        for i in range(8):
 38            req.header.stamp = self.get_clock().now().to_msg()
 39            future = self.client.call_async(req)
 40            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
 41
 42            if future.done():
 43                break
 44
 45            # retry as remote peer is NOT handled well by ROS
 46            self.get_logger().info(f'trying ... [{i}]')
 47
 48        response = future.result()
 49        if response is None:
 50            self.get_logger().error('❌ Service call failed or timed out.')
 51            return
 52
 53        if response.response.status.value == CommonState.SUCCESS:
 54            self.get_logger().info('✅ Robot mode set successfully.')
 55        else:
 56            self.get_logger().error(
 57                f'❌ Failed to set robot mode: {response.response.message}'
 58            )
 59
 60
 61def main(args=None):
 62    action_info = {
 63        'PASSIVE_DEFAULT': ('PD', 'joints with zero torque'),
 64        'DAMPING_DEFAULT': ('DD', 'joints in damping mode'),
 65        'JOINT_DEFAULT': ('JD', 'Position Control Stand (joints locked)'),
 66        'STAND_DEFAULT': ('SD', 'Stable Stand (auto-balance)'),
 67        'LOCOMOTION_DEFAULT': ('LD', 'locomotion mode (walk or run)'),
 68    }
 69
 70    choices = {}
 71    for k, v in action_info.items():
 72        choices[v[0]] = k
 73
 74    rclpy.init(args=args)
 75    node = None
 76    try:
 77        # Prefer command-line argument, otherwise prompt for input
 78        if len(sys.argv) > 1:
 79            motion = sys.argv[1]
 80        else:
 81            print('{:<4} - {:<20} : {}'.format('abbr',
 82                  'robot mode', 'description'))
 83            for k, v in action_info.items():
 84                print(f'{v[0]:<4} - {k:<20} : {v[1]}')
 85            motion = input('Enter abbr of robot mode:')
 86
 87        action_name = choices.get(motion)
 88        if not action_name:
 89            raise ValueError(f'Invalid abbr of robot mode: {motion}')
 90
 91        node = SetMcActionClient()
 92        node.send_request(action_name)
 93    except KeyboardInterrupt:
 94        pass
 95    except Exception as e:
 96        rclpy.logging.get_logger('main').error(
 97            f'Program exited with exception: {e}')
 98
 99    if node:
100        node.destroy_node()
101    if rclpy.ok():
102        rclpy.shutdown()
103
104
105if __name__ == '__main__':
106    main()

Usage

# Use command-line arguments to set the mode (recommended)
ros2 run py_examples set_mc_action JD  # Zero-Torque >> Position-Control Standing
ros2 run py_examples set_mc_action SD  # Ensure your robot's feet on the ground, Position-Control Standing >> Stable Standing
# Stable Standing >> Locomotion Mode. auto done internally, don't switch manually

# Or run without arguments and the program will prompt for input
ros2 run py_examples set_mc_action

Example output

...
[INFO] [1764066567.502968540] [set_mc_action_client]: ✅ Robot mode set successfully.

Notes

  • Ensure the robot is standing and its feet are on the ground before switching to STAND_DEFAULT mode

  • Mode switching may take several seconds to complete

Interface reference

  • Service: /aimdk_5Fmsgs/srv/SetMcAction

  • Message: aimdk_msgs/srv/SetMcAction

6.1.4 Set robot action

This example uses preset_motion_client; after switching to Stable Stand Mode and starting the node, enter the corresponding field values to perform preset actions with the left (or right) hand such as handshake, raise hand, wave, or air kiss.

Available parameters are listed in the preset motions table

 1#!/usr/bin/env python3
 2
 3import rclpy
 4import rclpy.logging
 5from rclpy.node import Node
 6
 7from aimdk_msgs.srv import SetMcPresetMotion
 8from aimdk_msgs.msg import McPresetMotion, McControlArea, RequestHeader, CommonState
 9
10
11class SetMcPresetMotionClient(Node):
12    def __init__(self):
13        super().__init__('preset_motion_client')
14        self.client = self.create_client(
15            SetMcPresetMotion, '/aimdk_5Fmsgs/srv/SetMcPresetMotion')
16        self.get_logger().info('✅ SetMcPresetMotion client node created.')
17
18        # Wait for the service to become available
19        while not self.client.wait_for_service(timeout_sec=2.0):
20            self.get_logger().info('⏳ Service unavailable, waiting...')
21
22        self.get_logger().info('🟢 Service available, ready to send request.')
23
24    def send_request(self, area_id: int, motion_id: int) -> bool:
25        request = SetMcPresetMotion.Request()
26        request.header = RequestHeader()
27
28        motion = McPresetMotion()
29        area = McControlArea()
30
31        motion.value = motion_id
32        area.value = area_id
33
34        request.motion = motion
35        request.area = area
36        request.interrupt = False
37
38        self.get_logger().info(
39            f'📨 Sending request to set preset motion: motion={motion_id}, area={area_id}')
40
41        for i in range(8):
42            request.header.stamp = self.get_clock().now().to_msg()
43            future = self.client.call_async(request)
44            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
45
46            if future.done():
47                break
48
49            # retry as remote peer is NOT handled well by ROS
50            self.get_logger().info(f'trying ... [{i}]')
51
52        response = future.result()
53        if response is None:
54            self.get_logger().error('❌ Service call failed or timed out.')
55            return False
56
57        if response.response.header.code == 0:
58            self.get_logger().info(
59                f'✅ Preset motion set successfully: {response.response.task_id}')
60            return True
61        elif response.response.state.value == CommonState.RUNNING:
62            self.get_logger().info(
63                f'⏳ Preset motion executing: {response.response.task_id}')
64            return True
65        else:
66            self.get_logger().error(
67                f'❌ Failed to set preset motion: {response.response.task_id}'
68            )
69            return False
70
71
72def main(args=None):
73    rclpy.init(args=args)
74    node = None
75    try:
76        area = int(input("Enter arm area ID (1-left, 2-right): "))
77        motion = int(input(
78            "Enter preset motion ID (1001-raise, 1002-wave, 1003-handshake, 1004-airkiss): "))
79
80        node = SetMcPresetMotionClient()
81        node.send_request(area, motion)
82    except KeyboardInterrupt:
83        pass
84    except Exception as e:
85        rclpy.logging.get_logger('main').error(
86            f'Program exited with exception: {e}')
87
88    if node:
89        node.destroy_node()
90    if rclpy.ok():
91        rclpy.shutdown()
92
93
94if __name__ == '__main__':
95    main()

6.1.5 Gripper control

This example uses hand_control. Publish messages to the /aima/hal/joint/hand/command topic to control the gripper.

Attention

Note ⚠️: Before running this example, stop the native MC module on the robot control unit (PC1) by running aima em stop-app mc to obtain control of the robot. Ensure the robot is safe before operating.

 1import rclpy
 2from rclpy.node import Node
 3from aimdk_msgs.msg import HandCommandArray, HandCommand, HandType, MessageHeader
 4import time
 5
 6
 7class HandControl(Node):
 8    def __init__(self):
 9        super().__init__('hand_control')
10
11        # Preset parameter list: [(left hand, right hand), ...]
12        self.position_pairs = [
13            (1.0, 1.0),   # fully open
14            (0.0, 0.0),   # fully closed
15            (0.5, 0.5),   # half open
16            (0.2, 0.8),   # left slightly closed, right more open
17            (0.7, 0.3)    # left more open, right slightly closed
18        ]
19        self.current_index = 0
20        self.last_switch_time = self.get_clock().now().nanoseconds / 1e9  # seconds
21
22        # Create publisher
23        self.publisher_ = self.create_publisher(
24            HandCommandArray,
25            '/aima/hal/joint/hand/command',
26            10
27        )
28
29        # 50 Hz timer
30        self.timer_ = self.create_timer(
31            0.02,  # 20 ms = 50 Hz
32            self.publish_hand_commands
33        )
34
35        self.get_logger().info("Hand control node started!")
36
37    def publish_hand_commands(self):
38        # Check time to decide whether to switch to the next preset
39        now_sec = self.get_clock().now().nanoseconds / 1e9
40        if now_sec - self.last_switch_time >= 2.0:
41            self.current_index = (self.current_index +
42                                  1) % len(self.position_pairs)
43            self.last_switch_time = now_sec
44            self.get_logger().info(
45                f"Switched to preset: {self.current_index}, left={self.position_pairs[self.current_index][0]:.2f}, right={self.position_pairs[self.current_index][1]:.2f}"
46            )
47
48        # Use current preset
49        left_position, right_position = self.position_pairs[self.current_index]
50
51        msg = HandCommandArray()
52        msg.header = MessageHeader()
53
54        # Configure left hand
55        left_hand = HandCommand()
56        left_hand.name = "left_hand"
57        left_hand.position = float(left_position)
58        left_hand.velocity = 1.0
59        left_hand.acceleration = 1.0
60        left_hand.deceleration = 1.0
61        left_hand.effort = 1.0
62
63        # Configure right hand
64        right_hand = HandCommand()
65        right_hand.name = "right_hand"
66        right_hand.position = float(right_position)
67        right_hand.velocity = 1.0
68        right_hand.acceleration = 1.0
69        right_hand.deceleration = 1.0
70        right_hand.effort = 1.0
71
72        msg.left_hand_type = HandType(value=2)  # gripper mode
73        msg.right_hand_type = HandType(value=2)
74        msg.left_hands = [left_hand]
75        msg.right_hands = [right_hand]
76
77        # Publish message
78        self.publisher_.publish(msg)
79        # We only log when switching presets to avoid too much log output
80
81
82def main(args=None):
83    rclpy.init(args=args)
84    hand_control_node = HandControl()
85
86    try:
87        rclpy.spin(hand_control_node)
88    except KeyboardInterrupt:
89        pass
90    finally:
91        hand_control_node.destroy_node()
92        if rclpy.ok():
93            rclpy.shutdown()
94
95
96if __name__ == '__main__':
97    main()

6.1.6 Dexterous Hand Control

This example uses omnihand_control. Publish messages to the /aima/hal/joint/hand/command topic to control the omnihand.

Attention

Note ⚠️: Before running this example, stop the native MC module on the robot control unit (PC1) by running aima em stop-app mc to obtain control of the robot. Ensure the robot is safe before operating.

  1import rclpy
  2from rclpy.node import Node
  3from aimdk_msgs.msg import HandCommandArray, HandCommand, HandType, MessageHeader
  4import time
  5
  6
  7class HandControl(Node):
  8    def __init__(self):
  9        super().__init__('hand_control')
 10
 11        # Create publisher
 12        self.publisher_ = self.create_publisher(
 13            HandCommandArray,
 14            '/aima/hal/joint/hand/command',
 15            10
 16        )
 17
 18        self.timer_ = self.create_timer(
 19            0.8,
 20            self.publish_hand_commands
 21        )
 22
 23        # Initialize variables
 24        self.target_finger = 0
 25        self.step = 1
 26        self.increasing = True
 27        self.get_logger().info("Hand control node started!")
 28
 29    def build_hand_cmd(self, name: str) -> HandCommand:
 30        cmd = HandCommand()
 31        cmd.name = name
 32        cmd.position = 0.0
 33        cmd.velocity = 0.1
 34        cmd.acceleration = 0.0
 35        cmd.deceleration = 0.0
 36        cmd.effort = 0.0
 37        return cmd
 38
 39    def publish_hand_commands(self):
 40        msg = HandCommandArray()
 41        msg.header.stamp = self.get_clock().now().to_msg()
 42        msg.header.frame_id = 'hand_command'
 43        msg.left_hand_type.value = 1      # NIMBLE_HANDS
 44        msg.right_hand_type.value = 1     # NIMBLE_HANDS
 45
 46        # left hand
 47        msg.left_hands = [self.build_hand_cmd('') for _ in range(10)]
 48        msg.left_hands[0].name = 'left_thumb'
 49        for i in range(1, 10):
 50            msg.left_hands[i].name = 'left_index'
 51
 52        # right hand
 53        msg.right_hands = [self.build_hand_cmd('') for _ in range(10)]
 54        msg.right_hands[0].name = 'right_thumb'
 55        for i in range(1, 10):
 56            msg.right_hands[i].name = 'right_pinky'
 57
 58        if self.target_finger < 10:
 59            msg.right_hands[self.target_finger].position = 0.8
 60        else:
 61            target_finger_ = self.target_finger - 10
 62            target_position = 0.8
 63            if target_finger_ < 3:
 64                # The three thumb motors on the left hand need their signs inverted to mirror the right hand's motion
 65                target_position = -target_position
 66            msg.left_hands[target_finger_].position = target_position
 67
 68        self.publisher_.publish(msg)
 69        self.get_logger().info(
 70            f'Published hand command with target_finger: {self.target_finger}')
 71        self.update_target_finger()
 72
 73    def update_target_finger(self):
 74        if self.increasing:
 75            self.target_finger += self.step
 76            if self.target_finger >= 19:
 77                self.target_finger = 19
 78                self.increasing = False
 79        else:
 80            self.target_finger -= self.step
 81            if self.target_finger <= 0:
 82                self.target_finger = 0
 83                self.increasing = True
 84
 85
 86def main(args=None):
 87    rclpy.init(args=args)
 88    hand_control_node = HandControl()
 89
 90    try:
 91        rclpy.spin(hand_control_node)
 92    except KeyboardInterrupt:
 93        pass
 94    finally:
 95        hand_control_node.destroy_node()
 96        if rclpy.ok():
 97            rclpy.shutdown()
 98
 99
100if __name__ == '__main__':
101    main()

6.1.7 Omnihand touch sensors (T2.1 support)

**This example uses omnihand_touch. By subscribing to the /aima/hal/joint/hand/state, you can monitor the pressure of the omnihand finger touch sensor **

Attention

Note ⚠️: Before running this example, stop the native MC module on the robot control unit (PC1) by running aima em stop-app mc to obtain control of the robot. Ensure the robot is safe before operating.

  1#!/usr/bin/env python3
  2
  3import rclpy
  4from rclpy.node import Node
  5from rclpy.qos import QoSProfile, ReliabilityPolicy
  6from aimdk_msgs.msg import HandStateArray, HandCommandArray
  7import time
  8
  9
 10class HandStateSubscriber(Node):
 11    def __init__(self):
 12        super().__init__('hand_state_subscriber')
 13
 14        # Create publisher
 15        self.publisher_ = self.create_publisher(
 16            HandCommandArray,
 17            '/aima/hal/joint/hand/command',
 18            10
 19        )
 20
 21        # Create QoS profile with BEST_EFFORT reliability
 22        qos_profile = QoSProfile(depth=10)
 23        qos_profile.reliability = ReliabilityPolicy.BEST_EFFORT
 24
 25        # Create subscriber
 26        self.subscription = self.create_subscription(
 27            HandStateArray,
 28            '/aima/hal/joint/hand/state',
 29            self.topic_callback,
 30            qos_profile
 31        )
 32
 33        # Create a timer to publish once per second
 34        self.timer = self.create_timer(1.0, self.publish_command)
 35
 36        self.get_logger().info(
 37            "Subscriber started, listening to /aima/hal/joint/hand/state topic..."
 38        )
 39
 40    def topic_callback(self, msg):
 41        """Callback function for handling incoming messages"""
 42        # Print message header information
 43        self.get_logger().info(
 44            f"Message received - Sequence: {msg.header.sequence}, "
 45            f"Timestamp: {msg.header.stamp.sec}.{msg.header.stamp.nanosec:09d}"
 46        )
 47
 48        # Print left hand touch sensor data
 49        self.print_touch_sensor_data("Left Hand", msg.left_touch_sensors)
 50
 51        # Print right hand touch sensor data
 52        self.print_touch_sensor_data("Right Hand", msg.right_touch_sensors)
 53
 54        print()
 55
 56    def print_touch_sensor_data(self, hand_name, sensor_data):
 57        """
 58        Print touch sensor data for a specific hand
 59
 60        Args:
 61            hand_name: Name of the hand (Left Hand/Right Hand)
 62            sensor_data: Touch sensor data structure
 63        """
 64        print(f"=== {hand_name} Touch Sensor Data ===")
 65
 66        # Print palm touch data
 67        print("Palm Touch Data (36 elements): ", end="")
 68        self.print_array(sensor_data.palm_touch_data)
 69
 70        # Print back of hand touch data
 71        print("Back of Hand Touch Data (36 elements): ", end="")
 72        self.print_array(sensor_data.back_of_hand_touch_data)
 73
 74        # Print finger touch data
 75        print("Thumb Touch Data (16 elements): ", end="")
 76        self.print_array(sensor_data.thumb_touch_data)
 77
 78        print("Index Finger Touch Data (16 elements): ", end="")
 79        self.print_array(sensor_data.index_finger_touch_data)
 80
 81        print("Middle Finger Touch Data (16 elements): ", end="")
 82        self.print_array(sensor_data.middle_finger_touch_data)
 83
 84        print("Ring Finger Touch Data (16 elements): ", end="")
 85        self.print_array(sensor_data.ring_finger_touch_data)
 86
 87        print("Little Finger Touch Data (16 elements): ", end="")
 88        self.print_array(sensor_data.little_finger_touch_data)
 89
 90    def print_array(self, arr):
 91        """
 92        Print array of uint8_t elements
 93
 94        Args:
 95            arr: Array to print (list or tuple)
 96        """
 97        print("[", end="")
 98        for i, val in enumerate(arr):
 99            print(f"{val:3d}", end="")
100            if i < len(arr) - 1:
101                print(" ", end="")
102        print("]")
103
104    def publish_command(self):
105        """Publish hand command message"""
106        message = HandCommandArray()
107
108        # Set header
109        message.header.stamp = self.get_clock().now().to_msg()
110        message.header.frame_id = "hand_command"
111
112        # Set the hand type
113        message.left_hand_type.value = 1  # NIMBLE_HANDS
114        message.right_hand_type.value = 1  # NIMBLE_HANDS
115
116        # Create left hand command array
117        from aimdk_msgs.msg import HandCommand
118        message.left_hands = []
119
120        # Set left thumb
121        left_thumb = HandCommand()
122        left_thumb.name = "left_thumb"
123        left_thumb.position = 0.0
124        left_thumb.velocity = 0.1
125        left_thumb.acceleration = 0.0
126        left_thumb.deceleration = 0.0
127        left_thumb.effort = 0.0
128        message.left_hands.append(left_thumb)
129
130        # Set other left fingers
131        for i in range(1, 10):
132            finger = HandCommand()
133            finger.name = "left_index"
134            finger.position = 0.0
135            finger.velocity = 0.1
136            finger.acceleration = 0.0
137            finger.deceleration = 0.0
138            finger.effort = 0.0
139            message.left_hands.append(finger)
140
141        # Create right hand command array
142        message.right_hands = []
143
144        # Set right thumb
145        right_thumb = HandCommand()
146        right_thumb.name = "right_thumb"
147        right_thumb.position = 0.0
148        right_thumb.velocity = 0.1
149        right_thumb.acceleration = 0.0
150        right_thumb.deceleration = 0.0
151        right_thumb.effort = 0.0
152        message.right_hands.append(right_thumb)
153
154        # Set other right fingers (pinky)
155        for i in range(1, 10):
156            finger = HandCommand()
157            finger.name = "right_pinky"
158            finger.position = 0.0
159            finger.velocity = 0.1
160            finger.acceleration = 0.0
161            finger.deceleration = 0.0
162            finger.effort = 0.0
163            message.right_hands.append(finger)
164
165        # Publish the message
166        self.publisher_.publish(message)
167
168        self.get_logger().info("Published hand command")
169
170
171def main(args=None):
172    rclpy.init(args=args)
173    node = HandStateSubscriber()
174    try:
175        rclpy.spin(node)
176    except KeyboardInterrupt:
177        pass
178    finally:
179        node.destroy_node()
180        if rclpy.ok():
181            rclpy.shutdown()
182
183
184if __name__ == '__main__':
185    main()

The pressure sensors are located at the fingertips of all five fingers, as well as on the palm and the back of the hand. At least one motion command needs to be sent to the dexterous hand in order to receive the corresponding pressure data report.

6.1.8 Register custom input source

For versions after v0.7, you must register an input source before controlling the MC. This example registers a custom input source via the /aimdk_5Fmsgs/srv/SetMcInputSource service so MC becomes aware of it; only registered input sources can perform robot velocity control.

  1#!/usr/bin/env python3
  2
  3import rclpy
  4import rclpy.logging
  5from rclpy.node import Node
  6
  7from aimdk_msgs.srv import SetMcInputSource
  8from aimdk_msgs.msg import RequestHeader, McInputAction
  9
 10
 11class McInputClient(Node):
 12    def __init__(self):
 13        super().__init__('set_mc_input_source_client')
 14        self.client = self.create_client(
 15            SetMcInputSource, '/aimdk_5Fmsgs/srv/SetMcInputSource'
 16        )
 17
 18        self.get_logger().info('✅ SetMcInputSource client node created.')
 19
 20        # Wait for the service to become available
 21        while not self.client.wait_for_service(timeout_sec=2.0):
 22            self.get_logger().info('⏳ Service unavailable, waiting...')
 23
 24        self.get_logger().info('🟢 Service available, ready to send request.')
 25
 26    def send_request(self):
 27        req = SetMcInputSource.Request()
 28
 29        # header
 30        req.request.header = RequestHeader()
 31
 32        # action (e.g. 1001 = register)
 33        req.action = McInputAction()
 34        req.action.value = 1001
 35
 36        # input source info
 37        req.input_source.name = 'node'
 38        req.input_source.priority = 40
 39        req.input_source.timeout = 1000  # ms
 40
 41        # Send request and wait for response
 42        self.get_logger().info(
 43            f'📨 Sending input source request: action_id={req.action.value}, '
 44            f'name={req.input_source.name}, priority={req.input_source.priority}'
 45        )
 46        for i in range(8):
 47            req.request.header.stamp = self.get_clock().now().to_msg()
 48            future = self.client.call_async(req)
 49            rclpy.spin_until_future_complete(
 50                self, future, timeout_sec=0.25)
 51
 52            if future.done():
 53                break
 54
 55            # retry as remote peer is NOT handled well by ROS
 56            self.get_logger().info(f'trying ... [{i}]')
 57
 58        if not future.done():
 59            self.get_logger().error('❌ Service call failed or timed out.')
 60            return False
 61
 62        response = future.result()
 63        ret_code = response.response.header.code
 64        task_id = response.response.task_id
 65
 66        if ret_code == 0:
 67            self.get_logger().info(
 68                f'✅ Input source set successfully. task_id={task_id}'
 69            )
 70            return True
 71        else:
 72            self.get_logger().error(
 73                f'❌ Input source set failed. ret_code={ret_code}, task_id={task_id} (duplicated ADD? or MODIFY/ENABLE/DISABLE for unknown source?)'
 74            )
 75            return False
 76
 77
 78def main(args=None):
 79    rclpy.init(args=args)
 80
 81    node = None
 82    try:
 83        node = McInputClient()
 84        ok = node.send_request()
 85        if not ok:
 86            node.get_logger().error('Input source request failed.')
 87    except KeyboardInterrupt:
 88        pass
 89    except Exception as e:
 90        rclpy.logging.get_logger('main').error(
 91            f'Program exited with exception: {e}')
 92
 93    if node:
 94        node.destroy_node()
 95    if rclpy.ok():
 96        rclpy.shutdown()
 97
 98
 99if __name__ == '__main__':
100    main()

6.1.9 Get current input source

This example uses the GetCurrentInputSource service to retrieve information about the currently registered input source, including name, priority, and timeout.

 1#!/usr/bin/env python3
 2
 3import rclpy
 4from rclpy.node import Node
 5from aimdk_msgs.srv import GetCurrentInputSource
 6from aimdk_msgs.msg import CommonRequest
 7
 8
 9class GetCurrentInputSourceClient(Node):
10    def __init__(self):
11        super().__init__('get_current_input_source_client')
12        self.client = self.create_client(
13            GetCurrentInputSource,
14            '/aimdk_5Fmsgs/srv/GetCurrentInputSource'
15        )
16
17        self.get_logger().info('✅ GetCurrentInputSource client node created.')
18
19        # Wait for the service to become available
20        while not self.client.wait_for_service(timeout_sec=2.0):
21            self.get_logger().info('⏳ Service unavailable, waiting...')
22
23        self.get_logger().info('🟢 Service available, ready to send request.')
24
25    def send_request(self):
26        # Create request
27        req = GetCurrentInputSource.Request()
28        req.request = CommonRequest()
29
30        # Send request and wait for response
31        self.get_logger().info('📨 Sending request to get current input source')
32        for i in range(8):
33            req.request.header.stamp = self.get_clock().now().to_msg()
34            future = self.client.call_async(req)
35            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
36
37            if future.done():
38                break
39
40            # retry as remote peer is NOT handled well by ROS
41            self.get_logger().info(f'trying ... [{i}]')
42
43        if not future.done():
44            self.get_logger().error('❌ Service call failed or timed out.')
45            return False
46
47        response = future.result()
48        ret_code = response.response.header.code
49        if ret_code == 0:
50            self.get_logger().info(
51                f'✅ Current input source get successfully:')
52            self.get_logger().info(
53                f'Name: {response.input_source.name}')
54            self.get_logger().info(
55                f'Priority: {response.input_source.priority}')
56            self.get_logger().info(
57                f'Timeout: {response.input_source.timeout}')
58            return True
59        else:
60            self.get_logger().error(
61                f'❌ Current input source get failed, return code: {ret_code}')
62            return False
63
64
65def main(args=None):
66    rclpy.init(args=args)
67
68    node = None
69    try:
70        node = GetCurrentInputSourceClient()
71        success = node.send_request()
72    except KeyboardInterrupt:
73        pass
74    except Exception as e:
75        rclpy.logging.get_logger('main').error(
76            f'Program exited with exception: {e}')
77
78    if node:
79        node.destroy_node()
80    if rclpy.ok():
81        rclpy.shutdown()
82
83
84if __name__ == '__main__':
85    main()

Usage

# Get current input source info
ros2 run py_examples get_current_input_source

Example output

[INFO] [get_current_input_source_client]: Current input source: node
[INFO] [get_current_input_source_client]: Priority: 40
[INFO] [get_current_input_source_client]: Timeout: 1000

Notes

  • Ensure the GetCurrentInputSource service is running properly

  • Valid information can only be retrieved after registering an input source

  • A status code of 0 indicates the query succeeded

6.1.10 Control robot locomotion

This example uses mc_locomotion_velocity. The example below controls robot walking by publishing to the /aima/mc/locomotion/velocity topic. For versions after v0.7, you must register an input source before enabling velocity control (this example already registers an input source); see the code for registration steps.

Start the node after switching to Stable Standing Mode:

  1#!/usr/bin/env python3
  2
  3import rclpy
  4from rclpy.node import Node
  5import time
  6import signal
  7import sys
  8
  9from aimdk_msgs.msg import McLocomotionVelocity, MessageHeader
 10from aimdk_msgs.srv import SetMcInputSource
 11
 12
 13class DirectVelocityControl(Node):
 14    def __init__(self):
 15        super().__init__('direct_velocity_control')
 16
 17        self.publisher = self.create_publisher(
 18            McLocomotionVelocity, '/aima/mc/locomotion/velocity', 10)
 19        self.client = self.create_client(
 20            SetMcInputSource, '/aimdk_5Fmsgs/srv/SetMcInputSource')
 21
 22        self.forward_velocity = 0.0
 23        self.lateral_velocity = 0.0
 24        self.angular_velocity = 0.0
 25
 26        self.max_forward_speed = 1.0
 27        self.min_forward_speed = 0.2
 28
 29        self.max_lateral_speed = 1.0
 30        self.min_lateral_speed = 0.2
 31
 32        self.max_angular_speed = 1.0
 33        self.min_angular_speed = 0.1
 34
 35        self.timer = None
 36
 37        self.get_logger().info("Direct velocity control node started!")
 38
 39    def start_publish(self):
 40        if not self.timer:
 41            self.timer = self.create_timer(0.02, self.publish_velocity)
 42
 43    def register_input_source(self):
 44        self.get_logger().info("Registering input source...")
 45
 46        timeout_sec = 8.0
 47        start = self.get_clock().now().nanoseconds / 1e9
 48
 49        while not self.client.wait_for_service(timeout_sec=2.0):
 50            now = self.get_clock().now().nanoseconds / 1e9
 51            if now - start > timeout_sec:
 52                self.get_logger().error("Waiting for service timed out")
 53                return False
 54            self.get_logger().info("Waiting for input source service...")
 55
 56        req = SetMcInputSource.Request()
 57        req.action.value = 1001
 58        req.input_source.name = "node"
 59        req.input_source.priority = 40
 60        req.input_source.timeout = 1000
 61
 62        for i in range(8):
 63            req.request.header.stamp = self.get_clock().now().to_msg()
 64            future = self.client.call_async(req)
 65            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
 66
 67            if future.done():
 68                break
 69
 70            # retry as remote peer is NOT handled well by ROS
 71            self.get_logger().info(f"trying to register input source... [{i}]")
 72
 73        if future.done():
 74            try:
 75                response = future.result()
 76                state = response.response.state.value
 77                self.get_logger().info(
 78                    f"Input source set successfully: state={state}, task_id={response.response.task_id}")
 79                return True
 80            except Exception as e:
 81                self.get_logger().error(f"Service call exception: {str(e)}")
 82                return False
 83        else:
 84            self.get_logger().error("Service call failed or timed out")
 85            return False
 86
 87    def publish_velocity(self):
 88        msg = McLocomotionVelocity()
 89        msg.header = MessageHeader()
 90        msg.header.stamp = self.get_clock().now().to_msg()
 91        msg.source = "node"
 92        msg.forward_velocity = self.forward_velocity
 93        msg.lateral_velocity = self.lateral_velocity
 94        msg.angular_velocity = self.angular_velocity
 95
 96        self.publisher.publish(msg)
 97
 98        self.get_logger().info(
 99            f"Publishing velocity: forward {self.forward_velocity:.2f} m/s, "
100            f"lateral {self.lateral_velocity:.2f} m/s, "
101            f"angular {self.angular_velocity:.2f} rad/s"
102        )
103
104    def set_forward(self, forward):
105        # check value range, mc has thresholds to start movement
106        if abs(forward) < 0.005:
107            self.forward_velocity = 0.0
108            return True
109        elif abs(forward) > self.max_forward_speed or abs(forward) < self.min_forward_speed:
110            raise ValueError("out of range")
111        else:
112            self.forward_velocity = forward
113            return True
114
115    def set_lateral(self, lateral):
116        # check value range, mc has thresholds to start movement
117        if abs(lateral) < 0.005:
118            self.lateral_velocity = 0.0
119            return True
120        elif abs(lateral) > self.max_lateral_speed or abs(lateral) < self.min_lateral_speed:
121            raise ValueError("out of range")
122        else:
123            self.lateral_velocity = lateral
124            return True
125
126    def set_angular(self, angular):
127        # check value range, mc has thresholds to start movement
128        if abs(angular) < 0.005:
129            self.angular_velocity = 0.0
130            return True
131        elif abs(angular) > self.max_angular_speed or abs(angular) < self.min_angular_speed:
132            raise ValueError("out of range")
133        else:
134            self.angular_velocity = angular
135            return True
136
137    def clear_velocity(self):
138        self.forward_velocity = 0.0
139        self.lateral_velocity = 0.0
140        self.angular_velocity = 0.0
141
142
143# Global node instance for signal handling
144global_node = None
145
146
147def signal_handler(sig, frame):
148    global global_node
149    if global_node is not None:
150        global_node.clear_velocity()
151        global_node.get_logger().info(
152            f"Received signal {sig}, clearing velocity and shutting down")
153    if rclpy.ok():
154        rclpy.shutdown()
155    sys.exit(0)
156
157
158def main():
159    global global_node
160    rclpy.init()
161
162    node = DirectVelocityControl()
163    global_node = node
164
165    signal.signal(signal.SIGINT, signal_handler)
166    signal.signal(signal.SIGTERM, signal_handler)
167
168    if not node.register_input_source():
169        node.get_logger().error("Input source registration failed, exiting")
170        if rclpy.ok():
171            rclpy.shutdown()
172        return
173
174    # get and check control values
175    # notice that mc has thresholds to start movement
176    try:
177        # get input forward
178        forward = float(
179            input("Please enter forward velocity 0 or ±(0.2 ~ 1.0) m/s: "))
180        node.set_forward(forward)
181        # get input lateral
182        lateral = float(
183            input("Please enter lateral velocity 0 or ±(0.2 ~ 1.0) m/s: "))
184        node.set_lateral(lateral)
185        # get input angular
186        angular = float(
187            input("Please enter angular velocity 0 or ±(0.1 ~ 1.0) rad/s: "))
188        node.set_angular(angular)
189    except Exception as e:
190        node.get_logger().error(f"Invalid input: {e}")
191        if rclpy.ok():
192            rclpy.shutdown()
193        return
194
195    node.get_logger().info("Setting velocity, moving for 5 seconds")
196    node.start_publish()
197
198    start = node.get_clock().now()
199    while (node.get_clock().now() - start).nanoseconds / 1e9 < 5.0:
200        rclpy.spin_once(node, timeout_sec=0.1)
201        time.sleep(0.001)
202
203    node.clear_velocity()
204    node.get_logger().info("5-second motion finished, robot stopped")
205
206    try:
207        rclpy.spin(node)
208    except KeyboardInterrupt:
209        pass
210    finally:
211        if rclpy.ok():
212            rclpy.shutdown()
213
214
215if __name__ == '__main__':
216    main()

6.1.11 Joint motor control

This example demonstrates how to use ROS2 and the Ruckig library to control robot joint motion.

Attention

Note ⚠️: Before running this example, stop the native MC module on the robot control unit (PC1) by running aima em stop-app mc to obtain control of the robot. Ensure the robot is safe before operating.

! This example directly controls low-level motors (the HAL layer). Before running, verify that the joint safety limits in the code match your robot and ensure safety!

Robot joint control example

This example shows how to use ROS2 and the Ruckig library to control robot joints. The example implements the following features:

  1. Robot joint model definition

  2. Trajectory interpolation using Ruckig

  3. Multi-joint coordinated control

  4. Real-time control of position, velocity, and acceleration

Example feature description

  1. Creates four controller nodes, which control:

    • Legs x2 (12 joints)

    • Waist x1 (3 joints)

    • Arms x2 (14 joints)

    • Head x1 (2 joints)

  2. Demo features:

    • Oscillate a specified joint between ±0.5 radians every 10 seconds

    • Use the Ruckig library to generate smooth motion trajectories

    • Publish joint control commands in real time

Custom usage

  1. Add new control logic:

    • Add new control callback functions

    • Compose joint motions freely

  2. Adjust control frequency:

    • Modify the control_timer_ period (currently 2 ms)

  1#!/usr/bin/env python3
  2"""
  3Robot joint control example
  4This script implements a ROS2-based robot joint control system, using the Ruckig trajectory
  5planner to achieve smooth joint motion control.
  6
  7Main features:
  81. Supports controlling multiple joint areas (head, arm, waist, leg)
  92. Uses Ruckig for trajectory planning to ensure smooth motion
 103. Supports real-time control of joint position, velocity, and acceleration
 114. Provides joint limit and PID (stiffness/damping) parameter configuration
 12"""
 13
 14import rclpy
 15from rclpy.node import Node
 16from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
 17from aimdk_msgs.msg import JointCommandArray, JointStateArray, JointCommand
 18from std_msgs.msg import Header
 19import ruckig
 20from enum import Enum
 21from dataclasses import dataclass
 22from typing import List, Dict
 23from threading import Lock
 24
 25# QoS config: define ROS2 Quality of Service parameters
 26# Subscriber QoS: best-effort reliability, keep last 10 messages
 27subscriber_qos = QoSProfile(
 28    reliability=ReliabilityPolicy.BEST_EFFORT,
 29    history=HistoryPolicy.KEEP_LAST,
 30    depth=10,
 31    durability=DurabilityPolicy.VOLATILE
 32)
 33
 34# Publisher QoS: reliable transport, keep last 10 messages
 35publisher_qos = QoSProfile(
 36    reliability=ReliabilityPolicy.RELIABLE,
 37    history=HistoryPolicy.KEEP_LAST,
 38    depth=10,
 39    durability=DurabilityPolicy.VOLATILE
 40)
 41
 42
 43class JointArea(Enum):
 44    HEAD = 'HEAD'    # Head joints
 45    ARM = 'ARM'      # Arm joints
 46    WAIST = 'WAIST'  # Waist joints
 47    LEG = 'LEG'      # Leg joints
 48
 49
 50@dataclass
 51class JointInfo:
 52    # Joint information data class
 53    name: str           # Joint name
 54    lower_limit: float  # Joint lower angle limit
 55    upper_limit: float  # Joint upper angle limit
 56    kp: float           # Position control proportional gain
 57    kd: float           # Velocity control derivative gain
 58
 59
 60# Robot model configuration: define all joint parameters
 61robot_model: Dict[JointArea, List[JointInfo]] = {
 62    # Leg joint configuration
 63    JointArea.LEG: [
 64        # Left leg joints
 65        JointInfo("left_hip_pitch_joint", -2.704, 2.556, 40.0, 4.0),
 66        JointInfo("left_hip_roll_joint", -0.235, 2.906, 40.0, 4.0),
 67        JointInfo("left_hip_yaw_joint", -1.684, 3.430, 30.0, 3.0),
 68        JointInfo("left_knee_joint", 0.0000, 2.4073, 80.0, 8.0),
 69        JointInfo("left_ankle_pitch_joint", -0.803, 0.453, 40.0, 4.0),
 70        JointInfo("left_ankle_roll_joint", -0.2625, 0.2625, 20.0, 2.0),
 71        # Right leg joints
 72        JointInfo("right_hip_pitch_joint", -2.704, 2.556, 40.0, 4.0),
 73        JointInfo("right_hip_roll_joint", -2.906, 0.235, 40.0, 4.0),
 74        JointInfo("right_hip_yaw_joint", -3.430, 1.684, 30.0, 3.0),
 75        JointInfo("right_knee_joint", 0.0000, 2.4073, 80.0, 8.0),
 76        JointInfo("right_ankle_pitch_joint", -0.803, 0.453, 40.0, 4.0),
 77        JointInfo("right_ankle_roll_joint", -0.2625, 0.2625, 20.0, 2.0),
 78    ],
 79    # Waist joint configuration
 80    JointArea.WAIST: [
 81        JointInfo("waist_yaw_joint", -3.43, 2.382, 20.0, 4.0),
 82        JointInfo("waist_pitch_joint", -0.314, 0.314, 20.0, 4.0),
 83        JointInfo("waist_roll_joint", -0.488, 0.488, 20.0, 4.0),
 84    ],
 85    # Arm joint configuration
 86    JointArea.ARM: [
 87        # Left arm
 88        JointInfo("left_shoulder_pitch_joint", -3.08, 2.04, 20.0, 2.0),
 89        JointInfo("left_shoulder_roll_joint", -0.061, 2.993, 20.0, 2.0),
 90        JointInfo("left_shoulder_yaw_joint", -2.556, 2.556, 20.0, 2.0),
 91        JointInfo("left_elbow_joint", -2.3556, 0.0, 20.0, 2.0),
 92        JointInfo("left_wrist_yaw_joint", -2.556, 2.556, 20.0, 2.0),
 93        JointInfo("left_wrist_pitch_joint", -0.558, 0.558, 20.0, 2.0),
 94        JointInfo("left_wrist_roll_joint", -1.571, 0.724, 20.0, 2.0),
 95        # Right arm
 96        JointInfo("right_shoulder_pitch_joint", -3.08, 2.04, 20.0, 2.0),
 97        JointInfo("right_shoulder_roll_joint", -2.993, 0.061, 20.0, 2.0),
 98        JointInfo("right_shoulder_yaw_joint", -2.556, 2.556, 20.0, 2.0),
 99        JointInfo("right_elbow_joint", -2.3556, 0.0000, 20.0, 2.0),
100        JointInfo("right_wrist_yaw_joint", -2.556, 2.556, 20.0, 2.0),
101        JointInfo("right_wrist_pitch_joint", -0.558, 0.558, 20.0, 2.0),
102        JointInfo("right_wrist_roll_joint", -0.724, 1.571, 20.0, 2.0),
103    ],
104    # Head joint configuration
105    JointArea.HEAD: [
106        JointInfo("head_yaw_joint", -0.366, 0.366, 20.0, 2.0),
107        JointInfo("head_pitch_joint", -0.3838, 0.3838, 20.0, 2.0),
108    ],
109}
110
111
112class JointControllerNode(Node):
113    """
114    Joint controller node
115    Responsible for receiving joint states, using Ruckig for trajectory planning,
116    and publishing joint commands.
117    """
118
119    def __init__(self, node_name: str, sub_topic: str, pub_topic: str, area: JointArea, dofs: int):
120        """
121        Initialize joint controller
122        Args:
123            node_name: node name
124            sub_topic: topic name to subscribe (joint states)
125            pub_topic: topic name to publish (joint commands)
126            area: joint area (head/arm/waist/leg)
127            dofs: number of DOFs
128        """
129        super().__init__(node_name)
130        self.lock = Lock()
131        self.joint_info = robot_model[area]
132        self.dofs = dofs
133        self.ruckig = ruckig.Ruckig(dofs, 0.002)  # 2 ms control period
134        self.input = ruckig.InputParameter(dofs)
135        self.output = ruckig.OutputParameter(dofs)
136        self.ruckig_initialized = False
137
138        # Initialize trajectory parameters
139        self.input.current_position = [0.0] * dofs
140        self.input.current_velocity = [0.0] * dofs
141        self.input.current_acceleration = [0.0] * dofs
142
143        # Motion limits
144        self.input.max_velocity = [1.0] * dofs
145        self.input.max_acceleration = [1.0] * dofs
146        self.input.max_jerk = [25.0] * dofs
147
148        # ROS2 subscriber and publisher
149        self.sub = self.create_subscription(
150            JointStateArray,
151            sub_topic,
152            self.joint_state_callback,
153            subscriber_qos
154        )
155        self.pub = self.create_publisher(
156            JointCommandArray,
157            pub_topic,
158            publisher_qos
159        )
160
161    def joint_state_callback(self, msg: JointStateArray):
162        """
163        Joint state callback
164        Receives and processes joint state messages
165        """
166        self.ruckig_initialized = True
167
168    def control_callback(self, joint_idx):
169        """
170        Control callback
171        Uses Ruckig for trajectory planning and publishes control commands
172        Args:
173            joint_idx: target joint index
174        """
175        # Run Ruckig until the target is reached
176        while self.ruckig.update(self.input, self.output) in [ruckig.Result.Working, ruckig.Result.Finished]:
177            # Update current state
178            self.input.current_position = self.output.new_position
179            self.input.current_velocity = self.output.new_velocity
180            self.input.current_acceleration = self.output.new_acceleration
181
182            # Check if target is reached
183            tolerance = 1e-6
184            current_p = self.output.new_position[joint_idx]
185            if abs(current_p - self.input.target_position[joint_idx]) < tolerance:
186                break
187
188            # Create and publish command
189            cmd = JointCommandArray()
190            for i, joint in enumerate(self.joint_info):
191                j = JointCommand()
192                j.name = joint.name
193                j.position = self.output.new_position[i]
194                j.velocity = self.output.new_velocity[i]
195                j.effort = 0.0
196                j.stiffness = joint.kp
197                j.damping = joint.kd
198                cmd.joints.append(j)
199
200            self.pub.publish(cmd)
201
202    def set_target_position(self, joint_name, position):
203        """
204        Set target joint position
205        Args:
206            joint_name: joint name
207            position: target position
208        """
209        p_s = [0.0] * self.dofs
210        joint_idx = 0
211        for i, joint in enumerate(self.joint_info):
212            if joint.name == joint_name:
213                p_s[i] = position
214                joint_idx = i
215        self.input.target_position = p_s
216        self.input.target_velocity = [0.0] * self.dofs
217        self.input.target_acceleration = [0.0] * self.dofs
218        self.control_callback(joint_idx)
219
220
221def main(args=None):
222    """
223    Main function
224    Initialize ROS2 node and start joint controller
225    """
226    rclpy.init(args=args)
227
228    # Create leg controller node
229    leg_node = JointControllerNode(
230        "leg_node",
231        "/aima/hal/joint/leg/state",
232        "/aima/hal/joint/leg/command",
233        JointArea.LEG,
234        12
235    )
236
237    # waist_node = JointControllerNode(
238    #     "waist_node",
239    #     "/aima/hal/joint/waist/state",
240    #     "/aima/hal/joint/waist/command",
241    #     JointArea.WAIST,
242    #     3
243    # )
244
245    # arm_node = JointControllerNode(
246    #     "arm_node",
247    #     "/aima/hal/joint/arm/state",
248    #     "/aima/hal/joint/arm/command",
249    #     JointArea.ARM,
250    #     14
251    # )
252
253    # head_node = JointControllerNode(
254    #     "head_node",
255    #     "/aima/hal/joint/head/state",
256    #     "/aima/hal/joint/head/command",
257    #     JointArea.HEAD,
258    #     2
259    # )
260
261    position = 0.8
262
263    # Only control the left leg joint. If you want to control a specific joint, assign it directly.
264    def timer_callback():
265        """
266        Timer callback
267        Periodically change target position to achieve oscillating motion
268        """
269        nonlocal position
270        position = -position
271        position = 1.3 + position
272        leg_node.set_target_position("left_knee_joint", position)
273
274    #     arm_node.set_target_position("left_shoulder_pitch_joint", position)
275    #     waist_node.set_target_position("waist_yaw_joint", position)
276    #     head_node.set_target_position("head_pitch_joint", position)
277
278    leg_node.create_timer(3.0, timer_callback)
279
280    # Multi-threaded executor
281    executor = rclpy.executors.MultiThreadedExecutor()
282    executor.add_node(leg_node)
283
284    # executor.add_node(waist_node)
285    # executor.add_node(arm_node)
286    # executor.add_node(head_node)
287
288    try:
289        executor.spin()
290    except KeyboardInterrupt:
291        pass
292    finally:
293        leg_node.destroy_node()
294        # waist_node.destroy_node()
295        # arm_node.destroy_node()
296        # head_node.destroy_node()
297        if rclpy.ok():
298            rclpy.shutdown()
299
300
301if __name__ == '__main__':
302    main()

6.1.12 Keyboard control

This example implements robot forward/backward and turning control using keyboard input from a PC.

Use W A S D to control robot direction; increase/decrease speed by ±0.2 m/s. Use Q/E to increase/decrease angular speed by ±0.1 rad/s. Press ESC to exit and release terminal resources. Press Space to immediately zero speed (emergency stop).

Caution

Note: Before running this example, use the controller to put the robot into the Stable Standing Mode mode. (For position-control stand / locomotion modes press R2 + X; see the mode transition diagram for other modes.) Then run aima em stop-app rc on the robot’s terminal to stop the remote controller and prevent channel occupation.

You must register an input source before using keyboard control (this example registers one).

  1#!/usr/bin/env python3
  2
  3import rclpy
  4import rclpy.logging
  5from rclpy.node import Node
  6from aimdk_msgs.msg import McLocomotionVelocity, MessageHeader
  7from aimdk_msgs.srv import SetMcInputSource
  8import curses
  9import time
 10
 11
 12class KeyboardVelocityController(Node):
 13    def __init__(self, stdscr):
 14        super().__init__('keyboard_velocity_controller')
 15        self.stdscr = stdscr
 16        self.forward_velocity = 0.0
 17        self.lateral_velocity = 0.0
 18        self.angular_velocity = 0.0
 19        self.step = 0.2
 20        self.angular_step = 0.1
 21
 22        self.publisher = self.create_publisher(
 23            McLocomotionVelocity, '/aima/mc/locomotion/velocity', 10)
 24        self.client = self.create_client(
 25            SetMcInputSource, '/aimdk_5Fmsgs/srv/SetMcInputSource')
 26
 27        if not self.register_input_source():
 28            self.get_logger().error("Input source registration failed, exiting")
 29            raise RuntimeError("Failed to register input source")
 30
 31        # Configure curses
 32        curses.cbreak()
 33        curses.noecho()
 34        self.stdscr.keypad(True)
 35        self.stdscr.nodelay(True)
 36
 37        self.get_logger().info(
 38            "Control started: W/S forward/backward, A/D strafe, Q/E turn, Space stop, Esc exit")
 39
 40        # Timer: check keyboard every 50 ms
 41        self.timer = self.create_timer(0.05, self.check_key_and_publish)
 42
 43    def register_input_source(self):
 44        self.get_logger().info("Registering input source...")
 45
 46        timeout_sec = 8.0
 47        start = self.get_clock().now().nanoseconds / 1e9
 48
 49        while not self.client.wait_for_service(timeout_sec=2.0):
 50            now = self.get_clock().now().nanoseconds / 1e9
 51            if now - start > timeout_sec:
 52                self.get_logger().error("Waiting for service timed out")
 53                return False
 54            self.get_logger().info("Waiting for input source service...")
 55
 56        req = SetMcInputSource.Request()
 57        req.action.value = 1001
 58        req.input_source.name = "node"
 59        req.input_source.priority = 40
 60        req.input_source.timeout = 1000
 61
 62        for i in range(8):
 63            req.request.header.stamp = self.get_clock().now().to_msg()
 64            future = self.client.call_async(req)
 65            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
 66
 67            if future.done():
 68                break
 69
 70            # retry as remote peer is NOT handled well by ROS
 71            self.get_logger().info(f"trying to register input source... [{i}]")
 72
 73        if future.done():
 74            try:
 75                resp = future.result()
 76                state = resp.response.state.value
 77                self.get_logger().info(
 78                    f"Input source set successfully: state={state}, task_id={resp.response.task_id}")
 79                return True
 80            except Exception as e:
 81                self.get_logger().error(f"Service exception: {str(e)}")
 82                return False
 83        else:
 84            self.get_logger().error("Service call failed or timed out")
 85            return False
 86
 87    def check_key_and_publish(self):
 88        try:
 89            ch = self.stdscr.getch()
 90        except Exception:
 91            ch = -1
 92
 93        if ch != -1:
 94            if ch == ord(' '):
 95                self.forward_velocity = 0.0
 96                self.lateral_velocity = 0.0
 97                self.angular_velocity = 0.0
 98            elif ch == ord('w'):
 99                self.forward_velocity = min(
100                    self.forward_velocity + self.step, 1.0)
101            elif ch == ord('s'):
102                self.forward_velocity = max(
103                    self.forward_velocity - self.step, -1.0)
104            elif ch == ord('a'):
105                self.lateral_velocity = min(
106                    self.lateral_velocity + self.step, 1.0)
107            elif ch == ord('d'):
108                self.lateral_velocity = max(
109                    self.lateral_velocity - self.step, -1.0)
110            elif ch == ord('q'):
111                self.angular_velocity = min(
112                    self.angular_velocity + self.angular_step, 1.0)
113            elif ch == ord('e'):
114                self.angular_velocity = max(
115                    self.angular_velocity - self.angular_step, -1.0)
116            elif ch == 27:  # ESC
117                self.get_logger().info("Exiting control")
118                self.timer.cancel()
119                return
120
121        msg = McLocomotionVelocity()
122        msg.header = MessageHeader()
123        msg.header.stamp = self.get_clock().now().to_msg()
124        msg.source = "node"
125        msg.forward_velocity = self.forward_velocity
126        msg.lateral_velocity = self.lateral_velocity
127        msg.angular_velocity = self.angular_velocity
128
129        self.publisher.publish(msg)
130
131        # Update UI
132        self.stdscr.clear()
133        self.stdscr.addstr(
134            0, 0, "W/S: Forward/Backward | A/D: Strafe | Q/E: Turn | Space: Stop | ESC: Exit")
135        self.stdscr.addstr(2, 0,
136                           f"Speed Status: Forward: {self.forward_velocity:.2f} m/s | "
137                           f"Lateral: {self.lateral_velocity:.2f} m/s | "
138                           f"Angular: {self.angular_velocity:.2f} rad/s")
139        self.stdscr.refresh()
140
141
142def curses_main(stdscr):
143    rclpy.init()
144    try:
145        node = KeyboardVelocityController(stdscr)
146        while not node.timer.is_canceled():
147            rclpy.spin_once(node, timeout_sec=0.1)
148    except (Exception, KeyboardInterrupt) as e:
149        rclpy.logging.get_logger("main").fatal(
150            f"Program exited with exception: {e}")
151    finally:
152        curses.endwin()
153        if rclpy.ok():
154            rclpy.shutdown()
155
156
157def main():
158    curses.wrapper(curses_main)
159
160
161if __name__ == '__main__':
162    main()

6.1.13 Take photo

This example uses take_photo. Before running the node, set the camera topic to capture. When started, the node creates an /images/ directory and saves the current frame into that directory.

 1#!/usr/bin/env python3
 2import threading
 3import time
 4from pathlib import Path
 5
 6import rclpy
 7import rclpy.task
 8from rclpy.node import Node
 9from rclpy.qos import qos_profile_sensor_data
10from sensor_msgs.msg import Image
11from cv_bridge import CvBridge
12import cv2
13
14
15class SaveOneRawPy(Node):
16    def __init__(self):
17        super().__init__('save_one_image')
18
19        # parameter: image topic
20        self.declare_parameter(
21            'image_topic', '/aima/hal/sensor/stereo_head_front_left/rgb_image'
22        )
23        self.topic = self.get_parameter(
24            'image_topic').get_parameter_value().string_value
25
26        # save directory
27        self.save_dir = Path('images').resolve()
28        self.save_dir.mkdir(parents=True, exist_ok=True)
29
30        self._bridge = CvBridge()
31
32        self._future = rclpy.task.Future()
33        self._lock = threading.Lock()
34
35        # subscriber (sensor QoS)
36        self.sub = self.create_subscription(
37            Image,
38            self.topic,
39            self.image_cb,
40            qos_profile_sensor_data
41        )
42        self.get_logger().info(f'Subscribing to raw image: {self.topic}')
43        self.get_logger().info(f'Images will be saved to: {self.save_dir}')
44
45    def image_cb(self, msg: Image):
46        with self._lock:
47            if self._future.done():
48                return
49
50        try:
51            enc = msg.encoding.lower()
52            self.get_logger().info(f'Received image with encoding: {enc}')
53
54            img = self._bridge.imgmsg_to_cv2(
55                msg, desired_encoding='passthrough')
56
57            if enc == 'rgb8':
58                img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
59            elif enc == 'mono8':
60                img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
61
62            ts_ms = int(time.time() * 1000)
63            out_path = self.save_dir / f'frame_{ts_ms}.png'
64
65            ok = cv2.imwrite(str(out_path), img)
66            if ok:
67                self.get_logger().info(
68                    f'Saved image: {out_path}  ({img.shape[1]}x{img.shape[0]})'
69                )
70                with self._lock:
71                    if not self._future.done():
72                        self._future.set_result(str(out_path))
73            else:
74                self.get_logger().error(f'cv2.imwrite failed: {out_path}')
75        except Exception as e:
76            self.get_logger().error(f'Failed to decode / save image: {e}')
77            with self._lock:
78                if not self._future.done():
79                    self._future.set_exception(e)
80
81
82def main():
83    rclpy.init()
84    node = SaveOneRawPy()
85
86    try:
87        rclpy.spin_until_future_complete(node, node._future)
88    except KeyboardInterrupt:
89        pass
90    finally:
91        node.destroy_node()
92        if rclpy.ok():
93            rclpy.shutdown()
94
95
96if __name__ == '__main__':
97    main()

6.1.14 Camera stream examples

This example collection provides multiple camera subscription and processing demos, supporting depth, stereo, and mono camera streams.
These camera subscription examples are not application-level; they only print basic camera data. If you are familiar with ROS2, you can achieve similar results with ros2 topic echo + ros2 topic hz. You can consult the SDK topic list to jump directly into module development or use these camera examples as scaffolding for your own logic. The published sensor data are raw (no preprocessing like undistortion). For detailed sensor information (e.g., resolution, focal length), check the corresponding camera_info topic.

Depth camera data subscription

This example uses echo_camera_rgbd to subscribe to /aima/hal/sensor/rgbd_head_front/ and receive the robot’s depth camera data. It supports depth images, RGB images, compressed RGB images, and camera intrinsics.

Features:

  • Supports multiple data type subscriptions (depth image, RGB image, compressed image, camera intrinsics)

  • Real-time FPS statistics and data display

  • Supports RGB image video recording(TBD, see C++ examples)

  • Configurable topic type selection

Supported data types:

  • depth_image: Depth image (sensor_msgs/Image)

  • rgb_image: RGB image (sensor_msgs/Image)

  • rgb_image_compressed: Compressed RGB image (sensor_msgs/CompressedImage)

  • camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)

  1#!/usr/bin/env python3
  2"""
  3Head depth camera multi-topic subscription example
  4
  5Supports selecting the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
  6  - depth_image: Depth image (sensor_msgs/Image)
  7  - depth_camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)
  8  - rgb_image: RGB image (sensor_msgs/Image)
  9  - rgb_image_compressed: RGB compressed image (sensor_msgs/CompressedImage)
 10  - rgb_camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)
 11
 12Example:
 13  ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_image
 14  ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_camera_info
 15
 16Default topic_type is rgb_image
 17"""
 18
 19import rclpy
 20from rclpy.node import Node
 21from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
 22from sensor_msgs.msg import Image, CompressedImage, CameraInfo
 23from collections import deque
 24import time
 25
 26
 27class CameraTopicEcho(Node):
 28    def __init__(self):
 29        super().__init__('camera_topic_echo')
 30
 31        # Select the topic type to subscribe
 32        self.declare_parameter('topic_type', 'rgb_image')
 33        self.declare_parameter('dump_video_path', '')
 34
 35        self.topic_type = self.get_parameter('topic_type').value
 36        self.dump_video_path = self.get_parameter('dump_video_path').value
 37
 38        # SensorDataQoS: BEST_EFFORT + VOLATILE
 39        qos = QoSProfile(
 40            reliability=QoSReliabilityPolicy.BEST_EFFORT,
 41            history=QoSHistoryPolicy.KEEP_LAST,
 42            depth=5
 43        )
 44
 45        # Create different subscribers based on topic_type
 46        if self.topic_type == "depth_image":
 47            self.topic_name = "/aima/hal/sensor/rgbd_head_front/depth_image"
 48            self.sub_image = self.create_subscription(
 49                Image, self.topic_name, self.cb_image, qos)
 50            self.get_logger().info(
 51                f"✅ Subscribing Depth Image: {self.topic_name}")
 52
 53        elif self.topic_type == "rgb_image":
 54            self.topic_name = "/aima/hal/sensor/rgbd_head_front/rgb_image"
 55            self.sub_image = self.create_subscription(
 56                Image, self.topic_name, self.cb_image, qos)
 57            self.get_logger().info(
 58                f"✅ Subscribing RGB Image: {self.topic_name}")
 59            if self.dump_video_path:
 60                self.get_logger().info(
 61                    f"📝 Will dump received images to video: {self.dump_video_path}")
 62
 63        elif self.topic_type == "rgb_image_compressed":
 64            self.topic_name = "/aima/hal/sensor/rgbd_head_front/rgb_image/compressed"
 65            self.sub_compressed = self.create_subscription(
 66                CompressedImage, self.topic_name, self.cb_compressed, qos)
 67            self.get_logger().info(
 68                f"✅ Subscribing CompressedImage: {self.topic_name}")
 69
 70        elif self.topic_type == "rgb_camera_info":
 71            self.topic_name = "/aima/hal/sensor/rgbd_head_front/rgb_camera_info"
 72            # RGB-D CameraInfo is different with other cameras. The best_effort + volatile QoS is enough for 10Hz rgb_camera_info
 73            self.sub_camerainfo = self.create_subscription(
 74                CameraInfo, self.topic_name, self.cb_camerainfo, qos)
 75            self.get_logger().info(
 76                f"✅ Subscribing RGB CameraInfo: {self.topic_name}")
 77
 78        elif self.topic_type == "depth_camera_info":
 79            self.topic_name = "/aima/hal/sensor/rgbd_head_front/depth_camera_info"
 80            # RGB-D CameraInfo is different with other cameras. The best_effort + volatile QoS is enough for 10Hz depth_camera_info
 81            self.sub_camerainfo = self.create_subscription(
 82                CameraInfo, self.topic_name, self.cb_camerainfo, qos)
 83            self.get_logger().info(
 84                f"✅ Subscribing Depth CameraInfo: {self.topic_name}")
 85
 86        else:
 87            self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
 88            raise ValueError("Unknown topic_type")
 89
 90        # Internal state
 91        self.last_print = self.get_clock().now()
 92        self.print_allowed = False
 93        self.arrivals = deque()
 94
 95    def update_arrivals(self):
 96        """Calculate received FPS"""
 97        now = self.get_clock().now()
 98        self.arrivals.append(now)
 99        while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
100            self.arrivals.popleft()
101
102    def get_fps(self):
103        """Get FPS"""
104        return len(self.arrivals)
105
106    def should_print(self, master=True):
107        """Control print frequency"""
108        if not master:
109            return self.print_allowed
110        now = self.get_clock().now()
111        if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
112            self.last_print = now
113            self.print_allowed = True
114        else:
115            self.print_allowed = False
116        return self.print_allowed
117
118    def cb_image(self, msg: Image):
119        """Image callback (Depth/RGB image)"""
120        self.update_arrivals()
121
122        if self.should_print():
123            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
124            self.get_logger().info(
125                f"📸 {self.topic_type} received\n"
126                f"  • frame_id:        {msg.header.frame_id}\n"
127                f"  • stamp (sec):     {stamp_sec:.6f}\n"
128                f"  • encoding:        {msg.encoding}\n"
129                f"  • size (WxH):      {msg.width} x {msg.height}\n"
130                f"  • step (bytes/row):{msg.step}\n"
131                f"  • is_bigendian:    {msg.is_bigendian}\n"
132                f"  • recv FPS (1s):   {self.get_fps():.1f}"
133            )
134
135        # Only RGB image supports video dump
136        if self.topic_type == "rgb_image" and self.dump_video_path:
137            self.dump_image_to_video(msg)
138
139    def cb_compressed(self, msg: CompressedImage):
140        """CompressedImage callback"""
141        self.update_arrivals()
142
143        if self.should_print():
144            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
145            self.get_logger().info(
146                f"🗜️  CompressedImage received\n"
147                f"  • frame_id:        {msg.header.frame_id}\n"
148                f"  • stamp (sec):     {stamp_sec:.6f}\n"
149                f"  • format:          {msg.format}\n"
150                f"  • data size:       {len(msg.data)}\n"
151                f"  • recv FPS (1s):   {self.get_fps():.1f}"
152            )
153
154    def cb_camerainfo(self, msg: CameraInfo):
155        """CameraInfo callback (camera intrinsic parameters)"""
156        # Camera info will only receive one frame, print it directly
157        stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
158
159        # Format D array
160        d_str = ", ".join([f"{d:.6f}" for d in msg.d])
161
162        # Format K matrix
163        k_str = ", ".join([f"{k:.6f}" for k in msg.k])
164
165        # Format P matrix
166        p_str = ", ".join([f"{p:.6f}" for p in msg.p])
167
168        self.get_logger().info(
169            f"📷 {self.topic_type} received\n"
170            f"  • frame_id:        {msg.header.frame_id}\n"
171            f"  • stamp (sec):     {stamp_sec:.6f}\n"
172            f"  • width x height:  {msg.width} x {msg.height}\n"
173            f"  • distortion_model:{msg.distortion_model}\n"
174            f"  • D: [{d_str}]\n"
175            f"  • K: [{k_str}]\n"
176            f"  • P: [{p_str}]\n"
177            f"  • binning_x: {msg.binning_x}\n"
178            f"  • binning_y: {msg.binning_y}\n"
179            f"  • roi: {{ x_offset: {msg.roi.x_offset}, y_offset: {msg.roi.y_offset}, height: {msg.roi.height}, width: {msg.roi.width}, do_rectify: {msg.roi.do_rectify} }}"
180        )
181
182    def dump_image_to_video(self, msg: Image):
183        """Video dump is only supported for RGB images"""
184        # You can add video recording functionality here
185        # Simplified in the Python version, only logs instead
186        if self.should_print(master=False):
187            self.get_logger().info(f"📝 Video dump not implemented in Python version")
188
189
190def main(args=None):
191    rclpy.init(args=args)
192    try:
193        node = CameraTopicEcho()
194        rclpy.spin(node)
195    except KeyboardInterrupt:
196        pass
197    except Exception as e:
198        print(f"Error: {e}")
199    finally:
200        if 'node' in locals():
201            node.destroy_node()
202        if rclpy.ok():
203            rclpy.shutdown()
204
205
206if __name__ == '__main__':
207    main()

Usage:

  1. Subscribe to RGB image data:

    ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_image
    
  2. Subscribe to camera intrinsics:

    ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_camera_info
    ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=depth_camera_info
    
  3. Record RGB video:

    # You can change dump_video_path to another path; ensure the directory exists before saving
    ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_image -p dump_video_path:=$PWD/output.avi
    

Stereo camera data subscription

This example uses echo_camera_stereo to subscribe to /aima/hal/sensor/stereo_head_front_*/ and receive stereo camera data from the robot, supporting left/right RGB images, compressed images, and camera intrinsics.

Features:

  • Supports independent left/right camera subscriptions

  • Real-time FPS statistics and data display

  • Supports RGB image video recording(TBD, see C++ examples)

  • Configurable camera selection (left/right)

Supported data types:

  • left_rgb_image: Left camera RGB image (sensor_msgs/Image)

  • left_rgb_image_compressed: Left camera compressed RGB image (sensor_msgs/CompressedImage)

  • left_camera_info: Left camera intrinsics (sensor_msgs/CameraInfo)

  • right_rgb_image: Right camera RGB image (sensor_msgs/Image)

  • right_rgb_image_compressed: Right camera compressed RGB image (sensor_msgs/CompressedImage)

  • right_camera_info: Right camera intrinsics (sensor_msgs/CameraInfo)

  1#!/usr/bin/env python3
  2"""
  3Head stereo camera multi-topic subscription example
  4
  5Supports selecting the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
  6  - left_rgb_image: Left camera RGB image (sensor_msgs/Image)
  7  - left_rgb_image_compressed: Left camera RGB compressed image (sensor_msgs/CompressedImage)
  8  - left_camera_info: Left camera intrinsic parameters (sensor_msgs/CameraInfo)
  9  - right_rgb_image: Right camera RGB image (sensor_msgs/Image)
 10  - right_rgb_image_compressed: Right camera RGB compressed image (sensor_msgs/CompressedImage)
 11  - right_camera_info: Right camera intrinsic parameters (sensor_msgs/CameraInfo)
 12
 13Example:
 14  ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_rgb_image
 15  ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=right_rgb_image
 16  ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_camera_info
 17
 18Default topic_type is left_rgb_image
 19"""
 20
 21import rclpy
 22from rclpy.node import Node
 23from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy, QoSHistoryPolicy
 24from sensor_msgs.msg import Image, CompressedImage, CameraInfo
 25from collections import deque
 26import time
 27
 28
 29class StereoCameraTopicEcho(Node):
 30    def __init__(self):
 31        super().__init__('stereo_camera_topic_echo')
 32
 33        # Select the topic type to subscribe
 34        self.declare_parameter('topic_type', 'left_rgb_image')
 35        self.declare_parameter('dump_video_path', '')
 36
 37        self.topic_type = self.get_parameter('topic_type').value
 38        self.dump_video_path = self.get_parameter('dump_video_path').value
 39
 40        # Set QoS parameters - use sensor data QoS
 41        qos = QoSProfile(
 42            reliability=QoSReliabilityPolicy.BEST_EFFORT,
 43            history=QoSHistoryPolicy.KEEP_LAST,
 44            depth=5,
 45            durability=QoSDurabilityPolicy.VOLATILE
 46        )
 47
 48        # Create different subscribers based on topic_type
 49        if self.topic_type == "left_rgb_image":
 50            self.topic_name = "/aima/hal/sensor/stereo_head_front_left/rgb_image"
 51            self.sub_image = self.create_subscription(
 52                Image, self.topic_name, self.cb_image, qos)
 53            self.get_logger().info(
 54                f"✅ Subscribing Left RGB Image: {self.topic_name}")
 55            if self.dump_video_path:
 56                self.get_logger().info(
 57                    f"📝 Will dump received images to video: {self.dump_video_path}")
 58
 59        elif self.topic_type == "left_rgb_image_compressed":
 60            self.topic_name = "/aima/hal/sensor/stereo_head_front_left/rgb_image/compressed"
 61            self.sub_compressed = self.create_subscription(
 62                CompressedImage, self.topic_name, self.cb_compressed, qos)
 63            self.get_logger().info(
 64                f"✅ Subscribing Left CompressedImage: {self.topic_name}")
 65
 66        elif self.topic_type == "left_camera_info":
 67            self.topic_name = "/aima/hal/sensor/stereo_head_front_left/camera_info"
 68            # CameraInfo subscription must use reliable + transient_local QoS to receive historical messages (even if only one frame is published)
 69            camera_qos = QoSProfile(
 70                reliability=QoSReliabilityPolicy.RELIABLE,
 71                history=QoSHistoryPolicy.KEEP_LAST,
 72                depth=1,
 73                durability=QoSDurabilityPolicy.TRANSIENT_LOCAL
 74            )
 75            self.sub_camerainfo = self.create_subscription(
 76                CameraInfo, self.topic_name, self.cb_camerainfo, camera_qos)
 77            self.get_logger().info(
 78                f"✅ Subscribing Left CameraInfo (with transient_local): {self.topic_name}")
 79
 80        elif self.topic_type == "right_rgb_image":
 81            self.topic_name = "/aima/hal/sensor/stereo_head_front_right/rgb_image"
 82            self.sub_image = self.create_subscription(
 83                Image, self.topic_name, self.cb_image, qos)
 84            self.get_logger().info(
 85                f"✅ Subscribing Right RGB Image: {self.topic_name}")
 86            if self.dump_video_path:
 87                self.get_logger().info(
 88                    f"📝 Will dump received images to video: {self.dump_video_path}")
 89
 90        elif self.topic_type == "right_rgb_image_compressed":
 91            self.topic_name = "/aima/hal/sensor/stereo_head_front_right/rgb_image/compressed"
 92            self.sub_compressed = self.create_subscription(
 93                CompressedImage, self.topic_name, self.cb_compressed, qos)
 94            self.get_logger().info(
 95                f"✅ Subscribing Right CompressedImage: {self.topic_name}")
 96
 97        elif self.topic_type == "right_camera_info":
 98            self.topic_name = "/aima/hal/sensor/stereo_head_front_right/camera_info"
 99            # CameraInfo subscription must use reliable + transient_local QoS to receive historical messages (even if only one frame is published)
100            camera_qos = QoSProfile(
101                reliability=QoSReliabilityPolicy.RELIABLE,
102                history=QoSHistoryPolicy.KEEP_LAST,
103                depth=1,
104                durability=QoSDurabilityPolicy.TRANSIENT_LOCAL
105            )
106            self.sub_camerainfo = self.create_subscription(
107                CameraInfo, self.topic_name, self.cb_camerainfo, camera_qos)
108            self.get_logger().info(
109                f"✅ Subscribing Right CameraInfo (with transient_local): {self.topic_name}")
110
111        else:
112            self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
113            raise ValueError("Unknown topic_type")
114
115        # Internal state
116        self.last_print = self.get_clock().now()
117        self.print_allowed = False
118        self.arrivals = deque()
119
120    def update_arrivals(self):
121        """Calculate received FPS"""
122        now = self.get_clock().now()
123        self.arrivals.append(now)
124        while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
125            self.arrivals.popleft()
126
127    def get_fps(self):
128        """Get FPS"""
129        return len(self.arrivals)
130
131    def should_print(self, master=True):
132        """Control print frequency"""
133        if not master:
134            return self.print_allowed
135        now = self.get_clock().now()
136        if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
137            self.last_print = now
138            self.print_allowed = True
139        else:
140            self.print_allowed = False
141        return self.print_allowed
142
143    def cb_image(self, msg: Image):
144        """Image callback (Left/Right camera RGB image)"""
145        self.update_arrivals()
146
147        if self.should_print():
148            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
149            self.get_logger().info(
150                f"📸 {self.topic_type} received\n"
151                f"  • frame_id:        {msg.header.frame_id}\n"
152                f"  • stamp (sec):     {stamp_sec:.6f}\n"
153                f"  • encoding:        {msg.encoding}\n"
154                f"  • size (WxH):      {msg.width} x {msg.height}\n"
155                f"  • step (bytes/row):{msg.step}\n"
156                f"  • is_bigendian:    {msg.is_bigendian}\n"
157                f"  • recv FPS (1s):   {self.get_fps():.1f}"
158            )
159
160        # Only RGB images support video dump
161        if (self.topic_type in ["left_rgb_image", "right_rgb_image"]) and self.dump_video_path:
162            self.dump_image_to_video(msg)
163
164    def cb_compressed(self, msg: CompressedImage):
165        """CompressedImage callback (Left/Right camera RGB compressed image)"""
166        self.update_arrivals()
167
168        if self.should_print():
169            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
170            self.get_logger().info(
171                f"🗜️  {self.topic_type} received\n"
172                f"  • frame_id:        {msg.header.frame_id}\n"
173                f"  • stamp (sec):     {stamp_sec:.6f}\n"
174                f"  • format:          {msg.format}\n"
175                f"  • data size:       {len(msg.data)}\n"
176                f"  • recv FPS (1s):   {self.get_fps():.1f}"
177            )
178
179    def cb_camerainfo(self, msg: CameraInfo):
180        """CameraInfo callback (Left/Right camera intrinsic parameters)"""
181        # Camera info will only receive one frame, print it directly
182        stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
183
184        # Format D array
185        d_str = ", ".join([f"{d:.6f}" for d in msg.d])
186
187        # Format K matrix
188        k_str = ", ".join([f"{k:.6f}" for k in msg.k])
189
190        # Format P matrix
191        p_str = ", ".join([f"{p:.6f}" for p in msg.p])
192
193        self.get_logger().info(
194            f"📷 {self.topic_type} received\n"
195            f"  • frame_id:        {msg.header.frame_id}\n"
196            f"  • stamp (sec):     {stamp_sec:.6f}\n"
197            f"  • width x height:  {msg.width} x {msg.height}\n"
198            f"  • distortion_model:{msg.distortion_model}\n"
199            f"  • D: [{d_str}]\n"
200            f"  • K: [{k_str}]\n"
201            f"  • P: [{p_str}]\n"
202            f"  • binning_x: {msg.binning_x}\n"
203            f"  • binning_y: {msg.binning_y}\n"
204            f"  • roi: {{ x_offset: {msg.roi.x_offset}, y_offset: {msg.roi.y_offset}, height: {msg.roi.height}, width: {msg.roi.width}, do_rectify: {msg.roi.do_rectify} }}"
205        )
206
207    def dump_image_to_video(self, msg: Image):
208        """Video dump is only supported for RGB images"""
209        # You can add video recording functionality here
210        # Simplified in the Python version, only logs instead
211        if self.should_print(master=False):
212            self.get_logger().info(f"📝 Video dump not implemented in Python version")
213
214
215def main(args=None):
216    rclpy.init(args=args)
217    try:
218        node = StereoCameraTopicEcho()
219        rclpy.spin(node)
220    except KeyboardInterrupt:
221        pass
222    except Exception as e:
223        print(f"Error: {e}")
224    finally:
225        if 'node' in locals():
226            node.destroy_node()
227        if rclpy.ok():
228            rclpy.shutdown()
229
230
231if __name__ == '__main__':
232    main()

Usage:

  1. Subscribe to left camera RGB image:

    ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_rgb_image
    
  2. Subscribe to right camera RGB image:

    ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=right_rgb_image
    
  3. Subscribe to left camera intrinsics:

    ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_camera_info
    
  4. Record left camera video:

    # You can change dump_video_path to another path; ensure the directory exists before saving
    ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_rgb_image -p dump_video_path:=$PWD/left_camera.avi
    

Head rear monocular camera data subscription

This example uses echo_camera_head_rear to subscribe to /aima/hal/sensor/rgb_head_rear/ and receive the robot’s head rear monocular camera data, supporting RGB images, compressed images, and camera intrinsics.

Features:

  • Supports head rear camera data subscription

  • Real-time FPS statistics and data display

  • Supports RGB image video recording with/without obstructed area masked (TBD, see C++ examples)

  • Configurable topic type selection

Supported data types:

  • rgb_image: RGB image (sensor_msgs/Image)

  • rgb_image_compressed: Compressed RGB image (sensor_msgs/CompressedImage)

  • camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)

  1#!/usr/bin/env python3
  2"""
  3Head rear monocular camera multi-topic subscription example
  4
  5Supports selecting the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
  6  - rgb_image: RGB image (sensor_msgs/Image)
  7  - rgb_image_compressed: RGB compressed image (sensor_msgs/CompressedImage)
  8  - camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)
  9
 10Example:
 11  ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image
 12  ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image_compressed
 13  ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=camera_info
 14
 15Default topic_type is rgb_image
 16"""
 17
 18import rclpy
 19from rclpy.node import Node
 20from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy, QoSHistoryPolicy
 21from sensor_msgs.msg import Image, CompressedImage, CameraInfo
 22from collections import deque
 23import time
 24import os
 25import cv2
 26
 27
 28class HeadRearCameraTopicEcho(Node):
 29    def __init__(self):
 30        super().__init__('head_rear_camera_topic_echo')
 31
 32        # Select the topic type to subscribe
 33        self.declare_parameter('topic_type', 'rgb_image')
 34        self.declare_parameter('dump_video_path', '')
 35        self.declare_parameter('with_mask', False)
 36
 37        self.topic_type = self.get_parameter('topic_type').value
 38        self.dump_video_path = self.get_parameter('dump_video_path').value
 39        self.with_mask = self.get_parameter('with_mask').value
 40        self.mask_image = None
 41
 42        # Set QoS parameters - use sensor data QoS
 43        qos = QoSProfile(
 44            reliability=QoSReliabilityPolicy.BEST_EFFORT,
 45            history=QoSHistoryPolicy.KEEP_LAST,
 46            depth=5,
 47            durability=QoSDurabilityPolicy.VOLATILE
 48        )
 49
 50        if self.with_mask and self.dump_video_path:
 51            mask_path = os.path.join(os.path.dirname(
 52                __file__), 'data', 'rgb_head_rear_mask.png')
 53            self.mask_image = cv2.imread(mask_path, cv2.IMREAD_GRAYSCALE)
 54            if self.mask_image is None:
 55                self.get_logger().error(
 56                    f"Failed to load mask file: {mask_path}")
 57                raise ValueError("Failed to load mask file")
 58
 59        # Create different subscribers based on topic_type
 60        if self.topic_type == "rgb_image":
 61            self.topic_name = "/aima/hal/sensor/rgb_head_rear/rgb_image"
 62            self.sub_image = self.create_subscription(
 63                Image, self.topic_name, self.cb_image, qos)
 64            self.get_logger().info(
 65                f"✅ Subscribing RGB Image: {self.topic_name}")
 66            if self.dump_video_path:
 67                mask_state = "with mask" if self.with_mask else "without mask"
 68                self.get_logger().info(
 69                    f"📝 Will dump received images {mask_state} to video: {self.dump_video_path}")
 70
 71        elif self.topic_type == "rgb_image_compressed":
 72            self.topic_name = "/aima/hal/sensor/rgb_head_rear/rgb_image/compressed"
 73            self.sub_compressed = self.create_subscription(
 74                CompressedImage, self.topic_name, self.cb_compressed, qos)
 75            self.get_logger().info(
 76                f"✅ Subscribing CompressedImage: {self.topic_name}")
 77
 78        elif self.topic_type == "camera_info":
 79            self.topic_name = "/aima/hal/sensor/rgb_head_rear/camera_info"
 80            # CameraInfo subscription must use reliable + transient_local QoS to receive historical messages (even if only one frame is published)
 81            camera_qos = QoSProfile(
 82                reliability=QoSReliabilityPolicy.RELIABLE,
 83                history=QoSHistoryPolicy.KEEP_LAST,
 84                depth=1,
 85                durability=QoSDurabilityPolicy.TRANSIENT_LOCAL
 86            )
 87            self.sub_camerainfo = self.create_subscription(
 88                CameraInfo, self.topic_name, self.cb_camerainfo, camera_qos)
 89            self.get_logger().info(
 90                f"✅ Subscribing CameraInfo (with transient_local): {self.topic_name}")
 91
 92        else:
 93            self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
 94            raise ValueError("Unknown topic_type")
 95
 96        # Internal state
 97        self.last_print = self.get_clock().now()
 98        self.print_allowed = False
 99        self.arrivals = deque()
100
101    def update_arrivals(self):
102        """Calculate received FPS"""
103        now = self.get_clock().now()
104        self.arrivals.append(now)
105        while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
106            self.arrivals.popleft()
107
108    def get_fps(self):
109        """Get FPS"""
110        return len(self.arrivals)
111
112    def should_print(self, master=True):
113        """Control print frequency"""
114        if not master:
115            return self.print_allowed
116        now = self.get_clock().now()
117        if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
118            self.last_print = now
119            self.print_allowed = True
120        else:
121            self.print_allowed = False
122        return self.print_allowed
123
124    def cb_image(self, msg: Image):
125        """Image callback (RGB image)"""
126        self.update_arrivals()
127
128        if self.should_print():
129            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
130            self.get_logger().info(
131                f"📸 {self.topic_type} received\n"
132                f"  • frame_id:        {msg.header.frame_id}\n"
133                f"  • stamp (sec):     {stamp_sec:.6f}\n"
134                f"  • encoding:        {msg.encoding}\n"
135                f"  • size (WxH):      {msg.width} x {msg.height}\n"
136                f"  • step (bytes/row):{msg.step}\n"
137                f"  • is_bigendian:    {msg.is_bigendian}\n"
138                f"  • recv FPS (1s):   {self.get_fps():.1f}"
139            )
140
141        # Only RGB image supports video dump
142        if self.topic_type == "rgb_image" and self.dump_video_path:
143            self.dump_image_to_video(msg)
144
145    def cb_compressed(self, msg: CompressedImage):
146        """CompressedImage callback (RGB compressed image)"""
147        self.update_arrivals()
148
149        if self.should_print():
150            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
151            self.get_logger().info(
152                f"🗜️  {self.topic_type} received\n"
153                f"  • frame_id:        {msg.header.frame_id}\n"
154                f"  • stamp (sec):     {stamp_sec:.6f}\n"
155                f"  • format:          {msg.format}\n"
156                f"  • data size:       {len(msg.data)}\n"
157                f"  • recv FPS (1s):   {self.get_fps():.1f}"
158            )
159
160    def cb_camerainfo(self, msg: CameraInfo):
161        """CameraInfo callback (camera intrinsic parameters)"""
162        # Camera info will only receive one frame, print it directly
163        stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
164
165        # Format D array
166        d_str = ", ".join([f"{d:.6f}" for d in msg.d])
167
168        # Format K matrix
169        k_str = ", ".join([f"{k:.6f}" for k in msg.k])
170
171        # Format P matrix
172        p_str = ", ".join([f"{p:.6f}" for p in msg.p])
173
174        self.get_logger().info(
175            f"📷 {self.topic_type} received\n"
176            f"  • frame_id:        {msg.header.frame_id}\n"
177            f"  • stamp (sec):     {stamp_sec:.6f}\n"
178            f"  • width x height:  {msg.width} x {msg.height}\n"
179            f"  • distortion_model:{msg.distortion_model}\n"
180            f"  • D: [{d_str}]\n"
181            f"  • K: [{k_str}]\n"
182            f"  • P: [{p_str}]\n"
183            f"  • binning_x: {msg.binning_x}\n"
184            f"  • binning_y: {msg.binning_y}\n"
185            f"  • roi: {{ x_offset: {msg.roi.x_offset}, y_offset: {msg.roi.y_offset}, height: {msg.roi.height}, width: {msg.roi.width}, do_rectify: {msg.roi.do_rectify} }}"
186        )
187
188    def dump_image_to_video(self, msg: Image):
189        """Video dump is only supported for RGB images"""
190        # You can add video recording functionality here
191        # Simplified in the Python version, only logs instead
192        # Note: Refer to cpp implementation, get cv images by cv_bridge first,
193        # then you can use 'image[self.mask_image == 0] = 0' to mask them and
194        # finally use VideoWriter to save them as video
195        if self.should_print(master=False):
196            self.get_logger().info(f"📝 Video dump not implemented in Python version")
197
198
199def main(args=None):
200    rclpy.init(args=args)
201    try:
202        node = HeadRearCameraTopicEcho()
203        rclpy.spin(node)
204    except KeyboardInterrupt:
205        pass
206    except Exception as e:
207        print(f"Error: {e}")
208    finally:
209        if 'node' in locals():
210            node.destroy_node()
211        if rclpy.ok():
212            rclpy.shutdown()
213
214
215if __name__ == '__main__':
216    main()

Usage:

  1. Subscribe to RGB image data:

    ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image
    
  2. Subscribe to compressed image data:

    ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image_compressed
    
  3. Subscribe to camera intrinsics:

    ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=camera_info
    
  4. Record video:

    # You can change dump_video_path to another path; ensure the directory exists before saving
    ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image -p dump_video_path:=$PWD/rear_camera.avi
    

Use cases:

  • Face recognition and tracking

  • Object detection and recognition

  • Visual SLAM

  • Image processing and computer vision algorithm development

  • Robot visual navigation

6.1.15 Head touch sensor data subscription

This example uses echo_head_touch_sensor, which subscribes to the /aima/hal/sensor/touch_head topic to receive the robot’s touch sensor data on the head.

Features:

  • The event data would change from “IDLE” to “TOUCH” when robot’s head touched

 1#!/usr/bin/env python3
 2"""
 3Head touch state subscription example
 4"""
 5
 6import rclpy
 7from rclpy.node import Node
 8from aimdk_msgs.msg import TouchState
 9
10
11class TouchStateSubscriber(Node):
12    def __init__(self):
13        super().__init__('touch_state_subscriber')
14
15        # touch event types
16        self.event_type_map = {
17            TouchState.UNKNOWN: "UNKNOWN",
18            TouchState.IDLE: "IDLE",
19            TouchState.TOUCH: "TOUCH",
20            TouchState.SLIDE: "SLIDE",
21            TouchState.PAT_ONCE: "PAT_ONCE",
22            TouchState.PAT_TWICE: "PAT_TWICE",
23            TouchState.PAT_TRIPLE: "PAT_TRIPLE"
24        }
25
26        # create subscriber
27        self.subscription = self.create_subscription(
28            TouchState,
29            '/aima/hal/sensor/touch_head',
30            self.touch_callback,
31            10
32        )
33
34        self.get_logger().info(
35            'TouchState subscriber started, listening to /aima/hal/sensor/touch_head')
36
37    def touch_callback(self, msg):
38        event_str = self.event_type_map.get(
39            msg.event_type, f"INVALID({msg.event_type})")
40
41        self.get_logger().info(f'Timestamp: {msg.header.stamp.sec}.{msg.header.stamp.nanosec:09d}, '
42                               f'Event: {event_str} ({msg.event_type})')
43
44
45def main(args=None):
46    rclpy.init(args=args)
47    node = TouchStateSubscriber()
48    try:
49        rclpy.spin(node)
50    except KeyboardInterrupt:
51        pass
52    finally:
53        node.destroy_node()
54        if rclpy.ok():
55            rclpy.shutdown()
56
57
58if __name__ == '__main__':
59    main()

Usage:

ros2 run py_examples echo_head_touch_sensor

Example output:

[INFO] [1769420383.315173538] [touch_state_subscriber]: Timestamp: 1769420394.129927670, Event: IDLE (1)
[INFO] [1769420383.324978563] [touch_state_subscriber]: Timestamp: 1769420394.139941215, Event: IDLE (1)
[INFO] [1769420383.335265681] [touch_state_subscriber]: Timestamp: 1769420394.149990634, Event: TOUCH (2)
[INFO] [1769420383.344826732] [touch_state_subscriber]: Timestamp: 1769420394.159926892, Event: TOUCH (2)

6.1.16 LiDAR data subscription

This example uses echo_lidar_data, which subscribes to the /aima/hal/sensor/lidar_chest_front/ topic to receive the robot’s LiDAR data, supporting both point cloud and IMU data types.

Features:

  • Supports LiDAR point cloud data subscription

  • Supports LiDAR IMU data subscription

  • Real-time FPS statistics and data display

  • Configurable topic type selection

  • Detailed data field information output

Supported data types:

  • PointCloud2: LiDAR point cloud data (sensor_msgs/PointCloud2)

  • Imu: LiDAR IMU data (sensor_msgs/Imu)

Technical implementation:

  • Uses SensorDataQoS configuration (BEST_EFFORT + VOLATILE)

  • Supports point cloud field parsing and visualization

  • Supports IMU quaternion, angular velocity, and linear acceleration data

  • Provides detailed debugging log output

Use cases:

  • LiDAR data acquisition and analysis

  • Point cloud data processing and visualization

  • Robot navigation and localization

  • SLAM algorithm development

  • Environmental perception and mapping

  1#!/usr/bin/env python3
  2"""
  3Chest LiDAR data subscription example
  4
  5Supports subscribing to the following topics:
  6  1. /aima/hal/sensor/lidar_chest_front/lidar_pointcloud
  7     - Data type: sensor_msgs/PointCloud2
  8     - frame_id: lidar_chest_front
  9     - child_frame_id: /
 10     - Content: LiDAR point cloud data
 11  2. /aima/hal/sensor/lidar_chest_front/imu
 12     - Data type: sensor_msgs/Imu
 13     - frame_id: lidar_imu_chest_front
 14     - Content: LiDAR IMU data
 15
 16You can select the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
 17  - pointcloud: subscribe to LiDAR point cloud
 18  - imu: subscribe to LiDAR IMU
 19Default topic_type is pointcloud
 20
 21Examples:
 22  ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=pointcloud
 23  ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=imu
 24"""
 25
 26import rclpy
 27from rclpy.node import Node
 28from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
 29from sensor_msgs.msg import PointCloud2, Imu
 30from collections import deque
 31import time
 32
 33
 34class LidarChestEcho(Node):
 35    def __init__(self):
 36        super().__init__('lidar_chest_echo')
 37
 38        # Select the topic type to subscribe
 39        self.declare_parameter('topic_type', 'pointcloud')
 40        self.topic_type = self.get_parameter('topic_type').value
 41
 42        # SensorDataQoS: BEST_EFFORT + VOLATILE
 43        qos = QoSProfile(
 44            reliability=QoSReliabilityPolicy.BEST_EFFORT,
 45            history=QoSHistoryPolicy.KEEP_LAST,
 46            depth=5
 47        )
 48
 49        # Create different subscribers based on topic_type
 50        if self.topic_type == "pointcloud":
 51            self.topic_name = "/aima/hal/sensor/lidar_chest_front/lidar_pointcloud"
 52            self.sub_pointcloud = self.create_subscription(
 53                PointCloud2, self.topic_name, self.cb_pointcloud, qos)
 54            self.get_logger().info(
 55                f"✅ Subscribing LIDAR PointCloud2: {self.topic_name}")
 56
 57        elif self.topic_type == "imu":
 58            self.topic_name = "/aima/hal/sensor/lidar_chest_front/imu"
 59            self.sub_imu = self.create_subscription(
 60                Imu, self.topic_name, self.cb_imu, qos)
 61            self.get_logger().info(
 62                f"✅ Subscribing LIDAR IMU: {self.topic_name}")
 63
 64        else:
 65            self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
 66            raise ValueError("Unknown topic_type")
 67
 68        # Internal state
 69        self.last_print = self.get_clock().now()
 70        self.arrivals = deque()
 71
 72    def update_arrivals(self):
 73        """Calculate received FPS"""
 74        now = self.get_clock().now()
 75        self.arrivals.append(now)
 76        while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
 77            self.arrivals.popleft()
 78
 79    def get_fps(self):
 80        """Get FPS"""
 81        return len(self.arrivals)
 82
 83    def should_print(self):
 84        """Control print frequency"""
 85        now = self.get_clock().now()
 86        if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
 87            self.last_print = now
 88            return True
 89        return False
 90
 91    def cb_pointcloud(self, msg: PointCloud2):
 92        """PointCloud2 callback (LiDAR point cloud)"""
 93        self.update_arrivals()
 94
 95        if self.should_print():
 96            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
 97
 98            # Format fields info
 99            fields_str = " ".join(
100                [f"{f.name}({f.datatype})" for f in msg.fields])
101
102            self.get_logger().info(
103                f"🟢 LIDAR PointCloud2 received\n"
104                f"  • frame_id:        {msg.header.frame_id}\n"
105                f"  • stamp (sec):     {stamp_sec:.6f}\n"
106                f"  • width x height:  {msg.width} x {msg.height}\n"
107                f"  • point_step:      {msg.point_step}\n"
108                f"  • row_step:        {msg.row_step}\n"
109                f"  • fields:          {fields_str}\n"
110                f"  • is_bigendian:    {msg.is_bigendian}\n"
111                f"  • is_dense:        {msg.is_dense}\n"
112                f"  • data size:       {len(msg.data)}\n"
113                f"  • recv FPS (1s):   {self.get_fps():1.1f}"
114            )
115
116    def cb_imu(self, msg: Imu):
117        """IMU callback (LiDAR IMU)"""
118        self.update_arrivals()
119
120        if self.should_print():
121            stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
122
123            self.get_logger().info(
124                f"🟢 LIDAR IMU received\n"
125                f"  • frame_id:        {msg.header.frame_id}\n"
126                f"  • stamp (sec):     {stamp_sec:.6f}\n"
127                f"  • orientation:     [{msg.orientation.x:.6f}, {msg.orientation.y:.6f}, {msg.orientation.z:.6f}, {msg.orientation.w:.6f}]\n"
128                f"  • angular_velocity:[{msg.angular_velocity.x:.6f}, {msg.angular_velocity.y:.6f}, {msg.angular_velocity.z:.6f}]\n"
129                f"  • linear_accel:    [{msg.linear_acceleration.x:.6f}, {msg.linear_acceleration.y:.6f}, {msg.linear_acceleration.z:.6f}]\n"
130                f"  • recv FPS (1s):   {self.get_fps():.1f}"
131            )
132
133
134def main(args=None):
135    rclpy.init(args=args)
136    try:
137        node = LidarChestEcho()
138        rclpy.spin(node)
139    except KeyboardInterrupt:
140        pass
141    except Exception as e:
142        print(f"Error: {e}")
143    finally:
144        if 'node' in locals():
145            node.destroy_node()
146        if rclpy.ok():
147            rclpy.shutdown()
148
149
150if __name__ == '__main__':
151    main()

Usage:

# Subscribe to LiDAR point cloud data
ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=pointcloud

# Subscribe to LiDAR IMU data
ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=imu

Example output:

[INFO] [lidar_chest_echo]: ✅ Subscribing LIDAR PointCloud2: /aima/hal/sensor/lidar_chest_front/lidar_pointcloud
[INFO] [lidar_chest_echo]: 🟢 LIDAR PointCloud2 received
  • frame_id:        lidar_chest_front
  • stamp (sec):     1234567890.123456
  • width x height:  1 x 36000
  • point_step:      16
  • row_step:        16
  • fields:          x(7) y(7) z(7) intensity(7)
  • is_bigendian:    False
  • is_dense:        True
  • data size:       576000
  • recv FPS (1s):   10.0

6.1.17 Play video

This example uses play_video. Before running the node, you must upload the video to the robot’s Interaction Computing Unit (PC3) (you may create a directory on it to store videos, e.g. /var/tmp/videos/), and then change the video_path in the node program to the path of the video you want to play.

Attention

⚠️ Attention! The Interaction Computing Unit (PC3) is independent from the Development Computing Unit (PC2) where secondary development programs run. Audio and video files must be stored on the Interaction Computing Unit (IP: 10.0.1.42).
Audio and video files (and all parent directories up to root) must be readable by all users(new subdirectory under /var/tmp/ is recommended)

Attention

Before using this feature, run the following command on the Motion Control Computing Unit (PC1) to stop the task_manager module and prevent it from competing with your program for display control:

aima em stop-app task_manager

Function description: By calling the PlayVideo service, the robot can play a video file from a specified path on its screen. Ensure the video file has been uploaded to the Interaction Computing Unit, otherwise playback will fail.

💡 Tip: How to prepare a video file

You can use the C++ echo_camera_rgbd example to record the robot’s depth camera RGB stream as an .mp4 video file, then use it as input for play_video:

Step 1: Record a video on the Development Computing Unit (PC2) (using the C++ example)

ros2 run examples echo_camera_rgbd --ros-args -p topic_type:=rgb_image -p dump_video_path:=$PWD/output.mp4
# Press Ctrl+C to stop recording; the video is saved as output.mp4

Step 2: Transfer the video to the Interaction Computing Unit (PC3, IP: 10.0.1.42)

scp output.mp4 user@10.0.1.42:/var/tmp/videos/output.mp4

Step 3: Run play_video to play the video

ros2 run py_examples play_video  # Set video_path in the code to /var/tmp/videos/output.mp4
 1#!/usr/bin/env python3
 2
 3import rclpy
 4import rclpy.logging
 5from rclpy.node import Node
 6
 7from aimdk_msgs.srv import PlayVideo
 8
 9
10class PlayVideoClient(Node):
11    def __init__(self):
12        super().__init__('play_video_client')
13        self.client = self.create_client(
14            PlayVideo, '/face_ui_proxy/play_video')
15        self.get_logger().info('✅ PlayVideo client node created.')
16
17        # Wait for the service to become available
18        while not self.client.wait_for_service(timeout_sec=2.0):
19            self.get_logger().info('⏳ Service unavailable, waiting...')
20
21        self.get_logger().info('🟢 Service available, ready to send request.')
22
23    def send_request(self, video_path, mode, priority):
24        req = PlayVideo.Request()
25
26        req.video_path = video_path
27        req.mode = mode
28        req.priority = priority
29
30        # async call
31        self.get_logger().info(
32            f'📨 Sending request to play video: mode={mode} video={video_path}')
33        for i in range(8):
34            req.header.header.stamp = self.get_clock().now().to_msg()
35            future = self.client.call_async(req)
36            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
37
38            if future.done():
39                break
40
41            # retry as remote peer is NOT handled well by ROS
42            self.get_logger().info(f'trying ... [{i}]')
43
44        resp = future.result()
45        if resp is None:
46            self.get_logger().error('❌ Service call not completed or timed out.')
47            return False
48
49        if resp.success:
50            self.get_logger().info(
51                f'✅ Request to play video recorded successfully: {resp.message}')
52            return True
53        else:
54            self.get_logger().error(
55                f'❌ Failed to record play-video request: {resp.message}')
56            return False
57
58
59def main(args=None):
60    rclpy.init(args=args)
61    node = None
62
63    try:
64        # video path and priority can be customized
65        video_path = "/agibot/data/home/agi/zhiyuan.mp4"
66        priority = 5
67        # input play mode
68        mode = int(input("Enter video play mode (1: play once, 2: loop): "))
69        if mode not in (1, 2):
70            raise ValueError(f'invalid mode {mode}')
71
72        node = PlayVideoClient()
73        node.send_request(video_path, mode, priority)
74    except KeyboardInterrupt:
75        pass
76    except Exception as e:
77        rclpy.logging.get_logger('main').error(
78            f'Program exited with exception: {e}')
79
80    if node:
81        node.destroy_node()
82    if rclpy.ok():
83        rclpy.shutdown()
84
85
86if __name__ == '__main__':
87    main()

6.1.18 Audio File Playback

This example uses play_audio, enabling playback of specified audio files, supporting WAV (PCM-encoded) and RAW PCM formats. Other formats such as MP3 are not supported.

Features:

  • Supports WAV (PCM-encoded) and RAW PCM playback; other formats such as MP3 are not supported

  • Supports priority control with configurable playback priority

  • Supports custom file paths and playback parameters

  • Provides complete error handling and status feedback

Technical implementation:

  • Uses the PlayAudioFile service for audio file playback

  • Supports priority settings (priority: 1–10, priority_weight: 1–100)

  • Provides detailed playback status feedback

Use cases:

  • Audio file playback and media control

  • Voice prompts and sound effect playback

  • Multimedia application development

  • Robot interaction audio feedback

Attention

⚠️ Attention! The Interaction Computing Unit (PC3) is independent from the Development Computing Unit (PC2) where secondary development programs run. Audio and video files must be stored on the Interaction Computing Unit (IP: 10.0.1.42).
Audio and video files (and all parent directories up to root) must be readable by all users(new subdirectory under /var/tmp/ is recommended)

 1#!/usr/bin/env python3
 2
 3import os
 4import sys
 5import rclpy
 6import rclpy.logging
 7from rclpy.node import Node
 8
 9from aimdk_msgs.srv import PlayAudioFile
10
11
12class PlayAudioClient(Node):
13    def __init__(self):
14        super().__init__('PlayAudioClient')
15        self.client = self.create_client(
16            PlayAudioFile, '/aimdk_5Fmsgs/srv/PlayAudioFile')
17        self.get_logger().info('✅ PlayAudio client node created.')
18
19        # Wait for the service to become available
20        while not self.client.wait_for_service(timeout_sec=2.0):
21            self.get_logger().info('⏳ Service unavailable, waiting...')
22
23        self.get_logger().info('🟢 Service available, ready to send request.')
24
25    def send_request(self, media_path):
26        req = PlayAudioFile.Request()
27
28        req.file.pkg_name = 'demo_client'
29        req.file.file_name = os.path.basename(media_path)
30        req.file.file_path = os.path.dirname(media_path) + '/'
31        req.file.priority = 6
32        req.file.priority_weight = 0
33
34        self.get_logger().info(
35            f'📨 Sending request to play media: {media_path}')
36        for i in range(5):
37            req.request.header.stamp = self.get_clock().now().to_msg()
38            future = self.client.call_async(req)
39            rclpy.spin_until_future_complete(self, future, timeout_sec=1.0)
40
41            if future.done():
42                break
43
44            if not rclpy.ok():
45                self.get_logger().warn('Interrupted while waiting')
46                return False
47
48            # retry as remote peer is NOT handled well by ROS
49            self.get_logger().info(f'trying ... [{i}]')
50
51        resp = future.result()
52        if resp is None:
53            self.get_logger().error('❌ Service call not completed or timed out.')
54            return False
55
56        if resp.reponse.status.value == 1:
57            self.get_logger().info('✅ Request to play media file recorded successfully.')
58            return True
59        else:
60            self.get_logger().error('❌ Failed to record play-media request.')
61            return False
62
63
64def main(args=None):
65    rclpy.init(args=args)
66    node = None
67
68    default_media = '/agibot/data/var/interaction/tts_cache/normal/demo.wav'
69    try:
70        if len(sys.argv) > 1:
71            media_path = sys.argv[1]
72        else:
73            media_path = input(
74                f'Enter media file path to play (default: {default_media}): ').strip()
75            if not media_path:
76                media_path = default_media
77
78        node = PlayAudioClient()
79        node.send_request(media_path)
80    except KeyboardInterrupt:
81        pass
82    except Exception as e:
83        rclpy.logging.get_logger('main').error(
84            f'Program exited with exception: {e}')
85
86    if node:
87        node.destroy_node()
88    if rclpy.ok():
89        rclpy.shutdown()
90
91
92if __name__ == '__main__':
93    main()

Usage:

# Play default audio file
ros2 run py_examples play_audio

# Play a specified audio file
# Note: replace /path/to/your/audio_file.wav with the actual file path on the interaction unit
ros2 run py_examples play_audio /path/to/your/audio_file.wav

# Play a TTS cached file
ros2 run py_examples play_audio /agibot/data/var/interaction/tts_cache/normal/demo.wav

Example output:

[INFO] [PlayAudioClient]: ✅ Request to play media file recorded successfully.

Notes:

  • Ensure the audio file path is correct and the file exists

  • Supported file formats: WAV (PCM-encoded), RAW PCM. Other formats such as MP3 are not supported

  • Priority settings affect playback queue order

  • The program includes complete exception handling mechanisms

6.1.19 TTS (Text-to-Speech)

This example uses play_tts, which enables the robot to speak the provided text through the node. Users can input different text according to various scenarios.

Features:

  • Supports command-line parameters and interactive input

  • Includes complete service availability checks and error handling

  • Supports priority control and interruption mechanism

  • Provides detailed playback status feedback

Core code

 1#!/usr/bin/env python3
 2
 3import sys
 4import rclpy
 5import rclpy.logging
 6from rclpy.node import Node
 7
 8from aimdk_msgs.srv import PlayTts
 9from aimdk_msgs.msg import TtsPriorityLevel
10
11
12class PlayTTSClient(Node):
13    def __init__(self):
14        super().__init__('play_tts_client')
15
16        # fill in the actual service name
17        self.client = self.create_client(PlayTts, '/aimdk_5Fmsgs/srv/PlayTts')
18        self.get_logger().info('✅ PlayTts client node created.')
19
20        # Wait for the service to become available
21        while not self.client.wait_for_service(timeout_sec=2.0):
22            self.get_logger().info('⏳ Service unavailable, waiting...')
23
24        self.get_logger().info('🟢 Service available, ready to send request.')
25
26    def send_request(self, text):
27        req = PlayTts.Request()
28
29        req.tts_req.text = text
30        req.tts_req.domain = 'demo_client'   # required: caller domain
31        req.tts_req.trace_id = 'demo'        # optional: request id
32        req.tts_req.is_interrupted = True    # required: interrupt same-priority
33        req.tts_req.priority_weight = 0
34        req.tts_req.priority_level.value = 6
35
36        self.get_logger().info(f'📨 Sending request to play tts: text={text}')
37        for i in range(8):
38            req.header.header.stamp = self.get_clock().now().to_msg()
39            future = self.client.call_async(req)
40            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
41
42            if future.done():
43                break
44
45            # retry as remote peer is NOT handled well by ROS
46            self.get_logger().info(f'trying ... [{i}]')
47
48        resp = future.result()
49        if resp is None:
50            self.get_logger().error('❌ Service call not completed or timed out.')
51            return False
52
53        if resp.tts_resp.is_success:
54            self.get_logger().info('✅ TTS sent successfully.')
55            return True
56        else:
57            self.get_logger().error('❌ Failed to send TTS.')
58            return False
59
60
61def main(args=None):
62    rclpy.init(args=args)
63    node = None
64
65    try:
66        # get text to speak
67        if len(sys.argv) > 1:
68            text = sys.argv[1]
69        else:
70            text = input('Enter text to speak: ')
71            if not text:
72                text = 'Hello, I am AgiBot X2.'
73
74        node = PlayTTSClient()
75        node.send_request(text)
76    except KeyboardInterrupt:
77        pass
78    except Exception as e:
79        rclpy.logging.get_logger('main').error(
80            f'Program exited with exception: {e}')
81
82    if node:
83        node.destroy_node()
84    if rclpy.ok():
85        rclpy.shutdown()
86
87
88if __name__ == '__main__':
89    main()

Usage

# Play text using command-line parameters (recommended)
ros2 run py_examples play_tts "Hello, I am the AgiBot X2 robot"

# Or run without parameters and the program will prompt for input
ros2 run py_examples play_tts

Example output

[INFO] [play_tts_client_min]: ✅ Speech playback request succeeded

Notes

  • Ensure the TTS service is running properly

  • Supports Chinese and English text playback

  • Priority settings affect playback queue order

  • Interruption feature can stop the currently playing speech

Interface reference

  • Service: /aimdk_5Fmsgs/srv/PlayTts

  • Message: aimdk_msgs/srv/PlayTts

6.1.20 Microphone data reception

This example uses mic_receiver, which subscribes to the /agent/process_audio_output topic to receive noise-reduced audio data from the robot, supporting both internal and external microphone audio streams, and automatically saving complete speech segments as PCM files based on VAD (Voice Activity Detection) status.

Features:

  • Automatically detects speech start, in-progress, and end based on VAD state

  • Automatically saves complete speech segments as PCM files

  • Stores recordings categorized by timestamp and audio stream

  • Supports duration calculation and statistical information output

  • Intelligent buffer management to prevent memory leaks

  • Complete error handling and exception management

  • Detailed debugging log output

VAD state description:

  • 0: No speech

  • 1: Speech start

  • 2: Speech in progress

  • 3: Speech end

Technical implementation:

  • Supports saving PCM audio files (16 kHz, 16-bit, mono)

  • Provides detailed log output and status monitoring

  • Supports real-time audio stream processing and file saving

Use cases:

  • Speech recognition and speech processing

  • Audio data acquisition and analysis

  • Real-time speech monitoring

  • Audio quality evaluation

  • Multi-microphone array data processing

  1#!/usr/bin/env python3
  2"""
  3Microphone data receiving example
  4
  5This example subscribes to the `/agent/process_audio_output` topic to receive the robot's
  6noise-suppressed audio data. It supports both the built-in microphone and the external
  7microphone audio streams, and automatically saves complete speech segments as PCM files
  8based on the VAD (Voice Activity Detection) state.
  9
 10Features:
 11- Supports receiving multiple audio streams at the same time (built-in mic stream_id=1, external mic stream_id=2)
 12- Automatically detects speech start / in-progress / end based on VAD state
 13- Automatically saves complete speech segments as PCM files
 14- Stores files categorized by timestamp and audio stream
 15- Supports audio duration calculation and logging
 16
 17VAD state description:
 18- 0: No speech
 19- 1: Speech start
 20- 2: Speech in progress
 21- 3: Speech end
 22"""
 23
 24import rclpy
 25from rclpy.node import Node
 26from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
 27from aimdk_msgs.msg import ProcessedAudioOutput, AudioVadStateType
 28import os
 29import time
 30from datetime import datetime
 31from collections import defaultdict
 32from typing import Dict, List
 33
 34
 35class AudioSubscriber(Node):
 36    def __init__(self):
 37        super().__init__('audio_subscriber')
 38
 39        # Audio buffers, stored separately by stream_id
 40        # stream_id -> buffer
 41        self.audio_buffers: Dict[int, List[bytes]] = defaultdict(list)
 42        self.recording_state: Dict[int, bool] = defaultdict(bool)
 43
 44        # Create audio output directory
 45        self.audio_output_dir = "audio_recordings"
 46        os.makedirs(self.audio_output_dir, exist_ok=True)
 47
 48        # VAD state name mapping
 49        self.vad_state_names = {
 50            0: "No speech",
 51            1: "Speech start",
 52            2: "Speech in progress",
 53            3: "Speech end"
 54        }
 55
 56        # Audio stream name mapping
 57        self.stream_names = {
 58            1: "Built-in microphone",
 59            2: "External microphone"
 60        }
 61
 62        # QoS settings
 63        # Note: deep queue to avoid missing data in a burst at start of VAD.
 64        qos = QoSProfile(
 65            history=QoSHistoryPolicy.KEEP_LAST,
 66            depth=500,
 67            reliability=QoSReliabilityPolicy.BEST_EFFORT
 68        )
 69
 70        # Create subscriber
 71        self.subscription = self.create_subscription(
 72            ProcessedAudioOutput,
 73            '/agent/process_audio_output',
 74            self.audio_callback,
 75            qos
 76        )
 77
 78        self.get_logger().info("Start subscribing to noise-suppressed audio data...")
 79
 80    def audio_callback(self, msg: ProcessedAudioOutput):
 81        """Audio data callback"""
 82        try:
 83            stream_id = msg.stream_id
 84            vad_state = msg.audio_vad_state.value
 85            audio_data = bytes(msg.audio_data)
 86
 87            self.get_logger().info(
 88                f"Received audio data: stream_id={stream_id}, "
 89                f"vad_state={vad_state}({self.vad_state_names.get(vad_state, 'Unknown state')}), "
 90                f"audio_size={len(audio_data)} bytes"
 91            )
 92
 93            self.handle_vad_state(stream_id, vad_state, audio_data)
 94
 95        except Exception as e:
 96            self.get_logger().error(
 97                f"Error while processing audio message: {str(e)}")
 98
 99    def handle_vad_state(self, stream_id: int, vad_state: int, audio_data: bytes):
100        """Handle VAD state changes"""
101        stream_name = self.stream_names.get(
102            stream_id, f"Unknown stream {stream_id}")
103        vad_name = self.vad_state_names.get(
104            vad_state, f"Unknown state {vad_state}")
105
106        self.get_logger().info(
107            f"[{stream_name}] VAD state: {vad_name} audio: {len(audio_data)} bytes"
108        )
109
110        # Speech start
111        if vad_state == 1:
112            self.get_logger().info("🎤 Speech start detected")
113            if not self.recording_state[stream_id]:
114                self.audio_buffers[stream_id].clear()
115                self.recording_state[stream_id] = True
116            if audio_data:
117                self.audio_buffers[stream_id].append(audio_data)
118
119        # Speech in progress
120        elif vad_state == 2:
121            self.get_logger().info("🔄 Speech in progress...")
122            if self.recording_state[stream_id] and audio_data:
123                self.audio_buffers[stream_id].append(audio_data)
124
125        # Speech end
126        elif vad_state == 3:
127            self.get_logger().info("✅ Speech end")
128            if self.recording_state[stream_id] and audio_data:
129                self.audio_buffers[stream_id].append(audio_data)
130
131            if self.recording_state[stream_id] and self.audio_buffers[stream_id]:
132                self.save_audio_segment(stream_id)
133            self.recording_state[stream_id] = False
134
135        # No speech
136        elif vad_state == 0:
137            if self.recording_state[stream_id]:
138                self.get_logger().info("⏹️ Reset recording state")
139                self.recording_state[stream_id] = False
140
141        # Print current buffer status
142        buffer_size = sum(len(chunk)
143                          for chunk in self.audio_buffers[stream_id])
144        recording = self.recording_state[stream_id]
145        self.get_logger().debug(
146            f"[Stream {stream_id}] Buffer size: {buffer_size} bytes, recording: {recording}"
147        )
148
149    def save_audio_segment(self, stream_id: int):
150        """Save audio segment"""
151        if not self.audio_buffers[stream_id]:
152            return
153
154        # Merge all audio data
155        audio_data = b''.join(self.audio_buffers[stream_id])
156
157        # Get current timestamp
158        now = datetime.now()
159        timestamp = now.strftime("%Y%m%d_%H%M%S_%f")[:-3]  # to milliseconds
160
161        # Create subdirectory by stream_id
162        stream_dir = os.path.join(self.audio_output_dir, f"stream_{stream_id}")
163        os.makedirs(stream_dir, exist_ok=True)
164
165        # Generate filename
166        stream_name = "internal_mic" if stream_id == 1 else "external_mic" if stream_id == 2 else f"stream_{stream_id}"
167        filename = f"{stream_name}_{timestamp}.pcm"
168        filepath = os.path.join(stream_dir, filename)
169
170        try:
171            # Save PCM file
172            with open(filepath, 'wb') as f:
173                f.write(audio_data)
174
175            self.get_logger().info(
176                f"Audio segment saved: {filepath} (size: {len(audio_data)} bytes)")
177
178            # Calculate audio duration (assuming 16 kHz, 16-bit, mono)
179            sample_rate = 16000
180            bits_per_sample = 16
181            channels = 1
182            bytes_per_sample = bits_per_sample // 8
183            total_samples = len(audio_data) // (bytes_per_sample * channels)
184            duration_seconds = total_samples / sample_rate
185
186            self.get_logger().info(
187                f"Audio duration: {duration_seconds:.2f} s ({total_samples} samples)")
188
189        except Exception as e:
190            self.get_logger().error(f"Failed to save audio file: {str(e)}")
191
192
193def main(args=None):
194    rclpy.init(args=args)
195    node = AudioSubscriber()
196
197    try:
198        node.get_logger().info("Listening to noise-suppressed audio data, press Ctrl+C to exit...")
199        rclpy.spin(node)
200    except KeyboardInterrupt:
201        node.get_logger().info("Interrupt signal received, exiting...")
202    finally:
203        node.destroy_node()
204        if rclpy.ok():
205            rclpy.shutdown()
206
207
208if __name__ == '__main__':
209    main()

Usage:

  1. Run the program:

    # Build the Python package
    colcon build --packages-select py_examples
    
    # Run the microphone receiver and activate VAD with wake words
    ros2 run py_examples mic_receiver
    
  2. Directory structure:

    • After running, an audio_recordings directory is created automatically

    • Audio files are stored by stream_id (currently not distinguished; stream_1 is always used):

      • stream_1/: Internal microphone audio

      • stream_2/: External microphone audio

  3. File naming format: {stream_name}_{timestamp}.pcm

    • internal_mic_20250909_133649_738.pcm (internal microphone)

    • external_mic_20250909_133650_123.pcm (external microphone)

  4. Audio format: 16 kHz, 16-bit, mono PCM

  5. Play the saved PCM file:

    # Play internal microphone recording
    aplay -r 16000 -f S16_LE -c 1 audio_recordings/stream_1/internal_mic_20250909_133649_738.pcm
    
    # Play external microphone recording
    aplay -r 16000 -f S16_LE -c 1 audio_recordings/stream_2/external_mic_20250909_133650_123.pcm
    
  6. Convert to WAV format (optional):

    # Convert to WAV format using ffmpeg
    ffmpeg -f s16le -ar 16000 -ac 1 -i external_mic_20250909_133649_738.pcm output.wav
    
  7. Program control:

    • Press Ctrl+C to safely exit the program

    • The program automatically handles audio stream start and end

    • Supports processing multiple audio streams simultaneously

Example output:

Normal startup and operation:

[INFO] Started subscribing to noise-reduced audio data...
[INFO] Received audio data: stream_id=1, vad_state=0(no speech), audio_size=0 bytes
[INFO] [Internal mic] VAD state: no speech Audio data: 0 bytes
[INFO] Received audio data: stream_id=2, vad_state=0(no speech), audio_size=0 bytes
[INFO] [External mic] VAD state: no speech Audio data: 0 bytes

Speech start detected:

[INFO] Received audio data: stream_id=2, vad_state=1(speech start), audio_size=320 bytes
[INFO] [External mic] VAD state: speech start Audio data: 320 bytes
[INFO] 🎤 Speech start detected

Speech processing:

[INFO] Received audio data: stream_id=2, vad_state=2(speech in progress), audio_size=320 bytes
[INFO] [External mic] VAD state: speech in progress Audio data: 320 bytes
[INFO] 🔄 Speech in progress...
[INFO] Received audio data: stream_id=2, vad_state=2(speech in progress), audio_size=320 bytes
[INFO] [External mic] VAD state: speech in progress Audio data: 320 bytes
[INFO] 🔄 Speech in progress...

Speech end and save:

[INFO] Received audio data: stream_id=2, vad_state=3(speech end), audio_size=320 bytes
[INFO] [External mic] VAD state: speech end Audio data: 320 bytes
[INFO] ✅ Speech ended
[INFO] Audio segment saved: audio_recordings/stream_2/external_mic_20250909_133649_738.pcm (size: 960 bytes)
[INFO] Audio duration: 0.06 seconds (480 samples)

Simultaneous multi-stream processing:

[INFO] Received audio data: stream_id=1, vad_state=1(speech start), audio_size=320 bytes
[INFO] [Internal mic] VAD state: speech start Audio data: 320 bytes
[INFO] 🎤 Speech start detected
[INFO] Received audio data: stream_id=2, vad_state=1(speech start), audio_size=320 bytes
[INFO] [External mic] VAD state: speech start Audio data: 320 bytes
[INFO] 🎤 Speech start detected

Program exit:

^C[INFO] Interrupt signal received, exiting...
[INFO] Program exited safely

Notes:

  • The program supports processing multiple audio streams (internal and external microphones)

  • Each audio stream has an independent buffer and recording state

  • Audio files are automatically saved by timestamp and audio stream

  • The program includes complete error handling mechanisms to ensure stable operation

6.1.21 Raw Microphone Audio Stream Receiver

  1#!/usr/bin/env python3
  2"""
  3Microphone raw data receiving example
  4
  5This example subscribes to the `/aima/hal/audio/capture` topic to receive the robot's
  6raw audio data. It supports both the built-in microphone and the external
  7microphone audio streams, and automatically saves complete speech segments as PCM files.
  8
  9Features:
 10- Automatically saves raw audio as PCM files
 11- Stores files categorized by timestamp and mic source
 12
 13"""
 14
 15import rclpy
 16from rclpy.node import Node
 17from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
 18from aimdk_msgs.msg import AudioCapture
 19import os
 20import time
 21from datetime import datetime
 22from collections import defaultdict
 23from typing import Dict, List
 24
 25
 26class RawAudioSubscriber(Node):
 27    def __init__(self):
 28        super().__init__('raw_audio_subscriber')
 29
 30        # Audio buffers, stored separately by channels
 31        self.audio_buffers: Dict[int, List[bytes]] = defaultdict(list)
 32        self.mic_channels = 0
 33        self.ref_channels = 0
 34        self.inited = 0
 35        self.dump_timestamp = 0
 36
 37        # Create audio output directory
 38        now = datetime.now()
 39        timestamp = now.strftime("%Y%m%d_%H%M%S_%f")[:-3]  # to milliseconds
 40
 41        self.audio_output_dir = os.path.join("audio_recordings", timestamp)
 42        os.makedirs(self.audio_output_dir, exist_ok=True)
 43
 44        # QoS settings
 45        # Note: deep queue to avoid missing data
 46        qos = QoSProfile(
 47            history=QoSHistoryPolicy.KEEP_LAST,
 48            depth=500,
 49            reliability=QoSReliabilityPolicy.BEST_EFFORT
 50        )
 51
 52        # Create subscriber
 53        self.subscription = self.create_subscription(
 54            AudioCapture,
 55            '/aima/hal/audio/capture',
 56            self.audio_callback,
 57            qos
 58        )
 59
 60        self.get_logger().info("Start subscribing to raw audio data...")
 61
 62    def audio_callback(self, msg: AudioCapture):
 63        """Audio data callback"""
 64        if not self.inited:
 65            self.mic_channels = msg.mic_channels
 66            self.ref_channels = msg.ref_channels
 67            self.dump_timestamp = time.time()
 68            self.inited = 1
 69        elif self.mic_channels != msg.mic_channels or self.ref_channels != msg.ref_channels:
 70            # quit as MIC switched
 71            self.get_logger().error(
 72                "MIC channels info changed (mic:{self.mic_channels} ref:{self.ref_channels}) -> (mic:{msg.mic_channels} ref:{msg.ref_channels})")
 73            if rclpy.ok():
 74                rclpy.shutdown()
 75            return
 76        audio_data = bytes(msg.data.data)
 77
 78        self.handle_audio_data(audio_data)
 79
 80    def handle_audio_data(self, audio_data: bytes):
 81        """Handle raw audio data"""
 82
 83        # Split S16LE data into channels
 84        channels = self.ref_channels + self.mic_channels
 85        bytes_per_channel = 2
 86        unit_size = channels * bytes_per_channel
 87        base = 0
 88        for k in range(len(audio_data) // unit_size):
 89            for i in range(channels):
 90                tail = base + bytes_per_channel
 91                self.audio_buffers[i].append(audio_data[base:tail])
 92                base = tail
 93
 94    def save_audio_segments(self):
 95        self.get_logger().info("🔄 Flashing cached audio data...")
 96        for i in range(self.mic_channels + self.ref_channels):
 97            self.save_audio_segment(i, i >= self.mic_channels)
 98
 99    def save_audio_segment(self, channel: int, is_ref: bool):
100        """Save audio segment"""
101        if not self.audio_buffers[channel]:
102            return
103
104        # Merge all audio data
105        audio_data = b''.join(self.audio_buffers[channel])
106        self.audio_buffers[channel].clear()
107
108        channel_type = 'mic'
109        if is_ref:
110            channel_type = 'ref'
111
112        # Generate filename
113        filepath = os.path.join(self.audio_output_dir,
114                                f"channel_{channel}_{channel_type}.pcm")
115
116        try:
117            # Save PCM file
118            with open(filepath, 'ab') as f:
119                f.write(audio_data)
120
121            self.get_logger().info(
122                f"Audio segment saved: {filepath} append: {len(audio_data)} bytes)")
123
124        except Exception as e:
125            self.get_logger().error(f"Failed to save audio file: {str(e)}")
126
127    def run(self):
128        self.timer = self.create_timer(1.0, self.save_audio_segments)
129        rclpy.spin(self)
130
131
132def main(args=None):
133    rclpy.init(args=args)
134    node = None
135
136    try:
137        node = RawAudioSubscriber()
138        node.get_logger().info("Listening to raw audio data, press Ctrl+C to exit...")
139        node.run()
140    except KeyboardInterrupt:
141        rclpy.logging.get_logger('main').info(
142            "Interrupt signal received, exiting...")
143
144    if node:
145        node.destroy_node()
146    if rclpy.ok():
147        rclpy.shutdown()
148
149
150if __name__ == '__main__':
151    main()

Usage:

  1. Run the program:

    # Build the Python package
    colcon build --packages-select py_examples
    
    # Run the mic receiver
    ros2 run py_examples mic_raw_receiver
    
  2. Directory structure:

    • After running, an audio_recordings directory is created automatically

    • Audio files are grouped by the program’s start time into a subdirectory:

      • 20250909_133650_123

  3. File naming format: channel_{index}_{type}.pcm, e.g.

    • channel_0_mic.pcm (microphone)

    • channel_4_ref.pcm (reference / echo-cancellation)

  4. Audio format: 16 kHz, 16-bit, mono PCM

  5. Play the saved PCM file:

    • Transfer to a computer and play

    • Play on the robot:

      # Use the raw stream playback example
      ros2 run py_examples play_audio_stream --ros-args -p raw_audio_path:=$PWD/audio_recordings/20250909_133650_123/channel_0_mic.pcm
      # Or use the audio file playback example (copy the file to PC3 first and use its path)
      ros2 run py_examples play_audio please/use/path/on/PC3.pcm
      

6.1.22 Get Current Microphone Source

 1#!/usr/bin/env python3
 2
 3import sys
 4import rclpy
 5import rclpy.logging
 6from rclpy.node import Node
 7
 8from aimdk_msgs.srv import GetMicSourceRequest
 9from aimdk_msgs.msg import CommonRequest, CommonState
10
11
12class GetMicSourceRequestClient(Node):
13    def __init__(self):
14        super().__init__('get_mic_source_request_client')
15        self.client = self.create_client(
16            GetMicSourceRequest, '/aimdk_5Fmsgs/srv/GetMicSourceRequest'
17        )
18        self.get_logger().info('✅ GetMicSourceRequest client node created.')
19
20        # Wait for the service to become available
21        while not self.client.wait_for_service(timeout_sec=2.0):
22            self.get_logger().info('⏳ Service unavailable, waiting...')
23
24        self.get_logger().info('🟢 Service available, ready to send request.')
25
26    def send_request(self):
27        req = GetMicSourceRequest.Request()
28        req.header = CommonRequest()
29
30        self.get_logger().info(
31            f'📨 Sending request to get MIC source')
32        for i in range(8):
33            req.header.header.stamp = self.get_clock().now().to_msg()
34            future = self.client.call_async(req)
35            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
36
37            if future.done():
38                break
39
40            # retry as remote peer is NOT handled well by ROS
41            self.get_logger().info(f'trying ... [{i}]')
42
43        response = future.result()
44        if response is None:
45            self.get_logger().error('❌ Service call failed or timed out.')
46            return
47
48        if response.header.status.value == CommonState.SUCCESS:
49            self.get_logger().info('✅ MIC source get successfully.')
50            self.get_logger().info(f'MIC id: {response.mic_source}')
51        else:
52            self.get_logger().error(
53                f'❌ Failed to get MIC source: {response.header.message}'
54            )
55
56
57def main(args=None):
58    rclpy.init(args=args)
59    node = None
60    try:
61        node = GetMicSourceRequestClient()
62        node.send_request()
63    except KeyboardInterrupt:
64        pass
65    except Exception as e:
66        rclpy.logging.get_logger('main').error(
67            f'Program exited with exception: {e}')
68
69    if node:
70        node.destroy_node()
71    if rclpy.ok():
72        rclpy.shutdown()
73
74
75if __name__ == '__main__':
76    main()

6.1.23 Switch Microphone Source

  1#!/usr/bin/env python3
  2
  3import sys
  4import rclpy
  5import rclpy.logging
  6from rclpy.node import Node
  7
  8from aimdk_msgs.srv import SetMicSourceRequest
  9from aimdk_msgs.msg import CommonRequest, CommonState
 10
 11
 12class SetMicSourceRequestClient(Node):
 13    def __init__(self):
 14        super().__init__('set_mic_source_request_client')
 15        self.client = self.create_client(
 16            SetMicSourceRequest, '/aimdk_5Fmsgs/srv/SetMicSourceRequest'
 17        )
 18        self.get_logger().info('✅ SetMicSourceRequest client node created.')
 19
 20        # Wait for the service to become available
 21        while not self.client.wait_for_service(timeout_sec=2.0):
 22            self.get_logger().info('⏳ Service unavailable, waiting...')
 23
 24        self.get_logger().info('🟢 Service available, ready to send request.')
 25
 26    def send_request(self, mic_source: int):
 27        req = SetMicSourceRequest.Request()
 28        req.header = CommonRequest()
 29
 30        req.mic_source = mic_source
 31
 32        self.get_logger().info(
 33            f'📨 Sending request to set mic source: {mic_source}')
 34        for i in range(8):
 35            req.header.header.stamp = self.get_clock().now().to_msg()
 36            future = self.client.call_async(req)
 37            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
 38
 39            if future.done():
 40                break
 41
 42            # retry as remote peer is NOT handled well by ROS
 43            self.get_logger().info(f'trying ... [{i}]')
 44
 45        response = future.result()
 46        if response is None:
 47            self.get_logger().error('❌ Service call failed or timed out.')
 48            return
 49
 50        if response.header.status.value == CommonState.SUCCESS:
 51            self.get_logger().info('✅ MIC source set successfully.')
 52        else:
 53            self.get_logger().error(
 54                f'❌ Failed to set MIC source: {response.header.message}'
 55            )
 56
 57
 58def main(args=None):
 59    mic_info = {
 60        0: ('int', 'internal MIC'),
 61        1: ('ext', 'external MIC'),
 62    }
 63
 64    choices = {}
 65    for k, v in mic_info.items():
 66        choices[v[0]] = k
 67
 68    rclpy.init(args=args)
 69    node = None
 70    try:
 71        # Prefer command-line argument, otherwise prompt for input
 72        if len(sys.argv) > 1:
 73            mic = sys.argv[1]
 74        else:
 75            print('{:<4} - {:<5} : {}'.format('abbr',
 76                  'mic_id', 'description'))
 77            for k, v in mic_info.items():
 78                print(f'{v[0]:<4} - {k:<5} : {v[1]}')
 79            mic = input('Enter abbr of MIC source:')
 80
 81        mic_id = choices.get(mic)
 82        if mic_id is None:
 83            raise ValueError(f'Invalid abbr of MIC source: {mic}')
 84
 85        node = SetMicSourceRequestClient()
 86        node.send_request(mic_id)
 87    except KeyboardInterrupt:
 88        pass
 89    except Exception as e:
 90        rclpy.logging.get_logger('main').error(
 91            f'Program exited with exception: {e}')
 92
 93    if node:
 94        node.destroy_node()
 95    if rclpy.ok():
 96        rclpy.shutdown()
 97
 98
 99if __name__ == '__main__':
100    main()

Usage

# Build
colcon build --packages-select py_examples

# Interactive run
ros2 run py_examples set_mic_source

# Run with argument
ros2 run py_examples set_mic_source ext  # ext - external mic, int - built-in mic

6.1.24 Raw Audio Stream Playback

  1#!/usr/bin/env python3
  2"""
  3Raw audio stream playback example
  4
  5This example would get audio focus first and then publish raw audio data
  6at fixed rate to `/aima/hal/audio/playback` topic. And it would stop playing
  7when audio focus is lost.
  8
  9"""
 10
 11import rclpy
 12from rclpy.node import Node
 13from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy, QoSHistoryPolicy
 14from aimdk_msgs.msg import AudioPlayback, AudioInfo, AudioData, FocusResponse, FocusRequester, CommonState
 15from aimdk_msgs.srv import RequestAudioFocus, AbandonAudioFocus
 16import os
 17import signal
 18import time
 19import sys
 20
 21global_node = None
 22
 23
 24def signal_handler(sig, frame):
 25    global global_node
 26    if global_node is not None:
 27        if global_node.is_holding_focus():
 28            global_node.send_request(False)
 29        global_node.get_logger().info(
 30            f"Received signal {sig}, abandon audio focus and shutting down")
 31    if rclpy.ok():
 32        rclpy.shutdown()
 33    sys.exit(0)
 34
 35
 36class FakeMicDevice():
 37    def __init__(self, raw_audio_file):
 38        self.audio_data = []
 39        self.channels = 1
 40        self.sample_rate = 16000
 41        self.bytes_per_sample = 2  # S16LE
 42        self.audio_length = 0
 43        with open(raw_audio_file, 'rb') as f:
 44            self.audio_data = list(f.read())
 45            self.audio_length = len(self.audio_data)
 46
 47        self.index = 0
 48        self.start_time = 0
 49
 50    def get_channel_count(self):
 51        return self.channels
 52
 53    def get_sample_rate(self):
 54        return self.sample_rate
 55
 56    def get_cached_data(self):
 57        now = time.monotonic()
 58        if not self.start_time:
 59            # fake init: assume audio data started 1s ago
 60            self.start_time = now - 1
 61
 62        # cached data range since last call
 63        tail = int((now - self.start_time) * self.sample_rate) * \
 64            self.channels * self.bytes_per_sample
 65        data = self.audio_data[self.index:tail]
 66        if tail < self.audio_length:
 67            self.index = tail
 68        else:
 69            # loopback
 70            self.index = 0
 71            self.start_time = now
 72        return data
 73
 74
 75class AudioStreamPlayer(Node):
 76    def __init__(self):
 77        super().__init__('audio_stream_player')
 78
 79        self.declare_parameter('raw_audio_path', '')
 80        self.raw_audio_path = self.get_parameter('raw_audio_path').value
 81
 82        self.mic_device = FakeMicDevice(self.raw_audio_path)
 83
 84        self.pkg_name = 'audio_stream_player{}'.format(os.getpid())
 85        self.focus = False
 86        self.focus_force = True
 87
 88        self.get_logger().info(f'local pkg name: {self.pkg_name}')
 89        # Create focus request/release clients
 90        self.request_client = self.create_client(
 91            RequestAudioFocus,
 92            '/aimdk_5Fmsgs/srv/RequestAudioFocus')
 93        self.release_client = self.create_client(
 94            AbandonAudioFocus,
 95            '/aimdk_5Fmsgs/srv/AbandonAudioFocus')
 96
 97        # Wait for the service to become available
 98        while not self.request_client.wait_for_service(timeout_sec=2.0):
 99            self.get_logger().info('⏳ Service unavailable, waiting...')
100        self.get_logger().info('🟢 Service available, ready to send request.')
101
102        # Create focus event subscriber and raw audio publisher
103        focus_qos = QoSProfile(
104            reliability=QoSReliabilityPolicy.RELIABLE,
105            history=QoSHistoryPolicy.KEEP_LAST,
106            depth=10,
107            durability=QoSDurabilityPolicy.VOLATILE
108        )
109        self.sub = self.create_subscription(
110            FocusResponse,
111            '/aima/hal/audio/focus_response',
112            self.focus_event_callback, focus_qos
113        )
114        self.pub = self.create_publisher(
115            AudioPlayback,
116            '/aima/hal/audio/playback', 10
117        )
118
119    def send_request(self, focus: bool):
120        if focus:
121            self.send_focus_request()
122        else:
123            self.send_focus_release()
124
125    def send_focus_request(self):
126        req = RequestAudioFocus.Request()
127
128        requester = FocusRequester()
129        requester.pkg_name = self.pkg_name
130        requester.priority = 6
131
132        req.focus_requester = requester
133
134        self.get_logger().info('📨 Sending RequestAudioFocus request')
135        for i in range(8):
136            future = self.request_client.call_async(req)
137            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
138
139            if future.done():
140                break
141
142            # retry as remote peer is NOT handled well by ROS
143            self.get_logger().info(f'trying ... [{i}]')
144
145        response = future.result()
146        if response is None:
147            self.get_logger().error('❌ Service call failed or timed out.')
148            return False
149
150        if response.reponse.status.value == CommonState.SUCCESS:
151            focus = response.focus_response.focus_gain
152            self.focus = focus
153            self.get_logger().info(f'✅ RequestAudioFocus done: focus {focus}')
154            return True
155        else:
156            self.get_logger().error(
157                f'❌ Failed in response of RequestAudioFocus: {response.reponse.message}'
158            )
159            return False
160
161    def send_focus_release(self):
162        req = AbandonAudioFocus.Request()
163
164        requester = FocusRequester()
165        requester.pkg_name = self.pkg_name
166
167        req.focus_requester = requester
168
169        self.get_logger().info('📨 Sending AbandonAudioFocus request')
170        for i in range(8):
171            future = self.release_client.call_async(req)
172            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
173
174            if future.done():
175                break
176
177            # retry as remote peer is NOT handled well by ROS
178            self.get_logger().info(f'trying ... [{i}]')
179
180        response = future.result()
181        if response is None:
182            self.get_logger().error('❌ Service call failed or timed out.')
183            return False
184
185        if response.reponse.status.value == CommonState.SUCCESS:
186            focus = response.focus_response.focus_gain
187            # always focus False
188            self.focus = focus
189            self.get_logger().info(f'✅ AbandonAudioFocus done: focus {focus}')
190            return True
191        else:
192            self.get_logger().error(
193                f'❌ Failed in response of AbandonAudioFocus: {response.reponse.message}'
194            )
195            return False
196
197    def focus_event_callback(self, msg: FocusResponse):
198        pkg_name = msg.pkg_name
199        focus = msg.focus_gain
200        if msg.pkg_name == self.pkg_name:
201            self.get_logger().info(
202                f'receive foucs out event: focus state: {focus}')
203            self.focus_force = msg.focus_gain
204
205    def is_holding_focus(self):
206        return self.focus and self.focus_force
207
208    def spin_untill_focus(self):
209        while rclpy.ok():
210            self.focus_force = True
211            self.send_request(True)
212            if self.is_holding_focus():
213                return
214            self.get_logger().info("need retry to get focus")
215            t1 = t0 = time.monotonic()
216            while rclpy.ok() and t1 - t0 < 1.0:
217                rclpy.spin_once(self, timeout_sec=0.1)
218                t1 = time.monotonic()
219        return
220
221    def run_once(self):
222        """ flush raw audio data cached since last call """
223
224        raw_audio_msg = AudioPlayback()
225
226        raw_audio_msg.pkg_name = self.pkg_name
227        raw_audio_msg.token_id = self.pkg_name
228        # fill AudioInfo
229        audio_info = AudioInfo()
230        audio_info.channels = self.mic_device.get_channel_count()
231        audio_info.sample_rate = self.mic_device.get_sample_rate()
232        # fill raw audio data
233        audio_data = AudioData()
234        cached_data = self.mic_device.get_cached_data()
235        audio_data.data = cached_data
236        raw_audio_msg.info = audio_info
237        raw_audio_msg.data = audio_data
238        # publish
239        self.pub.publish(raw_audio_msg)
240
241    def run(self):
242        self.get_logger().info('🟢 publishing audio data ...')
243        while rclpy.ok() and self.is_holding_focus():
244            self.run_once()
245            rclpy.spin_once(self, timeout_sec=0.05)
246            time.sleep(0.05)
247
248        if not self.is_holding_focus():
249            self.get_logger().info("focus out, exiting...")
250
251
252def main(args=None):
253    global global_node
254    rclpy.init(args=args)
255    node = None
256
257    try:
258        node = AudioStreamPlayer()
259
260        global_node = node
261        signal.signal(signal.SIGINT, signal_handler)
262        signal.signal(signal.SIGTERM, signal_handler)
263
264        node.spin_untill_focus()
265        node.run()
266
267    except KeyboardInterrupt:
268        rclpy.logging.get_logger('main').info(
269            "Interrupt signal received, exiting...")
270    except Exception as e:
271        rclpy.logging.get_logger('main').error(
272            f'Program exited with exception: {e}')
273
274    if node:
275        node.destroy_node()
276    if rclpy.ok():
277        rclpy.shutdown()
278
279
280if __name__ == '__main__':
281    main()

Usage

# Replace /path/to/your/raw_audio_file.pcm with the actual PCM file path (mono, 16 kHz, S16LE)
ros2 run py_examples play_audio_stream --ros-args -p raw_audio_path:=/path/to/your/raw_audio_file.pcm

6.1.25 Emoji (expression) control

This example uses play_emoji, which enables the robot to display a specified expression. Users can select expressions from the available list. For the full expression list, refer to the expression list

 1#!/usr/bin/env python3
 2
 3import rclpy
 4import rclpy.logging
 5from rclpy.node import Node
 6
 7from aimdk_msgs.srv import PlayEmoji
 8
 9
10class PlayEmojiClient(Node):
11    def __init__(self):
12        super().__init__('play_emoji_client')
13        self.client = self.create_client(
14            PlayEmoji, '/face_ui_proxy/play_emoji')
15        self.get_logger().info('✅ PlayEmoji client node created.')
16
17        # Wait for the service to become available
18        while not self.client.wait_for_service(timeout_sec=2.0):
19            self.get_logger().info('⏳ Service unavailable, waiting...')
20
21        self.get_logger().info('🟢 Service available, ready to send request.')
22
23    def send_request(self, emoji: int, mode: int, priority: int):
24        req = PlayEmoji.Request()
25
26        req.emotion_id = int(emoji)
27        req.mode = int(mode)
28        req.priority = int(priority)
29
30        self.get_logger().info(
31            f'📨 Sending request to play emoji: id={emoji}, mode={mode}, priority={priority}')
32        for i in range(8):
33            req.header.header.stamp = self.get_clock().now().to_msg()
34            future = self.client.call_async(req)
35            rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
36
37            if future.done():
38                break
39
40            # retry as remote peer is NOT handled well by ROS
41            self.get_logger().info(f'trying ... [{i}]')
42
43        resp = future.result()
44        if resp is None:
45            self.get_logger().error('❌ Service call not completed or timed out.')
46            return False
47
48        if resp.success:
49            self.get_logger().info(
50                f'✅ Emoji played successfully: {resp.message}')
51            return True
52        else:
53            self.get_logger().error(f'❌ Failed to play emoji: {resp.message}')
54            return False
55
56
57def main(args=None):
58    rclpy.init(args=args)
59    node = None
60
61    # Interactive input, same as the original C++ version
62    try:
63        emotion = int(
64            input("Enter emoji ID: 1-blink, 60-bored, 70-abnormal, 80-sleeping, 90-happy ... 190-double angry, 200-adore: "))
65        mode = int(input("Enter play mode (1: play once, 2: loop): "))
66        if mode not in (1, 2):
67            raise ValueError("invalid mode")
68        priority = 10  # default priority
69
70        node = PlayEmojiClient()
71        node.send_request(emotion, mode, priority)
72    except KeyboardInterrupt:
73        pass
74    except Exception as e:
75        rclpy.logging.get_logger('main').error(
76            f'Program exited with exception: {e}')
77
78    if node:
79        node.destroy_node()
80    if rclpy.ok():
81        rclpy.shutdown()
82
83
84if __name__ == '__main__':
85    main()

6.1.26 Slam

In this example, slam is used to send messages to the mapping service via this node to achieve mapping functionality.

 1import sys
 2import rclpy
 3from rclpy.node import Node
 4from rclpy.qos import QoSProfile, QoSDurabilityPolicy, QoSReliabilityPolicy
 5from std_msgs.msg import String
 6
 7
 8class SlamCommandPublisher(Node):
 9    def __init__(self):
10        super().__init__('slam_command_publisher')
11        qos = QoSProfile(
12            depth=10,
13            durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
14            reliability=QoSReliabilityPolicy.RELIABLE,
15        )
16        self.publisher_ = self.create_publisher(
17            String, '/integrated_command', qos)
18
19    def publish_start_mapping(self):
20        msg = String()
21        msg.data = 'start_mapping'
22        self.get_logger().info(f'Publishing: "{msg.data}"')
23        self.publisher_.publish(msg)
24
25    def publish_stop_mapping(self, map_name):
26        msg = String()
27        msg.data = f'stop_mapping:{map_name}'
28        self.get_logger().info(f'Publishing: "{msg.data}"')
29        self.publisher_.publish(msg)
30
31
32def main(args=None):
33    rclpy.init(args=args)
34    slam_command_publisher = SlamCommandPublisher()
35
36    try:
37        while rclpy.ok():
38            input_value = input(
39                "Enter 1 to start mapping, 2 to stop mapping: ")
40
41            if input_value == '1':
42                slam_command_publisher.publish_start_mapping()
43            elif input_value == '2':
44                map_name = input("Enter map name: ")
45                slam_command_publisher.publish_stop_mapping(map_name)
46            else:
47                print("Invalid input. Please enter 1 or 2.")
48    except KeyboardInterrupt:
49        pass
50    finally:
51        slam_command_publisher.destroy_node()
52        if rclpy.ok():
53            rclpy.shutdown()
54
55
56if __name__ == '__main__':
57    main()

Usage

ros2 run py_examples slam
Enter 1 to start mapping, 2 to stop mapping: 1
[INFO] [1772521830.713768092] [slam_command_publisher]: Publishing: 'start_mapping'
Enter 1 to start mapping, 2 to stop mapping: 2
Enter map name: testmap
[INFO] [1772521837.760016698] [slam_command_publisher]: Publishing: 'stop_mapping:testmap'

After starting the mapping process, push the machine or control the robot to move in order to complete the mapping.

6.1.27 Relocate

This example uses the map built in the Map Building step for localization.

  1#!/usr/bin/env python3
  2
  3import rclpy
  4from rclpy.node import Node
  5from std_msgs.msg import String
  6from geometry_msgs.msg import Pose
  7from nav_msgs.msg import Odometry
  8from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy
  9
 10
 11class RelocalizationNode(Node):
 12    def __init__(self):
 13        super().__init__('relocalization_node')
 14
 15        # /integrated_command subscriber requires TRANSIENT_LOCAL durability
 16        command_qos = QoSProfile(
 17            depth=10,
 18            durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
 19        )
 20
 21        # Create publishers
 22        self.integrated_command_pub = self.create_publisher(
 23            String, '/integrated_command', command_qos)
 24        self.relocalization_pose_pub = self.create_publisher(
 25            Pose, '/relocalization_pose', 10)
 26
 27        # Create subscriber with BEST_EFFORT QoS
 28        lidar_loc_qos = QoSProfile(
 29            depth=10,
 30            reliability=QoSReliabilityPolicy.BEST_EFFORT
 31        )
 32
 33        self.odometry_sub = self.create_subscription(
 34            Odometry,
 35            '/slam/lidar_odom',
 36            self.odometry_callback,
 37            qos_profile=lidar_loc_qos
 38        )
 39
 40        # Setup timer to publish messages in sequence
 41        self.success_received = False
 42        self.timeout_timer = None
 43
 44        # Wait for subscribers before publishing
 45        self.publish_timer = self.create_timer(0.2, self.wait_and_publish)
 46
 47    def wait_and_publish(self):
 48        if (self.integrated_command_pub.get_subscription_count() == 0 or
 49                self.relocalization_pose_pub.get_subscription_count() == 0):
 50            self.get_logger().info(
 51                f'Waiting for subscribers... '
 52                f'(command: {self.integrated_command_pub.get_subscription_count()}, '
 53                f'pose: {self.relocalization_pose_pub.get_subscription_count()})',
 54                throttle_duration_sec=2.0)
 55            return
 56        self.publish_timer.cancel()
 57        self.publish_sequence()
 58
 59    def publish_sequence(self):
 60        # Publish integrated_command
 61        integrated_command_msg = String()
 62        integrated_command_msg.data = 'start_relocalization:1778133306741'
 63        self.integrated_command_pub.publish(integrated_command_msg)
 64        self.get_logger().info('Published integrated_command')
 65
 66        # Schedule relocalization_pose publication after delay
 67        self.relocalization_pose_timer = self.create_timer(
 68            1.0,
 69            self.publish_relocalization_pose
 70        )
 71
 72    def publish_relocalization_pose(self):
 73        # Cancel this timer immediately so it only fires once
 74        self.relocalization_pose_timer.cancel()
 75
 76        relocalization_pose_msg = Pose()
 77        relocalization_pose_msg.position.x = 1041.0
 78        relocalization_pose_msg.position.y = 630.0
 79        relocalization_pose_msg.position.z = 0.0
 80        relocalization_pose_msg.orientation.x = 0.0
 81        relocalization_pose_msg.orientation.y = 0.0
 82        relocalization_pose_msg.orientation.z = 0.0
 83        relocalization_pose_msg.orientation.w = 1.0
 84
 85        self.relocalization_pose_pub.publish(relocalization_pose_msg)
 86        self.get_logger().info('Published relocalization_pose')
 87
 88        # Start timeout timer (30 seconds)
 89        self.timeout_timer = self.create_timer(
 90            30.0,
 91            self.timeout_callback
 92        )
 93
 94        self.get_logger().info('Waiting for robot pose data (timeout: 30s)...')
 95
 96    def odometry_callback(self, msg):
 97        if not self.success_received:
 98            self.success_received = True
 99            if self.timeout_timer is not None:
100                self.timeout_timer.cancel()
101            self.get_logger().info('Received odometry data - Relocalization successful!')
102            if rclpy.ok():
103                rclpy.shutdown()
104
105    def timeout_callback(self):
106        if not self.success_received:
107            self.get_logger().error('Timeout reached - Relocalization failed!')
108            if rclpy.ok():
109                rclpy.shutdown()
110
111
112def main(args=None):
113    rclpy.init(args=args)
114    node = RelocalizationNode()
115    try:
116        rclpy.spin(node)
117    except KeyboardInterrupt:
118        pass
119    finally:
120        if rclpy.ok():
121            rclpy.shutdown()
122
123
124if __name__ == '__main__':
125    main()

Usage

ros2 run py_examples relocate 
[INFO] [1772526167.342953490] [relocalization_node]: Published integrated_command
[INFO] [1772526168.346034723] [relocalization_node]: Published relocalization_pose
[INFO] [1772526168.346915278] [relocalization_node]: Waiting for robot pose data (timeout: 5s)...
[INFO] [1772526173.347455156] [relocalization_node]: Received odometry data - Relocalization successful!

Move the robot to the starting point of the map construction, then run the relocation program.

How to obtain these parameters: for map_id and the starting-point pixel coordinates (position.x / position.y), see Obtaining Map Identifiers and Coordinates.

6.1.28 Retrieve a Stored Map

In this example, get_map is used to retrieve stored map data by name, including the map path, resolution, dimensions, and map ID.

 1#!/usr/bin/env python3
 2
 3import rclpy
 4from rclpy.node import Node
 5from rclpy.executors import SingleThreadedExecutor
 6from aimdk_msgs.srv import GetStoredMapByName
 7from std_msgs.msg import Header
 8
 9
10class MapClient(Node):
11    def __init__(self):
12        super().__init__('get_map_client')
13        self.client = self.create_client(
14            GetStoredMapByName,
15            '/aimdk_5Fmsgs/srv/GetStoredMapByName'
16        )
17
18    def send_request(self, map_name):
19        while not self.client.wait_for_service(timeout_sec=1.0):
20            if not rclpy.ok():
21                self.get_logger().error("Interrupted while waiting for service.")
22                return
23            self.get_logger().info("Service not available, waiting again...")
24
25        request = GetStoredMapByName.Request()
26
27        request.header = Header()
28        request.header.stamp = self.get_clock().now().to_msg()
29        request.header.frame_id = ""
30
31        request.map_name = map_name
32
33        future = self.client.call_async(request)
34
35        executor = SingleThreadedExecutor()
36        executor.add_node(self)
37        executor.spin_until_future_complete(future, timeout_sec=60.0)
38        executor.remove_node(self)
39
40        if future.result() is None:
41            self.get_logger().error("Service call failed or timed out.")
42            return
43
44        result = future.result()
45
46        if result.code == 0:
47            self.get_logger().info("Service call succeeded")
48            self.get_logger().info(f"Map path: {result.map_path}")
49            self.get_logger().info(
50                f"Map resolution: {result.map_info.resolution}")
51            self.get_logger().info(f"Map width: {result.map_info.width}")
52            self.get_logger().info(f"Map height: {result.map_info.height}")
53            self.get_logger().info(f"Map id: {result.map_id}")
54        else:
55            self.get_logger().error(
56                f"Service call failed with code: {result.code}")
57
58
59def main(args=None):
60    rclpy.init(args=args)
61
62    client = MapClient()
63    try:
64        client.send_request("map_name")
65    except KeyboardInterrupt:
66        pass
67    finally:
68        if rclpy.ok():
69            rclpy.shutdown()
70
71
72if __name__ == '__main__':
73    main()

Usage

ros2 run py_examples get_map
[INFO] [get_map_client]: Service call succeeded
[INFO] [get_map_client]: Map path: /path/to/test_map.png
[INFO] [get_map_client]: Map resolution: 0.05
[INFO] [get_map_client]: Map width: 200
[INFO] [get_map_client]: Map height: 200
[INFO] [get_map_client]: Map id: 1

The example requests the map named test_map by default. Modify the argument to send_request in the code to request a different map.

How to obtain these parameters: the input is the map name (map_name), not map_id; to obtain map_name, see Obtaining Map Identifiers and Coordinates.

6.1.29 Play Linkcraft Action

In this example, play_linkcraft is used to list the LinkCraft actions available on the robot and select one for playback.

  1#!/usr/bin/env python3
  2
  3import rclpy
  4from rclpy.node import Node
  5from aimdk_msgs.srv import GetRobotResources, ExecuteActionResource
  6
  7
  8class RobotResourceClient(Node):
  9    def __init__(self):
 10        super().__init__('robot_resource_client')
 11
 12        self.get_resource_client = self.create_client(
 13            GetRobotResources,
 14            '/aimdk_5Fmsgs/srv/GetRobotResources'
 15        )
 16        self.play_resource_client = self.create_client(
 17            ExecuteActionResource,
 18            '/aimdk_5Fmsgs/srv/ExecuteActionResource'
 19        )
 20
 21        self.resources = []
 22
 23    def get_robot_resources(self):
 24        while not self.get_resource_client.wait_for_service(timeout_sec=1.0):
 25            if not rclpy.ok():
 26                self.get_logger().error("Interrupted while waiting for the service. Exiting.")
 27                return False
 28            self.get_logger().info("Service not available, waiting again...")
 29
 30        request = GetRobotResources.Request()
 31        future = self.get_resource_client.call_async(request)
 32        rclpy.spin_until_future_complete(self, future)
 33
 34        if future.result() is None:
 35            self.get_logger().error("Failed to call GetRobotResources")
 36            return False
 37
 38        response = future.result()
 39        self.resources = list(response.robot_resources)
 40        self.get_logger().info(f"Found {len(self.resources)} resources")
 41        return True
 42
 43    def print_resource_menu(self):
 44        if not self.resources:
 45            print("(no resources)")
 46            return
 47
 48        print("\n=== Available Robot Resources ===")
 49        for idx, resource in enumerate(self.resources):
 50            ver = resource.current_version
 51            print(f"[{idx}] {ver.name or '<no name>'}")
 52            print(f"     key:     {resource.resource_key}")
 53            print(f"     version: {ver.version}")
 54        print("=================================\n")
 55
 56    def pick_resource(self):
 57        if not self.resources:
 58            return None
 59
 60        while True:
 61            try:
 62                raw = input(
 63                    f"Select resource index [0-{len(self.resources) - 1}] (q to quit): ").strip()
 64            except EOFError:
 65                return None
 66            if raw.lower() in ("q", "quit", "exit"):
 67                return None
 68            if not raw.isdigit():
 69                print("Please enter a number.")
 70                continue
 71            idx = int(raw)
 72            if 0 <= idx < len(self.resources):
 73                return self.resources[idx]
 74            print(f"Index out of range (0-{len(self.resources) - 1}).")
 75
 76    def play_resource(self, resource):
 77        while not self.play_resource_client.wait_for_service(timeout_sec=1.0):
 78            if not rclpy.ok():
 79                self.get_logger().error("Interrupted while waiting for service.")
 80                return
 81            self.get_logger().info("Service not available, waiting...")
 82
 83        request = ExecuteActionResource.Request()
 84        request.resource_key = resource.resource_key
 85        request.resource_version = resource.current_version.version
 86
 87        if "onnx" in request.resource_key:
 88            request.meta = '{"resource_type": "BODY_MONTION"}'
 89        else:
 90            request.meta = '{"resource_type": "ARM_MONTION"}'
 91
 92        self.get_logger().info("Sending request:")
 93        self.get_logger().info(f"  resource_key: {request.resource_key}")
 94        self.get_logger().info(
 95            f"  resource_version: {request.resource_version}")
 96        self.get_logger().info(f"  meta: {request.meta}")
 97
 98        future = self.play_resource_client.call_async(request)
 99        rclpy.spin_until_future_complete(self, future)
100
101        if future.result() is not None:
102            self.handle_response(future.result())
103        else:
104            self.get_logger().error("Failed to call ExecuteActionResource")
105
106    def handle_response(self, response):
107        self.get_logger().info("Service response received:")
108        self.get_logger().info("Response Header:")
109        self.get_logger().info(
110            f"  stamp: {response.header.header.stamp.sec}.{response.header.header.stamp.nanosec:09}")
111        self.get_logger().info(f"  code: {response.header.header.code}")
112        self.get_logger().info("Status:")
113        self.get_logger().info(f"  value: {response.header.status.value}")
114        self.get_logger().info(f"  message: {response.header.message}")
115
116
117def main(args=None):
118    rclpy.init(args=args)
119    client_node = None
120    try:
121        client_node = RobotResourceClient()
122        if not client_node.get_robot_resources():
123            return
124        client_node.print_resource_menu()
125        chosen = client_node.pick_resource()
126        if chosen is None:
127            client_node.get_logger().info("No resource selected, exiting.")
128            return
129        client_node.play_resource(chosen)
130    except KeyboardInterrupt:
131        pass
132    finally:
133        if client_node:
134            client_node.destroy_node()
135        if rclpy.ok():
136            rclpy.shutdown()
137
138
139if __name__ == '__main__':
140    main()

Usage

ros2 run py_examples play_linkcraft
[INFO] [1772526701.308025739] [robot_resource_client]: Service call succeeded
[INFO] [1772526701.308253238] [robot_resource_client]: Robot Resources (0):
[INFO] [1772526701.308537895] [robot_resource_client]: Sending request:
[INFO] [1772526701.308692872] [robot_resource_client]:   resource_key: linkcraft_resource_onnx_01KBM2BHNFM93Z0DK6ES9F6DJA
[INFO] [1772526701.308853424] [robot_resource_client]:   resource_version: 0.0.1
[INFO] [1772526701.309014801] [robot_resource_client]:   meta: {"resource_type": "BODY_MONTION"}
[INFO] [1772526701.338307933] [robot_resource_client]: Service response received:
[INFO] [1772526701.338480169] [robot_resource_client]: Response Header:
[INFO] [1772526701.338647069] [robot_resource_client]:   stamp: 0.000000000
[INFO] [1772526701.338800125] [robot_resource_client]:   code: 0
[INFO] [1772526701.338951109] [robot_resource_client]: Status:
[INFO] [1772526701.339106459] [robot_resource_client]:   value: 1
[INFO] [1772526701.339239979] [robot_resource_client]:   message: 1/1 task(s) failed. Details: localhost: failed (status code: 400) - Failed with status code: 400

6.1.30 LED light strip control

Function description: Demonstrates how to control the robot’s LED light strip, supporting multiple display modes and custom colors.

Attention

Before using this feature, run the following command on the Motion Control Computing Unit (PC1) to stop the task_manager module and prevent it from competing with your program for LED strip control:

aima em stop-app task_manager

Core code:

  1#!/usr/bin/env python3
  2
  3import sys
  4import rclpy
  5import rclpy.logging
  6from rclpy.node import Node
  7
  8from aimdk_msgs.srv import SetPmuLed
  9
 10
 11class PlayLightsClient(Node):
 12    def __init__(self):
 13        super().__init__('play_lights_client')
 14
 15        # create service client
 16        self.client = self.create_client(
 17            SetPmuLed, '/aimdk_5Fmsgs/srv/SetPmuLed')
 18
 19        self.get_logger().info('✅ PlayLights client node created.')
 20
 21        # Wait for the service to become available
 22        while not self.client.wait_for_service(timeout_sec=2.0):
 23            self.get_logger().info('⏳ Service unavailable, waiting...')
 24
 25        self.get_logger().info('🟢 Service available, ready to send request.')
 26
 27    def send_request(self, led_mode, r, g, b, priority=6, reset_priority=False):
 28        """Send LED control request"""
 29        # create request
 30        request = SetPmuLed.Request()
 31        request.led_strip_mode = led_mode
 32        request.r = r
 33        request.g = g
 34        request.b = b
 35        request.priority = priority
 36        request.reset_priority = reset_priority
 37        request.trace_id = 'play_lights'
 38
 39        # send request
 40        # Note: LED strip is slow to response (up to ~5s)
 41        self.get_logger().info(
 42            f'📨 Sending request to control led strip: mode={led_mode}, RGB=({r}, {g}, {b}), priority={priority}, reset_priority={reset_priority}')
 43        for i in range(4):
 44            request.request.header.stamp = self.get_clock().now().to_msg()
 45            future = self.client.call_async(request)
 46            rclpy.spin_until_future_complete(self, future, timeout_sec=5)
 47
 48            if future.done():
 49                break
 50
 51            # retry as remote peer is NOT handled well by ROS
 52            self.get_logger().info(f'trying ... [{i}]')
 53
 54        response = future.result()
 55        if response is None:
 56            self.get_logger().error('❌ Service call not completed or timed out.')
 57            return False
 58
 59        if response.status_code == 0:
 60            self.get_logger().info('✅ LED strip command sent successfully.')
 61            return True
 62        else:
 63            self.get_logger().error(
 64                f'❌ LED strip command failed with status: {response.status_code}')
 65            return False
 66
 67
 68def main(args=None):
 69    rclpy.init(args=args)
 70    node = None
 71
 72    try:
 73        # get command line args
 74        if len(sys.argv) > 6:
 75            # use CLI args
 76            led_mode = int(sys.argv[1])
 77            if led_mode not in (0, 1, 2, 3):
 78                raise ValueError("invalid mode")
 79            r = int(sys.argv[2])
 80            if r < 0 or r > 255:
 81                raise ValueError("invalid R value")
 82            g = int(sys.argv[3])
 83            if g < 0 or g > 255:
 84                raise ValueError("invalid G value")
 85            b = int(sys.argv[4])
 86            if b < 0 or b > 255:
 87                raise ValueError("invalid B value")
 88            priority = int(sys.argv[5])
 89            if priority < 0:
 90                raise ValueError("invalid priority value")
 91            reset_priority = sys.argv[6].lower() in ('true', '1', 'yes')
 92        else:
 93            # interactive input
 94            print("=== LED strip control example ===")
 95            print("Select LED strip mode:")
 96            print("0 - Steady on")
 97            print("1 - Breathing (4s cycle, sine brightness)")
 98            print("2 - Blinking (1s cycle, 0.5s on, 0.5s off)")
 99            print("3 - Flowing (2s cycle, light up from left to right)")
100
101            led_mode = int(input("Enter mode (0-3): "))
102            if led_mode not in (0, 1, 2, 3):
103                raise ValueError("invalid mode")
104
105            print("\nSet RGB color values (0-255):")
106            r = int(input("Red (R): "))
107            if r < 0 or r > 255:
108                raise ValueError("invalid R value")
109            g = int(input("Green (G): "))
110            if g < 0 or g > 255:
111                raise ValueError("invalid G value")
112            b = int(input("Blue (B): "))
113            if b < 0 or b > 255:
114                raise ValueError("invalid B value")
115
116            print("\nSet priority (higher value = higher priority, default=6):")
117            priority = int(input("Priority: "))
118            if priority < 0:
119                raise ValueError("invalid priority value")
120
121            reset_priority = input(
122                "Reset priority after command? (y/n, default=n): ").strip().lower() == 'y'
123
124        node = PlayLightsClient()
125        node.send_request(led_mode, r, g, b, priority, reset_priority)
126    except KeyboardInterrupt:
127        pass
128    except Exception as e:
129        rclpy.logging.get_logger('main').error(
130            f'Program exited with exception: {e}')
131
132    if node:
133        node.destroy_node()
134    if rclpy.ok():
135        rclpy.shutdown()
136
137
138if __name__ == '__main__':
139    main()

Usage instructions:

# Build
colcon build --packages-select py_examples

# Run interactively
ros2 run py_examples play_lights

# Run with command-line parameters: <mode> <R> <G> <B> <priority> <reset_priority(true/false)>
ros2 run py_examples play_lights 1 255 0 0 6 false  # Mode 1, red, priority 6

Example output:

=== LED strip control example ===
Select LED strip mode:
0 - Steady on
1 - Breathing (4s cycle, sine brightness)
2 - Blinking (1s cycle, 0.5s on, 0.5s off)
3 - Flowing (2s cycle, light up from left to right)
Enter mode (0-3): 1

Set RGB color values (0-255):
Red (R): 255
Green (G): 0
Blue (B): 0

Set priority (higher value = higher priority, default=6):
Priority: 6
Reset priority after command? (y/n, default=n): n

[INFO] [play_lights_client]: 📨 Sending request to control led strip: mode=1, RGB=(255, 0, 0), priority=6, reset_priority=False
[INFO] [play_lights_client]: ✅ LED strip command sent successfully.

Technical features:

  • Supports four LED display modes

  • Custom RGB color configuration

  • Priority control (higher value = higher priority)

  • Priority reset support

  • Synchronous service invocation

  • Command-line parameter support

  • Input parameter validation

  • User-friendly interactive interface