Commit 1609ff46 authored by t.schirmer's avatar t.schirmer Committed by pfandzelter
Browse files

Add first version of zmq

ZMQ can send multiple message types

add API version identifier

add geohash ID to application, to be used for addressing

add leveldb as a storage driver

add container building to gitlab ci

update makefile to allow container build with right tag

try removing lint from dep

Completely rewrite zmq layer

add zmq package
parent d0322cbb
*
!frednode
\ No newline at end of file
......@@ -62,4 +62,4 @@ container:
- echo $CONTAINER_TAG
- docker login -u $CI_REGISTRY_USER -p $CI_REGISTRY_PASSWORD $CI_REGISTRY
- docker tag mcc-fred/fred gitlab-registry.tubit.tu-berlin.de/mcc-fred/fred/fred:$CONTAINER_TAG
- docker push gitlab-registry.tubit.tu-berlin.de/mcc-fred/fred/fred:$CONTAINER_TAG
\ No newline at end of file
- docker push gitlab-registry.tubit.tu-berlin.de/mcc-fred/fred/fred:$CONTAINER_TAG
......@@ -3,6 +3,7 @@
<words>
<w>keygroup</w>
<w>keygroups</w>
<w>kgname</w>
</words>
</dictionary>
</component>
\ No newline at end of file
......@@ -17,4 +17,4 @@ COPY --from=golang /go/bin/main frednode
EXPOSE 9001
ENTRYPOINT ["./frednode"]
\ No newline at end of file
ENTRYPOINT ["./frednode"]
......@@ -4,6 +4,11 @@
[![coverage report](https://gitlab.tubit.tu-berlin.de/mcc-fred/fred/badges/master/coverage.svg)](https://gitlab.tubit.tu-berlin.de/mcc-fred/fred/commits/master)
[![License MIT](https://img.shields.io/badge/License-MIT-brightgreen.svg)](https://img.shields.io/badge/License-MIT-brightgreen.svg)
## Setup
In order to run zmq please install [zmq](https://zeromq.org/download/) and [czmq](http://czmq.zeromq.org/page:get-the-software).
On Arch, this is done by running `yay -S czmq`
## Git Workflow
Setup git environment with `sh ./env-setup.sh` (installs git hooks). Be sure to have go installed...
......
......@@ -21,7 +21,6 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
# Set Clang as default CC
ENV set_clang /etc/profile.d/set-clang-cc.sh
RUN echo "export CC=clang-7.0" | tee -a ${set_clang} && chmod a+x ${set_clang}
# Install czmq
RUN echo "deb http://download.opensuse.org/repositories/network:/messaging:/zeromq:/release-stable/Debian_10/ ./" >> /etc/apt/sources.list
RUN wget -O - https://download.opensuse.org/repositories/network:/messaging:/zeromq:/release-stable/Debian_10/Release.key | apt-key add -
......@@ -58,4 +57,13 @@ RUN set -eux; \
tar xzvf docker.tgz --strip=1 -C /usr/local/bin docker/docker; \
rm docker.tgz; \
\
docker --version
\ No newline at end of file
docker --version
# Install zmq
RUN echo "deb http://download.opensuse.org/repositories/network:/messaging:/zeromq:/release-stable/Debian_10/ ./" >> /etc/apt/sources.list
RUN wget -O - https://download.opensuse.org/repositories/network:/messaging:/zeromq:/release-stable/Debian_10/Release.key | apt-key add -
RUN apt-get update && apt-get install -y --no-install-recommends \
libczmq-dev libzmq3-dev libsodium23 \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
......@@ -3,18 +3,21 @@ package main
import (
"flag"
"fmt"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/exthandler"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/memorykg"
"log"
"github.com/BurntSushi/toml"
"github.com/mmcloughlin/geohash"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/data"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/exthandler"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/inthandler"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/keygroup"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/leveldbsd"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/memorykg"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/memorysd"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/memoryzmq"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/webserver"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/zmqserver"
)
type fredConfig struct {
......@@ -29,6 +32,9 @@ type fredConfig struct {
Storage struct {
Adaptor string `toml:"adaptor"`
} `toml:"storage"`
ZMQ struct {
Port int `toml:"port"`
} `toml:"zmq"`
}
func main() {
......@@ -45,6 +51,8 @@ func main() {
log.Fatalf("invalid configuration! error: %s", err)
}
var nodeID = geohash.Encode(fc.Location.Lat, fc.Location.Lng)
var is data.Service
var ks keygroup.Service
......@@ -72,14 +80,21 @@ func main() {
log.Fatalf("unknown storage backend")
}
// Add more options here
k = memorykg.New()
is = data.New(i)
ks = keygroup.New(k, geohash.Encode(fc.Location.Lat, fc.Location.Lng))
e := exthandler.New(is, ks)
//inthandler := inthandler.New(is, ks)
ks = keygroup.New(k, nodeID)
addr := fmt.Sprintf("%s:%d", fc.Server.Host, fc.Server.Port)
log.Fatal(webserver.Setup(addr, e))
extH := exthandler.New(is, ks)
intH := inthandler.New(is, ks)
// Add more options here
zmqH := memoryzmq.New(intH)
go log.Fatal(zmqserver.Setup(fc.ZMQ.Port, nodeID, zmqH))
log.Fatal(webserver.Setup(addr, extH))
}
package main
import (
"bufio"
"fmt"
"github.com/zeromq/goczmq"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/zmqserver"
"log"
"math"
"os"
"time"
)
func main() {
multipart := true
rec, _ := zmqserver.NewReceiver("test", "5555")
var start int64 = 0
var end int64 = 0
go func() {
for {
recvd, err := rec.Receive()
end = time.Now().UnixNano()
log.Printf("Receiver: got message with size: %d with error %v", len(recvd), err)
log.Printf("Took me: %d milliseconds", (end-start)/1000000)
}
}()
// Create a very big string
oneGb := int(math.Pow(10, 9))
b := make([]byte, (oneGb*30)/10)
for i := range b {
b[i] = 0x41 // ASCII "A"
}
if multipart {
dealer, _ := goczmq.NewDealer(fmt.Sprintf("tcp://%s:%d", "localhost", 5555))
// So that the router doesnt send an answer
b = append([]byte{0x00}, b...)
start = time.Now().UnixNano()
parts := 60
for i := 0; i < len(b); i += len(b) / parts { // Send in 10 parts
end := i + len(b)/parts
flag := goczmq.FlagMore
if end >= len(b) {
end = len(b)
flag = goczmq.FlagNone
log.Printf("Sending last frame")
}
bytes := b[i:end]
log.Printf("Sending frame #%d", i)
dealer.SendFrame(bytes, flag)
}
b = nil
//log.Printf("Sending last frame")
//dealer.SendFrame([]byte{0x41}, goczmq.FlagNone) // Send one byte more, why not
log.Printf("Done sending! Press enter to continue:")
bufio.NewReader(os.Stdin).ReadString('\n')
} else {
log.Printf("Converting byte[] to string")
log.Printf("Created a very big string of size %dGB", len(b)/int(math.Pow(10, 9)))
// Send a very big file in one message
sen := zmqserver.NewSender("localhost", 5555)
log.Printf("Sending and waiting for answer")
start = time.Now().UnixNano()
_ = sen.SendBytes(b)
b = nil
log.Printf("Done sending! Press enter to continue:")
bufio.NewReader(os.Stdin).ReadString('\n')
}
}
......@@ -14,5 +14,8 @@ port = 9001
# adaptor can be either "leveldb" or "memory"
adaptor = "leveldb"
[zmq]
port = 5555
[leveldb]
path = "./db"
\ No newline at end of file
......@@ -7,6 +7,7 @@ require (
github.com/gin-gonic/gin v1.5.0
github.com/mmcloughlin/geohash v0.9.0
github.com/syndtr/goleveldb v1.0.0
github.com/zeromq/goczmq v4.1.0+incompatible
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f // indirect
golang.org/x/tools v0.0.0-20191230220329-2aa90c603ae3 // indirect
)
......@@ -50,6 +50,8 @@ github.com/ugorji/go v1.1.7 h1:/68gy2h+1mWMrwZFeD1kQialdSzAb432dtpeJ42ovdo=
github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVMw=
github.com/ugorji/go/codec v1.1.7 h1:2SvQaVZ1ouYrrKKwoSk2pzd4A9evlKJb9oTL+OaLUSs=
github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY=
github.com/zeromq/goczmq v4.1.0+incompatible h1:cGVQaU6kIwwrGso0Pgbl84tzAz/h7FJ3wYQjSonjFFc=
github.com/zeromq/goczmq v4.1.0+incompatible/go.mod h1:1uZybAJoSRCvZMH2rZxEwWBSmC4T7CB/xQOfChwPEzg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f h1:J5lckAjkw6qYlOZNj90mLYNTEKDvWeuc1yieZ8qUzUE=
......@@ -66,9 +68,12 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a h1:aYOabOQFp6Vj6W1F80affTUvO9UxmJRx8K0gsfABByQ=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f h1:kDxGY2VmgABOe55qheT/TFqUMtcTHnomIPS1iv3G4Ms=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191230220329-2aa90c603ae3 h1:2+KluhQfJ1YhW+TB1KrISS2SfiG1pLEoseB0D4VF/bo=
......
......@@ -57,4 +57,4 @@ module.exports = class Fred {
return await request(options);
}
};
\ No newline at end of file
};
......@@ -12,4 +12,4 @@ type Handler interface {
HandleRead(i data.Item) (data.Item, error)
HandleUpdate(i data.Item) error
HandleDelete(i data.Item) error
}
\ No newline at end of file
}
......@@ -35,9 +35,9 @@ func New(dbPath string) (s *Storage) {
func (s *Storage) Read(i data.Item) (data.Item, error) {
key := makeKeyName(i.Keygroup, i.ID)
data, err := s.db.Get([]byte(key), nil)
value, err := s.db.Get([]byte(key), nil)
i.Data = string(data)
i.Data = string(value)
return i, err
}
......
......@@ -58,7 +58,7 @@ func TestStorage_CreateKeygroup(t *testing.T) {
},
args{data.Item{
Keygroup: "",
},},
}},
false,
},
}
......
package memoryzmq
import (
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/data"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/keygroup"
"log"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/inthandler"
"gitlab.tu-berlin.de/mcc-fred/fred/pkg/zmqserver"
)
type localMemoryMessageHandler struct {
i inthandler.Handler
}
// New creates a new localMemoryMessageHandler that uses the given handler.
func New(h inthandler.Handler) (l zmqserver.MessageHandler) {
l = &localMemoryMessageHandler{
i:h,
}
return l
}
// HandleCreateKeygroup handles requests to the CreateKeygroup endpoint of the internal zmq interface.
func (l *localMemoryMessageHandler) HandleCreateKeygroup(req *zmqserver.Request, from string) {
_ = l.i.HandleCreateKeygroup(keygroup.Keygroup{Name: req.Keygroup})
// TODO Error handling: send a reply message if necessary, the identity of the sender is in req.From
}
// HandleGetValueFromKeygroup handles requests to the Read endpoint of the internal zmq interface.
func (l *localMemoryMessageHandler) HandleGetValueFromKeygroup(req *zmqserver.Request, from string) {
_, err := l.i.HandleRead(data.Item{
Keygroup: req.Keygroup,
ID: req.ID,
})
if err == nil {
log.Fatal("Get from App failed! ", err)
return
}
//C.SendGetAnswer(from, req.Keygroup, val)
}
// HandlePutValueIntoKeygroup handles requests to the Update endpoint of the internal zmq interface.
func (l *localMemoryMessageHandler) HandlePutValueIntoKeygroup(req *zmqserver.Request, from string) {
_ = l.i.HandleUpdate(data.Item{
Keygroup: req.Keygroup,
ID: req.ID,
})
}
// HandleDeleteFromKeygroup handles requests to the Delete endpoint of the internal zmq interface.
func (l *localMemoryMessageHandler) HandleDeleteFromKeygroup(req *zmqserver.Request, from string) {
_ = l.i.HandleDelete(data.Item{
Keygroup: req.Keygroup,
ID: req.ID,
})
}
// HandleDeleteKeygroup handles requests to the DeleteKeygroup endpoint of the internal zmq interface.
func (l *localMemoryMessageHandler) HandleDeleteKeygroup(req *zmqserver.Request, from string) {
_ = l.i.HandleDeleteKeygroup(keygroup.Keygroup{Name: req.Keygroup})
}
package zmqserver
import (
"encoding/json"
"errors"
"github.com/zeromq/goczmq"
"log"
)
// Controller is a ZMQ server that accepts incoming requests within the fred system.
type Controller struct {
poller *goczmq.Poller
receiver *Receiver
senders map[string]Sender
handler MessageHandler
}
// Setup creates a controller and runs it.
func Setup(port string, id string, handler MessageHandler) (err error) {
p, err := goczmq.NewPoller()
if err != nil {
return
}
r, err := NewReceiver(id, port)
if err != nil {
return
}
err = p.Add(r.GetSocket())
if err != nil {
return
}
controller := &Controller{
poller: p,
receiver: r,
senders: make(map[string]Sender),
handler: handler,
}
//if anything happens, we should be destroying this controller...
//defer controller.Destroy()
return pollForever(controller)
}
func pollForever(c *Controller) error {
for {
newMessageSocket := c.poller.Wait(10_000)
if newMessageSocket == nil {
return errors.New("there was no new message for 10 seconds, shutting down")
}
// Receiver has got a new message
request, err := newMessageSocket.RecvMessage()
// TODO error handling
if err != nil {
log.Println(err)
}
// src = identity of socket from dealer
src := string(request[0])
recvd := request[1]
msg := recvd[1:]
// the first byte of answer tells whether an answer is expected
answerType := recvd[0]
// identity of sender can be either:
// - our own receiver socket. This means another node wants to initiate a conservation with us
// - another socket. This must be a sender socket (since we have no other sockets)
// This means we have at some time sent a request to another receiver socket and the other node has replied to our request
// Currently this only handles cases where a reply is expected (eg get request)
// But it should be expanded to also expect error replies and handle them
// We dont want to send the answer in the current thread because that would block polling
if newMessageSocket.Identity() == c.receiver.GetSocket().Identity() {
switch answerType {
case 0x10: // Create keygroup
var req = &Request{}
err = json.Unmarshal(msg, &req)
go c.handler.HandleCreateKeygroup(req, src)
case 0x11: // Delete keygroup
var req = &Request{}
err = json.Unmarshal(msg, &req)
go c.handler.HandleDeleteKeygroup(req, src)
case 0x12: // Get from Keygroup
var req = &Request{}
err = json.Unmarshal(msg, &req)
go c.handler.HandleGetValueFromKeygroup(req, src)
case 0x13: // Put into keygroup
var req = &Request{}
err = json.Unmarshal(msg, &req)
go c.handler.HandlePutValueIntoKeygroup(req, src)
case 0x14: // Delete in Keygroup
var req = &Request{}
err = json.Unmarshal(msg, &req)
go c.handler.HandleDeleteFromKeygroup(req, src)
}
} else {
switch answerType {
case 0x12: // Answer to a get request received
var res = &Response{}
err = json.Unmarshal(msg, &res)
log.Println("Yeah! My get request was answered! ")
log.Println(res)
}
}
}
}
// Destroy the controller
func (c *Controller) Destroy() {
c.poller.Destroy()
c.receiver.Destroy()
}
// sendMessage to the specified IP
func (c *Controller) sendMessage(msType byte, ip string, msg []byte) (err error) {
cSender, exists := c.senders[ip]
if !exists {
c.senders[ip] = *NewSender(ip, 5555)
cSender = c.senders[ip]
err = c.poller.Add(cSender.GetSocket())
}
if err != nil {
return err
}
err = cSender.SendMessageWithType(msType, msg)
return
}
// SendPut that answers a getRequest
func (c *Controller) SendPut(ip, kgname, kgid, value string) (err error) {
req, err := json.Marshal(&Request{
Keygroup: kgname,
ID: kgid,
Value: value,
})
if err != nil {
return
}
err = c.sendMessage(0x13, ip, req)
return
}
// SendGetReply that answers a getRequest.
func (c *Controller) SendGetReply(to string, kgname, id, value string) (err error) {
rep, err := json.Marshal(&Response{
Keygroup: kgname,
ID: id,
Value: value,
})
if err != nil {
return
}
err = c.receiver.ReplyTo(to, 0x12, rep)
return
}
package zmqserver
// MessageHandler provides all methods to handle an incoming ZMQ request.
type MessageHandler interface {
HandleGetValueFromKeygroup(req *Request, from string)
HandlePutValueIntoKeygroup(req *Request, from string)
HandleDeleteFromKeygroup(req *Request, from string)
HandleDeleteKeygroup(req *Request, from string)
HandleCreateKeygroup(req *Request, src string)
}
package zmqserver
import (
"fmt"
"github.com/zeromq/goczmq"
)
// Receiver can receive zmq messages on a zmq socket and respond to them (if necessary).
type Receiver struct {
socket *goczmq.Sock
}
// NewReceiver creates a zmq Receiver that listens on the given port.
func NewReceiver(id string, port string) (rec *Receiver, err error) {
// Create a router socket and bind it.
r, err := goczmq.NewRouter(fmt.Sprintf("tcp://0.0.0.0:%s", port))
if err != nil {
return nil, err
}
r.SetIdentity(fmt.Sprintf("receiver-%s", id))
rec = &Receiver{socket: r}
return
}
// GetSocket of receiver.
func (r *Receiver) GetSocket() (socket *goczmq.Sock) {
return r.socket
}
// ReplyTo a sender that has sent a request that needs an answer.
func (r *Receiver) ReplyTo(id string, msType byte, data []byte) (err error) {
err = r.socket.SendFrame([]byte(id), goczmq.FlagMore)
err = r.socket.SendFrame([]byte{msType}, goczmq.FlagMore)
// TODO if data is too big maybe split this up
err = r.socket.SendFrame(data, goczmq.FlagNone)
return
}
// Destroy the receiver.
func (r *Receiver) Destroy() {
r.Destroy()
}
// Receive receives a message on the Receiver.
func (r *Receiver) Receive() ([][]byte, error) {
return r.socket.RecvMessage()
}
package zmqserver
// Request has all data for a ZMQ request.
type Request struct {
Keygroup string
ID string
Value string
}
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment