Commit 8cf493e1 authored by pfandzelter's avatar pfandzelter
Browse files

Merge branch 'tp/check-nase-caching' into 'main'

fix append-only keygroups replication

See merge request !145
parents 0569b2e2 b06dabd0
Pipeline #16362 passed with stages
in 15 minutes and 28 seconds
......@@ -263,7 +263,21 @@ func (s *Storage) IDs(kg string) ([]string, error) {
}
// Update updates the item with the specified id in the specified keygroup.
func (s *Storage) Update(kg, id, val string, expiry int) error {
func (s *Storage) Update(kg, id, val string, append bool, expiry int) error {
if append {
// make sure that we have our local sequence on point
key, err := strconv.Atoi(id)
if err != nil {
return errors.New(err)
}
for n, err := s.incrementSequence(kg); n < uint64(key); n, err = s.incrementSequence(kg) {
if err != nil {
return errors.New(err)
}
}
}
err := s.db.Update(func(txn *badger.Txn) error {
key := makeKeyName(kg, id)
......@@ -322,20 +336,7 @@ func (s *Storage) Append(kg, val string, expiry int) (string, error) {
// maximum of 18446744073709551615, though!
// if you reach this maximum, please send me a letter
seq, ok := s.seq[kg]
if !ok {
newSeq, err := s.db.GetSequence(makeLogConfigKeyName(kg), 100)
if err != nil {
return "", errors.New(err)
}
s.seq[kg] = newSeq
seq = newSeq
}
n, err := seq.Next()
n, err := s.incrementSequence(kg)
if err != nil {
return "", errors.New(err)
......@@ -374,6 +375,29 @@ func (s *Storage) Append(kg, val string, expiry int) (string, error) {
return id, nil
}
func (s *Storage) incrementSequence(kg string) (uint64, error) {
seq, ok := s.seq[kg]
if !ok {
newSeq, err := s.db.GetSequence(makeLogConfigKeyName(kg), 100)
if err != nil {
return 0, errors.New(err)
}
s.seq[kg] = newSeq
seq = newSeq
}
n, err := seq.Next()
if err != nil {
return 0, errors.New(err)
}
return n, nil
}
// Exists checks if the given data item exists in the badgerdb database.
func (s *Storage) Exists(kg string, id string) bool {
err := s.db.View(func(txn *badger.Txn) error {
......
......@@ -102,7 +102,7 @@ func TestReadSome(t *testing.T) {
ids[i] = "id" + strconv.Itoa(i)
vals[i] = "val" + strconv.Itoa(i)
err = db.Update(kg, ids[i], vals[i], 0)
err = db.Update(kg, ids[i], vals[i], false, 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
......@@ -135,21 +135,21 @@ func TestReadAll(t *testing.T) {
t.Error(err)
}
err = db.Update(kg, "id-1", "data-1", 0)
err = db.Update(kg, "id-1", "data-1", false, 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
t.Error(err)
}
err = db.Update(kg, "id-2", "data-2", 0)
err = db.Update(kg, "id-2", "data-2", false, 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
t.Error(err)
}
err = db.Update(kg, "id-3", "data-3", 0)
err = db.Update(kg, "id-3", "data-3", false, 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
......@@ -165,21 +165,21 @@ func TestReadAll(t *testing.T) {
t.Error(err)
}
err = db.Update(kg2, "id-1", "data-1", 0)
err = db.Update(kg2, "id-1", "data-1", false, 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
t.Error(err)
}
err = db.Update(kg2, "id-2", "data-2", 0)
err = db.Update(kg2, "id-2", "data-2", false, 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
t.Error(err)
}
err = db.Update(kg2, "id-3", "data-3", 0)
err = db.Update(kg2, "id-3", "data-3", false, 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
......@@ -208,21 +208,21 @@ func TestIDs(t *testing.T) {
t.Error(err)
}
err = db.Update(kg, "id-1", "data-1", 0)
err = db.Update(kg, "id-1", "data-1", false, 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
t.Error(err)
}
err = db.Update(kg, "id-2", "data-2", 0)
err = db.Update(kg, "id-2", "data-2", false, 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
t.Error(err)
}
err = db.Update(kg, "id-3", "data-3", 0)
err = db.Update(kg, "id-3", "data-3", false, 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
......@@ -238,21 +238,21 @@ func TestIDs(t *testing.T) {
t.Error(err)
}
err = db.Update(kg2, "id-1", "data-1", 0)
err = db.Update(kg2, "id-1", "data-1", false, 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
t.Error(err)
}
err = db.Update(kg2, "id-2", "data-2", 0)
err = db.Update(kg2, "id-2", "data-2", false, 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
t.Error(err)
}
err = db.Update(kg2, "id-3", "data-3", 0)
err = db.Update(kg2, "id-3", "data-3", false, 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
......@@ -283,7 +283,7 @@ func TestItemExists(t *testing.T) {
t.Error(err)
}
err = db.Update(kg, id, value, 0)
err = db.Update(kg, id, value, false, 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
......@@ -314,7 +314,7 @@ func TestItemGet(t *testing.T) {
t.Error(err)
}
err = db.Update(kg, id, value, 0)
err = db.Update(kg, id, value, false, 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
......@@ -343,7 +343,7 @@ func TestItemDelete(t *testing.T) {
t.Error(err)
}
err = db.Update(kg, id, value, 0)
err = db.Update(kg, id, value, false, 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
......@@ -390,7 +390,7 @@ func TestItemAfterDeleteKeygroup(t *testing.T) {
t.Error(err)
}
err = db.Update(kg, id, value, 0)
err = db.Update(kg, id, value, false, 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
......@@ -422,7 +422,7 @@ func TestExpiry(t *testing.T) {
t.Error(err)
}
err = db.Update(kg, id, value, 10)
err = db.Update(kg, id, value, false, 10)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
......@@ -662,7 +662,7 @@ func TestClose(t *testing.T) {
t.Error(err)
}
err = db.Update(kg, id, value, 0)
err = db.Update(kg, id, value, false, 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
......
......@@ -412,7 +412,7 @@ func (s *Storage) IDs(kg string) ([]string, error) {
}
// Update updates the item with the specified id in the specified keygroup.
func (s *Storage) Update(kg, id, val string, expiry int) error {
func (s *Storage) Update(kg, id, val string, _ bool, expiry int) error {
key := makeKeyName(kg, id)
......
......@@ -8,7 +8,6 @@ import (
"github.com/go-errors/errors"
"github.com/rs/zerolog/log"
"go.etcd.io/etcd/api/v3/mvccpb"
"go.etcd.io/etcd/client/v3"
)
......@@ -20,10 +19,9 @@ func (n *NameService) getPrefix(prefix string) (kv map[string]string, err error)
// store prefix directly in cache
val, ok := n.local.Get(prefix)
log.Debug().Msgf("prefix: %s cache hit", prefix)
// found something!
if ok {
log.Debug().Msgf("prefix: %s cache hit", prefix)
return val.(map[string]string), nil
}
}
......@@ -58,11 +56,9 @@ func (n *NameService) getPrefix(prefix string) (kv map[string]string, err error)
for r := range c {
for _, ev := range r.Events {
if ev.IsModify() {
if ev.Type == mvccpb.DELETE {
n.local.Del(prefix)
log.Debug().Msgf("prefix: %s remote cache invalidation", prefix)
return
}
n.local.Del(prefix)
log.Debug().Msgf("prefix: %s remote cache invalidation", prefix)
return
}
}
}
......@@ -78,13 +74,11 @@ func (n *NameService) getExact(key string) (v string, err error) {
// let's check the local cache first
val, ok := n.local.Get(key)
log.Debug().Msgf("key: %s cache hit", key)
// found something!
if ok {
log.Debug().Msgf("key: %s cache hit", key)
return val.(string), nil
}
}
log.Debug().Msgf("key: %s cache miss", key)
......@@ -115,11 +109,10 @@ func (n *NameService) getExact(key string) (v string, err error) {
for r := range c {
for _, ev := range r.Events {
if ev.IsModify() {
if ev.Type == mvccpb.DELETE {
n.local.Del(key)
log.Debug().Msgf("key: %s remote cache invalidation", key)
return
}
n.local.Del(key)
log.Debug().Msgf("key: %s remote cache invalidation", key)
return
}
}
}
......@@ -160,7 +153,6 @@ func (n *NameService) put(key, value string) (err error) {
if n.cached {
n.local.Del(key)
log.Debug().Msgf("key: %s local cache invalidation", key)
}
_, err = n.cli.Put(ctx, key, value)
......
......@@ -155,7 +155,7 @@ func (h *exthandler) HandleAppend(user string, i Item) (Item, error) {
return i, errors.Errorf("error updating item")
}
if err := h.r.relayUpdate(result); err != nil {
if err := h.r.relayAppend(result); err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
return result, errors.Errorf("error updating item")
}
......@@ -200,7 +200,7 @@ func (h *exthandler) HandleUpdate(user string, i Item) error {
return errors.Errorf("error updating item")
}
if err := h.s.update(i, expiry); err != nil {
if err := h.s.update(i, false, expiry); err != nil {
log.Printf("%#v", err)
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
return errors.Errorf("error updating item")
......
......@@ -31,6 +31,7 @@ type IntHandler interface {
HandleCreateKeygroup(k Keygroup) error
HandleDeleteKeygroup(k Keygroup) error
HandleUpdate(i Item) error
HandleAppend(i Item) error
HandleDelete(i Item) error
HandleAddReplica(k Keygroup, n Node) error
HandleRemoveReplica(k Keygroup, n Node) error
......@@ -105,7 +106,7 @@ func New(config *Config) (f Fred) {
log.Err(err).Msg("Was not able to get Items from node")
}
expiry, _ := config.NaSe.GetExpiry(item.Keygroup)
err = s.update(item, expiry)
err = s.update(item, false, expiry)
if err != nil {
log.Error().Msgf("Could not update missed item %s", item.ID)
}
......
......@@ -86,7 +86,29 @@ func (h *inthandler) HandleUpdate(i Item) error {
return err
}
if err := h.s.update(i, expiry); err != nil {
if err := h.s.update(i, false, expiry); err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
return errors.Errorf("error updating item")
}
if err := h.t.triggerUpdate(i); err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
return errors.Errorf("error updating item")
}
return nil
}
// HandleAppend handles requests to the Append endpoint of the internal interface.
func (h *inthandler) HandleAppend(i Item) error {
expiry, err := h.n.GetExpiry(i.Keygroup)
if err != nil {
return err
}
if err := h.s.update(i, true, expiry); err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
return errors.Errorf("error updating item")
}
......
......@@ -10,6 +10,7 @@ type Client interface {
SendCreateKeygroup(host string, kgname KeygroupName, expiry int) error
SendDeleteKeygroup(host string, kgname KeygroupName) error
SendUpdate(host string, kgname KeygroupName, id string, value string) error
SendAppend(host string, kgname KeygroupName, id string, value string) error
SendDelete(host string, kgname KeygroupName, id string) error
SendAddReplica(host string, kgname KeygroupName, node Node, expiry int) error
SendRemoveReplica(host string, kgname KeygroupName, node Node) error
......@@ -158,7 +159,7 @@ func (s *replicationService) relayUpdate(i Item) error {
// inform all other nodes about the update, get a list of all nodes subscribed to this keygroup
ids, err := s.n.GetKeygroupMembers(i.Keygroup, true)
if err != nil {
log.Err(err).Msg("Cannot delete keygroup because the nase threw an error")
log.Err(err).Msg("Cannot relayUpdate because the nase threw an error")
return err
}
......@@ -183,6 +184,48 @@ func (s *replicationService) relayUpdate(i Item) error {
return nil
}
// relayAppend handles replication after requests to the Append endpoint of the client interface.
// It sends the append to all other nodes by calling their Append method
func (s *replicationService) relayAppend(i Item) error {
log.Debug().Msgf("relayAppend from replservice: in %#v", i)
exists, err := s.n.ExistsKeygroup(i.Keygroup)
if err != nil {
return err
}
if !exists {
err = errors.Errorf("no such keygroup according to NaSe: %#v", i.Keygroup)
return err
}
// inform all other nodes about the append, get a list of all nodes subscribed to this keygroup
ids, err := s.n.GetKeygroupMembers(i.Keygroup, true)
if err != nil {
log.Err(err).Msg("Cannot relayAppend because the nase threw an error")
return err
}
for id := range ids {
addr, err := s.n.GetNodeAddress(id)
if err != nil {
log.Err(err).Msg("Cannot Get node address from NaSe")
return err
}
log.Debug().Msgf("relayAppend from replservice: sending %#v to %#v", i, addr)
if err := s.c.SendAppend(addr, i.Keygroup, i.ID, i.Val); err != nil {
err = s.reportNodeFail(id, i.Keygroup, i.ID)
if err != nil {
log.Debug().Msg(err.Error())
}
}
}
return nil
}
// relayDelete handles replication after requests to the Delete endpoint of the client interface.
func (s *replicationService) relayDelete(i Item) error {
log.Debug().Msgf("RelayDelete from replservice: in %#v", i)
......@@ -334,15 +377,31 @@ func (s *replicationService) addReplica(k Keygroup, n Node, relay bool) error {
}
}
mutable, err := s.n.IsMutable(k.Name)
if err != nil {
return err
}
log.Debug().Msgf("AddReplica from replservice: About to send %d Elements to new node", len(i))
for _, item := range i {
// iterate over all data for that keygroup and send it to the new node
// a batch might be better here
log.Debug().Msgf("AddReplica from replservice: sending %#v to %#v", item, n)
if err := s.c.SendUpdate(newNodeAddr, k.Name, item.ID, item.Val); err != nil {
err = s.reportNodeFail(n.ID, k.Name, item.ID)
if err != nil {
log.Debug().Msg(err.Error())
if mutable {
log.Debug().Msgf("AddReplica from replservice: sending %#v to %#v (update)", item, n)
if err := s.c.SendUpdate(newNodeAddr, k.Name, item.ID, item.Val); err != nil {
err = s.reportNodeFail(n.ID, k.Name, item.ID)
if err != nil {
log.Debug().Msg(err.Error())
}
}
} else {
log.Debug().Msgf("AddReplica from replservice: sending %#v to %#v (append)", item, n)
if err := s.c.SendAppend(newNodeAddr, k.Name, item.ID, item.Val); err != nil {
err = s.reportNodeFail(n.ID, k.Name, item.ID)
if err != nil {
log.Debug().Msg(err.Error())
}
}
}
}
......@@ -394,7 +453,7 @@ func (s *replicationService) removeReplica(k Keygroup, n Node, relay bool) error
return err
}
// Check if to-be-deleted node is in the keygorup
if _, ok := members[n.ID]; !ok{
if _, ok := members[n.ID]; !ok {
return errors.Errorf("Can not remove node from keygroup it is not a member of.")
}
// Check if the node is the last member of the keygroup (so the only one holding data)
......
......@@ -5,7 +5,7 @@ import "github.com/go-errors/errors"
// Store is an interface for the storage medium that the key-value val items are persisted on.
type Store interface {
// Needs: keygroup, id, val
Update(kg, id, val string, expiry int) error
Update(kg, id, val string, append bool, expiry int) error
// Needs: keygroup, id
Delete(kg, id string) error
// Needs: keygroup, val, Returns: key
......@@ -151,7 +151,7 @@ func (s *storeService) exists(i Item) bool {
}
// Update updates an item in the key-value store.
func (s *storeService) update(i Item, expiry int) error {
func (s *storeService) update(i Item, append bool, expiry int) error {
err := checkItem(i)
if err != nil {
......@@ -162,7 +162,7 @@ func (s *storeService) update(i Item, expiry int) error {
return errors.Errorf("no such keygroup in store: %#v", i.Keygroup)
}
err = s.iS.Update(string(i.Keygroup), i.ID, i.Val, expiry)
err = s.iS.Update(string(i.Keygroup), i.ID, i.Val, append, expiry)
if err != nil {
return err
......
......@@ -11,45 +11,31 @@ import (
)
// Client is an peering client to communicate with peers.
type Client struct{}
type Client struct {
conn map[string]peering.NodeClient
}
// NewClient creates a new empty client to communicate with peers.
func NewClient() *Client {
return &Client{}
return &Client{
conn: make(map[string]peering.NodeClient),
}
}
// createConnAndClient creates a new connection to a server.
// Maybe it could be useful to reuse these?
// IDK whether this would be faster to store them in a map
func (c *Client) getConnAndClient(host string) (client peering.NodeClient, conn *grpc.ClientConn) {
// getClient creates a new connection to a server or uses an existing one.
func (c *Client) getClient(host string) (peering.NodeClient, error) {
conn, err := grpc.Dial(host, grpc.WithInsecure())
if err != nil {
log.Fatal().Err(err).Msg("Cannot create Grpc connection")
return nil, nil
}
log.Debug().Msgf("Interclient: Created Connection to %s", host)
client = peering.NewNodeClient(conn)
return
}
// logs the response and returns the correct error message
func (c *Client) dealWithStatusResponse(res *peering.StatusResponse, err error, from string) error {
if res != nil {
log.Debug().Msgf("Interclient got Response from %s, Status %s with Message %s and Error %s", from, res.Status, res.ErrorMessage, err)
} else {
log.Debug().Msgf("Interclient got empty Response from %s", from)
if err != nil {
log.Error().Err(err).Msg("Cannot create Grpc connection")
return nil, errors.New(err)
}
if err != nil || res == nil {
return errors.New(err)
}
log.Debug().Msgf("Interclient: Created Connection to %s", host)
if res.Status == peering.EnumStatus_ERROR {
return errors.New(res.ErrorMessage)
}
return nil
c.conn[host] = peering.NewNodeClient(conn)
return c.conn[host], nil
}
// Destroy currently does nothing, but might delete open connections if they are implemented
......@@ -58,103 +44,151 @@ func (c *Client) Destroy() {
// SendCreateKeygroup sends this command to the server at this address
func (c *Client) SendCreateKeygroup(host string, kgname fred.KeygroupName, expiry int) error {
client, conn := c.getConnAndClient(host)
res, err := client.CreateKeygroup(context.Background(), &peering.CreateKeygroupRequest{Keygroup: string(kgname), Expiry: int64(expiry)})
err = c.dealWithStatusResponse(res, err, "CreateKeygroup")
client, err := c.getClient(host)
if err != nil {
return err
return errors.New(err)
}
return conn.Close()
_, err = client.CreateKeygroup(context.Background(), &peering.CreateKeygroupRequest{Keygroup: string(kgname), Expiry: int64(expiry)})
if err != nil {
return errors.New(err)
}
return nil
}