Commit 26fc0eb0 authored by pfandzelter's avatar pfandzelter
Browse files

check keygroup before adding replica

parent 7509cadc
Pipeline #67551 passed with stages
in 16 minutes and 40 seconds
......@@ -224,7 +224,7 @@ func (s *replicationService) relayAppend(i Item) error {
return nil
}
// addReplica handles replication after requests to the AddReplica endpoint. It relays this command if "relay" is set to "true".
// addReplica handles replication after requests to the AddReplica endpoint.
func (s *replicationService) addReplica(k Keygroup, n Node) error {
log.Debug().Msgf("AddReplica from replservice: in kg=%+v no=%+v", k, n)
......@@ -251,6 +251,12 @@ func (s *replicationService) addReplica(k Keygroup, n Node) error {
return err
}
// if we are not the new node and we are also not a replica for that keygroup, we have nothing to do with that
// request: abort
if n.ID != s.n.GetNodeID() && !s.s.existsKeygroup(k.Name) {
return errors.Errorf("keygroup %+v is unknown and we are not the new node %+v", k, n)
}
// Write the news into the NaSe first
err = s.n.JoinNodeIntoKeygroup(k.Name, n.ID, k.Expiry)
......@@ -267,6 +273,11 @@ func (s *replicationService) addReplica(k Keygroup, n Node) error {
}
// send all existing data to the new node
// there are three basic possibilities:
// we are a replica for this keygroup and we need to send someone else the data
// we are not a replica for this keygroup and we need to get the data from somewhere
// we are not a replica but we are also not the new replica
// in the last case we just have nothing to do with this and error out
// TODO so this one array contains all data items? Maybe not a good idea if there is a lot of data to be sent
var i []Item
......
......@@ -238,9 +238,18 @@ func (s *storeService) readAll(kg KeygroupName) ([]Item, error) {
// exists checks if an item exists in the key-value store.
func (s *storeService) exists(i Item) bool {
if !s.iS.ExistsKeygroup(string(i.Keygroup)) {
return false
}
return s.iS.Exists(string(i.Keygroup), i.ID)
}
// existsKeygroup checks if a keygroup exists locally.
func (s *storeService) existsKeygroup(name KeygroupName) bool {
return s.iS.ExistsKeygroup(string(name))
}
// Update updates an item in the key-value store.
func (s *storeService) update(i Item, expiry int) (vclock.VClock, error) {
// no version given means:
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment