From 4b40434b06f0cd304a0b250a585192a1948f58f1 Mon Sep 17 00:00:00 2001 From: Tobias Pfandzelter Date: Thu, 22 Jul 2021 20:03:53 +0200 Subject: [PATCH] add a proper load balancer --- cmd/frednode/main.go | 18 +- cmd/fredproxy/main.go | 124 ++++++ go.mod | 3 +- go.sum | 3 - pkg/api/api.go | 90 +++-- pkg/fred/fred_test.go | 2 +- pkg/peering/client.go | 54 ++- pkg/peering/server.go | 33 +- pkg/proxy/api.go | 453 ++++++++++++++++++++++ pkg/proxy/peering.go | 180 +++++++++ pkg/proxy/proxy.go | 25 ++ proxy.Dockerfile | 35 ++ tests/3NodeTest/cmd/main/replica_suite.go | 3 + tests/runner/haproxy.Dockerfile | 3 - tests/runner/nodeA-haproxy.cfg | 29 -- tests/runner/nodeA.yml | 26 +- tests/runner/nodeB-debug.yml | 3 + tests/runner/nodeB.yml | 3 + tests/runner/nodeC.yml | 3 + 19 files changed, 1004 insertions(+), 86 deletions(-) create mode 100644 cmd/fredproxy/main.go create mode 100644 pkg/proxy/api.go create mode 100644 pkg/proxy/peering.go create mode 100644 pkg/proxy/proxy.go create mode 100644 proxy.Dockerfile delete mode 100644 tests/runner/haproxy.Dockerfile delete mode 100644 tests/runner/nodeA-haproxy.cfg diff --git a/cmd/frednode/main.go b/cmd/frednode/main.go index 2747d32..61ea0f6 100644 --- a/cmd/frednode/main.go +++ b/cmd/frednode/main.go @@ -43,8 +43,11 @@ type fredConfig struct { Adaptor string `env:"STORAGE_ADAPTOR"` } Peering struct { - Host string `env:"PEERING_HOST"` - HostProxy string `env:"PEERING_PROXY"` + Host string `env:"PEERING_HOST"` + Proxy string `env:"PEERING_PROXY"` + Cert string `env:"PEERING_CERT"` + Key string `env:"PEERING_KEY"` + CA string `env:"PEERING_CA"` } Log struct { Level string `env:"LOG_LEVEL"` @@ -103,7 +106,10 @@ func parseArgs() (fc fredConfig) { // this is the address that grpc will bind to (locally) flag.StringVar(&(fc.Peering.Host), "peer-host", "", "local address of this peering server. 
(Env: PEERING_HOST)") // this is the address that the node will advertise to nase - flag.StringVar(&(fc.Peering.HostProxy), "peer-host-proxy", "", "Publicly reachable address of this peering server (if behind a proxy). (Env: PEERING_PROXY)") + flag.StringVar(&(fc.Peering.Proxy), "peer-host-proxy", "", "Publicly reachable address of this peering server (if behind a proxy). (Env: PEERING_PROXY)") + flag.StringVar(&(fc.Peering.Cert), "peer-cert", "", "Certificate for peering connection. (Env: PEERING_CERT)") + flag.StringVar(&(fc.Peering.Key), "peer-key", "", "Key file for peering connection. (Env: PEERING_KEY)") + flag.StringVar(&(fc.Peering.CA), "peer-ca", "", "Certificate authority root certificate file for peering connections. (Env: PEERING)") // storage configuration flag.StringVar(&(fc.Storage.Adaptor), "adaptor", "", "Storage adaptor, can be \"remote\", \"badgerdb\", \"memory\", \"dynamo\". (Env: STORAGE_ADAPTOR)") @@ -275,7 +281,7 @@ func main() { } log.Debug().Msg("Starting Interconnection Client...") - c := peering.NewClient() + c := peering.NewClient(fc.Peering.Cert, fc.Peering.Key, fc.Peering.CA) log.Debug().Msg("Starting NaSe Client...") @@ -292,7 +298,7 @@ func main() { Client: c, NaSe: n, PeeringHost: fc.Peering.Host, - PeeringHostProxy: fc.Peering.HostProxy, + PeeringHostProxy: fc.Peering.Proxy, ExternalHost: fc.Server.Host, ExternalHostProxy: fc.Server.Proxy, TriggerCert: fc.Trigger.Cert, @@ -301,7 +307,7 @@ func main() { }) log.Debug().Msg("Starting Interconnection Server...") - is := peering.NewServer(fc.Peering.Host, f.I) + is := peering.NewServer(fc.Peering.Host, f.I, fc.Peering.Cert, fc.Peering.Key, fc.Peering.CA) log.Debug().Msg("Starting GRPC Server for Client (==Externalconnection)...") isProxied := fc.Server.Proxy != "" && fc.Server.Host != fc.Server.Proxy diff --git a/cmd/fredproxy/main.go b/cmd/fredproxy/main.go new file mode 100644 index 0000000..b36a6a0 --- /dev/null +++ b/cmd/fredproxy/main.go @@ -0,0 +1,124 @@ +package main + +import ( 
+ "flag" + "fmt" + "net" + "os" + "os/signal" + "strings" + "syscall" + + "git.tu-berlin.de/mcc-fred/fred/pkg/proxy" + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +func main() { + // we need: + // - a port for the grpc client frontend + // - a port for the grpc peering endpoint + // - a list of addresses (we're simplifying this: you can't set different ports for your different machines) + // - probably also the usual certificate stuff + // - log level and handler as always + clientPort := flag.Int("client-port", 0, "port to bind proxy to for client access") + peeringPort := flag.Int("peering-port", 0, "port to bind to for peering access") + machines := flag.String("machines", "", "list of machine addresses, comma-separated") + loghandler := flag.String("log-handler", "dev", "dev=>pretty, prod=>json") + loglevel := flag.String("log-level", "debug", "Log level, can be \"debug\", \"info\" ,\"warn\", \"error\", \"fatal\", \"panic\".") + peeringCert := flag.String("peer-cert", "", "Certificate for peering connection.") + peeringKey := flag.String("peer-key", "", "Key file for peering connection.") + peeringCA := flag.String("peer-ca", "", "Certificate authority root certificate file for peering connections.") + apiCert := flag.String("api-cert", "", "Certificate for API connection.") + apiKey := flag.String("api-key", "", "Key file for API connection.") + apiCA := flag.String("api-ca", "", "Certificate authority root certificate file for API connections.") + + flag.Parse() + + // Setup Logging + // In Dev the ConsoleWriter has nice colored output, but is not very fast. + // In Prod the default handler is used. It writes json to stdout and is very fast. 
+ if *loghandler == "dev" { + log.Logger = log.Output( + zerolog.ConsoleWriter{ + Out: os.Stderr, + NoColor: false, + }, + ) + } else if *loghandler != "prod" { + log.Fatal().Msg("Log Handler has to be either dev or prod") + } + + switch *loglevel { + case "debug": + zerolog.SetGlobalLevel(zerolog.DebugLevel) + case "info": + zerolog.SetGlobalLevel(zerolog.InfoLevel) + case "warn": + zerolog.SetGlobalLevel(zerolog.WarnLevel) + case "error": + zerolog.SetGlobalLevel(zerolog.ErrorLevel) + case "fatal": + zerolog.SetGlobalLevel(zerolog.FatalLevel) + case "panic": + zerolog.SetGlobalLevel(zerolog.PanicLevel) + default: + zerolog.SetGlobalLevel(zerolog.DebugLevel) + log.Info().Msg("No Loglevel specified, using 'debug'") + } + + // parse machines + p := proxy.NewProxy(strings.Split(*machines, ",")) + + pS, err := proxy.StartPeeringProxy(p, *peeringPort, *peeringCert, *peeringKey, *peeringCA) + if err != nil { + log.Fatal().Err(err).Msg(err.Error()) + } + + aS, err := proxy.StartAPIProxy(p, *clientPort, *apiCert, *apiKey, *apiCA) + if err != nil { + log.Fatal().Err(err).Msg(err.Error()) + } + + pLis, err := net.Listen("tcp", fmt.Sprintf(":%d", *peeringPort)) + + if err != nil { + log.Fatal().Err(err).Msg("Failed to listen") + } + + go func() { + log.Debug().Msgf("Peering proxy starting to listen on :%d, proxying to %s", *peeringPort, *machines) + err := pS.Serve(pLis) + + // if Serve returns without an error, we probably intentionally closed it + if err != nil { + log.Fatal().Msgf("PeeringProxy Server exited: %s", err.Error()) + } + }() + + aLis, err := net.Listen("tcp", fmt.Sprintf(":%d", *clientPort)) + + if err != nil { + log.Fatal().Err(err).Msg("Failed to listen") + } + + go func() { + log.Debug().Msgf("API proxy starting to listen on :%d, proxying to %s", *clientPort, *machines) + err := aS.Serve(aLis) + + // if Serve returns without an error, we probably intentionally closed it + if err != nil { + log.Fatal().Msgf("APIProxy Server exited: %s", err.Error()) + } + 
}() + + quit := make(chan os.Signal, 1) + signal.Notify(quit, + os.Interrupt, + syscall.SIGTERM) + + <-quit + log.Info().Msg("FReD Proxy Closing Now!") + pS.GracefulStop() + aS.GracefulStop() +} diff --git a/go.mod b/go.mod index 2a00959..4fac3ed 100644 --- a/go.mod +++ b/go.mod @@ -3,16 +3,15 @@ module git.tu-berlin.de/mcc-fred/fred go 1.16 require ( - github.com/allegro/bigcache v1.2.1 github.com/aws/aws-sdk-go v1.34.13 github.com/caarlos0/env/v6 v6.4.0 + github.com/cespare/xxhash/v2 v2.1.1 github.com/dgraph-io/badger/v3 v3.2103.0 github.com/dgraph-io/ristretto v0.1.0 github.com/go-errors/errors v1.1.1 github.com/mmcloughlin/geohash v0.9.0 github.com/rs/zerolog v1.17.2 github.com/stretchr/testify v1.7.0 - go.etcd.io/etcd/api/v3 v3.5.0 go.etcd.io/etcd/client/pkg/v3 v3.5.0 go.etcd.io/etcd/client/v3 v3.5.0 go.etcd.io/etcd/server/v3 v3.5.0 diff --git a/go.sum b/go.sum index 6619cb3..b111b03 100644 --- a/go.sum +++ b/go.sum @@ -24,8 +24,6 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/allegro/bigcache v1.2.1 h1:hg1sY1raCwic3Vnsvje6TT7/pnZba83LeFck5NrFKSc= -github.com/allegro/bigcache v1.2.1/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= @@ -81,7 +79,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c 
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgraph-io/badger/v3 v3.2103.0 h1:abkD2EnP3+6Tj8h5LI1y00dJ9ICKTIAzvG9WmZ8S2c4= github.com/dgraph-io/badger/v3 v3.2103.0/go.mod h1:GHMCYxuDWyzbHkh4k3yyg4PM61tJPFfEGSMbE3Vd5QE= -github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a h1:1cMMkx3iegOzbAxVl1ZZQRHk+gaCf33Y5/4I3l0NNSg= github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a/go.mod h1:MIonLggsKgZLUSt414ExgwNtlOL5MuEoAJP514mwGe8= github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/LuerPI= github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug= diff --git a/pkg/api/api.go b/pkg/api/api.go index 428f726..778f30c 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -14,6 +14,7 @@ import ( "github.com/rs/zerolog/log" "google.golang.org/grpc" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" ) @@ -23,6 +24,8 @@ type Server struct { roots *x509.CertPool isProxied bool proxyHost string + proxyPort string + *grpc.Server } @@ -38,30 +41,30 @@ var ( ) // CheckCert checks the certificate from the given gRPC context for validity and returns the Common Name -func (s *Server) CheckCert(ctx context.Context) (name string, err error) { +func (s *Server) CheckCert(ctx context.Context) (string, error) { // get peer information p, ok := peer.FromContext(ctx) if !ok { - return name, errors.Errorf("no peer found") + return "", errors.Errorf("no peer found") } // get TLS credential information for this connection tlsAuth, ok := p.AuthInfo.(credentials.TLSInfo) if !ok { - return name, errors.Errorf("unexpected peer transport credentials") + return "", errors.Errorf("unexpected peer transport credentials") } // check that the certificate exists if len(tlsAuth.State.VerifiedChains) == 0 || len(tlsAuth.State.VerifiedChains[0]) == 0 { - return name, errors.Errorf("could not verify peer certificate: 
%v", tlsAuth.State) + return "", errors.Errorf("could not verify peer certificate: %v", tlsAuth.State) } host, _, err := net.SplitHostPort(p.Addr.String()) if err != nil { - return name, errors.New(err) + return "", errors.New(err) } // verify the certificate: @@ -80,36 +83,57 @@ func (s *Server) CheckCert(ctx context.Context) (name string, err error) { }) if err != nil { - return name, errors.New(err) + return "", errors.New(err) } - } else { - // ELSE we sit behind a proxy and the proxy should be the one tunneling the gRPC connection to us - // hence if we can ensure that it is indeed the proxy that is talking to us and not someone who has found - // their way into the network, we can be sure that the proxy/LB has checked the certificate - _, err = tlsAuth.State.VerifiedChains[0][0].Verify(x509.VerifyOptions{ - Roots: s.roots, - CurrentTime: time.Now(), - Intermediates: x509.NewCertPool(), - KeyUsages: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}, - }) - if err != nil { - return name, errors.New(err) + // Check subject common name exists and return it as the user name for that client + name := tlsAuth.State.VerifiedChains[0][0].Subject.CommonName + + if name == "" { + return "", errors.Errorf("invalid subject common name") } + + log.Debug().Msgf("CheckCert: GRPC Context Certificate Name: %s", name) + + return name, nil } + // ELSE we sit behind a proxy and the proxy should be the one tunneling the gRPC connection to us + // hence if we can ensure that it is indeed the proxy that is talking to us and not someone who has found + // their way into the network, we can be sure that the proxy/LB has checked the certificate + // in this case, the proxy will give the user name to us as a header (thanks, proxy!) 
- // Check subject common name exists and return it as the user name for that client - if name = tlsAuth.State.VerifiedChains[0][0].Subject.CommonName; name == "" { - return name, errors.Errorf("invalid subject common name") + if host != s.proxyHost { + return "", errors.Errorf("node is proxied but got request not from proxy (%s instead of %s)", host, s.proxyHost) } - log.Debug().Msgf("CheckCert: GRPC Context Certificate Name: %s", name) + _, err = tlsAuth.State.VerifiedChains[0][0].Verify(x509.VerifyOptions{ + Roots: s.roots, + CurrentTime: time.Now(), + Intermediates: x509.NewCertPool(), + KeyUsages: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}, + }) - return name, nil + if err != nil { + return "", errors.New(err) + } + + md, ok := metadata.FromIncomingContext(ctx) + + if !ok { + return "", errors.Errorf("no metadata could be found for proxied request") + } + + u := md.Get("user") + + if len(u) != 1 { + return "", errors.Errorf("invalid user header for proxied request") + } + + return u[0], nil } // NewServer creates a new Server for requests from Fred Clients -func NewServer(host string, handler fred.ExtHandler, cert string, key string, caCert string, isProxied bool, proxyHost string) *Server { +func NewServer(host string, handler fred.ExtHandler, cert string, key string, caCert string, isProxied bool, proxy string) *Server { // Load server's certificate and private key serverCert, err := tls.LoadX509KeyPair(cert, key) @@ -136,12 +160,20 @@ func NewServer(host string, handler fred.ExtHandler, cert string, key string, ca MinVersion: tls.VersionTLS12, } + proxyHost, proxyPort, err := net.SplitHostPort(proxy) + + if isProxied && err != nil { + log.Fatal().Err(err).Msg("Failed to parse proxy host and port") + return nil + } + s := &Server{ - handler, - rootCAs, - isProxied, - proxyHost, - grpc.NewServer( + e: handler, + roots: rootCAs, + isProxied: isProxied, + proxyHost: proxyHost, + proxyPort: proxyPort, + Server: grpc.NewServer( 
grpc.Creds(credentials.NewTLS(config)), ), } diff --git a/pkg/fred/fred_test.go b/pkg/fred/fred_test.go index 7bf6d80..e8a3d35 100644 --- a/pkg/fred/fred_test.go +++ b/pkg/fred/fred_test.go @@ -82,7 +82,7 @@ func TestMain(m *testing.M) { config := fred.Config{ Store: badgerdb.NewMemory(), - Client: peering.NewClient(), + Client: peering.NewClient(certBasePath+"nodeA.crt", certBasePath+"nodeA.key", certBasePath+"ca.crt"), NaSe: n, PeeringHost: "127.0.0.1:8000", PeeringHostProxy: "", diff --git a/pkg/peering/client.go b/pkg/peering/client.go index cec366c..5d0dc06 100644 --- a/pkg/peering/client.go +++ b/pkg/peering/client.go @@ -2,29 +2,69 @@ package peering import ( "context" + "crypto/tls" + "crypto/x509" + "io/ioutil" "git.tu-berlin.de/mcc-fred/fred/pkg/fred" "git.tu-berlin.de/mcc-fred/fred/proto/peering" "github.com/go-errors/errors" "github.com/rs/zerolog/log" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" ) // Client is an peering client to communicate with peers. type Client struct { - conn map[string]peering.NodeClient + conn map[string]peering.NodeClient + credentials credentials.TransportCredentials } // NewClient creates a new empty client to communicate with peers. 
-func NewClient() *Client { +func NewClient(certFile string, keyFile string, caFile string) *Client { + cert, err := tls.LoadX509KeyPair(certFile, keyFile) + + if err != nil { + log.Fatal().Err(err).Msg("Cannot load certificates") + + return nil + } + + // Create a new cert pool and add our own CA certificate + rootCAs, err := x509.SystemCertPool() + + if err != nil { + log.Fatal().Err(err).Msg("Cannot load root certificates") + return nil + } + + loaded, err := ioutil.ReadFile(caFile) + + if err != nil { + log.Fatal().Msgf("unexpected missing certfile: %v", err) + } + + rootCAs.AppendCertsFromPEM(loaded) + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + MinVersion: tls.VersionTLS12, + RootCAs: rootCAs, + } + return &Client{ - conn: make(map[string]peering.NodeClient), + conn: make(map[string]peering.NodeClient), + credentials: credentials.NewTLS(tlsConfig), } } // getClient creates a new connection to a server or uses an existing one. func (c *Client) getClient(host string) (peering.NodeClient, error) { - conn, err := grpc.Dial(host, grpc.WithInsecure()) + if client, ok := c.conn[host]; ok { + return client, nil + } + + conn, err := grpc.Dial(host, grpc.WithTransportCredentials(c.credentials)) if err != nil { log.Error().Err(err).Msg("Cannot create Grpc connection") @@ -33,9 +73,9 @@ func (c *Client) getClient(host string) (peering.NodeClient, error) { log.Debug().Msgf("Interclient: Created Connection to %s", host) - c.conn[host] = peering.NewNodeClient(conn) - - return c.conn[host], nil + client := peering.NewNodeClient(conn) + c.conn[host] = client + return client, nil } // Destroy currently does nothing, but might delete open connections if they are implemented diff --git a/pkg/peering/server.go b/pkg/peering/server.go index e67da24..11fe6b5 100644 --- a/pkg/peering/server.go +++ b/pkg/peering/server.go @@ -2,11 +2,15 @@ package peering import ( "context" + "crypto/tls" + "crypto/x509" + "io/ioutil" "net" 
"git.tu-berlin.de/mcc-fred/fred/proto/peering" "github.com/rs/zerolog/log" "google.golang.org/grpc" + "google.golang.org/grpc/credentials" "git.tu-berlin.de/mcc-fred/fred/pkg/fred" ) @@ -18,9 +22,34 @@ type Server struct { } // NewServer creates a new Server for communication to the inthandler from other nodes -func NewServer(host string, handler fred.IntHandler) *Server { +func NewServer(host string, handler fred.IntHandler, certFile string, keyFile string, caFile string) *Server { + // Load server's certificate and private key + cert, err := tls.LoadX509KeyPair(certFile, keyFile) - s := &Server{handler, grpc.NewServer()} + if err != nil { + log.Fatal().Msgf("could not load key pair: %v", err) + } + + // Create a new cert pool and add our own CA certificate + rootCAs := x509.NewCertPool() + + loaded, err := ioutil.ReadFile(caFile) + + if err != nil { + log.Fatal().Msgf("unexpected missing certfile: %v", err) + } + + rootCAs.AppendCertsFromPEM(loaded) + + // Create the credentials and return it + config := &tls.Config{ + Certificates: []tls.Certificate{cert}, + ClientAuth: tls.RequireAndVerifyClientCert, + ClientCAs: rootCAs, + MinVersion: tls.VersionTLS12, + } + + s := &Server{handler, grpc.NewServer(grpc.Creds(credentials.NewTLS(config)))} lis, err := net.Listen("tcp", host) if err != nil { diff --git a/pkg/proxy/api.go b/pkg/proxy/api.go new file mode 100644 index 0000000..5189520 --- /dev/null +++ b/pkg/proxy/api.go @@ -0,0 +1,453 @@ +package proxy + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "io/ioutil" + "net" + "time" + + "git.tu-berlin.de/mcc-fred/fred/proto/client" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/metadata" + "google.golang.org/grpc/peer" +) + +type APIProxy struct { + p *Proxy + port int + conn map[string]client.ClientClient + opts grpc.DialOption + roots *x509.CertPool +} + +func StartAPIProxy(p *Proxy, port int, cert string, key string, caCert string) (*grpc.Server, error) { 
+ // Load server's certificate and private key + serverCert, err := tls.LoadX509KeyPair(cert, key) + + if err != nil { + return nil, err + } + + // Create a new cert pool and add our own CA certificate + rootCAs := x509.NewCertPool() + + loaded, err := ioutil.ReadFile(caCert) + + if err != nil { + return nil, err + } + + rootCAs.AppendCertsFromPEM(loaded) + // Create the credentials and return it + + config := &tls.Config{ + Certificates: []tls.Certificate{serverCert}, + ClientAuth: tls.RequireAndVerifyClientCert, + ClientCAs: rootCAs, + RootCAs: rootCAs, + MinVersion: tls.VersionTLS12, + } + + a := &APIProxy{ + p: p, + port: port, + conn: make(map[string]client.ClientClient), + opts: grpc.WithTransportCredentials(credentials.NewTLS(config)), + roots: rootCAs, + } + + s := grpc.NewServer(grpc.Creds(credentials.NewTLS(config))) + + client.RegisterClientServer(s, a) + + return s, nil +} + +func (a *APIProxy) getConn(keygroup string) (client.ClientClient, error) { + host := a.p.getHost(keygroup) + + if c, ok := a.conn[host]; ok { + return c, nil + } + + conn, err := grpc.Dial(fmt.Sprintf("%s:%d", host, a.port), a.opts) + + if err != nil { + return nil, err + } + + c := client.NewClientClient(conn) + + a.conn[host] = c + return c, nil +} + +func (a *APIProxy) getAny() (client.ClientClient, error) { + host := a.p.getAny() + + if c, ok := a.conn[host]; ok { + return c, nil + } + + conn, err := grpc.Dial(fmt.Sprintf("%s:%d", host, a.port), a.opts) + + if err != nil { + return nil, err + } + + c := client.NewClientClient(conn) + + a.conn[host] = c + return c, nil +} + +func (a *APIProxy) addUserHeader(ctx context.Context) (context.Context, error) { + // since we pass each request on to the backend with our proxy, we lose user information + // we add that back on by adding the "user" header to the context + + // get peer information + p, ok := peer.FromContext(ctx) + + if !ok { + return ctx, fmt.Errorf("no peer found") + } + + // get TLS credential information for this 
connection + tlsAuth, ok := p.AuthInfo.(credentials.TLSInfo) + + if !ok { + return ctx, fmt.Errorf("unexpected peer transport credentials") + } + + // check that the certificate exists + if len(tlsAuth.State.VerifiedChains) == 0 || len(tlsAuth.State.VerifiedChains[0]) == 0 { + return ctx, fmt.Errorf("could not verify peer certificate: %v", tlsAuth.State) + } + + host, _, err := net.SplitHostPort(p.Addr.String()) + + if err != nil { + return ctx, err + } + + // verify the certificate: + // IF we are not proxied and communicate with the client directly: + // 1) it should be issued by a CA in our root CA pool + // 2) any intermediates are valid for us + // 3) the certificate should be valid for client authentication + // 4) the certificate should have the clients address as a SAN + _, err = tlsAuth.State.VerifiedChains[0][0].Verify(x509.VerifyOptions{ + Roots: a.roots, + CurrentTime: time.Now(), + Intermediates: x509.NewCertPool(), + KeyUsages: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth}, + DNSName: host, + }) + + if err != nil { + return ctx, err + } + + // Check subject common name exists and return it as the user name for that client + name := tlsAuth.State.VerifiedChains[0][0].Subject.CommonName + + if name == "" { + return ctx, fmt.Errorf("invalid subject common name") + } + + md, ok := metadata.FromIncomingContext(ctx) + + if !ok { + return ctx, fmt.Errorf("no metadata could be found for proxied request") + } + + newMD := md.Copy() + newMD.Set("user", name) + + return metadata.NewOutgoingContext(ctx, newMD), nil + +} + +// CreateKeygroup calls this method on the exthandler +func (a *APIProxy) CreateKeygroup(ctx context.Context, req *client.CreateKeygroupRequest) (*client.StatusResponse, error) { + + c, err := a.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + ctx, err = a.addUserHeader(ctx) + if err != nil { + return nil, err + } + + return c.CreateKeygroup(ctx, req) +} + +// DeleteKeygroup calls this method on the exthandler +func (a 
*APIProxy) DeleteKeygroup(ctx context.Context, req *client.DeleteKeygroupRequest) (*client.StatusResponse, error) { + + c, err := a.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + ctx, err = a.addUserHeader(ctx) + if err != nil { + return nil, err + } + + return c.DeleteKeygroup(ctx, req) +} + +// Read calls this method on the exthandler +func (a *APIProxy) Read(ctx context.Context, req *client.ReadRequest) (*client.ReadResponse, error) { + + c, err := a.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + ctx, err = a.addUserHeader(ctx) + if err != nil { + return nil, err + } + + return c.Read(ctx, req) +} + +// Scan calls this method on the exthandler +func (a *APIProxy) Scan(ctx context.Context, req *client.ScanRequest) (*client.ScanResponse, error) { + + c, err := a.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + ctx, err = a.addUserHeader(ctx) + if err != nil { + return nil, err + } + + return c.Scan(ctx, req) +} + +// Append calls this method on the exthandler +func (a *APIProxy) Append(ctx context.Context, req *client.AppendRequest) (*client.AppendResponse, error) { + + c, err := a.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + ctx, err = a.addUserHeader(ctx) + if err != nil { + return nil, err + } + + return c.Append(ctx, req) +} + +// Update calls this method on the exthandler +func (a *APIProxy) Update(ctx context.Context, req *client.UpdateRequest) (*client.StatusResponse, error) { + + c, err := a.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + ctx, err = a.addUserHeader(ctx) + if err != nil { + return nil, err + } + + return c.Update(ctx, req) +} + +// Delete calls this method on the exthandler +func (a *APIProxy) Delete(ctx context.Context, req *client.DeleteRequest) (*client.StatusResponse, error) { + + c, err := a.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + ctx, err = a.addUserHeader(ctx) + if err != nil { + return nil, err + } + + 
return c.Delete(ctx, req) +} + +// AddReplica calls this method on the exthandler +func (a *APIProxy) AddReplica(ctx context.Context, req *client.AddReplicaRequest) (*client.StatusResponse, error) { + + c, err := a.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + ctx, err = a.addUserHeader(ctx) + if err != nil { + return nil, err + } + + return c.AddReplica(ctx, req) +} + +// GetKeygroupReplica calls this method on the exthandler +func (a *APIProxy) GetKeygroupReplica(ctx context.Context, req *client.GetKeygroupReplicaRequest) (*client.GetKeygroupReplicaResponse, error) { + c, err := a.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + ctx, err = a.addUserHeader(ctx) + if err != nil { + return nil, err + } + + return c.GetKeygroupReplica(ctx, req) +} + +// RemoveReplica calls this method on the exthandler +func (a *APIProxy) RemoveReplica(ctx context.Context, req *client.RemoveReplicaRequest) (*client.StatusResponse, error) { + c, err := a.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + ctx, err = a.addUserHeader(ctx) + if err != nil { + return nil, err + } + + return c.RemoveReplica(ctx, req) +} + +// GetReplica calls this method on the exthandler +func (a *APIProxy) GetReplica(ctx context.Context, req *client.GetReplicaRequest) (*client.GetReplicaResponse, error) { + c, err := a.getAny() + + if err != nil { + return nil, err + } + + ctx, err = a.addUserHeader(ctx) + if err != nil { + return nil, err + } + + return c.GetReplica(ctx, req) +} + +// GetAllReplica calls this method on the exthandler +func (a *APIProxy) GetAllReplica(ctx context.Context, req *client.GetAllReplicaRequest) (*client.GetAllReplicaResponse, error) { + c, err := a.getAny() + + if err != nil { + return nil, err + } + + ctx, err = a.addUserHeader(ctx) + if err != nil { + return nil, err + } + + return c.GetAllReplica(ctx, req) +} + +// GetKeygroupTriggers calls this method on the exthandler +func (a *APIProxy) GetKeygroupTriggers(ctx 
context.Context, req *client.GetKeygroupTriggerRequest) (*client.GetKeygroupTriggerResponse, error) { + c, err := a.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + ctx, err = a.addUserHeader(ctx) + if err != nil { + return nil, err + } + + return c.GetKeygroupTriggers(ctx, req) +} + +// AddTrigger calls this method on the exthandler +func (a *APIProxy) AddTrigger(ctx context.Context, req *client.AddTriggerRequest) (*client.StatusResponse, error) { + c, err := a.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + ctx, err = a.addUserHeader(ctx) + if err != nil { + return nil, err + } + + return c.AddTrigger(ctx, req) +} + +// RemoveTrigger calls this method on the exthandler +func (a *APIProxy) RemoveTrigger(ctx context.Context, req *client.RemoveTriggerRequest) (*client.StatusResponse, error) { + c, err := a.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + ctx, err = a.addUserHeader(ctx) + if err != nil { + return nil, err + } + + return c.RemoveTrigger(ctx, req) +} + +// AddUser calls this method on the exthandler +func (a *APIProxy) AddUser(ctx context.Context, req *client.UserRequest) (*client.StatusResponse, error) { + c, err := a.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + ctx, err = a.addUserHeader(ctx) + if err != nil { + return nil, err + } + + return c.AddUser(ctx, req) +} + +// RemoveUser calls this method on the exthandler +func (a *APIProxy) RemoveUser(ctx context.Context, req *client.UserRequest) (*client.StatusResponse, error) { + c, err := a.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + ctx, err = a.addUserHeader(ctx) + if err != nil { + return nil, err + } + + return c.RemoveUser(ctx, req) +} diff --git a/pkg/proxy/peering.go b/pkg/proxy/peering.go new file mode 100644 index 0000000..b623c23 --- /dev/null +++ b/pkg/proxy/peering.go @@ -0,0 +1,180 @@ +package proxy + +import ( + "context" + "crypto/tls" + "crypto/x509" + "fmt" + "io/ioutil" + + 
"git.tu-berlin.de/mcc-fred/fred/proto/peering" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials" +) + +type PeeringProxy struct { + p *Proxy + port int + conn map[string]peering.NodeClient + opts grpc.DialOption +} + +func StartPeeringProxy(p *Proxy, port int, cert string, key string, caCert string) (*grpc.Server, error) { + // Load server's certificate and private key + serverCert, err := tls.LoadX509KeyPair(cert, key) + + if err != nil { + return nil, err + } + + // Create a new cert pool and add our own CA certificate + rootCAs := x509.NewCertPool() + + loaded, err := ioutil.ReadFile(caCert) + + if err != nil { + return nil, err + } + + rootCAs.AppendCertsFromPEM(loaded) + // Create the credentials and return it + + config := &tls.Config{ + Certificates: []tls.Certificate{serverCert}, + ClientAuth: tls.RequireAndVerifyClientCert, + ClientCAs: rootCAs, + RootCAs: rootCAs, + MinVersion: tls.VersionTLS12, + } + + a := &PeeringProxy{ + p: p, + port: port, + conn: make(map[string]peering.NodeClient), + opts: grpc.WithTransportCredentials(credentials.NewTLS(config)), + } + + s := grpc.NewServer(grpc.Creds(credentials.NewTLS(config))) + + peering.RegisterNodeServer(s, a) + + return s, nil +} + +func (p *PeeringProxy) getConn(keygroup string) (peering.NodeClient, error) { + host := p.p.getHost(keygroup) + + if c, ok := p.conn[host]; ok { + return c, nil + } + + conn, err := grpc.Dial(fmt.Sprintf("%s:%d", host, p.port), p.opts) + + if err != nil { + return nil, err + } + + c := peering.NewNodeClient(conn) + + p.conn[host] = c + return c, nil +} + +// CreateKeygroup calls this Method on the Inthandler +func (p *PeeringProxy) CreateKeygroup(ctx context.Context, req *peering.CreateKeygroupRequest) (*peering.Empty, error) { + c, err := p.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + return c.CreateKeygroup(ctx, req) +} + +// DeleteKeygroup calls this Method on the Inthandler +func (p *PeeringProxy) DeleteKeygroup(ctx context.Context, req 
*peering.DeleteKeygroupRequest) (*peering.Empty, error) { + c, err := p.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + return c.DeleteKeygroup(ctx, req) +} + +// PutItem calls HandleUpdate on the Inthandler +func (p *PeeringProxy) PutItem(ctx context.Context, req *peering.PutItemRequest) (*peering.Empty, error) { + c, err := p.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + return c.PutItem(ctx, req) +} + +// AppendItem calls HandleAppend on the Inthandler +func (p *PeeringProxy) AppendItem(ctx context.Context, req *peering.AppendItemRequest) (*peering.Empty, error) { + c, err := p.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + return c.AppendItem(ctx, req) +} + +// GetItem has no implementation +func (p *PeeringProxy) GetItem(ctx context.Context, req *peering.GetItemRequest) (*peering.GetItemResponse, error) { + c, err := p.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + return c.GetItem(ctx, req) +} + +// DeleteItem has no implementation +func (p *PeeringProxy) DeleteItem(ctx context.Context, req *peering.DeleteItemRequest) (*peering.Empty, error) { + c, err := p.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + return c.DeleteItem(ctx, req) +} + +// GetAllItems has no implementation +func (p *PeeringProxy) GetAllItems(ctx context.Context, req *peering.GetAllItemsRequest) (*peering.GetAllItemsResponse, error) { + c, err := p.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + return c.GetAllItems(ctx, req) +} + +// AddReplica calls this Method on the Inthandler +func (p *PeeringProxy) AddReplica(ctx context.Context, req *peering.AddReplicaRequest) (*peering.Empty, error) { + c, err := p.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + return c.AddReplica(ctx, req) +} + +// RemoveReplica calls this Method on the Inthandler +func (p *PeeringProxy) RemoveReplica(ctx context.Context, req *peering.RemoveReplicaRequest) (*peering.Empty, 
error) { + c, err := p.getConn(req.Keygroup) + + if err != nil { + return nil, err + } + + return c.RemoveReplica(ctx, req) +} diff --git a/pkg/proxy/proxy.go b/pkg/proxy/proxy.go new file mode 100644 index 0000000..7faaf65 --- /dev/null +++ b/pkg/proxy/proxy.go @@ -0,0 +1,25 @@ +package proxy + +import ( + "math/rand" + + "github.com/cespare/xxhash/v2" +) + +type Proxy struct { + hosts []string +} + +func NewProxy(hosts []string) *Proxy { + return &Proxy{ + hosts: hosts, + } +} + +func (p *Proxy) getHost(keygroup string) string { + return p.hosts[xxhash.Sum64String(keygroup)%uint64(len(p.hosts))] +} + +func (p *Proxy) getAny() string { + return p.hosts[rand.Intn(len(p.hosts))] +} diff --git a/proxy.Dockerfile b/proxy.Dockerfile new file mode 100644 index 0000000..81a0ed0 --- /dev/null +++ b/proxy.Dockerfile @@ -0,0 +1,35 @@ +# Shamelessly stolen from the original dockerfile +# building the binary +FROM golang:1.16-alpine as golang + +LABEL maintainer="tp@mcc.tu-berlin.de" + +WORKDIR /go/src/git.tu-berlin.de/mcc-fred/fred/ + +RUN apk update && apk add ca-certificates && rm -rf /var/cache/apk/* +RUN update-ca-certificates + +# Make an extra layer for the installed packages so that they don't have to be downloaded every time +COPY go.mod . +COPY go.sum . + +RUN go mod download + +COPY cmd/fredproxy cmd/fredproxy +COPY pkg pkg +COPY proto proto + +# Static build required so that we can safely copy the binary over. +RUN CGO_ENABLED=0 go install ./cmd/fredproxy/ + +# actual Docker image +FROM scratch + +WORKDIR / +COPY --from=golang /go/bin/fredproxy fredproxy + +EXPOSE 5555 +EXPOSE 9001 + +ENV PATH=.
+ENTRYPOINT ["fredproxy"] diff --git a/tests/3NodeTest/cmd/main/replica_suite.go b/tests/3NodeTest/cmd/main/replica_suite.go index d0485c7..200a79d 100644 --- a/tests/3NodeTest/cmd/main/replica_suite.go +++ b/tests/3NodeTest/cmd/main/replica_suite.go @@ -21,6 +21,9 @@ func (t *ReplicaSuite) RunTests() { logNodeAction(t.c.nodeA, "Adding nodeB as Replica node for KGRep") t.c.nodeA.AddKeygroupReplica("KGRep", t.c.nodeB.ID, 0, false) + logNodeAction(t.c.nodeB, "Putting a valuein KGRep") + t.c.nodeB.PutItem("KGRep", "KGRepItem", "val", false) + logNodeAction(t.c.nodeB, "Deleting the value from KGRep") t.c.nodeB.DeleteItem("KGRep", "KGRepItem", false) diff --git a/tests/runner/haproxy.Dockerfile b/tests/runner/haproxy.Dockerfile deleted file mode 100644 index e4946ec..0000000 --- a/tests/runner/haproxy.Dockerfile +++ /dev/null @@ -1,3 +0,0 @@ -FROM haproxy:lts-alpine - -COPY nodeA-haproxy.cfg /usr/local/etc/haproxy/haproxy.cfg \ No newline at end of file diff --git a/tests/runner/nodeA-haproxy.cfg b/tests/runner/nodeA-haproxy.cfg deleted file mode 100644 index d8136b9..0000000 --- a/tests/runner/nodeA-haproxy.cfg +++ /dev/null @@ -1,29 +0,0 @@ -defaults - timeout connect 10000ms - timeout client 60000ms - timeout server 60000ms - -frontend fe_grpc - mode tcp - bind *:5555 npn spdy/2 alpn h2,http/1.1 - default_backend be_grpc - -frontend fe_web - mode tcp - bind *:9001 npn spdy/2 alpn h2,http/1.1 - default_backend be_web - -# proxy between 172.26.1.101, 172.26.1.102, 172.26.1.103 -backend be_web - mode tcp - balance roundrobin - server nodeA1 172.26.1.101:9001 check sni req.ssl_sni - server nodeA2 172.26.1.102:9001 check sni req.ssl_sni - server nodeA3 172.26.1.103:9001 check sni req.ssl_sni - -backend be_grpc - mode tcp - balance roundrobin - server nodeA1 172.26.1.101:5555 check sni req.ssl_sni - server nodeA2 172.26.1.102:5555 check sni req.ssl_sni - server nodeA3 172.26.1.103:5555 check sni req.ssl_sni diff --git a/tests/runner/nodeA.yml b/tests/runner/nodeA.yml index 
7e9ff7a..f46a022 100644 --- a/tests/runner/nodeA.yml +++ b/tests/runner/nodeA.yml @@ -1,20 +1,29 @@ version: "3.7" services: - # this haproxy proxies between 172.26.1.101, 172.26.1.102, 172.26.1.103 nodeAproxy: depends_on: - nodeA-1 - nodeA-2 - nodeA-3 build: - context: . - dockerfile: haproxy.Dockerfile + context: ../../ + dockerfile: proxy.Dockerfile image: fred/fredproxy:local container_name: nodeAproxy + command: "--log-level '${LOG_LEVEL}' \ + --client-port 9001 \ + --peering-port 5555 \ + --machines 172.26.1.101,172.26.1.102,172.26.1.103 \ + --api-cert /cert/nodeA.crt \ + --api-key /cert/nodeA.key \ + --api-ca /cert/ca.crt + --peer-cert /cert/nodeA.crt \ + --peer-key /cert/nodeA.key \ + --peer-ca /cert/ca.crt" volumes: - ./certificates/nodeA.crt:/cert/nodeA.crt - - ./certificates/nodeA.key:/cert/nodeA.crt.key + - ./certificates/nodeA.key:/cert/nodeA.key - ./certificates/ca.crt:/cert/ca.crt networks: fredwork: @@ -36,6 +45,9 @@ services: --cert /cert/nodeA.crt \ --key /cert/nodeA.key \ --ca-file /cert/ca.crt \ + --peer-cert /cert/nodeA.crt \ + --peer-key /cert/nodeA.key \ + --peer-ca /cert/ca.crt \ --adaptor remote \ --cpuprofile /profiles/cpuprofile.pprof \ --memprofile /profiles/memprofile.pprof \ @@ -80,6 +92,9 @@ services: --cert /cert/nodeA.crt \ --key /cert/nodeA.key \ --ca-file /cert/ca.crt \ + --peer-cert /cert/nodeA.crt \ + --peer-key /cert/nodeA.key \ + --peer-ca /cert/ca.crt \ --adaptor remote \ --nase-host https://172.26.6.1:2379 \ --nase-cert /cert/nodeA.crt \ @@ -120,6 +135,9 @@ services: --cert /cert/nodeA.crt \ --key /cert/nodeA.key \ --ca-file /cert/ca.crt \ + --peer-cert /cert/nodeA.crt \ + --peer-key /cert/nodeA.key \ + --peer-ca /cert/ca.crt \ --adaptor remote \ --nase-host https://172.26.6.1:2379 \ --nase-cert /cert/nodeA.crt \ diff --git a/tests/runner/nodeB-debug.yml b/tests/runner/nodeB-debug.yml index 67ca86d..9ad7d83 100644 --- a/tests/runner/nodeB-debug.yml +++ b/tests/runner/nodeB-debug.yml @@ -21,6 +21,9 @@ services: --cert 
/cert/nodeB.crt \ --key /cert/nodeB.key \ --ca-file /cert/ca.crt \ + --peer-cert /cert/nodeB.crt \ + --peer-key /cert/nodeB.key \ + --peer-ca /cert/ca.crt \ --adaptor remote \ --nase-host https://172.26.6.1:2379 \ --nase-cert /cert/nodeB.crt \ diff --git a/tests/runner/nodeB.yml b/tests/runner/nodeB.yml index ec0fe0b..8b5023e 100644 --- a/tests/runner/nodeB.yml +++ b/tests/runner/nodeB.yml @@ -15,6 +15,9 @@ services: --cert /cert/nodeB.crt \ --key /cert/nodeB.key \ --ca-file /cert/ca.crt \ + --peer-cert /cert/nodeB.crt \ + --peer-key /cert/nodeB.key \ + --peer-ca /cert/ca.crt \ --adaptor remote \ --nase-host https://172.26.6.1:2379 \ --nase-cert /cert/nodeB.crt \ diff --git a/tests/runner/nodeC.yml b/tests/runner/nodeC.yml index 3f58376..7af094e 100644 --- a/tests/runner/nodeC.yml +++ b/tests/runner/nodeC.yml @@ -15,6 +15,9 @@ services: --cert /cert/nodeC.crt \ --key /cert/nodeC.key \ --ca-file /cert/ca.crt \ + --peer-cert /cert/nodeC.crt \ + --peer-key /cert/nodeC.key \ + --peer-ca /cert/ca.crt \ --adaptor remote \ --nase-host https://172.26.6.1:2379 \ --nase-cert /cert/nodeC.crt \ -- GitLab