Commit 835aadcc authored by pfandzelter's avatar pfandzelter
Browse files

Merge branch 'ts/alexandra-better' into 'main'

Alexandra can be better

See merge request !120
parents 2725948b 752a123b
Pipeline #30957 passed with stages
in 13 minutes and 33 seconds
......@@ -44,8 +44,6 @@ func parseArgs() (c config) {
func main() {
c := parseArgs()
log.Info().Msgf("%#v", c)
// Setup Logging as always
if c.logHandler == "dev" {
log.Logger = log.Output(
......@@ -54,6 +52,8 @@ func main() {
NoColor: false,
},
)
log.Info().Msgf("%#v", c)
} else if c.logHandler != "prod" {
log.Fatal().Msg("Log Handler has to be either dev or prod")
}
......
package alexandra
import (
"context"
"errors"
"fmt"
"math/rand"
"sync"
"time"
clientsProto "git.tu-berlin.de/mcc-fred/fred/proto/client"
alexandraProto "git.tu-berlin.de/mcc-fred/fred/proto/middleware"
"github.com/rs/zerolog/log"
)
const (
	// keygroupTimeout is the maximum age of a keygroup's cached replica information.
	// Once the cache entry is older than this, maybeUpdateKeygroupClients refreshes it from FReD.
	keygroupTimeout = 2 * time.Minute
	// otherNodesToAsk is how many other nodes to ask when ReadFromAnywhere is called.
	otherNodesToAsk = 3
	// UseSlowerNodeProb is the share of cases in which, instead of using the fastest client,
	// a random client is used so that its readSpeed gets updated.
	// Only used for Read, Write, Delete, Append (since these are the only operations that update the readSpeed).
	// NOTE(review): the value looks like a probability in [0;1] (0.089 = 8.9%), not a percentage — confirm at the use site.
	UseSlowerNodeProb = 0.089
)
// clientExpiry pairs a client with the expiry reported for that replica of a keygroup.
type clientExpiry struct {
	// client is the connection to the replica node.
	client *Client
	// expiry is the expiry value FReD returned for this replica; it is compared against the
	// requested minimum expiry in filterClientsToExpiry.
	expiry int64
}
// keygroupSet represents a keygroup's clients and the last time this information was updated from FreD.
type keygroupSet struct {
	// lastUpdated is compared against keygroupTimeout to decide when the entry is stale.
	lastUpdated time.Time
	// clients holds one entry per replica, keyed by the replica's node ID.
	clients map[string]*clientExpiry
}
// ClientsMgr manages all Clients to Fred that Alexandra has. It is used to get the fastest clients
// to keygroups etc. and to read from anywhere.
// Three values are important for its configuration: keygroupTimeout, otherNodesToAsk and
// UseSlowerNodeProb. Please see their documentation.
type ClientsMgr struct {
	// Mutex for the keygroups map, because it might be changed while iterated over
	sync.Mutex
	// clients caches one Client per host address.
	clients map[string]*Client
	// clientsCert and clientsKey are handed to NewClient for every new connection;
	// lighthouse is the host that is contacted when no other client is available.
	clientsCert, clientsKey, lighthouse string
	// keygroups caches the known replicas per keygroup name.
	keygroups map[string]*keygroupSet
}
// NewClientsManager builds a ClientsMgr that authenticates with the given certificate and key
// files, opens the connection to the lighthouse node right away and seeds math/rand.
func NewClientsManager(clientsCert, clientsKey, lighthouse string) *ClientsMgr {
	mgr := new(ClientsMgr)
	mgr.clients = map[string]*Client{}
	mgr.keygroups = map[string]*keygroupSet{}
	mgr.clientsCert = clientsCert
	mgr.clientsKey = clientsKey
	mgr.lighthouse = lighthouse

	// Connecting to the lighthouse registers it in the clients cache; the returned client is
	// not needed here.
	mgr.GetClientTo(lighthouse)

	seed := time.Now().UnixNano()
	rand.Seed(seed)

	return mgr
}
// ReadFromAnywhere reads item request.Id of keygroup request.Keygroup from whichever replica
// answers first.
//
// The fastest known client and up to otherNodesToAsk other, randomly chosen clients (all with an
// expiry matching request.MinExpiry) are asked in parallel; the first answer without an error is
// returned. If every asked node fails, the cached keygroup information is refreshed and the read
// is retried once on the fastest client.
//
// An error is returned if no matching client is cached for the keygroup, if ctx is done while
// waiting for answers, or if the final retry fails as well.
func (m *ClientsMgr) ReadFromAnywhere(ctx context.Context, request *alexandraProto.ReadRequest) (*alexandraProto.ReadResponse, error) {
	log.Debug().Msgf("ClientsManager is reading from anywhere. Req= %#v", request)
	go m.maybeUpdateKeygroupClients(request.Keygroup)

	type readResponse struct {
		error bool
		data  string
	}

	// Take the replica map while holding the lock. The keygroup may not be cached yet, in which
	// case there is no set to dereference.
	var known map[string]*clientExpiry
	m.Lock()
	if set := m.keygroups[request.Keygroup]; set != nil {
		known = set.clients
	}
	m.Unlock()

	clts := filterClientsToExpiry(known, request.MinExpiry)
	if len(clts) == 0 {
		log.Error().Msgf("...found no clients whith minimum expiry. Clients with longer expiry: %#v", known)
		return nil, errors.New("there are no members of this keygroup")
	}

	// Collect every node to ask before starting any goroutine, so the response channel can be
	// sized to hold all answers.
	targets := make([]*Client, 0, 1+otherNodesToAsk)

	// Ask the fastest client
	fastestClient, err := m.GetFastestClientWithKeygroup(request.Keygroup, request.MinExpiry)
	if err != nil {
		fastestClient = nil
	}
	if fastestClient != nil {
		targets = append(targets, fastestClient)
	}

	// Ask up to otherNodesToAsk other clients, each at most once and never the fastest one again.
	others := make([]*Client, 0, len(clts))
	for _, candidate := range clts {
		if candidate.client != nil && candidate.client != fastestClient {
			others = append(others, candidate.client)
		}
	}
	rand.Shuffle(len(others), func(i, j int) { others[i], others[j] = others[j], others[i] })
	if len(others) > otherNodesToAsk {
		others = others[:otherNodesToAsk]
	}
	targets = append(targets, others...)

	// The buffer has room for one answer per asked node: senders never block, so no goroutine is
	// left behind once this function has returned.
	responses := make(chan readResponse, len(targets))
	askNode := func(client *Client) {
		log.Debug().Msgf("...asking Client %#v for Keygroup %s", client, request.Keygroup)
		res, err := client.Client.Read(context.Background(), &clientsProto.ReadRequest{Id: request.Id, Keygroup: request.Keygroup})
		if err != nil {
			log.Err(err).Msg("Reading from client returned error")
			responses <- readResponse{error: true, data: ""}
			return
		}
		log.Debug().Msgf("Reading from client returned data: %#v", res)
		responses <- readResponse{error: false, data: res.Data}
	}
	for _, target := range targets {
		go askNode(target)
	}

	// Wait for results and return the first good one
	for answered := 0; answered < len(targets); answered++ {
		select {
		case <-ctx.Done():
			return nil, ctx.Err()
		case res := <-responses:
			log.Debug().Msgf("...waiting for the first answer to return it. res=%#v", res)
			if !res.error {
				log.Debug().Msgf("...got Response without error (closing channel): %#v", res)
				return &alexandraProto.ReadResponse{Data: res.data}, nil
			}
		}
	}
	log.Warn().Msgf("ReadFromAnywhere: no fastestClient was able to answer the read (closing channel). Kg=%s", request.Keygroup)

	// There was no successful response -- Update the keygroup information and try one last time
	log.Info().Msg("ReadFromAnywhere: Was not able to reach any queried node, updating cache and retrying...")
	m.updateKeygroupClients(request.Keygroup)
	client, err := m.GetFastestClientWithKeygroup(request.Keygroup, request.MinExpiry)
	if err != nil {
		return nil, fmt.Errorf("ReadFromAnywhere: there is no client with keygroup %s and expiry %d", request.Keygroup, request.MinExpiry)
	}
	result, err := client.Read(ctx, request.Keygroup, request.Id)
	if err != nil {
		return nil, fmt.Errorf("ReadFromAnywhere: cannot read from fastest client. err=%w", err)
	}
	return &alexandraProto.ReadResponse{Data: result.Data}, nil
}
package alexandra
import (
"context"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"time"
alexandraProto "git.tu-berlin.de/mcc-fred/fred/proto/middleware"
fredClients "git.tu-berlin.de/mcc-fred/fred/proto/client"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
type ClientsMgr struct {
clients map[string]*Client
clientsCert, clientsKey, clientsCA string
}
// alphaItemSpeed is the smoothing factor (alpha) of the exponential moving average of response times. Range is [0;1]; a higher value discounts older observations faster.
const alphaItemSpeed = float32(0.8)
type Client struct {
client alexandraProto.MiddlewareClient
conn *grpc.ClientConn
}
func newClientsManager(clientsCert, clientsKey, clientsCA string) *ClientsMgr {
return &ClientsMgr{
clients: make(map[string]*Client),
clientsCert: clientsCert,
clientsKey: clientsKey,
clientsCA: clientsCA,
}
}
func (m *ClientsMgr) GetClientTo(host string) (client *Client) {
client = m.clients[host]
if client != nil {
return
}
client = newClient(host, m.clientsCert, m.clientsKey, m.clientsCA)
m.clients[host] = client
return
Client fredClients.ClientClient
conn *grpc.ClientConn
ReadSpeed float32
}
func newClient(host, certFile, keyFile, caFile string) *Client {
func NewClient(host, certFile, keyFile string) *Client {
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
......@@ -51,7 +33,7 @@ func newClient(host, certFile, keyFile, caFile string) *Client {
// Create a new cert pool and add our own CA certificate
rootCAs := x509.NewCertPool()
loaded, err := ioutil.ReadFile(caFile)
loaded, err := ioutil.ReadFile("/cert/ca.crt")
if err != nil {
log.Fatal().Msgf("unexpected missing certfile: %v", err)
......@@ -61,6 +43,7 @@ func newClient(host, certFile, keyFile, caFile string) *Client {
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
MinVersion: tls.VersionTLS12,
RootCAs: rootCAs,
}
......@@ -70,8 +53,157 @@ func newClient(host, certFile, keyFile, caFile string) *Client {
if err != nil {
log.Fatal().Err(err).Msgf("Cannot create Grpc connection to client %s", host)
return &Client{client: alexandraProto.NewMiddlewareClient(conn)}
return &Client{Client: fredClients.NewClientClient(conn)}
}
log.Info().Msgf("Creating a connection to fred node: %s", host)
return &Client{client: alexandraProto.NewMiddlewareClient(conn), conn: conn}
return &Client{Client: fredClients.NewClientClient(conn), conn: conn, ReadSpeed: -1}
}
// updateItemSpeed folds one more observed response time into the client's ReadSpeed, which is an
// exponential moving average (in milliseconds) weighted by alphaItemSpeed.
// Reads, writes, deletes and appends of items in keygroups are expected to indicate whether a
// node is fast to reach for operations on items or not.
// See https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
func (c *Client) updateItemSpeed(elapsed time.Duration) {
	sample := float32(elapsed.Milliseconds())
	if c.ReadSpeed != -1 {
		c.ReadSpeed = alphaItemSpeed*sample + (1-alphaItemSpeed)*c.ReadSpeed
		return
	}
	// -1 marks a ReadSpeed without any observation yet: start the average at the first sample.
	c.ReadSpeed = sample
}
// CreateKeygroup sends a CreateKeygroupRequest built from keygroup, mutable and expiry to the
// connected node and returns its response unchanged.
func (c *Client) CreateKeygroup(ctx context.Context, keygroup string, mutable bool, expiry int64) (*fredClients.StatusResponse, error) {
	req := &fredClients.CreateKeygroupRequest{
		Keygroup: keygroup,
		Mutable:  mutable,
		Expiry:   expiry,
	}
	return c.Client.CreateKeygroup(ctx, req)
}
// DeleteKeygroup sends a DeleteKeygroupRequest for keygroup to the connected node and returns its
// response unchanged.
func (c *Client) DeleteKeygroup(ctx context.Context, keygroup string) (*fredClients.StatusResponse, error) {
	req := &fredClients.DeleteKeygroupRequest{Keygroup: keygroup}
	return c.Client.DeleteKeygroup(ctx, req)
}
// Read requests item id of keygroup from the connected node. When the call succeeds, its duration
// is fed into the moving average item speed; failed calls leave the average untouched.
func (c *Client) Read(ctx context.Context, keygroup string, id string) (*fredClients.ReadResponse, error) {
	begin := time.Now()
	resp, err := c.Client.Read(ctx, &fredClients.ReadRequest{Keygroup: keygroup, Id: id})
	if err != nil {
		return resp, err
	}
	c.updateItemSpeed(time.Since(begin))
	return resp, nil
}
// Update sends data for item id of keygroup to the connected node. When the call succeeds, its
// duration is fed into the moving average item speed; failed calls leave the average untouched.
func (c *Client) Update(ctx context.Context, keygroup string, id string, data string) (*fredClients.StatusResponse, error) {
	begin := time.Now()
	resp, err := c.Client.Update(ctx, &fredClients.UpdateRequest{Keygroup: keygroup, Id: id, Data: data})
	if err != nil {
		return resp, err
	}
	c.updateItemSpeed(time.Since(begin))
	return resp, nil
}
// Delete sends a DeleteRequest for item id of keygroup to the connected node. When the call
// succeeds, its duration is fed into the moving average item speed; failed calls leave the
// average untouched.
func (c *Client) Delete(ctx context.Context, keygroup string, id string) (*fredClients.StatusResponse, error) {
	begin := time.Now()
	resp, err := c.Client.Delete(ctx, &fredClients.DeleteRequest{Keygroup: keygroup, Id: id})
	if err != nil {
		return resp, err
	}
	c.updateItemSpeed(time.Since(begin))
	return resp, nil
}
// Append sends an AppendRequest with data for keygroup to the connected node. When the call
// succeeds, its duration is fed into the moving average item speed; failed calls leave the
// average untouched.
func (c *Client) Append(ctx context.Context, keygroup string, data string) (*fredClients.AppendResponse, error) {
	begin := time.Now()
	resp, err := c.Client.Append(ctx, &fredClients.AppendRequest{Keygroup: keygroup, Data: data})
	if err != nil {
		return resp, err
	}
	c.updateItemSpeed(time.Since(begin))
	return resp, nil
}
// AddReplica sends an AddReplicaRequest built from keygroup, nodeID and expiry to the connected
// node and returns its response unchanged.
func (c *Client) AddReplica(ctx context.Context, keygroup string, nodeID string, expiry int64) (*fredClients.StatusResponse, error) {
	req := &fredClients.AddReplicaRequest{
		Keygroup: keygroup,
		NodeId:   nodeID,
		Expiry:   expiry,
	}
	return c.Client.AddReplica(ctx, req)
}
// GetKeygroupReplica sends a GetKeygroupReplicaRequest for keygroup to the connected node and
// returns its response unchanged.
func (c *Client) GetKeygroupReplica(ctx context.Context, keygroup string) (*fredClients.GetKeygroupReplicaResponse, error) {
	req := &fredClients.GetKeygroupReplicaRequest{Keygroup: keygroup}
	return c.Client.GetKeygroupReplica(ctx, req)
}
// RemoveReplica sends a RemoveReplicaRequest built from keygroup and nodeID to the connected node
// and returns its response unchanged.
func (c *Client) RemoveReplica(ctx context.Context, keygroup string, nodeID string) (*fredClients.StatusResponse, error) {
	req := &fredClients.RemoveReplicaRequest{Keygroup: keygroup, NodeId: nodeID}
	resp, err := c.Client.RemoveReplica(ctx, req)
	return resp, err
}
// GetReplica sends a GetReplicaRequest for nodeID to the connected node and returns its response
// unchanged.
func (c *Client) GetReplica(ctx context.Context, nodeID string) (*fredClients.GetReplicaResponse, error) {
	req := &fredClients.GetReplicaRequest{NodeId: nodeID}
	resp, err := c.Client.GetReplica(ctx, req)
	return resp, err
}
// GetAllReplica sends an empty GetAllReplicaRequest to the connected node and returns its
// response unchanged.
func (c *Client) GetAllReplica(ctx context.Context) (*fredClients.GetAllReplicaResponse, error) {
	req := &fredClients.GetAllReplicaRequest{}
	resp, err := c.Client.GetAllReplica(ctx, req)
	return resp, err
}
// GetKeygroupTriggers sends a GetKeygroupTriggerRequest for keygroup to the connected node and
// returns its response unchanged.
func (c *Client) GetKeygroupTriggers(ctx context.Context, keygroup string) (*fredClients.GetKeygroupTriggerResponse, error) {
	req := &fredClients.GetKeygroupTriggerRequest{Keygroup: keygroup}
	resp, err := c.Client.GetKeygroupTriggers(ctx, req)
	return resp, err
}
// AddTrigger sends an AddTriggerRequest built from keygroup, triggerID and triggerHost to the
// connected node and returns its response unchanged.
func (c *Client) AddTrigger(ctx context.Context, keygroup string, triggerID string, triggerHost string) (*fredClients.StatusResponse, error) {
	req := &fredClients.AddTriggerRequest{
		Keygroup:    keygroup,
		TriggerId:   triggerID,
		TriggerHost: triggerHost,
	}
	resp, err := c.Client.AddTrigger(ctx, req)
	return resp, err
}
// RemoveTrigger sends a RemoveTriggerRequest built from keygroup and triggerID to the connected
// node and returns its response unchanged.
func (c *Client) RemoveTrigger(ctx context.Context, keygroup, triggerID string) (*fredClients.StatusResponse, error) {
	req := &fredClients.RemoveTriggerRequest{Keygroup: keygroup, TriggerId: triggerID}
	resp, err := c.Client.RemoveTrigger(ctx, req)
	return resp, err
}
// AddUser sends a UserRequest built from user, keygroup and role to the connected node's AddUser
// call and returns its response unchanged.
func (c *Client) AddUser(ctx context.Context, user, keygroup string, role fredClients.UserRole) (*fredClients.StatusResponse, error) {
	req := &fredClients.UserRequest{
		User:     user,
		Keygroup: keygroup,
		Role:     role,
	}
	resp, err := c.Client.AddUser(ctx, req)
	return resp, err
}
// RemoveUser sends a UserRequest built from user, keygroup and role to the connected node's
// RemoveUser call and returns its response unchanged.
func (c *Client) RemoveUser(ctx context.Context, user, keygroup string, role fredClients.UserRole) (*fredClients.StatusResponse, error) {
	req := &fredClients.UserRequest{
		User:     user,
		Keygroup: keygroup,
		Role:     role,
	}
	resp, err := c.Client.RemoveUser(ctx, req)
	return resp, err
}
package alexandra
import (
"context"
"fmt"
"math"
"math/rand"
"time"
"github.com/rs/zerolog/log"
)
// GetClientTo returns the client for the given host address. A client that is already cached is
// reused; otherwise a new one is created with the manager's certificate and key and stored in
// the cache.
func (m *ClientsMgr) GetClientTo(host string) (client *Client) {
	log.Info().Msgf("GetClientTo: Trying to get Fred Client to host %s", host)
	if cached := m.clients[host]; cached != nil {
		return cached
	}
	created := NewClient(host, m.clientsCert, m.clientsKey)
	m.clients[host] = created
	return created
}
// getFastestClient returns the client with the lowest ReadSpeed among clts, ignoring clients
// whose ReadSpeed is still -1. If no client has a usable ReadSpeed, an arbitrary client of the
// map is returned; an empty map yields nil.
func getFastestClient(clts map[string]*Client) (client *Client) {
	var (
		fallback *Client
		fastest  *Client
		best     float32 = math.MaxFloat32
	)
	for _, candidate := range clts {
		// Remember some client so that one is returned even if none has been measured yet.
		if fallback == nil {
			fallback = candidate
		}
		if candidate.ReadSpeed != -1 && candidate.ReadSpeed < best {
			best = candidate.ReadSpeed
			fastest = candidate
		}
	}
	if fastest == nil {
		return fallback
	}
	return fastest
}
// getFastestClientByClientExpiry strips the expiry information from clts and returns the result
// of getFastestClient for the remaining clients. A nil map yields nil.
func getFastestClientByClientExpiry(clts map[string]*clientExpiry) (client *Client) {
	if clts == nil {
		return nil
	}
	plain := make(map[string]*Client)
	for name, entry := range clts {
		plain[name] = entry.client
	}
	client = getFastestClient(plain)
	return client
}
// filterClientsToExpiry returns the entries of clientEx that satisfy the requested expiry:
//   - expiry == 0: every entry is kept;
//   - expiry == -1: only entries whose own expiry is -1 are kept;
//   - any other value: entries whose expiry is >= the requested expiry are kept.
//
// A nil clientEx yields nil. Otherwise a new map (possibly empty) is returned and clientEx itself
// is not modified.
func filterClientsToExpiry(clientEx map[string]*clientExpiry, expiry int64) (out map[string]*clientExpiry) {
	if clientEx == nil {
		return nil
	}
	out = make(map[string]*clientExpiry)
	for k, v := range clientEx {
		var keep bool
		switch expiry {
		case 0:
			keep = true
		case -1:
			// -1 has to match exactly. Falling through to the >= comparison would accept every
			// entry with an expiry of -1 or more.
			keep = v.expiry == -1
		default:
			keep = v.expiry >= expiry
		}
		if keep {
			out[k] = v
		}
	}
	return out
}
// GetFastestClient returns the fastest of the already existing clients. If the manager has no
// clients at all, the client to the lighthouse is returned instead.
func (m *ClientsMgr) GetFastestClient() (client *Client) {
	if len(m.clients) > 0 {
		return getFastestClient(m.clients)
	}
	log.Info().Msg("Fredclients: GetFastestClient was called but there are not clients. Using lighthouse client")
	return m.GetClientTo(m.lighthouse)
}
// maybeUpdateKeygroupClients refreshes the cached clients of a keygroup when needed:
// a keygroup without any cache entry is updated synchronously, an entry older than
// keygroupTimeout is updated in a background goroutine, and a fresh entry is left alone.
func (m *ClientsMgr) maybeUpdateKeygroupClients(keygroup string) {
	m.Lock()
	cached, known := m.keygroups[keygroup]
	m.Unlock()

	switch {
	case !known:
		log.Debug().Msgf("Keygroup %s has no entries, updating them now", keygroup)
		m.updateKeygroupClients(keygroup)
	case time.Since(cached.lastUpdated) > keygroupTimeout:
		log.Debug().Msgf("Keygroup %s has not been updated in %.0f minutes, doing it now", keygroup, keygroupTimeout.Minutes())
		go m.updateKeygroupClients(keygroup)
	}
}
// updateKeygroupClients replaces the cached clients of a keygroup with the replicas currently
// reported by FReD and blocks until that is done. The replica list is requested from the fastest
// client first and from the lighthouse if that fails; if both fail the cache is left unchanged.
func (m *ClientsMgr) updateKeygroupClients(keygroup string) {
	log.Debug().Msgf("Updating Clients for Keygroup %s", keygroup)

	replica, err := m.GetFastestClient().GetKeygroupReplica(context.Background(), keygroup)
	if err != nil {
		if replica, err = m.GetClientTo(m.lighthouse).GetKeygroupReplica(context.Background(), keygroup); err != nil {
			log.Error().Msgf("updateKeygroupClients cannot reach fastest client OR lighthouse...")
			return
		}
	}
	log.Debug().Msgf("updateKeygroupClients: Got replicas: %#v", replica)

	m.Lock()
	defer m.Unlock()

	entry, known := m.keygroups[keygroup]
	if !known {
		entry = &keygroupSet{}
		m.keygroups[keygroup] = entry
	}

	// Start from an empty map so that replicas that no longer exist are dropped.
	members := make(map[string]*clientExpiry)
	entry.clients = members
	for _, node := range replica.Replica {
		members[node.NodeId] = &clientExpiry{
			client: m.GetClientTo(node.Host),
			expiry: node.Expiry,
		}
	}
	entry.lastUpdated = time.Now()
	m.keygroups[keygroup] = entry
	log.Debug().Msgf("updateKeygroupClients: new Clients are: %#v", entry)
}
// GetFastestClientWithKeygroup returns the fastest client among the cached replicas of keygroup
// that pass filterClientsToExpiry for the given expiry (see there for how the value is
// interpreted).
//
// The keygroup's cache entry is refreshed first if it is missing or stale. An error is returned
// if the keygroup is still unknown after the refresh or if no replica passes the expiry filter.
func (m *ClientsMgr) GetFastestClientWithKeygroup(keygroup string, expiry int64) (client *Client, err error) {
	m.maybeUpdateKeygroupClients(keygroup)
	m.Lock()
	clients := m.keygroups[keygroup]
	m.Unlock()
	if clients == nil {
		log.Debug().Msgf("GetFastestClientWithKeygroup: No clients to keygroup %s", keygroup)
		m.updateKeygroupClients(keygroup)
		m.Lock()
		clients = m.keygroups[keygroup]
		m.Unlock()
	}
	log.Debug().Msgf("Clients before filtering: %#v", clients)
	if clients == nil {
		// The refresh could not reach any node, so there is nothing to select from.
		return nil, fmt.Errorf("was not able to find any client to keygroup %s with expiry > %d", keygroup, expiry)
	}
	filteredClients := filterClientsToExpiry(clients.clients, expiry)
	fastestClient := getFastestClientByClientExpiry(filteredClients)
	if fastestClient == nil {
		return nil, fmt.Errorf("was not able to find any client to keygroup %s with expiry > %d", keygroup, expiry)
	}
	return fastestClient, nil
}
// GetRandomClientWithKeygroup returns a randomly chosen client among the cached replicas of
// keygroup that pass filterClientsToExpiry for the given expiry (see there for how the value is
// interpreted).
//
// The keygroup's cache entry is refreshed first if it is missing or stale. An error is returned
// if the keygroup is unknown or if no replica passes the expiry filter.
func (m *ClientsMgr) GetRandomClientWithKeygroup(keygroup string, expiry int64) (client *Client, err error) {
	m.maybeUpdateKeygroupClients(keygroup)
	m.Lock()
	clients := m.keygroups[keygroup]
	m.Unlock()

	// The keygroup may still be unknown if the refresh failed; treat that like "no replicas".
	var members map[string]*clientExpiry
	if clients != nil {
		members = clients.clients
	}
	filtered := filterClientsToExpiry(members, expiry)
	// Get random element from this list
	log.Debug().Msgf("Len filtered is %#v", len(filtered))
	if len(filtered) == 0 {
		return nil, fmt.Errorf("was not able to find ANY client to keygroup %s with expiry > %d. Clients: %#v", keygroup, expiry, clients)
	}
	// Maps cannot be indexed by position, so walk the map until the drawn position is reached.
	nodeI := rand.Intn(len(filtered))
	curI := 0
	for _, v := range filtered {
		if nodeI == curI {
			return v.client, nil
		}
		curI++
	}
	return nil, fmt.Errorf("was not able to find RANDOM client to keygroup %s with expiry > %d", keygroup, expiry)
}
......@@ -2,26 +2,103 @@ package alexandra
import (
"context"
"math/rand"