Commit 0a7fb8bd authored by pfandzelter's avatar pfandzelter
Browse files

add version checking to alexandra

parent 669abd24
Pipeline #41968 passed with stages
in 14 minutes and 39 seconds
...@@ -104,7 +104,7 @@ func (m *ClientsMgr) readFromAnywhere(ctx context.Context, request *alexandraPro ...@@ -104,7 +104,7 @@ func (m *ClientsMgr) readFromAnywhere(ctx context.Context, request *alexandraPro
} }
for i := range res.Data { for i := range res.Data {
r.vals[i] = res.Data[i].Val r.vals[i] = res.Data[i].Val
r.versions[i] = res.Data[i].Version r.versions[i] = res.Data[i].Version.Version
} }
responses <- r responses <- r
} }
...@@ -179,7 +179,7 @@ func (m *ClientsMgr) readFromAnywhere(ctx context.Context, request *alexandraPro ...@@ -179,7 +179,7 @@ func (m *ClientsMgr) readFromAnywhere(ctx context.Context, request *alexandraPro
for i := range result.Data { for i := range result.Data {
vals[i] = result.Data[i].Val vals[i] = result.Data[i].Val
versions[i] = result.Data[i].Version versions[i] = result.Data[i].Version.Version
} }
return vals, versions, nil return vals, versions, nil
......
...@@ -2,6 +2,7 @@ package alexandra ...@@ -2,6 +2,7 @@ package alexandra
import ( import (
"context" "context"
"fmt"
api "git.tu-berlin.de/mcc-fred/fred/proto/client" api "git.tu-berlin.de/mcc-fred/fred/proto/client"
alexandraProto "git.tu-berlin.de/mcc-fred/fred/proto/middleware" alexandraProto "git.tu-berlin.de/mcc-fred/fred/proto/middleware"
...@@ -31,14 +32,15 @@ func (s *Server) Scan(ctx context.Context, req *alexandraProto.ScanRequest) (*al ...@@ -31,14 +32,15 @@ func (s *Server) Scan(ctx context.Context, req *alexandraProto.ScanRequest) (*al
Data: datum.Val, Data: datum.Val,
} }
err = s.cache.add(req.Keygroup, req.Id, datum.Version) err = s.cache.add(req.Keygroup, req.Id, datum.Version.Version)
} }
return &alexandraProto.ScanResponse{Data: data}, err return &alexandraProto.ScanResponse{Data: data}, err
} }
// Read reads a datum from FReD. Read data are placed in cache (if not in there already). If multiple versions of a // Read reads a datum from FReD. Read data are placed in cache (if not in there already). If multiple versions of a
// datum exist, all versions will be returned to the client so that it can choose one. // datum exist, all versions will be returned to the client so that it can choose one. If the read data is outdated
// compared to seen versions, an error is returned.
func (s *Server) Read(ctx context.Context, req *alexandraProto.ReadRequest) (*alexandraProto.ReadResponse, error) { func (s *Server) Read(ctx context.Context, req *alexandraProto.ReadRequest) (*alexandraProto.ReadResponse, error) {
log.Debug().Msgf("Alexandra has rcvd Read") log.Debug().Msgf("Alexandra has rcvd Read")
...@@ -48,6 +50,20 @@ func (s *Server) Read(ctx context.Context, req *alexandraProto.ReadRequest) (*al ...@@ -48,6 +50,20 @@ func (s *Server) Read(ctx context.Context, req *alexandraProto.ReadRequest) (*al
return nil, err return nil, err
} }
known, err := s.cache.get(req.Keygroup, req.Id)
if err != nil {
return nil, err
}
for _, read := range versions {
for _, seen := range known {
if seen.Compare(read, vclock.Ancestor) {
return nil, fmt.Errorf("read version %v is older than seen version %v", read, seen)
}
}
}
for i := range versions { for i := range versions {
err = s.cache.add(req.Keygroup, req.Id, versions[i]) err = s.cache.add(req.Keygroup, req.Id, versions[i])
if err != nil { if err != nil {
......
...@@ -270,12 +270,12 @@ func (s *Server) Read(ctx context.Context, request *client.ReadRequest) (*client ...@@ -270,12 +270,12 @@ func (s *Server) Read(ctx context.Context, request *client.ReadRequest) (*client
return nil, err return nil, err
} }
var version vclock.VClock versions := make([]vclock.VClock, len(request.Versions))
if request.Version != nil { for i, v := range request.Versions {
version = request.Version.Version versions[i] = v.Version
} }
res, err := s.e.HandleRead(user, fred.Item{Keygroup: fred.KeygroupName(request.Keygroup), ID: request.Id}, version) res, err := s.e.HandleRead(user, fred.Item{Keygroup: fred.KeygroupName(request.Keygroup), ID: request.Id}, versions)
if err != nil { if err != nil {
log.Debug().Msgf("API Server is returning error: %#v", err) log.Debug().Msgf("API Server is returning error: %#v", err)
...@@ -287,9 +287,11 @@ func (s *Server) Read(ctx context.Context, request *client.ReadRequest) (*client ...@@ -287,9 +287,11 @@ func (s *Server) Read(ctx context.Context, request *client.ReadRequest) (*client
for i := 0; i < len(res); i++ { for i := 0; i < len(res); i++ {
data[i] = &client.Item{ data[i] = &client.Item{
Id: res[i].ID, Id: res[i].ID,
Val: res[i].Val, Val: res[i].Val,
Version: res[i].Version.GetMap(), Version: &client.Version{
Version: res[i].Version.GetMap(),
},
} }
} }
...@@ -321,9 +323,11 @@ func (s *Server) Scan(ctx context.Context, request *client.ScanRequest) (*client ...@@ -321,9 +323,11 @@ func (s *Server) Scan(ctx context.Context, request *client.ScanRequest) (*client
for i := 0; i < len(res); i++ { for i := 0; i < len(res); i++ {
data[i] = &client.Item{ data[i] = &client.Item{
Id: res[i].ID, Id: res[i].ID,
Val: res[i].Val, Val: res[i].Val,
Version: res[i].Version.GetMap(), Version: &client.Version{
Version: res[i].Version.GetMap(),
},
} }
} }
......
...@@ -76,7 +76,7 @@ func (h *ExtHandler) HandleDeleteKeygroup(user string, k Keygroup) error { ...@@ -76,7 +76,7 @@ func (h *ExtHandler) HandleDeleteKeygroup(user string, k Keygroup) error {
} }
// HandleRead handles requests to the Read endpoint of the client interface. // HandleRead handles requests to the Read endpoint of the client interface.
func (h *ExtHandler) HandleRead(user string, i Item, version vclock.VClock) ([]Item, error) { func (h *ExtHandler) HandleRead(user string, i Item, versions []vclock.VClock) ([]Item, error) {
allowed, err := h.a.isAllowed(user, Read, i.Keygroup) allowed, err := h.a.isAllowed(user, Read, i.Keygroup)
if err != nil || !allowed { if err != nil || !allowed {
...@@ -88,10 +88,10 @@ func (h *ExtHandler) HandleRead(user string, i Item, version vclock.VClock) ([]I ...@@ -88,10 +88,10 @@ func (h *ExtHandler) HandleRead(user string, i Item, version vclock.VClock) ([]I
// TODO: decide what to do with tombstoned items? right now we just don't show them // TODO: decide what to do with tombstoned items? right now we just don't show them
var r []Item var r []Item
if version == nil { if len(versions) == 0 {
r, err = h.s.read(i.Keygroup, i.ID) r, err = h.s.read(i.Keygroup, i.ID)
} else { } else {
r, err = h.s.readVersion(i.Keygroup, i.ID, version) r, err = h.s.readVersion(i.Keygroup, i.ID, versions)
} }
result := make([]Item, 0, len(r)) result := make([]Item, 0, len(r))
......
...@@ -113,7 +113,10 @@ func (s *storeService) read(kg KeygroupName, id string) ([]Item, error) { ...@@ -113,7 +113,10 @@ func (s *storeService) read(kg KeygroupName, id string) ([]Item, error) {
return items, nil return items, nil
} }
func (s *storeService) readVersion(kg KeygroupName, id string, version vclock.VClock) ([]Item, error) { // readVersion reads data from the data store but also accepts a list of version vectors.
// TODO: what for? Currently it only returns values that are equal or newer than any of the given versions, i.e, it
// TODO: filters out concurrent versions.
func (s *storeService) readVersion(kg KeygroupName, id string, versions []vclock.VClock) ([]Item, error) {
//TODO: make this part of the single writer thing? //TODO: make this part of the single writer thing?
err := checkKGandID(kg, id) err := checkKGandID(kg, id)
...@@ -138,14 +141,17 @@ func (s *storeService) readVersion(kg KeygroupName, id string, version vclock.VC ...@@ -138,14 +141,17 @@ func (s *storeService) readVersion(kg KeygroupName, id string, version vclock.VC
var items []Item var items []Item
for i := range data { for i := range data {
if version.Compare(vvectors[i], vclock.Equal) || version.Compare(vvectors[i], vclock.Ancestor) { for _, v := range versions {
items = append(items, Item{ if v.Compare(vvectors[i], vclock.Equal) || v.Compare(vvectors[i], vclock.Ancestor) {
Keygroup: kg, items = append(items, Item{
ID: id, Keygroup: kg,
Val: data[i], ID: id,
Version: vvectors[i], Val: data[i],
Tombstoned: data[i] == "", Version: vvectors[i],
}) Tombstoned: data[i] == "",
})
continue
}
} }
} }
......
This diff is collapsed.
...@@ -51,13 +51,13 @@ message DeleteKeygroupRequest { ...@@ -51,13 +51,13 @@ message DeleteKeygroupRequest {
message ReadRequest { message ReadRequest {
string keygroup = 1; string keygroup = 1;
string id = 2; string id = 2;
Version version = 3; repeated Version versions = 3;
} }
message Item { message Item {
string id = 1; string id = 1;
string val = 2; string val = 2;
map<string, uint64> version = 3; Version version = 3;
} }
message ReadResponse { message ReadResponse {
......
...@@ -415,7 +415,7 @@ func (n *Node) GetItem(kgname, item string, expectError bool) ([]string, []vcloc ...@@ -415,7 +415,7 @@ func (n *Node) GetItem(kgname, item string, expectError bool) ([]string, []vcloc
for i, it := range res.Data { for i, it := range res.Data {
vals[i] = it.Val vals[i] = it.Val
vvectors[i] = it.Version vvectors[i] = it.Version.Version
} }
return vals, vvectors return vals, vvectors
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment