Commit 3044e32b authored by pfandzelter's avatar pfandzelter
Browse files

improve append API

parent 3a6f50e4
Pipeline #40307 passed with stages
in 16 minutes and 47 seconds
......@@ -280,7 +280,7 @@ Data in FReD keygroups can be read, updated/appended and deleted.
Keygroups can be configured to be either mutable or immutable.
Data in immutable keygroups can only be appended, existing keys cannot be updated or deleted.
When you append a new value to an immutable keygroup, a unique and incremental identifier is returned to you so you can later read that data.
You need to specifiy a key in the form of a 64-bit unsigned integer, e.g., a timestamp.
Data in mutable keygroups cannot be appended as there is no concept of key incrementation, but you can insert and update data at specified keys with the update operation.
If you update a key that does not exist yet it will be created.
......
......@@ -142,8 +142,10 @@ func (c *Client) Delete(ctx context.Context, keygroup string, id string) (*fredC
// Append also updates the moving average item speed
func (c *Client) Append(ctx context.Context, keygroup string, data string) (*fredClients.AppendResponse, error) {
start := time.Now()
id := uint64(time.Now().UnixNano())
res, err := c.Client.Append(ctx, &fredClients.AppendRequest{
Keygroup: keygroup,
Id: id,
Data: data,
})
if err == nil {
......
......@@ -6,6 +6,7 @@ import (
"crypto/x509"
"io/ioutil"
"net"
"strconv"
"time"
"git.tu-berlin.de/mcc-fred/fred/pkg/fred"
......@@ -342,7 +343,7 @@ func (s *Server) Append(ctx context.Context, request *client.AppendRequest) (*cl
return nil, err
}
res, err := s.e.HandleAppend(user, fred.Item{Keygroup: fred.KeygroupName(request.Keygroup), Val: request.Data})
res, err := s.e.HandleAppend(user, fred.Item{Keygroup: fred.KeygroupName(request.Keygroup), ID: strconv.FormatUint(request.Id, 10), Val: request.Data})
if err != nil {
return nil, err
......
package badgerdb
import (
"strconv"
"strings"
"time"
......@@ -298,21 +297,7 @@ func (s *Storage) IDs(kg string) ([]string, error) {
}
// Update updates the item with the specified id in the specified keygroup.
func (s *Storage) Update(kg, id, val string, append bool, expiry int, vvector vclock.VClock) error {
if append {
// make sure that we have our local sequence on point
key, err := strconv.Atoi(id)
if err != nil {
return errors.New(err)
}
for n, err := s.incrementSequence(kg); n < uint64(key); n, err = s.incrementSequence(kg) {
if err != nil {
return errors.New(err)
}
}
}
func (s *Storage) Update(kg, id, val string, expiry int, vvector vclock.VClock) error {
err := s.db.Update(func(txn *badger.Txn) error {
key := makeKeyName(kg, id, vvector)
......@@ -408,21 +393,12 @@ func (s *Storage) Delete(kg string, id string, vvector vclock.VClock) error {
}
// Append appends the item to the specified keygroup by incrementing the latest key by one.
func (s *Storage) Append(kg, val string, expiry int) (string, error) {
// first, get the latest key
// maximum of 18446744073709551615, though!
// if you reach this maximum, please send me a letter
n, err := s.incrementSequence(kg)
if err != nil {
return "", errors.New(err)
func (s *Storage) Append(kg string, id string, val string, expiry int) error {
if s.Exists(kg, id) {
return errors.Errorf("key %s for keygroup %s exists in database already and may not be changed", id, kg)
}
id := strconv.FormatUint(n, 10)
log.Debug().Msgf("append got next ID: %s", id)
err = s.db.Update(func(txn *badger.Txn) error {
err := s.db.Update(func(txn *badger.Txn) error {
key := makeKeyName(kg, id, vclock.VClock{})
if expiry > 0 {
......@@ -446,32 +422,10 @@ func (s *Storage) Append(kg, val string, expiry int) (string, error) {
})
if err != nil {
return "", errors.New(err)
}
return id, nil
}
func (s *Storage) incrementSequence(kg string) (uint64, error) {
seq, ok := s.seq[kg]
if !ok {
newSeq, err := s.db.GetSequence(makeLogConfigKeyName(kg), 100)
if err != nil {
return 0, errors.New(err)
}
s.seq[kg] = newSeq
seq = newSeq
}
n, err := seq.Next()
if err != nil {
return 0, errors.New(err)
return errors.New(err)
}
return n, nil
return nil
}
// Exists checks if the given data item exists in the badgerdb database.
......
......@@ -99,7 +99,7 @@ func TestReadSome(t *testing.T) {
ids[i] = "id" + strconv.Itoa(i)
vals[i] = "val" + strconv.Itoa(i)
err = db.Update(kg, ids[i], vals[i], false, 0, vclock.VClock{})
err = db.Update(kg, ids[i], vals[i], 0, vclock.VClock{})
assert.NoError(t, err)
......@@ -124,15 +124,15 @@ func TestReadAll(t *testing.T) {
assert.NoError(t, err)
err = db.Update(kg, "id-1", "data-1", false, 0, vclock.VClock{})
err = db.Update(kg, "id-1", "data-1", 0, vclock.VClock{})
assert.NoError(t, err)
err = db.Update(kg, "id-2", "data-2", false, 0, vclock.VClock{})
err = db.Update(kg, "id-2", "data-2", 0, vclock.VClock{})
assert.NoError(t, err)
err = db.Update(kg, "id-3", "data-3", false, 0, vclock.VClock{})
err = db.Update(kg, "id-3", "data-3", 0, vclock.VClock{})
assert.NoError(t, err)
......@@ -142,15 +142,15 @@ func TestReadAll(t *testing.T) {
assert.NoError(t, err)
err = db.Update(kg2, "id-4", "data-4", false, 0, vclock.VClock{})
err = db.Update(kg2, "id-4", "data-4", 0, vclock.VClock{})
assert.NoError(t, err)
err = db.Update(kg2, "id-5", "data-5", false, 0, vclock.VClock{})
err = db.Update(kg2, "id-5", "data-5", 0, vclock.VClock{})
assert.NoError(t, err)
err = db.Update(kg2, "id-6", "data-6", false, 0, vclock.VClock{})
err = db.Update(kg2, "id-6", "data-6", 0, vclock.VClock{})
assert.NoError(t, err)
......@@ -174,15 +174,15 @@ func TestIDs(t *testing.T) {
assert.NoError(t, err)
err = db.Update(kg, "id-1", "data-1", false, 0, vclock.VClock{})
err = db.Update(kg, "id-1", "data-1", 0, vclock.VClock{})
assert.NoError(t, err)
err = db.Update(kg, "id-2", "data-2", false, 0, vclock.VClock{})
err = db.Update(kg, "id-2", "data-2", 0, vclock.VClock{})
assert.NoError(t, err)
err = db.Update(kg, "id-3", "data-3", false, 0, vclock.VClock{})
err = db.Update(kg, "id-3", "data-3", 0, vclock.VClock{})
assert.NoError(t, err)
......@@ -192,15 +192,15 @@ func TestIDs(t *testing.T) {
assert.NoError(t, err)
err = db.Update(kg2, "id-1", "data-1", false, 0, vclock.VClock{})
err = db.Update(kg2, "id-1", "data-1", 0, vclock.VClock{})
assert.NoError(t, err)
err = db.Update(kg2, "id-2", "data-2", false, 0, vclock.VClock{})
err = db.Update(kg2, "id-2", "data-2", 0, vclock.VClock{})
assert.NoError(t, err)
err = db.Update(kg2, "id-3", "data-3", false, 0, vclock.VClock{})
err = db.Update(kg2, "id-3", "data-3", 0, vclock.VClock{})
assert.NoError(t, err)
......@@ -224,7 +224,7 @@ func TestItemExists(t *testing.T) {
assert.NoError(t, err)
err = db.Update(kg, id, value, false, 0, vclock.VClock{})
err = db.Update(kg, id, value, 0, vclock.VClock{})
assert.NoError(t, err)
ex := db.Exists(kg, id)
......@@ -245,7 +245,7 @@ func TestItemGet(t *testing.T) {
assert.NoError(t, err)
err = db.Update(kg, id, value, false, 0, vclock.VClock{})
err = db.Update(kg, id, value, 0, vclock.VClock{})
assert.NoError(t, err)
......@@ -265,7 +265,7 @@ func TestItemDelete(t *testing.T) {
err := db.CreateKeygroup(kg)
assert.NoError(t, err)
err = db.Update(kg, id, value, false, 0, vclock.VClock{})
err = db.Update(kg, id, value, 0, vclock.VClock{})
assert.NoError(t, err)
......@@ -300,7 +300,7 @@ func TestItemAfterDeleteKeygroup(t *testing.T) {
assert.NoError(t, err)
err = db.Update(kg, id, value, false, 0, vclock.VClock{})
err = db.Update(kg, id, value, 0, vclock.VClock{})
assert.NoError(t, err)
......@@ -323,7 +323,7 @@ func TestExpiry(t *testing.T) {
assert.NoError(t, err)
err = db.Update(kg, id, value, false, 10, vclock.VClock{})
err = db.Update(kg, id, value, 10, vclock.VClock{})
assert.NoError(t, err)
......@@ -357,25 +357,20 @@ func TestAppend(t *testing.T) {
assert.NoError(t, err)
key1, err := db.Append(kg, v1, 0)
err = db.Append(kg, "0", v1, 0)
assert.NoError(t, err)
assert.Equal(t, "0", key1)
key2, err := db.Append(kg, v2, 0)
err = db.Append(kg, "1", v2, 0)
assert.NoError(t, err)
assert.Equal(t, "1", key2)
for i := 2; i < 100; i++ {
v := "value-" + strconv.Itoa(i)
key, err := db.Append(kg, v, 0)
id := strconv.Itoa(i)
err := db.Append(kg, id, v, 0)
assert.NoError(t, err)
assert.Equal(t, strconv.Itoa(i), key)
}
}
......@@ -396,7 +391,8 @@ func TestConcurrentAppend(t *testing.T) {
go func(id int, keys *map[string]struct{}) {
for j := 2; j < items; j++ {
v := fmt.Sprintf("value-%d-%d", id, j)
key, err := db.Append(kg, v, 0)
key := strconv.Itoa(items*id + j)
err := db.Append(kg, key, v, 0)
assert.NoError(t, err)
......@@ -423,6 +419,24 @@ func TestConcurrentAppend(t *testing.T) {
}
func TestDualAppend(t *testing.T) {
kg := "logdual"
v1 := "value-1"
v2 := "value-2"
err := db.CreateKeygroup(kg)
assert.NoError(t, err)
err = db.Append(kg, "0", v1, 0)
assert.NoError(t, err)
err = db.Append(kg, "0", v2, 0)
assert.Error(t, err)
}
func TestTriggerNodes(t *testing.T) {
kg := "kg1"
......@@ -512,7 +526,7 @@ func TestPutItemVersion(t *testing.T) {
v1.Set("X", 10)
v1.Set("Y", 3)
v1.Set("Z", 7)
err = db.Update(kg, id, value, false, 0, v1)
err = db.Update(kg, id, value, 0, v1)
assert.NoError(t, err)
......@@ -541,7 +555,7 @@ func TestDeleteVersion(t *testing.T) {
v1.Set("X", 10)
v1.Set("Y", 3)
v1.Set("Z", 7)
err = db.Update(kg, id, value, false, 0, v1)
err = db.Update(kg, id, value, 0, v1)
assert.NoError(t, err)
......@@ -569,7 +583,7 @@ func TestClose(t *testing.T) {
assert.NoError(t, err)
err = db.Update(kg, id, value, false, 0, vclock.VClock{})
err = db.Update(kg, id, value, 0, vclock.VClock{})
assert.NoError(t, err)
......
......@@ -27,13 +27,12 @@ const (
valName = "Value"
triggerName = "Trigger"
expiryKey = "Expiry"
counterName = "Counter"
NULLValue = "%NULL%"
)
// Storage is a struct that saves all necessary information to access the database, in this case the session for
// DynamoDB and the table name. The DynamoDB table is set up with the following attributes:
// Keygroup (S) | Key (S) | Value (Document) | Expiry (N) | Trigger (Document) | Counter (N)
// Keygroup (S) | Key (S) | Value (Document) | Expiry (N) | Trigger (Document)
// where "Keygroup" is the partition key and "Key" is the sort key (both together form the primary key).
// Set this up with the following aws-cli command (table name in this case is "fred"):
//
......@@ -48,8 +47,8 @@ const (
// --time-to-live-specification "Enabled=true, AttributeName=Expiry"
//
// Two types of items are stored here:
// * Keygroup configuration is stored with the NULL "Key" and the keygroup name: this has the "Counter" attribute
// for append-only keygroups and the "Trigger" attribute that stores a map of trigger nodes for that keygroup
// * Keygroup configuration is stored with the NULL "Key" and the keygroup name: this has the "Trigger" attribute that
// stores a map of trigger nodes for that keygroup
// * Keys are stored with a "Keygroup" and unique "Key", where the Value is a list of version vectors and values - the
// additional "Expiry" attribute can be set to let the keys expire, and it is updated with each update to the data item
// (note that this means that in DynamoDB, all versions of an item expire at the same time, not necessarily in the
......@@ -478,46 +477,14 @@ func (s *Storage) ReadAll(kg string) (map[string][]string, map[string][]vclock.V
}
// Append appends the item to the specified keygroup by incrementing the latest key by one.
func (s *Storage) Append(kg, val string, expiry int) (string, error) {
input := &dynamodb.UpdateItemInput{
TableName: aws.String(s.dynamotable),
Key: map[string]dynamoDBTypes.AttributeValue{
keygroupName: &dynamoDBTypes.AttributeValueMemberS{
Value: kg,
},
keyName: &dynamoDBTypes.AttributeValueMemberS{
Value: NULLValue,
},
},
ExpressionAttributeNames: map[string]string{
"#counter": counterName,
},
ExpressionAttributeValues: map[string]dynamoDBTypes.AttributeValue{
":increase": &dynamoDBTypes.AttributeValueMemberN{
Value: "1",
},
},
UpdateExpression: aws.String("SET #counter = #counter + :increase"),
ReturnValues: "UPDATED_NEW",
}
func (s *Storage) Append(kg string, id string, val string, expiry int) error {
cond := expression.AttributeNotExists(expression.Name(keygroupName))
out, err := s.svc.UpdateItem(context.TODO(), input)
expr, err := expression.NewBuilder().WithCondition(cond).Build()
if err != nil {
log.Debug().Msgf("Append: could not update keygroup counter %s: %s", kg, errors.New(err).ErrorStack())
return "", errors.New(err)
}
c, ok := out.Attributes[counterName]
if !ok {
return "", errors.Errorf("could not increase counter")
}
newID, ok := c.(*dynamoDBTypes.AttributeValueMemberN)
if !ok {
return "", errors.Errorf("could not increase counter: malformed counter type")
log.Error().Msg(errors.New(err).ErrorStack())
return errors.New(err)
}
in := &dynamodb.PutItemInput{
......@@ -526,7 +493,7 @@ func (s *Storage) Append(kg, val string, expiry int) (string, error) {
Value: kg,
},
keyName: &dynamoDBTypes.AttributeValueMemberS{
Value: newID.Value,
Value: id,
},
valName: &dynamoDBTypes.AttributeValueMemberM{
Value: map[string]dynamoDBTypes.AttributeValue{
......@@ -536,7 +503,10 @@ func (s *Storage) Append(kg, val string, expiry int) (string, error) {
},
},
},
TableName: aws.String(s.dynamotable),
TableName: aws.String(s.dynamotable),
ConditionExpression: expr.Condition(),
ExpressionAttributeNames: expr.Names(),
ExpressionAttributeValues: expr.Values(),
}
if expiry > 0 {
......@@ -548,10 +518,10 @@ func (s *Storage) Append(kg, val string, expiry int) (string, error) {
_, err = s.svc.PutItem(context.TODO(), in)
if err != nil {
log.Error().Msg(errors.New(err).ErrorStack())
return "", errors.New(err)
return errors.New(err)
}
return newID.Value, nil
return nil
}
// IDs returns the keys of all items in the specified keygroup.
......@@ -634,35 +604,7 @@ func (s *Storage) IDs(kg string) ([]string, error) {
}
// Update updates the item with the specified id in the specified keygroup.
func (s *Storage) Update(kg string, id string, val string, append bool, expiry int, vvector vclock.VClock) error {
if append {
input := &dynamodb.UpdateItemInput{
TableName: aws.String(s.dynamotable),
Key: map[string]dynamoDBTypes.AttributeValue{
keygroupName: &dynamoDBTypes.AttributeValueMemberS{
Value: kg,
},
},
ExpressionAttributeNames: map[string]string{
"#counter": counterName,
},
ExpressionAttributeValues: map[string]dynamoDBTypes.AttributeValue{
":increase": &dynamoDBTypes.AttributeValueMemberN{
Value: "-1",
},
},
UpdateExpression: aws.String("SET #counter = #counter + :increase"),
ReturnValues: "UPDATED_NEW",
}
_, err := s.svc.UpdateItem(context.TODO(), input)
if err != nil {
log.Debug().Msgf("Update: could not update keygroup counter %s: %s", kg, err.Error())
return errors.New(err)
}
}
func (s *Storage) Update(kg string, id string, val string, expiry int, vvector vclock.VClock) error {
version := vectorToString(vvector)
input := &dynamodb.UpdateItemInput{
......@@ -905,9 +847,6 @@ func (s *Storage) CreateKeygroup(kg string) error {
keyName: &dynamoDBTypes.AttributeValueMemberS{
Value: NULLValue,
},
counterName: &dynamoDBTypes.AttributeValueMemberN{
Value: "-1",
},
},
TableName: aws.String(s.dynamotable),
}
......
......@@ -79,6 +79,7 @@ func TestMain(m *testing.M) {
if err != nil {
log.Fatal().Msg(err.Error())
return
}
cfg.Credentials = credentials.NewStaticCredentialsProvider("TEST_KEY", "TEST_SECRET", "")
......@@ -197,7 +198,7 @@ func TestItemExists(t *testing.T) {
assert.NoError(t, err)
err = db.Update(kg, id, value, false, 0, vclock.VClock{})
err = db.Update(kg, id, value, 0, vclock.VClock{})
assert.NoError(t, err)
ex := db.Exists(kg, id)
......@@ -218,7 +219,7 @@ func TestItemGet(t *testing.T) {
assert.NoError(t, err)
err = db.Update(kg, id, value, false, 0, vclock.VClock{})
err = db.Update(kg, id, value, 0, vclock.VClock{})
assert.NoError(t, err)
......@@ -238,7 +239,7 @@ func TestItemDelete(t *testing.T) {
err := db.CreateKeygroup(kg)
assert.NoError(t, err)
err = db.Update(kg, id, value, false, 0, vclock.VClock{})
err = db.Update(kg, id, value, 0, vclock.VClock{})
assert.NoError(t, err)
......@@ -281,7 +282,7 @@ func TestReadSome(t *testing.T) {
ids[i] = "id" + strconv.Itoa(i)
vals[i] = "val" + strconv.Itoa(i)
err = db.Update(kg, ids[i], vals[i], false, 0, vclock.VClock{})
err = db.Update(kg, ids[i], vals[i], 0, vclock.VClock{})
assert.NoError(t, err)
......@@ -306,15 +307,15 @@ func TestReadAll(t *testing.T) {
assert.NoError(t, err)
err = db.Update(kg, "id-1", "data-1", false, 0, vclock.VClock{})
err = db.Update(kg, "id-1", "data-1", 0, vclock.VClock{})
assert.NoError(t, err)
err = db.Update(kg, "id-2", "data-2", false, 0, vclock.VClock{})
err = db.Update(kg, "id-2", "data-2", 0, vclock.VClock{})
assert.NoError(t, err)
err = db.Update(kg, "id-3", "data-3", false, 0, vclock.VClock{})
err = db.Update(kg, "id-3", "data-3", 0, vclock.VClock{})
assert.NoError(t, err)
......@@ -324,15 +325,15 @@ func TestReadAll(t *testing.T) {
assert.NoError(t, err)
err = db.Update(kg2, "id-4", "data-4", false, 0, vclock.VClock{})
err = db.Update(kg2, "id-4", "data-4", 0, vclock.VClock{})
assert.NoError(t, err)
err = db.Update(kg2, "id-5", "data-5", false, 0, vclock.VClock{})
err = db.Update(kg2, "id-5", "data-5", 0, vclock.VClock{})
assert.NoError(t, err)
err = db.Update(kg2, "id-6", "data-6", false, 0, vclock.VClock{})
err = db.Update(kg2, "id-6", "data-6", 0, vclock.VClock{})
assert.NoError(t, err)
......@@ -356,15 +357,15 @@ func TestIDs(t *testing.T) {
assert.NoError(t, err)
err = db.Update(kg, "id-1", "data-1", false, 0, vclock.VClock{})
err = db.Update(kg, "id-1", "data-1", 0, vclock.VClock{})
assert.NoError(t, err)
err = db.Update(kg, "id-2", "data-2", false, 0, vclock.VClock{})
err = db.Update(kg, "id-2", "data-2", 0, vclock.VClock{})
assert.NoError(t, err)
err = db.Update(kg, "id-3", "data-3", false, 0, vclock.VClock{})
err = db.Update(kg, "id-3", "data-3", 0, vclock.VClock{})
assert.NoError(t, err)
......@@ -374,15 +375,15 @@ func TestIDs(t *testing.T) {
assert.NoError(t, err)
err = db.Update(kg2, "id-1", "data-1", false, 0, vclock.VClock{})
err = db.Update(kg2, "id-1", "data-1", 0, vclock.VClock{})
assert.NoError(t, err)
err = db.Update(kg2, "id-2", "data-2", false, 0, vclock.VClock{})