|
|
@@ -0,0 +1,93 @@ |
|
|
|
import rclpy |
|
|
|
import rclpy.node |
|
|
|
|
|
|
|
import servo_interface.msg |
|
|
|
|
|
|
|
import servo.servo_mapping |
|
|
|
import servo.servo_calibration |
|
|
|
|
|
|
|
import board |
|
|
|
import busio |
|
|
|
import adafruit_pca9685 |
|
|
|
|
|
|
|
|
|
|
|
class ServoNode(rclpy.node.Node): |
|
|
|
def __init__(self): |
|
|
|
super().__init__('servo_node') |
|
|
|
self.subscription = self.create_subscription( |
|
|
|
servo_interface.msg.SetServo, |
|
|
|
'set_servo', |
|
|
|
self.listener_callback, |
|
|
|
10 |
|
|
|
) |
|
|
|
|
|
|
|
self.i2c = busio.I2C(board.SCL, board.SDA) |
|
|
|
|
|
|
|
self.pca9685 = {} |
|
|
|
self.pwm = {} |
|
|
|
|
|
|
|
for label in servo.servo_mapping.servo_mapping: |
|
|
|
m = servo.servo_mapping.servo_mapping[label] |
|
|
|
|
|
|
|
address = m['address'] |
|
|
|
channel = m['channel'] |
|
|
|
|
|
|
|
if address not in self.pca9685: |
|
|
|
self.get_logger().info(f'Creating PCA9685 object for i2c address {address}') |
|
|
|
self.pca9685[address] = adafruit_pca9685.PCA9685(self.i2c, address=address) |
|
|
|
self.pca9685[address].frequency = 50 |
|
|
|
|
|
|
|
self.get_logger().debug(f'Adding {address}:{channel} as {label}') |
|
|
|
self.pwm[label] = self.pca9685[address].channels[channel] |
|
|
|
|
|
|
|
self.calibration = servo.servo_calibration.servo_calibration |
|
|
|
self.get_logger().debug(f'Calibration for {label}: {self.calibration[label]}') |
|
|
|
|
|
|
|
|
|
|
|
def listener_callback(self, msg): |
|
|
|
self.get_logger().info(f'label: {msg.label} powered: {msg.powered} angle: {msg.angle}') |
|
|
|
|
|
|
|
if msg.label not in self.pwm: |
|
|
|
self.get_logger().warn(f'no pwm channel associated to {msg.label}') |
|
|
|
return |
|
|
|
|
|
|
|
label = msg.label |
|
|
|
powered = msg.powered |
|
|
|
angle = msg.angle |
|
|
|
|
|
|
|
pwm_channel = self.pwm[label] |
|
|
|
calibration = servo.servo_calibration.servo_calibration[label] |
|
|
|
|
|
|
|
if angle < 0: |
|
|
|
self.get_logger().warn(f'angle must be between 0 and 180: {angle}, forcing 0') |
|
|
|
angle = 0 |
|
|
|
if angle > 180: |
|
|
|
self.get_logger().warn(f'angle must be between 0 and 180: {angle}, forcing 180') |
|
|
|
angle = 180 |
|
|
|
|
|
|
|
|
|
|
|
usec = calibration['intercept'] + angle * calibration['slope'] |
|
|
|
self.get_logger().info(f'usec: {usec}') |
|
|
|
|
|
|
|
duty_cycle = int(usec * (0xffff / 20000)) |
|
|
|
self.get_logger().info(f'duty_cycle: {duty_cycle}') |
|
|
|
|
|
|
|
if powered: |
|
|
|
pwm_channel.duty_cycle = duty_cycle |
|
|
|
else: |
|
|
|
pwm_channel.duty_cycle = 0 |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def main(args=None): |
|
|
|
rclpy.init(args=args) |
|
|
|
servo_node = ServoNode() |
|
|
|
rclpy.spin(servo_node) |
|
|
|
servo_node.destroy_node() |
|
|
|
rclpy.shutdown() |
|
|
|
|
|
|
|
|
|
|
|
if __name__ == '__main__': |
|
|
|
main() |