Commit 050a8815 authored by pfandzelter's avatar pfandzelter
Browse files

Merge branch 'tp/dynamodb-tests' into 'main'

DynamoDB Fix

See merge request !153
parents ae9081c4 4e60753b
Pipeline #34326 passed with stages
in 16 minutes and 25 seconds
This diff is collapsed.
......@@ -113,7 +113,7 @@ func TestReadSome(t *testing.T) {
for i := scanStart; i < scanStart+scanRange; i++ {
assert.Contains(t, res, ids[i])
assert.Equal(t, res[ids[i]], vals[i])
assert.Equal(t, vals[i], res[ids[i]])
}
}
......
package dynamo
import (
"sort"
"strconv"
"strings"
"time"
......@@ -8,7 +9,6 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/dynamodb"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbattribute"
"github.com/aws/aws-sdk-go/service/dynamodb/dynamodbiface"
"github.com/aws/aws-sdk-go/service/dynamodb/expression"
"github.com/go-errors/errors"
......@@ -16,8 +16,11 @@ import (
)
const (
keyName = "Key"
sep = "|"
keyName = "Key"
valName = "Value"
counterName = "Counter"
expiryKey = "Expiry"
sep = "|"
)
// Storage is a struct that saves all necessary information to access the database, in this case the session for DynamoDB and the table name.
......@@ -62,24 +65,8 @@ func getKey(key string) (kg, id string) {
return
}
// New creates a new Session for DynamoDB.
func New(table, region string) (s *Storage, err error) {
log.Debug().Msgf("creating a new dynamodb connection to table %s in region %s", table, region)
log.Debug().Msg("Checked creds - OK!")
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(region),
}))
log.Debug().Msg("Created session - OK!")
svc := dynamodb.New(sess)
log.Debug().Msg("Created service - OK!")
func NewFromExisting(table string, svc dynamodbiface.DynamoDBAPI) (s *Storage, err error) {
log.Debug().Msgf("Loading table %s...", table)
// check if the table with that name even exists
// if not: error out
desc, err := svc.DescribeTable(&dynamodb.DescribeTableInput{
......@@ -107,10 +94,29 @@ func New(table, region string) (s *Storage, err error) {
return &Storage{
dynamotable: table,
svc: dynamodbiface.DynamoDBAPI(svc),
svc: svc,
}, nil
}
// New creates a new Session for DynamoDB.
func New(table, region string) (s *Storage, err error) {
log.Debug().Msgf("creating a new dynamodb connection to table %s in region %s", table, region)
log.Debug().Msg("Checked creds - OK!")
sess := session.Must(session.NewSession(&aws.Config{
Region: aws.String(region),
}))
log.Debug().Msg("Created session - OK!")
svc := dynamodb.New(sess)
log.Debug().Msg("Created service - OK!")
return NewFromExisting(table, svc)
}
// Close closes the underlying DynamoDB connection (no cleanup needed at the moment).
func (s *Storage) Close() error {
return nil
......@@ -138,17 +144,26 @@ func (s *Storage) Read(kg string, id string) (string, error) {
return "", errors.Errorf("could not find item %s in keygroup %s", id, kg)
}
Item := struct {
Key string
Value string
}{}
val, ok := result.Item[valName]
err = dynamodbattribute.UnmarshalMap(result.Item, &Item)
if err != nil {
return "", errors.New(err)
if !ok {
return "", errors.Errorf("could not find item %s in keygroup %s", id, kg)
}
if e, ok := result.Item[expiryKey]; ok {
expiry, err := strconv.Atoi(*e.N)
if err != nil {
return "", errors.Errorf("could not find item %s in keygroup %s", id, kg)
}
log.Debug().Msgf("Read found key expiring at %d, it is %d now", expiry, time.Now().Unix())
if int64(expiry) < time.Now().Unix() {
return "", errors.Errorf("could not find item %s in keygroup %s", id, kg)
}
}
return Item.Value, nil
return *val.S, nil
}
......@@ -156,21 +171,20 @@ func (s *Storage) ReadSome(kg, id string, count uint64) (map[string]string, erro
key := makeKeygroupKeyName(kg)
start := makeKeyName(kg, id)
filt := expression.Name(keyName).BeginsWith(key).And(expression.Name(key).GreaterThan(expression.Key(start)))
filt := expression.Name(keyName).GreaterThanEqual(expression.Value(start))
expr, err := expression.NewBuilder().WithFilter(filt).Build()
if err != nil {
return nil, errors.New(err)
}
l := int64(count)
params := &dynamodb.ScanInput{
ExpressionAttributeNames: expr.Names(),
ExpressionAttributeValues: expr.Values(),
FilterExpression: expr.Filter(),
ProjectionExpression: expr.Projection(),
TableName: aws.String(s.dynamotable),
Limit: &l,
}
// Make the DynamoDB Query API call
......@@ -183,25 +197,53 @@ func (s *Storage) ReadSome(kg, id string, count uint64) (map[string]string, erro
for _, i := range result.Items {
item := struct {
Key string
Value string
}{}
k, ok := i[keyName]
err = dynamodbattribute.UnmarshalMap(i, &item)
if err != nil {
return items, errors.New(err)
if !ok {
return items, errors.Errorf("wrong format: key not in %#v", i)
}
if item.Key == key {
if *k.S == key {
continue
}
_, id := getKey(item.Key)
if e, ok := i[expiryKey]; ok {
expiry, err := strconv.Atoi(*e.N)
if err != nil {
return items, errors.Errorf("wrong format: expiry is not a number: %s", err.Error())
}
log.Debug().Msgf("ReadSome found key expiring at %d, it is %d now", expiry, time.Now().Unix())
if int64(expiry) < time.Now().Unix() {
continue
}
}
v, ok := i[valName]
if !ok {
return items, errors.Errorf("wrong format: value not in %#v", i)
}
_, keyID := getKey(*k.S)
items[keyID] = *v.S
}
items[id] = item.Value
// sort and filter
ids := make([]string, len(items))
i := 0
for k := range items {
ids[i] = k
i++
}
sort.Strings(ids)
for i = len(items) - 1; i >= int(count); i-- {
delete(items, ids[i])
}
return items, nil
......@@ -237,24 +279,38 @@ func (s *Storage) ReadAll(kg string) (map[string]string, error) {
for _, i := range result.Items {
item := struct {
Key string
Value string
}{}
k, ok := i[keyName]
err = dynamodbattribute.UnmarshalMap(i, &item)
if err != nil {
return items, errors.New(err)
if !ok {
return items, errors.Errorf("wrong format: key not in %#v", i)
}
if item.Key == key {
if *k.S == key {
continue
}
_, id := getKey(item.Key)
if e, ok := i[expiryKey]; ok {
expiry, err := strconv.Atoi(*e.N)
if err != nil {
return items, errors.Errorf("wrong format: expiry is not a number: %s", err.Error())
}
log.Debug().Msgf("ReadAll found key expiring at %d, it is %d now", expiry, time.Now().Unix())
if int64(expiry) < time.Now().Unix() {
continue
}
}
v, ok := i[valName]
if !ok {
return items, errors.Errorf("wrong format: value not in %#v", i)
}
_, id := getKey(*k.S)
items[id] = item.Value
items[id] = *v.S
}
......@@ -263,100 +319,66 @@ func (s *Storage) ReadAll(kg string) (map[string]string, error) {
// Append appends the item to the specified keygroup by incrementing the latest key by one.
func (s *Storage) Append(kg, val string, expiry int) (string, error) {
// first, get the latest key
// maximum of 18446744073709551615, though!
// if you reach this maximum, please send me a letter
var newest uint64
key := makeKeygroupKeyName(kg)
filt := expression.Name(keyName).BeginsWith(key)
expr, err := expression.NewBuilder().WithFilter(filt).Build()
if err != nil {
return "", errors.New(err)
input := &dynamodb.UpdateItemInput{
TableName: aws.String(s.dynamotable),
Key: map[string]*dynamodb.AttributeValue{
keyName: {
S: aws.String(key),
},
},
ExpressionAttributeNames: map[string]*string{
"#counter": aws.String(counterName),
},
ExpressionAttributeValues: map[string]*dynamodb.AttributeValue{
":increase": {
N: aws.String("1"),
},
},
UpdateExpression: aws.String("SET #counter = #counter + :increase"),
ReturnValues: aws.String("UPDATED_NEW"),
}
params := &dynamodb.ScanInput{
ExpressionAttributeNames: expr.Names(),
ExpressionAttributeValues: expr.Values(),
FilterExpression: expr.Filter(),
ProjectionExpression: expr.Projection(),
TableName: aws.String(s.dynamotable),
}
out, err := s.svc.UpdateItem(input)
// Make the DynamoDB Query API call
result, err := s.svc.Scan(params)
if err != nil {
log.Debug().Msgf("Append: could not update key %s: %s", key, err.Error())
return "", errors.New(err)
}
if len(result.Items) == 0 {
// nothing in existence yet, return MaxUInt (so we can increment it to 0)
newest = ^uint64(0)
} else {
for _, i := range result.Items {
item := struct {
Key string
}{}
err = dynamodbattribute.UnmarshalMap(i, &item)
if err != nil {
return "", errors.New(err)
}
if item.Key == key {
continue
}
_, id := getKey(item.Key)
c, ok := out.Attributes[counterName]
parsed, err := strconv.ParseUint(id, 10, 64)
if err != nil {
return "", errors.New(err)
}
if parsed > newest {
newest = parsed
}
}
}
// increment by one
// conveniently, if we reach MaxUint64, we can still increment by 1 to get back to 0
id := strconv.FormatUint(newest+1, 10)
Item := struct {
Key string
Value string
Expiry int64
}{
Key: id,
Value: val,
Expiry: time.Now().Unix() + int64(expiry),
if !ok {
return "", errors.Errorf("could not increase counter")
}
av, err := dynamodbattribute.MarshalMap(Item)
newID := c.N
if err != nil {
return "", errors.New(err)
in := &dynamodb.PutItemInput{
Item: map[string]*dynamodb.AttributeValue{
keyName: {
S: aws.String(makeKeyName(kg, *newID)),
},
valName: {
S: aws.String(val),
},
},
TableName: aws.String(s.dynamotable),
}
input := &dynamodb.PutItemInput{
Item: av,
TableName: aws.String(s.dynamotable),
if expiry > 0 {
in.Item[expiryKey] = &dynamodb.AttributeValue{
N: aws.String(strconv.FormatInt(time.Now().Unix()+int64(expiry), 10)),
}
}
_, err = s.svc.PutItem(input)
_, err = s.svc.PutItem(in)
if err != nil {
return "", errors.New(err)
}
return id, nil
return *newID, nil
}
// IDs returns the keys of all items in the specified keygroup.
......@@ -387,22 +409,30 @@ func (s *Storage) IDs(kg string) ([]string, error) {
}
for _, i := range result.Items {
k, ok := i[keyName]
item := struct {
Key string
}{}
err = dynamodbattribute.UnmarshalMap(i, &item)
if err != nil {
return ids, errors.New(err)
if !ok {
return ids, errors.Errorf("wrong format: key not in %#v", i)
}
if item.Key == key {
if *k.S == key {
continue
}
_, id := getKey(item.Key)
if e, ok := i[expiryKey]; ok {
expiry, err := strconv.Atoi(*e.N)
if err != nil {
return ids, errors.Errorf("wrong format: expiry is not a number: %s", err.Error())
}
log.Debug().Msgf("IDs found key expiring at %d, it is %d now", expiry, time.Now().Unix())
if int64(expiry) < time.Now().Unix() {
continue
}
}
_, id := getKey(*k.S)
ids = append(ids, id)
......@@ -416,28 +446,25 @@ func (s *Storage) Update(kg, id, val string, _ bool, expiry int) error {
key := makeKeyName(kg, id)
Item := struct {
Key string
Value string
Expiry int64
}{
Key: key,
Value: val,
Expiry: time.Now().Unix() + int64(expiry),
}
av, err := dynamodbattribute.MarshalMap(Item)
if err != nil {
return errors.New(err)
}
input := &dynamodb.PutItemInput{
Item: av,
Item: map[string]*dynamodb.AttributeValue{
keyName: {
S: aws.String(key),
},
valName: {
S: aws.String(val),
},
},
TableName: aws.String(s.dynamotable),
}
_, err = s.svc.PutItem(input)
if expiry > 0 {
input.Item[expiryKey] = &dynamodb.AttributeValue{
N: aws.String(strconv.FormatInt(time.Now().Unix()+int64(expiry), 10)),
}
}
_, err := s.svc.PutItem(input)
if err != nil {
return errors.New(err)
}
......@@ -487,6 +514,17 @@ func (s *Storage) Exists(kg string, id string) bool {
return false
}
if e, ok := result.Item[expiryKey]; ok {
expiry, err := strconv.Atoi(*e.N)
if err != nil {
return false
}
log.Debug().Msgf("Exists found key expiring at %d, it is %d now", expiry, time.Now().Unix())
return int64(expiry) >= time.Now().Unix()
}
return true
}
......@@ -518,26 +556,19 @@ func (s *Storage) ExistsKeygroup(kg string) bool {
func (s *Storage) CreateKeygroup(kg string) error {
key := makeKeygroupKeyName(kg)
Item := struct {
Key string
Value string
}{
Key: key,
Value: key,
}
av, err := dynamodbattribute.MarshalMap(Item)
if err != nil {
return errors.New(err)
}
input := &dynamodb.PutItemInput{
Item: av,
Item: map[string]*dynamodb.AttributeValue{
keyName: {
S: aws.String(key),
},
counterName: {
N: aws.String("-1"),
},
},
TableName: aws.String(s.dynamotable),
}
_, err = s.svc.PutItem(input)
_, err := s.svc.PutItem(input)
if err != nil {
return errors.New(err)
}
......@@ -574,21 +605,17 @@ func (s *Storage) DeleteKeygroup(kg string) error {
for _, i := range result.Items {
item := struct {
Key string
}{}
err = dynamodbattribute.UnmarshalMap(i, &item)
k, ok := i[keyName]
if err != nil {
return errors.New(err)
if !ok {
return errors.Errorf("wrong format")
}
input := &dynamodb.DeleteItemInput{
Key: map[string]*dynamodb.AttributeValue{
keyName: {
S: aws.String(item.Key),
S: aws.String(*k.S),
},
},
TableName: aws.String(s.dynamotable),
......@@ -628,21 +655,17 @@ func (s *Storage) DeleteKeygroup(kg string) error {
for _, i := range result.Items {
item := struct {
Key string
}{}
err = dynamodbattribute.UnmarshalMap(i, &item)
k, ok := i[keyName]
if err != nil {
return errors.New(err)
if !ok {
return errors.Errorf("wrong format")
}
input := &dynamodb.DeleteItemInput{
Key: map[string]*dynamodb.AttributeValue{
keyName: {
S: aws.String(item.Key),
S: aws.String(*k.S),
},
},
TableName: aws.String(s.dynamotable),
......@@ -661,26 +684,19 @@ func (s *Storage) DeleteKeygroup(kg string) error {
func (s *Storage) AddKeygroupTrigger(kg string, id string, host string) error {
key := makeTriggerConfigKeyName(kg, id)
Item := struct {
Key string
Value string
}{
Key: key,
Value: host,
}