Commit 225c6ead authored by Jakob Dommaschk's avatar Jakob Dommaschk
Browse files

new architecture for landing controller

parent d4207701
import math
import rclpy
from rclpy.node import Node
from std_msgs.msg import Header
from geometry_msgs.msg import Transform, TransformStamped, Quaternion
from tf2_msgs.msg import TFMessage
from px4_msgs.msg import VehicleAttitude
class OrientationPublisher(Node):
def __init__(self):
super().__init__('minimal_subscriber')
self.sub_att = self.create_subscription(
VehicleAttitude,
'VehicleAttitude_PubSubTopic',
self.process_att_callback,
10)
self.pub_tf2 = self.create_publisher(TFMessage, "/tf", 1)
def process_att_callback(self, msg):
# rotation in msg is rotation from FRONT-REAR-DOWN (FRD) to
# NORTH-EAST-DOWN (NED)
# (see https://github.com/PX4/PX4-Autopilot/blob/master/msg/vehicle_attitude.msg)
T = Transform()
T.rotation = numpy_ndarray_to_Quaternion(msg.q)
TF2 = TFMessage()
Tstamped = TransformStamped()
Theader = Header()
Theader.stamp = self.get_clock().now().to_msg()
Theader.frame_id = "ned"
Tstamped.transform = T
Tstamped.child_frame_id = "pixhawk"
Tstamped.header = Theader
TF2.transforms = [Tstamped]
self.pub_tf2.publish(TF2)
def rot_x_180d(Q: Quaternion):
"""Returns rotation of quaternion of 180 degree around x-axis
"""
q_x_180d = Quaternion()
q_x_180d.x = 1.0
q_x_180d.w = 0.0
Q_new = Quaternion
Q
Q_new = OrientationPublisher.quat_mult(Q, q_x_180d)
return Q_new
def quat_mult(q1: Quaternion, q2: Quaternion):
"""Returns quaternion product of q1 and q2
"""
# that should actually be integrated in ROS but I couldn't find it
Q = Quaternion()
Q.w = q1.w*q2.w - q1.x*q2.x - q1.y*q2.y - q1.z*q2.z
Q.x = q1.w*q2.x + q1.x*q2.w + q1.y*q2.z - q1.z*q2.y
Q.y = q1.w*q2.y - q1.x*q2.z + q1.y*q2.w + q1.z*q2.x
Q.z = q1.w*q2.z + q1.x*q2.y - q1.y*q2.x + q1.z*q2.w
return Q
def main(args=None):
rclpy.init(args=args)
orient_pub = OrientationPublisher()
rclpy.spin(orient_pub)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
orient_pub.destroy_node()
rclpy.shutdown()
def numpy_ndarray_to_Quaternion(ndarr):
Q = Quaternion()
Q.w = ndarr[0].astype(float)
Q.x = ndarr[1].astype(float)
Q.y = ndarr[2].astype(float)
Q.z = ndarr[3].astype(float)
return Q
if __name__ == '__main__':
main()
......@@ -20,34 +20,6 @@ def generate_launch_description():
output='screen',
name=['camera_node']),
# Transformation publishers
launch_ros.actions.Node(
package='image_processing',
namespace='',
executable='orientation_publisher',
output='screen',
name=['tf2_pub_orientation_pixhawk']),
# /map frame (https://www.ros.org/reps/rep-0103.html#axis-orientation)
# to North-East-Down frame
launch_ros.actions.Node(
package='tf2_ros',
namespace='',
executable='static_transform_publisher',
output='screen',
name=['tf2_pub_orientation_NED'],
arguments=["0","0","0",
"0.","0.","3.141593",
"/map", "/ned"]),
launch_ros.actions.Node(
package='tf2_ros',
namespace='',
executable='static_transform_publisher',
output='screen',
name=['tf2_pub_orientation_camera'],
arguments=["0","0",".1", # pos of camera relative to pixhawk
"0","0","0", # rotation (yaw, pitch, roll) of camera relative to pixhawk (should be zero)
"/pixhawk", "/camera"]),
# ROS bag
launch.actions.ExecuteProcess(
cmd=['ros2', 'bag', 'record', '/image_raw', '/image_vectors'],
......
......@@ -2,34 +2,110 @@ import math
import rclpy
from rclpy.node import Node
from rclpy.time import Time
from std_msgs.msg import Header
from geometry_msgs.msg import Transform, TransformStamped, Quaternion
from geometry_msgs.msg import Transform, TransformStamped, Quaternion, Vector3
from tf2_msgs.msg import TFMessage
from px4_msgs.msg import VehicleAttitude
from px4_msgs.msg import VehicleLocalPosition
from tf2_ros import Buffer
from tf2_ros.transform_listener import TransformListener
from tf2_ros.transform_broadcaster import TransformBroadcaster
class TestArucoPublisher(Node):
"""
This node is used to publish artificial aruco markers to test the control
algorithm without the image processing node.
"""
def __init__(self):
super().__init__('minimal_subscriber')
super().__init__('aruco_test_publisher')
# parameters
self.OMEGA = .1
self.F = 10.
# inits
self.angle = 0.
# set up transform listener
self.tf_buf = Buffer()
self.tf_listener = TransformListener(self.tf_buf, self, spin_thread=False)
# define position of aruco marker
#self.init_static_frame('world', 'aruco_marker_internal',
# Vector3(),
# Quaternion(x=1.,w=0.)) # z-axis of markers points upwards
# set up transform broadcaster (for debug)
self.tf_broadcaster = TransformBroadcaster(self)
# set up transform broadcaster (for debug)
self.tf_broadcaster = TransformBroadcaster(self)
# subscription callbacks
self.sub_att = self.create_subscription(
VehicleLocalPosition,
'VehicleLocalPosition_PubSubTopic',
self.position_callback,
10)
# set up timers
self.control_timer = self.create_timer(1/self.F, self.update_marker)
def position_callback(self, msg):
trans = Vector3(x=msg.x, y=msg.y, z=msg.z)
tf = Transform(translation=trans)
tf_header = Header(stamp=self.get_clock().now().to_msg(), frame_id='world')
tf_stamped = TransformStamped(transform=tf, header=tf_header, child_frame_id='ned')
self.tf_buf.set_transform(tf_stamped, 'default_authority')
self.publish_aruco_frame()
def init_static_frame(self, source: str, target: str, trans: Vector3, rot: Quaternion):
tf = Transform(translation=trans, rotation=rot)
tf_header = Header(stamp=self.get_clock().now().to_msg(), frame_id=source)
tf_stamped = TransformStamped(transform=tf, header=tf_header, child_frame_id=target)
self.tf_buf.set_transform_static(tf_stamped, 'default_authority')
def set_marker(self, p: Vector3):
tf = Transform(translation=p)
tf_header = Header(stamp=self.get_clock().now().to_msg(), frame_id='world')
tf_stamped = TransformStamped(transform=tf, header=tf_header, child_frame_id='aruco_marker_internal')
self.tf_buf.set_transform_static(tf_stamped, 'default_authority')
self.pub_tf2 = self.create_publisher(TFMessage, "/tf", 1)
def publish_aruco_frame(self):
if self.tf_buf.can_transform('ned', 'aruco_marker_internal', Time()):
tf = self.tf_buf.lookup_transform('ned', 'aruco_marker_internal', Time())
self.pub_tf_timer = self.create_timer(.1, self.pub_tf)
tf.header.stamp = self.get_clock().now().to_msg()
tf.header.frame_id = 'ned_debug'
tf.child_frame_id = 'aruco_marker'
self.tf_broadcaster.sendTransform(tf)
def pub_tf(self):
tfm = TFMessage()
tf = TransformStamped()
def update_marker(self):
self.angle = self.angle + self.OMEGA/self.F
tf.header.frame_id = "camera"
tf.child_frame_id = "aruco_marker"
tf.header.stamp = self.get_clock().now().to_msg()
tf.transform.translation.x = 1.
R = 5
marker = Vector3(
x = R*math.sin(self.angle),
y = R*math.cos(self.angle),
z = 0.
)
tfm.transforms = [tf]
self.set_marker(marker)
self.pub_tf2.publish(tfm)
......
from geometry_msgs.msg import Vector3, Transform, Quaternion
def vec_add(v1: Vector3, v2: Vector3):
v = Vector3()
v.x = v1.x + v2.x
v.y = v1.y + v2.y
v.z = v1.z + v2.z
return v
def quat_multiply(q1: Quaternion, q2: Quaternion):
q = Quaternion()
q.w = q1.w*q2.w - q1.x*q2.x - q1.y*q2.y - q1.z*q2.z
q.x = q1.w*q2.x + q1.x*q2.w + q1.y*q2.z - q1.z*q2.y
q.y = q1.w*q2.y - q1.x*q2.z + q1.y*q2.w + q1.z*q2.x
q.z = q1.w*q2.z + q1.x*q2.y - q1.y*q2.x + q1.z*q2.w
return q
def quat_inv(q: Quaternion):
q_ret = Quaternion()
q_ret.w = q.w
q_ret.x = -q.x
q_ret.y = -q.y
q_ret.z = -q.z
return q_ret
def quat2vec(q: Quaternion):
v = Vector3()
v.x = q.x
v.y = q.y
v.z = q.z
return v
def vec2quat(v: Vector3):
q = Quaternion()
q.w = 0.
q.x = v.x
q.y = v.y
q.z = v.z
return q
def quat_rotate_vec(v: Vector3, q_rot:Quaternion):
return quat2vec( quat_multiply(quat_multiply(q_rot, vec2quat(v)), quat_inv(q_rot) ) )
def transform_apply(v: Vector3, tf: Transform):
""" Transform object by the tf transform tf
:param object: object to be transform
:param tf: Transform Transformation that shall be applied
:return: transformed object
"""
return vec_add(quat_rotate_vec(v, tf.rotation), tf.translation)
\ No newline at end of file
......@@ -52,7 +52,7 @@ class VelocityCommander(Node):
if self.pub_counter == 10:
# set offboard mode
self.pub_vehicle_command(VehicleCommand.VEHICLE_CMD_DO_SET_MODE, 1., 6.)
self.arm()
#self.arm()
self.pub_offb_control_mode()
self.pub_trajectory_setpoint()
......
import rclpy
from rclpy.duration import Duration
from rclpy.time import Time
from rclpy.node import Node
from time import sleep
import math
from std_msgs.msg import Header, Float32
from geometry_msgs.msg import TransformStamped, Twist, Vector3, Transform
from tf2_msgs.msg import TFMessage
from tf2_ros import BufferInterface
from geometry_msgs.msg import TransformStamped, Twist, Vector3, Transform, Quaternion
from landing_controller.velocity_commander import VelocityCommander
from px4_msgs.msg import VehicleAttitude
from tf2_ros import Buffer
from tf2_ros.transform_listener import TransformListener
from tf2_ros.transform_broadcaster import TransformBroadcaster
from landing_controller.velocity_commander import VelocityCommander
from landing_controller.vector_math import *
class VelocityController(Node):
......@@ -22,8 +29,7 @@ class VelocityController(Node):
TOPICS["CMD_VEL"] = VelocityCommander.TOPICS["CMD_VEL"]
FRAMES = {
"ARUCO" : "aruco_marker",
"NED" : "ned"
"ARUCO" : "aruco_marker"
}
# time in seconds in which the timeout function is called
......@@ -37,15 +43,31 @@ class VelocityController(Node):
# class variables
self.aruco_timeout_reset = True
self.tf_buf = BufferInterface()
self.gains = {"P", 0.} # Initial controller gains
self.gains = {"P": 0.} # Initial controller gains
# set up transform listener
self.tf_buf = Buffer()
self.tf_listener = TransformListener(self.tf_buf, self, spin_thread=False)
# /map frame (https://www.ros.org/reps/rep-0103.html#axis-orientation)
# to North-East-Down frame
self.init_static_frame('map', 'ned', Vector3(),
Quaternion(x=1., w=0.)) # Rotation of 180 degrees around x-axis
# pixhawk to camera -> dependent on drone construction
self.init_static_frame('pixhawk', 'camera',
Vector3(z=.1), # Translation from Pixhawk to Camera
Quaternion()) # Rotation from Pixhawk to Camera
# set up transform broadcaster (for debug)
self.tf_broadcaster = TransformBroadcaster(self)
# subscription callbacks
self.sub_tf = self.create_subscription(
TFMessage,
"/tf",
self.process_tf_callback,
1)
self.sub_att = self.create_subscription(
VehicleAttitude,
'VehicleAttitude_PubSubTopic',
self.attitude_callback,
10)
self.sub_ctrl_gain = self.create_subscription(
Float32,
self.TOPICS["P_GAIN"],
......@@ -56,42 +78,54 @@ class VelocityController(Node):
self.vel_publisher = self.create_publisher(Twist, self.TOPICS["CMD_VEL"], 1)
# timer callbacks
self.aruco_timout_timer = self.create_timer(self.ARUCO_TIMEOUT, self.aruco_timer_callback)
self.control_timer = self.create_timer(.1, self.control)
# transform listeners
#self.tf_listener = TransformListener(self.tf_buf, self, spin_thread=True)
def process_tf_callback(self, msg: Twist):
"""Applies control algorithm when new aruco frame was received
def attitude_callback(self, msg: VehicleAttitude):
# rotation in msg is rotation from FRONT-RIGHT-DOWN (FRD) to
# NORTH-EAST-DOWN (NED)
# (see https://github.com/PX4/PX4-Autopilot/blob/master/msg/vehicle_attitude.msg)
:param Twist msg: received Twist message
"""
print("Callback TF")
for transformStamped in msg.transforms:
print('Got child frame ' + transformStamped.child_frame_id)
if transformStamped.child_frame_id == self.FRAMES["ARUCO"]:
self.aruco_timeout_reset = False
tf = Transform(rotation=numpy_ndarray_to_Quaternion(msg.q))
tf_header = Header(stamp=self.get_clock().now().to_msg(), frame_id='ned')
tf_stamped = TransformStamped(transform=tf, header=tf_header, child_frame_id='pixhawk')
tf = self.tf_buf.lookup_transform(self.FRAMES["NED"], self.FRAMES["ARUCO"], 0)
print(tf)
self.tf_buf.set_transform(tf_stamped, 'default_authority')
#e = -self.tf_buf.transform(Vector3(), self.FRAMES["NED"], Duration(seconds=.1))
#self.control(e)
self.publish_debug_frames()
return
def process_p_gain_callback(self, msg: Float32):
self.gains["P"] = msg.data
def init_static_frame(self, source: str, target: str, trans: Vector3, rot: Quaternion):
tf = Transform(translation=trans, rotation=rot)
tf_header = Header(stamp=self.get_clock().now().to_msg(), frame_id=source)
tf_stamped = TransformStamped(transform=tf, header=tf_header, child_frame_id=target)
self.tf_buf.set_transform_static(tf_stamped, 'default_authority')
def control(self, e: Vector3):
def control(self):
"""Control algorithm to land drone
:param Vector3 e: position error between pixhawk and aruco marker in ned frame
"""
# currently, only hovering over the aruco marker is implemented
u = (e.x, e.y) * self.gains["P"]
if self.tf_buf.can_transform('ned','aruco_marker', Time()):
# determine control error
e_tf = self.tf_buf.lookup_transform('ned','aruco_marker', Time())
e = Vector3(x=e_tf.transform.translation.x, y=e_tf.transform.translation.y)
# apply controller
print(self.gains["P"])
u = (e.x * self.gains["P"], e.y * self.gains["P"])
print(e)
print(u)
self.pub_xy_vel(e)
self.pub_xy_vel(u)
def aruco_timer_callback(self):
......@@ -99,10 +133,26 @@ class VelocityController(Node):
since the last call
"""
"""
targets = ['ned','pixhawk','camera','aruco']
for target in targets:
if self.tf_buf.can_transform('map',target, Time()):
print(target)"""
if self.tf_buf.can_transform('ned','camera', Time()):
self.tf_ned_aruco = self.tf_buf.lookup_transform('ned','camera', Time())
print(transform_apply(Vector3(y=1.), self.tf_ned_aruco.transform))
else:
print("At least one frame does not exist")
"""
if self.aruco_timeout_reset == True:
self.timeout()
self.aruco_timeout_reset = True
self.aruco_timeout_reset = True"""
def timeout(self):
......@@ -127,63 +177,26 @@ class VelocityController(Node):
self.vel_publisher.publish(T)
def process_p_gain_callback(self, msg: Float32):
self.gains["P"] = msg
"""
class TfLine:
def __init__(self, node: Node, line: tuple) -> None:
self.node = node
self.line = line
self.transforms = [Transform()] * len(line-1)
node.create_subscription(TFMessage, "/tf", self.callback, 5)
node.create_subscription(TFMessage, "/tf_static", self.callback, 5)
def callback(self, msg: TFMessage):
for transform in msg.transforms:
self.add_transform(transform)
def add_transform(self, tf: TransformStamped):
p_frame = tf.header.frame_id # parent frame
c_frame = tf.child_frame_id # child frame
if p_frame in self.line and c_frame in self.line:
p_index = self.line.index(p_frame)
c_index = self.line.index(c_frame)
def publish_debug_frames(self):
frames = ['ned', 'camera', 'pixhawk']
# if c_frame comes after p_frame, refresh transformation
if p_frame + 1 == c_frame:
self.transforms[p_index] = tf.transform
# if c_frame comes before p_frame, invert transformation
elif p_frame - 1 == c_frame:
self.transforms[p_index] = TfLine.invert_transform(tf.transform)
def get_transform(self):
tf = Transform()
for transform in self.transforms:
tf.translation.x = tf.translation.x + transform.translation.x
def invert_transform(tf: Transform):
tf.rotation.w = - tf.rotation.w
tf.translation.x = - tf.translation.x
tf.translation.y = - tf.translation.y
tf.translation.z = - tf.translation.z
return tf
"""
for frame in frames:
if self.tf_buf.can_transform('map',frame, Time()):
tf = self.tf_buf.lookup_transform('map',frame, Time())
tf.header.stamp = self.get_clock().now().to_msg()
tf.child_frame_id = frame + '_debug'
self.tf_broadcaster.sendTransform(tf)