Commit 669abd24 authored by pfandzelter's avatar pfandzelter
Browse files

Add Client Centric Consistency to ALExANDRA

parent 918c03ae
Pipeline #41502 passed with stages
in 14 minutes and 57 seconds
......@@ -2,7 +2,7 @@
<project version="4">
<component name="ProjectTasksOptions">
<TaskOptions isEnabled="true">
<option name="arguments" value="fmt $FilePath$" />
<option name="arguments" value="-l -w -s $FilePath$" />
<option name="checkSyntaxErrors" value="true" />
<option name="description" />
<option name="exitCodeBehavior" value="ERROR" />
......@@ -14,7 +14,7 @@
<array />
</option>
<option name="outputFromStdout" value="false" />
<option name="program" value="$GoExecPath$" />
<option name="program" value="gofmt" />
<option name="runOnExternalChanges" value="false" />
<option name="scopeName" value="Project Files" />
<option name="trackOnlyRoot" value="true" />
......
package alexandra
import (
"sync"
"github.com/DistributedClocks/GoVector/govec/vclock"
)
type cache struct {
keygroups map[string]struct {
items map[string]*struct {
clocks []vclock.VClock
*sync.Mutex
}
*sync.RWMutex
}
*sync.RWMutex
}
func newCache() *cache {
return &cache{
keygroups: make(map[string]struct {
items map[string]*struct {
clocks []vclock.VClock
*sync.Mutex
}
*sync.RWMutex
}),
RWMutex: &sync.RWMutex{},
}
}
func (c *cache) cLock(kg string, id string) {
c.RLock()
if _, ok := c.keygroups[kg]; !ok {
c.RUnlock()
c.Lock()
if _, ok := c.keygroups[kg]; !ok {
c.keygroups[kg] = struct {
items map[string]*struct {
clocks []vclock.VClock
*sync.Mutex
}
*sync.RWMutex
}{
items: make(map[string]*struct {
clocks []vclock.VClock
*sync.Mutex
}),
RWMutex: &sync.RWMutex{},
}
}
c.Unlock()
c.RLock()
}
c.keygroups[kg].RLock()
if _, ok := c.keygroups[kg].items[id]; !ok {
c.keygroups[kg].RUnlock()
c.keygroups[kg].Lock()
if _, ok := c.keygroups[kg].items[id]; !ok {
c.keygroups[kg].items[id] = &struct {
clocks []vclock.VClock
*sync.Mutex
}{
clocks: make([]vclock.VClock, 0),
Mutex: &sync.Mutex{},
}
}
c.keygroups[kg].Unlock()
c.keygroups[kg].RLock()
}
c.keygroups[kg].items[id].Lock()
}
func (c *cache) cUnlock(kg string, id string) {
c.keygroups[kg].items[id].Unlock()
c.keygroups[kg].RUnlock()
c.RUnlock()
}
func (c *cache) add(kg string, id string, version vclock.VClock) error {
c.cLock(kg, id)
defer c.cUnlock(kg, id)
newClocks := make([]vclock.VClock, 0, len(c.keygroups[kg].items[id].clocks))
for _, v := range c.keygroups[kg].items[id].clocks {
if version.Compare(v, vclock.Concurrent) {
newClocks = append(newClocks, v)
continue
}
if version.Compare(v, vclock.Descendant) {
return nil
}
}
c.keygroups[kg].items[id].clocks = newClocks
return nil
}
func (c *cache) supersede(kg string, id string, version vclock.VClock) error {
c.cLock(kg, id)
defer c.cUnlock(kg, id)
c.keygroups[kg].items[id].clocks = []vclock.VClock{
version.Copy(),
}
return nil
}
func (c *cache) get(kg string, id string) ([]vclock.VClock, error) {
c.cLock(kg, id)
defer c.cUnlock(kg, id)
return c.keygroups[kg].items[id].clocks, nil
}
......@@ -4,12 +4,14 @@ import (
"context"
"errors"
"fmt"
"math"
"math/rand"
"sync"
"time"
clientsProto "git.tu-berlin.de/mcc-fred/fred/proto/client"
alexandraProto "git.tu-berlin.de/mcc-fred/fred/proto/middleware"
"github.com/DistributedClocks/GoVector/govec/vclock"
"github.com/rs/zerolog/log"
)
......@@ -35,7 +37,7 @@ type keygroupSet struct {
}
// ClientsMgr manages all Clients to Fred that Alexandra has. Is used to get fastest clients to keygroups etc. and to read from anywhere
// there are 3 variables important for this configuration: keygruopTimeout, otherNodesToAsk, getSlowerNodeProb. Please see their documentation.
// there are 3 variables important for this configuration: keygroupTimeout, otherNodesToAsk, getSlowerNodeProb. Please see their documentation.
type ClientsMgr struct {
// Mutex for the keygroups map, because it might be changed while iterated over
sync.Mutex
......@@ -44,7 +46,7 @@ type ClientsMgr struct {
keygroups map[string]*keygroupSet
}
func NewClientsManager(clientsCert, clientsKey, lighthouse string) *ClientsMgr {
func newClientsManager(clientsCert, clientsKey, lighthouse string) *ClientsMgr {
mgr := &ClientsMgr{
clients: make(map[string]*Client),
clientsCert: clientsCert,
......@@ -53,18 +55,21 @@ func NewClientsManager(clientsCert, clientsKey, lighthouse string) *ClientsMgr {
keygroups: make(map[string]*keygroupSet),
}
// add the lighthouse client to the clients list
mgr.GetClientTo(lighthouse)
rand.Seed(time.Now().UnixNano())
mgr.getClientTo(lighthouse)
// rand.Seed(time.Now().UnixNano())
return mgr
}
func (m *ClientsMgr) ReadFromAnywhere(ctx context.Context, request *alexandraProto.ReadRequest) (*alexandraProto.ReadResponse, error) {
func (m *ClientsMgr) readFromAnywhere(ctx context.Context, request *alexandraProto.ReadRequest) ([]string, []vclock.VClock, error) {
log.Debug().Msgf("ClientsManager is reading from anywhere. Req= %#v", request)
go m.maybeUpdateKeygroupClients(request.Keygroup)
type readResponse struct {
error bool
data string
error bool
vals []string
versions []vclock.VClock
}
responses := make(chan readResponse)
responsesClosed := false
sentAsks := 0
......@@ -75,8 +80,8 @@ func (m *ClientsMgr) ReadFromAnywhere(ctx context.Context, request *alexandraPro
m.Unlock()
clts := filterClientsToExpiry(set.clients, request.MinExpiry)
if !exists || len(clts) == 0 {
log.Error().Msgf("...found no clients whith minimum expiry. Clients with longer expiry: %#v", set.clients)
return nil, errors.New("there are no members of this keygroup")
log.Error().Msgf("...found no clients with minimum expiry. Clients with longer expiry: %#v", set.clients)
return nil, nil, errors.New("there are no members of this keygroup")
}
askNode := func(client *Client) {
// TODO select channels?
......@@ -86,18 +91,28 @@ func (m *ClientsMgr) ReadFromAnywhere(ctx context.Context, request *alexandraPro
if err != nil {
log.Err(err).Msg("Reading from client returned error")
if !responsesClosed {
responses <- readResponse{error: true, data: ""}
responses <- readResponse{
error: true,
}
}
} else {
log.Debug().Msgf("Reading from client returned data: %#v", res)
if !responsesClosed {
responses <- readResponse{error: false, data: res.Data[0].Val}
r := readResponse{
vals: make([]string, len(res.Data)),
versions: make([]vclock.VClock, len(res.Data)),
}
for i := range res.Data {
r.vals[i] = res.Data[i].Val
r.versions[i] = res.Data[i].Version
}
responses <- r
}
}
}
// Ask the fastest fastestClient
fastestClient, err := m.GetFastestClientWithKeygroup(request.Keygroup, request.MinExpiry)
fastestClient, err := m.getFastestClientWithKeygroup(request.Keygroup, request.MinExpiry)
if err == nil {
go askNode(fastestClient)
sentAsks++
......@@ -134,7 +149,8 @@ func (m *ClientsMgr) ReadFromAnywhere(ctx context.Context, request *alexandraPro
if !res.error {
log.Debug().Msgf("...got Response without error (closing channel): %#v", res)
responsesClosed = true
return &alexandraProto.ReadResponse{Data: res.data}, nil
return res.vals, res.versions, nil
}
if rectRes >= sentAsks {
log.Warn().Msgf("ReadFromAnywhere: no fastestClient was able to answer the read (closing channel). Kg=%s", request.Keygroup)
......@@ -147,14 +163,193 @@ func (m *ClientsMgr) ReadFromAnywhere(ctx context.Context, request *alexandraPro
log.Info().Msg("ReadFromAnywhere: Was not able to reach any queried node, updating cache and retrying...")
m.updateKeygroupClients(request.Keygroup)
client, err := m.GetFastestClientWithKeygroup(request.Keygroup, request.MinExpiry)
client, err := m.getFastestClientWithKeygroup(request.Keygroup, request.MinExpiry)
if err != nil {
return nil, fmt.Errorf("ReadFromAnywhere: there is no client with keygroup %s and expiry %d", request.Keygroup, request.MinExpiry)
return nil, nil, fmt.Errorf("ReadFromAnywhere: there is no client with keygroup %s and expiry %d", request.Keygroup, request.MinExpiry)
}
result, err := client.read(ctx, request.Keygroup, request.Id)
if err != nil {
return nil, nil, fmt.Errorf("ReadFromAnywhere: cannot read from fastest client. err=%v", err)
}
vals := make([]string, len(result.Data))
versions := make([]vclock.VClock, len(result.Data))
for i := range result.Data {
vals[i] = result.Data[i].Val
versions[i] = result.Data[i].Version
}
return vals, versions, nil
}
// GetClientTo returns a client with this address
func (m *ClientsMgr) getClientTo(host string) (client *Client) {
log.Info().Msgf("GetClientTo: Trying to get Fred Client to host %s", host)
client = m.clients[host]
if client != nil {
return
}
client = newClient(host, m.clientsCert, m.clientsKey)
m.clients[host] = client
return
}
func getFastestClient(clts map[string]*Client) (client *Client) {
var minTime float32 = math.MaxFloat32
var minClient *Client
// Set the first client up as fastest client, so that it gets returned if no other client is found.
for _, value := range clts {
minClient = value
break
}
for _, value := range clts {
if value.ReadSpeed < minTime && value.ReadSpeed != -1 {
minTime = value.ReadSpeed
minClient = value
}
}
return minClient
}
func getFastestClientByClientExpiry(clts map[string]*clientExpiry) (client *Client) {
if clts == nil {
return nil
}
var clientsMap = make(map[string]*Client)
for key, value := range clts {
clientsMap[key] = value.client
}
return getFastestClient(clientsMap)
}
// filterClientsToExpiry if param=-1 then only exp==-1; if param=0 then anything; if param>=0 then anything exp >= than param
func filterClientsToExpiry(clientEx map[string]*clientExpiry, expiry int64) (out map[string]*clientExpiry) {
if clientEx == nil {
return nil
}
out = make(map[string]*clientExpiry)
for k, v := range clientEx {
if expiry == -1 && v.expiry == -1 {
out[k] = v
} else if expiry == 0 {
out[k] = v
} else if v.expiry >= expiry {
out[k] = v
}
}
return
}
func (m *ClientsMgr) getClient(keygroup string, slowprob float64) (*Client, error) {
if rand.Float64() < slowprob {
return m.getRandomClientWithKeygroup(keygroup, 1)
}
result, err := client.Read(ctx, request.Keygroup, request.Id)
return m.getFastestClientWithKeygroup(keygroup, 1)
}
// getFastestClient searches for the fastest of the already existing clients
func (m *ClientsMgr) getFastestClient() (client *Client) {
if len(m.clients) == 0 {
log.Info().Msg("ClientsMgr: GetFastestClient was called but there are not clients. Using lighthouse client")
return m.getClientTo(m.lighthouse)
}
return getFastestClient(m.clients)
}
// GetFastestClientWithKeygroup returns the fastest client that has the keygroup with an expiry bigger than the parameter
// set expiry to 1 to get any client, 0=no expiry
func (m *ClientsMgr) getFastestClientWithKeygroup(keygroup string, expiry int64) (client *Client, err error) {
m.maybeUpdateKeygroupClients(keygroup)
m.Lock()
clients := m.keygroups[keygroup]
m.Unlock()
if clients == nil {
log.Debug().Msgf("GetFastestClientWithKeygroup: No clients to keygroup %s", keygroup)
m.updateKeygroupClients(keygroup)
m.Lock()
clients = m.keygroups[keygroup]
m.Unlock()
}
log.Debug().Msgf("Clients before filtering: %#v", clients)
filteredClients := filterClientsToExpiry(clients.clients, expiry)
fastestClient := getFastestClientByClientExpiry(filteredClients)
if fastestClient == nil {
return nil, fmt.Errorf("was not able to find any client to keygroup %s with expiry > %d", keygroup, expiry)
}
return fastestClient, nil
}
func (m *ClientsMgr) getRandomClientWithKeygroup(keygroup string, expiry int64) (client *Client, err error) {
m.maybeUpdateKeygroupClients(keygroup)
m.Lock()
clients := m.keygroups[keygroup]
m.Unlock()
filtered := filterClientsToExpiry(clients.clients, expiry)
// Get random element from this list
log.Debug().Msgf("Len filtered is %#v", len(filtered))
if len(filtered) == 0 {
return nil, fmt.Errorf("was not able to find ANY client to keygroup %s with expiry > %d. Clients: %#v", keygroup, expiry, clients)
}
nodeI := rand.Intn(len(filtered))
curI := 0
for _, v := range filtered {
if nodeI == curI {
return v.client, nil
}
curI++
}
return nil, fmt.Errorf("was not able to find RANDOM client to keygroup %s with expiry > %d", keygroup, expiry)
}
// maybeUpdateKeygroupClients updates the cached keygroups of a client if it hasn't happened $keygroupCacheTimeout
func (m *ClientsMgr) maybeUpdateKeygroupClients(keygroup string) {
m.Lock()
set, exists := m.keygroups[keygroup]
m.Unlock()
if !exists {
log.Debug().Msgf("Keygroup %s has no entries, updating them now", keygroup)
m.updateKeygroupClients(keygroup)
} else if time.Since(set.lastUpdated) > keygroupTimeout {
log.Debug().Msgf("Keygroup %s has not been updated in %.0f minutes, doing it now", keygroup, keygroupTimeout.Minutes())
go m.updateKeygroupClients(keygroup)
}
}
// updateKeygroupClients updates the clients a keygroup has in a blocking way
func (m *ClientsMgr) updateKeygroupClients(keygroup string) {
log.Debug().Msgf("Updating Clients for Keygroup %s", keygroup)
replica, err := m.getFastestClient().getKeygroupReplica(context.Background(), keygroup)
if err != nil {
return nil, fmt.Errorf("ReadFromAnywhere: cannot read from fastest client. err=%v", err)
replica, err = m.getClientTo(m.lighthouse).getKeygroupReplica(context.Background(), keygroup)
if err != nil {
log.Error().Msgf("updateKeygroupClients cannot reach fastest client OR lighthouse...")
return
}
}
log.Debug().Msgf("updateKeygroupClients: Got replicas: %#v", replica)
return &alexandraProto.ReadResponse{Data: result.Data[0].Val}, nil
m.Lock()
defer m.Unlock()
set, exists := m.keygroups[keygroup]
if !exists {
m.keygroups[keygroup] = &keygroupSet{
lastUpdated: time.Now(),
clients: make(map[string]*clientExpiry),
}
set = m.keygroups[keygroup]
}
set.clients = make(map[string]*clientExpiry)
for _, client := range replica.Replica {
set.clients[client.NodeId] = &clientExpiry{
client: m.getClientTo(client.Host),
expiry: client.Expiry,
}
}
set.lastUpdated = time.Now()
m.keygroups[keygroup] = set
log.Debug().Msgf("updateKeygroupClients: new Clients are: %#v", set)
}
......@@ -7,7 +7,8 @@ import (
"io/ioutil"
"time"
fredClients "git.tu-berlin.de/mcc-fred/fred/proto/client"
api "git.tu-berlin.de/mcc-fred/fred/proto/client"
"github.com/DistributedClocks/GoVector/govec/vclock"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
......@@ -17,12 +18,12 @@ import (
const alphaItemSpeed = float32(0.8)
type Client struct {
Client fredClients.ClientClient
Client api.ClientClient
conn *grpc.ClientConn
ReadSpeed float32
}
func NewClient(host, certFile, keyFile string) *Client {
func newClient(host, certFile, keyFile string) *Client {
if certFile == "" {
log.Fatal().Msg("fredclient: no certificate file given")
......@@ -62,10 +63,10 @@ func NewClient(host, certFile, keyFile string) *Client {
if err != nil {
log.Fatal().Err(err).Msgf("Cannot create Grpc connection to client %s", host)
return &Client{Client: fredClients.NewClientClient(conn)}
return &Client{Client: api.NewClientClient(conn)}
}
log.Info().Msgf("Creating a connection to fred node: %s", host)
return &Client{Client: fredClients.NewClientClient(conn), conn: conn, ReadSpeed: -1}
return &Client{Client: api.NewClientClient(conn), conn: conn, ReadSpeed: -1}
}
// updateItemSpeed saves a moving average of how long it takes for a fred node to respond.
......@@ -82,8 +83,8 @@ func (c *Client) updateItemSpeed(elapsed time.Duration) {
}
}
func (c *Client) CreateKeygroup(ctx context.Context, keygroup string, mutable bool, expiry int64) (*fredClients.Empty, error) {
res, err := c.Client.CreateKeygroup(ctx, &fredClients.CreateKeygroupRequest{
func (c *Client) createKeygroup(ctx context.Context, keygroup string, mutable bool, expiry int64) (*api.Empty, error) {
res, err := c.Client.CreateKeygroup(ctx, &api.CreateKeygroupRequest{
Keygroup: keygroup,
Mutable: mutable,
Expiry: expiry,
......@@ -91,15 +92,15 @@ func (c *Client) CreateKeygroup(ctx context.Context, keygroup string, mutable bo
return res, err
}
func (c *Client) DeleteKeygroup(ctx context.Context, keygroup string) (*fredClients.Empty, error) {
res, err := c.Client.DeleteKeygroup(ctx, &fredClients.DeleteKeygroupRequest{Keygroup: keygroup})
func (c *Client) deleteKeygroup(ctx context.Context, keygroup string) (*api.Empty, error) {
res, err := c.Client.DeleteKeygroup(ctx, &api.DeleteKeygroupRequest{Keygroup: keygroup})
return res, err
}
// Read also updates the moving average item speed
func (c *Client) Read(ctx context.Context, keygroup string, id string) (*fredClients.ReadResponse, error) {
func (c *Client) read(ctx context.Context, keygroup string, id string) (*api.ReadResponse, error) {
start := time.Now()
res, err := c.Client.Read(ctx, &fredClients.ReadRequest{
res, err := c.Client.Read(ctx, &api.ReadRequest{
Keygroup: keygroup,
Id: id,
})
......@@ -110,111 +111,121 @@ func (c *Client) Read(ctx context.Context, keygroup string, id string) (*fredCli
return res, err
}
/*
// Update also updates the moving average item speed
func (c *Client) Update(ctx context.Context, keygroup string, id string, data string) (*fredClients.Empty, error) {
func (c *Client) update(ctx context.Context, keygroup string, id string, data string) (vclock.VClock, error) {
start := time.Now()
_, err := c.Client.Update(ctx, &fredClients.UpdateRequest{
res, err := c.Client.Update(ctx, &api.UpdateRequest{
Keygroup: keygroup,
Id: id,
Data: data,
})
if err == nil {
elapsed := time.Since(start)
c.updateItemSpeed(elapsed)
if err != nil {
return nil, err
}
elapsed := time.Since(start)
c.updateItemSpeed(elapsed)
return res.Version.Version, nil
}*/
// UpdateVersions also updates the moving average item speed
func (c *Client) updateVersions(ctx context.Context, keygroup string, id string, data string, versions []vclock.VClock) (vclock.VClock, error) {
v := make([]*api.Version, len(versions))
for i, vvector := range versions {
v[i] = &api.Version{
Version: vvector.GetMap(),
}
}
return &fredClients.Empty{}, err
}
// Delete also updates the moving average item speed
func (c *Client) Delete(ctx context.Context, keygroup string, id string) (*fredClients.Empty, error) {
start := time.Now()
_, err := c.Client.Delete(ctx, &fredClients.DeleteRequest{
res, err := c.Client.Update(ctx, &api.UpdateRequest{
Keygroup: keygroup,
Id: id,
Data: data,
Versions: v,
})
if err == nil {
elapsed := time.Since(start)
c.updateItemSpeed(elapsed)
if err != nil {
return nil, err
}
return &fredClients.Empty{}, err
elapsed := time.Since(start)
c.updateItemSpeed(elapsed)
return res.Version.Version, nil
}
// Append also updates the moving average item speed
func (c *Client) Append(ctx context.Context, keygroup string, data string) (*fredClients.AppendResponse, error) {
/*
// Delete also updates the moving average item speed
func (c *Client) delete(ctx context.Context, keygroup string, id string) (vclock.VClock, error) {
start := time.Now()
id := uint64(time.Now().UnixNano())
res, err := c.Client.Append(ctx, &fredClients.AppendRequest{
res, err := c.Client.Delete(ctx, &api.DeleteRequest{
Keygroup: keygroup,
Id: id,
Data: data,
})
if err == nil {
elapsed := time.Since(start)
c.updateItemSpeed(elapsed)
if err != nil {
return nil, err
}
return res, err
}