Commit 7509cadc authored by pfandzelter's avatar pfandzelter
Browse files

hide some features of alexandra behind an experimental flag

parent 11cd9f43
Pipeline #67122 passed with stages
in 13 minutes and 32 seconds
package main
import (
"crypto/tls"
"crypto/x509"
"flag"
"io/ioutil"
"net"
"os"
"os/signal"
"syscall"
"git.tu-berlin.de/mcc-fred/fred/pkg/alexandra"
"git.tu-berlin.de/mcc-fred/fred/proto/middleware"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
type config struct {
......@@ -23,6 +30,7 @@ type config struct {
isProxied bool
proxyHost string
address string
experimental bool
}
func parseArgs() (c config) {
......@@ -37,6 +45,7 @@ func parseArgs() (c config) {
flag.BoolVar(&(c.isProxied), "is-proxy", false, "Is this behind a proxy?")
flag.StringVar(&(c.proxyHost), "proxy-host", "", "Proxy host if this is proxied")
flag.StringVar(&(c.address), "address", "", "where to start the server")
flag.BoolVar(&(c.experimental), "experimental", false, "enable experimental features")
flag.Parse()
return
}
......@@ -79,7 +88,58 @@ func main() {
}
// Setup alexandra
server := alexandra.NewServer(c.address, c.caCert, c.alexandraCert, c.alexandraKey, c.nodesCert, c.nodesKey, c.lightHouse, c.isProxied, c.proxyHost)
m := alexandra.NewMiddleware(c.nodesCert, c.nodesKey, c.lightHouse, c.isProxied, c.proxyHost, c.experimental)
if c.alexandraCert == "" {
log.Fatal().Msg("alexandra server: no certificate file given")
}
if c.alexandraKey == "" {
log.Fatal().Msg("alexandra server: no key file given")
}
if c.caCert == "" {
log.Fatal().Msg("alexandra server: no root certificate file given")
}
// Load server's certificate and private key
loadedServerCert, err := tls.LoadX509KeyPair(c.alexandraCert, c.alexandraKey)
if err != nil {
log.Fatal().Msgf("alexandra server: could not load key pair: %v", err)
return
}
// Create a new cert pool and add our own CA certificate
rootCAs := x509.NewCertPool()
loaded, err := ioutil.ReadFile(c.caCert)
if err != nil {
log.Fatal().Msgf("alexandra server: unexpected missing certfile: %v", err)
}
rootCAs.AppendCertsFromPEM(loaded)
// Create the credentials and return it
config := &tls.Config{
Certificates: []tls.Certificate{loadedServerCert},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: rootCAs,
MinVersion: tls.VersionTLS12,
}
lis, err := net.Listen("tcp", c.address)
if err != nil {
log.Fatal().Err(err).Msg("Failed to listen")
return
}
s := grpc.NewServer(grpc.Creds(credentials.NewTLS(config)))
middleware.RegisterMiddlewareServer(s, m)
log.Debug().Msgf("Alexandra Server is listening on %s", c.address)
// Quitting stuff
quit := make(chan os.Signal, 1)
......@@ -89,8 +149,9 @@ func main() {
syscall.SIGQUIT)
go func() {
<-quit
server.Stop()
s.Stop()
}()
server.ServeBlocking()
log.Fatal().Err(s.Serve(lis)).Msg("Alexandra Server")
}
......@@ -44,18 +44,20 @@ type ClientsMgr struct {
clients map[string]*Client
clientsCert, clientsKey, lighthouse string
keygroups map[string]*keygroupSet
experimental bool
}
func newClientsManager(clientsCert, clientsKey, lighthouse string) *ClientsMgr {
func newClientsManager(clientsCert string, clientsKey string, lighthouse string, experimental bool) *ClientsMgr {
mgr := &ClientsMgr{
clients: make(map[string]*Client),
clientsCert: clientsCert,
clientsKey: clientsKey,
lighthouse: lighthouse,
keygroups: make(map[string]*keygroupSet),
clients: make(map[string]*Client),
clientsCert: clientsCert,
clientsKey: clientsKey,
lighthouse: lighthouse,
keygroups: make(map[string]*keygroupSet),
experimental: experimental,
}
// add the lighthouse client to the clients list
mgr.getClientTo(lighthouse)
mgr.getLightHouse()
// rand.Seed(time.Now().UnixNano())
return mgr
}
......@@ -105,11 +107,11 @@ func (m *ClientsMgr) readFromAnywhere(request *middleware.ReadRequest) ([]string
go func(c *Client) {
defer wg.Done()
log.Debug().Msgf("...asking Client %+v for Keygroup %s", c, request.Keygroup)
log.Debug().Msgf("...asking Client %s for Keygroup %s", c.nodeID, request.Keygroup)
res, err := c.Client.Read(context.Background(), &clientsProto.ReadRequest{Id: request.Id, Keygroup: request.Keygroup})
if err != nil {
log.Err(err).Msg("Reading from client returned error")
log.Err(err).Msgf("Reading from client %s returned error", c.nodeID)
return
}
......@@ -120,7 +122,7 @@ func (m *ClientsMgr) readFromAnywhere(request *middleware.ReadRequest) ([]string
for i := range res.Data {
r.vals[i] = res.Data[i].Val
r.versions[i] = res.Data[i].Version.Version
log.Debug().Msgf("Reading from client returned data: %+v %+v", res.Data[i].Val, res.Data[i].Version.Version)
log.Debug().Msgf("Reading from client %s returned data: %+v %+v", c.nodeID, res.Data[i].Val, res.Data[i].Version.Version)
}
responses <- r
......@@ -170,14 +172,21 @@ func (m *ClientsMgr) readFromAnywhere(request *middleware.ReadRequest) ([]string
return vals, versions, nil
}
func (m *ClientsMgr) getLightHouse() (client *Client) {
return m.getClientTo(m.lighthouse, "__lighthouse")
}
// GetClientTo returns a client with this address
func (m *ClientsMgr) getClientTo(host string) (client *Client) {
func (m *ClientsMgr) getClientTo(host string, nodeID string) (client *Client) {
log.Info().Msgf("GetClientTo: Trying to get Fred Client to host %s", host)
client = m.clients[host]
if client != nil {
if client.nodeID == "__lighthouse" && nodeID != "__lighthouse" {
client.nodeID = nodeID
}
return
}
client = newClient(host, m.clientsCert, m.clientsKey)
client = newClient(nodeID, host, m.clientsCert, m.clientsKey)
m.clients[host] = client
return
}
......@@ -228,8 +237,8 @@ func filterClientsToExpiry(clientEx []*clientExpiry, expiry int64) (out []*clien
return
}
func (m *ClientsMgr) getClient(keygroup string, slowprob float64) (*Client, error) {
if rand.Float64() < slowprob {
func (m *ClientsMgr) getClient(keygroup string) (*Client, error) {
if m.experimental && rand.Float64() < UseSlowerNodeProb {
return m.getRandomClientWithKeygroup(keygroup, 0)
}
......@@ -240,7 +249,7 @@ func (m *ClientsMgr) getClient(keygroup string, slowprob float64) (*Client, erro
func (m *ClientsMgr) getFastestClient() (client *Client) {
if len(m.clients) == 0 {
log.Info().Msg("ClientsMgr: GetFastestClient was called but there are not clients. Using lighthouse client")
return m.getClientTo(m.lighthouse)
return m.getLightHouse()
}
clts := make([]*Client, 0, len(m.clients))
......@@ -317,7 +326,7 @@ func (m *ClientsMgr) updateKeygroupClients(keygroup string) {
replica, err := m.getFastestClient().getKeygroupReplica(context.Background(), keygroup)
if err != nil {
log.Debug().Msgf("couldn't get replicas for keygroup %s from fastest client: %s", keygroup, err.Error())
replica, err = m.getClientTo(m.lighthouse).getKeygroupReplica(context.Background(), keygroup)
replica, err = m.getLightHouse().getKeygroupReplica(context.Background(), keygroup)
if err != nil {
log.Debug().Msgf("couldn't get replicas for keygroup %s from lighthouse client: %s", keygroup, err.Error())
log.Error().Msgf("updateKeygroupClients cannot reach fastest client OR lighthouse...")
......@@ -338,7 +347,7 @@ func (m *ClientsMgr) updateKeygroupClients(keygroup string) {
set.clients = make([]*clientExpiry, len(replica.Replica))
for i, client := range replica.Replica {
set.clients[i] = &clientExpiry{
client: m.getClientTo(client.Host),
client: m.getClientTo(client.Host, client.NodeId),
expiry: client.Expiry,
}
}
......
......@@ -18,12 +18,13 @@ import (
const alphaItemSpeed = float32(0.8)
type Client struct {
nodeID string
Client api.ClientClient
conn *grpc.ClientConn
ReadSpeed float32
}
func newClient(host, certFile, keyFile string) *Client {
func newClient(nodeID string, host string, certFile string, keyFile string) *Client {
if certFile == "" {
log.Fatal().Msg("fredclient: no certificate file given")
......@@ -66,12 +67,17 @@ func newClient(host, certFile, keyFile string) *Client {
return &Client{Client: api.NewClientClient(conn)}
}
log.Info().Msgf("Creating a connection to fred node: %s", host)
return &Client{Client: api.NewClientClient(conn), conn: conn, ReadSpeed: -1}
return &Client{
Client: api.NewClientClient(conn),
conn: conn,
ReadSpeed: -1,
nodeID: nodeID,
}
}
// updateItemSpeed saves a moving average of how long it takes for a fred node to respond.
// the expectation is that read, writes, deletes and appends of items in keygroups should give an indication on whether
// a node ist fast to reach for operations on items or not
// a node is fast to reach for operations on items or not
// see https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
func (c *Client) updateItemSpeed(elapsed time.Duration) {
elapsedMs := float32(elapsed.Milliseconds())
......
......@@ -13,8 +13,8 @@ import (
// Scan issues a scan request from the client to the middleware. The request is forwarded to FReD and incoming items are
// checked for their versions by comparing locally cached versions (if any). The local cache is also updated
// (if applicable).
func (s *Server) Scan(ctx context.Context, req *middleware.ScanRequest) (*middleware.ScanResponse, error) {
res, err := s.clientsMgr.getClientTo(s.lighthouse).Client.Scan(ctx, &api.ScanRequest{
func (m *Middleware) Scan(ctx context.Context, req *middleware.ScanRequest) (*middleware.ScanResponse, error) {
res, err := m.clientsMgr.getLightHouse().Client.Scan(ctx, &api.ScanRequest{
Keygroup: req.Keygroup,
Id: req.Id,
Count: req.Count,
......@@ -32,7 +32,7 @@ func (s *Server) Scan(ctx context.Context, req *middleware.ScanRequest) (*middle
Data: datum.Val,
}
err = s.cache.add(req.Keygroup, req.Id, datum.Version.Version)
err = m.cache.add(req.Keygroup, req.Id, datum.Version.Version)
}
return &middleware.ScanResponse{Data: data}, err
......@@ -41,17 +41,17 @@ func (s *Server) Scan(ctx context.Context, req *middleware.ScanRequest) (*middle
// Read reads a datum from FReD. Read data are placed in cache (if not in there already). If multiple versions of a
// datum exist, all versions will be returned to the client so that it can choose one. If the read data is outdated
// compared to seen versions, an error is returned.
func (s *Server) Read(_ context.Context, req *middleware.ReadRequest) (*middleware.ReadResponse, error) {
func (m *Middleware) Read(_ context.Context, req *middleware.ReadRequest) (*middleware.ReadResponse, error) {
log.Debug().Msgf("Alexandra has rcvd Read")
vals, versions, err := s.clientsMgr.readFromAnywhere(req)
vals, versions, err := m.clientsMgr.readFromAnywhere(req)
if err != nil {
log.Error().Err(err)
return nil, err
}
known, err := s.cache.get(req.Keygroup, req.Id)
known, err := m.cache.get(req.Keygroup, req.Id)
if err != nil {
return nil, err
......@@ -80,7 +80,7 @@ func (s *Server) Read(_ context.Context, req *middleware.ReadRequest) (*middlewa
for i := range versions {
log.Debug().Msgf("Alexandra Read: putting version %v in cache for %s", versions[i], req.Id)
err = s.cache.add(req.Keygroup, req.Id, versions[i])
err = m.cache.add(req.Keygroup, req.Id, versions[i])
if err != nil {
log.Error().Err(err)
return nil, err
......@@ -112,17 +112,17 @@ func (s *Server) Read(_ context.Context, req *middleware.ReadRequest) (*middlewa
//
// If spontaneous write (i.e., datum cannot be found in cache), we assume an empty vector clock in the cache and send
// that to FReD. If there is a newer (any) data item in FReD already, this will fail.
func (s *Server) Update(ctx context.Context, req *middleware.UpdateRequest) (*middleware.Empty, error) {
func (m *Middleware) Update(ctx context.Context, req *middleware.UpdateRequest) (*middleware.Empty, error) {
log.Debug().Msgf("Alexandra has rcvd Update")
c, err := s.clientsMgr.getClient(req.Keygroup, UseSlowerNodeProb)
c, err := m.clientsMgr.getClient(req.Keygroup)
if err != nil {
log.Error().Err(err)
return nil, err
}
known, err := s.cache.get(req.Keygroup, req.Id)
known, err := m.cache.get(req.Keygroup, req.Id)
if err != nil {
log.Error().Err(err)
......@@ -149,7 +149,7 @@ func (s *Server) Update(ctx context.Context, req *middleware.UpdateRequest) (*mi
log.Debug().Msgf("Alexandra Update: new version %v for %s", v, req.Id)
err = s.cache.supersede(req.Keygroup, req.Id, known, v)
err = m.cache.supersede(req.Keygroup, req.Id, known, v)
if err != nil {
log.Error().Err(err)
......@@ -169,17 +169,17 @@ func (s *Server) Update(ctx context.Context, req *middleware.UpdateRequest) (*mi
//
// If spontaneous delete (i.e., datum cannot be found in cache), we assume an empty vector clock in the cache and send
// that to FReD. If there is a newer (any) data item in FReD already, this will fail.
func (s *Server) Delete(ctx context.Context, req *middleware.DeleteRequest) (*middleware.Empty, error) {
func (m *Middleware) Delete(ctx context.Context, req *middleware.DeleteRequest) (*middleware.Empty, error) {
log.Debug().Msgf("Alexandra has rcvd Delete")
c, err := s.clientsMgr.getClient(req.Keygroup, UseSlowerNodeProb)
c, err := m.clientsMgr.getClient(req.Keygroup)
if err != nil {
log.Error().Err(err)
return nil, err
}
known, err := s.cache.get(req.Keygroup, req.Id)
known, err := m.cache.get(req.Keygroup, req.Id)
if err != nil {
log.Error().Err(err)
......@@ -198,7 +198,7 @@ func (s *Server) Delete(ctx context.Context, req *middleware.DeleteRequest) (*mi
return nil, err
}
err = s.cache.supersede(req.Keygroup, req.Id, known, v)
err = m.cache.supersede(req.Keygroup, req.Id, known, v)
if err != nil {
log.Error().Err(err)
......@@ -212,8 +212,8 @@ func (s *Server) Delete(ctx context.Context, req *middleware.DeleteRequest) (*mi
// Thus, the request is only passed through to FReD without caching it.
// FReD's append endpoint requires a unique ID for a datum. ALExANDRA automatically uses a Unix nanosecond timestamp for
// this.
func (s *Server) Append(ctx context.Context, req *middleware.AppendRequest) (*middleware.AppendResponse, error) {
c, err := s.clientsMgr.getClient(req.Keygroup, UseSlowerNodeProb)
func (m *Middleware) Append(ctx context.Context, req *middleware.AppendRequest) (*middleware.AppendResponse, error) {
c, err := m.clientsMgr.getClient(req.Keygroup)
if err != nil {
return nil, err
......@@ -229,8 +229,8 @@ func (s *Server) Append(ctx context.Context, req *middleware.AppendRequest) (*mi
// Notify notifies the middleware about a version of a datum that the client has seen by bypassing the middleware. This
// is required to capture external causality.
func (s *Server) Notify(_ context.Context, req *middleware.NotifyRequest) (*middleware.NotifyResponse, error) {
err := s.cache.add(req.Keygroup, req.Id, req.Version)
func (m *Middleware) Notify(_ context.Context, req *middleware.NotifyRequest) (*middleware.NotifyResponse, error) {
err := m.cache.add(req.Keygroup, req.Id, req.Version)
if err != nil {
return nil, err
......@@ -241,15 +241,15 @@ func (s *Server) Notify(_ context.Context, req *middleware.NotifyRequest) (*midd
// CreateKeygroup creates the keygroup and also adds the first node (This is two operations in the eye of FReD:
// CreateKeygroup and AddReplica)
func (s *Server) CreateKeygroup(ctx context.Context, req *middleware.CreateKeygroupRequest) (*middleware.Empty, error) {
func (m *Middleware) CreateKeygroup(ctx context.Context, req *middleware.CreateKeygroupRequest) (*middleware.Empty, error) {
log.Debug().Msgf("AlexandraServer has rcdv CreateKeygroup: %+v", req)
getReplica, err := s.clientsMgr.getFastestClient().getReplica(ctx, req.FirstNodeId)
getReplica, err := m.clientsMgr.getFastestClient().getReplica(ctx, req.FirstNodeId)
if err != nil {
return nil, err
}
log.Debug().Msgf("CreateKeygroup: using node %s (addr=%s)", getReplica.NodeId, getReplica.Host)
_, err = s.clientsMgr.getClientTo(getReplica.Host).createKeygroup(ctx, req.Keygroup, req.Mutable, req.Expiry)
_, err = m.clientsMgr.getClientTo(getReplica.Host, getReplica.NodeId).createKeygroup(ctx, req.Keygroup, req.Mutable, req.Expiry)
if err != nil {
return nil, err
......@@ -259,12 +259,12 @@ func (s *Server) CreateKeygroup(ctx context.Context, req *middleware.CreateKeygr
}
// DeleteKeygroup deletes a keygroup from FReD.
func (s *Server) DeleteKeygroup(ctx context.Context, req *middleware.DeleteKeygroupRequest) (*middleware.Empty, error) {
client, err := s.clientsMgr.getFastestClientWithKeygroup(req.Keygroup, 1)
func (m *Middleware) DeleteKeygroup(ctx context.Context, req *middleware.DeleteKeygroupRequest) (*middleware.Empty, error) {
client, err := m.clientsMgr.getFastestClientWithKeygroup(req.Keygroup, 1)
if err != nil {
return nil, err
}
log.Debug().Msgf("DeleteKeygroup: using node %+v", client)
log.Debug().Msgf("DeleteKeygroup: using node %+v", client.nodeID)
_, err = client.deleteKeygroup(ctx, req.Keygroup)
......@@ -277,8 +277,8 @@ func (s *Server) DeleteKeygroup(ctx context.Context, req *middleware.DeleteKeygr
// AddReplica lets the client explicitly add a new replica for a keygroup. In the future, this should happen
// automatically.
func (s *Server) AddReplica(ctx context.Context, req *middleware.AddReplicaRequest) (*middleware.Empty, error) {
_, err := s.clientsMgr.getFastestClient().Client.AddReplica(ctx, &api.AddReplicaRequest{
func (m *Middleware) AddReplica(ctx context.Context, req *middleware.AddReplicaRequest) (*middleware.Empty, error) {
_, err := m.clientsMgr.getFastestClient().Client.AddReplica(ctx, &api.AddReplicaRequest{
Keygroup: req.Keygroup,
NodeId: req.NodeId,
Expiry: req.Expiry,
......@@ -288,15 +288,15 @@ func (s *Server) AddReplica(ctx context.Context, req *middleware.AddReplicaReque
return nil, err
}
s.clientsMgr.updateKeygroupClients(req.Keygroup)
m.clientsMgr.updateKeygroupClients(req.Keygroup)
return &middleware.Empty{}, err
}
// RemoveReplica lets the client explicitly remove a new replica for a keygroup. In the future, this should happen
// automatically.
func (s *Server) RemoveReplica(ctx context.Context, req *middleware.RemoveReplicaRequest) (*middleware.Empty, error) {
_, err := s.clientsMgr.getFastestClient().Client.RemoveReplica(ctx, &api.RemoveReplicaRequest{
func (m *Middleware) RemoveReplica(ctx context.Context, req *middleware.RemoveReplicaRequest) (*middleware.Empty, error) {
_, err := m.clientsMgr.getFastestClient().Client.RemoveReplica(ctx, &api.RemoveReplicaRequest{
Keygroup: req.Keygroup,
NodeId: req.NodeId,
})
......@@ -304,15 +304,15 @@ func (s *Server) RemoveReplica(ctx context.Context, req *middleware.RemoveReplic
return nil, err
}
s.clientsMgr.updateKeygroupClients(req.Keygroup)
m.clientsMgr.updateKeygroupClients(req.Keygroup)
return &middleware.Empty{}, err
}
// GetReplica returns information about a specific FReD node. In the future, this API will be removed as ALExANDRA
// handles data replication.
func (s *Server) GetReplica(ctx context.Context, req *middleware.GetReplicaRequest) (*middleware.GetReplicaResponse, error) {
res, err := s.clientsMgr.getFastestClient().Client.GetReplica(ctx, &api.GetReplicaRequest{NodeId: req.NodeId})
func (m *Middleware) GetReplica(ctx context.Context, req *middleware.GetReplicaRequest) (*middleware.GetReplicaResponse, error) {
res, err := m.clientsMgr.getFastestClient().Client.GetReplica(ctx, &api.GetReplicaRequest{NodeId: req.NodeId})
if err != nil {
return nil, err
......@@ -322,9 +322,9 @@ func (s *Server) GetReplica(ctx context.Context, req *middleware.GetReplicaReque
}
// GetAllReplica returns a list of all FReD nodes. In the future, this API will be removed as ALExANDRA handles data
//// replication.
func (s *Server) GetAllReplica(ctx context.Context, _ *middleware.GetAllReplicaRequest) (*middleware.GetAllReplicaResponse, error) {
res, err := s.clientsMgr.getFastestClient().Client.GetAllReplica(ctx, &api.Empty{})
// replication.
func (m *Middleware) GetAllReplica(ctx context.Context, _ *middleware.GetAllReplicaRequest) (*middleware.GetAllReplicaResponse, error) {
res, err := m.clientsMgr.getFastestClient().Client.GetAllReplica(ctx, &api.Empty{})
if err != nil {
return nil, err
......@@ -343,8 +343,8 @@ func (s *Server) GetAllReplica(ctx context.Context, _ *middleware.GetAllReplicaR
// GetKeygroupInfo returns a list of all FReD nodes that replicate a given keygroup. In the future, this API will be
// removed as ALExANDRA handles data replication.
func (s *Server) GetKeygroupInfo(ctx context.Context, req *middleware.GetKeygroupInfoRequest) (*middleware.GetKeygroupInfoResponse, error) {
res, err := s.clientsMgr.getFastestClient().Client.GetKeygroupInfo(ctx, &api.GetKeygroupInfoRequest{Keygroup: req.Keygroup})
func (m *Middleware) GetKeygroupInfo(ctx context.Context, req *middleware.GetKeygroupInfoRequest) (*middleware.GetKeygroupInfoResponse, error) {
res, err := m.clientsMgr.getFastestClient().Client.GetKeygroupInfo(ctx, &api.GetKeygroupInfoRequest{Keygroup: req.Keygroup})
if err != nil {
return nil, err
......@@ -365,8 +365,8 @@ func (s *Server) GetKeygroupInfo(ctx context.Context, req *middleware.GetKeygrou
}
// GetKeygroupTriggers returns a list of trigger nodes for a keygroup.
func (s *Server) GetKeygroupTriggers(ctx context.Context, req *middleware.GetKeygroupTriggerRequest) (*middleware.GetKeygroupTriggerResponse, error) {
res, err := s.clientsMgr.getClientTo(s.lighthouse).Client.GetKeygroupTriggers(ctx, &api.GetKeygroupTriggerRequest{
func (m *Middleware) GetKeygroupTriggers(ctx context.Context, req *middleware.GetKeygroupTriggerRequest) (*middleware.GetKeygroupTriggerResponse, error) {
res, err := m.clientsMgr.getLightHouse().Client.GetKeygroupTriggers(ctx, &api.GetKeygroupTriggerRequest{
Keygroup: req.Keygroup,
})
......@@ -385,8 +385,8 @@ func (s *Server) GetKeygroupTriggers(ctx context.Context, req *middleware.GetKey
}
// AddTrigger adds a new trigger to a keygroup.
func (s *Server) AddTrigger(ctx context.Context, req *middleware.AddTriggerRequest) (*middleware.Empty, error) {
_, err := s.clientsMgr.getClientTo(s.lighthouse).Client.AddTrigger(ctx, &api.AddTriggerRequest{
func (m *Middleware) AddTrigger(ctx context.Context, req *middleware.AddTriggerRequest) (*middleware.Empty, error) {
_, err := m.clientsMgr.getLightHouse().Client.AddTrigger(ctx, &api.AddTriggerRequest{
Keygroup: req.Keygroup,
TriggerId: req.TriggerId,
TriggerHost: req.TriggerHost,
......@@ -400,8 +400,8 @@ func (s *Server) AddTrigger(ctx context.Context, req *middleware.AddTriggerReque
}
// RemoveTrigger removes a trigger node for a keygroup.
func (s *Server) RemoveTrigger(ctx context.Context, req *middleware.RemoveTriggerRequest) (*middleware.Empty, error) {
_, err := s.clientsMgr.getClientTo(s.lighthouse).Client.RemoveTrigger(ctx, &api.RemoveTriggerRequest{
func (m *Middleware) RemoveTrigger(ctx context.Context, req *middleware.RemoveTriggerRequest) (*middleware.Empty, error) {
_, err := m.clientsMgr.getLightHouse().Client.RemoveTrigger(ctx, &api.RemoveTriggerRequest{
Keygroup: req.Keygroup,
TriggerId: req.TriggerId,
})
......@@ -414,8 +414,8 @@ func (s *Server) RemoveTrigger(ctx context.Context, req *middleware.RemoveTrigge
}
// AddUser adds permissions to access a keygroup for a particular user to FReD.
func (s *Server) AddUser(ctx context.Context, req *middleware.UserRequest) (*middleware.Empty, error) {
_, err := s.clientsMgr.getClientTo(s.lighthouse).Client.AddUser(ctx, &api.AddUserRequest{
func (m *Middleware) AddUser(ctx context.Context, req *middleware.UserRequest) (*middleware.Empty, error) {
_, err := m.clientsMgr.getLightHouse().Client.AddUser(ctx, &api.AddUserRequest{
User: req.User,
Keygroup: req.Keygroup,
Role: api.UserRole(req.Role),
......@@ -429,8 +429,8 @@ func (s *Server) AddUser(ctx context.Context, req *middleware.UserRequest) (*mid
}
// RemoveUser removes permissions to access a keygroup for a particular user from FReD.
func (s *Server) RemoveUser(ctx context.Context, req *middleware.UserRequest) (*middleware.Empty, error) {
_, err := s.clientsMgr.getClientTo(s.lighthouse).Client.RemoveUser(ctx, &api.RemoveUserRequest{
func (m *Middleware) RemoveUser(ctx context.Context, req *middleware.UserRequest) (*middleware.Empty, error) {
_, err := m.clientsMgr.getLightHouse().Client.RemoveUser(ctx, &api.RemoveUserRequest{
User: req.User,
Keygroup: req.Keygroup,
Role: api.UserRole(req.Role),
......
package alexandra
import (
"crypto/tls"
"crypto/x509"
"io/ioutil"
"net"
"git.tu-berlin.de/mcc-fred/fred/proto/middleware"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// Server listens to GRPC requests from clients (and sends them to the relevant Fred Node etc.)
// Middleware listens to GRPC requests from clients (and sends them to the relevant Fred Node etc.)
// The implementation is split up into different files in this folder.
type Server struct {
roots *x509.CertPool
isProxied bool
proxyHost string
clientsMgr *ClientsMgr
lighthouse string
lis net.Listener
cache *cache
*grpc.Server
type Middleware struct {
isProxied bool
proxyHost string
clientsMgr *ClientsMgr
lighthouse string
cache *cache
experimental bool
}
// NewServer creates a new Server for requests from Alexandra Clients
func NewServer(host string, caCert string, serverCert string, serverKey string, nodesCert string, nodesKey string, lighthouse string, isProxied bool, proxyHost string) *Server {
if serverCert == "" {
log.Fatal().Msg("alexandra server: no certificate file given")
}
if serverCert == "" {
log.Fatal().Msg("alexandra server: no key file given")
}
if caCert == "" {
log.Fatal().Msg("alexandra server: no root certificate file given")
}
// Load server's certificate and private key
loadedServerCert, err := tls.LoadX509KeyPair(serverCert, serverKey)
if err != nil {
log.<