Commit 4a97ced6 authored by pfandzelter's avatar pfandzelter
Browse files

enable data item expiry per keygroup<->replica node

parent 9c81a791
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ChangeListManager">
<list default="true" id="b8b6ef1f-1fb0-45b7-ad29-aed6293e4aa1" name="Default Changelist" comment="Add Git Workflow">
<list default="true" id="2cf60814-4e57-4682-96ef-bf0b72e1fabd" name="Default Changelist" comment="">
<change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
<change beforePath="$PROJECT_DIR$/pkg/externalconnection/client.pb.go" beforeDir="false" afterPath="$PROJECT_DIR$/pkg/externalconnection/client.pb.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/pkg/externalconnection/client.proto" beforeDir="false" afterPath="$PROJECT_DIR$/pkg/externalconnection/client.proto" afterDir="false" />
<change beforePath="$PROJECT_DIR$/pkg/externalconnection/extserver.go" beforeDir="false" afterPath="$PROJECT_DIR$/pkg/externalconnection/extserver.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/tests/3NodeTest/cmd/main/main.go" beforeDir="false" afterPath="$PROJECT_DIR$/tests/3NodeTest/cmd/main/main.go" afterDir="false" />
<change beforePath="$PROJECT_DIR$/tests/3NodeTest/pkg/grpcclient/node.go" beforeDir="false" afterPath="$PROJECT_DIR$/tests/3NodeTest/pkg/grpcclient/node.go" afterDir="false" />
</list>
......
......@@ -29,4 +29,22 @@ How do I push changes to the `master` branch?
7. Create a pull request from gitlab.tu-berlin.de
8. Get pull request reviewed and merge it into master
Some last words, keep pull requests small (not 100 files changed etc :D), so they are easier to review and rather create a lot of pull requests than one big
\ No newline at end of file
Some last words, keep pull requests small (not 100 files changed etc :D), so they are easier to review and rather create a lot of pull requests than one big
## Using the DynamoDB Backend
To use the DynamoDB storage backend, a table must already exist in DynamoDB.
It should have the String Hash Key "Key" and a [Number field "Expiry" that is enabled as the TTL attribute](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/time-to-live-ttl-how-to.html).
Furthermore, the `fred` process that talks to DynamoDB should have IAM keys configured as environment variables and the corresponding IAM user must have permission to access the table.
To create a table named `fred` (this must be passed in as command-line parameter `--dynamo-table=fred`) using the AWS CLI:
```bash
AWS_PAGER="" aws dynamodb create-table --table-name fred --attribute-definitions "AttributeName=Key,AttributeType=S AttributeName=Value,AttributeType=S AttributeName=Expiry,AttributeType=N" --key-schema "AttributeName=Key,KeyType=HASH" --provisioned-throughput "ReadCapacityUnits=1,WriteCapacityUnits=1"
AWS_PAGER="" aws dynamodb update-time-to-live --table-name fred --time-to-live-specification "Enabled=true, AttributeName=Expiry"
```
To delete the table:
```bash
AWS_PAGER="" aws dynamodb delete-table --table-name fred
```
\ No newline at end of file
......@@ -43,10 +43,12 @@ Class | Method | HTTP request | Description
## Documentation For Models
- [Body](docs/Body.md)
- [Body1](docs/Body1.md)
- [Item](docs/Item.md)
- [ModelError](docs/ModelError.md)
- [Node](docs/Node.md)
- [ReplicationList](docs/ReplicationList.md)
- [ReplicationListNodes](docs/ReplicationListNodes.md)
- [TriggerList](docs/TriggerList.md)
- [TriggerListNodes](docs/TriggerListNodes.md)
- [TriggerNode](docs/TriggerNode.md)
......
......@@ -47,7 +47,8 @@ paths:
- in: "body"
name: "body"
description: "Type of keygroup to create (true for mutable table or false\
\ for append-only log)"
\ for append-only log) and expiration of items on this replica in seconds\
\ (if <=0, data will not expire)"
required: true
schema:
$ref: "#/definitions/body"
......@@ -135,6 +136,14 @@ paths:
type: "string"
format: "string"
x-exportParamName: "NodeId"
- in: "body"
name: "body"
description: "Parameters for keygroup replication on this node, namely expiration\
\ of items on this replica in seconds (if <=0, data will not expire)"
required: true
schema:
$ref: "#/definitions/body_1"
x-exportParamName: "Body"
responses:
"200":
description: "OK"
......@@ -513,14 +522,18 @@ definitions:
nodes:
type: "array"
items:
type: "string"
format: "utf-8"
example: "nodeA"
$ref: "#/definitions/ReplicationList_nodes"
example:
nodes:
- "nodeB"
- "nodeC"
- "nodeD"
nodeB:
id: "nodeB"
expiry: 0
nodeC:
id: "nodeC"
expiry: 300
nodeD:
id: "nodeD"
expiry: 3600
TriggerList:
type: "object"
required:
......@@ -539,11 +552,36 @@ definitions:
body:
type: "object"
required:
- "expiry"
- "mutable"
properties:
mutable:
type: "boolean"
example: false
expiry:
type: "number"
example: 0
body_1:
type: "object"
required:
- "expiry"
properties:
expiry:
type: "number"
example: 0
ReplicationList_nodes:
type: "object"
required:
- "expiry"
- "id"
properties:
id:
type: "string"
format: "utf-8"
example: "nodeA"
expiry:
type: "number"
example: 0
TriggerList_nodes:
type: "object"
required:
......
......@@ -114,7 +114,7 @@ KeygroupApiService Create a new Keygroup
Creates a new Keygroup with the name &#x60;group_id&#x60; if it does not exist already.
* @param ctx context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background().
* @param groupId Name of Keygroup
* @param body Type of keygroup to create (true for mutable table or false for append-only log)
* @param body Type of keygroup to create (true for mutable table or false for append-only log) and expiration of items on this replica in seconds (if &lt;&#x3D;0, data will not expire)
*/
......@@ -384,10 +384,11 @@ Registers the node with the name &#x60;node_id&#x60; as a replica node for a Key
* @param ctx context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background().
* @param groupId Name of Keygroup
* @param nodeId Name of Replica Node
* @param body Parameters for keygroup replication on this node, namely expiration of items on this replica in seconds (if &lt;&#x3D;0, data will not expire)
*/
func (a *KeygroupApiService) KeygroupGroupIdReplicaNodeIdPost(ctx context.Context, groupId string, nodeId string) (*http.Response, error) {
func (a *KeygroupApiService) KeygroupGroupIdReplicaNodeIdPost(ctx context.Context, groupId string, nodeId string, body Body1) (*http.Response, error) {
var (
localVarHttpMethod = strings.ToUpper("Post")
localVarPostBody interface{}
......@@ -422,6 +423,8 @@ func (a *KeygroupApiService) KeygroupGroupIdReplicaNodeIdPost(ctx context.Contex
if localVarHttpHeaderAccept != "" {
localVarHeaderParams["Accept"] = localVarHttpHeaderAccept
}
// body params
localVarPostBody = &body
r, err := a.client.prepareRequest(ctx, localVarPath, localVarHttpMethod, localVarPostBody, localVarHeaderParams, localVarQueryParams, localVarFormParams, localVarFileName, localVarFileBytes)
if err != nil {
return nil, err
......
......@@ -217,10 +217,11 @@ Registers the node with the name &#x60;node_id&#x60; as a replica node for a Key
* @param ctx context.Context - for authentication, logging, cancellation, deadlines, tracing, etc. Passed from http.Request or context.Background().
* @param groupId Name of Keygroup
* @param nodeId Name of Replica Node
* @param body Parameters for keygroup replication on this node, namely expiration of items on this replica in seconds (if &lt;&#x3D;0, data will not expire)
*/
func (a *ReplicationApiService) KeygroupGroupIdReplicaNodeIdPost(ctx context.Context, groupId string, nodeId string) (*http.Response, error) {
func (a *ReplicationApiService) KeygroupGroupIdReplicaNodeIdPost(ctx context.Context, groupId string, nodeId string, body Body1) (*http.Response, error) {
var (
localVarHttpMethod = strings.ToUpper("Post")
localVarPostBody interface{}
......@@ -255,6 +256,8 @@ func (a *ReplicationApiService) KeygroupGroupIdReplicaNodeIdPost(ctx context.Con
if localVarHttpHeaderAccept != "" {
localVarHeaderParams["Accept"] = localVarHttpHeaderAccept
}
// body params
localVarPostBody = &body
r, err := a.client.prepareRequest(ctx, localVarPath, localVarHttpMethod, localVarPostBody, localVarHeaderParams, localVarQueryParams, localVarFormParams, localVarFileName, localVarFileBytes)
if err != nil {
return nil, err
......
......@@ -4,6 +4,7 @@
Name | Type | Description | Notes
------------ | ------------- | ------------- | -------------
**Mutable** | **bool** | | [default to null]
**Expiry** | **float32** | | [default to null]
[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)
......
# Body1
## Properties
Name | Type | Description | Notes
------------ | ------------- | ------------- | -------------
**Expiry** | **float32** | | [default to null]
[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)
......@@ -51,7 +51,7 @@ Name | Type | Description | Notes
------------- | ------------- | ------------- | -------------
**ctx** | **context.Context** | context for authentication, logging, cancellation, deadlines, tracing, etc.
**groupId** | **string**| Name of Keygroup |
**body** | [**Body**](Body.md)| Type of keygroup to create (true for mutable table or false for append-only log) |
**body** | [**Body**](Body.md)| Type of keygroup to create (true for mutable table or false for append-only log) and expiration of items on this replica in seconds (if &lt;&#x3D;0, data will not expire) |
### Return type
......@@ -126,7 +126,7 @@ No authorization required
[[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md)
# **KeygroupGroupIdReplicaNodeIdPost**
> KeygroupGroupIdReplicaNodeIdPost(ctx, groupId, nodeId)
> KeygroupGroupIdReplicaNodeIdPost(ctx, groupId, nodeId, body)
Create a new Replica node for a Keygroup
Registers the node with the name `node_id` as a replica node for a Keygroup with the name `group_id` if it does not exist already.
......@@ -138,6 +138,7 @@ Name | Type | Description | Notes
**ctx** | **context.Context** | context for authentication, logging, cancellation, deadlines, tracing, etc.
**groupId** | **string**| Name of Keygroup |
**nodeId** | **string**| Name of Replica Node |
**body** | [**Body1**](Body1.md)| Parameters for keygroup replication on this node, namely expiration of items on this replica in seconds (if &lt;&#x3D;0, data will not expire) |
### Return type
......
......@@ -70,7 +70,7 @@ No authorization required
[[Back to top]](#) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to Model list]](../README.md#documentation-for-models) [[Back to README]](../README.md)
# **KeygroupGroupIdReplicaNodeIdPost**
> KeygroupGroupIdReplicaNodeIdPost(ctx, groupId, nodeId)
> KeygroupGroupIdReplicaNodeIdPost(ctx, groupId, nodeId, body)
Create a new Replica node for a Keygroup
Registers the node with the name `node_id` as a replica node for a Keygroup with the name `group_id` if it does not exist already.
......@@ -82,6 +82,7 @@ Name | Type | Description | Notes
**ctx** | **context.Context** | context for authentication, logging, cancellation, deadlines, tracing, etc.
**groupId** | **string**| Name of Keygroup |
**nodeId** | **string**| Name of Replica Node |
**body** | [**Body1**](Body1.md)| Parameters for keygroup replication on this node, namely expiration of items on this replica in seconds (if &lt;&#x3D;0, data will not expire) |
### Return type
......
......@@ -3,7 +3,7 @@
## Properties
Name | Type | Description | Notes
------------ | ------------- | ------------- | -------------
**Nodes** | **[]string** | | [default to null]
**Nodes** | [**[]ReplicationListNodes**](ReplicationList_nodes.md) | | [default to null]
[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)
......
# ReplicationListNodes
## Properties
Name | Type | Description | Notes
------------ | ------------- | ------------- | -------------
**Id** | **string** | | [default to null]
**Expiry** | **float32** | | [default to null]
[[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md)
......@@ -12,4 +12,5 @@ package swagger
type Body struct {
Mutable bool `json:"mutable"`
Expiry float32 `json:"expiry"`
}
/*
* FReD
*
* FReD - Fog Replicated Data Storage.
*
* API version: 0.0.4
* Contact: tp@mcc.tu-berlin.de
* Generated by: Swagger Codegen (https://github.com/swagger-api/swagger-codegen.git)
*/
package swagger
type Body1 struct {
Expiry float32 `json:"expiry"`
}
......@@ -11,5 +11,5 @@
package swagger
type ReplicationList struct {
Nodes []string `json:"nodes"`
Nodes []ReplicationListNodes `json:"nodes"`
}
/*
* FReD
*
* FReD - Fog Replicated Data Storage.
*
* API version: 0.0.4
* Contact: tp@mcc.tu-berlin.de
* Generated by: Swagger Codegen (https://github.com/swagger-api/swagger-codegen.git)
*/
package swagger
type ReplicationListNodes struct {
Id string `json:"id"`
Expiry float32 `json:"expiry"`
}
......@@ -198,11 +198,24 @@ func (s *Storage) IDs(kg string) ([]string, error) {
}
// Update updates the item with the specified id in the specified keygroup.
func (s *Storage) Update(kg, id, val string) error {
func (s *Storage) Update(kg, id, val string, expiry int) error {
err := s.db.Update(func(txn *badger.Txn) error {
key := makeKeyName(kg, id)
if expiry > 0 {
err := txn.SetEntry(&badger.Entry{
Key: key,
Value: []byte(val),
ExpiresAt: uint64(time.Now().Unix()) + uint64(expiry),
})
if err != nil {
return err
}
return nil
}
err := txn.Set(key, []byte(val))
if err != nil {
return err
}
......
......@@ -2,6 +2,7 @@ package badgerdb
import (
"testing"
"time"
"github.com/go-errors/errors"
"github.com/rs/zerolog"
......@@ -52,19 +53,19 @@ func TestReadAll(t *testing.T) {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
}
err = db.Update(kg, "id-1", "data-1")
err = db.Update(kg, "id-1", "data-1", 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
}
err = db.Update(kg, "id-2", "data-2")
err = db.Update(kg, "id-2", "data-2", 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
}
err = db.Update(kg, "id-3", "data-3")
err = db.Update(kg, "id-3", "data-3", 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
......@@ -78,19 +79,19 @@ func TestReadAll(t *testing.T) {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
}
err = db.Update(kg2, "id-1", "data-1")
err = db.Update(kg2, "id-1", "data-1", 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
}
err = db.Update(kg2, "id-2", "data-2")
err = db.Update(kg2, "id-2", "data-2", 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
}
err = db.Update(kg2, "id-3", "data-3")
err = db.Update(kg2, "id-3", "data-3", 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
......@@ -119,7 +120,7 @@ func TestItemGet(t *testing.T) {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
}
err = db.Update(kg, id, value)
err = db.Update(kg, id, value, 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
......@@ -145,7 +146,7 @@ func TestItemAfterDeleteKeygroup(t *testing.T) {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
}
err = db.Update(kg, id, value)
err = db.Update(kg, id, value, 0)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
......@@ -162,3 +163,36 @@ func TestItemAfterDeleteKeygroup(t *testing.T) {
t.Fatalf("Expected an error, but got %s", retr)
}
}
func TestExpiry(t *testing.T) {
kg := "test-kg-item"
id := "name"
value := "value"
err := db.CreateKeygroup(kg)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
}
err = db.Update(kg, id, value, 10)
if err != nil {
log.Err(err).Msg(err.(*errors.Error).ErrorStack())
}
retr, err := db.Read(kg, id)
if err != nil {
t.Fatal(err)
}
if retr != value {
t.Fatalf("Expected to get %s but got %s", value, retr)
}
time.Sleep(10 * time.Second)
_, err = db.Read(kg, id)
if err == nil {
t.Fatal(err)
}
}
......@@ -6,6 +6,7 @@ import (
"github.com/aws/aws-sdk-go/service/dynamodb/expression"
"github.com/rs/zerolog/log"
"strings"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
......@@ -239,16 +240,18 @@ func (s *Storage) IDs(kg string) ([]string, error) {
}
// Update updates the item with the specified id in the specified keygroup.
func (s *Storage) Update(kg, id, val string) error {
func (s *Storage) Update(kg, id, val string, expiry int) error {
key := makeKeyName(kg, id)
Item := struct {
Key string
Value string
Key string
Value string
Expiry int64
}{
Key: key,
Value: val,
Key: key,
Value: val,
Expiry: time.Now().Unix() + int64(expiry),
}
av, err := dynamodbattribute.MarshalMap(Item)
......
This diff is collapsed.
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment