Commit 98e7e53a authored by t.schirmer's avatar t.schirmer Committed by pfandzelter

Naming Service

parent 62241500
**/*
!/pkg/*
!/cmd/*
!go.mod
\ No newline at end of file
!go.mod
!go.sum
\ No newline at end of file
......@@ -8,6 +8,7 @@ frednode
# leveldb database
db/
test.db/
### tubCloud ###
*_conflic*
......
......@@ -12,10 +12,15 @@ RUN mkdir /lib64 && ln -s /lib/libc.musl-x86_64.so.1 /lib64/ld-linux-x86-64.so.2
WORKDIR /go/src/gitlab.tu-berlin.de/mcc-fred/fred/
# Make an extra layer for the installed packages so that they don't have to be downloaded every time
COPY go.mod .
COPY go.sum .
RUN go mod download
COPY . .
# Static build required so that we can safely copy the binary over.
RUN ls
RUN touch ./cmd/frednode/dummy.cc
RUN go install -a -ldflags '-linkmode external -w -s -extldflags "-static -luuid" ' ./cmd/frednode/
......
......@@ -7,7 +7,7 @@
## Setup
In order to run zmq, please install [zmq](https://zeromq.org/download/) and [czmq](http://czmq.zeromq.org/page:get-the-software).
On Arch, this is done by running `yay -S czmq`. Or use the Docker image.
On Arch, this is done by running `yay -S czmq-git`. Or use the Docker image.
To use Terraform, install [Terraform](https://www.terraform.io/downloads.html).
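If you want to verify that the native libraries are picked up correctly before building FReD, one quick check (a hypothetical sketch, not part of the repository) is to bind a socket through the `goczmq` bindings already listed in go.mod; this fails at build or bind time if libzmq/libczmq are missing:

```go
package main

import (
	"log"

	"github.com/zeromq/goczmq"
)

func main() {
	// Binding a REP socket exercises the cgo bindings to libzmq/libczmq.
	// The endpoint is arbitrary and only used for this check.
	rep, err := goczmq.NewRep("tcp://127.0.0.1:5555")
	if err != nil {
		log.Fatalf("czmq is not usable: %v", err)
	}
	defer rep.Destroy()
	log.Println("zmq and czmq are installed correctly")
}
```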
......
......@@ -16,21 +16,22 @@ import (
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/data"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/exthandler"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/inthandler"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/keygroup"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/leveldbsd"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/memorykg"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/memoryrs"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/memorysd"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/memoryzmq"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/nameservice"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/replhandler"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/replication"
storage "gitlab.tu-berlin.de/mcc-fred/fred/pkg/storageconnection"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/webserver"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/zmqclient"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/zmqserver"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/zmqtointhandler"
)
type fredConfig struct {
General struct {
nodeID string `toml:"nodeID"`
} `toml:"general"`
Location struct {
Lat float64 `toml:"lat"`
Lng float64 `toml:"lng"`
......@@ -44,7 +45,8 @@ type fredConfig struct {
Adaptor string `toml:"adaptor"`
} `toml:"storage"`
ZMQ struct {
Port int `toml:"port"`
Port int `toml:"port"`
Host string `toml:"host"`
} `toml:"zmq"`
Log struct {
Level string `toml:"level"`
......@@ -57,11 +59,15 @@ type fredConfig struct {
Ldb struct {
Path string `toml:"path"`
} `toml:"leveldb"`
NaSe struct {
Host string `toml:"host"`
} `toml:"nase"`
}
const apiversion string = "/v0"
var (
nodeID = kingpin.Flag("nodeID", "Unique ID of this node. Will be calculated from lat/long if omitted").String()
configPath = kingpin.Flag("config", "Path to .toml configuration file.").PlaceHolder("PATH").String()
lat = kingpin.Flag("lat", "Latitude of the node.").PlaceHolder("LATITUDE").Default("-200").Float64() // Domain: [-90,90]
lng = kingpin.Flag("lng", "Longitude of the node.").PlaceHolder("LONGITUDE").Default("-200").Float64() // Domain: ]-180,180]
......@@ -69,12 +75,15 @@ var (
wsPort = kingpin.Flag("ws-port", "Port of webserver.").PlaceHolder("WS-PORT").Default("-1").Int() // Domain: [0,9999]
wsSSL = kingpin.Flag("use-tls", "Use TLS/SSL to serve over HTTPS. Works only if host argument is a FQDN.").PlaceHolder("USE-SSL").Bool()
zmqPort = kingpin.Flag("zmq-port", "Port of ZeroMQ.").PlaceHolder("ZMQ-PORT").Default("-1").Int() // Domain: [0,9999]
zmqHost = kingpin.Flag("zmq-host", "(Publicly reachable) address of this zmq server.").String()
adaptor = kingpin.Flag("adaptor", "Storage adaptor, can be \"leveldb\", \"memory\".").Enum("leveldb", "memory")
logLevel = kingpin.Flag("log-level", "Log level, can be \"debug\", \"info\", \"warn\", \"error\", \"fatal\", \"panic\".").Enum("debug", "info", "warn", "error", "fatal", "panic")
handler = kingpin.Flag("handler", "Mode of log handler, can be \"dev\", \"prod\".").Enum("dev", "prod")
remoteStorageHost = kingpin.Flag("remote-storage-host", "Host address of GRPC Server.").String()
remoteStoragePort = kingpin.Flag("remote-storage-port", "Port of GRPC Server.").PlaceHolder("WS-PORT").Default("-1").Int()
ldbPath = kingpin.Flag("leveldb-path", "Path to the leveldb database").String()
// TODO this should be a list of nodes. One node is enough, but if we want reliability we should accept multiple etcd nodes
naseHost = kingpin.Flag("naseHost", "Host where the etcd-server runs").String()
)
func main() {
......@@ -94,12 +103,19 @@ func main() {
// default value means unset -> don't replace
// numbers have negative defaults outside their domain, simple domain checks are implemented
// e.g. lat < -90 is ignored and toml is used (if available)
if *nodeID != "" {
fc.General.nodeID = *nodeID
}
if *lat >= -90 && *lat <= 90 {
fc.Location.Lat = *lat
}
if *lng >= -180 && *lng <= 180 {
fc.Location.Lng = *lng
}
// If no NodeID is provided use lat/long as NodeID
if fc.General.nodeID == "" {
fc.General.nodeID = geohash.Encode(fc.Location.Lat, fc.Location.Lng)
}
if *wsHost != "" {
fc.Server.Host = *wsHost
}
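For context on the nodeID fallback above: `geohash.Encode` from the `github.com/mmcloughlin/geohash` dependency (already in go.mod) turns the configured coordinates into a short string, so a node started without an explicit `--nodeID` gets a location-derived ID. A minimal sketch, with illustrative example coordinates that are not taken from the repository:

```go
package main

import (
	"fmt"

	"github.com/mmcloughlin/geohash"
)

func main() {
	// Example coordinates (roughly Berlin); when --nodeID is omitted,
	// the geohash of lat/lng becomes the node ID.
	lat, lng := 52.52, 13.40
	fmt.Println(geohash.Encode(lat, lng))
}
```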
......@@ -109,6 +125,9 @@ func main() {
if *wsSSL {
fc.Server.UseTLS = *wsSSL
}
if *zmqHost != "" {
fc.ZMQ.Host = *zmqHost
}
if *zmqPort >= 0 {
fc.ZMQ.Port = *zmqPort
}
......@@ -130,6 +149,9 @@ func main() {
if *ldbPath != "" {
fc.Ldb.Path = *ldbPath
}
if *naseHost != "" {
fc.NaSe.Host = *naseHost
}
// Setup Logging
// In Dev the ConsoleWriter has nice colored output, but is not very fast.
......@@ -171,15 +193,10 @@ func main() {
log.Info().Msg("No Loglevel specified, using 'info'")
}
var nodeID = geohash.Encode(fc.Location.Lat, fc.Location.Lng)
var is data.Service
var ks keygroup.Service
var rs replication.Service
var i data.Store
var k keygroup.Store
var n replication.Store
switch fc.Storage.Adaptor {
case "leveldb":
......@@ -194,24 +211,20 @@ func main() {
log.Fatal().Msg("unknown storage backend")
}
// Add more options here
k = memorykg.New()
n = memoryrs.New(fc.ZMQ.Port)
is = data.New(i)
c := zmqclient.NewClient()
ks = keygroup.New(k, nodeID)
rs = replhandler.New(n, c)
nase := nameservice.New(fc.General.nodeID, []string{fc.NaSe.Host})
nase.RegisterSelf(replication.Address{Addr: fc.ZMQ.Host}, fc.ZMQ.Port)
rs = replhandler.New(c, *nase, i)
extH := exthandler.New(is, ks, rs)
intH := inthandler.New(is, ks, rs)
extH := exthandler.New(is, rs)
intH := inthandler.New(is, rs)
// Add more options here
zmqH := memoryzmq.New(intH)
zmqH := zmqtointhandler.New(intH)
zmqServer, err := zmqserver.Setup(fc.ZMQ.Port, nodeID, zmqH)
zmqServer, err := zmqserver.Setup(fc.ZMQ.Port, fc.General.nodeID, zmqH)
if err != nil {
panic("Cannot start zmqServer")
......
File mode changed from 100644 to 100755
......@@ -8,18 +8,26 @@ require (
github.com/alecthomas/kingpin v2.2.6+incompatible
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 // indirect
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/gin-contrib/logger v0.0.2
github.com/gin-gonic/autotls v0.0.0-20191129055149-ffaac874b99f
github.com/gin-gonic/gin v1.5.0
github.com/gogo/protobuf v1.3.1 // indirect
github.com/golang/protobuf v1.3.3
github.com/google/uuid v1.1.1 // indirect
github.com/mmcloughlin/geohash v0.9.0
github.com/rs/zerolog v1.17.2
github.com/spf13/afero v1.2.2 // indirect
github.com/stretchr/testify v1.4.0
github.com/syndtr/goleveldb v1.0.0
github.com/zeromq/goczmq v4.1.0+incompatible
golang.org/x/lint v0.0.0-20200130185559-910be7a94367 // indirect
golang.org/x/net v0.0.0-20190827160401-ba9fcec4b297 // indirect
go.etcd.io/etcd v0.5.0-alpha.5.0.20200306183522-221f0cc107cb
go.uber.org/zap v1.15.0 // indirect
golang.org/x/lint v0.0.0-20200302205851-738671d3881b // indirect
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/tools v0.0.0-20200219054238-753a1d49df85 // indirect
golang.org/x/tools v0.0.0-20200724022722-7017fd6b1305 // indirect
google.golang.org/grpc v1.28.1
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 // indirect
gopkg.in/yaml.v2 v2.2.4 // indirect
honnef.co/go/tools v0.0.1-2019.2.3 // indirect
honnef.co/go/tools v0.0.1-2020.1.4 // indirect
)
# FReD Naming Service
The naming service uses etcd.io for distributed storage.
The provided docker-compose file starts a local instance of etcd. To access the CLI, enter the Docker container with `docker exec -it etcd-1 /bin/sh` and run `etcdctl`.
Clients can reach etcd on port 2379; peers can reach it on port 2380.
Prefix search is also possible; see the implementation in the nameservice package.
## Data Representation
Etcd is a key-value store that supports a range parameter for GET requests, which could be used to store additional information in the key.
We need to store:
- What Keygroups exist
- All the replicas of a keygroup (and their access rights (read/write)?)
- The current status of a keygroup (running, tombstoned)
- Key `kg-[keygroupname]-node-[nodeID]`: current status of the replica on this node (initializing, full, readOnly, ...)
- Key `kg-[keygroupname]-status`: string with the current status of the keygroup
To check whether a keygroup exists, get the range `kg-[keygroupname]-`; to get all replicas, get the range `kg-[keygroupname]-node-`.
Every node also stores information about itself in `node-[nodeID]-status` and `node-[nodeID]-address` (format: `ip:port`).
__Problem__: Keygroup names can't end in `-node` or `-status` (a different delimiter could also be used).
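As an illustration of this key schema, here is a minimal sketch (not part of the repository) of listing the replicas of a keygroup with a prefix query. It assumes the etcd client package that matches the go.mod dependency (`go.etcd.io/etcd/clientv3`), a local etcd reachable on `localhost:2379` as published by the docker-compose file below, and a hypothetical keygroup name `mykg`; the nameservice package in this commit wraps this kind of prefix lookup.

```go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"go.etcd.io/etcd/clientv3"
)

func main() {
	// Connect to the local etcd instance (port 2379 is published by docker-compose).
	cli, err := clientv3.New(clientv3.Config{
		Endpoints:   []string{"localhost:2379"},
		DialTimeout: 5 * time.Second,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer cli.Close()

	// Prefix query: every key starting with "kg-mykg-node-" is one replica of "mykg".
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	resp, err := cli.Get(ctx, "kg-mykg-node-", clientv3.WithPrefix())
	cancel()
	if err != nil {
		log.Fatal(err)
	}
	for _, kv := range resp.Kvs {
		fmt.Printf("%s -> %s\n", kv.Key, kv.Value) // e.g. kg-mykg-node-<nodeID> -> <replica status>
	}
}
```

An empty result means the keygroup has no registered replicas (or does not exist at all); distinguishing the two cases is left to the nameservice implementation.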
version: "3.7"
services:
etcd:
image: gcr.io/etcd-development/etcd:v3.4.7
container_name: etcd-1
entrypoint: "etcd --name s-1 \
--data-dir /tmp/etcd/s-1 \
--listen-client-urls http://172.26.1.1:2379 \
--advertise-client-urls http://172.26.1.1:2379 \
--listen-peer-urls http://172.26.1.1:2380 \
--initial-advertise-peer-urls http://172.26.1.1:2380 \
--initial-cluster s-1=http://172.26.1.1:2380 \
--initial-cluster-token tkn \
--initial-cluster-state new \
--log-level debug \
--log-outputs stderr"
ports:
- 2379:2379
- 2380:2380
networks:
fredwork:
ipv4_address: 172.26.1.1
networks:
fredwork:
external: true
# docker network create fredwork --gateway 172.26.0.1 --subnet 172.26.0.0/16
\ No newline at end of file
ETCD_VER=v3.4.7
# choose either URL
GOOGLE_URL=https://storage.googleapis.com/etcd
GITHUB_URL=https://github.com/etcd-io/etcd/releases/download
DOWNLOAD_URL=${GOOGLE_URL}
curl -L ${DOWNLOAD_URL}/${ETCD_VER}/etcd-${ETCD_VER}-linux-amd64.tar.gz -o etcd-${ETCD_VER}-linux-amd64.tar.gz
#tar xzvf etcd-${ETCD_VER}-linux-amd64.tar.gz -C etcd-download-test --strip-components=1
#rm -f etcd-${ETCD_VER}-linux-amd64.tar.gz
#
#etcd-download-test/etcd --version
#etcd-download-test/etcdctl version
\ No newline at end of file
......@@ -19,7 +19,7 @@ type Store interface {
// Needs: keygroup, id
Exists(kg commons.KeygroupName, id string) bool
// Needs: keygroup, Returns: err
// Doesnt really need to store the KG itself, that is keygroup/store.go's job.
// Doesn't really need to store the KG itself; that is NaSe's job.
// But might be useful for databases using it
CreateKeygroup(kg commons.KeygroupName) error
// Same as with CreateKeygroup
......
package exthandler
import (
"fmt"
"github.com/rs/zerolog/log"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/data"
errors "gitlab.tu-berlin.de/mcc-fred/fred/pkg/errors"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/errors"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/keygroup"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/replication"
)
type handler struct {
i data.Service
k keygroup.Service
r replication.Service
dataService data.Service
replService replication.Service
}
// New creates a new handler for external request (i.e. from clients).
func New(i data.Service, k keygroup.Service, r replication.Service) Handler {
// New creates a new handler for external requests (i.e. from clients).
func New(i data.Service, r replication.Service) Handler {
return &handler{
i: i,
k: k,
r: r,
dataService: i,
replService: r,
}
}
// HandleCreateKeygroup handles requests to the CreateKeygroup endpoint of the client interface.
func (h *handler) HandleCreateKeygroup(k keygroup.Keygroup) error {
if err := h.k.Create(keygroup.Keygroup{
Name: k.Name,
}); err != nil {
log.Err(err).Msg("Exthandler cannot create keygroup with keygroup service")
return err
}
if err := h.i.CreateKeygroup(k.Name); err != nil {
if err := h.dataService.CreateKeygroup(k.Name); err != nil {
log.Err(err).Msg("Exthandler cannot create keygroup with data service")
return err
}
if err := h.r.CreateKeygroup(k); err != nil {
if err := h.replService.CreateKeygroup(k); err != nil {
log.Err(err).Msg("Exthandler cannot create keygroup with replication service")
return err
}
......@@ -50,19 +39,12 @@ func (h *handler) HandleCreateKeygroup(k keygroup.Keygroup) error {
// HandleDeleteKeygroup handles requests to the DeleteKeygroup endpoint of the client interface.
func (h *handler) HandleDeleteKeygroup(k keygroup.Keygroup) error {
if err := h.k.Delete(keygroup.Keygroup{
Name: k.Name,
}); err != nil {
log.Err(err).Msg("Exthandler cannot delete keygroup with keygroup service")
return err
}
if err := h.i.DeleteKeygroup(k.Name); err != nil {
if err := h.dataService.DeleteKeygroup(k.Name); err != nil {
log.Err(err).Msg("Exthandler cannot delete keygroup with data service")
return err
}
if err := h.r.RelayDeleteKeygroup(keygroup.Keygroup{
if err := h.replService.RelayDeleteKeygroup(keygroup.Keygroup{
Name: k.Name,
}); err != nil {
log.Err(err).Msg("Exthandler cannot delete keygroup with replication service")
......@@ -74,13 +56,7 @@ func (h *handler) HandleDeleteKeygroup(k keygroup.Keygroup) error {
// HandleRead handles requests to the Read endpoint of the client interface.
func (h *handler) HandleRead(i data.Item) (data.Item, error) {
if !h.k.Exists(keygroup.Keygroup{
Name: i.Keygroup,
}) {
return i, errors.New(errors.StatusNotFound, "exthandler: keygroup does not exist")
}
result, err := h.i.Read(i.Keygroup, i.ID)
result, err := h.dataService.Read(i.Keygroup, i.ID)
if err != nil {
return i, errors.New(errors.StatusNotFound, "exthandler: item does not exist")
}
......@@ -91,18 +67,12 @@ func (h *handler) HandleRead(i data.Item) (data.Item, error) {
// HandleUpdate handles requests to the Update endpoint of the client interface.
func (h *handler) HandleUpdate(i data.Item) error {
if !h.k.Exists(keygroup.Keygroup{
Name: i.Keygroup,
}) {
return errors.New(errors.StatusNotFound, "exthandler: keygroup does not exist")
}
if err := h.i.Update(i); err != nil {
if err := h.dataService.Update(i); err != nil {
log.Err(err).Msg("Exthandler cannot relay update with data service")
return err
}
if err := h.r.RelayUpdate(i); err != nil {
if err := h.replService.RelayUpdate(i); err != nil {
log.Err(err).Msg("Exthandler cannot relay update with replication service")
return err
}
......@@ -112,18 +82,12 @@ func (h *handler) HandleUpdate(i data.Item) error {
// HandleDelete handles requests to the Delete endpoint of the client interface.
func (h *handler) HandleDelete(i data.Item) error {
if !h.k.Exists(keygroup.Keygroup{
Name: i.Keygroup,
}) {
return errors.New(errors.StatusNotFound, "exthandler: keygroup does not exist")
}
if err := h.i.Delete(i.Keygroup, i.ID); err != nil {
if err := h.dataService.Delete(i.Keygroup, i.ID); err != nil {
log.Err(err).Msg("Exthandler cannot delete data item with data service")
return err
}
if err := h.r.RelayDelete(i); err != nil {
if err := h.replService.RelayDelete(i); err != nil {
log.Err(err).Msg("Exthandler cannot delete data item with data service")
return err
}
......@@ -133,19 +97,13 @@ func (h *handler) HandleDelete(i data.Item) error {
// HandleAddReplica handles requests to the AddKeygroupReplica endpoint of the client interface.
func (h *handler) HandleAddReplica(k keygroup.Keygroup, n replication.Node) error {
if !h.k.Exists(keygroup.Keygroup{
Name: k.Name,
}) {
return errors.New(errors.StatusNotFound, "exthandler: keygroup does not exist")
}
i, err := h.i.ReadAll(k.Name)
i, err := h.dataService.ReadAll(k.Name)
if err != nil {
return err
}
if err := h.r.AddReplica(k, n, i, true); err != nil {
if err := h.replService.AddReplica(k, n, i, true); err != nil {
log.Err(err).Msg("Exthandler cannot add a new keygroup replica")
return err
}
......@@ -155,24 +113,12 @@ func (h *handler) HandleAddReplica(k keygroup.Keygroup, n replication.Node) erro
// HandleGetKeygroupReplica handles requests to the GetKeygroupReplica endpoint of the client interface.
func (h *handler) HandleGetKeygroupReplica(k keygroup.Keygroup) ([]replication.Node, error) {
if !h.k.Exists(keygroup.Keygroup{
Name: k.Name,
}) {
return nil, errors.New(errors.StatusNotFound, "exthandler: keygroup does not exist")
}
return h.r.GetReplica(k)
return h.replService.GetReplica(k)
}
// HandleRemoveReplica handles requests to the RemoveKeygroupReplica endpoint of the client interface.
func (h *handler) HandleRemoveReplica(k keygroup.Keygroup, n replication.Node) error {
if !h.k.Exists(keygroup.Keygroup{
Name: k.Name,
}) {
return errors.New(errors.StatusNotFound, "exthandler: keygroup does not exist")
}
if err := h.r.RemoveReplica(k, n, true); err != nil {
if err := h.replService.RemoveReplica(k, n, true); err != nil {
return err
}
......@@ -181,40 +127,39 @@ func (h *handler) HandleRemoveReplica(k keygroup.Keygroup, n replication.Node) e
// HandleAddNode handles requests to the AddReplica endpoint of the client interface.
func (h *handler) HandleAddNode(n []replication.Node) error {
e := make([]string, len(n))
ec := 0
for _, node := range n {
if err := h.r.AddNode(node, true); err != nil {
log.Err(err).Msgf("Exthandler can not add a new replica node. (node=%#v)", node)
e[ec] = fmt.Sprintf("%v", err)
ec++
}
}
if ec > 0 {
return errors.New(errors.StatusInternalError, fmt.Sprintf("exthandler: %v", e))
}
// TODO remove? IDK
panic("Is it still necessary to add a node? The node can do this on startup by itself")
// e := make([]string, len(n))
// ec := 0
//
// for _, node := range n {
// if err := h.replService.AddNode(node, true); err != nil {
// log.Err(err).Msgf("Exthandler can not add a new replica node. (node=%#v)", node)
// e[ec] = fmt.Sprintf("%v", err)
// ec++
// }
// }
//
// if ec > 0 {
// return errors.New(errors.StatusInternalError, fmt.Sprintf("exthandler: %v", e))
// }
return nil
}
// HandleGetReplica handles requests to the GetAllReplica endpoint of the client interface.
func (h *handler) HandleGetReplica(n replication.Node) (replication.Node, error) {
return h.r.GetNode(n)
return h.replService.GetNode(n)
}
// HandleGetAllReplica handles requests to the GetAllReplica endpoint of the client interface.
func (h *handler) HandleGetAllReplica() ([]replication.Node, error) {
return h.r.GetNodes()
return h.replService.GetNodes()
}
// HandleRemoveNode handles requests to the RemoveReplica endpoint of the client interface.
func (h *handler) HandleRemoveNode(n replication.Node) error {
return h.r.RemoveNode(n, true)
}
// HandleSeed handles seeding of the first replica node.
func (h *handler) HandleSeed(n replication.Node) error {
return h.r.Seed(n)
// TODO: why would this be called, and what should happen now?
//return h.replService.RemoveNode(n, true)
return nil
}
......@@ -20,5 +20,4 @@ type Handler interface {
HandleGetReplica(n replication.Node) (replication.Node, error)
HandleGetAllReplica() ([]replication.Node, error)
HandleRemoveNode(n replication.Node) error
HandleSeed(n replication.Node) error
}
......@@ -14,8 +14,4 @@ type Handler interface {
HandleDelete(i data.Item) error
HandleAddReplica(k keygroup.Keygroup, n replication.Node) error
HandleRemoveReplica(k keygroup.Keygroup, n replication.Node) error
HandleAddNode(n replication.Node) error
HandleRemoveNode(n replication.Node) error
HandleIntroduction(introducer replication.Node, self replication.Node, node []replication.Node) error
HandleDetroduction() error
}
......@@ -12,35 +12,26 @@ import (
)
type handler struct {
i data.Service
k keygroup.Service
r replication.Service
dataService data.Service
replService replication.Service
}
// New creates a new handler for internal request (i.e. from peer nodes or the naming service).
func New(i data.Service, k keygroup.Service, r replication.Service) Handler {
// New creates a new handler for internal requests (i.e. from peer nodes or the naming service).
func New(i data.Service, r replication.Service) Handler {
return &handler{
i: i,
k: k,
r: r,
dataService: i,
replService: r,
}
}
// HandleCreateKeygroup handles requests to the CreateKeygroup endpoint of the internal interface.
func (h *handler) HandleCreateKeygroup(k keygroup.Keygroup, nodes []replication.Node) error {
if err := h.k.Create(keygroup.Keygroup{
Name: k.Name,
}); err != nil {
log.Err(err).Msg("Inthandler cannot create keygroup with keygroup service")
return err
}
if err := h.i.CreateKeygroup