diff --git a/backend/backend.go b/backend/backend.go index 5b94ff1..20a141b 100644 --- a/backend/backend.go +++ b/backend/backend.go @@ -36,6 +36,9 @@ type Backend interface { // List iterates over done resources within a namespace List(ctx context.Context, req *rpc.ListRequest, iter Iterator) error + // Ping pings the backend connection. + Ping() error + // Close closes the backend connection. Close() error } diff --git a/backend/mock/mock.go b/backend/mock/mock.go index 7404308..104317f 100644 --- a/backend/mock/mock.go +++ b/backend/mock/mock.go @@ -133,6 +133,9 @@ func (b *Backend) List(_ context.Context, req *rpc.ListRequest, iter backend.Ite return nil } +// Ping implements the backend.Backend interface. +func (*Backend) Ping() error { return nil } + // Close implements the backend.Backend interface. func (*Backend) Close() error { return nil } diff --git a/backend/postgres/postgres.go b/backend/postgres/postgres.go index 678e460..3578a7c 100644 --- a/backend/postgres/postgres.go +++ b/backend/postgres/postgres.go @@ -214,6 +214,9 @@ func (b *postgres) Done(ctx context.Context, owner string, handleID uuid.UUID, m return performUpdate(ctx, stmt) } +// Ping implements the backend.Backend interface. +func (b *postgres) Ping() error { return b.DB.Ping() } + // Close implements the backend.Backend interface. func (b *postgres) Close() error { if b.ownDB { diff --git a/cmd/accord-server/main.go b/cmd/accord-server/main.go index 48ee046..7afd0f4 100644 --- a/cmd/accord-server/main.go +++ b/cmd/accord-server/main.go @@ -6,6 +6,7 @@ import ( "log" "net" "strings" + "time" "github.com/bsm/accord/backend/postgres" "github.com/bsm/accord/internal/service" @@ -45,8 +46,12 @@ func run(ctx context.Context) error { return err } - log.Printf("Listening on %s\n", flags.addr) srv := grpc.NewServer() - rpc.RegisterV1Server(srv, service.New(backend)) + svc := service.New(backend) + rpc.RegisterV1Server(srv, svc) + hch := rpc.RunHealthCheck(srv, svc, "accord", 5*time.Second) + defer hch.Stop() + + log.Printf("Listening on %s\n", flags.addr) return srv.Serve(lis) } diff --git a/internal/service/service.go b/internal/service/service.go index 4d3ff73..60f5395 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -12,17 +12,23 @@ import ( "google.golang.org/grpc/status" ) -type service struct { +// Service instances serve GRPC requests. +type Service struct { b backend.Backend } // New initalizes a new service -func New(b backend.Backend) rpc.V1Server { - return &service{b: b} +func New(b backend.Backend) *Service { + return &Service{b: b} +} + +// Ping implements rpc.Pinger. +func (s *Service) Ping() error { + return s.b.Ping() } // Acquire implements rpc.V1Server. -func (s *service) Acquire(ctx context.Context, req *rpc.AcquireRequest) (*rpc.AcquireResponse, error) { +func (s *Service) Acquire(ctx context.Context, req *rpc.AcquireRequest) (*rpc.AcquireResponse, error) { if req.Owner == "" { return nil, status.Error(codes.InvalidArgument, "invalid owner") } @@ -46,7 +52,7 @@ func (s *service) Acquire(ctx context.Context, req *rpc.AcquireRequest) (*rpc.Ac } // Renew implements rpc.V1Server. -func (s *service) Renew(ctx context.Context, req *rpc.RenewRequest) (*rpc.RenewResponse, error) { +func (s *Service) Renew(ctx context.Context, req *rpc.RenewRequest) (*rpc.RenewResponse, error) { if req.Owner == "" { return nil, status.Error(codes.InvalidArgument, "invalid owner") } @@ -63,7 +69,7 @@ func (s *service) Renew(ctx context.Context, req *rpc.RenewRequest) (*rpc.RenewR } // Done implements rpc.V1Server. -func (s *service) Done(ctx context.Context, req *rpc.DoneRequest) (*rpc.DoneResponse, error) { +func (s *Service) Done(ctx context.Context, req *rpc.DoneRequest) (*rpc.DoneResponse, error) { if req.Owner == "" { return nil, status.Error(codes.InvalidArgument, "invalid owner") } @@ -80,7 +86,7 @@ func (s *service) Done(ctx context.Context, req *rpc.DoneRequest) (*rpc.DoneResp } // List implements rpc.V1Server. -func (s *service) List(req *rpc.ListRequest, srv rpc.V1_ListServer) error { +func (s *Service) List(req *rpc.ListRequest, srv rpc.V1_ListServer) error { return s.b.List(srv.Context(), req, func(data *backend.HandleData) error { return srv.Send(convertHandle(data)) }) diff --git a/internal/service/service_test.go b/internal/service/service_test.go index 511392a..6a845fe 100644 --- a/internal/service/service_test.go +++ b/internal/service/service_test.go @@ -13,7 +13,8 @@ import ( ) var _ = Describe("V1Service", func() { - var subject rpc.V1Server + var subject *service.Service + var _ rpc.V1Server = subject var backend *mock.Backend var ctx = context.Background() const owner = "THEOWNER" diff --git a/rpc/health.go b/rpc/health.go new file mode 100644 index 0000000..5901023 --- /dev/null +++ b/rpc/health.go @@ -0,0 +1,51 @@ +package rpc + +import ( + context "context" + "time" + + grpc "google.golang.org/grpc" + "google.golang.org/grpc/health" + hpb "google.golang.org/grpc/health/grpc_health_v1" +) + +// Pinger servers can handle ping requests. +type Pinger interface { + Ping() error +} + +// HealthCheck instances can be stopped. +type HealthCheck interface { + Stop() +} + +type healthCheck struct{ cancel context.CancelFunc } + +func (h *healthCheck) Stop() { h.cancel() } + +// RunHealthCheck starts a standard grpc health check. +func RunHealthCheck(s *grpc.Server, c Pinger, name string, interval time.Duration) HealthCheck { + svc := health.NewServer() + hpb.RegisterHealthServer(s, svc) + + ctx, cancel := context.WithCancel(context.Background()) + go func() { + ticker := time.NewTicker(interval) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + svc.SetServingStatus(name, hpb.HealthCheckResponse_NOT_SERVING) + return + case <-ticker.C: + if err := c.Ping(); err == nil { + svc.SetServingStatus(name, hpb.HealthCheckResponse_SERVING) + } else { + svc.SetServingStatus(name, hpb.HealthCheckResponse_NOT_SERVING) + } + } + } + }() + return &healthCheck{cancel: cancel} +}