Commit 45c3b4e3 authored by pfandzelter
Browse files

add choosereplica api to alexandra

parent 26fc0eb0
Pipeline #69905 canceled with stages
in 7 minutes and 40 seconds
......@@ -33,6 +33,7 @@ type clientExpiry struct {
// keygroupSet represents a keygroup's clients and the last time this information was updated from FReD.
type keygroupSet struct {
// lastUpdated is the time at which the client list was last refreshed.
lastUpdated time.Time
// preferred is the client explicitly chosen for this keygroup; nil if no preference is set.
preferred *Client
// clients holds the clients for this keygroup's replicas, each paired with its expiry.
clients []*clientExpiry
}
......@@ -76,6 +77,27 @@ func (m *ClientsMgr) readFromAnywhere(request *middleware.ReadRequest) ([]string
versions []vclock.VClock
}
// if there is a preferred client, only try to ask that
if set.preferred != nil {
log.Debug().Msgf("there is a preferred node, reading from that: %s", set.preferred.nodeID)
res, err := set.preferred.Client.Read(context.Background(), &clientsProto.ReadRequest{Id: request.Id, Keygroup: request.Keygroup})
if err != nil {
log.Err(err).Msgf("Reading from preferred client %s returned error", set.preferred.nodeID)
} else {
vals := make([]string, len(res.Data))
versions := make([]vclock.VClock, len(res.Data))
for i := range res.Data {
vals[i] = res.Data[i].Val
versions[i] = res.Data[i].Version.Version
log.Debug().Msgf("Reading from client %s returned data: %+v %+v", set.preferred.nodeID, res.Data[i].Val, res.Data[i].Version.Version)
}
return vals, versions, nil
}
}
clientsToAsk := make(map[*Client]struct{})
clts := filterClientsToExpiry(set.clients, request.MinExpiry)
......@@ -97,6 +119,8 @@ func (m *ClientsMgr) readFromAnywhere(request *middleware.ReadRequest) ([]string
clientsToAsk[clts[rand.Intn(len(clts))].client] = struct{}{}
}
log.Debug().Msgf("asking %d nodes", len(clientsToAsk))
var wg sync.WaitGroup
responses := make(chan readResponse, len(clientsToAsk))
done := make(chan struct{})
......@@ -178,16 +202,15 @@ func (m *ClientsMgr) getLightHouse() (client *Client) {
// GetClientTo returns a client with this address
func (m *ClientsMgr) getClientTo(host string, nodeID string) (client *Client) {
log.Info().Msgf("GetClientTo: Trying to get Fred Client to host %s", host)
client = m.clients[host]
log.Info().Msgf("GetClientTo: Trying to get Fred Client to node %s host %s", nodeID, host)
client = m.clients[nodeID]
if client != nil {
if client.nodeID == "__lighthouse" && nodeID != "__lighthouse" {
client.nodeID = nodeID
}
return
}
client = newClient(nodeID, host, m.clientsCert, m.clientsKey)
m.clients[host] = client
m.clients[nodeID] = client
return
}
......@@ -237,7 +260,29 @@ func filterClientsToExpiry(clientEx []*clientExpiry, expiry int64) (out []*clien
return
}
// setPreferred marks the client of node nodeID as the preferred client for keygroup, so that requests for that
// keygroup are directed to it. It returns an error if the node or the keygroup is not known to this manager.
func (m *ClientsMgr) setPreferred(keygroup string, nodeID string) error {
	// Take the lock before touching either map: both m.clients and m.keygroups are written while the manager
	// lock is held (e.g., when keygroup clients are refreshed), so an unlocked read here would be a data race.
	m.Lock()
	defer m.Unlock()

	c, ok := m.clients[nodeID]
	if !ok || c == nil {
		return fmt.Errorf("unknown node %s", nodeID)
	}

	// Keygroup sets are stored by pointer; writing through a missing entry would dereference nil and panic.
	set, ok := m.keygroups[keygroup]
	if !ok || set == nil {
		return fmt.Errorf("unknown keygroup %s", keygroup)
	}

	set.preferred = c

	return nil
}
func (m *ClientsMgr) getClient(keygroup string) (*Client, error) {
m.Lock()
// if there is a preferred client for that keygroup, use that
if k, ok := m.keygroups[keygroup]; ok && k.preferred != nil {
m.Unlock()
return k.preferred, nil
}
m.Unlock()
if m.experimental && rand.Float64() < UseSlowerNodeProb {
return m.getRandomClientWithKeygroup(keygroup, 0)
}
......@@ -333,6 +378,7 @@ func (m *ClientsMgr) updateKeygroupClients(keygroup string) {
return
}
}
log.Debug().Msgf("updateKeygroupClients: Got replicas: %+v", replica)
m.Lock()
......@@ -344,13 +390,27 @@ func (m *ClientsMgr) updateKeygroupClients(keygroup string) {
}
set = m.keygroups[keygroup]
}
// we also need to find out if the preferred node for that keygroup (if any) still exists
preferred := m.keygroups[keygroup].preferred
removePreferred := preferred == nil
set.clients = make([]*clientExpiry, len(replica.Replica))
for i, client := range replica.Replica {
set.clients[i] = &clientExpiry{
client: m.getClientTo(client.Host, client.NodeId),
expiry: client.Expiry,
}
if !removePreferred && client.NodeId == preferred.nodeID {
removePreferred = false
}
}
if removePreferred {
m.keygroups[keygroup].preferred = nil
}
set.lastUpdated = time.Now()
m.keygroups[keygroup] = set
log.Debug().Msgf("updateKeygroupClients: new Clients are: %+v", set)
......
......@@ -18,6 +18,7 @@ import (
const alphaItemSpeed = float32(0.8)
type Client struct {
host string
nodeID string
Client api.ClientClient
conn *grpc.ClientConn
......@@ -71,6 +72,7 @@ func newClient(nodeID string, host string, certFile string, keyFile string) *Cli
Client: api.NewClientClient(conn),
conn: conn,
ReadSpeed: -1,
host: host,
nodeID: nodeID,
}
}
......
......@@ -229,14 +229,27 @@ func (m *Middleware) Append(ctx context.Context, req *middleware.AppendRequest)
// Notify notifies the middleware about a version of a datum that the client has seen by bypassing the middleware. This
// is required to capture external causality.
func (m *Middleware) Notify(_ context.Context, req *middleware.NotifyRequest) (*middleware.NotifyResponse, error) {
func (m *Middleware) Notify(_ context.Context, req *middleware.NotifyRequest) (*middleware.Empty, error) {
err := m.cache.add(req.Keygroup, req.Id, req.Version)
if err != nil {
return nil, err
}
return &middleware.NotifyResponse{}, nil
return &middleware.Empty{}, nil
}
// ChooseReplica lets a client pick a particular node that requests for a keygroup should be sent to. The chosen node
// takes precedence over the fastest node. The error from registering the preference is returned unchanged.
func (m *Middleware) ChooseReplica(_ context.Context, req *middleware.ChooseReplicaRequest) (*middleware.Empty, error) {
	log.Debug().Msgf("AlexandraServer has rcdv ChooseReplica: %+v", req)

	// record the preference in the clients manager; nothing else needs to happen on success
	if setErr := m.clientsMgr.setPreferred(req.Keygroup, req.NodeId); setErr != nil {
		return nil, setErr
	}

	return new(middleware.Empty), nil
}
// CreateKeygroup creates the keygroup and also adds the first node (This is two operations in the eye of FReD:
......@@ -244,9 +257,11 @@ func (m *Middleware) Notify(_ context.Context, req *middleware.NotifyRequest) (*
func (m *Middleware) CreateKeygroup(ctx context.Context, req *middleware.CreateKeygroupRequest) (*middleware.Empty, error) {
log.Debug().Msgf("AlexandraServer has rcdv CreateKeygroup: %+v", req)
getReplica, err := m.clientsMgr.getFastestClient().getReplica(ctx, req.FirstNodeId)
if err != nil {
return nil, err
}
log.Debug().Msgf("CreateKeygroup: using node %s (addr=%s)", getReplica.NodeId, getReplica.Host)
_, err = m.clientsMgr.getClientTo(getReplica.Host, getReplica.NodeId).createKeygroup(ctx, req.Keygroup, req.Mutable, req.Expiry)
......@@ -260,7 +275,7 @@ func (m *Middleware) CreateKeygroup(ctx context.Context, req *middleware.CreateK
// DeleteKeygroup deletes a keygroup from FReD.
func (m *Middleware) DeleteKeygroup(ctx context.Context, req *middleware.DeleteKeygroupRequest) (*middleware.Empty, error) {
client, err := m.clientsMgr.getFastestClientWithKeygroup(req.Keygroup, 1)
client, err := m.clientsMgr.getClient(req.Keygroup)
if err != nil {
return nil, err
}
......@@ -278,7 +293,13 @@ func (m *Middleware) DeleteKeygroup(ctx context.Context, req *middleware.DeleteK
// AddReplica lets the client explicitly add a new replica for a keygroup. In the future, this should happen
// automatically.
func (m *Middleware) AddReplica(ctx context.Context, req *middleware.AddReplicaRequest) (*middleware.Empty, error) {
_, err := m.clientsMgr.getFastestClient().Client.AddReplica(ctx, &api.AddReplicaRequest{
c, err := m.clientsMgr.getClient(req.Keygroup)
if err != nil {
return nil, err
}
_, err = c.Client.AddReplica(ctx, &api.AddReplicaRequest{
Keygroup: req.Keygroup,
NodeId: req.NodeId,
Expiry: req.Expiry,
......@@ -296,7 +317,13 @@ func (m *Middleware) AddReplica(ctx context.Context, req *middleware.AddReplicaR
// RemoveReplica lets the client explicitly remove a replica for a keygroup. In the future, this should happen
// automatically.
func (m *Middleware) RemoveReplica(ctx context.Context, req *middleware.RemoveReplicaRequest) (*middleware.Empty, error) {
_, err := m.clientsMgr.getFastestClient().Client.RemoveReplica(ctx, &api.RemoveReplicaRequest{
c, err := m.clientsMgr.getClient(req.Keygroup)
if err != nil {
return nil, err
}
_, err = c.Client.RemoveReplica(ctx, &api.RemoveReplicaRequest{
Keygroup: req.Keygroup,
NodeId: req.NodeId,
})
......@@ -344,7 +371,13 @@ func (m *Middleware) GetAllReplica(ctx context.Context, _ *middleware.GetAllRepl
// GetKeygroupInfo returns a list of all FReD nodes that replicate a given keygroup. In the future, this API will be
// removed as ALExANDRA handles data replication.
func (m *Middleware) GetKeygroupInfo(ctx context.Context, req *middleware.GetKeygroupInfoRequest) (*middleware.GetKeygroupInfoResponse, error) {
res, err := m.clientsMgr.getFastestClient().Client.GetKeygroupInfo(ctx, &api.GetKeygroupInfoRequest{Keygroup: req.Keygroup})
c, err := m.clientsMgr.getClient(req.Keygroup)
if err != nil {
return nil, err
}
res, err := c.Client.GetKeygroupInfo(ctx, &api.GetKeygroupInfoRequest{Keygroup: req.Keygroup})
if err != nil {
return nil, err
......
This diff is collapsed.
......@@ -12,7 +12,8 @@ service Middleware {
rpc Update (UpdateRequest) returns (Empty);
rpc Delete (DeleteRequest) returns (Empty);
rpc Append (AppendRequest) returns (AppendResponse);
rpc Notify (NotifyRequest) returns (NotifyResponse);
rpc Notify (NotifyRequest) returns (Empty);
rpc ChooseReplica (ChooseReplicaRequest) returns (Empty);
rpc AddReplica (AddReplicaRequest) returns (Empty);
rpc GetKeygroupInfo (GetKeygroupInfoRequest) returns (GetKeygroupInfoResponse);
rpc RemoveReplica (RemoveReplicaRequest) returns (Empty);
......@@ -98,7 +99,10 @@ message NotifyRequest{
map<string, uint64> version = 3;
}
message NotifyResponse{}
// ChooseReplicaRequest names the node that should be preferred for requests to a keygroup.
message ChooseReplicaRequest{
// keygroup is the name of the keygroup the preference applies to.
string keygroup = 1;
// nodeId is the ID of the node that should be preferred for that keygroup.
string nodeId = 2;
}
message DeleteRequest {
string keygroup = 1;
......
......@@ -25,7 +25,8 @@ type MiddlewareClient interface {
Update(ctx context.Context, in *UpdateRequest, opts ...grpc.CallOption) (*Empty, error)
Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*Empty, error)
Append(ctx context.Context, in *AppendRequest, opts ...grpc.CallOption) (*AppendResponse, error)
Notify(ctx context.Context, in *NotifyRequest, opts ...grpc.CallOption) (*NotifyResponse, error)
Notify(ctx context.Context, in *NotifyRequest, opts ...grpc.CallOption) (*Empty, error)
ChooseReplica(ctx context.Context, in *ChooseReplicaRequest, opts ...grpc.CallOption) (*Empty, error)
AddReplica(ctx context.Context, in *AddReplicaRequest, opts ...grpc.CallOption) (*Empty, error)
GetKeygroupInfo(ctx context.Context, in *GetKeygroupInfoRequest, opts ...grpc.CallOption) (*GetKeygroupInfoResponse, error)
RemoveReplica(ctx context.Context, in *RemoveReplicaRequest, opts ...grpc.CallOption) (*Empty, error)
......@@ -109,8 +110,8 @@ func (c *middlewareClient) Append(ctx context.Context, in *AppendRequest, opts .
return out, nil
}
func (c *middlewareClient) Notify(ctx context.Context, in *NotifyRequest, opts ...grpc.CallOption) (*NotifyResponse, error) {
out := new(NotifyResponse)
func (c *middlewareClient) Notify(ctx context.Context, in *NotifyRequest, opts ...grpc.CallOption) (*Empty, error) {
out := new(Empty)
err := c.cc.Invoke(ctx, "/mcc.fred.middleware.Middleware/Notify", in, out, opts...)
if err != nil {
return nil, err
......@@ -118,6 +119,15 @@ func (c *middlewareClient) Notify(ctx context.Context, in *NotifyRequest, opts .
return out, nil
}
// ChooseReplica invokes the unary ChooseReplica RPC of the Middleware service with the given request and call
// options. It returns the Empty response on success, or nil and the error reported by the underlying Invoke call.
func (c *middlewareClient) ChooseReplica(ctx context.Context, in *ChooseReplicaRequest, opts ...grpc.CallOption) (*Empty, error) {
out := new(Empty)
err := c.cc.Invoke(ctx, "/mcc.fred.middleware.Middleware/ChooseReplica", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *middlewareClient) AddReplica(ctx context.Context, in *AddReplicaRequest, opts ...grpc.CallOption) (*Empty, error) {
out := new(Empty)
err := c.cc.Invoke(ctx, "/mcc.fred.middleware.Middleware/AddReplica", in, out, opts...)
......@@ -219,7 +229,8 @@ type MiddlewareServer interface {
Update(context.Context, *UpdateRequest) (*Empty, error)
Delete(context.Context, *DeleteRequest) (*Empty, error)
Append(context.Context, *AppendRequest) (*AppendResponse, error)
Notify(context.Context, *NotifyRequest) (*NotifyResponse, error)
Notify(context.Context, *NotifyRequest) (*Empty, error)
ChooseReplica(context.Context, *ChooseReplicaRequest) (*Empty, error)
AddReplica(context.Context, *AddReplicaRequest) (*Empty, error)
GetKeygroupInfo(context.Context, *GetKeygroupInfoRequest) (*GetKeygroupInfoResponse, error)
RemoveReplica(context.Context, *RemoveReplicaRequest) (*Empty, error)
......@@ -257,9 +268,12 @@ func (UnimplementedMiddlewareServer) Delete(context.Context, *DeleteRequest) (*E
// Append is the placeholder implementation of the Append RPC; it always returns a codes.Unimplemented status error.
func (UnimplementedMiddlewareServer) Append(context.Context, *AppendRequest) (*AppendResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method Append not implemented")
}
func (UnimplementedMiddlewareServer) Notify(context.Context, *NotifyRequest) (*NotifyResponse, error) {
func (UnimplementedMiddlewareServer) Notify(context.Context, *NotifyRequest) (*Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method Notify not implemented")
}
// ChooseReplica is the placeholder implementation of the ChooseReplica RPC; it always returns a
// codes.Unimplemented status error.
func (UnimplementedMiddlewareServer) ChooseReplica(context.Context, *ChooseReplicaRequest) (*Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method ChooseReplica not implemented")
}
// AddReplica is the placeholder implementation of the AddReplica RPC; it always returns a codes.Unimplemented
// status error.
func (UnimplementedMiddlewareServer) AddReplica(context.Context, *AddReplicaRequest) (*Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method AddReplica not implemented")
}
......@@ -446,6 +460,24 @@ func _Middleware_Notify_Handler(srv interface{}, ctx context.Context, dec func(i
return interceptor(ctx, in, info, handler)
}
// _Middleware_ChooseReplica_Handler is the server-side handler for the ChooseReplica RPC. It decodes the incoming
// message into a ChooseReplicaRequest using dec and dispatches it to srv's ChooseReplica method, routing the call
// through interceptor when one is supplied. A decoding error is returned as is.
func _Middleware_ChooseReplica_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(ChooseReplicaRequest)
if err := dec(in); err != nil {
return nil, err
}
// without an interceptor the service implementation is called directly
if interceptor == nil {
return srv.(MiddlewareServer).ChooseReplica(ctx, in)
}
// describe the call so the interceptor knows which server and method are being invoked
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/mcc.fred.middleware.Middleware/ChooseReplica",
}
// handler is what the interceptor calls to continue with the actual service method
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(MiddlewareServer).ChooseReplica(ctx, req.(*ChooseReplicaRequest))
}
return interceptor(ctx, in, info, handler)
}
func _Middleware_AddReplica_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(AddReplicaRequest)
if err := dec(in); err != nil {
......@@ -665,6 +697,10 @@ var Middleware_ServiceDesc = grpc.ServiceDesc{
MethodName: "Notify",
Handler: _Middleware_Notify_Handler,
},
{
MethodName: "ChooseReplica",
Handler: _Middleware_ChooseReplica_Handler,
},
{
MethodName: "AddReplica",
Handler: _Middleware_AddReplica_Handler,
......
This diff is collapsed.
......@@ -260,12 +260,20 @@ class NotifyRequest(google.protobuf.message.Message):
def ClearField(self, field_name: typing_extensions.Literal[u"id",b"id",u"keygroup",b"keygroup",u"version",b"version"]) -> None: ...
global___NotifyRequest = NotifyRequest
class NotifyResponse(google.protobuf.message.Message):
class ChooseReplicaRequest(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor = ...
KEYGROUP_FIELD_NUMBER: builtins.int
NODEID_FIELD_NUMBER: builtins.int
keygroup: typing.Text = ...
nodeId: typing.Text = ...
def __init__(self,
*,
keygroup : typing.Text = ...,
nodeId : typing.Text = ...,
) -> None: ...
global___NotifyResponse = NotifyResponse
def ClearField(self, field_name: typing_extensions.Literal[u"keygroup",b"keygroup",u"nodeId",b"nodeId"]) -> None: ...
global___ChooseReplicaRequest = ChooseReplicaRequest
class DeleteRequest(google.protobuf.message.Message):
DESCRIPTOR: google.protobuf.descriptor.Descriptor = ...
......
......@@ -53,7 +53,12 @@ class MiddlewareStub(object):
self.Notify = channel.unary_unary(
'/mcc.fred.middleware.Middleware/Notify',
request_serializer=middleware__pb2.NotifyRequest.SerializeToString,
response_deserializer=middleware__pb2.NotifyResponse.FromString,
response_deserializer=middleware__pb2.Empty.FromString,
)
self.ChooseReplica = channel.unary_unary(
'/mcc.fred.middleware.Middleware/ChooseReplica',
request_serializer=middleware__pb2.ChooseReplicaRequest.SerializeToString,
response_deserializer=middleware__pb2.Empty.FromString,
)
self.AddReplica = channel.unary_unary(
'/mcc.fred.middleware.Middleware/AddReplica',
......@@ -159,6 +164,12 @@ class MiddlewareServicer(object):
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def ChooseReplica(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
context.set_details('Method not implemented!')
raise NotImplementedError('Method not implemented!')
def AddReplica(self, request, context):
"""Missing associated documentation comment in .proto file."""
context.set_code(grpc.StatusCode.UNIMPLEMENTED)
......@@ -260,7 +271,12 @@ def add_MiddlewareServicer_to_server(servicer, server):
'Notify': grpc.unary_unary_rpc_method_handler(
servicer.Notify,
request_deserializer=middleware__pb2.NotifyRequest.FromString,
response_serializer=middleware__pb2.NotifyResponse.SerializeToString,
response_serializer=middleware__pb2.Empty.SerializeToString,
),
'ChooseReplica': grpc.unary_unary_rpc_method_handler(
servicer.ChooseReplica,
request_deserializer=middleware__pb2.ChooseReplicaRequest.FromString,
response_serializer=middleware__pb2.Empty.SerializeToString,
),
'AddReplica': grpc.unary_unary_rpc_method_handler(
servicer.AddReplica,
......@@ -455,7 +471,24 @@ class Middleware(object):
metadata=None):
return grpc.experimental.unary_unary(request, target, '/mcc.fred.middleware.Middleware/Notify',
middleware__pb2.NotifyRequest.SerializeToString,
middleware__pb2.NotifyResponse.FromString,
middleware__pb2.Empty.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
@staticmethod
def ChooseReplica(request,
target,
options=(),
channel_credentials=None,
call_credentials=None,
insecure=False,
compression=None,
wait_for_ready=None,
timeout=None,
metadata=None):
return grpc.experimental.unary_unary(request, target, '/mcc.fred.middleware.Middleware/ChooseReplica',
middleware__pb2.ChooseReplicaRequest.SerializeToString,
middleware__pb2.Empty.FromString,
options, channel_credentials,
insecure, call_credentials, compression, wait_for_ready, timeout, metadata)
......
......@@ -42,10 +42,10 @@ failtest: ## Start the failtest
alexandratest: ## Test alexandra (starts 3node, alexandra, alexandra-test)
@docker network create fredwork --gateway 172.26.0.1 --subnet 172.26.0.0/16 || true
@docker-compose -f etcd.yml -f nodeA.yml -f nodeB.yml -f nodeC.yml -f trigger.yml -f alexandra.yml -f alexandra-test.yml build
@docker-compose -f etcd.yml -f nodeA.yml -f nodeB.yml -f nodeC.yml -f trigger.yml -f alexandra.yml -f alexandra-test.yml up --abort-on-container-exit --force-recreate --renew-anon-volumes --remove-orphans
@docker-compose -f etcd.yml -f nodeA.yml -f nodeB.yml -f nodeC.yml -f alexandra.yml -f alexandra-test.yml build
@docker-compose -f etcd.yml -f nodeA.yml -f nodeB.yml -f nodeC.yml -f alexandra.yml -f alexandra-test.yml up --abort-on-container-exit --force-recreate --renew-anon-volumes --remove-orphans
alexandra: ## alexandra (starts 3node, alexandra)
@docker network create fredwork --gateway 172.26.0.1 --subnet 172.26.0.0/16 || true
@docker-compose -f etcd.yml -f nodeA.yml -f nodeB.yml -f nodeC.yml -f trigger.yml -f alexandra.yml build
@docker-compose -f etcd.yml -f nodeA.yml -f nodeB.yml -f nodeC.yml -f trigger.yml -f alexandra.yml up --abort-on-container-exit --force-recreate --renew-anon-volumes --remove-orphans
@docker-compose -f etcd.yml -f nodeA.yml -f nodeB.yml -f nodeC.yml -f alexandra.yml build
@docker-compose -f etcd.yml -f nodeA.yml -f nodeB.yml -f nodeC.yml -f alexandra.yml up --abort-on-container-exit --force-recreate --renew-anon-volumes --remove-orphans
......@@ -21,6 +21,7 @@ printf "Got node address %s\n" "$NODE_ADDR"
--clients-cert /cert/client.crt \
--log-level info \
--log-handler dev \
--experimental \
&
sleep 10
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment