Commit 68a73749 authored by Jakob Dommaschk's avatar Jakob Dommaschk
Browse files

merge new landing controller structure

parents ec2094ea b88642eb
import math
import rclpy
from rclpy.node import Node
from std_msgs.msg import Header
from geometry_msgs.msg import Transform, TransformStamped, Vector3, Quaternion
from tf2_msgs.msg import TFMessage
from px4_msgs.msg import VehicleAttitude
class MinimalSubscriber(Node):
def __init__(self):
super().__init__('minimal_subscriber')
self.sub_att = self.create_subscription(
VehicleAttitude,
'VehicleAttitude_PubSubTopic',
self.process_att_callback,
10)
self.pub_tf2 = self.create_publisher(TFMessage, "/tf", 1)
def process_att_callback(self, msg):
T = Transform()
T.rotation = numpy_ndarray_to_Quaternion(msg.q)
TF2 = TFMessage()
Tstamped = TransformStamped()
Theader = Header()
Theader.stamp = self.get_clock().now().to_msg()
Theader.frame_id = "map"
Tstamped.transform = T
Tstamped.child_frame_id = "pixhawk"
Tstamped.header = Theader
TF2.transforms = [Tstamped]
self.pub_tf2.publish(TF2)
def main(args=None):
rclpy.init(args=args)
minimal_subscriber = MinimalSubscriber()
rclpy.spin(minimal_subscriber)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
minimal_subscriber.destroy_node()
rclpy.shutdown()
def numpy_ndarray_to_Quaternion(ndarr):
Q = Quaternion()
Q.w = ndarr[0].astype(float)
Q.x = ndarr[1].astype(float)
Q.y = ndarr[2].astype(float)
Q.z = ndarr[3].astype(float)
return Q
if __name__ == '__main__':
main()
......@@ -6,6 +6,7 @@ import launch_ros.actions
def generate_launch_description():
return launch.LaunchDescription([
# Image Processing Nodes
launch_ros.actions.Node(
package='image_processing',
namespace='',
......@@ -18,6 +19,7 @@ def generate_launch_description():
executable='v4l2_camera_node',
output='screen',
name=['camera_node'])
# launch.actions.ExecuteProcess(
# cmd=['ros2', 'bag', 'record', '/image_raw', '/image_vectors'],
# output='screen'
......
import math
import rclpy
from rclpy.node import Node
from rclpy.time import Time
from std_msgs.msg import Header
from geometry_msgs.msg import Transform, TransformStamped, Quaternion, Vector3
from tf2_msgs.msg import TFMessage
from px4_msgs.msg import VehicleLocalPosition
from tf2_ros import Buffer
from tf2_ros.transform_listener import TransformListener
from tf2_ros.transform_broadcaster import TransformBroadcaster
class TestArucoPublisher(Node):
"""
This node is used to publish artificial aruco markers to test the control
algorithm without the image processing node.
"""
def __init__(self):
super().__init__('aruco_test_publisher')
# parameters
self.OMEGA = .1
self.F = 10.
# inits
self.angle = 0.
# set up transform listener
self.tf_buf = Buffer()
self.tf_listener = TransformListener(self.tf_buf, self, spin_thread=False)
# define position of aruco marker
#self.init_static_frame('world', 'aruco_marker_internal',
# Vector3(),
# Quaternion(x=1.,w=0.)) # z-axis of markers points upwards
# set up transform broadcaster (for debug)
self.tf_broadcaster = TransformBroadcaster(self)
# set up transform broadcaster (for debug)
self.tf_broadcaster = TransformBroadcaster(self)
# subscription callbacks
self.sub_att = self.create_subscription(
VehicleLocalPosition,
'VehicleLocalPosition_PubSubTopic',
self.position_callback,
10)
# set up timers
self.control_timer = self.create_timer(1/self.F, self.update_marker)
def position_callback(self, msg):
trans = Vector3(x=msg.x, y=msg.y, z=msg.z)
tf = Transform(translation=trans)
tf_header = Header(stamp=self.get_clock().now().to_msg(), frame_id='world')
tf_stamped = TransformStamped(transform=tf, header=tf_header, child_frame_id='ned')
self.tf_buf.set_transform(tf_stamped, 'default_authority')
self.publish_aruco_frame()
def init_static_frame(self, source: str, target: str, trans: Vector3, rot: Quaternion):
tf = Transform(translation=trans, rotation=rot)
tf_header = Header(stamp=self.get_clock().now().to_msg(), frame_id=source)
tf_stamped = TransformStamped(transform=tf, header=tf_header, child_frame_id=target)
self.tf_buf.set_transform_static(tf_stamped, 'default_authority')
def set_marker(self, p: Vector3):
tf = Transform(translation=p)
tf_header = Header(stamp=self.get_clock().now().to_msg(), frame_id='world')
tf_stamped = TransformStamped(transform=tf, header=tf_header, child_frame_id='aruco_marker_internal')
self.tf_buf.set_transform_static(tf_stamped, 'default_authority')
def publish_aruco_frame(self):
if self.tf_buf.can_transform('ned', 'aruco_marker_internal', Time()):
tf = self.tf_buf.lookup_transform('ned', 'aruco_marker_internal', Time())
tf.header.stamp = self.get_clock().now().to_msg()
tf.header.frame_id = 'ned_debug'
tf.child_frame_id = 'aruco_marker'
self.tf_broadcaster.sendTransform(tf)
def update_marker(self):
self.angle = self.angle + self.OMEGA/self.F
R = 5
marker = Vector3(
x = R*math.sin(self.angle),
y = R*math.cos(self.angle),
z = 0.
)
self.set_marker(marker)
def main(args=None):
rclpy.init(args=args)
node = TestArucoPublisher()
rclpy.spin(node)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
from geometry_msgs.msg import Vector3, Transform, Quaternion
def vec_add(v1: Vector3, v2: Vector3):
v = Vector3()
v.x = v1.x + v2.x
v.y = v1.y + v2.y
v.z = v1.z + v2.z
return v
def quat_multiply(q1: Quaternion, q2: Quaternion):
q = Quaternion()
q.w = q1.w*q2.w - q1.x*q2.x - q1.y*q2.y - q1.z*q2.z
q.x = q1.w*q2.x + q1.x*q2.w + q1.y*q2.z - q1.z*q2.y
q.y = q1.w*q2.y - q1.x*q2.z + q1.y*q2.w + q1.z*q2.x
q.z = q1.w*q2.z + q1.x*q2.y - q1.y*q2.x + q1.z*q2.w
return q
def quat_inv(q: Quaternion):
q_ret = Quaternion()
q_ret.w = q.w
q_ret.x = -q.x
q_ret.y = -q.y
q_ret.z = -q.z
return q_ret
def quat2vec(q: Quaternion):
v = Vector3()
v.x = q.x
v.y = q.y
v.z = q.z
return v
def vec2quat(v: Vector3):
q = Quaternion()
q.w = 0.
q.x = v.x
q.y = v.y
q.z = v.z
return q
def quat_rotate_vec(v: Vector3, q_rot:Quaternion):
return quat2vec( quat_multiply(quat_multiply(q_rot, vec2quat(v)), quat_inv(q_rot) ) )
def transform_apply(v: Vector3, tf: Transform):
""" Transform object by the tf transform tf
:param object: object to be transform
:param tf: Transform Transformation that shall be applied
:return: transformed object
"""
return vec_add(quat_rotate_vec(v, tf.rotation), tf.translation)
\ No newline at end of file
......@@ -9,7 +9,7 @@ from px4_msgs.msg import Timesync, OffboardControlMode, TrajectorySetpoint, Vehi
class VelocityCommander(Node):
# parameters
# constants
TOPICS = {
"CMD_VEL": 'cmd_vel',
"PX4_TIME": 'Timesync_PubSubTopic',
......@@ -23,8 +23,6 @@ class VelocityCommander(Node):
super().__init__('velocity_commander')
# class variables
self.keep_alive = True
self.key = -1
self.t_sync = 0
self.vel = Vector3()
self.pub_counter = 0
......@@ -50,12 +48,15 @@ class VelocityCommander(Node):
self.pub_vel_timer = self.create_timer(.1, self.timer_callback)
def timer_callback(self):
<<<<<<< HEAD
# change to offboard mode after 10 loops
if self.pub_counter == 10:
# set offboard mode
self.pub_vehicle_command(VehicleCommand.VEHICLE_CMD_DO_SET_MODE, 1., 6.)
#self.arm()
=======
>>>>>>> velocity_controller
self.pub_offb_control_mode()
self.pub_trajectory_setpoint()
......
import rclpy
from rclpy.duration import Duration
from rclpy.time import Time
from rclpy.node import Node
from time import sleep
from std_msgs.msg import Header, Float32
from geometry_msgs.msg import TransformStamped, Twist, Vector3, Transform, Quaternion
from px4_msgs.msg import VehicleAttitude
from tf2_ros import Buffer
from tf2_ros.transform_listener import TransformListener
from tf2_ros.transform_broadcaster import TransformBroadcaster
from landing_controller.velocity_commander import VelocityCommander
from landing_controller.vector_math import *
class VelocityController(Node):
# namings
TOPICS = {
"P_GAIN" : "/landing_controller/p_gain"
}
TOPICS["CMD_VEL"] = VelocityCommander.TOPICS["CMD_VEL"]
def __init__(self):
super().__init__('velocity_controller')
# parameters
# static transformation from pixhawk to camera frame
TF_PIX_TO_CAMERA = Transform(
Vector3(z=.1), # Translation from Pixhawk to Camera
Quaternion() # Rotation from Pixhawk to Camera
)
self.gains = {"P": 1.} # Initial controller gains
# class variables
self.aruco_timeout_reset = True
# set up transform listener
self.tf_buf = Buffer()
self.tf_listener = TransformListener(self.tf_buf, self, spin_thread=False)
# /map frame (https://www.ros.org/reps/rep-0103.html#axis-orientation)
# to North-East-Down frame
self.init_static_frame('map', 'ned', Vector3(),
Quaternion(x=1., w=0.)) # Rotation of 180 degrees around x-axis
# pixhawk to camera -> dependent on drone construction
self.init_static_frame('pixhawk', 'camera',
self.TF_PIX_TO_CAMERA.Translation, self.TF_PIX_TO_CAMERA.Rotation)
# set up transform broadcaster (for debug)
self.tf_broadcaster = TransformBroadcaster(self)
# subscription callbacks
self.sub_att = self.create_subscription(
VehicleAttitude,
'VehicleAttitude_PubSubTopic',
self.attitude_callback,
10)
self.sub_ctrl_gain = self.create_subscription(
Float32,
self.TOPICS["P_GAIN"],
self.process_p_gain_callback,
1)
# init publications
self.vel_publisher = self.create_publisher(Twist, self.TOPICS["CMD_VEL"], 1)
# timer callbacks
self.control_timer = self.create_timer(.1, self.control)
def attitude_callback(self, msg: VehicleAttitude):
# rotation in msg is rotation from FRONT-RIGHT-DOWN (FRD) to
# NORTH-EAST-DOWN (NED)
# (see https://github.com/PX4/PX4-Autopilot/blob/master/msg/vehicle_attitude.msg)
tf = Transform(rotation=numpy_ndarray_to_Quaternion(msg.q))
tf_header = Header(stamp=self.get_clock().now().to_msg(), frame_id='ned')
tf_stamped = TransformStamped(transform=tf, header=tf_header, child_frame_id='pixhawk')
self.tf_buf.set_transform(tf_stamped, 'default_authority')
self.publish_debug_frames()
def process_p_gain_callback(self, msg: Float32):
self.gains["P"] = msg.data
def init_static_frame(self, source: str, target: str, trans: Vector3, rot: Quaternion):
tf = Transform(translation=trans, rotation=rot)
tf_header = Header(stamp=self.get_clock().now().to_msg(), frame_id=source)
tf_stamped = TransformStamped(transform=tf, header=tf_header, child_frame_id=target)
self.tf_buf.set_transform_static(tf_stamped, 'default_authority')
def control(self):
"""Control algorithm to land drone
:param Vector3 e: position error between pixhawk and aruco marker in ned frame
"""
if self.tf_buf.can_transform('ned','aruco_marker', Time()):
# determine control error
e_tf = self.tf_buf.lookup_transform('ned','aruco_marker', Time())
e = Vector3(x=e_tf.transform.translation.x, y=e_tf.transform.translation.y)
# apply controller
print(self.gains["P"])
u = (e.x * self.gains["P"], e.y * self.gains["P"])
print(e)
print(u)
self.pub_xy_vel(u)
def timeout(self):
self.pub_xy_vel((0.,0.))
def pub_xy_vel(self, v: tuple):
"""Publish twist message on TOPICS["CMD_VEL"] where the two entries of
v correspond to the commanded x- and y velocity.
:param tuple v: x and y velocity to be commanded to pixhawk
"""
if len(v) != 2:
self.get_logger().info("Wrong number of entries in v input of send_xy_vel.")
v = (0.,0.)
T = Twist()
T.linear.x = v[0]
T.linear.y = v[1]
self.vel_publisher.publish(T)
def publish_debug_frames(self):
frames = ['ned', 'camera', 'pixhawk', 'aruco_marker']
for frame in frames:
if self.tf_buf.can_transform('map',frame, Time()):
tf = self.tf_buf.lookup_transform('map',frame, Time())
tf.header.stamp = self.get_clock().now().to_msg()
tf.child_frame_id = frame + '_debug'
self.tf_broadcaster.sendTransform(tf)
def numpy_ndarray_to_Quaternion(ndarr):
Q = Quaternion()
Q.w = ndarr[0].astype(float)
Q.x = ndarr[1].astype(float)
Q.y = ndarr[2].astype(float)
Q.z = ndarr[3].astype(float)
return Q
def main(args=None):
rclpy.init(args=args)
vel_ctrl = VelocityController()
rclpy.spin(vel_ctrl)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
vel_ctrl.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
......@@ -22,6 +22,13 @@ def generate_launch_description():
return launch.LaunchDescription([
launch_ros.actions.Node(
package='landing_controller',
namespace='',
executable='velocity_controller',
output='screen',
name=['velocity_controller']),
launch_ros.actions.Node(
package='landing_controller',
namespace='',
......
......@@ -24,7 +24,9 @@ setup(
tests_require=['pytest'],
entry_points={
'console_scripts': [
'velocity_commander = landing_controller.velocity_commander:main'
'velocity_commander = landing_controller.velocity_commander:main',
'velocity_controller = landing_controller.velocity_controller:main',
'test_pub_aruco= landing_controller.test_pub_aruco:main'
],
},
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment