Commit d4207701 authored by Jakob Dommaschk's avatar Jakob Dommaschk
Browse files

erste Ansätze. Muss überarbeitet werden

parent ae7e7008
......@@ -4,12 +4,12 @@ import rclpy
from rclpy.node import Node
from std_msgs.msg import Header
from geometry_msgs.msg import Transform, TransformStamped, Vector3, Quaternion
from geometry_msgs.msg import Transform, TransformStamped, Quaternion
from tf2_msgs.msg import TFMessage
from px4_msgs.msg import VehicleAttitude
class MinimalSubscriber(Node):
class OrientationPublisher(Node):
def __init__(self):
super().__init__('minimal_subscriber')
......@@ -24,6 +24,10 @@ class MinimalSubscriber(Node):
def process_att_callback(self, msg):
# rotation in msg is rotation from FRONT-REAR-DOWN (FRD) to
# NORTH-EAST-DOWN (NED)
# (see https://github.com/PX4/PX4-Autopilot/blob/master/msg/vehicle_attitude.msg)
T = Transform()
T.rotation = numpy_ndarray_to_Quaternion(msg.q)
......@@ -33,7 +37,7 @@ class MinimalSubscriber(Node):
Theader = Header()
Theader.stamp = self.get_clock().now().to_msg()
Theader.frame_id = "map"
Theader.frame_id = "ned"
Tstamped.transform = T
Tstamped.child_frame_id = "pixhawk"
......@@ -43,18 +47,46 @@ class MinimalSubscriber(Node):
self.pub_tf2.publish(TF2)
def rot_x_180d(Q: Quaternion):
"""Returns rotation of quaternion of 180 degree around x-axis
"""
q_x_180d = Quaternion()
q_x_180d.x = 1.0
q_x_180d.w = 0.0
Q_new = Quaternion
Q
Q_new = OrientationPublisher.quat_mult(Q, q_x_180d)
return Q_new
def quat_mult(q1: Quaternion, q2: Quaternion):
"""Returns quaternion product of q1 and q2
"""
# that should actually be integrated in ROS but I couldn't find it
Q = Quaternion()
Q.w = q1.w*q2.w - q1.x*q2.x - q1.y*q2.y - q1.z*q2.z
Q.x = q1.w*q2.x + q1.x*q2.w + q1.y*q2.z - q1.z*q2.y
Q.y = q1.w*q2.y - q1.x*q2.z + q1.y*q2.w + q1.z*q2.x
Q.z = q1.w*q2.z + q1.x*q2.y - q1.y*q2.x + q1.z*q2.w
return Q
def main(args=None):
rclpy.init(args=args)
minimal_subscriber = MinimalSubscriber()
orient_pub = OrientationPublisher()
rclpy.spin(minimal_subscriber)
rclpy.spin(orient_pub)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
minimal_subscriber.destroy_node()
orient_pub.destroy_node()
rclpy.shutdown()
def numpy_ndarray_to_Quaternion(ndarr):
......
......@@ -6,6 +6,7 @@ import launch_ros.actions
def generate_launch_description():
return launch.LaunchDescription([
# Image Processing Nodes
launch_ros.actions.Node(
package='image_processing',
namespace='',
......@@ -18,6 +19,36 @@ def generate_launch_description():
executable='v4l2_camera_node',
output='screen',
name=['camera_node']),
# Transformation publishers
launch_ros.actions.Node(
package='image_processing',
namespace='',
executable='orientation_publisher',
output='screen',
name=['tf2_pub_orientation_pixhawk']),
# /map frame (https://www.ros.org/reps/rep-0103.html#axis-orientation)
# to North-East-Down frame
launch_ros.actions.Node(
package='tf2_ros',
namespace='',
executable='static_transform_publisher',
output='screen',
name=['tf2_pub_orientation_NED'],
arguments=["0","0","0",
"0.","0.","3.141593",
"/map", "/ned"]),
launch_ros.actions.Node(
package='tf2_ros',
namespace='',
executable='static_transform_publisher',
output='screen',
name=['tf2_pub_orientation_camera'],
arguments=["0","0",".1", # pos of camera relative to pixhawk
"0","0","0", # rotation (yaw, pitch, roll) of camera relative to pixhawk (should be zero)
"/pixhawk", "/camera"]),
# ROS bag
launch.actions.ExecuteProcess(
cmd=['ros2', 'bag', 'record', '/image_raw', '/image_vectors'],
output='screen'
......
import math
import rclpy
from rclpy.node import Node
from std_msgs.msg import Header
from geometry_msgs.msg import Transform, TransformStamped, Quaternion
from tf2_msgs.msg import TFMessage
from px4_msgs.msg import VehicleAttitude
class TestArucoPublisher(Node):
def __init__(self):
super().__init__('minimal_subscriber')
self.pub_tf2 = self.create_publisher(TFMessage, "/tf", 1)
self.pub_tf_timer = self.create_timer(.1, self.pub_tf)
def pub_tf(self):
tfm = TFMessage()
tf = TransformStamped()
tf.header.frame_id = "camera"
tf.child_frame_id = "aruco_marker"
tf.header.stamp = self.get_clock().now().to_msg()
tf.transform.translation.x = 1.
tfm.transforms = [tf]
self.pub_tf2.publish(tfm)
def main(args=None):
rclpy.init(args=args)
node = TestArucoPublisher()
rclpy.spin(node)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
......@@ -23,8 +23,6 @@ class VelocityCommander(Node):
super().__init__('velocity_commander')
# class variables
self.keep_alive = True
self.key = -1
self.t_sync = 0
self.vel = Vector3()
self.pub_counter = 0
......
import rclpy
from rclpy.duration import Duration
from rclpy.node import Node
import math
from std_msgs.msg import Header, Float32
from geometry_msgs.msg import TransformStamped, Twist, Vector3, Transform
from tf2_msgs.msg import TFMessage
from tf2_ros import BufferInterface
from landing_controller.velocity_commander import VelocityCommander
from tf2_ros.transform_listener import TransformListener
class VelocityController(Node):
# parameters
TOPICS = {
"P_GAIN" : "/landing_controller/p_gain"
}
TOPICS["CMD_VEL"] = VelocityCommander.TOPICS["CMD_VEL"]
FRAMES = {
"ARUCO" : "aruco_marker",
"NED" : "ned"
}
# time in seconds in which the timeout function is called
# timeout is earlist executed after ARUCO_TIMEOUT seconds
# and latest after 2*ARUCO_TIMEOUT seconds
ARUCO_TIMEOUT = .5
def __init__(self):
super().__init__('velocity_controller')
# class variables
self.aruco_timeout_reset = True
self.tf_buf = BufferInterface()
self.gains = {"P", 0.} # Initial controller gains
# subscription callbacks
self.sub_tf = self.create_subscription(
TFMessage,
"/tf",
self.process_tf_callback,
1)
self.sub_ctrl_gain = self.create_subscription(
Float32,
self.TOPICS["P_GAIN"],
self.process_p_gain_callback,
1)
# init publications
self.vel_publisher = self.create_publisher(Twist, self.TOPICS["CMD_VEL"], 1)
# timer callbacks
self.aruco_timout_timer = self.create_timer(self.ARUCO_TIMEOUT, self.aruco_timer_callback)
# transform listeners
#self.tf_listener = TransformListener(self.tf_buf, self, spin_thread=True)
def process_tf_callback(self, msg: Twist):
"""Applies control algorithm when new aruco frame was received
:param Twist msg: received Twist message
"""
print("Callback TF")
for transformStamped in msg.transforms:
print('Got child frame ' + transformStamped.child_frame_id)
if transformStamped.child_frame_id == self.FRAMES["ARUCO"]:
self.aruco_timeout_reset = False
tf = self.tf_buf.lookup_transform(self.FRAMES["NED"], self.FRAMES["ARUCO"], 0)
print(tf)
#e = -self.tf_buf.transform(Vector3(), self.FRAMES["NED"], Duration(seconds=.1))
#self.control(e)
return
def control(self, e: Vector3):
"""Control algorithm to land drone
:param Vector3 e: position error between pixhawk and aruco marker in ned frame
"""
# currently, only hovering over the aruco marker is implemented
u = (e.x, e.y) * self.gains["P"]
self.pub_xy_vel(e)
def aruco_timer_callback(self):
"""Checks aruco_timeout_variable and executes timeout if it was not set
since the last call
"""
if self.aruco_timeout_reset == True:
self.timeout()
self.aruco_timeout_reset = True
def timeout(self):
self.pub_xy_vel((0.,0.))
def pub_xy_vel(self, v: tuple):
"""Publish twist message on TOPICS["CMD_VEL"] where the two entries of
v correspond to the commanded x- and y velocity.
:param tuple v: x and y velocity to be commanded to pixhawk
"""
if len(v) != 2:
self.get_logger().info("Wrong number of entries in v input of send_xy_vel.")
v = (0.,0.)
T = Twist()
T.linear.x = v[0]
T.linear.y = v[1]
self.vel_publisher.publish(T)
def process_p_gain_callback(self, msg: Float32):
self.gains["P"] = msg
"""
class TfLine:
def __init__(self, node: Node, line: tuple) -> None:
self.node = node
self.line = line
self.transforms = [Transform()] * len(line-1)
node.create_subscription(TFMessage, "/tf", self.callback, 5)
node.create_subscription(TFMessage, "/tf_static", self.callback, 5)
def callback(self, msg: TFMessage):
for transform in msg.transforms:
self.add_transform(transform)
def add_transform(self, tf: TransformStamped):
p_frame = tf.header.frame_id # parent frame
c_frame = tf.child_frame_id # child frame
if p_frame in self.line and c_frame in self.line:
p_index = self.line.index(p_frame)
c_index = self.line.index(c_frame)
# if c_frame comes after p_frame, refresh transformation
if p_frame + 1 == c_frame:
self.transforms[p_index] = tf.transform
# if c_frame comes before p_frame, invert transformation
elif p_frame - 1 == c_frame:
self.transforms[p_index] = TfLine.invert_transform(tf.transform)
def get_transform(self):
tf = Transform()
for transform in self.transforms:
tf.translation.x = tf.translation.x + transform.translation.x
def invert_transform(tf: Transform):
tf.rotation.w = - tf.rotation.w
tf.translation.x = - tf.translation.x
tf.translation.y = - tf.translation.y
tf.translation.z = - tf.translation.z
return tf
"""
def main(args=None):
rclpy.init(args=args)
vel_ctrl = VelocityController()
rclpy.spin(vel_ctrl)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
vel_ctrl.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
......@@ -24,7 +24,9 @@ setup(
tests_require=['pytest'],
entry_points={
'console_scripts': [
'velocity_commander = landing_controller.velocity_commander:main'
'velocity_commander = landing_controller.velocity_commander:main',
'velocity_controller = landing_controller.velocity_controller:main',
'test_pub_aruco= landing_controller.test_pub_aruco:main'
],
},
)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment