Commit c9ca7862 authored by pfandzelter's avatar pfandzelter
Browse files

sort scan and readall output

parent cd6fc8bc
Pipeline #170108 failed with stages
in 16 minutes and 8 seconds
......@@ -2,7 +2,7 @@
<project version="4">
<component name="ProjectTasksOptions">
<TaskOptions isEnabled="true">
<option name="arguments" value="-l -w -s $FilePath$" />
<option name="arguments" value="fmt $FilePath$" />
<option name="checkSyntaxErrors" value="true" />
<option name="description" />
<option name="exitCodeBehavior" value="ERROR" />
......@@ -14,8 +14,8 @@
<array />
</option>
<option name="outputFromStdout" value="false" />
<option name="program" value="gofmt" />
<option name="runOnExternalChanges" value="false" />
<option name="program" value="$GoExecPath$" />
<option name="runOnExternalChanges" value="true" />
<option name="scopeName" value="Project Files" />
<option name="trackOnlyRoot" value="true" />
<option name="workingDir" value="$ProjectFileDir$" />
......@@ -25,8 +25,5 @@
<env name="PATH" value="$GoBinDirs$" />
</envs>
</TaskOptions>
<enabled-global>
<option value="go fmt" />
</enabled-global>
</component>
</project>
\ No newline at end of file
......@@ -82,20 +82,15 @@ github.com/asaskevich/govalidator v0.0.0-20190424111038-f61b66f89f4a/go.mod h1:l
github.com/aws/aws-sdk-go v1.15.11/go.mod h1:mFuSZ37Z9YOHbQEwBWztmVzqXrEkub65tZoCYDt7FT0=
github.com/aws/aws-sdk-go-v2 v1.7.1 h1:TswSc7KNqZ/K1Ijt3IkpXk/2+62vi3Q82Yrr5wSbRBQ=
github.com/aws/aws-sdk-go-v2 v1.7.1/go.mod h1:L5LuPC1ZgDr2xQS7AmIec/Jlc7O/Y1u2KxJyNVab250=
github.com/aws/aws-sdk-go-v2/config v1.5.0 h1:tRQcWXVmO7wC+ApwYc2LiYKfIBoIrdzcJ+7HIh6AlR0=
github.com/aws/aws-sdk-go-v2/config v1.5.0/go.mod h1:RWlPOAW3E3tbtNAqTwvSW54Of/yP3oiZXMI0xfUdjyA=
github.com/aws/aws-sdk-go-v2/credentials v1.3.1 h1:fFeqL5+9kwFKsCb2oci5yAIDsWYqn/Nga8oQ5bIasI8=
github.com/aws/aws-sdk-go-v2/credentials v1.3.1/go.mod h1:r0n73xwsIVagq8RsxmZbGSRQFj9As3je72C2WzUIToc=
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.1.3 h1:xhdETBWOQEjxB2GIIuYNPPZcdzGfNxs8tQk+sXBUdSE=
github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue v1.1.3/go.mod h1:fiHL5545D891qYHNkErIZtJ57nQbZWdgbkeL+EkjqZQ=
github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression v1.1.3 h1:4yE8dpeaCOOMS4bry9IS01qmODLJ8I5JjxraGdSh97Q=
github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression v1.1.3/go.mod h1:PpfY11KyH7NJWPclM1nB4PAKcfG2kcjAayyvuZkuVJw=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.3.0 h1:s4vtv3Mv1CisI3qm2HGHi1Ls9ZtbCOEqeQn6oz7fTyU=
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.3.0/go.mod h1:2LAuqPx1I6jNfaGDucWfA2zqQCYCOMCDHiCOciALyNw=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.0.1 h1:1Q713GXCiz97619PtNTG/DU23x3SZHQE+PM54QBwUqw=
github.com/aws/aws-sdk-go-v2/internal/configsources v1.0.1/go.mod h1:H2dIRXkSkAPkxIA74UD1wYu0eS+cQxJcPKSmsfeZLUc=
github.com/aws/aws-sdk-go-v2/internal/ini v1.1.1 h1:SDLwr1NKyowP7uqxuLNdvFZhjnoVWxNv456zAp+ZFjU=
github.com/aws/aws-sdk-go-v2/internal/ini v1.1.1/go.mod h1:Zy8smImhTdOETZqfyn01iNOe0CNggVbPjCajyaz6Gvg=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.4.1 h1:+aMPn6HsRIl/Mk5Ese2wwxYQsHbVIQbtgk5v+7S1FkE=
github.com/aws/aws-sdk-go-v2/service/dynamodb v1.4.1/go.mod h1:jGJgc16tA1bFltVx7X2FHGxU8y2Zn9MwhsRO+aIwFMM=
github.com/aws/aws-sdk-go-v2/service/dynamodbstreams v1.3.1 h1:6GnCq+bpGvZILetZj2N5dBnVeRkJu6HTiQwto9ppIyk=
......@@ -104,11 +99,8 @@ github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.2.1 h1:s/uV8UyM
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.2.1/go.mod h1:v33JQ57i2nekYTA70Mb+O18KeH4KqhdqxTJZNK1zdRE=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.0.1 h1:Dd3LTXIKaAlcBeAd5xuxifyrjCJNHDZDUeAFm1Rhsn4=
github.com/aws/aws-sdk-go-v2/service/internal/endpoint-discovery v1.0.1/go.mod h1:wyrFI6fQZ148qNco6W7ZxTfkVuf1EfQKZckAWOA1mPg=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.1 h1:VJe/XEhrfyfBLupcGg1BfUSK2VMZNdbDcZQ49jnp+h0=
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.2.1/go.mod h1:zceowr5Z1Nh2WVP8bf/3ikB41IZW59E4yIYbg+pC6mw=
github.com/aws/aws-sdk-go-v2/service/sso v1.3.1 h1:H2ZLWHUbbeYtghuqCY5s/7tbBM99PAwCioRJF8QvV/U=
github.com/aws/aws-sdk-go-v2/service/sso v1.3.1/go.mod h1:J3A3RGUvuCZjvSuZEcOpHDnzZP/sKbhDWV2T1EOzFIM=
github.com/aws/aws-sdk-go-v2/service/sts v1.6.0 h1:Y9r6mrzOyAYz4qKaluSH19zqH1236il/nGbsPKOUT0s=
github.com/aws/aws-sdk-go-v2/service/sts v1.6.0/go.mod h1:q7o0j7d7HrJk/vr9uUt3BVRASvcU7gYZB9PUgPiByXg=
github.com/aws/smithy-go v1.6.0 h1:T6puApfBcYiTIsaI+SYWqanjMt5pc3aoyyDrI+0YH54=
github.com/aws/smithy-go v1.6.0/go.mod h1:SObp3lf9smib00L/v3U2eAKG8FyQ7iLrJnQiAmR5n+E=
......
......@@ -174,9 +174,10 @@ func (s *Storage) Read(kg string, id string) ([]string, []vclock.VClock, bool, e
}
// ReadSome returns count number of items in the specified keygroup starting at id.
func (s *Storage) ReadSome(kg, id string, count uint64) (map[string][]string, map[string][]vclock.VClock, error) {
items := make(map[string][]string)
vvectors := make(map[string][]vclock.VClock)
func (s *Storage) ReadSome(kg, id string, count uint64) ([]string, []string, []vclock.VClock, error) {
keys := make([]string, 0)
items := make([]string, 0)
vvectors := make([]vclock.VClock, 0)
err := s.db.View(func(txn *badger.Txn) error {
prefix := makeKeygroupKeyName(kg)
......@@ -199,14 +200,10 @@ func (s *Storage) ReadSome(kg, id string, count uint64) (map[string][]string, ma
return err
}
if _, ok := items[key]; ok {
items[key] = append(items[key], string(v))
vvectors[key] = append(vvectors[key], vvector)
continue
}
keys = append(keys, key)
items = append(items, string(v))
vvectors = append(vvectors, vvector)
items[key] = []string{string(v)}
vvectors[key] = []vclock.VClock{vvector}
i++
}
......@@ -214,16 +211,17 @@ func (s *Storage) ReadSome(kg, id string, count uint64) (map[string][]string, ma
})
if err != nil {
return nil, nil, errors.New(err)
return nil, nil, nil, errors.New(err)
}
return items, vvectors, nil
return keys, items, vvectors, nil
}
// ReadAll returns all items in the specified keygroup.
func (s *Storage) ReadAll(kg string) (map[string][]string, map[string][]vclock.VClock, error) {
items := make(map[string][]string)
vvectors := make(map[string][]vclock.VClock)
func (s *Storage) ReadAll(kg string) ([]string, []string, []vclock.VClock, error) {
keys := make([]string, 0)
items := make([]string, 0)
vvectors := make([]vclock.VClock, 0)
err := s.db.View(func(txn *badger.Txn) error {
prefix := makeKeygroupKeyName(kg)
......@@ -244,24 +242,20 @@ func (s *Storage) ReadAll(kg string) (map[string][]string, map[string][]vclock.V
return err
}
if _, ok := items[key]; ok {
items[key] = append(items[key], string(v))
vvectors[key] = append(vvectors[key], vvector)
continue
}
keys = append(keys, key)
items = append(items, string(v))
vvectors = append(vvectors, vvector)
items[key] = []string{string(v)}
vvectors[key] = []vclock.VClock{vvector}
}
return nil
})
if err != nil {
return nil, nil, errors.New(err)
return nil, nil, nil, errors.New(err)
}
return items, vvectors, nil
return keys, items, vvectors, nil
}
// IDs returns the keys of all items in the specified keygroup.
......
......@@ -105,16 +105,18 @@ func TestReadSome(t *testing.T) {
}
data, _, err := db.ReadSome(kg, "id"+strconv.Itoa(scanStart), uint64(scanRange))
keys, values, _, err := db.ReadSome(kg, "id"+strconv.Itoa(scanStart), uint64(scanRange))
assert.NoError(t, err)
assert.Len(t, data, scanRange)
assert.Len(t, keys, scanRange)
assert.Len(t, values, scanRange)
for i := scanStart; i < scanStart+scanRange; i++ {
assert.Contains(t, data, ids[i])
assert.Len(t, data[ids[i]], 1)
assert.Equal(t, data[ids[i]][0], vals[i])
assert.Contains(t, keys, ids[i])
assert.Contains(t, values, vals[i])
assert.Equal(t, ids[i], keys[i-scanStart])
assert.Equal(t, vals[i], values[i-scanStart])
}
}
......@@ -154,18 +156,18 @@ func TestReadAll(t *testing.T) {
assert.NoError(t, err)
data, _, err := db.ReadAll(kg)
keys, values, _, err := db.ReadAll(kg)
assert.NoError(t, err)
assert.Len(t, data, 3)
assert.Len(t, data["id-1"], 1)
assert.Equal(t, "data-1", data["id-1"][0])
assert.Len(t, data["id-2"], 1)
assert.Equal(t, "data-2", data["id-2"][0])
assert.Len(t, data["id-3"], 1)
assert.Equal(t, "data-3", data["id-3"][0])
assert.Len(t, keys, 3)
assert.Len(t, values, 3)
assert.Equal(t, "id-1", keys[0])
assert.Equal(t, "data-1", values[0])
assert.Equal(t, "id-2", keys[1])
assert.Equal(t, "data-2", values[1])
assert.Equal(t, "id-3", keys[2])
assert.Equal(t, "data-3", values[2])
}
func TestIDs(t *testing.T) {
......
......@@ -8,6 +8,7 @@ import (
"net/url"
"sort"
"strconv"
"strings"
"time"
"git.tu-berlin.de/mcc-fred/fred/pkg/vector"
......@@ -308,7 +309,7 @@ func (s *Storage) Read(kg string, id string) ([]string, []vclock.VClock, bool, e
}
func (s *Storage) ReadSome(kg string, id string, count uint64) (map[string][]string, map[string][]vclock.VClock, error) {
func (s *Storage) ReadSome(kg string, id string, count uint64) ([]string, []string, []vclock.VClock, error) {
// in this case we need to get all items with "Keygroup" kg and then sort them
filt := expression.Name(keygroupName).Equal(expression.Value(kg)).And(expression.Name(keyName).GreaterThanEqual(expression.Value(id)))
......@@ -318,7 +319,7 @@ func (s *Storage) ReadSome(kg string, id string, count uint64) (map[string][]str
if err != nil {
log.Error().Msg(errors.New(err).ErrorStack())
return nil, nil, errors.New(err)
return nil, nil, nil, errors.New(err)
}
params := &dynamodb.ScanInput{
......@@ -337,20 +338,25 @@ func (s *Storage) ReadSome(kg string, id string, count uint64) (map[string][]str
//return nil, nil, errors.New(err)
}
items := make(map[string][]string)
versions := make(map[string][]vclock.VClock)
type item struct {
key string
val string
version vclock.VClock
}
items := make([]item, 0, len(result.Items))
for _, i := range result.Items {
key, ok := i[keyName]
if !ok {
return nil, nil, nil
return nil, nil, nil, errors.Errorf("ReadSome: internal error, can't find key")
}
k, ok := key.(*dynamoDBTypes.AttributeValueMemberS)
if !ok {
return nil, nil, errors.Errorf("ReadSome: malformed key")
return nil, nil, nil, errors.Errorf("ReadSome: malformed key")
}
if k.Value == NULLValue {
......@@ -360,7 +366,7 @@ func (s *Storage) ReadSome(kg string, id string, count uint64) (map[string][]str
val, ok := i[valName]
if !ok {
return nil, nil, errors.Errorf("ReadSome: malformed value")
return nil, nil, nil, errors.Errorf("ReadSome: malformed value")
}
if e, ok := i[expiryKey]; ok {
......@@ -370,7 +376,7 @@ func (s *Storage) ReadSome(kg string, id string, count uint64) (map[string][]str
expiry, err := strconv.Atoi(expiration.Value)
if err != nil {
log.Error().Msg(errors.New(err).ErrorStack())
return nil, nil, errors.New(err)
return nil, nil, nil, errors.New(err)
}
log.Debug().Msgf("ReadSome found key expiring at %d, it is %d now", expiry, time.Now().Unix())
......@@ -388,53 +394,60 @@ func (s *Storage) ReadSome(kg string, id string, count uint64) (map[string][]str
continue
}
it := make([]string, 0, len(values.Value))
vvectors := make([]vclock.VClock, 0, len(values.Value))
for v, data := range values.Value {
version, err := vectorFromString(v)
if err != nil {
log.Error().Msg(errors.New(err).ErrorStack())
return nil, nil, errors.New(err)
return nil, nil, nil, errors.New(err)
}
i, ok := data.(*dynamoDBTypes.AttributeValueMemberS)
it, ok := data.(*dynamoDBTypes.AttributeValueMemberS)
if !ok {
return nil, nil, errors.Errorf("ReadSome: malformed item")
return nil, nil, nil, errors.Errorf("ReadSome: malformed item")
}
vvectors = append(vvectors, version)
items = append(items, item{
k.Value,
it.Value,
version,
})
it = append(it, i.Value)
}
items[k.Value] = it
versions[k.Value] = vvectors
}
// sort and filter
ids := make([]string, len(items))
// now we have a list of items, sort them and convert to lists
keys := make([]string, 0, len(items))
values := make([]string, 0, len(items))
versions := make([]vclock.VClock, 0, len(items))
i := 0
for k := range items {
ids[i] = k
i++
}
// now we have lists of keys and items, we need to sort them by the key attribute alphabetically
sort.Slice(items, func(i, j int) bool {
return strings.ToLower(items[i].key) < strings.ToLower(items[j].key)
})
sort.Strings(ids)
i := 0
curr := ""
for _, x := range items {
if x.key != curr {
curr = x.key
i++
if i > int(count) {
break
}
}
for i = len(items) - 1; i >= int(count); i-- {
delete(items, ids[i])
delete(versions, ids[i])
keys = append(keys, x.key)
values = append(values, x.val)
versions = append(versions, x.version)
}
return items, versions, nil
return keys, values, versions, nil
}
// ReadAll returns all items in the specified keygroup.
func (s *Storage) ReadAll(kg string) (map[string][]string, map[string][]vclock.VClock, error) {
func (s *Storage) ReadAll(kg string) ([]string, []string, []vclock.VClock, error) {
// in this case we need to get all items with "Keygroup" kg and then sort them
filt := expression.Name(keygroupName).Equal(expression.Value(kg))
......@@ -444,7 +457,7 @@ func (s *Storage) ReadAll(kg string) (map[string][]string, map[string][]vclock.V
if err != nil {
log.Error().Msg(errors.New(err).ErrorStack())
return nil, nil, errors.New(err)
return nil, nil, nil, errors.New(err)
}
params := &dynamodb.ScanInput{
......@@ -459,25 +472,30 @@ func (s *Storage) ReadAll(kg string) (map[string][]string, map[string][]vclock.V
result, err := s.svc.Scan(context.TODO(), params)
if err != nil {
log.Error().Msg(errors.New(err).ErrorStack())
return nil, nil, errors.New(err)
return nil, nil, nil, errors.New(err)
}
log.Debug().Msgf("ReadAll: got %d items", len(result.Items))
items := make(map[string][]string)
versions := make(map[string][]vclock.VClock)
type item struct {
key string
val string
version vclock.VClock
}
items := make([]item, 0, len(result.Items))
for _, i := range result.Items {
key, ok := i[keyName]
if !ok {
return nil, nil, nil
return nil, nil, nil, nil
}
k, ok := key.(*dynamoDBTypes.AttributeValueMemberS)
if !ok {
return nil, nil, errors.Errorf("ReadAll: malformed key")
return nil, nil, nil, errors.Errorf("ReadAll: malformed key")
}
if k.Value == NULLValue {
......@@ -487,7 +505,7 @@ func (s *Storage) ReadAll(kg string) (map[string][]string, map[string][]vclock.V
val, ok := i[valName]
if !ok {
return nil, nil, errors.Errorf("ReadAll: malformed value")
return nil, nil, nil, errors.Errorf("ReadAll: malformed value")
}
if e, ok := i[expiryKey]; ok {
......@@ -497,7 +515,7 @@ func (s *Storage) ReadAll(kg string) (map[string][]string, map[string][]vclock.V
expiry, err := strconv.Atoi(expiration.Value)
if err != nil {
log.Error().Msg(errors.New(err).ErrorStack())
return nil, nil, errors.New(err)
return nil, nil, nil, errors.New(err)
}
log.Debug().Msgf("ReadAll found key expiring at %d, it is %d now", expiry, time.Now().Unix())
......@@ -514,33 +532,45 @@ func (s *Storage) ReadAll(kg string) (map[string][]string, map[string][]vclock.V
continue
}
it := make([]string, 0, len(values.Value))
vvectors := make([]vclock.VClock, 0, len(values.Value))
for v, data := range values.Value {
version, err := vectorFromString(v)
if err != nil {
log.Error().Msg(errors.New(err).ErrorStack())
return nil, nil, errors.New(err)
return nil, nil, nil, errors.New(err)
}
i, ok := data.(*dynamoDBTypes.AttributeValueMemberS)
it, ok := data.(*dynamoDBTypes.AttributeValueMemberS)
if !ok {
return nil, nil, errors.Errorf("ReadAll: malformed item")
return nil, nil, nil, errors.Errorf("ReadAll: malformed item")
}
vvectors = append(vvectors, version)
it = append(it, i.Value)
items = append(items, item{
k.Value,
it.Value,
version,
})
}
}
// now we have a list of items, sort them and convert to lists
keys := make([]string, 0, len(items))
values := make([]string, 0, len(items))
versions := make([]vclock.VClock, 0, len(items))
// now we have lists of keys and items, we need to sort them by the key attribute alphabetically
sort.Slice(items, func(i, j int) bool {
return strings.ToLower(items[i].key) < strings.ToLower(items[j].key)
})
items[k.Value] = it
versions[k.Value] = vvectors
for _, x := range items {
keys = append(keys, x.key)
values = append(values, x.val)
versions = append(versions, x.version)
}
return items, versions, nil
return keys, values, versions, nil
}
// Append appends the item to the specified keygroup by incrementing the latest key by one.
......
......@@ -199,16 +199,18 @@ func TestReadSome(t *testing.T) {
}
data, _, err := db.ReadSome(kg, "id"+strconv.Itoa(scanStart), uint64(scanRange))
keys, values, _, err := db.ReadSome(kg, "id"+strconv.Itoa(scanStart), uint64(scanRange))
assert.NoError(t, err)
assert.Len(t, data, scanRange)
assert.Len(t, keys, scanRange)
assert.Len(t, values, scanRange)
for i := scanStart; i < scanStart+scanRange; i++ {
assert.Contains(t, data, ids[i])
assert.Len(t, data[ids[i]], 1)
assert.Equal(t, data[ids[i]][0], vals[i])
assert.Contains(t, keys, ids[i])
assert.Contains(t, values, vals[i])
assert.Equal(t, ids[i], keys[i-scanStart])
assert.Equal(t, vals[i], values[i-scanStart])
}
}
......@@ -248,17 +250,18 @@ func TestReadAll(t *testing.T) {
assert.NoError(t, err)
data, _, err := db.ReadAll(kg)
keys, values, _, err := db.ReadAll(kg)
assert.NoError(t, err)
assert.Len(t, data, 3)
assert.Len(t, data["id-1"], 1)
assert.Equal(t, "data-1", data["id-1"][0])
assert.Len(t, data["id-2"], 1)
assert.Equal(t, "data-2", data["id-2"][0])
assert.Len(t, data["id-3"], 1)
assert.Equal(t, "data-3", data["id-3"][0])
assert.Len(t, keys, 3)
assert.Len(t, values, 3)
assert.Equal(t, "id-1", keys[0])
assert.Equal(t, "data-1", values[0])
assert.Equal(t, "id-2", keys[1])
assert.Equal(t, "data-2", values[1])
assert.Equal(t, "id-3", keys[2])
assert.Equal(t, "data-3", values[2])
}
......
......@@ -20,9 +20,9 @@ type Store interface {
// Read Needs: keygroup, id; Returns: val, version vector, found
Read(kg string, id string) ([]string, []vclock.VClock, bool, error)
// ReadSome Needs: keygroup, id, range; Returns: ids, values, versions
ReadSome(kg string, id string, count uint64) (map[string][]string, map[string][]vclock.VClock, error)
ReadSome(kg string, id string, count uint64) ([]string, []string, []vclock.VClock, error)
// ReadAll Needs: keygroup; Returns: ids, values, versions
ReadAll(kg string) (map[string][]string, map[string][]vclock.VClock, error)
ReadAll(kg string) ([]string, []string, []vclock.VClock, error)
// IDs Needs: keygroup, Returns:[] keygroup, id
IDs(kg string) ([]string, error)
// Exists Needs: keygroup, id
......@@ -176,23 +176,25 @@ func (s *storeService) scan(kg KeygroupName, id string, count uint64) ([]Item, e
return nil, errors.Errorf("no such item %s in keygroup %+v", id, kg)
}
data, vvectors, err := s.iS.ReadSome(string(kg), id, count)
keys, data, vvectors, err := s.iS.ReadSome(string(kg), id, count)
if err != nil {
return nil, err
}
items := make([]Item, 0)
items := make([]Item, len(keys))
for key, item := range data {
for i := range item {
items = append(items, Item{
Keygroup: kg,
ID: key,
Val: item[i],
Version: vvectors[key][i],
Tombstoned: item[i] == "",
})
for i := range keys {
key := keys[i]
item := data[i]
version := vvectors[i]
items[i] = Item{
Keygroup: kg,
ID: key,
Val: item,
Version: version,