Commit 6901875f authored by pfandzelter
Browse files

address a rare race condition in nase cache

parent 9b7362b5
// +build !race
package etcdnase
import (
"fmt"
"math/rand"
"net/url"
"os"
"strconv"
"testing"
"time"
......@@ -33,8 +38,8 @@ func TestMain(m *testing.M) {
},
)
// zerolog.SetGlobalLevel(zerolog.FatalLevel)
zerolog.SetGlobalLevel(zerolog.DebugLevel)
zerolog.SetGlobalLevel(zerolog.FatalLevel)
// zerolog.SetGlobalLevel(zerolog.DebugLevel)
fInfo, err := os.Stat(etcdDir)
......@@ -51,9 +56,12 @@ func TestMain(m *testing.M) {
cfg := embed.NewConfig()
cfg.Dir = etcdDir
u, _ := url.Parse("https://127.0.0.1:6000")
cfg.LCUrls = []url.URL{*u}
cfg.ACUrls = []url.URL{*u}
cURL, _ := url.Parse("https://127.0.0.1:6000")
pURL, _ := url.Parse("http://127.0.0.1:6001")
cfg.LCUrls = []url.URL{*cURL}
cfg.ACUrls = []url.URL{*cURL}
cfg.LPUrls = []url.URL{*pURL}
cfg.APUrls = []url.URL{*pURL}
cfg.ForceNewCluster = true
cfg.ClientTLSInfo = transport.TLSInfo{
......@@ -65,6 +73,8 @@ func TestMain(m *testing.M) {
cfg.LogLevel = "error"
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
e, err := embed.StartEtcd(cfg)
if err != nil {
......@@ -87,6 +97,8 @@ func TestMain(m *testing.M) {
stat := m.Run()
e.Close()
fInfo, err = os.Stat(etcdDir)
if err == nil {
......@@ -326,6 +338,89 @@ func TestCache(t *testing.T) {
assert.Contains(t, perm, fred.RemoveUser)
}
// TestCacheRace stresses the permission cache by letting several NameService
// clients add, revoke and read permissions of one user on one keygroup at the
// same time.
//
// It registers `concurrent` clients, then starts `concurrent` worker
// goroutines per client. Each worker performs `ops` randomly chosen
// operations: roughly 1/concurrent of them add a permission, 1/concurrent
// revoke one, and the remainder read the user's permissions.
//
// GORACE="halt_on_error=1" go test -run=CacheRace -race
// currently this triggers a race condition, hence this suite is excluded from race testing
func TestCacheRace(t *testing.T) {
	concurrent := 10
	ops := 100

	kg := fred.KeygroupName("kg-cache-race")
	user := "user"

	err := n.CreateKeygroup(kg, true, 0)
	assert.NoError(t, err)

	nases := make([]*NameService, concurrent)

	for i := 0; i < concurrent; i++ {
		id := "node" + strconv.Itoa(i)

		x, err := NewNameService(id, []string{"127.0.0.1:6000"}, certBasePath+"nodeA.crt", certBasePath+"nodeA.key", certBasePath+"ca.crt", true)
		if !assert.NoError(t, err) {
			// without a client the calls below would dereference a nil
			// pointer, so stop the test here instead of panicking
			t.FailNow()
		}

		err = x.RegisterSelf(fmt.Sprintf("localhost:10%d01", i), fmt.Sprintf("localhost:10%d02", i))
		assert.NoError(t, err)

		err = n.JoinNodeIntoKeygroup(kg, fred.NodeID(id), 0)
		assert.NoError(t, err)

		nases[i] = x
	}

	// the set of permissions is loop invariant and only ever read, so it is
	// built once and shared by all workers
	methods := []fred.Method{
		fred.CreateKeygroup,
		fred.DeleteKeygroup,
		fred.Read,
		fred.Update,
		fred.Delete,
		fred.AddReplica,
		fred.GetReplica,
		fred.RemoveReplica,
		fred.GetAllReplica,
		fred.GetTrigger,
		fred.AddTrigger,
		fred.RemoveTrigger,
		fred.AddUser,
		fred.RemoveUser,
	}

	// every worker sends exactly one value when it has finished all its ops
	done := make(chan struct{})

	for i := 0; i < concurrent; i++ {
		for j := 0; j < concurrent; j++ {
			go func(nase *NameService) {
				for k := 0; k < ops; k++ {
					// err is declared per call on purpose: assigning to the
					// err of the enclosing test function from every worker
					// would be a data race in the test itself
					switch rand.Intn(concurrent) {
					case 0:
						m := methods[rand.Intn(len(methods))]
						log.Debug().Msgf("%s adding %s", nase.NodeID, m)
						err := nase.AddUserPermissions(user, m, kg)
						assert.NoError(t, err)
					case 1:
						m := methods[rand.Intn(len(methods))]
						log.Debug().Msgf("%s removing %s", nase.NodeID, m)
						err := nase.RevokeUserPermissions(user, m, kg)
						assert.NoError(t, err)
					default:
						log.Debug().Msgf("%s getting", nase.NodeID)
						_, err := nase.GetUserPermissions(user, kg)
						assert.NoError(t, err)
					}
				}
				done <- struct{}{}
			}(nases[i])
		}
	}

	// wait for all concurrent*concurrent workers before the test returns
	for i := 0; i < concurrent*concurrent; i++ {
		<-done
	}
}
func BenchmarkGet(b *testing.B) {
key := "key-get"
val := "val"
......
......@@ -23,12 +23,24 @@ func (n *NameService) getPrefix(prefix string) (kv map[string]string, err error)
if n.cached {
// let's check the local cache first
// store prefix directly in cache
// currently this approach can lead to data races
// will be fixed in a future release
val, ok := n.local.Get(prefix)
// found something!
if ok {
log.Debug().Msgf("prefix: %s cache hit", prefix)
return val.(map[string]string), nil
_, ok = val.(map[string]string)
if ok {
m := make(map[string]string)
for k, v := range val.(map[string]string) {
m[k] = v
}
return m, nil
}
}
log.Debug().Msgf("prefix: %s cache miss", prefix)
......@@ -66,33 +78,27 @@ func (n *NameService) getPrefix(prefix string) (kv map[string]string, err error)
log.Err(err).Msgf("nase cache: error getting changes to prefix %s", prefix)
}
log.Debug().Msgf("nase cache: got %d changes to prefix %s", len(r.Events), prefix)
val, ok := n.local.Get(prefix)
if !ok {
n.local.Del(prefix)
return
}
prefixMap, ok := val.(map[string]string)
if !ok {
n.local.Del(prefix)
return
}
for _, ev := range r.Events {
if ev.Type == clientv3.EventTypeDelete {
delete(prefixMap, string(ev.Kv.Key))
delete(kv, string(ev.Kv.Key))
log.Debug().Msgf("prefix: %s remote cache invalidation for key %s", prefix, string(ev.Kv.Key))
}
if ev.Type == clientv3.EventTypePut {
prefixMap[string(ev.Kv.Key)] = string(ev.Kv.Value)
kv[string(ev.Kv.Key)] = string(ev.Kv.Value)
log.Debug().Msgf("prefix: %s remote cache update for key %s", prefix, string(ev.Kv.Key))
}
}
n.local.Set(prefix, prefixMap, 1)
prefixMap := make(map[string]string)
for k, v := range kv {
prefixMap[k] = v
}
n.local.Set(prefix, prefixMap, int64(len(prefixMap)))
}
}()
}
......@@ -202,16 +208,22 @@ func (n *NameService) put(key, value string, prefix ...string) (err error) {
continue
}
prefixMap, ok := val.(map[string]string)
m, ok := val.(map[string]string)
if !ok {
n.local.Del(p)
continue
}
prefixMap := make(map[string]string)
for k, v := range m {
prefixMap[k] = v
}
prefixMap[key] = value
n.local.Set(p, prefixMap, 1)
n.local.Set(p, prefixMap, int64(len(prefixMap)))
log.Debug().Msgf("prefix: %s local cache update for key %s", p, key)
}
n.local.Set(key, value, 1)
......@@ -242,16 +254,22 @@ func (n *NameService) delete(key string, prefix ...string) (err error) {
continue
}
prefixMap, ok := val.(map[string]string)
m, ok := val.(map[string]string)
if !ok {
n.local.Del(p)
continue
}
prefixMap := make(map[string]string)
for k, v := range m {
prefixMap[k] = v
}
delete(prefixMap, key)
n.local.Set(p, prefixMap, 1)
n.local.Set(p, prefixMap, int64(len(prefixMap)))
log.Debug().Msgf("prefix: %s local cache invalidation for key %s", p, key)
}
......
......@@ -51,9 +51,12 @@ func TestMain(m *testing.M) {
cfg := embed.NewConfig()
cfg.Dir = etcdDir
u, _ := url.Parse("https://127.0.0.1:6000")
cfg.LCUrls = []url.URL{*u}
cfg.ACUrls = []url.URL{*u}
cURL, _ := url.Parse("https://127.0.0.1:7000")
pURL, _ := url.Parse("http://127.0.0.1:7001")
cfg.LCUrls = []url.URL{*cURL}
cfg.ACUrls = []url.URL{*cURL}
cfg.LPUrls = []url.URL{*pURL}
cfg.APUrls = []url.URL{*pURL}
cfg.ForceNewCluster = true
cfg.ClientTLSInfo = transport.TLSInfo{
......@@ -65,6 +68,8 @@ func TestMain(m *testing.M) {
cfg.LogLevel = "error"
cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
e, err := embed.StartEtcd(cfg)
if err != nil {
......@@ -73,7 +78,7 @@ func TestMain(m *testing.M) {
<-e.Server.ReadyNotify()
n, err := etcdnase.NewNameService(string(nodeID), []string{"127.0.0.1:6000"}, certBasePath+"nodeA.crt", certBasePath+"nodeA.key", certBasePath+"ca.crt", true)
n, err := etcdnase.NewNameService(string(nodeID), []string{"127.0.0.1:7000"}, certBasePath+"nodeA.crt", certBasePath+"nodeA.key", certBasePath+"ca.crt", true)
if err != nil {
panic(err)
......@@ -97,6 +102,8 @@ func TestMain(m *testing.M) {
stat := m.Run()
e.Close()
fInfo, err = os.Stat(etcdDir)
if err == nil {
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment