6.1 Python interface usage example
This chapter will guide you through implementing several features shown in the index
Build & Run Instructions
Change into the
aimdkdirectory and run the following commandssource /opt/ros/humble/setup.bash colcon build source install/setup.bash ros2 run py_examples '<corresponding feature name e.g.: get_mc_action>'
📝 Code explanation
The complete code implementation includes full error handling, signal handling, timeout handling, and other mechanisms to ensure program robustness. Please view or modify the code in the py_examples directory
Caution
As standard ROS DO NOT handle cross-host service (request-response) well, please refer to SDK examples to use open interfaces in a robust way (with protection mechanisms e.g. exception safety and retransmission)
6.1.1 Get robot mode
Retrieve the robot’s current operating mode by calling the GetMcAction service, including the description, and status information.
1#!/usr/bin/env python3
2
3import rclpy
4import rclpy.logging
5from rclpy.node import Node
6
7from aimdk_msgs.srv import GetMcAction
8from aimdk_msgs.msg import CommonRequest
9
10
11class GetMcActionClient(Node):
12 def __init__(self):
13 super().__init__('get_mc_action_client')
14 self.client = self.create_client(
15 GetMcAction, '/aimdk_5Fmsgs/srv/GetMcAction')
16 self.get_logger().info('✅ GetMcAction client node created.')
17
18 # Wait for the service to become available
19 while not self.client.wait_for_service(timeout_sec=2.0):
20 self.get_logger().info('⏳ Service unavailable, waiting...')
21
22 self.get_logger().info('🟢 Service available, ready to send request.')
23
24 def send_request(self):
25 request = GetMcAction.Request()
26 request.request = CommonRequest()
27
28 self.get_logger().info('📨 Sending request to get robot mode')
29 for i in range(8):
30 request.request.header.stamp = self.get_clock().now().to_msg()
31 future = self.client.call_async(request)
32 rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
33
34 if future.done():
35 break
36
37 # retry as remote peer is NOT handled well by ROS
38 self.get_logger().info(f'trying ... [{i}]')
39
40 response = future.result()
41 if response is None:
42 self.get_logger().error('❌ Service call failed or timed out.')
43 return
44
45 self.get_logger().info('✅ Robot mode get successfully.')
46 self.get_logger().info(f'Mode name: {response.info.action_desc}')
47 self.get_logger().info(f'Mode status: {response.info.status.value}')
48
49
50def main(args=None):
51 rclpy.init(args=args)
52 node = None
53 try:
54 node = GetMcActionClient()
55 node.send_request()
56 except KeyboardInterrupt:
57 pass
58 except Exception as e:
59 rclpy.logging.get_logger('main').error(
60 f'Program exited with exception: {e}')
61
62 if node:
63 node.destroy_node()
64 if rclpy.ok():
65 rclpy.shutdown()
66
67
68if __name__ == '__main__':
69 main()
6.1.2 Set robot mode
This example uses the SetMcAction service. After running the node, enter the corresponding field value of the mode in the terminal, and the robot will immediately switch to the appropriate motion mode.
Before switching to the Stable Standing mode (STAND_DEFAULT), ensure the robot is standing and its feet are already on the ground.
The motion mode switching must follow its state transition digram, other transitions would be rejected
Locomotion Mode(LOCOMOTION_DEFAULT) and Stable Standing Mode(STAND_DEFAULT) are unified and will auto switch internally, so switching manually to the nearer one is enough
1#!/usr/bin/env python3
2
3import sys
4import rclpy
5import rclpy.logging
6from rclpy.node import Node
7
8from aimdk_msgs.srv import SetMcAction
9from aimdk_msgs.msg import RequestHeader, CommonState, McAction, McActionCommand
10
11
12class SetMcActionClient(Node):
13 def __init__(self):
14 super().__init__('set_mc_action_client')
15 self.client = self.create_client(
16 SetMcAction, '/aimdk_5Fmsgs/srv/SetMcAction'
17 )
18 self.get_logger().info('✅ SetMcAction client node created.')
19
20 # Wait for the service to become available
21 while not self.client.wait_for_service(timeout_sec=2.0):
22 self.get_logger().info('⏳ Service unavailable, waiting...')
23
24 self.get_logger().info('🟢 Service available, ready to send request.')
25
26 def send_request(self, action_name: str):
27 req = SetMcAction.Request()
28 req.header = RequestHeader()
29
30 cmd = McActionCommand()
31 cmd.action_desc = action_name
32 req.command = cmd
33
34 self.get_logger().info(
35 f'📨 Sending request to set robot mode: {action_name}')
36 for i in range(8):
37 req.header.stamp = self.get_clock().now().to_msg()
38 future = self.client.call_async(req)
39 rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
40
41 if future.done():
42 break
43
44 # retry as remote peer is NOT handled well by ROS
45 self.get_logger().info(f'trying ... [{i}]')
46
47 response = future.result()
48 if response is None:
49 self.get_logger().error('❌ Service call failed or timed out.')
50 return
51
52 if response.response.status.value == CommonState.SUCCESS:
53 self.get_logger().info('✅ Robot mode set successfully.')
54 else:
55 self.get_logger().error(
56 f'❌ Failed to set robot mode: {response.response.message}'
57 )
58
59
60def main(args=None):
61 action_info = {
62 'PASSIVE_DEFAULT': ('PD', 'joints with zero torque'),
63 'DAMPING_DEFAULT': ('DD', 'joints in damping mode'),
64 'JOINT_DEFAULT': ('JD', 'Position Control Stand (joints locked)'),
65 'STAND_DEFAULT': ('SD', 'Stable Stand (auto-balance)'),
66 'LOCOMOTION_DEFAULT': ('LD', 'locomotion mode (walk or run)'),
67 }
68
69 choices = {}
70 for k, v in action_info.items():
71 choices[v[0]] = k
72
73 rclpy.init(args=args)
74 node = None
75 try:
76 # Prefer command-line argument, otherwise prompt for input
77 if len(sys.argv) > 1:
78 motion = sys.argv[1]
79 else:
80 print('{:<4} - {:<20} : {}'.format('abbr',
81 'robot mode', 'description'))
82 for k, v in action_info.items():
83 print(f'{v[0]:<4} - {k:<20} : {v[1]}')
84 motion = input('Enter abbr of robot mode:')
85
86 action_name = choices.get(motion)
87 if not action_name:
88 raise ValueError(f'Invalid abbr of robot mode: {motion}')
89
90 node = SetMcActionClient()
91 node.send_request(action_name)
92 except KeyboardInterrupt:
93 pass
94 except Exception as e:
95 rclpy.logging.get_logger('main').error(
96 f'Program exited with exception: {e}')
97
98 if node:
99 node.destroy_node()
100 if rclpy.ok():
101 rclpy.shutdown()
102
103
104if __name__ == '__main__':
105 main()
Usage
# Use command-line arguments to set the mode (recommended)
ros2 run py_examples set_mc_action JD # Zero-Torque >> Position-Control Standing
ros2 run py_examples set_mc_action SD # Ensure your robot's feet on the ground, Position-Control Standing >> Stable Standing
# Stable Standing >> Locomotion Mode. auto done internally, don't switch manually
# Or run without arguments and the program will prompt for input
ros2 run py_examples set_mc_action
Example output
...
[INFO] [1764066567.502968540] [set_mc_action_client]: ✅ Robot mode set successfully.
Notes
Ensure the robot is standing and its feet are on the ground before switching to
STAND_DEFAULTmodeMode switching may take several seconds to complete
Interface reference
Service:
/aimdk_5Fmsgs/srv/SetMcActionMessage:
aimdk_msgs/srv/SetMcAction
6.1.3 Set robot action
This example uses preset_motion_client; after switching to Stable Stand Mode and starting the node, enter the corresponding field values to perform preset actions with the left (or right) hand such as handshake, raise hand, wave, or air kiss.
Available parameters are listed in the preset motions table
1#!/usr/bin/env python3
2
3import rclpy
4import rclpy.logging
5from rclpy.node import Node
6
7from aimdk_msgs.srv import SetMcPresetMotion
8from aimdk_msgs.msg import McPresetMotion, McControlArea, RequestHeader, CommonState
9
10
11class SetMcPresetMotionClient(Node):
12 def __init__(self):
13 super().__init__('preset_motion_client')
14 self.client = self.create_client(
15 SetMcPresetMotion, '/aimdk_5Fmsgs/srv/SetMcPresetMotion')
16 self.get_logger().info('✅ SetMcPresetMotion client node created.')
17
18 # Wait for the service to become available
19 while not self.client.wait_for_service(timeout_sec=2.0):
20 self.get_logger().info('⏳ Service unavailable, waiting...')
21
22 self.get_logger().info('🟢 Service available, ready to send request.')
23
24 def send_request(self, area_id: int, motion_id: int) -> bool:
25 request = SetMcPresetMotion.Request()
26 request.header = RequestHeader()
27
28 motion = McPresetMotion()
29 area = McControlArea()
30
31 motion.value = motion_id
32 area.value = area_id
33
34 request.motion = motion
35 request.area = area
36 request.interrupt = False
37
38 self.get_logger().info(
39 f'📨 Sending request to set preset motion: motion={motion_id}, area={area_id}')
40
41 for i in range(8):
42 request.header.stamp = self.get_clock().now().to_msg()
43 future = self.client.call_async(request)
44 rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
45
46 if future.done():
47 break
48
49 # retry as remote peer is NOT handled well by ROS
50 self.get_logger().info(f'trying ... [{i}]')
51
52 response = future.result()
53 if response is None:
54 self.get_logger().error('❌ Service call failed or timed out.')
55 return False
56
57 if response.response.state.value == CommonState.SUCCESS:
58 self.get_logger().info(
59 f'✅ Preset motion set successfully: {response.response.task_id}')
60 return True
61 elif response.response.state.value == CommonState.RUNNING:
62 self.get_logger().info(
63 f'⏳ Preset motion executing: {response.response.task_id}')
64 return True
65 else:
66 self.get_logger().error(
67 f'❌ Failed to set preset motion: {response.response.task_id}'
68 )
69 return False
70
71
72def main(args=None):
73 rclpy.init(args=args)
74 node = None
75 try:
76 area = int(input("Enter arm area ID (1-left, 2-right): "))
77 motion = int(input(
78 "Enter preset motion ID (1001-raise,1002-wave,1003-handshake,1004-airkiss): "))
79
80 node = SetMcPresetMotionClient()
81 node.send_request(area, motion)
82 except KeyboardInterrupt:
83 pass
84 except Exception as e:
85 rclpy.logging.get_logger('main').error(
86 f'Program exited with exception: {e}')
87
88 if node:
89 node.destroy_node()
90 if rclpy.ok():
91 rclpy.shutdown()
92
93
94if __name__ == '__main__':
95 main()
6.1.4 Gripper control
This example uses hand_control. Publish messages to the /aima/hal/joint/hand/command topic to control the gripper.
Attention
Note ⚠️: Before running this example, stop the native MC module on the robot control unit (PC1) by running aima em stop-app mc to obtain control of the robot. Ensure the robot is safe before operating.
1import rclpy
2from rclpy.node import Node
3from aimdk_msgs.msg import HandCommandArray, HandCommand, HandType, MessageHeader
4import time
5
6
7class HandControl(Node):
8 def __init__(self):
9 super().__init__('hand_control')
10
11 # Preset parameter list: [(left hand, right hand), ...]
12 self.position_pairs = [
13 (1.0, 1.0), # fully open
14 (0.0, 0.0), # fully closed
15 (0.5, 0.5), # half open
16 (0.2, 0.8), # left slightly closed, right more open
17 (0.7, 0.3) # left more open, right slightly closed
18 ]
19 self.current_index = 0
20 self.last_switch_time = self.get_clock().now().nanoseconds / 1e9 # seconds
21
22 # Create publisher
23 self.publisher_ = self.create_publisher(
24 HandCommandArray,
25 '/aima/hal/joint/hand/command',
26 10
27 )
28
29 # 50 Hz timer
30 self.timer_ = self.create_timer(
31 0.02, # 20 ms = 50 Hz
32 self.publish_hand_commands
33 )
34
35 self.get_logger().info("Hand control node started!")
36
37 def publish_hand_commands(self):
38 # Check time to decide whether to switch to the next preset
39 now_sec = self.get_clock().now().nanoseconds / 1e9
40 if now_sec - self.last_switch_time >= 2.0:
41 self.current_index = (self.current_index +
42 1) % len(self.position_pairs)
43 self.last_switch_time = now_sec
44 self.get_logger().info(
45 f"Switched to preset: {self.current_index}, left={self.position_pairs[self.current_index][0]:.2f}, right={self.position_pairs[self.current_index][1]:.2f}"
46 )
47
48 # Use current preset
49 left_position, right_position = self.position_pairs[self.current_index]
50
51 msg = HandCommandArray()
52 msg.header = MessageHeader()
53
54 # Configure left hand
55 left_hand = HandCommand()
56 left_hand.name = "left_hand"
57 left_hand.position = float(left_position)
58 left_hand.velocity = 1.0
59 left_hand.acceleration = 1.0
60 left_hand.deceleration = 1.0
61 left_hand.effort = 1.0
62
63 # Configure right hand
64 right_hand = HandCommand()
65 right_hand.name = "right_hand"
66 right_hand.position = float(right_position)
67 right_hand.velocity = 1.0
68 right_hand.acceleration = 1.0
69 right_hand.deceleration = 1.0
70 right_hand.effort = 1.0
71
72 msg.left_hand_type = HandType(value=2) # gripper mode
73 msg.right_hand_type = HandType(value=2)
74 msg.left_hands = [left_hand]
75 msg.right_hands = [right_hand]
76
77 # Publish message
78 self.publisher_.publish(msg)
79 # We only log when switching presets to avoid too much log output
80
81
82def main(args=None):
83 rclpy.init(args=args)
84 hand_control_node = HandControl()
85
86 try:
87 rclpy.spin(hand_control_node)
88 except KeyboardInterrupt:
89 pass
90 finally:
91 hand_control_node.destroy_node()
92 rclpy.shutdown()
93
94
95if __name__ == '__main__':
96 main()
6.1.5 Dexterous hand control (under upgrade, not available)
This example uses omnihand_control. Publish messages to the /aima/hal/joint/hand/command topic to control the gripper.
Attention
Note ⚠️: Before running this example, stop the native MC module on the robot control unit (PC1) by running aima em stop-app mc to obtain control of the robot. Ensure the robot is safe before operating.
1#!/usr/bin/env python3
2import rclpy
3from rclpy.node import Node
4from aimdk_msgs.msg import HandCommand, HandCommandArray, HandType, MessageHeader
5
6
7class OmnihandControl(Node):
8 def __init__(self):
9 super().__init__('omnihand_control')
10
11 self.num_fingers = 5
12 self.joints_per_finger = 2
13 self.total_joints = self.num_fingers * self.joints_per_finger # 10
14
15 # ===== Define 10 gesture sets directly as arrays =====
16 # Each set has 20 joint params (left 10 + right 10)
17 # 1 = bent, 0 = straight
18 self.gesture_sequence = [
19 # bend 1 finger
20 [1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
21 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
22 # bend 2 fingers
23 [1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0,
24 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0],
25 # bend 3 fingers
26 [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0,
27 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0],
28 # bend 4 fingers
29 [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0,
30 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0],
31 # bend 5 fingers
32 [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
33 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0],
34 # release 1 finger
35 [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0,
36 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0],
37 # release 2 fingers
38 [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0,
39 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0],
40 # release 3 fingers
41 [1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0,
42 1.0, 1.0, 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0],
43 # release 4 fingers
44 [1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
45 1.0, 1.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0],
46 # all straight
47 [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0,
48 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
49 ]
50
51 self.current_index = 0
52 self.last_switch_time = self.get_clock().now()
53
54 self.publisher = self.create_publisher(
55 HandCommandArray,
56 '/aima/hal/joint/hand/command',
57 10
58 )
59
60 # Switch gesture every second
61 self.timer = self.create_timer(
62 1.0,
63 self.publish_hand_commands
64 )
65
66 self.get_logger().info("Omnihand demo control node started!")
67
68 def publish_hand_commands(self):
69 now_time = self.get_clock().now()
70 if (now_time - self.last_switch_time).nanoseconds / 1e9 >= 1.0:
71 self.current_index = (self.current_index +
72 1) % len(self.gesture_sequence)
73 self.last_switch_time = now_time
74 self.get_logger().info(
75 f'Switched to gesture {self.current_index + 1}/10')
76
77 joint_array = self.gesture_sequence[self.current_index]
78
79 msg = HandCommandArray()
80 msg.header = MessageHeader()
81
82 # Left hand
83 for i in range(self.total_joints):
84 cmd = HandCommand()
85 cmd.name = f"left_hand_joint{i}"
86 cmd.position = joint_array[i]
87 cmd.velocity = 1.0
88 cmd.acceleration = 1.0
89 cmd.deceleration = 1.0
90 cmd.effort = 1.0
91 msg.left_hands.append(cmd)
92
93 # Right hand
94 for i in range(self.total_joints):
95 cmd = HandCommand()
96 cmd.name = f"right_hand_joint{i}"
97 cmd.position = joint_array[self.total_joints + i]
98 cmd.velocity = 1.0
99 cmd.acceleration = 1.0
100 cmd.deceleration = 1.0
101 cmd.effort = 1.0
102 msg.right_hands.append(cmd)
103
104 msg.left_hand_type = HandType()
105 msg.left_hand_type.value = 1 # dexterous hand mode
106 msg.right_hand_type = HandType()
107 msg.right_hand_type.value = 1
108
109 self.publisher.publish(msg)
110
111
112def main(args=None):
113 rclpy.init(args=args)
114 node = OmnihandControl()
115 try:
116 rclpy.spin(node)
117 except KeyboardInterrupt:
118 pass
119 node.destroy_node()
120 rclpy.shutdown()
121
122
123if __name__ == '__main__':
124 main()
6.1.6 Register custom input source
For versions after v0.7, you must register an input source before controlling the MC. This example registers a custom input source via the /aimdk_5Fmsgs/srv/SetMcInputSource service so MC becomes aware of it; only registered input sources can perform robot velocity control.
1#!/usr/bin/env python3
2
3import rclpy
4import rclpy.logging
5from rclpy.node import Node
6
7from aimdk_msgs.srv import SetMcInputSource
8from aimdk_msgs.msg import RequestHeader, McInputAction
9
10
11class McInputClient(Node):
12 def __init__(self):
13 super().__init__('set_mc_input_source_client')
14 self.client = self.create_client(
15 SetMcInputSource, '/aimdk_5Fmsgs/srv/SetMcInputSource'
16 )
17
18 self.get_logger().info('✅ SetMcInputSource client node created.')
19
20 # Wait for the service to become available
21 while not self.client.wait_for_service(timeout_sec=2.0):
22 self.get_logger().info('⏳ Service unavailable, waiting...')
23
24 self.get_logger().info('🟢 Service available, ready to send request.')
25
26 def send_request(self):
27 req = SetMcInputSource.Request()
28
29 # header
30 req.request.header = RequestHeader()
31
32 # action (e.g. 1001 = register)
33 req.action = McInputAction()
34 req.action.value = 1001
35
36 # input source info
37 req.input_source.name = 'node'
38 req.input_source.priority = 40
39 req.input_source.timeout = 1000 # ms
40
41 # Send request and wait for response
42 self.get_logger().info(
43 f'📨 Sending input source request: action_id={req.action.value}, '
44 f'name={req.input_source.name}, priority={req.input_source.priority}'
45 )
46 for i in range(8):
47 req.request.header.stamp = self.get_clock().now().to_msg()
48 future = self.client.call_async(req)
49 rclpy.spin_until_future_complete(
50 self, future, timeout_sec=0.25)
51
52 if future.done():
53 break
54
55 # retry as remote peer is NOT handled well by ROS
56 self.get_logger().info(f'trying ... [{i}]')
57
58 if not future.done():
59 self.get_logger().error('❌ Service call failed or timed out.')
60 return False
61
62 response = future.result()
63 ret_code = response.response.header.code
64 task_id = response.response.task_id
65
66 if ret_code == 0:
67 self.get_logger().info(
68 f'✅ Input source set successfully. task_id={task_id}'
69 )
70 return True
71 else:
72 self.get_logger().error(
73 f'❌ Input source set failed. ret_code={ret_code}, task_id={task_id} (duplicated ADD? or MODIFY/ENABLE/DISABLE for unknown source?)'
74 )
75 return False
76
77
78def main(args=None):
79 rclpy.init(args=args)
80
81 node = None
82 try:
83 node = McInputClient()
84 ok = node.send_request()
85 if not ok:
86 node.get_logger().error('Input source request failed.')
87 except KeyboardInterrupt:
88 pass
89 except Exception as e:
90 rclpy.logging.get_logger('main').error(
91 f'Program exited with exception: {e}')
92
93 if node:
94 node.destroy_node()
95 if rclpy.ok():
96 rclpy.shutdown()
97
98
99if __name__ == '__main__':
100 main()
6.1.7 Get current input source
This example uses the GetCurrentInputSource service to retrieve information about the currently registered input source, including name, priority, and timeout.
1#!/usr/bin/env python3
2
3import rclpy
4from rclpy.node import Node
5from aimdk_msgs.srv import GetCurrentInputSource
6from aimdk_msgs.msg import CommonRequest
7
8
9class GetCurrentInputSourceClient(Node):
10 def __init__(self):
11 super().__init__('get_current_input_source_client')
12 self.client = self.create_client(
13 GetCurrentInputSource,
14 '/aimdk_5Fmsgs/srv/GetCurrentInputSource'
15 )
16
17 self.get_logger().info('✅ GetCurrentInputSource client node created.')
18
19 # Wait for the service to become available
20 while not self.client.wait_for_service(timeout_sec=2.0):
21 self.get_logger().info('⏳ Service unavailable, waiting...')
22
23 self.get_logger().info('🟢 Service available, ready to send request.')
24
25 def send_request(self):
26 # Create request
27 req = GetCurrentInputSource.Request()
28 req.request = CommonRequest()
29
30 # Send request and wait for response
31 self.get_logger().info('📨 Sending request to get current input source')
32 for i in range(8):
33 req.request.header.stamp = self.get_clock().now().to_msg()
34 future = self.client.call_async(req)
35 rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
36
37 if future.done():
38 break
39
40 # retry as remote peer is NOT handled well by ROS
41 self.get_logger().info(f'trying ... [{i}]')
42
43 if not future.done():
44 self.get_logger().error('❌ Service call failed or timed out.')
45 return False
46
47 response = future.result()
48 ret_code = response.response.header.code
49 if ret_code == 0:
50 self.get_logger().info(
51 f'✅ Current input source get successfully:')
52 self.get_logger().info(
53 f'Name: {response.input_source.name}')
54 self.get_logger().info(
55 f'Priority: {response.input_source.priority}')
56 self.get_logger().info(
57 f'Timeout: {response.input_source.timeout}')
58 return True
59 else:
60 self.get_logger().error(
61 f'❌ Current input source get failed, return code: {ret_code}')
62 return False
63
64
65def main(args=None):
66 rclpy.init(args=args)
67
68 node = None
69 try:
70 node = GetCurrentInputSourceClient()
71 success = node.send_request()
72 except KeyboardInterrupt:
73 pass
74 except Exception as e:
75 rclpy.logging.get_logger('main').error(
76 f'Program exited with exception: {e}')
77
78 if node:
79 node.destroy_node()
80 if rclpy.ok():
81 rclpy.shutdown()
82
83
84if __name__ == '__main__':
85 main()
Usage
# Get current input source info
ros2 run py_examples get_current_input_source
Example output
[INFO] [get_current_input_source_client]: Current input source: node
[INFO] [get_current_input_source_client]: Priority: 40
[INFO] [get_current_input_source_client]: Timeout: 1000
Notes
Ensure the GetCurrentInputSource service is running properly
Valid information can only be retrieved after registering an input source
A status code of 0 indicates the query succeeded
6.1.8 Control robot locomotion
This example uses mc_locomotion_velocity. The example below controls robot walking by publishing to the /aima/mc/locomotion/velocity topic. For versions after v0.7, you must register an input source before enabling velocity control (this example already registers an input source); see the code for registration steps.
进入稳定站立模式后执行本程序
1#!/usr/bin/env python3
2
3import rclpy
4from rclpy.node import Node
5import time
6import signal
7import sys
8
9from aimdk_msgs.msg import McLocomotionVelocity, MessageHeader
10from aimdk_msgs.srv import SetMcInputSource
11
12
13class DirectVelocityControl(Node):
14 def __init__(self):
15 super().__init__('direct_velocity_control')
16
17 self.publisher = self.create_publisher(
18 McLocomotionVelocity, '/aima/mc/locomotion/velocity', 10)
19 self.client = self.create_client(
20 SetMcInputSource, '/aimdk_5Fmsgs/srv/SetMcInputSource')
21
22 self.forward_velocity = 0.0
23 self.lateral_velocity = 0.0
24 self.angular_velocity = 0.0
25
26 self.max_forward_speed = 1.0
27 self.min_forward_speed = 0.2
28
29 self.max_lateral_speed = 1.0
30 self.min_lateral_speed = 0.2
31
32 self.max_angular_speed = 1.0
33 self.min_angular_speed = 0.1
34
35 self.timer = None
36
37 self.get_logger().info("Direct velocity control node started!")
38
39 def start_publish(self):
40 if not self.timer:
41 self.timer = self.create_timer(0.02, self.publish_velocity)
42
43 def register_input_source(self):
44 self.get_logger().info("Registering input source...")
45
46 timeout_sec = 8.0
47 start = self.get_clock().now().nanoseconds / 1e9
48
49 while not self.client.wait_for_service(timeout_sec=2.0):
50 now = self.get_clock().now().nanoseconds / 1e9
51 if now - start > timeout_sec:
52 self.get_logger().error("Waiting for service timed out")
53 return False
54 self.get_logger().info("Waiting for input source service...")
55
56 req = SetMcInputSource.Request()
57 req.action.value = 1001
58 req.input_source.name = "node"
59 req.input_source.priority = 40
60 req.input_source.timeout = 1000
61
62 for i in range(8):
63 req.request.header.stamp = self.get_clock().now().to_msg()
64 future = self.client.call_async(req)
65 rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
66
67 if future.done():
68 break
69
70 # retry as remote peer is NOT handled well by ROS
71 self.get_logger().info(f"trying to register input source... [{i}]")
72
73 if future.done():
74 try:
75 response = future.result()
76 state = response.response.state.value
77 self.get_logger().info(
78 f"Input source set successfully: state={state}, task_id={response.response.task_id}")
79 return True
80 except Exception as e:
81 self.get_logger().error(f"Service call exception: {str(e)}")
82 return False
83 else:
84 self.get_logger().error("Service call failed or timed out")
85 return False
86
87 def publish_velocity(self):
88 msg = McLocomotionVelocity()
89 msg.header = MessageHeader()
90 msg.header.stamp = self.get_clock().now().to_msg()
91 msg.source = "node"
92 msg.forward_velocity = self.forward_velocity
93 msg.lateral_velocity = self.lateral_velocity
94 msg.angular_velocity = self.angular_velocity
95
96 self.publisher.publish(msg)
97
98 self.get_logger().info(
99 f"Publishing velocity: forward {self.forward_velocity:.2f} m/s, "
100 f"lateral {self.lateral_velocity:.2f} m/s, "
101 f"angular {self.angular_velocity:.2f} rad/s"
102 )
103
104 def set_forward(self, forward):
105 # check value range, mc has thresholds to start movement
106 if abs(forward) < 0.005:
107 self.forward_velocity = 0.0
108 return True
109 elif abs(forward) > self.max_forward_speed or abs(forward) < self.min_forward_speed:
110 raise ValueError("out of range")
111 else:
112 self.forward_velocity = forward
113 return True
114
115 def set_lateral(self, lateral):
116 # check value range, mc has thresholds to start movement
117 if abs(lateral) < 0.005:
118 self.lateral_velocity = 0.0
119 return True
120 elif abs(lateral) > self.max_lateral_speed or abs(lateral) < self.min_lateral_speed:
121 raise ValueError("out of range")
122 else:
123 self.lateral_velocity = lateral
124 return True
125
126 def set_angular(self, angular):
127 # check value range, mc has thresholds to start movement
128 if abs(angular) < 0.005:
129 self.angular_velocity = 0.0
130 return True
131 elif abs(angular) > self.max_angular_speed or abs(angular) < self.min_angular_speed:
132 raise ValueError("out of range")
133 else:
134 self.angular_velocity = angular
135 return True
136
137 def clear_velocity(self):
138 self.forward_velocity = 0.0
139 self.lateral_velocity = 0.0
140 self.angular_velocity = 0.0
141
142
143# Global node instance for signal handling
144global_node = None
145
146
147def signal_handler(sig, frame):
148 global global_node
149 if global_node is not None:
150 global_node.clear_velocity()
151 global_node.get_logger().info(
152 f"Received signal {sig}, clearing velocity and shutting down")
153 rclpy.shutdown()
154 sys.exit(0)
155
156
157def main():
158 global global_node
159 rclpy.init()
160
161 node = DirectVelocityControl()
162 global_node = node
163
164 signal.signal(signal.SIGINT, signal_handler)
165 signal.signal(signal.SIGTERM, signal_handler)
166
167 if not node.register_input_source():
168 node.get_logger().error("Input source registration failed, exiting")
169 rclpy.shutdown()
170 return
171
172 # get and check control values
173 # notice that mc has thresholds to start movement
174 try:
175 # get input forward
176 forward = float(
177 input("Please enter forward velocity 0 or ±(0.2 ~ 1.0) m/s: "))
178 node.set_forward(forward)
179 # get input lateral
180 lateral = float(
181 input("Please enter lateral velocity 0 or ±(0.2 ~ 1.0) m/s: "))
182 node.set_lateral(lateral)
183 # get input angular
184 angular = float(
185 input("Please enter angular velocity 0 or ±(0.1 ~ 1.0) rad/s: "))
186 node.set_angular(angular)
187 except Exception as e:
188 node.get_logger().error(f"Invalid input: {e}")
189 rclpy.shutdown()
190 return
191
192 node.get_logger().info("Setting velocity, moving for 5 seconds")
193 node.start_publish()
194
195 start = node.get_clock().now()
196 while (node.get_clock().now() - start).nanoseconds / 1e9 < 5.0:
197 rclpy.spin_once(node, timeout_sec=0.1)
198 time.sleep(0.001)
199
200 node.clear_velocity()
201 node.get_logger().info("5-second motion finished, robot stopped")
202
203 rclpy.spin(node)
204 rclpy.shutdown()
205
206
207if __name__ == '__main__':
208 main()
6.1.9 Joint motor control
This example demonstrates how to use ROS2 and the Ruckig library to control robot joint motion.
Attention
Note ⚠️: Before running this example, stop the native MC module on the robot control unit (PC1) by running aima em stop-app mc to obtain control of the robot. Ensure the robot is safe before operating.
! This example directly controls low-level motors (the HAL layer). Before running, verify that the joint safety limits in the code match your robot and ensure safety!
Robot joint control example
This example shows how to use ROS2 and the Ruckig library to control robot joints. The example implements the following features:
Robot joint model definition
Trajectory interpolation using Ruckig
Multi-joint coordinated control
Real-time control of position, velocity, and acceleration
Example feature description
Creates four controller nodes, which control:
Legs x2 (12 joints)
Waist x1 (3 joints)
Arms x2 (14 joints)
Head x1 (2 joints)
Demo features:
Oscillate a specified joint between ±0.5 radians every 10 seconds
Use the Ruckig library to generate smooth motion trajectories
Publish joint control commands in real time
Custom usage
Add new control logic:
Add new control callback functions
Compose joint motions freely
Adjust control frequency:
Modify the
control_timer_period (currently 2 ms)
1#!/usr/bin/env python3
2"""
3Robot joint control example
4This script implements a ROS2-based robot joint control system, using the Ruckig trajectory
5planner to achieve smooth joint motion control.
6
7Main features:
81. Supports controlling multiple joint areas (head, arm, waist, leg)
92. Uses Ruckig for trajectory planning to ensure smooth motion
103. Supports real-time control of joint position, velocity, and acceleration
114. Provides joint limit and PID (stiffness/damping) parameter configuration
12"""
13
14import rclpy
15from rclpy.node import Node
16from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy, DurabilityPolicy
17from aimdk_msgs.msg import JointCommandArray, JointStateArray, JointCommand
18from std_msgs.msg import Header
19import ruckig
20from enum import Enum
21from dataclasses import dataclass
22from typing import List, Dict
23from threading import Lock
24
25# QoS config: define ROS2 Quality of Service parameters
26# Subscriber QoS: best-effort reliability, keep last 10 messages
27subscriber_qos = QoSProfile(
28 reliability=ReliabilityPolicy.BEST_EFFORT,
29 history=HistoryPolicy.KEEP_LAST,
30 depth=10,
31 durability=DurabilityPolicy.VOLATILE
32)
33
34# Publisher QoS: reliable transport, keep last 10 messages
35publisher_qos = QoSProfile(
36 reliability=ReliabilityPolicy.RELIABLE,
37 history=HistoryPolicy.KEEP_LAST,
38 depth=10,
39 durability=DurabilityPolicy.VOLATILE
40)
41
42
43class JointArea(Enum):
44 HEAD = 'HEAD' # Head joints
45 ARM = 'ARM' # Arm joints
46 WAIST = 'WAIST' # Waist joints
47 LEG = 'LEG' # Leg joints
48
49
50@dataclass
51class JointInfo:
52 # Joint information data class
53 name: str # Joint name
54 lower_limit: float # Joint lower angle limit
55 upper_limit: float # Joint upper angle limit
56 kp: float # Position control proportional gain
57 kd: float # Velocity control derivative gain
58
59
60# Robot model configuration: define all joint parameters
61robot_model: Dict[JointArea, List[JointInfo]] = {
62 # Leg joint configuration
63 JointArea.LEG: [
64 # Left leg joints
65 JointInfo("left_hip_pitch_joint", -2.4871,
66 2.4871, 40.0, 4.0), # left hip pitch
67 JointInfo("left_hip_roll_joint", -0.12217,
68 2.9059, 40.0, 4.0), # left hip roll
69 JointInfo("left_hip_yaw_joint", -1.6842,
70 3.4296, 30.0, 3.0), # left hip yaw
71 JointInfo("left_knee_joint", 0.026179,
72 2.1206, 80.0, 8.0), # left knee
73 JointInfo("left_ankle_pitch_joint", -0.80285,
74 0.45378, 40.0, 4.0), # left ankle pitch
75 JointInfo("left_ankle_roll_joint", -0.2618,
76 0.2618, 20.0, 2.0), # left ankle roll
77 # Right leg joints
78 JointInfo("right_hip_pitch_joint", -2.4871,
79 2.4871, 40.0, 4.0), # right hip pitch
80 JointInfo("right_hip_roll_joint", -2.9059,
81 0.12217, 40.0, 4.0), # right hip roll
82 JointInfo("right_hip_yaw_joint", -3.4296,
83 1.6842, 30.0, 3.0), # right hip yaw
84 JointInfo("right_knee_joint", 0.026179,
85 2.1206, 80.0, 8.0), # right knee
86 JointInfo("right_ankle_pitch_joint", -0.80285,
87 0.45378, 40.0, 4.0), # right ankle pitch
88 JointInfo("right_ankle_roll_joint", -0.2618,
89 0.2618, 20.0, 2.0), # right ankle roll
90 ],
91 # Waist joint configuration
92 JointArea.WAIST: [
93 JointInfo("waist_yaw_joint", -3.4296,
94 2.3824, 20.0, 4.0), # waist yaw
95 JointInfo("waist_pitch_joint", -0.17453,
96 0.17453, 20.0, 4.0), # waist pitch
97 JointInfo("waist_roll_joint", -0.48869,
98 0.48869, 20.0, 4.0), # waist roll
99 ],
100 # Arm joint configuration
101 JointArea.ARM: [
102 # Left arm
103 JointInfo("left_shoulder_pitch_joint", -
104 2.5569, 2.5569, 20.0, 2.0), # left shoulder pitch
105 JointInfo("left_shoulder_roll_joint", -
106 0.06108, 3.3598, 20.0, 2.0), # left shoulder roll
107 JointInfo("left_shoulder_yaw_joint", -
108 2.5569, 2.5569, 20.0, 2.0), # left shoulder yaw
109 JointInfo("left_elbow_pitch_joint", -2.4435,
110 0.0, 20.0, 2.0), # left elbow pitch
111 JointInfo("left_elbow_roll_joint", -2.5569,
112 2.5569, 20.0, 2.0), # left elbow roll
113 JointInfo("left_wrist_pitch_joint", -0.5585,
114 0.5585, 20.0, 2.0), # left wrist pitch
115 JointInfo("left_wrist_yaw_joint", -1.5446,
116 1.5446, 20.0, 2.0), # left wrist yaw
117 # Right arm
118 JointInfo("right_shoulder_pitch_joint", -
119 2.5569, 2.5569, 20.0, 2.0), # right shoulder pitch
120 JointInfo("right_shoulder_roll_joint", -
121 3.3598, 0.06108, 20.0, 2.0), # right shoulder roll
122 JointInfo("right_shoulder_yaw_joint", -
123 2.5569, 2.5569, 20.0, 2.0), # right shoulder yaw
124 JointInfo("right_elbow_pitch_joint", 0.0,
125 2.4435, 20.0, 2.0), # right elbow pitch
126 JointInfo("right_elbow_roll_joint", -2.5569,
127 2.5569, 20.0, 2.0), # right elbow roll
128 JointInfo("right_wrist_pitch_joint", -
129 0.5585, 0.5585, 20.0, 2.0), # right wrist pitch
130 JointInfo("right_wrist_yaw_joint", -1.5446,
131 1.5446, 20.0, 2.0), # right wrist yaw
132 ],
133 # Head joint configuration
134 JointArea.HEAD: [
135 JointInfo("head_pitch_joint", -0.61087,
136 0.61087, 20.0, 2.0), # head pitch
137 JointInfo("head_yaw_joint", -0.61087,
138 0.61087, 20.0, 2.0), # head yaw
139 ],
140}
141
142
143class JointControllerNode(Node):
144 """
145 Joint controller node
146 Responsible for receiving joint states, using Ruckig for trajectory planning,
147 and publishing joint commands.
148 """
149
150 def __init__(self, node_name: str, sub_topic: str, pub_topic: str, area: JointArea, dofs: int):
151 """
152 Initialize joint controller
153 Args:
154 node_name: node name
155 sub_topic: topic name to subscribe (joint states)
156 pub_topic: topic name to publish (joint commands)
157 area: joint area (head/arm/waist/leg)
158 dofs: number of DOFs
159 """
160 super().__init__(node_name)
161 self.lock = Lock()
162 self.joint_info = robot_model[area]
163 self.dofs = dofs
164 self.ruckig = ruckig.Ruckig(dofs, 0.002) # 2 ms control period
165 self.input = ruckig.InputParameter(dofs)
166 self.output = ruckig.OutputParameter(dofs)
167 self.ruckig_initialized = False
168
169 # Initialize trajectory parameters
170 self.input.current_position = [0.0] * dofs
171 self.input.current_velocity = [0.0] * dofs
172 self.input.current_acceleration = [0.0] * dofs
173
174 # Motion limits
175 self.input.max_velocity = [1.0] * dofs
176 self.input.max_acceleration = [1.0] * dofs
177 self.input.max_jerk = [25.0] * dofs
178
179 # ROS2 subscriber and publisher
180 self.sub = self.create_subscription(
181 JointStateArray,
182 sub_topic,
183 self.joint_state_callback,
184 subscriber_qos
185 )
186 self.pub = self.create_publisher(
187 JointCommandArray,
188 pub_topic,
189 publisher_qos
190 )
191
192 def joint_state_callback(self, msg: JointStateArray):
193 """
194 Joint state callback
195 Receives and processes joint state messages
196 """
197 self.ruckig_initialized = True
198
199 def control_callback(self, joint_idx):
200 """
201 Control callback
202 Uses Ruckig for trajectory planning and publishes control commands
203 Args:
204 joint_idx: target joint index
205 """
206 # Run Ruckig until the target is reached
207 while self.ruckig.update(self.input, self.output) in [ruckig.Result.Working, ruckig.Result.Finished]:
208 # Update current state
209 self.input.current_position = self.output.new_position
210 self.input.current_velocity = self.output.new_velocity
211 self.input.current_acceleration = self.output.new_acceleration
212
213 # Check if target is reached
214 tolerance = 1e-6
215 current_p = self.output.new_position[joint_idx]
216 if abs(current_p - self.input.target_position[joint_idx]) < tolerance:
217 break
218
219 # Create and publish command
220 cmd = JointCommandArray()
221 for i, joint in enumerate(self.joint_info):
222 j = JointCommand()
223 j.name = joint.name
224 j.position = self.output.new_position[i]
225 j.velocity = self.output.new_velocity[i]
226 j.effort = 0.0
227 j.stiffness = joint.kp
228 j.damping = joint.kd
229 cmd.joints.append(j)
230
231 self.pub.publish(cmd)
232
233 def set_target_position(self, joint_name, position):
234 """
235 Set target joint position
236 Args:
237 joint_name: joint name
238 position: target position
239 """
240 p_s = [0.0] * self.dofs
241 joint_idx = 0
242 for i, joint in enumerate(self.joint_info):
243 if joint.name == joint_name:
244 p_s[i] = position
245 joint_idx = i
246 self.input.target_position = p_s
247 self.input.target_velocity = [0.0] * self.dofs
248 self.input.target_acceleration = [0.0] * self.dofs
249 self.control_callback(joint_idx)
250
251
252def main(args=None):
253 """
254 Main function
255 Initialize ROS2 node and start joint controller
256 """
257 rclpy.init(args=args)
258
259 # Create leg controller node
260 leg_node = JointControllerNode(
261 "leg_node",
262 "/aima/hal/joint/leg/state",
263 "/aima/hal/joint/leg/command",
264 JointArea.LEG,
265 12
266 )
267
268 # waist_node = JointControllerNode(
269 # "waist_node",
270 # "/aima/hal/joint/waist/state",
271 # "/aima/hal/joint/waist/command",
272 # JointArea.WAIST,
273 # 3
274 # )
275
276 # arm_node = JointControllerNode(
277 # "arm_node",
278 # "/aima/hal/joint/arm/state",
279 # "/aima/hal/joint/arm/command",
280 # JointArea.ARM,
281 # 14
282 # )
283
284 # head_node = JointControllerNode(
285 # "head_node",
286 # "/aima/hal/joint/head/state",
287 # "/aima/hal/joint/head/command",
288 # JointArea.HEAD,
289 # 2
290 # )
291
292 position = 0.8
293
294 # Only control the left leg joint. If you want to control a specific joint, assign it directly.
295 def timer_callback():
296 """
297 Timer callback
298 Periodically change target position to achieve oscillating motion
299 """
300 nonlocal position
301 position = -position
302 position = 1.3 + position
303 leg_node.set_target_position("left_knee_joint", position)
304
305 # arm_node.set_target_position("left_shoulder_pitch_joint", position)
306 # waist_node.set_target_position("waist_yaw_joint", position)
307 # head_node.set_target_position("head_pitch_joint", position)
308
309 leg_node.create_timer(3.0, timer_callback)
310
311 # Multi-threaded executor
312 executor = rclpy.executors.MultiThreadedExecutor()
313 executor.add_node(leg_node)
314
315 # executor.add_node(waist_node)
316 # executor.add_node(arm_node)
317 # executor.add_node(head_node)
318
319 try:
320 executor.spin()
321 except KeyboardInterrupt:
322 pass
323 finally:
324 leg_node.destroy_node()
325 # waist_node.destroy_node()
326 # arm_node.destroy_node()
327 # head_node.destroy_node()
328 if rclpy.ok():
329 rclpy.shutdown()
330
331
332if __name__ == '__main__':
333 main()
6.1.10 Keyboard control
This example implements robot forward/backward and turning control using keyboard input from a PC.
Use W A S D to control robot direction; increase/decrease speed by ±0.2 m/s. Use Q/E to increase/decrease angular speed by ±0.1 rad/s. Press ESC to exit and release terminal resources. Press Space to immediately zero speed (emergency stop).
Caution
Note: Before running this example, use the controller to put the robot into the Stable Standing Mode mode. (For position-control stand / locomotion modes press R2 + X; see the mode transition diagram for other modes.) Then run aima em stop-app rc on the robot’s terminal to stop the remote controller and prevent channel occupation.
You must register an input source before using keyboard control (this example registers one).
1#!/usr/bin/env python3
2
3import rclpy
4import rclpy.logging
5from rclpy.node import Node
6from aimdk_msgs.msg import McLocomotionVelocity, MessageHeader
7from aimdk_msgs.srv import SetMcInputSource
8import curses
9import time
10from functools import partial
11
12
13class KeyboardVelocityController(Node):
14 def __init__(self, stdscr):
15 super().__init__('keyboard_velocity_controller')
16 self.stdscr = stdscr
17 self.forward_velocity = 0.0
18 self.lateral_velocity = 0.0
19 self.angular_velocity = 0.0
20 self.step = 0.2
21 self.angular_step = 0.1
22
23 self.publisher = self.create_publisher(
24 McLocomotionVelocity, '/aima/mc/locomotion/velocity', 10)
25 self.client = self.create_client(
26 SetMcInputSource, '/aimdk_5Fmsgs/srv/SetMcInputSource')
27
28 if not self.register_input_source():
29 self.get_logger().error("Input source registration failed, exiting")
30 raise RuntimeError("Failed to register input source")
31
32 # Configure curses
33 curses.cbreak()
34 curses.noecho()
35 self.stdscr.keypad(True)
36 self.stdscr.nodelay(True)
37
38 self.get_logger().info(
39 "Control started: W/S forward/backward, A/D strafe, Q/E turn, Space stop, Esc exit")
40
41 # Timer: check keyboard every 50 ms
42 self.timer = self.create_timer(0.05, self.check_key_and_publish)
43
44 def register_input_source(self):
45 self.get_logger().info("Registering input source...")
46
47 timeout_sec = 8.0
48 start = self.get_clock().now().nanoseconds / 1e9
49
50 while not self.client.wait_for_service(timeout_sec=2.0):
51 now = self.get_clock().now().nanoseconds / 1e9
52 if now - start > timeout_sec:
53 self.get_logger().error("Waiting for service timed out")
54 return False
55 self.get_logger().info("Waiting for input source service...")
56
57 req = SetMcInputSource.Request()
58 req.action.value = 1001
59 req.input_source.name = "node"
60 req.input_source.priority = 40
61 req.input_source.timeout = 1000
62
63 for i in range(8):
64 req.request.header.stamp = self.get_clock().now().to_msg()
65 future = self.client.call_async(req)
66 rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
67
68 if future.done():
69 break
70
71 # retry as remote peer is NOT handled well by ROS
72 self.get_logger().info(f"trying to register input source... [{i}]")
73
74 if future.done():
75 try:
76 resp = future.result()
77 state = resp.response.state.value
78 self.get_logger().info(
79 f"Input source set successfully: state={state}, task_id={resp.response.task_id}")
80 return True
81 except Exception as e:
82 self.get_logger().error(f"Service exception: {str(e)}")
83 return False
84 else:
85 self.get_logger().error("Service call failed or timed out")
86 return False
87
88 def check_key_and_publish(self):
89 try:
90 ch = self.stdscr.getch()
91 except Exception:
92 ch = -1
93
94 if ch != -1:
95 if ch == ord(' '):
96 self.forward_velocity = 0.0
97 self.lateral_velocity = 0.0
98 self.angular_velocity = 0.0
99 elif ch == ord('w'):
100 self.forward_velocity = min(
101 self.forward_velocity + self.step, 1.0)
102 elif ch == ord('s'):
103 self.forward_velocity = max(
104 self.forward_velocity - self.step, -1.0)
105 elif ch == ord('a'):
106 self.lateral_velocity = min(
107 self.lateral_velocity + self.step, 1.0)
108 elif ch == ord('d'):
109 self.lateral_velocity = max(
110 self.lateral_velocity - self.step, -1.0)
111 elif ch == ord('q'):
112 self.angular_velocity = min(
113 self.angular_velocity + self.angular_step, 1.0)
114 elif ch == ord('e'):
115 self.angular_velocity = max(
116 self.angular_velocity - self.angular_step, -1.0)
117 elif ch == 27: # ESC
118 self.get_logger().info("Exiting control")
119 rclpy.shutdown()
120 return
121
122 msg = McLocomotionVelocity()
123 msg.header = MessageHeader()
124 msg.header.stamp = self.get_clock().now().to_msg()
125 msg.source = "node"
126 msg.forward_velocity = self.forward_velocity
127 msg.lateral_velocity = self.lateral_velocity
128 msg.angular_velocity = self.angular_velocity
129
130 self.publisher.publish(msg)
131
132 # Update UI
133 self.stdscr.clear()
134 self.stdscr.addstr(
135 0, 0, "W/S: Forward/Backward | A/D: Strafe | Q/E: Turn | Space: Stop | ESC: Exit")
136 self.stdscr.addstr(2, 0,
137 f"Speed Status: Forward: {self.forward_velocity:.2f} m/s | "
138 f"Lateral: {self.lateral_velocity:.2f} m/s | "
139 f"Angular: {self.angular_velocity:.2f} rad/s")
140 self.stdscr.refresh()
141
142
143def curses_main(stdscr):
144 rclpy.init()
145 try:
146 node = KeyboardVelocityController(stdscr)
147 rclpy.spin(node)
148 except Exception as e:
149 rclpy.logging.get_logger("main").fatal(
150 f"Program exited with exception: {e}")
151 finally:
152 curses.endwin()
153 rclpy.shutdown()
154
155
156def main():
157 curses.wrapper(curses_main)
158
159
160if __name__ == '__main__':
161 main()
6.1.11 Take photo
This example uses take_photo. Before running the node, set the camera topic to capture. When started, the node creates an /images/ directory and saves the current frame into that directory.
1#!/usr/bin/env python3
2import time
3from pathlib import Path
4
5import rclpy
6from rclpy.node import Node
7from rclpy.qos import qos_profile_sensor_data
8from sensor_msgs.msg import Image
9from cv_bridge import CvBridge
10import cv2
11
12
13class SaveOneRawPy(Node):
14 def __init__(self):
15 super().__init__('save_one_image')
16
17 # parameter: image topic
18 self.declare_parameter(
19 'image_topic', '/aima/hal/sensor/stereo_head_front_left/rgb_image'
20 )
21 self.topic = self.get_parameter(
22 'image_topic').get_parameter_value().string_value
23
24 # save directory
25 self.save_dir = Path('images').resolve()
26 self.save_dir.mkdir(parents=True, exist_ok=True)
27
28 # state
29 self._saved = False
30 self._bridge = CvBridge()
31
32 # subscriber (sensor QoS)
33 self.sub = self.create_subscription(
34 Image,
35 self.topic,
36 self.image_cb,
37 qos_profile_sensor_data
38 )
39 self.get_logger().info(f'Subscribing to raw image: {self.topic}')
40 self.get_logger().info(f'Images will be saved to: {self.save_dir}')
41
42 def image_cb(self, msg: Image):
43 # already saved one, ignore later frames
44 if self._saved:
45 return
46
47 try:
48 enc = msg.encoding.lower()
49 self.get_logger().info(f'Received image with encoding: {enc}')
50
51 # convert from ROS Image to cv2
52 img = self._bridge.imgmsg_to_cv2(
53 msg, desired_encoding='passthrough')
54
55 # normalize to BGR for saving
56 if enc == 'rgb8':
57 img = cv2.cvtColor(img, cv2.COLOR_RGB2BGR)
58 elif enc == 'mono8':
59 img = cv2.cvtColor(img, cv2.COLOR_GRAY2BGR)
60 # if it's bgr8 or other 8-bit bgr that cv2 can save, we just use it
61
62 ts_ms = int(time.time() * 1000)
63 out_path = self.save_dir / f'frame_{ts_ms}.png'
64
65 ok = cv2.imwrite(str(out_path), img)
66 if ok:
67 self.get_logger().info(
68 f'Saved image: {out_path} ({img.shape[1]}x{img.shape[0]})'
69 )
70 self._saved = True
71 # shut down once we got exactly one frame
72 # destroy node first, then shutdown rclpy
73 self.destroy_node()
74 if rclpy.ok():
75 rclpy.shutdown()
76 else:
77 self.get_logger().error(f'cv2.imwrite failed: {out_path}')
78 except Exception as e:
79 self.get_logger().error(f'Failed to decode / save image: {e}')
80
81
82def main():
83 rclpy.init()
84 node = SaveOneRawPy()
85 rclpy.spin(node)
86 # in case the node was already destroyed in the callback
87 if rclpy.ok():
88 try:
89 node.destroy_node()
90 except Exception:
91 pass
92 rclpy.shutdown()
93
94
95if __name__ == '__main__':
96 main()
6.1.12 Camera stream examples
This example collection provides multiple camera subscription and processing demos, supporting depth, stereo, and mono camera streams.
These camera subscription examples are not application-level; they only print basic camera data. If you are familiar with ROS2, you can achieve similar results with ros2 topic echo + ros2 topic hz. You can consult the SDK topic list to jump directly into module development or use these camera examples as scaffolding for your own logic. The published sensor data are raw (no preprocessing like undistortion). For detailed sensor information (e.g., resolution, focal length), check the corresponding camera_info topic.
Depth camera data subscription
This example uses echo_camera_rgbd to subscribe to /aima/hal/sensor/rgbd_head_front/ and receive the robot’s depth camera data. It supports depth pointclouds, depth images, RGB images, compressed RGB images, and camera intrinsics.
Features:
Supports multiple data type subscriptions (depth pointcloud, depth image, RGB image, compressed image, camera intrinsics)
Real-time FPS statistics and data display
Supports RGB image video recording
Configurable topic type selection
Supported data types:
depth_pointcloud: Depth point cloud data (sensor_msgs/PointCloud2)depth_image: Depth image (sensor_msgs/Image)rgb_image: RGB image (sensor_msgs/Image)rgb_image_compressed: Compressed RGB image (sensor_msgs/CompressedImage)camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)
1#!/usr/bin/env python3
2"""
3Head depth camera multi-topic subscription example
4
5Supports selecting the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
6 - depth_pointcloud: Depth point cloud (sensor_msgs/PointCloud2)
7 - depth_image: Depth image (sensor_msgs/Image)
8 - depth_camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)
9 - rgb_image: RGB image (sensor_msgs/Image)
10 - rgb_image_compressed: RGB compressed image (sensor_msgs/CompressedImage)
11 - rgb_camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)
12
13Example:
14 ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=depth_pointcloud
15 ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_image
16 ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_camera_info
17
18Default topic_type is rgb_image
19"""
20
21import rclpy
22from rclpy.node import Node
23from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
24from sensor_msgs.msg import Image, CompressedImage, CameraInfo, PointCloud2
25from collections import deque
26import time
27
28
29class CameraTopicEcho(Node):
30 def __init__(self):
31 super().__init__('camera_topic_echo')
32
33 # Select the topic type to subscribe
34 self.declare_parameter('topic_type', 'rgb_image')
35 self.declare_parameter('dump_video_path', '')
36
37 self.topic_type = self.get_parameter('topic_type').value
38 self.dump_video_path = self.get_parameter('dump_video_path').value
39
40 # SensorDataQoS: BEST_EFFORT + VOLATILE
41 qos = QoSProfile(
42 reliability=QoSReliabilityPolicy.BEST_EFFORT,
43 history=QoSHistoryPolicy.KEEP_LAST,
44 depth=5
45 )
46
47 # Create different subscribers based on topic_type
48 if self.topic_type == "depth_pointcloud":
49 self.topic_name = "/aima/hal/sensor/rgbd_head_front/depth_pointcloud"
50 self.sub_pointcloud = self.create_subscription(
51 PointCloud2, self.topic_name, self.cb_pointcloud, qos)
52 self.get_logger().info(
53 f"✅ Subscribing PointCloud2: {self.topic_name}")
54
55 elif self.topic_type == "depth_image":
56 self.topic_name = "/aima/hal/sensor/rgbd_head_front/depth_image"
57 self.sub_image = self.create_subscription(
58 Image, self.topic_name, self.cb_image, qos)
59 self.get_logger().info(
60 f"✅ Subscribing Depth Image: {self.topic_name}")
61
62 elif self.topic_type == "rgb_image":
63 self.topic_name = "/aima/hal/sensor/rgbd_head_front/rgb_image"
64 self.sub_image = self.create_subscription(
65 Image, self.topic_name, self.cb_image, qos)
66 self.get_logger().info(
67 f"✅ Subscribing RGB Image: {self.topic_name}")
68 if self.dump_video_path:
69 self.get_logger().info(
70 f"📝 Will dump received images to video: {self.dump_video_path}")
71
72 elif self.topic_type == "rgb_image_compressed":
73 self.topic_name = "/aima/hal/sensor/rgbd_head_front/rgb_image/compressed"
74 self.sub_compressed = self.create_subscription(
75 CompressedImage, self.topic_name, self.cb_compressed, qos)
76 self.get_logger().info(
77 f"✅ Subscribing CompressedImage: {self.topic_name}")
78
79 elif self.topic_type == "rgb_camera_info":
80 self.topic_name = "/aima/hal/sensor/rgbd_head_front/rgb_camera_info"
81 # RGB-D CameraInfo is different with other cameras. The best_effort + volatile QoS is enough for 10Hz rgb_camera_info
82 self.sub_camerainfo = self.create_subscription(
83 CameraInfo, self.topic_name, self.cb_camerainfo, qos)
84 self.get_logger().info(
85 f"✅ Subscribing RGB CameraInfo: {self.topic_name}")
86
87 elif self.topic_type == "depth_camera_info":
88 self.topic_name = "/aima/hal/sensor/rgbd_head_front/depth_camera_info"
89 # RGB-D CameraInfo is different with other cameras. The best_effort + volatile QoS is enough for 10Hz depth_camera_info
90 self.sub_camerainfo = self.create_subscription(
91 CameraInfo, self.topic_name, self.cb_camerainfo, qos)
92 self.get_logger().info(
93 f"✅ Subscribing Depth CameraInfo: {self.topic_name}")
94
95 else:
96 self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
97 raise ValueError("Unknown topic_type")
98
99 # Internal state
100 self.last_print = self.get_clock().now()
101 self.arrivals = deque()
102
103 def update_arrivals(self):
104 """Calculate received FPS"""
105 now = self.get_clock().now()
106 self.arrivals.append(now)
107 while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
108 self.arrivals.popleft()
109
110 def get_fps(self):
111 """Get FPS"""
112 return len(self.arrivals)
113
114 def should_print(self):
115 """Control print frequency"""
116 now = self.get_clock().now()
117 if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
118 self.last_print = now
119 return True
120 return False
121
122 def cb_pointcloud(self, msg: PointCloud2):
123 """PointCloud2 callback"""
124 self.update_arrivals()
125
126 if self.should_print():
127 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
128
129 # Format fields information
130 fields_str = " ".join(
131 [f"{f.name}({f.datatype})" for f in msg.fields])
132
133 self.get_logger().info(
134 f"🌫️ PointCloud2 received\n"
135 f" • frame_id: {msg.header.frame_id}\n"
136 f" • stamp (sec): {stamp_sec:.6f}\n"
137 f" • width x height: {msg.width} x {msg.height}\n"
138 f" • point_step: {msg.point_step}\n"
139 f" • row_step: {msg.row_step}\n"
140 f" • fields: {fields_str}\n"
141 f" • is_bigendian: {msg.is_bigendian}\n"
142 f" • is_dense: {msg.is_dense}\n"
143 f" • data size: {len(msg.data)}\n"
144 f" • recv FPS (1s): {self.get_fps():.1f}"
145 )
146
147 def cb_image(self, msg: Image):
148 """Image callback (Depth/RGB image)"""
149 self.update_arrivals()
150
151 if self.should_print():
152 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
153 self.get_logger().info(
154 f"📸 {self.topic_type} received\n"
155 f" • frame_id: {msg.header.frame_id}\n"
156 f" • stamp (sec): {stamp_sec:.6f}\n"
157 f" • encoding: {msg.encoding}\n"
158 f" • size (WxH): {msg.width} x {msg.height}\n"
159 f" • step (bytes/row):{msg.step}\n"
160 f" • is_bigendian: {msg.is_bigendian}\n"
161 f" • recv FPS (1s): {self.get_fps():.1f}"
162 )
163
164 # Only RGB image supports video dump
165 if self.topic_type == "rgb_image" and self.dump_video_path:
166 self.dump_image_to_video(msg)
167
168 def cb_compressed(self, msg: CompressedImage):
169 """CompressedImage callback"""
170 self.update_arrivals()
171
172 if self.should_print():
173 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
174 self.get_logger().info(
175 f"🗜️ CompressedImage received\n"
176 f" • frame_id: {msg.header.frame_id}\n"
177 f" • stamp (sec): {stamp_sec:.6f}\n"
178 f" • format: {msg.format}\n"
179 f" • data size: {len(msg.data)}\n"
180 f" • recv FPS (1s): {self.get_fps():.1f}"
181 )
182
183 def cb_camerainfo(self, msg: CameraInfo):
184 """CameraInfo callback (camera intrinsic parameters)"""
185 # Camera info will only receive one frame, print it directly
186 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
187
188 # Format D array
189 d_str = ", ".join([f"{d:.6f}" for d in msg.d])
190
191 # Format K matrix
192 k_str = ", ".join([f"{k:.6f}" for k in msg.k])
193
194 # Format P matrix
195 p_str = ", ".join([f"{p:.6f}" for p in msg.p])
196
197 self.get_logger().info(
198 f"📷 {self.topic_type} received\n"
199 f" • frame_id: {msg.header.frame_id}\n"
200 f" • stamp (sec): {stamp_sec:.6f}\n"
201 f" • width x height: {msg.width} x {msg.height}\n"
202 f" • distortion_model:{msg.distortion_model}\n"
203 f" • D: [{d_str}]\n"
204 f" • K: [{k_str}]\n"
205 f" • P: [{p_str}]\n"
206 f" • binning_x: {msg.binning_x}\n"
207 f" • binning_y: {msg.binning_y}\n"
208 f" • roi: {{ x_offset: {msg.roi.x_offset}, y_offset: {msg.roi.y_offset}, height: {msg.roi.height}, width: {msg.roi.width}, do_rectify: {msg.roi.do_rectify} }}"
209 )
210
211 def dump_image_to_video(self, msg: Image):
212 """Video dump is only supported for RGB images"""
213 # You can add video recording functionality here
214 # Simplified in the Python version, only logs instead
215 if self.should_print():
216 self.get_logger().info(f"📝 Video dump not implemented in Python version")
217
218
219def main(args=None):
220 rclpy.init(args=args)
221 try:
222 node = CameraTopicEcho()
223 rclpy.spin(node)
224 except KeyboardInterrupt:
225 pass
226 except Exception as e:
227 print(f"Error: {e}")
228 finally:
229 if 'node' in locals():
230 node.destroy_node()
231 rclpy.shutdown()
232
233
234if __name__ == '__main__':
235 main()
Usage:
Subscribe to depth pointcloud:
ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=depth_pointcloud
Subscribe to RGB image data:
ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_image
Subscribe to camera intrinsics:
ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_camera_info ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=depth_camera_info
Record RGB video:
# You can change dump_video_path to another path; ensure the directory exists before saving ros2 run py_examples echo_camera_rgbd --ros-args -p topic_type:=rgb_image -p dump_video_path:=$PWD/output.avi
Stereo camera data subscription
This example uses echo_camera_stereo to subscribe to /aima/hal/sensor/stereo_head_front_*/ and receive stereo camera data from the robot, supporting left/right RGB images, compressed images, and camera intrinsics.
Features:
Supports independent left/right camera subscriptions
Real-time FPS statistics and data display
Supports RGB image video recording
Configurable camera selection (left/right)
Supported data types:
left_rgb_image: Left camera RGB image (sensor_msgs/Image)left_rgb_image_compressed: Left camera compressed RGB image (sensor_msgs/CompressedImage)left_camera_info: Left camera intrinsics (sensor_msgs/CameraInfo)right_rgb_image: Right camera RGB image (sensor_msgs/Image)right_rgb_image_compressed: Right camera compressed RGB image (sensor_msgs/CompressedImage)right_camera_info: Right camera intrinsics (sensor_msgs/CameraInfo)
1#!/usr/bin/env python3
2"""
3Head stereo camera multi-topic subscription example
4
5Supports selecting the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
6 - left_rgb_image: Left camera RGB image (sensor_msgs/Image)
7 - left_rgb_image_compressed: Left camera RGB compressed image (sensor_msgs/CompressedImage)
8 - left_camera_info: Left camera intrinsic parameters (sensor_msgs/CameraInfo)
9 - right_rgb_image: Right camera RGB image (sensor_msgs/Image)
10 - right_rgb_image_compressed: Right camera RGB compressed image (sensor_msgs/CompressedImage)
11 - right_camera_info: Right camera intrinsic parameters (sensor_msgs/CameraInfo)
12
13Example:
14 ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_rgb_image
15 ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=right_rgb_image
16 ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_camera_info
17
18Default topic_type is left_rgb_image
19"""
20
21import rclpy
22from rclpy.node import Node
23from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy, QoSHistoryPolicy
24from sensor_msgs.msg import Image, CompressedImage, CameraInfo
25from collections import deque
26import time
27
28
29class StereoCameraTopicEcho(Node):
30 def __init__(self):
31 super().__init__('stereo_camera_topic_echo')
32
33 # Select the topic type to subscribe
34 self.declare_parameter('topic_type', 'left_rgb_image')
35 self.declare_parameter('dump_video_path', '')
36
37 self.topic_type = self.get_parameter('topic_type').value
38 self.dump_video_path = self.get_parameter('dump_video_path').value
39
40 # Set QoS parameters - use sensor data QoS
41 qos = QoSProfile(
42 reliability=QoSReliabilityPolicy.BEST_EFFORT,
43 history=QoSHistoryPolicy.KEEP_LAST,
44 depth=5,
45 durability=QoSDurabilityPolicy.VOLATILE
46 )
47
48 # Create different subscribers based on topic_type
49 if self.topic_type == "left_rgb_image":
50 self.topic_name = "/aima/hal/sensor/stereo_head_front_left/rgb_image"
51 self.sub_image = self.create_subscription(
52 Image, self.topic_name, self.cb_image, qos)
53 self.get_logger().info(
54 f"✅ Subscribing Left RGB Image: {self.topic_name}")
55 if self.dump_video_path:
56 self.get_logger().info(
57 f"📝 Will dump received images to video: {self.dump_video_path}")
58
59 elif self.topic_type == "left_rgb_image_compressed":
60 self.topic_name = "/aima/hal/sensor/stereo_head_front_left/rgb_image/compressed"
61 self.sub_compressed = self.create_subscription(
62 CompressedImage, self.topic_name, self.cb_compressed, qos)
63 self.get_logger().info(
64 f"✅ Subscribing Left CompressedImage: {self.topic_name}")
65
66 elif self.topic_type == "left_camera_info":
67 self.topic_name = "/aima/hal/sensor/stereo_head_front_left/camera_info"
68 # CameraInfo subscription must use reliable + transient_local QoS to receive historical messages (even if only one frame is published)
69 camera_qos = QoSProfile(
70 reliability=QoSReliabilityPolicy.RELIABLE,
71 history=QoSHistoryPolicy.KEEP_LAST,
72 depth=1,
73 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL
74 )
75 self.sub_camerainfo = self.create_subscription(
76 CameraInfo, self.topic_name, self.cb_camerainfo, camera_qos)
77 self.get_logger().info(
78 f"✅ Subscribing Left CameraInfo (with transient_local): {self.topic_name}")
79
80 elif self.topic_type == "right_rgb_image":
81 self.topic_name = "/aima/hal/sensor/stereo_head_front_right/rgb_image"
82 self.sub_image = self.create_subscription(
83 Image, self.topic_name, self.cb_image, qos)
84 self.get_logger().info(
85 f"✅ Subscribing Right RGB Image: {self.topic_name}")
86 if self.dump_video_path:
87 self.get_logger().info(
88 f"📝 Will dump received images to video: {self.dump_video_path}")
89
90 elif self.topic_type == "right_rgb_image_compressed":
91 self.topic_name = "/aima/hal/sensor/stereo_head_front_right/rgb_image/compressed"
92 self.sub_compressed = self.create_subscription(
93 CompressedImage, self.topic_name, self.cb_compressed, qos)
94 self.get_logger().info(
95 f"✅ Subscribing Right CompressedImage: {self.topic_name}")
96
97 elif self.topic_type == "right_camera_info":
98 self.topic_name = "/aima/hal/sensor/stereo_head_front_right/camera_info"
99 # CameraInfo subscription must use reliable + transient_local QoS to receive historical messages (even if only one frame is published)
100 camera_qos = QoSProfile(
101 reliability=QoSReliabilityPolicy.RELIABLE,
102 history=QoSHistoryPolicy.KEEP_LAST,
103 depth=1,
104 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL
105 )
106 self.sub_camerainfo = self.create_subscription(
107 CameraInfo, self.topic_name, self.cb_camerainfo, camera_qos)
108 self.get_logger().info(
109 f"✅ Subscribing Right CameraInfo (with transient_local): {self.topic_name}")
110
111 else:
112 self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
113 raise ValueError("Unknown topic_type")
114
115 # Internal state
116 self.last_print = self.get_clock().now()
117 self.arrivals = deque()
118
119 def update_arrivals(self):
120 """Calculate received FPS"""
121 now = self.get_clock().now()
122 self.arrivals.append(now)
123 while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
124 self.arrivals.popleft()
125
126 def get_fps(self):
127 """Get FPS"""
128 return len(self.arrivals)
129
130 def should_print(self):
131 """Control print frequency"""
132 now = self.get_clock().now()
133 if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
134 self.last_print = now
135 return True
136 return False
137
138 def cb_image(self, msg: Image):
139 """Image callback (Left/Right camera RGB image)"""
140 self.update_arrivals()
141
142 if self.should_print():
143 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
144 self.get_logger().info(
145 f"📸 {self.topic_type} received\n"
146 f" • frame_id: {msg.header.frame_id}\n"
147 f" • stamp (sec): {stamp_sec:.6f}\n"
148 f" • encoding: {msg.encoding}\n"
149 f" • size (WxH): {msg.width} x {msg.height}\n"
150 f" • step (bytes/row):{msg.step}\n"
151 f" • is_bigendian: {msg.is_bigendian}\n"
152 f" • recv FPS (1s): {self.get_fps():.1f}"
153 )
154
155 # Only RGB images support video dump
156 if (self.topic_type in ["left_rgb_image", "right_rgb_image"]) and self.dump_video_path:
157 self.dump_image_to_video(msg)
158
159 def cb_compressed(self, msg: CompressedImage):
160 """CompressedImage callback (Left/Right camera RGB compressed image)"""
161 self.update_arrivals()
162
163 if self.should_print():
164 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
165 self.get_logger().info(
166 f"🗜️ {self.topic_type} received\n"
167 f" • frame_id: {msg.header.frame_id}\n"
168 f" • stamp (sec): {stamp_sec:.6f}\n"
169 f" • format: {msg.format}\n"
170 f" • data size: {len(msg.data)}\n"
171 f" • recv FPS (1s): {self.get_fps():.1f}"
172 )
173
174 def cb_camerainfo(self, msg: CameraInfo):
175 """CameraInfo callback (Left/Right camera intrinsic parameters)"""
176 # Camera info will only receive one frame, print it directly
177 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
178
179 # Format D array
180 d_str = ", ".join([f"{d:.6f}" for d in msg.d])
181
182 # Format K matrix
183 k_str = ", ".join([f"{k:.6f}" for k in msg.k])
184
185 # Format P matrix
186 p_str = ", ".join([f"{p:.6f}" for p in msg.p])
187
188 self.get_logger().info(
189 f"📷 {self.topic_type} received\n"
190 f" • frame_id: {msg.header.frame_id}\n"
191 f" • stamp (sec): {stamp_sec:.6f}\n"
192 f" • width x height: {msg.width} x {msg.height}\n"
193 f" • distortion_model:{msg.distortion_model}\n"
194 f" • D: [{d_str}]\n"
195 f" • K: [{k_str}]\n"
196 f" • P: [{p_str}]\n"
197 f" • binning_x: {msg.binning_x}\n"
198 f" • binning_y: {msg.binning_y}\n"
199 f" • roi: {{ x_offset: {msg.roi.x_offset}, y_offset: {msg.roi.y_offset}, height: {msg.roi.height}, width: {msg.roi.width}, do_rectify: {msg.roi.do_rectify} }}"
200 )
201
202 def dump_image_to_video(self, msg: Image):
203 """Video dump is only supported for RGB images"""
204 # You can add video recording functionality here
205 # Simplified in the Python version, only logs instead
206 if self.should_print():
207 self.get_logger().info(f"📝 Video dump not implemented in Python version")
208
209
210def main(args=None):
211 rclpy.init(args=args)
212 try:
213 node = StereoCameraTopicEcho()
214 rclpy.spin(node)
215 except KeyboardInterrupt:
216 pass
217 except Exception as e:
218 print(f"Error: {e}")
219 finally:
220 if 'node' in locals():
221 node.destroy_node()
222 rclpy.shutdown()
223
224
225if __name__ == '__main__':
226 main()
Usage:
Subscribe to left camera RGB image:
ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_rgb_image
Subscribe to right camera RGB image:
ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=right_rgb_image
Subscribe to left camera intrinsics:
ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_camera_info
Record left camera video:
# You can change dump_video_path to another path; ensure the directory exists before saving ros2 run py_examples echo_camera_stereo --ros-args -p topic_type:=left_rgb_image -p dump_video_path:=$PWD/left_camera.avi
Head rear monocular camera data subscription
This example uses echo_camera_head_rear to subscribe to /aima/hal/sensor/rgb_head_rear/ and receive the robot’s head rear monocular camera data, supporting RGB images, compressed images, and camera intrinsics.
Features:
Supports head rear camera data subscription
Real-time FPS statistics and data display
Supports RGB image video recording
Configurable topic type selection
Supported data types:
rgb_image: RGB image (sensor_msgs/Image)rgb_image_compressed: Compressed RGB image (sensor_msgs/CompressedImage)camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)
1#!/usr/bin/env python3
2"""
3Head rear monocular camera multi-topic subscription example
4
5Supports selecting the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
6 - rgb_image: RGB image (sensor_msgs/Image)
7 - rgb_image_compressed: RGB compressed image (sensor_msgs/CompressedImage)
8 - camera_info: Camera intrinsic parameters (sensor_msgs/CameraInfo)
9
10Example:
11 ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image
12 ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image_compressed
13 ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=camera_info
14
15Default topic_type is rgb_image
16"""
17
18import rclpy
19from rclpy.node import Node
20from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSDurabilityPolicy, QoSHistoryPolicy
21from sensor_msgs.msg import Image, CompressedImage, CameraInfo
22from collections import deque
23import time
24
25
26class HeadRearCameraTopicEcho(Node):
27 def __init__(self):
28 super().__init__('head_rear_camera_topic_echo')
29
30 # Select the topic type to subscribe
31 self.declare_parameter('topic_type', 'rgb_image')
32 self.declare_parameter('dump_video_path', '')
33
34 self.topic_type = self.get_parameter('topic_type').value
35 self.dump_video_path = self.get_parameter('dump_video_path').value
36
37 # Set QoS parameters - use sensor data QoS
38 qos = QoSProfile(
39 reliability=QoSReliabilityPolicy.BEST_EFFORT,
40 history=QoSHistoryPolicy.KEEP_LAST,
41 depth=5,
42 durability=QoSDurabilityPolicy.VOLATILE
43 )
44
45 # Create different subscribers based on topic_type
46 if self.topic_type == "rgb_image":
47 self.topic_name = "/aima/hal/sensor/rgb_head_rear/rgb_image"
48 self.sub_image = self.create_subscription(
49 Image, self.topic_name, self.cb_image, qos)
50 self.get_logger().info(
51 f"✅ Subscribing RGB Image: {self.topic_name}")
52 if self.dump_video_path:
53 self.get_logger().info(
54 f"📝 Will dump received images to video: {self.dump_video_path}")
55
56 elif self.topic_type == "rgb_image_compressed":
57 self.topic_name = "/aima/hal/sensor/rgb_head_rear/rgb_image/compressed"
58 self.sub_compressed = self.create_subscription(
59 CompressedImage, self.topic_name, self.cb_compressed, qos)
60 self.get_logger().info(
61 f"✅ Subscribing CompressedImage: {self.topic_name}")
62
63 elif self.topic_type == "camera_info":
64 self.topic_name = "/aima/hal/sensor/rgb_head_rear/camera_info"
65 # CameraInfo subscription must use reliable + transient_local QoS to receive historical messages (even if only one frame is published)
66 camera_qos = QoSProfile(
67 reliability=QoSReliabilityPolicy.RELIABLE,
68 history=QoSHistoryPolicy.KEEP_LAST,
69 depth=1,
70 durability=QoSDurabilityPolicy.TRANSIENT_LOCAL
71 )
72 self.sub_camerainfo = self.create_subscription(
73 CameraInfo, self.topic_name, self.cb_camerainfo, camera_qos)
74 self.get_logger().info(
75 f"✅ Subscribing CameraInfo (with transient_local): {self.topic_name}")
76
77 else:
78 self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
79 raise ValueError("Unknown topic_type")
80
81 # Internal state
82 self.last_print = self.get_clock().now()
83 self.arrivals = deque()
84
85 def update_arrivals(self):
86 """Calculate received FPS"""
87 now = self.get_clock().now()
88 self.arrivals.append(now)
89 while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
90 self.arrivals.popleft()
91
92 def get_fps(self):
93 """Get FPS"""
94 return len(self.arrivals)
95
96 def should_print(self):
97 """Control print frequency"""
98 now = self.get_clock().now()
99 if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
100 self.last_print = now
101 return True
102 return False
103
104 def cb_image(self, msg: Image):
105 """Image callback (RGB image)"""
106 self.update_arrivals()
107
108 if self.should_print():
109 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
110 self.get_logger().info(
111 f"📸 {self.topic_type} received\n"
112 f" • frame_id: {msg.header.frame_id}\n"
113 f" • stamp (sec): {stamp_sec:.6f}\n"
114 f" • encoding: {msg.encoding}\n"
115 f" • size (WxH): {msg.width} x {msg.height}\n"
116 f" • step (bytes/row):{msg.step}\n"
117 f" • is_bigendian: {msg.is_bigendian}\n"
118 f" • recv FPS (1s): {self.get_fps():.1f}"
119 )
120
121 # Only RGB image supports video dump
122 if self.topic_type == "rgb_image" and self.dump_video_path:
123 self.dump_image_to_video(msg)
124
125 def cb_compressed(self, msg: CompressedImage):
126 """CompressedImage callback (RGB compressed image)"""
127 self.update_arrivals()
128
129 if self.should_print():
130 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
131 self.get_logger().info(
132 f"🗜️ {self.topic_type} received\n"
133 f" • frame_id: {msg.header.frame_id}\n"
134 f" • stamp (sec): {stamp_sec:.6f}\n"
135 f" • format: {msg.format}\n"
136 f" • data size: {len(msg.data)}\n"
137 f" • recv FPS (1s): {self.get_fps():.1f}"
138 )
139
140 def cb_camerainfo(self, msg: CameraInfo):
141 """CameraInfo callback (camera intrinsic parameters)"""
142 # Camera info will only receive one frame, print it directly
143 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
144
145 # Format D array
146 d_str = ", ".join([f"{d:.6f}" for d in msg.d])
147
148 # Format K matrix
149 k_str = ", ".join([f"{k:.6f}" for k in msg.k])
150
151 # Format P matrix
152 p_str = ", ".join([f"{p:.6f}" for p in msg.p])
153
154 self.get_logger().info(
155 f"📷 {self.topic_type} received\n"
156 f" • frame_id: {msg.header.frame_id}\n"
157 f" • stamp (sec): {stamp_sec:.6f}\n"
158 f" • width x height: {msg.width} x {msg.height}\n"
159 f" • distortion_model:{msg.distortion_model}\n"
160 f" • D: [{d_str}]\n"
161 f" • K: [{k_str}]\n"
162 f" • P: [{p_str}]\n"
163 f" • binning_x: {msg.binning_x}\n"
164 f" • binning_y: {msg.binning_y}\n"
165 f" • roi: {{ x_offset: {msg.roi.x_offset}, y_offset: {msg.roi.y_offset}, height: {msg.roi.height}, width: {msg.roi.width}, do_rectify: {msg.roi.do_rectify} }}"
166 )
167
168 def dump_image_to_video(self, msg: Image):
169 """Video dump is only supported for RGB images"""
170 # You can add video recording functionality here
171 # Simplified in the Python version, only logs instead
172 if self.should_print():
173 self.get_logger().info(f"📝 Video dump not implemented in Python version")
174
175
176def main(args=None):
177 rclpy.init(args=args)
178 try:
179 node = HeadRearCameraTopicEcho()
180 rclpy.spin(node)
181 except KeyboardInterrupt:
182 pass
183 except Exception as e:
184 print(f"Error: {e}")
185 finally:
186 if 'node' in locals():
187 node.destroy_node()
188 rclpy.shutdown()
189
190
191if __name__ == '__main__':
192 main()
Usage:
Subscribe to RGB image data:
ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image
Subscribe to compressed image data:
ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image_compressed
Subscribe to camera intrinsics:
ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=camera_info
Record video:
# You can change dump_video_path to another path; ensure the directory exists before saving ros2 run py_examples echo_camera_head_rear --ros-args -p topic_type:=rgb_image -p dump_video_path:=$PWD/rear_camera.avi
Use cases:
Face recognition and tracking
Object detection and recognition
Visual SLAM
Image processing and computer vision algorithm development
Robot visual navigation
6.1.13 LiDAR data subscription
This example uses echo_lidar_data, which subscribes to the /aima/hal/sensor/lidar_chest_front/ topic to receive the robot’s LiDAR data, supporting both point cloud and IMU data types.
Features:
Supports LiDAR point cloud data subscription
Supports LiDAR IMU data subscription
Real-time FPS statistics and data display
Configurable topic type selection
Detailed data field information output
Supported data types:
PointCloud2: LiDAR point cloud data (sensor_msgs/PointCloud2)Imu: LiDAR IMU data (sensor_msgs/Imu)
Technical implementation:
Uses SensorDataQoS configuration (
BEST_EFFORT+VOLATILE)Supports point cloud field parsing and visualization
Supports IMU quaternion, angular velocity, and linear acceleration data
Provides detailed debugging log output
Use cases:
LiDAR data acquisition and analysis
Point cloud data processing and visualization
Robot navigation and localization
SLAM algorithm development
Environmental perception and mapping
1#!/usr/bin/env python3
2"""
3Chest LiDAR data subscription example
4
5Supports subscribing to the following topics:
6 1. /aima/hal/sensor/lidar_chest_front/lidar_pointcloud
7 - Data type: sensor_msgs/PointCloud2
8 - frame_id: lidar_chest_front
9 - child_frame_id: /
10 - Content: LiDAR point cloud data
11 2. /aima/hal/sensor/lidar_chest_front/imu
12 - Data type: sensor_msgs/Imu
13 - frame_id: lidar_imu_chest_front
14 - Content: LiDAR IMU data
15
16You can select the topic type to subscribe via startup parameter --ros-args -p topic_type:=<type>:
17 - pointcloud: subscribe to LiDAR point cloud
18 - imu: subscribe to LiDAR IMU
19Default topic_type is pointcloud
20
21Examples:
22 ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=pointcloud
23 ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=imu
24"""
25
26import rclpy
27from rclpy.node import Node
28from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
29from sensor_msgs.msg import PointCloud2, Imu
30from collections import deque
31import time
32
33
34class LidarChestEcho(Node):
35 def __init__(self):
36 super().__init__('lidar_chest_echo')
37
38 # Select the topic type to subscribe
39 self.declare_parameter('topic_type', 'pointcloud')
40 self.topic_type = self.get_parameter('topic_type').value
41
42 # SensorDataQoS: BEST_EFFORT + VOLATILE
43 qos = QoSProfile(
44 reliability=QoSReliabilityPolicy.BEST_EFFORT,
45 history=QoSHistoryPolicy.KEEP_LAST,
46 depth=5
47 )
48
49 # Create different subscribers based on topic_type
50 if self.topic_type == "pointcloud":
51 self.topic_name = "/aima/hal/sensor/lidar_chest_front/lidar_pointcloud"
52 self.sub_pointcloud = self.create_subscription(
53 PointCloud2, self.topic_name, self.cb_pointcloud, qos)
54 self.get_logger().info(
55 f"✅ Subscribing LIDAR PointCloud2: {self.topic_name}")
56
57 elif self.topic_type == "imu":
58 self.topic_name = "/aima/hal/sensor/lidar_chest_front/imu"
59 self.sub_imu = self.create_subscription(
60 Imu, self.topic_name, self.cb_imu, qos)
61 self.get_logger().info(
62 f"✅ Subscribing LIDAR IMU: {self.topic_name}")
63
64 else:
65 self.get_logger().error(f"Unknown topic_type: {self.topic_type}")
66 raise ValueError("Unknown topic_type")
67
68 # Internal state
69 self.last_print = self.get_clock().now()
70 self.arrivals = deque()
71
72 def update_arrivals(self):
73 """Calculate received FPS"""
74 now = self.get_clock().now()
75 self.arrivals.append(now)
76 while self.arrivals and (now - self.arrivals[0]).nanoseconds * 1e-9 > 1.0:
77 self.arrivals.popleft()
78
79 def get_fps(self):
80 """Get FPS"""
81 return len(self.arrivals)
82
83 def should_print(self):
84 """Control print frequency"""
85 now = self.get_clock().now()
86 if (now - self.last_print).nanoseconds * 1e-9 >= 1.0:
87 self.last_print = now
88 return True
89 return False
90
91 def cb_pointcloud(self, msg: PointCloud2):
92 """PointCloud2 callback (LiDAR point cloud)"""
93 self.update_arrivals()
94
95 if self.should_print():
96 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
97
98 # Format fields info
99 fields_str = " ".join(
100 [f"{f.name}({f.datatype})" for f in msg.fields])
101
102 self.get_logger().info(
103 f"🟢 LIDAR PointCloud2 received\n"
104 f" • frame_id: {msg.header.frame_id}\n"
105 f" • stamp (sec): {stamp_sec:.6f}\n"
106 f" • width x height: {msg.width} x {msg.height}\n"
107 f" • point_step: {msg.point_step}\n"
108 f" • row_step: {msg.row_step}\n"
109 f" • fields: {fields_str}\n"
110 f" • is_bigendian: {msg.is_bigendian}\n"
111 f" • is_dense: {msg.is_dense}\n"
112 f" • data size: {len(msg.data)}\n"
113 f" • recv FPS (1s): {self.get_fps():1.1f}"
114 )
115
116 def cb_imu(self, msg: Imu):
117 """IMU callback (LiDAR IMU)"""
118 self.update_arrivals()
119
120 if self.should_print():
121 stamp_sec = msg.header.stamp.sec + msg.header.stamp.nanosec * 1e-9
122
123 self.get_logger().info(
124 f"🟢 LIDAR IMU received\n"
125 f" • frame_id: {msg.header.frame_id}\n"
126 f" • stamp (sec): {stamp_sec:.6f}\n"
127 f" • orientation: [{msg.orientation.x:.6f}, {msg.orientation.y:.6f}, {msg.orientation.z:.6f}, {msg.orientation.w:.6f}]\n"
128 f" • angular_velocity:[{msg.angular_velocity.x:.6f}, {msg.angular_velocity.y:.6f}, {msg.angular_velocity.z:.6f}]\n"
129 f" • linear_accel: [{msg.linear_acceleration.x:.6f}, {msg.linear_acceleration.y:.6f}, {msg.linear_acceleration.z:.6f}]\n"
130 f" • recv FPS (1s): {self.get_fps():.1f}"
131 )
132
133
134def main(args=None):
135 rclpy.init(args=args)
136 try:
137 node = LidarChestEcho()
138 rclpy.spin(node)
139 except KeyboardInterrupt:
140 pass
141 except Exception as e:
142 print(f"Error: {e}")
143 finally:
144 if 'node' in locals():
145 node.destroy_node()
146 rclpy.shutdown()
147
148
149if __name__ == '__main__':
150 main()
Usage:
# Subscribe to LiDAR point cloud data
ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=pointcloud
# Subscribe to LiDAR IMU data
ros2 run py_examples echo_lidar_data --ros-args -p topic_type:=imu
Example output:
[INFO] [lidar_chest_echo]: ✅ Subscribing LIDAR PointCloud2: /aima/hal/sensor/lidar_chest_front/lidar_pointcloud
[INFO] [lidar_chest_echo]: 🟢 LIDAR PointCloud2 received
• frame_id: lidar_chest_front
• stamp (sec): 1234567890.123456
• width x height: 1 x 36000
• point_step: 16
• row_step: 16
• fields: x(7) y(7) z(7) intensity(7)
• is_bigendian: False
• is_dense: True
• data size: 576000
• recv FPS (1s): 10.0
6.1.14 Play video
This example uses play_video. Before running the node, you must upload the video to the robot’s Interaction Computing Unit (PC3) (you may create a directory on it to store videos, e.g. /var/tmp/videos/), and then change the video_path in the node program to the path of the video you want to play.
Attention
⚠️ Attention! The Interaction Computing Unit (PC3) is independent from the Development Computing Unit (PC2) where secondary development programs run. Audio and video files must be stored on the Interaction Computing Unit (IP: 10.0.1.42).
Audio and video files (and all parent directories up to root) must be readable by all users(new subdirectory under /var/tmp/ is recommended)
Function description By calling the PlayVideo service, the robot can play a video file from a specified path on its screen. Please ensure the video file has been uploaded to the Interaction Computing Unit, otherwise playback will fail.
1#!/usr/bin/env python3
2
3import rclpy
4import rclpy.logging
5from rclpy.node import Node
6
7from aimdk_msgs.srv import PlayVideo
8
9
10class PlayVideoClient(Node):
11 def __init__(self):
12 super().__init__('play_video_client')
13 self.client = self.create_client(
14 PlayVideo, '/face_ui_proxy/play_video')
15 self.get_logger().info('✅ PlayVideo client node created.')
16
17 # Wait for the service to become available
18 while not self.client.wait_for_service(timeout_sec=2.0):
19 self.get_logger().info('⏳ Service unavailable, waiting...')
20
21 self.get_logger().info('🟢 Service available, ready to send request.')
22
23 def send_request(self, video_path, mode, priority):
24 req = PlayVideo.Request()
25
26 req.video_path = video_path
27 req.mode = mode
28 req.priority = priority
29
30 # async call
31 self.get_logger().info(
32 f'📨 Sending request to play video: mode={mode} video={video_path}')
33 for i in range(8):
34 req.header.header.stamp = self.get_clock().now().to_msg()
35 future = self.client.call_async(req)
36 rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
37
38 if future.done():
39 break
40
41 # retry as remote peer is NOT handled well by ROS
42 self.get_logger().info(f'trying ... [{i}]')
43
44 resp = future.result()
45 if resp is None:
46 self.get_logger().error('❌ Service call not completed or timed out.')
47 return False
48
49 if resp.success:
50 self.get_logger().info(
51 f'✅ Request to play video recorded successfully: {resp.message}')
52 return True
53 else:
54 self.get_logger().error(
55 f'❌ Failed to record play-video request: {resp.message}')
56 return False
57
58
59def main(args=None):
60 rclpy.init(args=args)
61 node = None
62
63 try:
64 # video path and priority can be customized
65 video_path = "/agibot/data/home/agi/zhiyuan.mp4"
66 priority = 5
67 # input play mode
68 mode = int(input("Enter video play mode (1: play once, 2: loop): "))
69 if mode not in (1, 2):
70 raise ValueError(f'invalid mode {mode}')
71
72 node = PlayVideoClient()
73 node.send_request(video_path, mode, priority)
74 except KeyboardInterrupt:
75 pass
76 except Exception as e:
77 rclpy.logging.get_logger('main').error(
78 f'Program exited with exception: {e}')
79
80 if node:
81 node.destroy_node()
82 if rclpy.ok():
83 rclpy.shutdown()
84
85
86if __name__ == '__main__':
87 main()
6.1.15 Media file playback
This example uses play_media, which enables playback of specified media files (such as audio files) through the node, supporting audio formats including WAV and MP3.
Features:
Supports playback of multiple audio formats (WAV, MP3, etc.)
Supports priority control with configurable playback priority
Supports interruption mechanism to stop current playback
Supports custom file paths and playback parameters
Provides complete error handling and status feedback
Technical implementation:
Uses the PlayMediaFile service for media file playback
Supports priority level configuration (0–99)
Supports interruption control (is_interrupted parameter)
Provides detailed playback status feedback
Compatible with different response field formats
Use cases:
Audio file playback and media control
Voice prompts and sound effect playback
Multimedia application development
Robot interaction audio feedback
Attention
⚠️ Attention! The Interaction Computing Unit (PC3) is independent from the Development Computing Unit (PC2) where secondary development programs run. Audio and video files must be stored on the Interaction Computing Unit (IP: 10.0.1.42).
Audio and video files (and all parent directories up to root) must be readable by all users(new subdirectory under /var/tmp/ is recommended)
1#!/usr/bin/env python3
2
3import sys
4import rclpy
5import rclpy.logging
6from rclpy.node import Node
7
8from aimdk_msgs.srv import PlayMediaFile
9from aimdk_msgs.msg import TtsPriorityLevel
10
11
12class PlayMediaClient(Node):
13 def __init__(self):
14 super().__init__('play_media_client')
15 self.client = self.create_client(
16 PlayMediaFile, '/aimdk_5Fmsgs/srv/PlayMediaFile')
17 self.get_logger().info('✅ PlayMedia client node created.')
18
19 # Wait for the service to become available
20 while not self.client.wait_for_service(timeout_sec=2.0):
21 self.get_logger().info('⏳ Service unavailable, waiting...')
22
23 self.get_logger().info('🟢 Service available, ready to send request.')
24
25 def send_request(self, media_path):
26 req = PlayMediaFile.Request()
27
28 req.media_file_req.file_name = media_path
29 req.media_file_req.domain = 'demo_client' # required: caller domain
30 req.media_file_req.trace_id = 'demo' # optional
31 req.media_file_req.is_interrupted = True # interrupt same-priority
32 req.media_file_req.priority_weight = 0 # optional: 0~99
33 req.media_file_req.priority_level.value = TtsPriorityLevel.INTERACTION_L6
34
35 self.get_logger().info(
36 f'📨 Sending request to play media: {media_path}')
37 for i in range(8):
38 req.header.header.stamp = self.get_clock().now().to_msg()
39 future = self.client.call_async(req)
40 rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
41
42 if future.done():
43 break
44
45 # retry as remote peer is NOT handled well by ROS
46 self.get_logger().info(f'trying ... [{i}]')
47
48 resp = future.result()
49 if resp is None:
50 self.get_logger().error('❌ Service call not completed or timed out.')
51 return False
52
53 if resp.tts_resp.is_success:
54 self.get_logger().info('✅ Request to play media file recorded successfully.')
55 return True
56 else:
57 self.get_logger().error('❌ Failed to record play-media request.')
58 return False
59
60
61def main(args=None):
62 rclpy.init(args=args)
63 node = None
64
65 default_media = '/agibot/data/var/interaction/tts_cache/normal/demo.wav'
66 try:
67 if len(sys.argv) > 1:
68 media_path = sys.argv[1]
69 else:
70 media_path = input(
71 f'Enter media file path to play (default: {default_media}): ').strip()
72 if not media_path:
73 media_path = default_media
74
75 node = PlayMediaClient()
76 node.send_request(media_path)
77 except KeyboardInterrupt:
78 pass
79 except Exception as e:
80 rclpy.logging.get_logger('main').error(
81 f'Program exited with exception: {e}')
82
83 if node:
84 node.destroy_node()
85 if rclpy.ok():
86 rclpy.shutdown()
87
88
89if __name__ == '__main__':
90 main()
Usage:
# Play default audio file
ros2 run py_examples play_media
# Play a specified audio file
# Note: replace /path/to/your/audio_file.wav with the actual file path on the interaction unit
ros2 run py_examples play_media /path/to/your/audio_file.wav
# Play a TTS cached file
ros2 run py_examples play_media /agibot/data/var/interaction/tts_cache/normal/demo.wav
Example output:
[INFO] [play_media_file_client_min]: Waiting for service: /aimdk_5Fmsgs/srv/PlayMediaFile
[INFO] [play_media_file_client_min]: ✅ Media file playback request succeeded
Notes:
Ensure the audio file path is correct and the file exists
Supported file formats: WAV, MP3, etc.
Priority settings affect playback queue order
Interruption feature can stop the currently playing audio
The program includes complete exception handling mechanisms
6.1.16 TTS (Text-to-Speech)
This example uses play_tts, which enables the robot to speak the provided text through the node. Users can input different text according to various scenarios.
Features:
Supports command-line parameters and interactive input
Includes complete service availability checks and error handling
Supports priority control and interruption mechanism
Provides detailed playback status feedback
Core code
1#!/usr/bin/env python3
2
3import sys
4import rclpy
5import rclpy.logging
6from rclpy.node import Node
7
8from aimdk_msgs.srv import PlayTts
9from aimdk_msgs.msg import TtsPriorityLevel
10
11
12class PlayTTSClient(Node):
13 def __init__(self):
14 super().__init__('play_tts_client')
15
16 # fill in the actual service name
17 self.client = self.create_client(PlayTts, '/aimdk_5Fmsgs/srv/PlayTts')
18 self.get_logger().info('✅ PlayTts client node created.')
19
20 # Wait for the service to become available
21 while not self.client.wait_for_service(timeout_sec=2.0):
22 self.get_logger().info('⏳ Service unavailable, waiting...')
23
24 self.get_logger().info('🟢 Service available, ready to send request.')
25
26 def send_request(self, text):
27 req = PlayTts.Request()
28
29 req.tts_req.text = text
30 req.tts_req.domain = 'demo_client' # required: caller domain
31 req.tts_req.trace_id = 'demo' # optional: request id
32 req.tts_req.is_interrupted = True # required: interrupt same-priority
33 req.tts_req.priority_weight = 0
34 req.tts_req.priority_level.value = 6
35
36 self.get_logger().info(f'📨 Sending request to play tts: text={text}')
37 for i in range(8):
38 req.header.header.stamp = self.get_clock().now().to_msg()
39 future = self.client.call_async(req)
40 rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
41
42 if future.done():
43 break
44
45 # retry as remote peer is NOT handled well by ROS
46 self.get_logger().info(f'trying ... [{i}]')
47
48 resp = future.result()
49 if resp is None:
50 self.get_logger().error('❌ Service call not completed or timed out.')
51 return False
52
53 if resp.tts_resp.is_success:
54 self.get_logger().info('✅ TTS sent successfully.')
55 return True
56 else:
57 self.get_logger().error('❌ Failed to send TTS.')
58 return False
59
60
61def main(args=None):
62 rclpy.init(args=args)
63 node = None
64
65 try:
66 # get text to speak
67 if len(sys.argv) > 1:
68 text = sys.argv[1]
69 else:
70 text = input('Enter text to speak: ')
71 if not text:
72 text = 'Hello, I am AgiBot X2.'
73
74 node = PlayTTSClient()
75 node.send_request(text)
76 except KeyboardInterrupt:
77 pass
78 except Exception as e:
79 rclpy.logging.get_logger('main').error(
80 f'Program exited with exception: {e}')
81
82 if node:
83 node.destroy_node()
84 if rclpy.ok():
85 rclpy.shutdown()
86
87
88if __name__ == '__main__':
89 main()
Usage
# Play text using command-line parameters (recommended)
ros2 run py_examples play_tts "Hello, I am the Lingxi X2 robot"
# Or run without parameters and the program will prompt for input
ros2 run py_examples play_tts
Example output
[INFO] [play_tts_client_min]: ✅ Speech playback request succeeded
Notes
Ensure the TTS service is running properly
Supports Chinese and English text playback
Priority settings affect playback queue order
Interruption feature can stop the currently playing speech
Interface reference
Service:
/aimdk_5Fmsgs/srv/PlayTtsMessage:
aimdk_msgs/srv/PlayTts
6.1.17 Microphone data reception
This example uses mic_receiver, which subscribes to the /agent/process_audio_output topic to receive noise-reduced audio data from the robot, supporting both internal and external microphone audio streams, and automatically saving complete speech segments as PCM files based on VAD (Voice Activity Detection) status.
Features:
Supports simultaneous reception of multiple audio streams (internal microphone stream_id=1, external microphone stream_id=2)
Automatically detects speech start, in-progress, and end based on VAD state
Automatically saves complete speech segments as PCM files
Stores recordings categorized by timestamp and audio stream
Supports duration calculation and statistical information output
Intelligent buffer management to prevent memory leaks
Complete error handling and exception management
Detailed debugging log output
VAD state description:
0: No speech1: Speech start2: Speech in progress3: Speech end
Technical implementation:
Supports saving PCM audio files (16 kHz, 16-bit, mono)
Provides detailed log output and status monitoring
Supports real-time audio stream processing and file saving
Use cases:
Speech recognition and speech processing
Audio data acquisition and analysis
Real-time speech monitoring
Audio quality evaluation
Multi-microphone array data processing
1#!/usr/bin/env python3
2"""
3Microphone data receiving example
4
5This example subscribes to the `/agent/process_audio_output` topic to receive the robot's
6noise-suppressed audio data. It supports both the built-in microphone and the external
7microphone audio streams, and automatically saves complete speech segments as PCM files
8based on the VAD (Voice Activity Detection) state.
9
10Features:
11- Supports receiving multiple audio streams at the same time (built-in mic stream_id=1, external mic stream_id=2)
12- Automatically detects speech start / in-progress / end based on VAD state
13- Automatically saves complete speech segments as PCM files
14- Stores files categorized by timestamp and audio stream
15- Supports audio duration calculation and logging
16
17VAD state description:
18- 0: No speech
19- 1: Speech start
20- 2: Speech in progress
21- 3: Speech end
22"""
23
24import rclpy
25from rclpy.node import Node
26from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
27from aimdk_msgs.msg import ProcessedAudioOutput, AudioVadStateType
28import os
29import time
30from datetime import datetime
31from collections import defaultdict
32from typing import Dict, List
33
34
35class AudioSubscriber(Node):
36 def __init__(self):
37 super().__init__('audio_subscriber')
38
39 # Audio buffers, stored separately by stream_id
40 # stream_id -> buffer
41 self.audio_buffers: Dict[int, List[bytes]] = defaultdict(list)
42 self.recording_state: Dict[int, bool] = defaultdict(bool)
43
44 # Create audio output directory
45 self.audio_output_dir = "audio_recordings"
46 os.makedirs(self.audio_output_dir, exist_ok=True)
47
48 # VAD state name mapping
49 self.vad_state_names = {
50 0: "No speech",
51 1: "Speech start",
52 2: "Speech in progress",
53 3: "Speech end"
54 }
55
56 # Audio stream name mapping
57 self.stream_names = {
58 1: "Built-in microphone",
59 2: "External microphone"
60 }
61
62 # QoS settings
63 # Note: deep queue to avoid missing data in a burst at start of VAD.
64 qos = QoSProfile(
65 history=QoSHistoryPolicy.KEEP_LAST,
66 depth=500,
67 reliability=QoSReliabilityPolicy.BEST_EFFORT
68 )
69
70 # Create subscriber
71 self.subscription = self.create_subscription(
72 ProcessedAudioOutput,
73 '/agent/process_audio_output',
74 self.audio_callback,
75 qos
76 )
77
78 self.get_logger().info("Start subscribing to noise-suppressed audio data...")
79
80 def audio_callback(self, msg: ProcessedAudioOutput):
81 """Audio data callback"""
82 try:
83 stream_id = msg.stream_id
84 vad_state = msg.audio_vad_state.value
85 audio_data = bytes(msg.audio_data)
86
87 self.get_logger().info(
88 f"Received audio data: stream_id={stream_id}, "
89 f"vad_state={vad_state}({self.vad_state_names.get(vad_state, 'Unknown state')}), "
90 f"audio_size={len(audio_data)} bytes"
91 )
92
93 self.handle_vad_state(stream_id, vad_state, audio_data)
94
95 except Exception as e:
96 self.get_logger().error(
97 f"Error while processing audio message: {str(e)}")
98
99 def handle_vad_state(self, stream_id: int, vad_state: int, audio_data: bytes):
100 """Handle VAD state changes"""
101 stream_name = self.stream_names.get(
102 stream_id, f"Unknown stream {stream_id}")
103 vad_name = self.vad_state_names.get(
104 vad_state, f"Unknown state {vad_state}")
105
106 self.get_logger().info(
107 f"[{stream_name}] VAD state: {vad_name} audio: {len(audio_data)} bytes"
108 )
109
110 # Speech start
111 if vad_state == 1:
112 self.get_logger().info("🎤 Speech start detected")
113 if not self.recording_state[stream_id]:
114 self.audio_buffers[stream_id].clear()
115 self.recording_state[stream_id] = True
116 if audio_data:
117 self.audio_buffers[stream_id].append(audio_data)
118
119 # Speech in progress
120 elif vad_state == 2:
121 self.get_logger().info("🔄 Speech in progress...")
122 if self.recording_state[stream_id] and audio_data:
123 self.audio_buffers[stream_id].append(audio_data)
124
125 # Speech end
126 elif vad_state == 3:
127 self.get_logger().info("✅ Speech end")
128 if self.recording_state[stream_id] and audio_data:
129 self.audio_buffers[stream_id].append(audio_data)
130
131 if self.recording_state[stream_id] and self.audio_buffers[stream_id]:
132 self.save_audio_segment(stream_id)
133 self.recording_state[stream_id] = False
134
135 # No speech
136 elif vad_state == 0:
137 if self.recording_state[stream_id]:
138 self.get_logger().info("⏹️ Reset recording state")
139 self.recording_state[stream_id] = False
140
141 # Print current buffer status
142 buffer_size = sum(len(chunk)
143 for chunk in self.audio_buffers[stream_id])
144 recording = self.recording_state[stream_id]
145 self.get_logger().debug(
146 f"[Stream {stream_id}] Buffer size: {buffer_size} bytes, recording: {recording}"
147 )
148
149 def save_audio_segment(self, stream_id: int):
150 """Save audio segment"""
151 if not self.audio_buffers[stream_id]:
152 return
153
154 # Merge all audio data
155 audio_data = b''.join(self.audio_buffers[stream_id])
156
157 # Get current timestamp
158 now = datetime.now()
159 timestamp = now.strftime("%Y%m%d_%H%M%S_%f")[:-3] # to milliseconds
160
161 # Create subdirectory by stream_id
162 stream_dir = os.path.join(self.audio_output_dir, f"stream_{stream_id}")
163 os.makedirs(stream_dir, exist_ok=True)
164
165 # Generate filename
166 stream_name = "internal_mic" if stream_id == 1 else "external_mic" if stream_id == 2 else f"stream_{stream_id}"
167 filename = f"{stream_name}_{timestamp}.pcm"
168 filepath = os.path.join(stream_dir, filename)
169
170 try:
171 # Save PCM file
172 with open(filepath, 'wb') as f:
173 f.write(audio_data)
174
175 self.get_logger().info(
176 f"Audio segment saved: {filepath} (size: {len(audio_data)} bytes)")
177
178 # Calculate audio duration (assuming 16 kHz, 16-bit, mono)
179 sample_rate = 16000
180 bits_per_sample = 16
181 channels = 1
182 bytes_per_sample = bits_per_sample // 8
183 total_samples = len(audio_data) // (bytes_per_sample * channels)
184 duration_seconds = total_samples / sample_rate
185
186 self.get_logger().info(
187 f"Audio duration: {duration_seconds:.2f} s ({total_samples} samples)")
188
189 except Exception as e:
190 self.get_logger().error(f"Failed to save audio file: {str(e)}")
191
192
193def main(args=None):
194 rclpy.init(args=args)
195 node = AudioSubscriber()
196
197 try:
198 node.get_logger().info("Listening to noise-suppressed audio data, press Ctrl+C to exit...")
199 rclpy.spin(node)
200 except KeyboardInterrupt:
201 node.get_logger().info("Interrupt signal received, exiting...")
202 finally:
203 node.destroy_node()
204 rclpy.shutdown()
205
206
207if __name__ == '__main__':
208 main()
Usage:
Run the program:
# Build the Python package colcon build --packages-select py_examples # Run the microphone receiver ros2 run py_examples mic_receiver
Directory structure:
The
audio_recordingsdirectory will be created automatically after running the nodeAudio files are stored by stream_id:
stream_1/: Internal microphone audiostream_2/: External microphone audio
File naming format:
{stream_name}_{timestamp}.pcminternal_mic_20250909_133649_738.pcm(internal microphone)external_mic_20250909_133650_123.pcm(external microphone)
Audio format: 16 kHz, 16-bit, mono PCM
Play saved PCM files:
# Play internal microphone recording aplay -r 16000 -f S16_LE -c 1 audio_recordings/stream_1/internal_mic_20250909_133649_738.pcm # Play external microphone recording aplay -r 16000 -f S16_LE -c 1 audio_recordings/stream_2/external_mic_20250909_133650_123.pcm
Convert to WAV format (optional):
# Convert to WAV format using ffmpeg ffmpeg -f s16le -ar 16000 -ac 1 -i external_mic_20250909_133649_738.pcm output.wav
Program control:
Press Ctrl+C to safely exit the program
The program automatically handles audio stream start and end
Supports processing multiple audio streams simultaneously
Example output:
Normal startup and operation:
[INFO] Started subscribing to noise-reduced audio data...
[INFO] Received audio data: stream_id=1, vad_state=0(no speech), audio_size=0 bytes
[INFO] [Internal mic] VAD state: no speech Audio data: 0 bytes
[INFO] Received audio data: stream_id=2, vad_state=0(no speech), audio_size=0 bytes
[INFO] [External mic] VAD state: no speech Audio data: 0 bytes
Speech start detected:
[INFO] Received audio data: stream_id=2, vad_state=1(speech start), audio_size=320 bytes
[INFO] [External mic] VAD state: speech start Audio data: 320 bytes
[INFO] 🎤 Speech start detected
Speech processing:
[INFO] Received audio data: stream_id=2, vad_state=2(speech in progress), audio_size=320 bytes
[INFO] [External mic] VAD state: speech in progress Audio data: 320 bytes
[INFO] 🔄 Speech in progress...
[INFO] Received audio data: stream_id=2, vad_state=2(speech in progress), audio_size=320 bytes
[INFO] [External mic] VAD state: speech in progress Audio data: 320 bytes
[INFO] 🔄 Speech in progress...
Speech end and save:
[INFO] Received audio data: stream_id=2, vad_state=3(speech end), audio_size=320 bytes
[INFO] [External mic] VAD state: speech end Audio data: 320 bytes
[INFO] ✅ Speech ended
[INFO] Audio segment saved: audio_recordings/stream_2/external_mic_20250909_133649_738.pcm (size: 960 bytes)
[INFO] Audio duration: 0.06 seconds (480 samples)
Simultaneous multi-stream processing:
[INFO] Received audio data: stream_id=1, vad_state=1(speech start), audio_size=320 bytes
[INFO] [Internal mic] VAD state: speech start Audio data: 320 bytes
[INFO] 🎤 Speech start detected
[INFO] Received audio data: stream_id=2, vad_state=1(speech start), audio_size=320 bytes
[INFO] [External mic] VAD state: speech start Audio data: 320 bytes
[INFO] 🎤 Speech start detected
Program exit:
^C[INFO] Interrupt signal received, exiting...
[INFO] Program exited safely
Notes:
The program supports processing multiple audio streams (internal and external microphones)
Each audio stream has an independent buffer and recording state
Audio files are automatically saved by timestamp and audio stream
The program includes complete error handling mechanisms to ensure stable operation
6.1.18 Emoji (expression) control
This example uses play_emoji, which enables the robot to display a specified expression. Users can select expressions from the available list. For the full expression list, refer to the expression list
1#!/usr/bin/env python3
2
3import rclpy
4import rclpy.logging
5from rclpy.node import Node
6
7from aimdk_msgs.srv import PlayEmoji
8
9
10class PlayEmojiClient(Node):
11 def __init__(self):
12 super().__init__('play_emoji_client')
13 self.client = self.create_client(
14 PlayEmoji, '/face_ui_proxy/play_emoji')
15 self.get_logger().info('✅ PlayEmoji client node created.')
16
17 # Wait for the service to become available
18 while not self.client.wait_for_service(timeout_sec=2.0):
19 self.get_logger().info('⏳ Service unavailable, waiting...')
20
21 self.get_logger().info('🟢 Service available, ready to send request.')
22
23 def send_request(self, emoji: int, mode: int, priority: int):
24 req = PlayEmoji.Request()
25
26 req.emotion_id = int(emoji)
27 req.mode = int(mode)
28 req.priority = int(priority)
29
30 self.get_logger().info(
31 f'📨 Sending request to play emoji: id={emotion_id}, mode={mode}, priority={priority}')
32 for i in range(8):
33 req.header.header.stamp = self.get_clock().now().to_msg()
34 future = self.client.call_async(req)
35 rclpy.spin_until_future_complete(self, future, timeout_sec=0.25)
36
37 if future.done():
38 break
39
40 # retry as remote peer is NOT handled well by ROS
41 self.get_logger().info(f'trying ... [{i}]')
42
43 resp = future.result()
44 if resp is None:
45 self.get_logger().error('❌ Service call not completed or timed out.')
46 return False
47
48 if resp.success:
49 self.get_logger().info(
50 f'✅ Emoji played successfully: {resp.message}')
51 return True
52 else:
53 self.get_logger().error(f'❌ Failed to play emoji: {resp.message}')
54 return False
55
56
57def main(args=None):
58 rclpy.init(args=args)
59 node = None
60
61 # Interactive input, same as the original C++ version
62 try:
63 emotion = int(
64 input("Enter emoji ID: 1-blink, 60-bored, 70-abnormal, 80-sleeping, 90-happy ... 190-double angry, 200-adore: "))
65 mode = int(input("Enter play mode (1: play once, 2: loop): "))
66 if mode not in (1, 2):
67 raise ValueError("invalid mode")
68 priority = 10 # default priority
69
70 node = PlayEmojiClient()
71 node.send_request(emotion, mode, priority)
72 except KeyboardInterrupt:
73 pass
74 except Exception as e:
75 rclpy.logging.get_logger('main').error(
76 f'Program exited with exception: {e}')
77
78 if node:
79 node.destroy_node()
80 if rclpy.ok():
81 rclpy.shutdown()
82
83
84if __name__ == '__main__':
85 main()
6.1.19 LED light strip control
Function description: Demonstrates how to control the robot’s LED light strip, supporting multiple display modes and custom colors.
Core code:
1#!/usr/bin/env python3
2
3import sys
4import rclpy
5import rclpy.logging
6from rclpy.node import Node
7
8from aimdk_msgs.msg import CommonRequest
9from aimdk_msgs.srv import LedStripCommand
10
11
12class PlayLightsClient(Node):
13 def __init__(self):
14 super().__init__('play_lights_client')
15
16 # create service client
17 self.client = self.create_client(
18 LedStripCommand, '/aimdk_5Fmsgs/srv/LedStripCommand')
19
20 self.get_logger().info('✅ PlayLights client node created.')
21
22 # Wait for the service to become available
23 while not self.client.wait_for_service(timeout_sec=2.0):
24 self.get_logger().info('⏳ Service unavailable, waiting...')
25
26 self.get_logger().info('🟢 Service available, ready to send request.')
27
28 def send_request(self, led_mode, r, g, b):
29 """Send LED control request"""
30 # create request
31 request = LedStripCommand.Request()
32 request.led_strip_mode = led_mode
33 request.r = r
34 request.g = g
35 request.b = b
36
37 # send request
38 # Note: LED strip is slow to response (up to ~5s)
39 self.get_logger().info(
40 f'📨 Sending request to control led strip: mode={led_mode}, RGB=({r}, {g}, {b})')
41 for i in range(4):
42 request.request.header.stamp = self.get_clock().now().to_msg()
43 future = self.client.call_async(request)
44 rclpy.spin_until_future_complete(self, future, timeout_sec=5)
45
46 if future.done():
47 break
48
49 # retry as remote peer is NOT handled well by ROS
50 self.get_logger().info(f'trying ... [{i}]')
51
52 response = future.result()
53 if response is None:
54 self.get_logger().error('❌ Service call not completed or timed out.')
55 return False
56
57 if response.status_code == 0:
58 self.get_logger().info('✅ LED strip command sent successfully.')
59 return True
60 else:
61 self.get_logger().error(
62 f'❌ LED strip command failed with status: {response.status_code}')
63 return False
64
65
66def main(args=None):
67 rclpy.init(args=args)
68 node = None
69
70 try:
71 # get command line args
72 if len(sys.argv) > 4:
73 # use CLI args
74 led_mode = int(sys.argv[1])
75 if led_mode not in (0, 1, 2, 3):
76 raise ValueError("invalid mode")
77 r = int(sys.argv[2])
78 if r < 0 or r > 255:
79 raise ValueError("invalid R value")
80 g = int(sys.argv[3])
81 if g < 0 or g > 255:
82 raise ValueError("invalid G value")
83 b = int(sys.argv[4])
84 if b < 0 or b > 255:
85 raise ValueError("invalid B value")
86 else:
87 # interactive input
88 print("=== LED strip control example ===")
89 print("Select LED strip mode:")
90 print("0 - Steady on")
91 print("1 - Breathing (4s cycle, sine brightness)")
92 print("2 - Blinking (1s cycle, 0.5s on, 0.5s off)")
93 print("3 - Flowing (2s cycle, light up from left to right)")
94
95 led_mode = int(input("Enter mode (0-3): "))
96 if led_mode not in (0, 1, 2, 3):
97 raise ValueError("invalid mode")
98
99 print("\nSet RGB color values (0-255):")
100 r = int(input("Red (R): "))
101 if r < 0 or r > 255:
102 raise ValueError("invalid R value")
103 g = int(input("Green (G): "))
104 if g < 0 or g > 255:
105 raise ValueError("invalid G value")
106 b = int(input("Blue (B): "))
107 if b < 0 or b > 255:
108 raise ValueError("invalid B value")
109
110 node = PlayLightsClient()
111 node.send_request(led_mode, r, g, b)
112 except KeyboardInterrupt:
113 pass
114 except Exception as e:
115 rclpy.logging.get_logger('main').error(
116 f'Program exited with exception: {e}')
117
118 if node:
119 node.destroy_node()
120 if rclpy.ok():
121 rclpy.shutdown()
122
123
124if __name__ == '__main__':
125 main()
Usage instructions:
# Build
colcon build --packages-select py_examples
# Run interactively
ros2 run py_examples play_lights
# Run with command-line parameters
ros2 run py_examples play_lights 1 255 0 0 # Mode 1, red
Example output:
=== LED Light Strip Control Example ===
Please select LED mode:
0 - Constant mode
1 - Breathing mode (4s cycle, sinusoidal brightness)
2 - Blinking mode (1s cycle, 0.5s on, 0.5s off)
3 - Flowing mode (2s cycle, lights activate from left to right)
Enter mode (0-3): 1
Please set RGB values (0-255):
Red component (R): 255
Green component (G): 0
Blue component (B): 0
Sending LED control command...
Mode: 1, Color: RGB(255, 0, 0)
✅ LED strip command sent successfully
Technical features:
Supports four LED display modes
Custom RGB color configuration
Synchronous service invocation
Command-line parameter support
Input parameter validation
User-friendly interactive interface