Commit 59aa6232 authored by pfandzelter's avatar pfandzelter
Browse files

fix nase caching

parent 8cf493e1
Pipeline #16388 passed with stages
in 17 minutes and 49 seconds
......@@ -9,17 +9,19 @@ import (
// RevokeUserPermissions removes user's permission to perform method on kg by deleting the key in etcd.
func (n *NameService) RevokeUserPermissions(user string, method fred.Method, kg fred.KeygroupName) error {
return n.delete(fmt.Sprintf(fmtUserPermissionString, user, string(kg), method))
prefix := fmt.Sprintf(fmtUserPermissionStringPrefix, user, string(kg))
return n.delete(prefix+string(method), prefix)
}
// AddUserPermissions adds user's permission to perform method on kg by adding the key to etcd.
func (n *NameService) AddUserPermissions(user string, method fred.Method, kg fred.KeygroupName) error {
return n.put(fmt.Sprintf(fmtUserPermissionString, user, string(kg), method), "ok")
prefix := fmt.Sprintf(fmtUserPermissionStringPrefix, user, string(kg))
return n.put(prefix+string(method), "ok", prefix)
}
// GetUserPermissions returns a set of all of the user's permissions on kg from etcd.
func (n *NameService) GetUserPermissions(user string, kg fred.KeygroupName) (map[fred.Method]struct{}, error) {
res, err := n.getPrefix(fmt.Sprintf(fmtUserPermissionString, user, string(kg), ""))
res, err := n.getPrefix(fmt.Sprintf(fmtUserPermissionStringPrefix, user, string(kg)))
if err != nil {
return nil, err
......
......@@ -12,9 +12,9 @@ import (
// ReportFailedNode saves that a node has missed an update to a keygroup that it will get another time
func (n *NameService) ReportFailedNode(nodeID fred.NodeID, kg fred.KeygroupName, id string) error {
log.Warn().Msgf("Nase: ReportFailedNode: Reporting that nodeId %#v has missed kg %#v id %s", nodeID, kg, id)
item := fmt.Sprintf(fmtFailedNodeKgString, nodeID, kg, id)
log.Debug().Msgf("NaSe.ReportFailedNode: Putting %s into NaSe", item)
err := n.put(item, "1")
prefix := fmt.Sprintf(fmtFailedNodeKgStringPrefix, nodeID, kg)
log.Debug().Msgf("NaSe.ReportFailedNode: Putting %s into NaSe", prefix)
err := n.put(prefix+id, "1", prefix)
if err != nil {
log.Err(err).Msgf("NaSe: ReportFailedNode: Node was not able to reach NaSe." +
......
......@@ -14,6 +14,12 @@ import (
// getPrefix gets every key that starts(!) with the specified string
// the keys are sorted ascending by key for easier debugging
func (n *NameService) getPrefix(prefix string) (kv map[string]string, err error) {
// the hard part of caching isn't storing a key-value pair locally
// it's actually knowing when to remove an entry from the cache because it's outdated
// sure, you can set a timeout or other eviction policy but that's more or less arbitrary
// we remove an item from the cache if we delete it from the nase ourselves or nase informs us about deletion via watchers
// we update an item if we update it ourselves or nase informs us about an update via watchers
// prefixes are the hardest part about this
if n.cached {
// let's check the local cache first
// store prefix directly in cache
......@@ -48,18 +54,22 @@ func (n *NameService) getPrefix(prefix string) (kv map[string]string, err error)
if n.cached {
n.local.Set(prefix, kv, 1)
// TODO: use prefix changes to change local cache
go func() {
watchCtx, watchCncl := context.WithCancel(context.Background())
c := n.watcher.Watch(watchCtx, prefix, clientv3.WithPrefix())
log.Debug().Msgf("nase cache: watching for changes to prefix %s", prefix)
defer watchCncl()
for r := range c {
for _, ev := range r.Events {
if ev.IsModify() {
n.local.Del(prefix)
log.Debug().Msgf("prefix: %s remote cache invalidation", prefix)
return
}
if err := r.Err(); err != nil {
log.Err(err).Msgf("nase cache: error getting changes to prefix %s", prefix)
}
log.Debug().Msgf("nase cache: got %d changes to prefix %s", len(r.Events), prefix)
if len(r.Events) != 0 {
n.local.Del(prefix)
log.Debug().Msgf("prefix: %s remote cache invalidation", prefix)
return
}
}
}()
......@@ -106,14 +116,16 @@ func (n *NameService) getExact(key string) (v string, err error) {
c := n.watcher.Watch(watchCtx, key)
defer watchCncl()
// TODO: use key changes to modify local cache directly
for r := range c {
for _, ev := range r.Events {
if ev.IsModify() {
n.local.Del(key)
log.Debug().Msgf("key: %s remote cache invalidation", key)
return
}
if err := r.Err(); err != nil {
log.Err(err).Msgf("nase cache: error getting changes to key %s", key)
}
log.Debug().Msgf("nase cache: got %d changes to mey %s", len(r.Events), key)
if len(r.Events) != 0 {
n.local.Del(key)
log.Debug().Msgf("key: %s remote cache invalidation", key)
return
}
}
}()
......@@ -136,7 +148,7 @@ func (n *NameService) getKeygroupMutable(kg string) (string, error) {
}
func (n *NameService) getKeygroupExpiry(kg string, id string) (int, error) {
resp, err := n.getExact(fmt.Sprintf(fmtKgExpiryString, kg, id))
resp, err := n.getExact(fmt.Sprintf(fmtKgExpiryStringPrefix, kg) + id)
if resp == "" {
return 0, err
}
......@@ -145,12 +157,16 @@ func (n *NameService) getKeygroupExpiry(kg string, id string) (int, error) {
}
// put puts the value into etcd.
func (n *NameService) put(key, value string) (err error) {
func (n *NameService) put(key, value string, prefix ...string) (err error) {
ctx, cncl := context.WithTimeout(context.TODO(), timeout)
defer cncl()
if n.cached {
for _, p := range prefix {
n.local.Del(p)
log.Debug().Msgf("prefix: %s local cache invalidation", p)
}
n.local.Del(key)
log.Debug().Msgf("key: %s local cache invalidation", key)
}
......@@ -165,12 +181,16 @@ func (n *NameService) put(key, value string) (err error) {
}
// delete removes the value from etcd.
func (n *NameService) delete(key string) (err error) {
func (n *NameService) delete(key string, prefix ...string) (err error) {
ctx, cncl := context.WithTimeout(context.TODO(), timeout)
defer cncl()
if n.cached {
for _, p := range prefix {
n.local.Del(p)
log.Debug().Msgf("prefix: %s local cache invalidation", p)
}
n.local.Del(key)
log.Debug().Msgf("key: %s local cache invalidation", key)
......@@ -187,13 +207,15 @@ func (n *NameService) delete(key string) (err error) {
// addOwnKgNodeEntry adds the entry for this node with a status.
func (n *NameService) addOwnKgNodeEntry(kg string, status string) error {
return n.put(n.fmtKgNode(kg), status)
prefix, id := n.fmtKgNode(kg)
return n.put(prefix+id, status, prefix)
}
// addOtherKgNodeEntry adds the entry for a remote node with a status.
func (n *NameService) addOtherKgNodeEntry(node string, kg string, status string) error {
key := fmt.Sprintf(fmtKgNodeString, kg, node)
return n.put(key, status)
prefix := fmt.Sprintf(fmtKgNodeStringPrefix, kg)
key := prefix + node
return n.put(key, status, prefix)
}
// addKgStatusEntry adds the entry for a (new!) keygroup with a status.
......@@ -216,13 +238,15 @@ func (n *NameService) addKgMutableEntry(kg string, mutable bool) error {
// addKgExpiryEntry adds the expiry entry for a keygroup with a status.
func (n *NameService) addKgExpiryEntry(kg string, id string, expiry int) error {
return n.put(fmt.Sprintf(fmtKgExpiryString, kg, id), strconv.Itoa(expiry))
prefix := fmt.Sprintf(fmtKgExpiryStringPrefix, kg)
return n.put(prefix+id, strconv.Itoa(expiry), prefix)
}
// fmtKgNode turns a keygroup name into the key that this node will save its state in
// Currently: kg|[keygroup]|node|[NodeID]
func (n *NameService) fmtKgNode(kg string) string {
return fmt.Sprintf(fmtKgNodeString, kg, n.NodeID)
func (n *NameService) fmtKgNode(kg string) (string, string) {
prefix := fmt.Sprintf(fmtKgNodeStringPrefix, kg)
return prefix, n.NodeID
}
func getNodeNameFromKgNodeString(kgNode string) string {
......
......@@ -104,7 +104,7 @@ func (n *NameService) CreateKeygroup(kg fred.KeygroupName, mutable bool, expiry
defer cncl()
_, err = n.cli.Delete(ctx, fmt.Sprintf(fmtKgNodeString, string(kg), ""), clientv3.WithPrefix())
_, err = n.cli.Delete(ctx, fmt.Sprintf(fmtKgNodeStringPrefix, string(kg)), clientv3.WithPrefix())
if err != nil {
return errors.New(err)
......@@ -115,7 +115,7 @@ func (n *NameService) CreateKeygroup(kg fred.KeygroupName, mutable bool, expiry
// GetKeygroupMembers returns all IDs of the Members of a Keygroup by iterating over all saved keys that start with the keygroup name.
// The value of the map is the expiry in seconds.
func (n *NameService) GetKeygroupMembers(kg fred.KeygroupName, excludeSelf bool) (ids map[fred.NodeID]int, err error) {
nodes, err := n.getPrefix(fmt.Sprintf(fmtKgNodeString, string(kg), ""))
nodes, err := n.getPrefix(fmt.Sprintf(fmtKgNodeStringPrefix, string(kg)))
if err != nil {
return nil, err
......
......@@ -13,18 +13,18 @@ import (
)
const (
fmtKgNodeString = "kg|%s|node|%s"
fmtKgStatusString = "kg|%s|status"
fmtKgMutableString = "kg|%s|mutable"
fmtKgExpiryString = "kg|%s|expiry|node|%s"
fmtNodeAdressString = "node|%s|address"
fmtNodeExternalAdressString = "node|%s|extaddress"
fmtUserPermissionString = "user|%s|kg|%s|method|%s"
fmtFailedNodeKgString = "failnode|%s|kg|%s|%s" // Node, Keygroup, ID
fmtFailedNodePrefix = "failnode|%s|"
nodePrefixString = "node|"
sep = "|"
timeout = 5 * time.Second
fmtKgNodeStringPrefix = "kg|%s|node|"
fmtKgStatusString = "kg|%s|status"
fmtKgMutableString = "kg|%s|mutable"
fmtKgExpiryStringPrefix = "kg|%s|expiry|node|"
fmtNodeAdressString = "node|%s|address"
fmtNodeExternalAdressString = "node|%s|extaddress"
fmtUserPermissionStringPrefix = "user|%s|kg|%s|method|"
fmtFailedNodeKgStringPrefix = "failnode|%s|kg|%s|" // Node, Keygroup, ID
fmtFailedNodePrefix = "failnode|%s|"
nodePrefixString = "node|"
sep = "|"
timeout = 5 * time.Second
)
// NameService is the interface to the etcd server that serves as NaSe
......
......@@ -121,10 +121,7 @@ func testPut(t *testing.T, user, kg, id, value string) {
Expiry: 0,
})
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
t.Error(err)
}
assert.NoError(t, err)
err = f.E.HandleUpdate(user, fred.Item{
Keygroup: fred.KeygroupName(kg),
......@@ -132,20 +129,14 @@ func testPut(t *testing.T, user, kg, id, value string) {
Val: value,
})
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
t.Error(err)
}
assert.NoError(t, err)
i, err := f.E.HandleRead(user, fred.Item{
Keygroup: fred.KeygroupName(kg),
ID: id,
})
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
t.Error(err)
}
assert.NoError(t, err)
assert.Equal(t, kg, string(i.Keygroup))
assert.Equal(t, id, i.ID)
......@@ -167,10 +158,7 @@ func testDelete(t *testing.T, user, kg, id, value string) {
Expiry: 0,
})
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
t.Error(err)
}
assert.NoError(t, err)
err = f.E.HandleUpdate(user, fred.Item{
Keygroup: fred.KeygroupName(kg),
......@@ -178,20 +166,14 @@ func testDelete(t *testing.T, user, kg, id, value string) {
Val: value,
})
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
t.Error(err)
}
assert.NoError(t, err)
err = f.E.HandleDelete(user, fred.Item{
Keygroup: fred.KeygroupName(kg),
ID: id,
})
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
t.Error(err)
}
assert.NoError(t, err)
_, err = f.E.HandleRead(user, fred.Item{
Keygroup: fred.KeygroupName(kg),
......@@ -216,10 +198,7 @@ func testMisformedKVInput(t *testing.T, user, kg, id, value string) {
Expiry: 0,
})
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
t.Error(err)
}
assert.NoError(t, err)
err = f.E.HandleUpdate(user, fred.Item{
Keygroup: fred.KeygroupName(kg),
......@@ -261,20 +240,14 @@ func testScan(t *testing.T, user string, kg string, mutable bool, updates int, s
Expiry: 0,
})
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
t.Error(err)
}
assert.NoError(t, err)
defer func() {
err := f.E.HandleDeleteKeygroup(user, fred.Keygroup{
Name: fred.KeygroupName(kg),
})
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
t.Error(err)
}
assert.NoError(t, err)
}()
// 2. put in a bunch of items
......@@ -290,20 +263,17 @@ func testScan(t *testing.T, user string, kg string, mutable bool, updates int, s
ID: ids[i],
Val: vals[i],
})
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
t.Error(err)
}
assert.NoError(t, err)
} else {
item, err := f.E.HandleAppend(user, fred.Item{
Keygroup: fred.KeygroupName(kg),
Val: vals[i],
})
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
t.Error(err)
if !assert.NoError(t, err) {
continue
}
ids[i] = item.ID
}
}
......@@ -337,10 +307,7 @@ func testScan(t *testing.T, user string, kg string, mutable bool, updates int, s
// if scanRange < updates, we expect exactly min(updates - scanStart, scanRange) items
// in this case we obviously want all the values to be correct as well!
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
t.Error(err)
}
assert.NoError(t, err)
expected := updates - scanStart
if scanRange < expected {
......@@ -377,6 +344,88 @@ func TestScan(t *testing.T) {
testScan(t, "user", "scankeygroup8", false, 10, 0, 11)
}
func TestPermissions(t *testing.T) {
user1 := "user1"
user2 := "user2"
kg := "permissiontest"
err := f.E.HandleCreateKeygroup(user1, fred.Keygroup{
Name: fred.KeygroupName(kg),
Mutable: true,
Expiry: 0,
})
assert.NoError(t, err)
err = f.E.HandleUpdate(user1, fred.Item{
Keygroup: fred.KeygroupName(kg),
ID: "id",
Val: "value",
})
assert.NoError(t, err)
err = f.E.HandleUpdate(user2, fred.Item{
Keygroup: fred.KeygroupName(kg),
ID: "id2",
Val: "value2",
})
assert.Error(t, err)
_, err = f.E.HandleRead(user2, fred.Item{
Keygroup: fred.KeygroupName(kg),
ID: "id",
})
assert.Error(t, err)
err = f.E.HandleAddUser(user1, user2, fred.Keygroup{Name: fred.KeygroupName(kg)}, fred.ConfigureKeygroups)
assert.NoError(t, err)
_, err = f.E.HandleRead(user2, fred.Item{
Keygroup: fred.KeygroupName(kg),
ID: "id",
})
assert.Error(t, err)
err = f.E.HandleAddUser(user1, user2, fred.Keygroup{Name: fred.KeygroupName(kg)}, fred.ReadKeygroup)
assert.NoError(t, err)
i, err := f.E.HandleRead(user2, fred.Item{
Keygroup: fred.KeygroupName(kg),
ID: "id",
})
assert.NoError(t, err)
assert.Equal(t, kg, string(i.Keygroup))
assert.Equal(t, "id", i.ID)
assert.Equal(t, "value", i.Val)
err = f.E.HandleUpdate(user2, fred.Item{
Keygroup: fred.KeygroupName(kg),
ID: "id2",
Val: "value2",
})
assert.Error(t, err)
err = f.E.HandleRemoveUser(user2, user1, fred.Keygroup{Name: fred.KeygroupName(kg)}, fred.ReadKeygroup)
assert.NoError(t, err)
i, err = f.E.HandleRead(user1, fred.Item{
Keygroup: fred.KeygroupName(kg),
ID: "id",
})
assert.Error(t, err)
}
func BenchmarkPut(b *testing.B) {
user := "user"
kg := "benchmarkPut"
......@@ -389,10 +438,7 @@ func BenchmarkPut(b *testing.B) {
Expiry: 0,
})
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
b.Error(err)
}
assert.NoError(b, err)
for i := 0; i < b.N; i++ {
err = f.E.HandleUpdate(user, fred.Item{
......@@ -401,20 +447,14 @@ func BenchmarkPut(b *testing.B) {
Val: value,
})
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
b.Error(err)
}
assert.NoError(b, err)
}
err = f.E.HandleDeleteKeygroup(user, fred.Keygroup{
Name: fred.KeygroupName(kg),
})
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
b.Error(err)
}
assert.NoError(b, err)
}
func BenchmarkGet(b *testing.B) {
......@@ -429,10 +469,7 @@ func BenchmarkGet(b *testing.B) {
Expiry: 0,
})
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
b.Error(err)
}
assert.NoError(b, err)
err = f.E.HandleUpdate(user, fred.Item{
Keygroup: fred.KeygroupName(kg),
......@@ -440,10 +477,7 @@ func BenchmarkGet(b *testing.B) {
Val: value,
})
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
b.Error(err)
}
assert.NoError(b, err)
for i := 0; i < b.N; i++ {
_, err := f.E.HandleRead(user, fred.Item{
......@@ -451,20 +485,14 @@ func BenchmarkGet(b *testing.B) {
ID: id,
})
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
b.Error(err)
}
assert.NoError(b, err)
}
err = f.E.HandleDeleteKeygroup(user, fred.Keygroup{
Name: fred.KeygroupName(kg),
})
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
b.Error(err)
}
assert.NoError(b, err)
}
func BenchmarkAppend(b *testing.B) {
......@@ -478,10 +506,7 @@ func BenchmarkAppend(b *testing.B) {
Expiry: 0,
})
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
b.Error(err)
}
assert.NoError(b, err)
for i := 0; i < b.N; i++ {
_, err = f.E.HandleAppend(user, fred.Item{
......@@ -489,18 +514,12 @@ func BenchmarkAppend(b *testing.B) {
Val: value,
})
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
b.Error(err)
}
assert.NoError(b, err)
}
err = f.E.HandleDeleteKeygroup(user, fred.Keygroup{
Name: fred.KeygroupName(kg),
})
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
b.Error(err)
}
assert.NoError(b, err)
}
......@@ -37,6 +37,8 @@ func (t *AuthenticationSuite) RunTests() {
logNodeAction(t.c.nodeB, "remove permission to read from keygroup -> should work")
t.c.nodeB.RemoveUser("littleclient", "rbactest", "ReadKeygroup", false)
// TODO: delay is caused by etcd taking some time to inform watcher to invalidate cache
// time.Sleep(100 * time.Second)
logNodeAction(t.c.nodeA, "try to read from keygroup with little client -> should not work")
if val := t.c.littleClient.GetItem("rbactest", "item1", true); val != "" {
logNodeFailure(t.c.nodeA, "", val)
......
......@@ -94,7 +94,7 @@ func (n *Node) CreateKeygroup(kgname string, mutable bool, expiry int, expectErr
}
if err == nil && expectError {
log.Warn().Msg("CreateKeygroup: Expected Error bot got no error :(")
log.Warn().Msg("CreateKeygroup: Expected Error but got no error :(")
n.Errors++
return
}
......@@ -117,7 +117,7 @@ func (n *Node) DeleteKeygroup(kgname string, expectError bool) {
}
if err == nil && expectError {
log.Warn().Msg("DeleteKeygroup: Expected Error bot got no error")
log.Warn().Msg("DeleteKeygroup: Expected Error but got no error")
n.Errors++
return
}
......@@ -139,7 +139,7 @@ func (n *Node) GetKeygroupReplica(kgname string, expectError bool) map[string]in
}
if err == nil && expectError {
log.Warn().Msg("GetKeygroupReplica: Expected Error bot got no error")
log.Warn().Msg("GetKeygroupReplica: Expected Error but got no error")
n.Errors++
}
......@@ -171,7 +171,7 @@ func (n *Node) AddKeygroupReplica(kgname, replicaNodeID string, expiry int, expe