Commit 6c86fad2 authored by pfandzelter's avatar pfandzelter
Browse files

Merge branch 'tp/fred-proxy' into 'main'

Add Custom Load Balancer

See merge request !147
parents 1dd9dea3 4b40434b
Pipeline #18467 passed with stages
in 18 minutes and 32 seconds
......@@ -43,8 +43,11 @@ type fredConfig struct {
Adaptor string `env:"STORAGE_ADAPTOR"`
}
Peering struct {
Host string `env:"PEERING_HOST"`
HostProxy string `env:"PEERING_PROXY"`
Host string `env:"PEERING_HOST"`
Proxy string `env:"PEERING_PROXY"`
Cert string `env:"PEERING_CERT"`
Key string `env:"PEERING_KEY"`
CA string `env:"PEERING_CA"`
}
Log struct {
Level string `env:"LOG_LEVEL"`
......@@ -103,7 +106,10 @@ func parseArgs() (fc fredConfig) {
// this is the address that grpc will bind to (locally)
flag.StringVar(&(fc.Peering.Host), "peer-host", "", "local address of this peering server. (Env: PEERING_HOST)")
// this is the address that the node will advertise to nase
flag.StringVar(&(fc.Peering.HostProxy), "peer-host-proxy", "", "Publicly reachable address of this peering server (if behind a proxy). (Env: PEERING_PROXY)")
flag.StringVar(&(fc.Peering.Proxy), "peer-host-proxy", "", "Publicly reachable address of this peering server (if behind a proxy). (Env: PEERING_PROXY)")
flag.StringVar(&(fc.Peering.Cert), "peer-cert", "", "Certificate for peering connection. (Env: PEERING_CERT)")
flag.StringVar(&(fc.Peering.Key), "peer-key", "", "Key file for peering connection. (Env: PEERING_KEY)")
flag.StringVar(&(fc.Peering.CA), "peer-ca", "", "Certificate authority root certificate file for peering connections. (Env: PEERING)")
// storage configuration
flag.StringVar(&(fc.Storage.Adaptor), "adaptor", "", "Storage adaptor, can be \"remote\", \"badgerdb\", \"memory\", \"dynamo\". (Env: STORAGE_ADAPTOR)")
......@@ -275,7 +281,7 @@ func main() {
}
log.Debug().Msg("Starting Interconnection Client...")
c := peering.NewClient()
c := peering.NewClient(fc.Peering.Cert, fc.Peering.Key, fc.Peering.CA)
log.Debug().Msg("Starting NaSe Client...")
......@@ -292,7 +298,7 @@ func main() {
Client: c,
NaSe: n,
PeeringHost: fc.Peering.Host,
PeeringHostProxy: fc.Peering.HostProxy,
PeeringHostProxy: fc.Peering.Proxy,
ExternalHost: fc.Server.Host,
ExternalHostProxy: fc.Server.Proxy,
TriggerCert: fc.Trigger.Cert,
......@@ -301,7 +307,7 @@ func main() {
})
log.Debug().Msg("Starting Interconnection Server...")
is := peering.NewServer(fc.Peering.Host, f.I)
is := peering.NewServer(fc.Peering.Host, f.I, fc.Peering.Cert, fc.Peering.Key, fc.Peering.CA)
log.Debug().Msg("Starting GRPC Server for Client (==Externalconnection)...")
isProxied := fc.Server.Proxy != "" && fc.Server.Host != fc.Server.Proxy
......
package main
import (
"flag"
"fmt"
"net"
"os"
"os/signal"
"strings"
"syscall"
"git.tu-berlin.de/mcc-fred/fred/pkg/proxy"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
func main() {
// we need:
// - a port for the grpc client frontend
// - a port for the grpc peering endpoint
// - a list of addresses (we're simplifying this: you can't set different ports for your different machines)
// - probably also the usual certificate stuff
// - log level and handler as always
clientPort := flag.Int("client-port", 0, "port to bind proxy to for client access")
peeringPort := flag.Int("peering-port", 0, "port to bind to for peering access")
machines := flag.String("machines", "", "list of machine addresses, comma-separated")
loghandler := flag.String("log-handler", "dev", "dev=>pretty, prod=>json")
loglevel := flag.String("log-level", "debug", "Log level, can be \"debug\", \"info\" ,\"warn\", \"error\", \"fatal\", \"panic\".")
peeringCert := flag.String("peer-cert", "", "Certificate for peering connection.")
peeringKey := flag.String("peer-key", "", "Key file for peering connection.")
peeringCA := flag.String("peer-ca", "", "Certificate authority root certificate file for peering connections.")
apiCert := flag.String("api-cert", "", "Certificate for API connection.")
apiKey := flag.String("api-key", "", "Key file for API connection.")
apiCA := flag.String("api-ca", "", "Certificate authority root certificate file for API connections.")
flag.Parse()
// Setup Logging
// In Dev the ConsoleWriter has nice colored output, but is not very fast.
// In Prod the default handler is used. It writes json to stdout and is very fast.
if *loghandler == "dev" {
log.Logger = log.Output(
zerolog.ConsoleWriter{
Out: os.Stderr,
NoColor: false,
},
)
} else if *loghandler != "prod" {
log.Fatal().Msg("Log Handler has to be either dev or prod")
}
switch *loglevel {
case "debug":
zerolog.SetGlobalLevel(zerolog.DebugLevel)
case "info":
zerolog.SetGlobalLevel(zerolog.InfoLevel)
case "warn":
zerolog.SetGlobalLevel(zerolog.WarnLevel)
case "error":
zerolog.SetGlobalLevel(zerolog.ErrorLevel)
case "fatal":
zerolog.SetGlobalLevel(zerolog.FatalLevel)
case "panic":
zerolog.SetGlobalLevel(zerolog.PanicLevel)
default:
zerolog.SetGlobalLevel(zerolog.DebugLevel)
log.Info().Msg("No Loglevel specified, using 'debug'")
}
// parse machines
p := proxy.NewProxy(strings.Split(*machines, ","))
pS, err := proxy.StartPeeringProxy(p, *peeringPort, *peeringCert, *peeringKey, *peeringCA)
if err != nil {
log.Fatal().Err(err).Msg(err.Error())
}
aS, err := proxy.StartAPIProxy(p, *clientPort, *apiCert, *apiKey, *apiCA)
if err != nil {
log.Fatal().Err(err).Msg(err.Error())
}
pLis, err := net.Listen("tcp", fmt.Sprintf(":%d", *peeringPort))
if err != nil {
log.Fatal().Err(err).Msg("Failed to listen")
}
go func() {
log.Debug().Msgf("Peering proxy starting to listen on :%d, proxying to %s", *peeringPort, *machines)
err := pS.Serve(pLis)
// if Serve returns without an error, we probably intentionally closed it
if err != nil {
log.Fatal().Msgf("PeeringProxy Server exited: %s", err.Error())
}
}()
aLis, err := net.Listen("tcp", fmt.Sprintf(":%d", *clientPort))
if err != nil {
log.Fatal().Err(err).Msg("Failed to listen")
}
go func() {
log.Debug().Msgf("API proxy starting to listen on :%d, proxying to %s", *clientPort, *machines)
err := aS.Serve(aLis)
// if Serve returns without an error, we probably intentionally closed it
if err != nil {
log.Fatal().Msgf("APIProxy Server exited: %s", err.Error())
}
}()
quit := make(chan os.Signal, 1)
signal.Notify(quit,
os.Interrupt,
syscall.SIGTERM)
<-quit
log.Info().Msg("FReD Proxy Closing Now!")
pS.GracefulStop()
aS.GracefulStop()
}
......@@ -24,8 +24,6 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho=
github.com/allegro/bigcache v1.2.1 h1:hg1sY1raCwic3Vnsvje6TT7/pnZba83LeFck5NrFKSc=
github.com/allegro/bigcache v1.2.1/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/armon/circbuf v0.0.0-20150827004946-bbbad097214e/go.mod h1:3U/XgcO3hCbHZ8TKRvWD2dDTCfh9M9ya+I9JpbB7O8o=
github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8=
......@@ -81,7 +79,6 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dgraph-io/badger/v3 v3.2103.0 h1:abkD2EnP3+6Tj8h5LI1y00dJ9ICKTIAzvG9WmZ8S2c4=
github.com/dgraph-io/badger/v3 v3.2103.0/go.mod h1:GHMCYxuDWyzbHkh4k3yyg4PM61tJPFfEGSMbE3Vd5QE=
github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a h1:1cMMkx3iegOzbAxVl1ZZQRHk+gaCf33Y5/4I3l0NNSg=
github.com/dgraph-io/ristretto v0.0.4-0.20210309073149-3836124cdc5a/go.mod h1:MIonLggsKgZLUSt414ExgwNtlOL5MuEoAJP514mwGe8=
github.com/dgraph-io/ristretto v0.1.0 h1:Jv3CGQHp9OjuMBSne1485aDpUkTKEcUqF+jm/LuerPI=
github.com/dgraph-io/ristretto v0.1.0/go.mod h1:fux0lOrBhrVCJd3lcTHsIJhq1T2rokOu6v9Vcb3Q9ug=
......
......@@ -14,6 +14,7 @@ import (
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
)
......@@ -23,6 +24,8 @@ type Server struct {
roots *x509.CertPool
isProxied bool
proxyHost string
proxyPort string
*grpc.Server
}
......@@ -38,30 +41,30 @@ var (
)
// CheckCert checks the certificate from the given gRPC context for validity and returns the Common Name
func (s *Server) CheckCert(ctx context.Context) (name string, err error) {
func (s *Server) CheckCert(ctx context.Context) (string, error) {
// get peer information
p, ok := peer.FromContext(ctx)
if !ok {
return name, errors.Errorf("no peer found")
return "", errors.Errorf("no peer found")
}
// get TLS credential information for this connection
tlsAuth, ok := p.AuthInfo.(credentials.TLSInfo)
if !ok {
return name, errors.Errorf("unexpected peer transport credentials")
return "", errors.Errorf("unexpected peer transport credentials")
}
// check that the certificate exists
if len(tlsAuth.State.VerifiedChains) == 0 || len(tlsAuth.State.VerifiedChains[0]) == 0 {
return name, errors.Errorf("could not verify peer certificate: %v", tlsAuth.State)
return "", errors.Errorf("could not verify peer certificate: %v", tlsAuth.State)
}
host, _, err := net.SplitHostPort(p.Addr.String())
if err != nil {
return name, errors.New(err)
return "", errors.New(err)
}
// verify the certificate:
......@@ -80,36 +83,57 @@ func (s *Server) CheckCert(ctx context.Context) (name string, err error) {
})
if err != nil {
return name, errors.New(err)
return "", errors.New(err)
}
} else {
// ELSE we sit behind a proxy and the proxy should be the one tunneling the gRPC connection to us
// hence if we can ensure that it is indeed the proxy that is talking to us and not someone who has found
// their way into the network, we can be sure that the proxy/LB has checked the certificate
_, err = tlsAuth.State.VerifiedChains[0][0].Verify(x509.VerifyOptions{
Roots: s.roots,
CurrentTime: time.Now(),
Intermediates: x509.NewCertPool(),
KeyUsages: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
})
if err != nil {
return name, errors.New(err)
// Check subject common name exists and return it as the user name for that client
name := tlsAuth.State.VerifiedChains[0][0].Subject.CommonName
if name == "" {
return "", errors.Errorf("invalid subject common name")
}
log.Debug().Msgf("CheckCert: GRPC Context Certificate Name: %s", name)
return name, nil
}
// ELSE we sit behind a proxy and the proxy should be the one tunneling the gRPC connection to us
// hence if we can ensure that it is indeed the proxy that is talking to us and not someone who has found
// their way into the network, we can be sure that the proxy/LB has checked the certificate
// in this case, the proxy will give the user name to us as a header (thanks, proxy!)
// Check subject common name exists and return it as the user name for that client
if name = tlsAuth.State.VerifiedChains[0][0].Subject.CommonName; name == "" {
return name, errors.Errorf("invalid subject common name")
if host != s.proxyHost {
return "", errors.Errorf("node is proxied but got request not from proxy (%s instead of %s)", host, s.proxyHost)
}
log.Debug().Msgf("CheckCert: GRPC Context Certificate Name: %s", name)
_, err = tlsAuth.State.VerifiedChains[0][0].Verify(x509.VerifyOptions{
Roots: s.roots,
CurrentTime: time.Now(),
Intermediates: x509.NewCertPool(),
KeyUsages: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth},
})
return name, nil
if err != nil {
return "", errors.New(err)
}
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return "", errors.Errorf("no metadata could be found for proxied request")
}
u := md.Get("user")
if len(u) != 1 {
return "", errors.Errorf("invalid user header for proxied request")
}
return u[0], nil
}
// NewServer creates a new Server for requests from Fred Clients
func NewServer(host string, handler fred.ExtHandler, cert string, key string, caCert string, isProxied bool, proxyHost string) *Server {
func NewServer(host string, handler fred.ExtHandler, cert string, key string, caCert string, isProxied bool, proxy string) *Server {
// Load server's certificate and private key
serverCert, err := tls.LoadX509KeyPair(cert, key)
......@@ -136,12 +160,20 @@ func NewServer(host string, handler fred.ExtHandler, cert string, key string, ca
MinVersion: tls.VersionTLS12,
}
proxyHost, proxyPort, err := net.SplitHostPort(proxy)
if isProxied && err != nil {
log.Fatal().Err(err).Msg("Failed to parse proxy host and port")
return nil
}
s := &Server{
handler,
rootCAs,
isProxied,
proxyHost,
grpc.NewServer(
e: handler,
roots: rootCAs,
isProxied: isProxied,
proxyHost: proxyHost,
proxyPort: proxyPort,
Server: grpc.NewServer(
grpc.Creds(credentials.NewTLS(config)),
),
}
......
......@@ -82,7 +82,7 @@ func TestMain(m *testing.M) {
config := fred.Config{
Store: badgerdb.NewMemory(),
Client: peering.NewClient(),
Client: peering.NewClient(certBasePath+"nodeA.crt", certBasePath+"nodeA.key", certBasePath+"ca.crt"),
NaSe: n,
PeeringHost: "127.0.0.1:8000",
PeeringHostProxy: "",
......
......@@ -2,29 +2,69 @@ package peering
import (
"context"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"git.tu-berlin.de/mcc-fred/fred/pkg/fred"
"git.tu-berlin.de/mcc-fred/fred/proto/peering"
"github.com/go-errors/errors"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
)
// Client is an peering client to communicate with peers.
type Client struct {
conn map[string]peering.NodeClient
conn map[string]peering.NodeClient
credentials credentials.TransportCredentials
}
// NewClient creates a new empty client to communicate with peers.
func NewClient() *Client {
func NewClient(certFile string, keyFile string, caFile string) *Client {
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
if err != nil {
log.Fatal().Err(err).Msg("Cannot load certificates")
return nil
}
// Create a new cert pool and add our own CA certificate
rootCAs, err := x509.SystemCertPool()
if err != nil {
log.Fatal().Err(err).Msg("Cannot load root certificates")
return nil
}
loaded, err := ioutil.ReadFile(caFile)
if err != nil {
log.Fatal().Msgf("unexpected missing certfile: %v", err)
}
rootCAs.AppendCertsFromPEM(loaded)
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
MinVersion: tls.VersionTLS12,
RootCAs: rootCAs,
}
return &Client{
conn: make(map[string]peering.NodeClient),
conn: make(map[string]peering.NodeClient),
credentials: credentials.NewTLS(tlsConfig),
}
}
// getClient creates a new connection to a server or uses an existing one.
func (c *Client) getClient(host string) (peering.NodeClient, error) {
conn, err := grpc.Dial(host, grpc.WithInsecure())
if client, ok := c.conn[host]; ok {
return client, nil
}
conn, err := grpc.Dial(host, grpc.WithTransportCredentials(c.credentials))
if err != nil {
log.Error().Err(err).Msg("Cannot create Grpc connection")
......@@ -33,9 +73,9 @@ func (c *Client) getClient(host string) (peering.NodeClient, error) {
log.Debug().Msgf("Interclient: Created Connection to %s", host)
c.conn[host] = peering.NewNodeClient(conn)
return c.conn[host], nil
client := peering.NewNodeClient(conn)
c.conn[host] = client
return client, nil
}
// Destroy currently does nothing, but might delete open connections if they are implemented
......
......@@ -2,11 +2,15 @@ package peering
import (
"context"
"crypto/tls"
"crypto/x509"
"io/ioutil"
"net"
"git.tu-berlin.de/mcc-fred/fred/proto/peering"
"github.com/rs/zerolog/log"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"git.tu-berlin.de/mcc-fred/fred/pkg/fred"
)
......@@ -18,9 +22,34 @@ type Server struct {
}
// NewServer creates a new Server for communication to the inthandler from other nodes
func NewServer(host string, handler fred.IntHandler) *Server {
func NewServer(host string, handler fred.IntHandler, certFile string, keyFile string, caFile string) *Server {
// Load server's certificate and private key
cert, err := tls.LoadX509KeyPair(certFile, keyFile)
s := &Server{handler, grpc.NewServer()}
if err != nil {
log.Fatal().Msgf("could not load key pair: %v", err)
}
// Create a new cert pool and add our own CA certificate
rootCAs := x509.NewCertPool()
loaded, err := ioutil.ReadFile(caFile)
if err != nil {
log.Fatal().Msgf("unexpected missing certfile: %v", err)
}
rootCAs.AppendCertsFromPEM(loaded)
// Create the credentials and return it
config := &tls.Config{
Certificates: []tls.Certificate{cert},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: rootCAs,
MinVersion: tls.VersionTLS12,
}
s := &Server{handler, grpc.NewServer(grpc.Creds(credentials.NewTLS(config)))}
lis, err := net.Listen("tcp", host)
if err != nil {
......
package proxy
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"net"
"time"
"git.tu-berlin.de/mcc-fred/fred/proto/client"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
)
type APIProxy struct {
p *Proxy
port int
conn map[string]client.ClientClient
opts grpc.DialOption
roots *x509.CertPool
}
func StartAPIProxy(p *Proxy, port int, cert string, key string, caCert string) (*grpc.Server, error) {
// Load server's certificate and private key
serverCert, err := tls.LoadX509KeyPair(cert, key)
if err != nil {
return nil, err
}
// Create a new cert pool and add our own CA certificate
rootCAs := x509.NewCertPool()
loaded, err := ioutil.ReadFile(caCert)
if err != nil {
return nil, err
}
rootCAs.AppendCertsFromPEM(loaded)
// Create the credentials and return it
config := &tls.Config{
Certificates: []tls.Certificate{serverCert},
ClientAuth: tls.RequireAndVerifyClientCert,
ClientCAs: rootCAs,
RootCAs: rootCAs,
MinVersion: tls.VersionTLS12,
}
a := &APIProxy{
p: p,
port: port,
conn: make(map[string]client.ClientClient),
opts: grpc.WithTransportCredentials(credentials.NewTLS(config)),
roots: rootCAs,
}
s := grpc.NewServer(grpc.Creds(credentials.NewTLS(config)))
client.RegisterClientServer(s, a)
return s, nil
}
func (a *APIProxy) getConn(keygroup string) (client.ClientClient, error) {
host := a.p.getHost(keygroup)
if c, ok := a.conn[host]; ok {
return c, nil
}
conn, err := grpc.Dial(fmt.Sprintf("%s:%d", host, a.port), a.opts)
if err != nil {
return nil, err
}
c := client.NewClientClient(conn)
a.conn[host] = c
return c, nil
}
func (a *APIProxy) getAny() (client.ClientClient, error) {
host := a.p.getAny()
if c, ok := a.conn[host]; ok {
return c, nil
}
conn, err := grpc.Dial(fmt.Sprintf("%s:%d", host, a.port), a.opts)
if err != nil {