diff --git a/Makefile b/Makefile index 18b9f6f..1dba768 100644 --- a/Makefile +++ b/Makefile @@ -48,6 +48,7 @@ generate: go run github.com/golang/mock/mockgen github.com/nitrictech/nitric/core/pkg/proto/storage/v1 StorageClient > mocks/storage.go go run github.com/golang/mock/mockgen github.com/nitrictech/nitric/core/pkg/proto/secrets/v1 SecretManagerClient > mocks/secrets.go go run github.com/golang/mock/mockgen github.com/nitrictech/nitric/core/pkg/proto/topics/v1 TopicsClient > mocks/topics.go + go run github.com/golang/mock/mockgen github.com/nitrictech/nitric/core/pkg/proto/batch/v1 BatchClient > mocks/batch.go go run github.com/golang/mock/mockgen -package mock_v1 google.golang.org/grpc ClientConnInterface > mocks/grpc_clientconn.go # Runs tests for coverage upload to codecov.io diff --git a/go.mod b/go.mod index bef6532..08738ab 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/golangci/golangci-lint v1.61.0 github.com/google/addlicense v1.1.1 github.com/missionMeteora/toolkit v0.0.0-20170713173850-88364e3ef8cc - github.com/nitrictech/nitric/core v0.0.0-20240915234849-42c1e482ddab + github.com/nitrictech/nitric/core v0.0.0-20241003062412-76ea6275fb0b github.com/nitrictech/protoutils v0.0.0-20220321044654-02667a814cdf github.com/onsi/ginkgo v1.16.5 github.com/onsi/gomega v1.34.2 @@ -118,7 +118,7 @@ require ( github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect github.com/polyfloyd/go-errorlint v1.6.0 // indirect github.com/prometheus/client_golang v1.14.0 // indirect - github.com/prometheus/client_model v0.5.0 // indirect + github.com/prometheus/client_model v0.6.0 // indirect github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect github.com/quasilyte/go-ruleguard v0.4.3-0.20240823090925-0fe6f58b47b1 // indirect @@ -220,5 +220,5 @@ require ( go-simpler.org/musttag v0.12.2 // indirect go-simpler.org/sloglint v0.7.2 // indirect go.uber.org/automaxprocs v1.5.3 // indirect - go.uber.org/goleak v1.2.1 // indirect + go.uber.org/goleak v1.3.0 // indirect ) diff --git a/go.sum b/go.sum index 8da0a9f..30fcc35 100644 --- a/go.sum +++ b/go.sum @@ -424,8 +424,10 @@ github.com/nishanths/exhaustive v0.12.0 h1:vIY9sALmw6T/yxiASewa4TQcFsVYZQQRUQJhK github.com/nishanths/exhaustive v0.12.0/go.mod h1:mEZ95wPIZW+x8kC4TgC+9YCUgiST7ecevsVDTgc2obs= github.com/nishanths/predeclared v0.2.2 h1:V2EPdZPliZymNAn79T8RkNApBjMmVKh5XRpLm/w98Vk= github.com/nishanths/predeclared v0.2.2/go.mod h1:RROzoN6TnGQupbC+lqggsOlcgysk3LMK/HI84Mp280c= -github.com/nitrictech/nitric/core v0.0.0-20240915234849-42c1e482ddab h1:59YTNUa6en385Y21SwMTaixbe31SwgzjkjEhxx0RuwQ= -github.com/nitrictech/nitric/core v0.0.0-20240915234849-42c1e482ddab/go.mod h1:N274XVBjYhGEQoT42baWM6/lETBQYQhqPpqUuk2gmLc= +github.com/nitrictech/nitric/core v0.0.0-20240913000004-5d21c28b00ba h1:ZIPl9waqhbqw3xB2+zpUI2T1kEHyMkOnZZMt6tdrEUM= +github.com/nitrictech/nitric/core v0.0.0-20240913000004-5d21c28b00ba/go.mod h1:4LQH9hea9rX+0A+8G47NRk5nZuXCDqiwci1aZsHAkU8= +github.com/nitrictech/nitric/core v0.0.0-20241003062412-76ea6275fb0b h1:ImQFk66gRM3v9A6qmPImOiV3HJMDAX93X5rplMKn6ok= +github.com/nitrictech/nitric/core v0.0.0-20241003062412-76ea6275fb0b/go.mod h1:9bQnYPqLzq8CcPk5MHT3phg19CWJhDlFOfdIv27lwwM= github.com/nitrictech/protoutils v0.0.0-20220321044654-02667a814cdf h1:8MB8W8ylM8sCM2COGfiO39/tB6BTdiawLszaUGCNL5w= github.com/nitrictech/protoutils v0.0.0-20220321044654-02667a814cdf/go.mod h1:b2lzk2a4o1bvSrSCE6yvTldHuXCJymuDVhdMJGOSslw= github.com/nunnatsa/ginkgolinter v0.16.2 h1:8iLqHIZvN4fTLDC0Ke9tbSZVcyVHoBs0HIbnVSxfHJk= @@ -481,6 +483,7 @@ github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1: github.com/prometheus/client_model v0.2.0/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw= github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI= +github.com/prometheus/client_model v0.6.0/go.mod h1:NTQHnmxFpouOD0DpvP4XujX3CdOAGQPoaGhyTchlyt8= github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y86RQel1bk4= github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo= github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc= @@ -642,6 +645,7 @@ go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8= go.uber.org/automaxprocs v1.5.3/go.mod h1:eRbA25aqJrxAbsLO0xy5jVwPt7FQnRgjW+efnwa1WM0= go.uber.org/goleak v1.2.1 h1:NBol2c7O1ZokfZ0LEU9K6Whx/KnwvepVetCUhtKja4A= go.uber.org/goleak v1.2.1/go.mod h1:qlT2yGI9QafXHhZZLxlSuNsMw3FFLxBr+tBRlmO1xH4= +go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= diff --git a/mocks/batch.go b/mocks/batch.go new file mode 100644 index 0000000..e709ed2 --- /dev/null +++ b/mocks/batch.go @@ -0,0 +1,57 @@ +// Code generated by MockGen. DO NOT EDIT. +// Source: github.com/nitrictech/nitric/core/pkg/proto/batch/v1 (interfaces: BatchClient) + +// Package mock_v1 is a generated GoMock package. +package mock_v1 + +import ( + context "context" + reflect "reflect" + + gomock "github.com/golang/mock/gomock" + batchpb "github.com/nitrictech/nitric/core/pkg/proto/batch/v1" + grpc "google.golang.org/grpc" +) + +// MockBatchClient is a mock of BatchClient interface. +type MockBatchClient struct { + ctrl *gomock.Controller + recorder *MockBatchClientMockRecorder +} + +// MockBatchClientMockRecorder is the mock recorder for MockBatchClient. +type MockBatchClientMockRecorder struct { + mock *MockBatchClient +} + +// NewMockBatchClient creates a new mock instance. +func NewMockBatchClient(ctrl *gomock.Controller) *MockBatchClient { + mock := &MockBatchClient{ctrl: ctrl} + mock.recorder = &MockBatchClientMockRecorder{mock} + return mock +} + +// EXPECT returns an object that allows the caller to indicate expected use. +func (m *MockBatchClient) EXPECT() *MockBatchClientMockRecorder { + return m.recorder +} + +// SubmitJob mocks base method. +func (m *MockBatchClient) SubmitJob(arg0 context.Context, arg1 *batchpb.JobSubmitRequest, arg2 ...grpc.CallOption) (*batchpb.JobSubmitResponse, error) { + m.ctrl.T.Helper() + varargs := []interface{}{arg0, arg1} + for _, a := range arg2 { + varargs = append(varargs, a) + } + ret := m.ctrl.Call(m, "SubmitJob", varargs...) + ret0, _ := ret[0].(*batchpb.JobSubmitResponse) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// SubmitJob indicates an expected call of SubmitJob. +func (mr *MockBatchClientMockRecorder) SubmitJob(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + varargs := append([]interface{}{arg0, arg1}, arg2...) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubmitJob", reflect.TypeOf((*MockBatchClient)(nil).SubmitJob), varargs...) +} diff --git a/nitric/apis/api.go b/nitric/apis/api.go index 1f2b381..1c26700 100644 --- a/nitric/apis/api.go +++ b/nitric/apis/api.go @@ -47,8 +47,10 @@ type Route interface { ApiName() string } -type Handler = handlers.Handler[Ctx] -type Middleware = handlers.Middleware[Ctx] +type ( + Handler = handlers.Handler[Ctx] + Middleware = handlers.Middleware[Ctx] +) type route struct { path string diff --git a/nitric/batch/batch.go b/nitric/batch/batch.go new file mode 100644 index 0000000..f06d38d --- /dev/null +++ b/nitric/batch/batch.go @@ -0,0 +1,144 @@ +// Copyright 2021 Nitric Technologies Pty Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batch + +import ( + "fmt" + + "github.com/nitrictech/go-sdk/internal/handlers" + "github.com/nitrictech/go-sdk/nitric/workers" + batchpb "github.com/nitrictech/nitric/core/pkg/proto/batch/v1" + v1 "github.com/nitrictech/nitric/core/pkg/proto/resources/v1" +) + +// JobPermission defines the available permissions on a job +type JobPermission string + +type Handler = handlers.Handler[Ctx] + +const ( + // JobSubmit is required to call Submit on a job. + JobSubmit JobPermission = "submit" +) + +type JobReference interface { + // Allow requests the given permissions to the job. + Allow(permission JobPermission, permissions ...JobPermission) *BatchClient + + // Handler will register and start the job task handler that will be called for all task submitted to this job. + // Valid function signatures for middleware are: + // + // func() + // func() error + // func(*batch.Ctx) + // func(*batch.Ctx) error + // Handler[batch.Ctx] + Handler(handler interface{}, options ...HandlerOption) +} + +type jobReference struct { + name string + manager *workers.Manager + registerChan <-chan workers.RegisterResult +} + +// NewJob creates a new job resource with the give name. +func NewJob(name string) JobReference { + job := &jobReference{ + name: name, + manager: workers.GetDefaultManager(), + } + + job.registerChan = job.manager.RegisterResource(&v1.ResourceDeclareRequest{ + Id: &v1.ResourceIdentifier{ + Type: v1.ResourceType_Job, + Name: name, + }, + Config: &v1.ResourceDeclareRequest_Job{ + Job: &v1.JobResource{}, + }, + }) + + return job +} + +func (j *jobReference) Allow(permission JobPermission, permissions ...JobPermission) *BatchClient { + allPerms := append([]JobPermission{permission}, permissions...) + + actions := []v1.Action{} + for _, perm := range allPerms { + switch perm { + case JobSubmit: + actions = append(actions, v1.Action_JobSubmit) + default: + panic(fmt.Errorf("JobPermission %s unknown", perm)) + } + } + + registerResult := <-j.registerChan + if registerResult.Err != nil { + panic(registerResult.Err) + } + + err := j.manager.RegisterPolicy(registerResult.Identifier, actions...) + if err != nil { + panic(err) + } + + client, err := NewBatchClient(j.name) + if err != nil { + panic(err) + } + + return client +} + +func (j *jobReference) Handler(handler interface{}, opts ...HandlerOption) { + options := &handlerOptions{} + + for _, opt := range opts { + opt(options) + } + + registrationRequest := &batchpb.RegistrationRequest{ + JobName: j.name, + Requirements: &batchpb.JobResourceRequirements{}, + } + + if options.cpus != nil { + registrationRequest.Requirements.Cpus = *options.cpus + } + + if options.memory != nil { + registrationRequest.Requirements.Memory = *options.memory + } + + if options.gpus != nil { + registrationRequest.Requirements.Gpus = *options.gpus + } + + typedHandler, err := handlers.HandlerFromInterface[Ctx](handler) + if err != nil { + panic(err) + } + + jobOpts := &jobWorkerOpts{ + RegistrationRequest: registrationRequest, + Handler: typedHandler, + } + + worker := newJobWorker(jobOpts) + j.manager.AddWorker("JobWorker:"+j.name, worker) +} diff --git a/nitric/batch/batch_suite_test.go b/nitric/batch/batch_suite_test.go new file mode 100644 index 0000000..0ee6492 --- /dev/null +++ b/nitric/batch/batch_suite_test.go @@ -0,0 +1,27 @@ +// Copyright 2023 Nitric Technologies Pty Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batch_test + +import ( + "testing" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +func TestBatch(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Batch (Jobs) Suite") +} diff --git a/nitric/batch/batch_workers.go b/nitric/batch/batch_workers.go new file mode 100644 index 0000000..7b48710 --- /dev/null +++ b/nitric/batch/batch_workers.go @@ -0,0 +1,107 @@ +// Copyright 2023 Nitric Technologies Pty Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batch + +import ( + "context" + "io" + + "google.golang.org/grpc" + + errorsstd "errors" + + "github.com/nitrictech/go-sdk/constants" + "github.com/nitrictech/go-sdk/nitric/errors" + "github.com/nitrictech/go-sdk/nitric/errors/codes" + v1 "github.com/nitrictech/nitric/core/pkg/proto/batch/v1" +) + +type jobWorker struct { + client v1.JobClient + registrationRequest *v1.RegistrationRequest + handler Handler +} +type jobWorkerOpts struct { + RegistrationRequest *v1.RegistrationRequest + Handler Handler +} + +// Start implements Worker. +func (s *jobWorker) Start(ctx context.Context) error { + initReq := &v1.ClientMessage{ + Content: &v1.ClientMessage_RegistrationRequest{ + RegistrationRequest: s.registrationRequest, + }, + } + + // Create the request stream and send the initial request + stream, err := s.client.HandleJob(ctx) + if err != nil { + return err + } + + err = stream.Send(initReq) + if err != nil { + return err + } + for { + var ctx *Ctx + + resp, err := stream.Recv() + + if errorsstd.Is(err, io.EOF) { + err = stream.CloseSend() + if err != nil { + return err + } + + return nil + } else if err == nil && resp.GetRegistrationResponse() != nil { + // Do nothing + } else if err == nil && resp.GetJobRequest() != nil { + ctx = NewCtx(resp) + err = s.handler(ctx) + if err != nil { + ctx.WithError(err) + } + + err = stream.Send(ctx.ToClientMessage()) + if err != nil { + return err + } + } else { + return err + } + } +} + +func newJobWorker(opts *jobWorkerOpts) *jobWorker { + conn, err := grpc.NewClient(constants.NitricAddress(), constants.DefaultOptions()...) + if err != nil { + panic(errors.NewWithCause( + codes.Unavailable, + "NewJobWorker: Unable to reach JobClient", + err, + )) + } + + client := v1.NewJobClient(conn) + + return &jobWorker{ + client: client, + registrationRequest: opts.RegistrationRequest, + handler: opts.Handler, + } +} diff --git a/nitric/batch/client.go b/nitric/batch/client.go new file mode 100644 index 0000000..fcef035 --- /dev/null +++ b/nitric/batch/client.go @@ -0,0 +1,88 @@ +// Copyright 2021 Nitric Technologies Pty Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batch + +import ( + "context" + + "google.golang.org/grpc" + + "github.com/nitrictech/go-sdk/constants" + "github.com/nitrictech/go-sdk/nitric/errors" + "github.com/nitrictech/go-sdk/nitric/errors/codes" + v1 "github.com/nitrictech/nitric/core/pkg/proto/batch/v1" + "github.com/nitrictech/protoutils" +) + +// Batch +type BatchClientIn interface { + // Name returns the Job name. + Name() string + + // Submit will submit the provided request to the job. + Submit(ctx context.Context, data map[string]interface{}) error +} + +type BatchClient struct { + name string + batchClient v1.BatchClient +} + +func (s *BatchClient) Name() string { + return s.name +} + +func (s *BatchClient) Submit(ctx context.Context, data map[string]interface{}) error { + dataStruct, err := protoutils.NewStruct(data) + if err != nil { + return errors.NewWithCause(codes.InvalidArgument, "Batch.Submit", err) + } + + // Create the request + req := &v1.JobSubmitRequest{ + JobName: s.name, + Data: &v1.JobData{ + Data: &v1.JobData_Struct{ + Struct: dataStruct, + }, + }, + } + + // Submit the request + _, err = s.batchClient.SubmitJob(ctx, req) + if err != nil { + return errors.FromGrpcError(err) + } + + return nil +} + +func NewBatchClient(name string) (*BatchClient, error) { + conn, err := grpc.NewClient(constants.NitricAddress(), constants.DefaultOptions()...) + if err != nil { + return nil, errors.NewWithCause( + codes.Unavailable, + "NewBatchClient: unable to reach nitric server", + err, + ) + } + + batchClient := v1.NewBatchClient(conn) + + return &BatchClient{ + name: name, + batchClient: batchClient, + }, nil +} diff --git a/nitric/batch/client_test.go b/nitric/batch/client_test.go new file mode 100644 index 0000000..67865d6 --- /dev/null +++ b/nitric/batch/client_test.go @@ -0,0 +1,119 @@ +// Copyright 2021 Nitric Technologies Pty Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batch + +import ( + "context" + "errors" + "strings" + + "github.com/golang/mock/gomock" + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" + + mock_v1 "github.com/nitrictech/go-sdk/mocks" + v1 "github.com/nitrictech/nitric/core/pkg/proto/batch/v1" + "github.com/nitrictech/protoutils" +) + +var _ = Describe("File", func() { + var ( + ctrl *gomock.Controller + mockBatchClient *mock_v1.MockBatchClient + j *BatchClient + jobName string + ctx context.Context + ) + + BeforeEach(func() { + ctrl = gomock.NewController(GinkgoT()) + mockBatchClient = mock_v1.NewMockBatchClient(ctrl) + + jobName = "test-job" + j = &BatchClient{ + name: jobName, + batchClient: mockBatchClient, + } + + ctx = context.Background() + }) + + AfterEach(func() { + ctrl.Finish() + }) + + Describe("Name()", func() { + It("should have the same job name as the one provided", func() { + _jobName := j.Name() + Expect(_jobName).To(Equal(jobName)) + }) + }) + + Describe("Submit()", func() { + var dataToBeSubmitted map[string]interface{} + + BeforeEach(func() { + dataToBeSubmitted = map[string]interface{}{ + "data": "hello world", + } + }) + + When("the gRPC Read operation is successful", func() { + BeforeEach(func() { + payloadStruct, err := protoutils.NewStruct(dataToBeSubmitted) + Expect(err).ToNot(HaveOccurred()) + + mockBatchClient.EXPECT().SubmitJob(gomock.Any(), &v1.JobSubmitRequest{ + JobName: jobName, + Data: &v1.JobData{ + Data: &v1.JobData_Struct{ + Struct: payloadStruct, + }, + }, + }).Return( + &v1.JobSubmitResponse{}, + nil).Times(1) + }) + + It("should not return error", func() { + err := j.Submit(ctx, dataToBeSubmitted) + + Expect(err).ToNot(HaveOccurred()) + }) + }) + + When("the grpc server returns an error", func() { + var errorMsg string + + BeforeEach(func() { + errorMsg = "Internal Error" + + By("the gRPC server returning an error") + mockBatchClient.EXPECT().SubmitJob(gomock.Any(), gomock.Any()).Return( + nil, + errors.New(errorMsg), + ).Times(1) + }) + + It("should return the passed error", func() { + err := j.Submit(ctx, dataToBeSubmitted) + + By("returning error with expected message") + Expect(err).To(HaveOccurred()) + Expect(strings.Contains(err.Error(), errorMsg)).To(BeTrue()) + }) + }) + }) +}) diff --git a/nitric/batch/context.go b/nitric/batch/context.go new file mode 100644 index 0000000..a348796 --- /dev/null +++ b/nitric/batch/context.go @@ -0,0 +1,54 @@ +// Copyright 2023 Nitric Technologies Pty Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batch + +import batchpb "github.com/nitrictech/nitric/core/pkg/proto/batch/v1" + +type Ctx struct { + id string + Request Request + Response *Response + Extras map[string]interface{} +} + +func (c *Ctx) ToClientMessage() *batchpb.ClientMessage { + return &batchpb.ClientMessage{ + Id: c.id, + Content: &batchpb.ClientMessage_JobResponse{ + JobResponse: &batchpb.JobResponse{ + Success: true, + }, + }, + } +} + +func NewCtx(msg *batchpb.ServerMessage) *Ctx { + return &Ctx{ + id: msg.Id, + Request: &requestImpl{ + jobName: msg.GetJobRequest().GetJobName(), + data: msg.GetJobRequest().GetData().GetStruct().AsMap(), + }, + Response: &Response{ + Success: true, + }, + } +} + +func (c *Ctx) WithError(err error) { + c.Response = &Response{ + Success: false, + } +} diff --git a/nitric/batch/options.go b/nitric/batch/options.go new file mode 100644 index 0000000..fb934ac --- /dev/null +++ b/nitric/batch/options.go @@ -0,0 +1,48 @@ +// Copyright 2021 Nitric Technologies Pty Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batch + +type HandlerOption func(opts *handlerOptions) + +// HandlerOptions defines the resource requirements for a job +type handlerOptions struct { + // Cpus is the number of CPUs/vCPUs to allocate to the job + cpus *float32 + // Memory is the amount of memory in MiB to allocate to the job + memory *int64 + // Gpus is the number of GPUs to allocate to the job + gpus *int64 +} + +// WithCpus - Set the number of CPUs/vCPUs to allocate to job handler instances +func WithCpus(cpus float32) HandlerOption { + return func(opts *handlerOptions) { + opts.cpus = &cpus + } +} + +// WithMemory - Set the amount of memory in MiB to allocate to job handler instances +func WithMemory(mib int64) HandlerOption { + return func(opts *handlerOptions) { + opts.memory = &mib + } +} + +// WithGpus - Set the number of GPUs to allocate to job handler instances +func WithGpus(gpus int64) HandlerOption { + return func(opts *handlerOptions) { + opts.gpus = &gpus + } +} diff --git a/nitric/batch/request.go b/nitric/batch/request.go new file mode 100644 index 0000000..4210a4e --- /dev/null +++ b/nitric/batch/request.go @@ -0,0 +1,33 @@ +// Copyright 2023 Nitric Technologies Pty Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batch + +type Request interface { + JobName() string + Data() map[string]interface{} +} + +type requestImpl struct { + jobName string + data map[string]interface{} +} + +func (m *requestImpl) JobName() string { + return m.jobName +} + +func (m *requestImpl) Data() map[string]interface{} { + return m.data +} diff --git a/nitric/batch/response.go b/nitric/batch/response.go new file mode 100644 index 0000000..9f859ca --- /dev/null +++ b/nitric/batch/response.go @@ -0,0 +1,19 @@ +// Copyright 2023 Nitric Technologies Pty Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package batch + +type Response struct { + Success bool +} diff --git a/nitric/nitric.go b/nitric/nitric.go index d2d5f53..d141ac4 100644 --- a/nitric/nitric.go +++ b/nitric/nitric.go @@ -16,6 +16,7 @@ package nitric import ( "github.com/nitrictech/go-sdk/nitric/apis" + "github.com/nitrictech/go-sdk/nitric/batch" "github.com/nitrictech/go-sdk/nitric/keyvalue" "github.com/nitrictech/go-sdk/nitric/queues" "github.com/nitrictech/go-sdk/nitric/schedules" @@ -37,6 +38,7 @@ var ( NewBucket = storage.NewBucket NewTopic = topics.NewTopic NewWebsocket = websockets.NewWebsocket + NewJob = batch.NewJob ) func Run() {