diff --git a/cmd/commands/run.go b/cmd/commands/run.go index e528f0d..f95a897 100644 --- a/cmd/commands/run.go +++ b/cmd/commands/run.go @@ -6,6 +6,7 @@ import ( "os/signal" "syscall" + "github.com/dezh-tech/immortal" "github.com/dezh-tech/immortal/cmd/relay" "github.com/dezh-tech/immortal/config" "github.com/dezh-tech/immortal/pkg/logger" @@ -23,6 +24,8 @@ func HandleRun(args []string) { logger.InitGlobalLogger(&cfg.Logger) + logger.Info("running immortal", "version", immortal.StringVersion()) + r, err := relay.New(cfg) if err != nil { ExitOnError(err) @@ -31,7 +34,9 @@ func HandleRun(args []string) { sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM, os.Interrupt) - errCh := r.Start() + shutdownch := make(chan struct{}, 1) + + errCh := r.Start(shutdownch) select { case sig := <-sigChan: @@ -45,5 +50,11 @@ func HandleRun(args []string) { if err := r.Stop(); err != nil { ExitOnError(err) } + + case shsig := <-shutdownch: + logger.Info("Received signal from manager over grpc: Initiating graceful shutdown", "signal", shsig) + if err := r.Stop(); err != nil { + ExitOnError(err) + } } } diff --git a/cmd/relay/relay.go b/cmd/relay/relay.go index ccfcfae..370966e 100644 --- a/cmd/relay/relay.go +++ b/cmd/relay/relay.go @@ -85,7 +85,7 @@ func New(cfg *config.Config) (*Relay, error) { } // Start runs the relay and its children. -func (r *Relay) Start() chan error { +func (r *Relay) Start(shutdownch chan struct{}) chan error { logger.Info("starting the relay") errCh := make(chan error, 2) @@ -97,7 +97,7 @@ func (r *Relay) Start() chan error { }() go func() { - if err := r.grpcServer.Start(); err != nil { + if err := r.grpcServer.Start(shutdownch); err != nil { errCh <- err } }() @@ -121,5 +121,9 @@ func (r *Relay) Stop() error { return err } + if err := r.redis.Close(); err != nil { + return err + } + return nil } diff --git a/delivery/grpc/gen/shutdown.pb.go b/delivery/grpc/gen/shutdown.pb.go new file mode 100644 index 0000000..43033ca --- /dev/null +++ b/delivery/grpc/gen/shutdown.pb.go @@ -0,0 +1,193 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.33.0 +// protoc (unknown) +// source: shutdown.proto + +package grpc + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type ShutdownRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *ShutdownRequest) Reset() { + *x = ShutdownRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_shutdown_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ShutdownRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ShutdownRequest) ProtoMessage() {} + +func (x *ShutdownRequest) ProtoReflect() protoreflect.Message { + mi := &file_shutdown_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ShutdownRequest.ProtoReflect.Descriptor instead. +func (*ShutdownRequest) Descriptor() ([]byte, []int) { + return file_shutdown_proto_rawDescGZIP(), []int{0} +} + +type ShutdownResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields +} + +func (x *ShutdownResponse) Reset() { + *x = ShutdownResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_shutdown_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ShutdownResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ShutdownResponse) ProtoMessage() {} + +func (x *ShutdownResponse) ProtoReflect() protoreflect.Message { + mi := &file_shutdown_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ShutdownResponse.ProtoReflect.Descriptor instead. +func (*ShutdownResponse) Descriptor() ([]byte, []int) { + return file_shutdown_proto_rawDescGZIP(), []int{1} +} + +var File_shutdown_proto protoreflect.FileDescriptor + +var file_shutdown_proto_rawDesc = []byte{ + 0x0a, 0x0e, 0x73, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, + 0x12, 0x08, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x76, 0x31, 0x22, 0x11, 0x0a, 0x0f, 0x53, 0x68, + 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x22, 0x12, 0x0a, + 0x10, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, + 0x65, 0x32, 0x54, 0x0a, 0x0f, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x53, 0x65, 0x72, + 0x76, 0x69, 0x63, 0x65, 0x12, 0x41, 0x0a, 0x08, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, + 0x12, 0x19, 0x2e, 0x72, 0x65, 0x6c, 0x61, 0x79, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x68, 0x75, 0x74, + 0x64, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1a, 0x2e, 0x72, 0x65, + 0x6c, 0x61, 0x79, 0x2e, 0x76, 0x31, 0x2e, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x52, + 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x42, 0x2d, 0x5a, 0x2b, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x64, 0x65, 0x7a, 0x68, 0x2d, 0x74, 0x65, 0x63, 0x68, 0x2f, + 0x69, 0x6d, 0x6d, 0x6f, 0x72, 0x74, 0x61, 0x6c, 0x2f, 0x64, 0x65, 0x6c, 0x69, 0x76, 0x65, 0x72, + 0x79, 0x2f, 0x67, 0x72, 0x70, 0x63, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_shutdown_proto_rawDescOnce sync.Once + file_shutdown_proto_rawDescData = file_shutdown_proto_rawDesc +) + +func file_shutdown_proto_rawDescGZIP() []byte { + file_shutdown_proto_rawDescOnce.Do(func() { + file_shutdown_proto_rawDescData = protoimpl.X.CompressGZIP(file_shutdown_proto_rawDescData) + }) + return file_shutdown_proto_rawDescData +} + +var file_shutdown_proto_msgTypes = make([]protoimpl.MessageInfo, 2) +var file_shutdown_proto_goTypes = []interface{}{ + (*ShutdownRequest)(nil), // 0: relay.v1.ShutdownRequest + (*ShutdownResponse)(nil), // 1: relay.v1.ShutdownResponse +} +var file_shutdown_proto_depIdxs = []int32{ + 0, // 0: relay.v1.ShutdownService.Shutdown:input_type -> relay.v1.ShutdownRequest + 1, // 1: relay.v1.ShutdownService.Shutdown:output_type -> relay.v1.ShutdownResponse + 1, // [1:2] is the sub-list for method output_type + 0, // [0:1] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_shutdown_proto_init() } +func file_shutdown_proto_init() { + if File_shutdown_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_shutdown_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ShutdownRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_shutdown_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ShutdownResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_shutdown_proto_rawDesc, + NumEnums: 0, + NumMessages: 2, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_shutdown_proto_goTypes, + DependencyIndexes: file_shutdown_proto_depIdxs, + MessageInfos: file_shutdown_proto_msgTypes, + }.Build() + File_shutdown_proto = out.File + file_shutdown_proto_rawDesc = nil + file_shutdown_proto_goTypes = nil + file_shutdown_proto_depIdxs = nil +} diff --git a/delivery/grpc/gen/shutdown_grpc.pb.go b/delivery/grpc/gen/shutdown_grpc.pb.go new file mode 100644 index 0000000..c6e57b4 --- /dev/null +++ b/delivery/grpc/gen/shutdown_grpc.pb.go @@ -0,0 +1,107 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. +// versions: +// - protoc-gen-go-grpc v1.3.0 +// - protoc (unknown) +// source: shutdown.proto + +package grpc + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +const ( + ShutdownService_Shutdown_FullMethodName = "/relay.v1.ShutdownService/Shutdown" +) + +// ShutdownServiceClient is the client API for ShutdownService service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type ShutdownServiceClient interface { + Shutdown(ctx context.Context, in *ShutdownRequest, opts ...grpc.CallOption) (*ShutdownResponse, error) +} + +type shutdownServiceClient struct { + cc grpc.ClientConnInterface +} + +func NewShutdownServiceClient(cc grpc.ClientConnInterface) ShutdownServiceClient { + return &shutdownServiceClient{cc} +} + +func (c *shutdownServiceClient) Shutdown(ctx context.Context, in *ShutdownRequest, opts ...grpc.CallOption) (*ShutdownResponse, error) { + out := new(ShutdownResponse) + err := c.cc.Invoke(ctx, ShutdownService_Shutdown_FullMethodName, in, out, opts...) + if err != nil { + return nil, err + } + return out, nil +} + +// ShutdownServiceServer is the server API for ShutdownService service. +// All implementations should embed UnimplementedShutdownServiceServer +// for forward compatibility +type ShutdownServiceServer interface { + Shutdown(context.Context, *ShutdownRequest) (*ShutdownResponse, error) +} + +// UnimplementedShutdownServiceServer should be embedded to have forward compatible implementations. +type UnimplementedShutdownServiceServer struct { +} + +func (UnimplementedShutdownServiceServer) Shutdown(context.Context, *ShutdownRequest) (*ShutdownResponse, error) { + return nil, status.Errorf(codes.Unimplemented, "method Shutdown not implemented") +} + +// UnsafeShutdownServiceServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to ShutdownServiceServer will +// result in compilation errors. +type UnsafeShutdownServiceServer interface { + mustEmbedUnimplementedShutdownServiceServer() +} + +func RegisterShutdownServiceServer(s grpc.ServiceRegistrar, srv ShutdownServiceServer) { + s.RegisterService(&ShutdownService_ServiceDesc, srv) +} + +func _ShutdownService_Shutdown_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { + in := new(ShutdownRequest) + if err := dec(in); err != nil { + return nil, err + } + if interceptor == nil { + return srv.(ShutdownServiceServer).Shutdown(ctx, in) + } + info := &grpc.UnaryServerInfo{ + Server: srv, + FullMethod: ShutdownService_Shutdown_FullMethodName, + } + handler := func(ctx context.Context, req interface{}) (interface{}, error) { + return srv.(ShutdownServiceServer).Shutdown(ctx, req.(*ShutdownRequest)) + } + return interceptor(ctx, in, info, handler) +} + +// ShutdownService_ServiceDesc is the grpc.ServiceDesc for ShutdownService service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var ShutdownService_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "relay.v1.ShutdownService", + HandlerType: (*ShutdownServiceServer)(nil), + Methods: []grpc.MethodDesc{ + { + MethodName: "Shutdown", + Handler: _ShutdownService_Shutdown_Handler, + }, + }, + Streams: []grpc.StreamDesc{}, + Metadata: "shutdown.proto", +} diff --git a/delivery/grpc/proto/shutdown.proto b/delivery/grpc/proto/shutdown.proto new file mode 100644 index 0000000..b1c8e79 --- /dev/null +++ b/delivery/grpc/proto/shutdown.proto @@ -0,0 +1,13 @@ +syntax = "proto3"; + +package relay.v1; + +option go_package = "github.com/dezh-tech/immortal/delivery/grpc"; + +service ShutdownService { + rpc Shutdown (ShutdownRequest) returns (ShutdownResponse); +} + +message ShutdownRequest {} + +message ShutdownResponse {} diff --git a/delivery/grpc/server.go b/delivery/grpc/server.go index de31b09..d0e48db 100644 --- a/delivery/grpc/server.go +++ b/delivery/grpc/server.go @@ -37,7 +37,7 @@ func New(conf *Config, r *redis.Redis, db *database.Database, st time.Time) *Ser } } -func (s *Server) Start() error { +func (s *Server) Start(shutdownch chan struct{}) error { listener, err := net.Listen("tcp", net.JoinHostPort(s.config.Bind, //nolint strconv.Itoa(int(s.config.Port)))) if err != nil { @@ -47,8 +47,10 @@ func (s *Server) Start() error { grpcServer := grpc.NewServer(grpc.ChainUnaryInterceptor()) healthServer := newHealthServer(s) + shutdownServer := newShutdownServer(s, shutdownch) rpb.RegisterHealthServiceServer(grpcServer, healthServer) + rpb.RegisterShutdownServiceServer(grpcServer, shutdownServer) s.listener = listener s.grpc = grpcServer diff --git a/delivery/grpc/shutdown.go b/delivery/grpc/shutdown.go new file mode 100644 index 0000000..d6ab2a9 --- /dev/null +++ b/delivery/grpc/shutdown.go @@ -0,0 +1,28 @@ +package grpc + +import ( + "context" + + rpb "github.com/dezh-tech/immortal/delivery/grpc/gen" + "github.com/dezh-tech/immortal/pkg/logger" +) + +type shutdownServer struct { + shdCh chan struct{} + *Server +} + +func newShutdownServer(server *Server, shdCh chan struct{}) *shutdownServer { + return &shutdownServer{ + Server: server, + shdCh: shdCh, + } +} + +func (s shutdownServer) Shutdown(_ context.Context, r *rpb.ShutdownRequest) (*rpb.ShutdownResponse, error) { + logger.Info("shutdown signal received from grpc", "caller", r.String()) + + s.shdCh <- struct{}{} + + return &rpb.ShutdownResponse{}, nil +} diff --git a/version.go b/version.go index e4c681b..71842bf 100644 --- a/version.go +++ b/version.go @@ -7,8 +7,8 @@ import "fmt" var ( major = 0 minor = 0 - patch = 7 - meta = "" + patch = 8 + meta = "beta" ) func StringVersion() string {