Commit 6d191f76 authored by pfandzelter

Tests for Client Centric Consistency

parent 0a7fb8bd
Pipeline #43148 passed with stages in 21 minutes and 21 seconds
......@@ -6,4 +6,5 @@
!tests/3NodeTest/cmd
!tests/3NodeTest/pkg/
!tests/AlexandraTest/
!tests/consistency
!proto
\ No newline at end of file
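Note: this allowlist entry (presumably in the .dockerignore) adds the new tests/consistency directory to the Docker build context alongside the existing test suites, so the client-centric consistency tests are available during image builds.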
......@@ -31,5 +31,4 @@ COPY --from=golang /go/bin/frednode fred
EXPOSE 443
EXPOSE 5555
ENV PATH=.
ENTRYPOINT ["fred"]
ENTRYPOINT [ "./fred" ]
......@@ -36,7 +36,7 @@ func parseArgs() (c config) {
flag.StringVar(&(c.logHandler), "log-handler", "dev", "dev or prod")
flag.BoolVar(&(c.isProxied), "is-proxy", false, "Is this behind a proxy?")
flag.StringVar(&(c.proxyHost), "proxy-host", "", "Proxy host if this is proxied")
flag.StringVar(&(c.address), "address", "172.26.4.1:10000", "where to start the server")
flag.StringVar(&(c.address), "address", "", "where to start the server")
flag.Parse()
return
}
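The machine-specific default bind address (172.26.4.1:10000) is dropped in favor of an empty default, so deployments must now pass -address explicitly. A minimal sketch of the resulting pattern; the fatal guard is an assumption, not part of the hunk above:

package main

import (
	"flag"
	"log"
)

func main() {
	// empty default: no machine-specific address baked into the binary
	address := flag.String("address", "", "where to start the server")
	flag.Parse()

	if *address == "" {
		log.Fatal("no -address given") // hypothetical guard
	}
}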
......@@ -53,7 +53,9 @@ func main() {
},
)
log.Info().Msgf("%#v", c)
zerolog.DisableSampling(true)
log.Info().Msgf("Current configuration:\n%+v", c)
} else if c.logHandler != "prod" {
log.Fatal().Msg("Log Handler has to be either dev or prod")
}
......
......@@ -201,6 +201,8 @@ func main() {
NoColor: false,
},
)
zerolog.DisableSampling(true)
} else if fc.Log.Handler != "prod" {
log.Fatal().Msg("Log ExtHandler has to be either dev or prod")
}
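Several main functions now call zerolog.DisableSampling(true) when the dev log handler is selected; sampling may silently drop repeated log lines, which is undesirable for tests that assert on log output. A condensed, self-contained sketch of the dev-mode setup these hunks extend (writer fields assumed from the surrounding context):

package main

import (
	"os"

	"github.com/rs/zerolog"
	"github.com/rs/zerolog/log"
)

func main() {
	// dev handler: human-readable console output
	log.Logger = log.Output(zerolog.ConsoleWriter{
		Out:     os.Stderr,
		NoColor: false,
	})

	// make sure no log lines are sampled away
	zerolog.DisableSampling(true)

	log.Info().Msg("logger configured")
}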
......@@ -240,8 +242,7 @@ func main() {
// is, _ := json.MarshalIndent(fc, "", " ")
// return string(is)
// })())
log.Debug().Msg("Current configuration:")
log.Debug().Msgf("%#v", fc)
log.Info().Msgf("Current configuration:\n%+v", fc)
switch fc.Log.Level {
case "debug":
......@@ -265,7 +266,7 @@ func main() {
switch fc.Storage.Adaptor {
case "badgerdb":
log.Debug().Msgf("badgerdb struct is: %#v", fc.Bdb)
log.Debug().Msgf("badgerdb struct is: %+v", fc.Bdb)
store = badgerdb.New(fc.Bdb.Path)
case "memory":
store = badgerdb.NewMemory()
......
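These logging hunks also replace %#v with %+v throughout: %#v prints a Go-syntax representation including type names, while %+v prints just field names and values, which is easier to scan for larger structs. For illustration:

package main

import "fmt"

type config struct {
	Path string
}

func main() {
	c := config{Path: "/tmp/fred"}
	fmt.Printf("%#v\n", c) // main.config{Path:"/tmp/fred"}
	fmt.Printf("%+v\n", c) // {Path:/tmp/fred}
}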
......@@ -45,6 +45,8 @@ func main() {
NoColor: false,
},
)
zerolog.DisableSampling(true)
} else if *loghandler != "prod" {
log.Fatal().Msg("Log Handler has to be either dev or prod")
}
......
......@@ -35,7 +35,7 @@ type Server struct {
// PutItemTrigger calls HandleUpdate on the Inthandler
func (s *Server) PutItemTrigger(_ context.Context, request *trigger.PutItemTriggerRequest) (*trigger.Empty, error) {
log.Debug().Msgf("Trigger Node has rcvd PutItem. In: %#v", request)
log.Debug().Msgf("Trigger Node has rcvd PutItem. In: %+v", request)
s.log = append(s.log, LogEntry{
Op: "put",
......@@ -49,7 +49,7 @@ func (s *Server) PutItemTrigger(_ context.Context, request *trigger.PutItemTrigg
// DeleteItemTrigger calls this Method on the Inthandler
func (s *Server) DeleteItemTrigger(_ context.Context, request *trigger.DeleteItemTriggerRequest) (*trigger.Empty, error) {
log.Debug().Msgf("Trigger Node has rcvd DeleteItem. In: %#v", request)
log.Debug().Msgf("Trigger Node has rcvd DeleteItem. In: %+v", request)
s.log = append(s.log, LogEntry{
Op: "del",
......@@ -146,6 +146,8 @@ func main() {
NoColor: false,
},
)
zerolog.DisableSampling(true)
} else if *loghandler != "prod" {
log.Fatal().Msg("Log Handler has to be either dev or prod")
}
......
......@@ -47,6 +47,8 @@ func main() {
NoColor: false,
},
)
zerolog.DisableSampling(true)
} else if *loghandler != "prod" {
log.Fatal().Msg("Log Handler has to be either dev or prod")
}
......
package alexandra
import (
"fmt"
"sync"
"github.com/DistributedClocks/GoVector/govec/vclock"
"github.com/rs/zerolog/log"
)
type cache struct {
......@@ -84,28 +86,61 @@ func (c *cache) add(kg string, id string, version vclock.VClock) error {
newClocks := make([]vclock.VClock, 0, len(c.keygroups[kg].items[id].clocks))
// let's first go through the existing versions
for _, v := range c.keygroups[kg].items[id].clocks {
// seems like we have that version in store already!
// just return, nothing to do here
if version.Compare(v, vclock.Equal) {
return nil
}
// if there is a concurrent version to our new version, we need to keep that
if version.Compare(v, vclock.Concurrent) {
newClocks = append(newClocks, v)
continue
}
// if by chance we come upon a newer version, this has all been pointless
// actually, this is a bad sign: we seem to have read outdated data!
if version.Compare(v, vclock.Descendant) {
return nil
return fmt.Errorf("add failed because your version %v is older than cached version %v", version, v)
}
}
// now actually add the new version
newClocks = append(newClocks, version.Copy())
log.Debug().Msgf("Cache Add: new: %+v old: %+v", newClocks, c.keygroups[kg].items[id].clocks)
// and store our new clocks
c.keygroups[kg].items[id].clocks = newClocks
return nil
}
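The cache logic leans entirely on the Compare semantics of GoVector's vclock package, where a.Compare(b, vclock.Descendant) reports that b descends from (is newer than) a. A self-contained sketch of the three cases the loop above distinguishes:

package main

import (
	"fmt"

	"github.com/DistributedClocks/GoVector/govec/vclock"
)

func main() {
	a := vclock.New()
	a.Set("nodeA", 1)

	b := a.Copy()
	b.Tick("nodeA") // b = {nodeA: 2}, a descendant of a

	c := vclock.New()
	c.Set("nodeB", 1) // no causal relation to a

	fmt.Println(a.Compare(a.Copy(), vclock.Equal)) // true: same version, nothing to do
	fmt.Println(a.Compare(c, vclock.Concurrent))   // true: keep both versions
	fmt.Println(a.Compare(b, vclock.Descendant))   // true: b is newer, a is outdated
}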
func (c *cache) supersede(kg string, id string, version vclock.VClock) error {
func (c *cache) supersede(kg string, id string, known []vclock.VClock, version vclock.VClock) error {
c.cLock(kg, id)
defer c.cUnlock(kg, id)
c.keygroups[kg].items[id].clocks = []vclock.VClock{
newClocks := []vclock.VClock{
version.Copy(),
}
// add all clocks from cache as well, unless they are in the "known" array
for _, v := range c.keygroups[kg].items[id].clocks {
discard := false
for _, k := range known {
if v.Compare(k, vclock.Equal) {
discard = true
break
}
}
if discard {
continue
}
newClocks = append(newClocks, v)
}
c.keygroups[kg].items[id].clocks = newClocks
return nil
}
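Note: supersede now takes the list of versions the caller already knows; instead of wiping the cache entry, it keeps every cached clock that is not in known, so concurrent versions the writer has not yet seen survive the update.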
......
......@@ -10,7 +10,7 @@ import (
"time"
clientsProto "git.tu-berlin.de/mcc-fred/fred/proto/client"
alexandraProto "git.tu-berlin.de/mcc-fred/fred/proto/middleware"
"git.tu-berlin.de/mcc-fred/fred/proto/middleware"
"github.com/DistributedClocks/GoVector/govec/vclock"
"github.com/rs/zerolog/log"
)
......@@ -33,7 +33,7 @@ type clientExpiry struct {
// keygroupSet represents a keygroup's clients and the last time this information was updated from FReD
type keygroupSet struct {
lastUpdated time.Time
clients map[string]*clientExpiry
clients []*clientExpiry
}
// ClientsMgr manages all clients from Alexandra to FReD. It is used to get the fastest client to a keygroup, to read from anywhere, etc.
......@@ -60,103 +60,88 @@ func newClientsManager(clientsCert, clientsKey, lighthouse string) *ClientsMgr {
return mgr
}
func (m *ClientsMgr) readFromAnywhere(ctx context.Context, request *alexandraProto.ReadRequest) ([]string, []vclock.VClock, error) {
log.Debug().Msgf("ClientsManager is reading from anywhere. Req= %#v", request)
go m.maybeUpdateKeygroupClients(request.Keygroup)
func (m *ClientsMgr) readFromAnywhere(request *middleware.ReadRequest) ([]string, []vclock.VClock, error) {
log.Debug().Msgf("ClientsManager is reading from anywhere. Req= %+v", request)
m.maybeUpdateKeygroupClients(request.Keygroup)
// Start a goroutine for every client we ask
m.Lock()
set, exists := m.keygroups[request.Keygroup]
m.Unlock()
type readResponse struct {
error bool
vals []string
versions []vclock.VClock
}
responses := make(chan readResponse)
responsesClosed := false
sentAsks := 0
clientsToAsk := make(map[*Client]struct{})
// Start a goroutine for every client we ask
m.Lock()
set, exists := m.keygroups[request.Keygroup]
m.Unlock()
clts := filterClientsToExpiry(set.clients, request.MinExpiry)
if !exists || len(clts) == 0 {
log.Error().Msgf("...found no clients with minimum expiry. Clients with longer expiry: %#v", set.clients)
log.Error().Msgf("...found no clients with minimum expiry. Clients with longer expiry: %+v", set.clients)
return nil, nil, errors.New("there are no members of this keygroup")
}
askNode := func(client *Client) {
// TODO select channels?
// TODO buffered Channel
log.Debug().Msgf("...asking Client %#v for Keygroup %s", client, request.Keygroup)
res, err := client.Client.Read(context.Background(), &clientsProto.ReadRequest{Id: request.Id, Keygroup: request.Keygroup})
if err != nil {
log.Err(err).Msg("Reading from client returned error")
if !responsesClosed {
responses <- readResponse{
error: true,
}
}
} else {
log.Debug().Msgf("Reading from client returned data: %#v", res)
if !responsesClosed {
r := readResponse{
vals: make([]string, len(res.Data)),
versions: make([]vclock.VClock, len(res.Data)),
}
for i := range res.Data {
r.vals[i] = res.Data[i].Val
r.versions[i] = res.Data[i].Version.Version
}
responses <- r
}
}
}
// let's figure out who we want to ask
// Ask the fastest client first
fastestClient, err := m.getFastestClientWithKeygroup(request.Keygroup, request.MinExpiry)
if err == nil {
go askNode(fastestClient)
sentAsks++
clientsToAsk[fastestClient] = struct{}{}
}
// and add a maximum of otherNodesToAsk other clients
for i := 0; i < otherNodesToAsk; i++ {
clientsToAsk[clts[rand.Intn(len(clts))].client] = struct{}{}
}
// Ask $otherNodesToAsk other Clients
if len(clts) > 2 { // If it's only one element long, the one node is also the fastest node
if otherNodesToAsk > len(clts) {
for _, client := range clts {
go askNode(client.client)
sentAsks++
var wg sync.WaitGroup
responses := make(chan readResponse, len(clientsToAsk))
done := make(chan struct{})
for c := range clientsToAsk {
wg.Add(1)
go func(c *Client) {
defer wg.Done()
log.Debug().Msgf("...asking Client %+v for Keygroup %s", c, request.Keygroup)
res, err := c.Client.Read(context.Background(), &clientsProto.ReadRequest{Id: request.Id, Keygroup: request.Keygroup})
if err != nil {
log.Err(err).Msg("Reading from client returned error")
return
}
} else {
i := 0
otherClientsNames := make([]string, len(clts))
for k := range clts {
otherClientsNames[i] = k
i++
r := readResponse{
vals: make([]string, len(res.Data)),
versions: make([]vclock.VClock, len(res.Data)),
}
for i := 0; i < otherNodesToAsk; i++ {
id := rand.Intn(len(otherClientsNames))
go askNode(clts[otherClientsNames[id]].client)
sentAsks++
for i := range res.Data {
r.vals[i] = res.Data[i].Val
r.versions[i] = res.Data[i].Version.Version
log.Debug().Msgf("Reading from client returned data: %+v %+v", res.Data[i].Val, res.Data[i].Version.Version)
}
}
responses <- r
}(c)
}
// Wait for results and return the first good one
var res readResponse
rectRes := 0
for res = range responses {
log.Debug().Msgf("...waiting for the first answer to return it. res=%#v", res)
rectRes++
if !res.error {
log.Debug().Msgf("...got Response without error (closing channel): %#v", res)
responsesClosed = true
return res.vals, res.versions, nil
}
if rectRes >= sentAsks {
log.Warn().Msgf("ReadFromAnywhere: no fastestClient was able to answer the read (closing channel). Kg=%s", request.Keygroup)
responsesClosed = true
break
}
// wait for all responses to come in and close the channel
go func() {
wg.Wait()
done <- struct{}{}
close(responses)
}()
// if we get a response, return it
// otherwise, once done signals that every ask has finished without success, fall through and retry below
select {
case r := <-responses:
return r.vals, r.versions, nil
case <-done:
break
}
// There was no successful response -- Update the keygroup information and try one last time
......@@ -169,7 +154,7 @@ func (m *ClientsMgr) readFromAnywhere(ctx context.Context, request *alexandraPro
return nil, nil, fmt.Errorf("ReadFromAnywhere: there is no client with keygroup %s and expiry %d", request.Keygroup, request.MinExpiry)
}
result, err := client.read(ctx, request.Keygroup, request.Id)
result, err := client.read(context.Background(), request.Keygroup, request.Id)
if err != nil {
return nil, nil, fmt.Errorf("ReadFromAnywhere: cannot read from fastest client. err=%v", err)
}
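The rewritten read path replaces the shared responsesClosed flag with a sync.WaitGroup, a buffered response channel, and a done signal: ask a bounded set of clients in parallel, return the first successful answer, and fall back once every ask has failed. A self-contained sketch of that first-answer-wins pattern (this sketch closes done rather than sending on it, so the waiter goroutine can never block; the client stubs are placeholders):

package main

import (
	"errors"
	"fmt"
	"sync"
)

// ask queries all clients concurrently and returns the first
// successful response, or an error once every ask has failed.
func ask(clients []func() (string, error)) (string, error) {
	var wg sync.WaitGroup
	responses := make(chan string, len(clients)) // buffered: senders never block
	done := make(chan struct{})

	for _, c := range clients {
		wg.Add(1)
		go func(c func() (string, error)) {
			defer wg.Done()
			if res, err := c(); err == nil {
				responses <- res
			}
		}(c)
	}

	// signal once all asks have finished, successful or not
	go func() {
		wg.Wait()
		close(done)
	}()

	select {
	case r := <-responses:
		return r, nil // first good answer wins
	case <-done:
		// all asks finished; take a success that raced with done, if any
		select {
		case r := <-responses:
			return r, nil
		default:
			return "", errors.New("no client was able to answer the read")
		}
	}
}

func main() {
	fmt.Println(ask([]func() (string, error){
		func() (string, error) { return "", errors.New("node down") },
		func() (string, error) { return "value-from-replica", nil },
	}))
}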
......@@ -197,7 +182,7 @@ func (m *ClientsMgr) getClientTo(host string) (client *Client) {
return
}
func getFastestClient(clts map[string]*Client) (client *Client) {
func getFastestClient(clts []*Client) (client *Client) {
var minTime float32 = math.MaxFloat32
var minClient *Client
// Set the first client up as fastest client, so that it gets returned if no other client is found.
......@@ -214,30 +199,30 @@ func getFastestClient(clts map[string]*Client) (client *Client) {
return minClient
}
func getFastestClientByClientExpiry(clts map[string]*clientExpiry) (client *Client) {
func getFastestClientByClientExpiry(clts []*clientExpiry) (client *Client) {
if clts == nil {
return nil
}
var clientsMap = make(map[string]*Client)
for key, value := range clts {
clientsMap[key] = value.client
var clients = make([]*Client, len(clts))
for i, c := range clts {
clients[i] = c.client
}
return getFastestClient(clientsMap)
return getFastestClient(clients)
}
// filterClientsToExpiry filters clients by expiry: if param == -1, keep only clients with exp == -1; if param == 0, keep anything; otherwise, keep anything with exp >= param
func filterClientsToExpiry(clientEx map[string]*clientExpiry, expiry int64) (out map[string]*clientExpiry) {
func filterClientsToExpiry(clientEx []*clientExpiry, expiry int64) (out []*clientExpiry) {
if clientEx == nil {
return nil
}
out = make(map[string]*clientExpiry)
for k, v := range clientEx {
out = make([]*clientExpiry, 0)
for _, v := range clientEx {
if expiry == -1 && v.expiry == -1 {
out[k] = v
out = append(out, v)
} else if expiry == 0 {
out[k] = v
out = append(out, v)
} else if v.expiry >= expiry {
out[k] = v
out = append(out, v)
}
}
return
......@@ -245,10 +230,10 @@ func filterClientsToExpiry(clientEx map[string]*clientExpiry, expiry int64) (out
func (m *ClientsMgr) getClient(keygroup string, slowprob float64) (*Client, error) {
if rand.Float64() < slowprob {
return m.getRandomClientWithKeygroup(keygroup, 1)
return m.getRandomClientWithKeygroup(keygroup, 0)
}
return m.getFastestClientWithKeygroup(keygroup, 1)
return m.getFastestClientWithKeygroup(keygroup, 0)
}
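Note: under the filter semantics above, the old expiry argument of 1 silently excluded clients whose expiry is -1 (never expires); passing 0 makes every replica eligible again.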
// getFastestClient searches for the fastest of the already existing clients
......@@ -257,7 +242,14 @@ func (m *ClientsMgr) getFastestClient() (client *Client) {
log.Info().Msg("ClientsMgr: GetFastestClient was called but there are not clients. Using lighthouse client")
return m.getClientTo(m.lighthouse)
}
return getFastestClient(m.clients)
clts := make([]*Client, 0, len(m.clients))
for _, c := range m.clients {
clts = append(clts, c)
}
return getFastestClient(clts)
}
// GetFastestClientWithKeygroup returns the fastest client that has the keygroup with an expiry bigger than the parameter
......@@ -274,7 +266,7 @@ func (m *ClientsMgr) getFastestClientWithKeygroup(keygroup string, expiry int64)
clients = m.keygroups[keygroup]
m.Unlock()
}
log.Debug().Msgf("Clients before filtering: %#v", clients)
log.Debug().Msgf("Clients before filtering: %+v", clients)
filteredClients := filterClientsToExpiry(clients.clients, expiry)
fastestClient := getFastestClientByClientExpiry(filteredClients)
if fastestClient == nil {
......@@ -290,9 +282,9 @@ func (m *ClientsMgr) getRandomClientWithKeygroup(keygroup string, expiry int64)
m.Unlock()
filtered := filterClientsToExpiry(clients.clients, expiry)
// Get random element from this list
log.Debug().Msgf("Len filtered is %#v", len(filtered))
log.Debug().Msgf("Len filtered is %+v", len(filtered))
if len(filtered) == 0 {
return nil, fmt.Errorf("was not able to find ANY client to keygroup %s with expiry > %d. Clients: %#v", keygroup, expiry, clients)
return nil, fmt.Errorf("was not able to find ANY client to keygroup %s with expiry > %d. Clients: %+v", keygroup, expiry, clients)
}
nodeI := rand.Intn(len(filtered))
curI := 0
......@@ -315,7 +307,7 @@ func (m *ClientsMgr) maybeUpdateKeygroupClients(keygroup string) {
m.updateKeygroupClients(keygroup)
} else if time.Since(set.lastUpdated) > keygroupTimeout {
log.Debug().Msgf("Keygroup %s has not been updated in %.0f minutes, doing it now", keygroup, keygroupTimeout.Minutes())
go m.updateKeygroupClients(keygroup)
m.updateKeygroupClients(keygroup)
}
}
......@@ -330,7 +322,7 @@ func (m *ClientsMgr) updateKeygroupClients(keygroup string) {
return
}
}
log.Debug().Msgf("updateKeygroupClients: Got replicas: %#v", replica)
log.Debug().Msgf("updateKeygroupClients: Got replicas: %+v", replica)
m.Lock()
defer m.Unlock()
......@@ -338,18 +330,17 @@ func (m *ClientsMgr) updateKeygroupClients(keygroup string) {
if !exists {
m.keygroups[keygroup] = &keygroupSet{
lastUpdated: time.Now(),
clients: make(map[string]*clientExpiry),
}
set = m.keygroups[keygroup]
}
set.clients = make(map[string]*clientExpiry)
for _, client := range replica.Replica {
set.clients[client.NodeId] = &clientExpiry{
set.clients = make([]*clientExpiry, len(replica.Replica))
for i, client := range replica.Replica {
set.clients[i] = &clientExpiry{
client: m.getClientTo(client.Host),
expiry: client.Expiry,
}
}
set.lastUpdated = time.Now()
m.keygroups[keygroup] = set
log.Debug().Msgf("updateKeygroupClients: new Clients are: %#v", set)
log.Debug().Msgf("updateKeygroupClients: new Clients are: %+v", set)
}
......@@ -5,7 +5,7 @@ import (
"fmt"
api "git.tu-berlin.de/mcc-fred/fred/proto/client"
alexandraProto "git.tu-berlin.de/mcc-fred/fred/proto/middleware"
"git.tu-berlin.de/mcc-fred/fred/proto/middleware"
"github.com/DistributedClocks/GoVector/govec/vclock"
"github.com/rs/zerolog/log"
)
......@@ -13,7 +13,7 @@ import (
// Scan issues a scan request from the client to the middleware. The request is forwarded to FReD and incoming items are
// checked for their versions by comparing locally cached versions (if any). The local cache is also updated
// (if applicable).
func (s *Server) Scan(ctx context.Context, req *alexandraProto.ScanRequest) (*alexandraProto.ScanResponse, error) {
func (s *Server) Scan(ctx context.Context, req *middleware.ScanRequest) (*middleware.ScanResponse, error) {
res, err := s.clientsMgr.getClientTo(s.lighthouse).Client.Scan(ctx, &api.ScanRequest{
Keygroup: req.Keygroup,
Id: req.Id,
......@@ -24,10 +24,10 @@ func (s *Server) Scan(ctx context.Context, req *alexandraProto.ScanRequest) (*al
return nil, err
}
data := make([]*alexandraProto.Data, len(res.Data))
data := make([]*middleware.Data, len(res.Data))
for i, datum := range res.Data {
data[i] = &alexandraProto.Data{
data[i] = &middleware.Data{
Id: datum.Id,
Data: datum.Val,
}
......@@ -35,18 +35,19 @@ func (s *Server) Scan(ctx context.Context, req *alexandraProto.ScanRequest) (*al
err = s.cache.add(req.Keygroup, req.Id, datum.Version.Version)
}
return &alexandraProto.ScanResponse{Data: data}, err
return &middleware.ScanResponse{Data: data}, err
}
// Read reads a datum from FReD. Read data are placed in cache (if not in there already). If multiple versions of a
// datum exist, all versions will be returned to the client so that it can choose one. If the read data is outdated
// compared to seen versions, an error is returned.
func (s *Server) Read(ctx context.Context, req *alexandraProto.ReadRequest) (*alexandraProto.ReadResponse, error) {
func (s *Server) Read(_ context.Context, req *middleware.ReadRequest) (*middleware.ReadResponse, error) {
log.Debug().Msgf("Alexandra has rcvd Read")
vals, versions, err := s.clientsMgr.readFromAnywhere(ctx, req)
vals, versions, err := s.clientsMgr.readFromAnywhere(req)
if err != nil {
log.Error().Err(err)
return nil, err
}
......@@ -56,31 +57,48 @@ func (s *Server) Read(ctx context.Context, req *alexandraProto.ReadRequest) (*al
return nil, err
}
for _, read := range versions {
for _, seen := range known {
if seen.Compare(read, vclock.Ancestor) {
return nil, fmt.Errorf("read version %v is older than seen version %v", read, seen)
for _, seen := range known {
// TODO: is there a more elegant solution to this?
// essentially we need to check the read data to cover all versions we have seen so far
// this means that for every version we have seen so far, there must be at least one version in the read data
// (preferably exactly one version) that is equal or newer to that seen version.
covered := false
for _, read := range versions {
if seen.Compare(read, vclock.Descendant) || seen.Compare(read, vclock.Equal) {
covered = true
break
}
}
if !covered {
log.Error().Msgf("Alexandra Read has seen version %v is not covered by given versions %+v", seen, versions)
return nil, fmt.Errorf("seen version %v is not covered by given versions %+v", seen, versions)
}
}
log.Debug().Msgf("Alexandra Read key %s in kg %s: got vals %v versions %v", req.Id, req.Keygroup, vals, versions)
for i := range versions {
log.Debug().Msgf("Alexandra Read: putting version %v in cache for %s", versions[i], req.Id)
err = s.cache.add(req.Keygroup, req.Id, versions[i])
if err != nil {
log.Error().Err(err)
return nil, err
}
}
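The coverage check above is what makes the reads client-centric: every version the client has already seen must be equal to or an ancestor of at least one version in the read result, otherwise the read would hand back stale data. A small self-contained sketch of that predicate (same assumed Compare semantics as above; covered is a hypothetical helper, not part of the diff):

package main

import (
	"fmt"

	"github.com/DistributedClocks/GoVector/govec/vclock"
)

// covered reports whether every seen version is equal to, or an
// ancestor of, at least one read version.
func covered(seen, read []vclock.VClock) bool {
	for _, s := range seen {
		ok := false
		for _, r := range read {
			if s.Compare(r, vclock.Descendant) || s.Compare(r, vclock.Equal) {
				ok = true
				break
			}
		}
		if !ok {
			return false
		}
	}
	return true
}

func main() {
	older := vclock.New()
	older.Set("nodeA", 1)
	newer := older.Copy()
	newer.Tick("nodeA")

	fmt.Println(covered([]vclock.VClock{older}, []vclock.VClock{newer})) // true: read is newer
	fmt.Println(covered([]vclock.VClock{newer}, []vclock.VClock{older})) // false: read is stale
}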