Commit 85514c73 authored by pfandzelter's avatar pfandzelter
Browse files

Merge branch 'tp/nase-cache-race' into 'main'

address a rare race condition in nase cache

See merge request !161
parents 9b7362b5 6901875f
Pipeline #39964 failed with stages
in 16 minutes and 12 seconds
// +build !race
package etcdnase package etcdnase
import ( import (
"fmt"
"math/rand"
"net/url" "net/url"
"os" "os"
"strconv"
"testing" "testing"
"time" "time"
...@@ -33,8 +38,8 @@ func TestMain(m *testing.M) { ...@@ -33,8 +38,8 @@ func TestMain(m *testing.M) {
}, },
) )
// zerolog.SetGlobalLevel(zerolog.FatalLevel) zerolog.SetGlobalLevel(zerolog.FatalLevel)
zerolog.SetGlobalLevel(zerolog.DebugLevel) // zerolog.SetGlobalLevel(zerolog.DebugLevel)
fInfo, err := os.Stat(etcdDir) fInfo, err := os.Stat(etcdDir)
...@@ -51,9 +56,12 @@ func TestMain(m *testing.M) { ...@@ -51,9 +56,12 @@ func TestMain(m *testing.M) {
cfg := embed.NewConfig() cfg := embed.NewConfig()
cfg.Dir = etcdDir cfg.Dir = etcdDir
u, _ := url.Parse("https://127.0.0.1:6000") cURL, _ := url.Parse("https://127.0.0.1:6000")
cfg.LCUrls = []url.URL{*u} pURL, _ := url.Parse("http://127.0.0.1:6001")
cfg.ACUrls = []url.URL{*u} cfg.LCUrls = []url.URL{*cURL}
cfg.ACUrls = []url.URL{*cURL}
cfg.LPUrls = []url.URL{*pURL}
cfg.APUrls = []url.URL{*pURL}
cfg.ForceNewCluster = true cfg.ForceNewCluster = true
cfg.ClientTLSInfo = transport.TLSInfo{ cfg.ClientTLSInfo = transport.TLSInfo{
...@@ -65,6 +73,8 @@ func TestMain(m *testing.M) { ...@@ -65,6 +73,8 @@ func TestMain(m *testing.M) {
cfg.LogLevel = "error" cfg.LogLevel = "error"
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
e, err := embed.StartEtcd(cfg) e, err := embed.StartEtcd(cfg)
if err != nil { if err != nil {
...@@ -87,6 +97,8 @@ func TestMain(m *testing.M) { ...@@ -87,6 +97,8 @@ func TestMain(m *testing.M) {
stat := m.Run() stat := m.Run()
e.Close()
fInfo, err = os.Stat(etcdDir) fInfo, err = os.Stat(etcdDir)
if err == nil { if err == nil {
...@@ -326,6 +338,89 @@ func TestCache(t *testing.T) { ...@@ -326,6 +338,89 @@ func TestCache(t *testing.T) {
assert.Contains(t, perm, fred.RemoveUser) assert.Contains(t, perm, fred.RemoveUser)
} }
// TestCacheRace stresses the name service cache by letting several
// NameService clients concurrently add, revoke and read the permissions of a
// single user on a single keygroup.
//
// It creates `concurrent` clients and starts `concurrent` worker goroutines per
// client; each worker performs `ops` randomly chosen operations and every
// returned error is asserted to be nil.
//
// GORACE="halt_on_error=1" go test -run=CacheRace -race
// currently this triggers a race condition, hence this suite is excluded from race testing
func TestCacheRace(t *testing.T) {
	concurrent := 10
	ops := 100

	kg := fred.KeygroupName("kg-cache-race")
	user := "user"

	err := n.CreateKeygroup(kg, true, 0)
	assert.NoError(t, err)

	nases := make([]*NameService, concurrent)

	for i := 0; i < concurrent; i++ {
		id := "node" + strconv.Itoa(i)

		x, err := NewNameService(id, []string{"127.0.0.1:6000"}, certBasePath+"nodeA.crt", certBasePath+"nodeA.key", certBasePath+"ca.crt", true)
		if !assert.NoError(t, err) {
			// Without a client the workers below would dereference a nil
			// *NameService, so stop here; the test is already marked failed.
			return
		}

		err = x.RegisterSelf(fmt.Sprintf("localhost:10%d01", i), fmt.Sprintf("localhost:10%d02", i))
		assert.NoError(t, err)

		err = n.JoinNodeIntoKeygroup(kg, fred.NodeID(id), 0)
		assert.NoError(t, err)

		nases[i] = x
	}

	// The set of permissions to pick from never changes, so build it once
	// instead of once per operation. Workers only read from it.
	methods := []fred.Method{
		fred.CreateKeygroup,
		fred.DeleteKeygroup,
		fred.Read,
		fred.Update,
		fred.Delete,
		fred.AddReplica,
		fred.GetReplica,
		fred.RemoveReplica,
		fred.GetAllReplica,
		fred.GetTrigger,
		fred.AddTrigger,
		fred.RemoveTrigger,
		fred.AddUser,
		fred.RemoveUser,
	}

	done := make(chan struct{})

	for i := 0; i < concurrent; i++ {
		for j := 0; j < concurrent; j++ {
			go func(nase *NameService) {
				for k := 0; k < ops; k++ {
					// Each branch declares its own err: assigning to the outer
					// err from every worker would itself be a data race and
					// could hide one worker's failure behind another's result.
					switch op := rand.Intn(concurrent); op {
					case 0:
						m := methods[rand.Intn(len(methods))]
						log.Debug().Msgf("%s adding %s", nase.NodeID, m)
						err := nase.AddUserPermissions(user, m, kg)
						assert.NoError(t, err)
					case 1:
						m := methods[rand.Intn(len(methods))]
						log.Debug().Msgf("%s removing %s", nase.NodeID, m)
						err := nase.RevokeUserPermissions(user, m, kg)
						assert.NoError(t, err)
					default:
						log.Debug().Msgf("%s getting", nase.NodeID)
						_, err := nase.GetUserPermissions(user, kg)
						assert.NoError(t, err)
					}
				}
				done <- struct{}{}
			}(nases[i])
		}
	}

	// Wait for every worker to report completion before the test returns.
	for i := 0; i < concurrent*concurrent; i++ {
		<-done
	}
}
func BenchmarkGet(b *testing.B) { func BenchmarkGet(b *testing.B) {
key := "key-get" key := "key-get"
val := "val" val := "val"
......
...@@ -23,12 +23,24 @@ func (n *NameService) getPrefix(prefix string) (kv map[string]string, err error) ...@@ -23,12 +23,24 @@ func (n *NameService) getPrefix(prefix string) (kv map[string]string, err error)
if n.cached { if n.cached {
// let's check the local cache first // let's check the local cache first
// store prefix directly in cache // store prefix directly in cache
// currently this approach can lead to data races
// will be fixed in a future release
val, ok := n.local.Get(prefix) val, ok := n.local.Get(prefix)
// found something! // found something!
if ok { if ok {
log.Debug().Msgf("prefix: %s cache hit", prefix) log.Debug().Msgf("prefix: %s cache hit", prefix)
return val.(map[string]string), nil
_, ok = val.(map[string]string)
if ok {
m := make(map[string]string)
for k, v := range val.(map[string]string) {
m[k] = v
}
return m, nil
}
} }
log.Debug().Msgf("prefix: %s cache miss", prefix) log.Debug().Msgf("prefix: %s cache miss", prefix)
...@@ -66,33 +78,27 @@ func (n *NameService) getPrefix(prefix string) (kv map[string]string, err error) ...@@ -66,33 +78,27 @@ func (n *NameService) getPrefix(prefix string) (kv map[string]string, err error)
log.Err(err).Msgf("nase cache: error getting changes to prefix %s", prefix) log.Err(err).Msgf("nase cache: error getting changes to prefix %s", prefix)
} }
log.Debug().Msgf("nase cache: got %d changes to prefix %s", len(r.Events), prefix) log.Debug().Msgf("nase cache: got %d changes to prefix %s", len(r.Events), prefix)
val, ok := n.local.Get(prefix)
if !ok {
n.local.Del(prefix)
return
}
prefixMap, ok := val.(map[string]string)
if !ok {
n.local.Del(prefix)
return
}
for _, ev := range r.Events { for _, ev := range r.Events {
if ev.Type == clientv3.EventTypeDelete { if ev.Type == clientv3.EventTypeDelete {
delete(prefixMap, string(ev.Kv.Key)) delete(kv, string(ev.Kv.Key))
log.Debug().Msgf("prefix: %s remote cache invalidation for key %s", prefix, string(ev.Kv.Key)) log.Debug().Msgf("prefix: %s remote cache invalidation for key %s", prefix, string(ev.Kv.Key))
} }
if ev.Type == clientv3.EventTypePut { if ev.Type == clientv3.EventTypePut {
prefixMap[string(ev.Kv.Key)] = string(ev.Kv.Value) kv[string(ev.Kv.Key)] = string(ev.Kv.Value)
log.Debug().Msgf("prefix: %s remote cache update for key %s", prefix, string(ev.Kv.Key)) log.Debug().Msgf("prefix: %s remote cache update for key %s", prefix, string(ev.Kv.Key))
} }
} }
n.local.Set(prefix, prefixMap, 1)
prefixMap := make(map[string]string)
for k, v := range kv {
prefixMap[k] = v
}
n.local.Set(prefix, prefixMap, int64(len(prefixMap)))
} }
}() }()
} }
...@@ -202,16 +208,22 @@ func (n *NameService) put(key, value string, prefix ...string) (err error) { ...@@ -202,16 +208,22 @@ func (n *NameService) put(key, value string, prefix ...string) (err error) {
continue continue
} }
prefixMap, ok := val.(map[string]string) m, ok := val.(map[string]string)
if !ok { if !ok {
n.local.Del(p) n.local.Del(p)
continue continue
} }
prefixMap := make(map[string]string)
for k, v := range m {
prefixMap[k] = v
}
prefixMap[key] = value prefixMap[key] = value
n.local.Set(p, prefixMap, 1) n.local.Set(p, prefixMap, int64(len(prefixMap)))
log.Debug().Msgf("prefix: %s local cache update for key %s", p, key) log.Debug().Msgf("prefix: %s local cache update for key %s", p, key)
} }
n.local.Set(key, value, 1) n.local.Set(key, value, 1)
...@@ -242,16 +254,22 @@ func (n *NameService) delete(key string, prefix ...string) (err error) { ...@@ -242,16 +254,22 @@ func (n *NameService) delete(key string, prefix ...string) (err error) {
continue continue
} }
prefixMap, ok := val.(map[string]string) m, ok := val.(map[string]string)
if !ok { if !ok {
n.local.Del(p) n.local.Del(p)
continue continue
} }
prefixMap := make(map[string]string)
for k, v := range m {
prefixMap[k] = v
}
delete(prefixMap, key) delete(prefixMap, key)
n.local.Set(p, prefixMap, 1) n.local.Set(p, prefixMap, int64(len(prefixMap)))
log.Debug().Msgf("prefix: %s local cache invalidation for key %s", p, key) log.Debug().Msgf("prefix: %s local cache invalidation for key %s", p, key)
} }
......
...@@ -51,9 +51,12 @@ func TestMain(m *testing.M) { ...@@ -51,9 +51,12 @@ func TestMain(m *testing.M) {
cfg := embed.NewConfig() cfg := embed.NewConfig()
cfg.Dir = etcdDir cfg.Dir = etcdDir
u, _ := url.Parse("https://127.0.0.1:6000") cURL, _ := url.Parse("https://127.0.0.1:7000")
cfg.LCUrls = []url.URL{*u} pURL, _ := url.Parse("http://127.0.0.1:7001")
cfg.ACUrls = []url.URL{*u} cfg.LCUrls = []url.URL{*cURL}
cfg.ACUrls = []url.URL{*cURL}
cfg.LPUrls = []url.URL{*pURL}
cfg.APUrls = []url.URL{*pURL}
cfg.ForceNewCluster = true cfg.ForceNewCluster = true
cfg.ClientTLSInfo = transport.TLSInfo{ cfg.ClientTLSInfo = transport.TLSInfo{
...@@ -65,6 +68,8 @@ func TestMain(m *testing.M) { ...@@ -65,6 +68,8 @@ func TestMain(m *testing.M) {
cfg.LogLevel = "error" cfg.LogLevel = "error"
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
e, err := embed.StartEtcd(cfg) e, err := embed.StartEtcd(cfg)
if err != nil { if err != nil {
...@@ -73,7 +78,7 @@ func TestMain(m *testing.M) { ...@@ -73,7 +78,7 @@ func TestMain(m *testing.M) {
<-e.Server.ReadyNotify() <-e.Server.ReadyNotify()
n, err := etcdnase.NewNameService(string(nodeID), []string{"127.0.0.1:6000"}, certBasePath+"nodeA.crt", certBasePath+"nodeA.key", certBasePath+"ca.crt", true) n, err := etcdnase.NewNameService(string(nodeID), []string{"127.0.0.1:7000"}, certBasePath+"nodeA.crt", certBasePath+"nodeA.key", certBasePath+"ca.crt", true)
if err != nil { if err != nil {
panic(err) panic(err)
...@@ -97,6 +102,8 @@ func TestMain(m *testing.M) { ...@@ -97,6 +102,8 @@ func TestMain(m *testing.M) {
stat := m.Run() stat := m.Run()
e.Close()
fInfo, err = os.Stat(etcdDir) fInfo, err = os.Stat(etcdDir)
if err == nil { if err == nil {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment