Commit 2f20a356 authored by pfandzelter's avatar pfandzelter
Browse files

improve nase cache

parent c3c2b5c2
Pipeline #39398 passed with stages
in 16 minutes and 9 seconds
......@@ -4,6 +4,7 @@ import (
"net/url"
"os"
"testing"
"time"
"git.tu-berlin.de/mcc-fred/fred/pkg/fred"
"github.com/go-errors/errors"
......@@ -32,7 +33,8 @@ func TestMain(m *testing.M) {
},
)
zerolog.SetGlobalLevel(zerolog.ErrorLevel)
// zerolog.SetGlobalLevel(zerolog.FatalLevel)
zerolog.SetGlobalLevel(zerolog.DebugLevel)
fInfo, err := os.Stat(etcdDir)
......@@ -315,6 +317,8 @@ func TestCache(t *testing.T) {
assert.NoError(t, err)
time.Sleep(1 * time.Millisecond)
perm, err = n.GetUserPermissions(user, kg)
assert.NoError(t, err)
......@@ -323,7 +327,7 @@ func TestCache(t *testing.T) {
}
func BenchmarkGet(b *testing.B) {
key := "key"
key := "key-get"
val := "val"
err := n.put(key, val)
assert.NoError(b, err)
......@@ -333,17 +337,18 @@ func BenchmarkGet(b *testing.B) {
}
func BenchmarkGetPrefix(b *testing.B) {
key := "key"
key := "key-get-nocache|prefix"
prefix := "key-get-nocache|"
val := "val"
err := n.put(key, val)
err := n.put(key, val, prefix)
assert.NoError(b, err)
for i := 0; i < b.N; i++ {
_, _ = n.getPrefix(key)
_, _ = n.getPrefix(prefix)
}
}
func BenchmarkPut(b *testing.B) {
key := "key"
key := "key-put"
val := "val"
for i := 0; i < b.N; i++ {
_ = n.put(key, val)
......@@ -355,7 +360,7 @@ func BenchmarkGetNoCache(b *testing.B) {
assert.NoError(b, err)
b.ResetTimer()
key := "key"
key := "key-get-nocache"
val := "val"
err = n.put(key, val)
assert.NoError(b, err)
......@@ -369,12 +374,13 @@ func BenchmarkGetPrefixNoCache(b *testing.B) {
assert.NoError(b, err)
b.ResetTimer()
key := "key"
key := "key-get-nocache|prefix"
prefix := "key-get-nocache|"
val := "val"
err = n.put(key, val)
err = n.put(key, val, prefix)
assert.NoError(b, err)
for i := 0; i < b.N; i++ {
_, _ = n.getPrefix(key)
_, _ = n.getPrefix(prefix)
}
}
......@@ -383,7 +389,7 @@ func BenchmarkPutNoCache(b *testing.B) {
assert.NoError(b, err)
b.ResetTimer()
key := "key"
key := "key-put-nocache"
val := "val"
for i := 0; i < b.N; i++ {
_ = n.put(key, val)
......
......@@ -30,9 +30,9 @@ func (n *NameService) getPrefix(prefix string) (kv map[string]string, err error)
log.Debug().Msgf("prefix: %s cache hit", prefix)
return val.(map[string]string), nil
}
}
log.Debug().Msgf("prefix: %s cache miss", prefix)
log.Debug().Msgf("prefix: %s cache miss", prefix)
}
// didn't find anything? ask nameservice, cache, and be sure to invalidate on change
ctx, cncl := context.WithTimeout(context.Background(), timeout)
......@@ -54,7 +54,7 @@ func (n *NameService) getPrefix(prefix string) (kv map[string]string, err error)
if n.cached {
n.local.Set(prefix, kv, 1)
// TODO: use prefix changes to change local cache
// we start the watcher directly
go func() {
watchCtx, watchCncl := context.WithCancel(context.Background())
c := n.watcher.Watch(watchCtx, prefix, clientv3.WithPrefix())
......@@ -66,11 +66,33 @@ func (n *NameService) getPrefix(prefix string) (kv map[string]string, err error)
log.Err(err).Msgf("nase cache: error getting changes to prefix %s", prefix)
}
log.Debug().Msgf("nase cache: got %d changes to prefix %s", len(r.Events), prefix)
if len(r.Events) != 0 {
val, ok := n.local.Get(prefix)
if !ok {
n.local.Del(prefix)
log.Debug().Msgf("prefix: %s remote cache invalidation", prefix)
return
}
prefixMap, ok := val.(map[string]string)
if !ok {
n.local.Del(prefix)
return
}
for _, ev := range r.Events {
if ev.Type == clientv3.EventTypeDelete {
delete(prefixMap, string(ev.Kv.Key))
log.Debug().Msgf("prefix: %s remote cache invalidation for key %s", prefix, string(ev.Kv.Key))
}
if ev.Type == clientv3.EventTypePut {
prefixMap[string(ev.Kv.Key)] = string(ev.Kv.Value)
log.Debug().Msgf("prefix: %s remote cache update for key %s", prefix, string(ev.Kv.Key))
}
}
n.local.Set(prefix, prefixMap, 1)
}
}()
}
......@@ -89,9 +111,9 @@ func (n *NameService) getExact(key string) (v string, err error) {
log.Debug().Msgf("key: %s cache hit", key)
return val.(string), nil
}
}
log.Debug().Msgf("key: %s cache miss", key)
log.Debug().Msgf("key: %s cache miss", key)
}
// didn't find anything? ask nameservice, cache, and be sure to invalidate on change
ctx, cncl := context.WithTimeout(context.Background(), timeout)
......@@ -111,21 +133,30 @@ func (n *NameService) getExact(key string) (v string, err error) {
if n.cached {
n.local.Set(key, v, 1)
// we can always assume that no watcher exists for this key because otherwise we would have had a cache hit
// so we can safely start a new watcher
// that watcher should exit on key deletion, though!
go func() {
watchCtx, watchCncl := context.WithCancel(context.Background())
c := n.watcher.Watch(watchCtx, key)
defer watchCncl()
// TODO: use key changes to modify local cache directly
for r := range c {
if err := r.Err(); err != nil {
log.Err(err).Msgf("nase cache: error getting changes to key %s", key)
}
log.Debug().Msgf("nase cache: got %d changes to mey %s", len(r.Events), key)
if len(r.Events) != 0 {
n.local.Del(key)
log.Debug().Msgf("key: %s remote cache invalidation", key)
return
log.Debug().Msgf("nase cache: got %d changes to key %s", len(r.Events), key)
for _, ev := range r.Events {
if ev.Type == clientv3.EventTypeDelete {
n.local.Del(key)
log.Debug().Msgf("key: %s remote cache invalidation", key)
return
}
if ev.Type == clientv3.EventTypePut {
n.local.Set(key, string(ev.Kv.Value), 1)
log.Debug().Msgf("key: %s remote cache update", key)
}
}
}
}()
......@@ -164,11 +195,27 @@ func (n *NameService) put(key, value string, prefix ...string) (err error) {
if n.cached {
for _, p := range prefix {
n.local.Del(p)
log.Debug().Msgf("prefix: %s local cache invalidation", p)
val, ok := n.local.Get(p)
if !ok {
n.local.Del(p)
continue
}
prefixMap, ok := val.(map[string]string)
if !ok {
n.local.Del(p)
continue
}
prefixMap[key] = value
n.local.Set(p, prefixMap, 1)
log.Debug().Msgf("prefix: %s local cache update for key %s", p, key)
}
n.local.Del(key)
log.Debug().Msgf("key: %s local cache invalidation", key)
n.local.Set(key, value, 1)
log.Debug().Msgf("key: %s local cache update", key)
}
_, err = n.cli.Put(ctx, key, value)
......@@ -188,12 +235,28 @@ func (n *NameService) delete(key string, prefix ...string) (err error) {
if n.cached {
for _, p := range prefix {
n.local.Del(p)
log.Debug().Msgf("prefix: %s local cache invalidation", p)
val, ok := n.local.Get(p)
if !ok {
n.local.Del(p)
continue
}
prefixMap, ok := val.(map[string]string)
if !ok {
n.local.Del(p)
continue
}
delete(prefixMap, key)
n.local.Set(p, prefixMap, 1)
log.Debug().Msgf("prefix: %s local cache invalidation for key %s", p, key)
}
n.local.Del(key)
log.Debug().Msgf("key: %s local cache invalidation", key)
}
_, err = n.cli.Delete(ctx, key)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment