diff --git a/pkg/etcdnase/etcdnase_test.go b/pkg/etcdnase/etcdnase_test.go index be97e885d859f7ea88a4fc80b62fc0ec243ee8af..0744ee317977adecd8dbed701bcb8fc932a886cf 100644 --- a/pkg/etcdnase/etcdnase_test.go +++ b/pkg/etcdnase/etcdnase_test.go @@ -1,8 +1,13 @@ +// +build !race + package etcdnase import ( + "fmt" + "math/rand" "net/url" "os" + "strconv" "testing" "time" @@ -33,8 +38,8 @@ func TestMain(m *testing.M) { }, ) - // zerolog.SetGlobalLevel(zerolog.FatalLevel) - zerolog.SetGlobalLevel(zerolog.DebugLevel) + zerolog.SetGlobalLevel(zerolog.FatalLevel) + // zerolog.SetGlobalLevel(zerolog.DebugLevel) fInfo, err := os.Stat(etcdDir) @@ -51,9 +56,12 @@ func TestMain(m *testing.M) { cfg := embed.NewConfig() cfg.Dir = etcdDir - u, _ := url.Parse("https://127.0.0.1:6000") - cfg.LCUrls = []url.URL{*u} - cfg.ACUrls = []url.URL{*u} + cURL, _ := url.Parse("https://127.0.0.1:6000") + pURL, _ := url.Parse("http://127.0.0.1:6001") + cfg.LCUrls = []url.URL{*cURL} + cfg.ACUrls = []url.URL{*cURL} + cfg.LPUrls = []url.URL{*pURL} + cfg.APUrls = []url.URL{*pURL} cfg.ForceNewCluster = true cfg.ClientTLSInfo = transport.TLSInfo{ @@ -65,6 +73,8 @@ func TestMain(m *testing.M) { cfg.LogLevel = "error" + cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) + e, err := embed.StartEtcd(cfg) if err != nil { @@ -87,6 +97,8 @@ func TestMain(m *testing.M) { stat := m.Run() + e.Close() + fInfo, err = os.Stat(etcdDir) if err == nil { @@ -326,6 +338,89 @@ func TestCache(t *testing.T) { assert.Contains(t, perm, fred.RemoveUser) } +// GORACE="halt_on_error=1" go test -run=CacheRace -race +// currently this triggers a race condition, hence this suite is excluded from race testing +func TestCacheRace(t *testing.T) { + concurrent := 10 + ops := 100 + kg := fred.KeygroupName("kg-cache-race") + user := "user" + + err := n.CreateKeygroup(kg, true, 0) + + assert.NoError(t, err) + + nases := make([]*NameService, concurrent) + + for i := 0; i < concurrent; i++ { + id := "node" + strconv.Itoa(i) + x, err := NewNameService(id, []string{"127.0.0.1:6000"}, certBasePath+"nodeA.crt", certBasePath+"nodeA.key", certBasePath+"ca.crt", true) + assert.NoError(t, err) + + err = x.RegisterSelf(fmt.Sprintf("localhost:10%d01", i), fmt.Sprintf("localhost:10%d02", i)) + + assert.NoError(t, err) + + err = n.JoinNodeIntoKeygroup(kg, fred.NodeID(id), 0) + + assert.NoError(t, err) + + nases[i] = x + } + + done := make(chan struct{}) + + for i := 0; i < concurrent; i++ { + for j := 0; j < concurrent; j++ { + go func(nase *NameService) { + for k := 0; k < ops; k++ { + op := rand.Intn(concurrent) + p := []fred.Method{ + fred.CreateKeygroup, + fred.DeleteKeygroup, + fred.Read, + fred.Update, + fred.Delete, + fred.AddReplica, + fred.GetReplica, + fred.RemoveReplica, + fred.GetAllReplica, + fred.GetTrigger, + fred.AddTrigger, + fred.RemoveTrigger, + fred.AddUser, + fred.RemoveUser, + } + + switch op { + case 0: + m := p[rand.Intn(len(p))] + log.Debug().Msgf("%s adding %s", nase.NodeID, m) + err = nase.AddUserPermissions(user, m, kg) + assert.NoError(t, err) + + case 1: + m := p[rand.Intn(len(p))] + log.Debug().Msgf("%s removing %s", nase.NodeID, m) + err = nase.RevokeUserPermissions(user, m, kg) + assert.NoError(t, err) + + default: + log.Debug().Msgf("%s getting", nase.NodeID) + _, err = nase.GetUserPermissions(user, kg) + assert.NoError(t, err) + } + } + done <- struct{}{} + }(nases[i]) + } + } + + for i := 0; i < concurrent*concurrent; i++ { + <-done + } +} + func BenchmarkGet(b *testing.B) { key := "key-get" val := "val" diff --git a/pkg/etcdnase/helper.go b/pkg/etcdnase/helper.go index 441820dfd4884113cf89e19b53f11b6bbd0f0354..a4a2a47e035bb2a2afdb30efc68106a9c004f31c 100644 --- a/pkg/etcdnase/helper.go +++ b/pkg/etcdnase/helper.go @@ -23,12 +23,24 @@ func (n *NameService) getPrefix(prefix string) (kv map[string]string, err error) if n.cached { // let's check the local cache first // store prefix directly in cache + // currently this approach can lead to data races + // will be fixed in a future release val, ok := n.local.Get(prefix) // found something! if ok { log.Debug().Msgf("prefix: %s cache hit", prefix) - return val.(map[string]string), nil + + _, ok = val.(map[string]string) + if ok { + m := make(map[string]string) + + for k, v := range val.(map[string]string) { + m[k] = v + } + + return m, nil + } } log.Debug().Msgf("prefix: %s cache miss", prefix) @@ -66,33 +78,27 @@ func (n *NameService) getPrefix(prefix string) (kv map[string]string, err error) log.Err(err).Msgf("nase cache: error getting changes to prefix %s", prefix) } log.Debug().Msgf("nase cache: got %d changes to prefix %s", len(r.Events), prefix) - val, ok := n.local.Get(prefix) - - if !ok { - n.local.Del(prefix) - return - } - - prefixMap, ok := val.(map[string]string) - - if !ok { - n.local.Del(prefix) - return - } for _, ev := range r.Events { if ev.Type == clientv3.EventTypeDelete { - delete(prefixMap, string(ev.Kv.Key)) + delete(kv, string(ev.Kv.Key)) log.Debug().Msgf("prefix: %s remote cache invalidation for key %s", prefix, string(ev.Kv.Key)) } if ev.Type == clientv3.EventTypePut { - prefixMap[string(ev.Kv.Key)] = string(ev.Kv.Value) + kv[string(ev.Kv.Key)] = string(ev.Kv.Value) log.Debug().Msgf("prefix: %s remote cache update for key %s", prefix, string(ev.Kv.Key)) } } - n.local.Set(prefix, prefixMap, 1) + + prefixMap := make(map[string]string) + + for k, v := range kv { + prefixMap[k] = v + } + + n.local.Set(prefix, prefixMap, int64(len(prefixMap))) } }() } @@ -202,16 +208,22 @@ func (n *NameService) put(key, value string, prefix ...string) (err error) { continue } - prefixMap, ok := val.(map[string]string) + m, ok := val.(map[string]string) if !ok { n.local.Del(p) continue } + prefixMap := make(map[string]string) + + for k, v := range m { + prefixMap[k] = v + } + prefixMap[key] = value - n.local.Set(p, prefixMap, 1) + n.local.Set(p, prefixMap, int64(len(prefixMap))) log.Debug().Msgf("prefix: %s local cache update for key %s", p, key) } n.local.Set(key, value, 1) @@ -242,16 +254,22 @@ func (n *NameService) delete(key string, prefix ...string) (err error) { continue } - prefixMap, ok := val.(map[string]string) + m, ok := val.(map[string]string) if !ok { n.local.Del(p) continue } + prefixMap := make(map[string]string) + + for k, v := range m { + prefixMap[k] = v + } + delete(prefixMap, key) - n.local.Set(p, prefixMap, 1) + n.local.Set(p, prefixMap, int64(len(prefixMap))) log.Debug().Msgf("prefix: %s local cache invalidation for key %s", p, key) } diff --git a/pkg/fred/fred_test.go b/pkg/fred/fred_test.go index 0151018094a581fc269b3d91ab2c1d1ea04f4cca..b9128efef2eeb8ae8eb0abc35baaad7ee73d90f2 100644 --- a/pkg/fred/fred_test.go +++ b/pkg/fred/fred_test.go @@ -51,9 +51,12 @@ func TestMain(m *testing.M) { cfg := embed.NewConfig() cfg.Dir = etcdDir - u, _ := url.Parse("https://127.0.0.1:6000") - cfg.LCUrls = []url.URL{*u} - cfg.ACUrls = []url.URL{*u} + cURL, _ := url.Parse("https://127.0.0.1:7000") + pURL, _ := url.Parse("http://127.0.0.1:7001") + cfg.LCUrls = []url.URL{*cURL} + cfg.ACUrls = []url.URL{*cURL} + cfg.LPUrls = []url.URL{*pURL} + cfg.APUrls = []url.URL{*pURL} cfg.ForceNewCluster = true cfg.ClientTLSInfo = transport.TLSInfo{ @@ -65,6 +68,8 @@ func TestMain(m *testing.M) { cfg.LogLevel = "error" + cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name) + e, err := embed.StartEtcd(cfg) if err != nil { @@ -73,7 +78,7 @@ func TestMain(m *testing.M) { <-e.Server.ReadyNotify() - n, err := etcdnase.NewNameService(string(nodeID), []string{"127.0.0.1:6000"}, certBasePath+"nodeA.crt", certBasePath+"nodeA.key", certBasePath+"ca.crt", true) + n, err := etcdnase.NewNameService(string(nodeID), []string{"127.0.0.1:7000"}, certBasePath+"nodeA.crt", certBasePath+"nodeA.key", certBasePath+"ca.crt", true) if err != nil { panic(err) @@ -97,6 +102,8 @@ func TestMain(m *testing.M) { stat := m.Run() + e.Close() + fInfo, err = os.Stat(etcdDir) if err == nil {