-
-
Notifications
You must be signed in to change notification settings - Fork 7
/
Copy pathratus.go
331 lines (268 loc) · 11.8 KB
/
ratus.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
// Package ratus contains data models and a client library for Go applications.
package ratus
import (
"context"
"encoding/gob"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"strings"
"time"
)
// DefaultTimeout is the default timeout duration for task execution.
const DefaultTimeout = "10m"
// DefaultLimit is the default number of resources to return in pagination.
const DefaultLimit = 10
// NonceLength is the length of the randomly generated nonce strings.
const NonceLength = 16
// StatusClientClosedRequest is the code for client closed request errors.
const StatusClientClosedRequest = 499
var (
// ErrBadRequest is returned when the request is malformed.
ErrBadRequest = errors.New("bad request")
// ErrNotFound is returned when the requested resource is not found.
ErrNotFound = errors.New("not found")
// ErrConflict is returned when the resource conflicts with existing ones.
ErrConflict = errors.New("conflict")
// ErrClientClosedRequest is returned when the client closed the request.
ErrClientClosedRequest = errors.New("client closed request")
// ErrInternalServerError is returned when the server encountered a
// situation it does not know how to handle.
ErrInternalServerError = errors.New("internal server error")
// ErrServiceUnavailable is returned when the service is unavailable.
ErrServiceUnavailable = errors.New("service unavailable")
)
// init registers interface types for binary encoding and decoding.
func init() {
gob.Register([]interface{}{})
gob.Register(map[string]interface{}{})
gob.Register([]map[string]interface{}{})
}
// TaskState indicates the state of a task.
type TaskState int32
const (
// The "pending" state indicates that the task is ready to be executed or
// is waiting to be executed in the future.
TaskStatePending TaskState = iota
// The "active" state indicates that the task is being processed by a
// consumer. Active tasks that have timed out will be automatically reset
// to the "pending" state. Consumer code should handle failure and set the
// state to "pending" to retry later if necessary.
TaskStateActive
// The "completed" state indicates that the task has completed its execution.
// If the storage engine implementation supports TTL, completed tasks will
// be automatically deleted after the retention period has expired.
TaskStateCompleted
// The "archived" state indicates that the task is stored as an archive.
// Archived tasks will never be deleted due to expiration.
TaskStateArchived
)
// Topic refers to an ordered subset of tasks with the same topic name property.
type Topic struct {
// User-defined unique name of the topic.
Name string `json:"name" bson:"_id"`
// The number of tasks that belong to the topic.
Count int64 `json:"count,omitempty" bson:"count,omitempty"`
}
// Task references an idempotent unit of work that should be executed asynchronously.
type Task struct {
// User-defined unique ID of the task.
// Task IDs across all topics share the same namespace.
ID string `json:"_id" bson:"_id"`
// Topic that the task currently belongs to. Tasks under the same topic
// will be executed according to the scheduled time.
Topic string `json:"topic" bson:"topic"`
// Current state of the task. At a given moment, the state of a task may be
// either "pending", "active", "completed" or "archived".
State TaskState `json:"state" bson:"state"`
// The nonce field stores a random string for implementing an optimistic
// concurrency control (OCC) layer outside of the storage engine. Ratus
// ensures consumers can only commit to tasks that have not changed since
// the promise was made by verifying the nonce field.
Nonce string `json:"nonce" bson:"nonce"`
// Identifier of the producer instance who produced the task.
Producer string `json:"producer,omitempty" bson:"producer,omitempty"`
// Identifier of the consumer instance who consumed the task.
Consumer string `json:"consumer,omitempty" bson:"consumer,omitempty"`
// The time the task was created.
// Timestamps are generated by the instance running Ratus, remember to
// perform clock synchronization before running multiple instances.
Produced *time.Time `json:"produced,omitempty" bson:"produced,omitempty"`
// The time the task is scheduled to be executed. Tasks will not be
// executed until the scheduled time arrives. After the scheduled time,
// excessive tasks will be executed in the order of the scheduled time.
Scheduled *time.Time `json:"scheduled,omitempty" bson:"scheduled,omitempty"`
// The time the task was claimed by a consumer.
// Not to confuse this with the time of commit, which is not recorded.
Consumed *time.Time `json:"consumed,omitempty" bson:"consumed,omitempty"`
// The deadline for the completion of execution promised by the consumer.
// Consumer code needs to commit the task before this deadline, otherwise
// the task is determined to have timed out and will be reset to the
// "pending" state, allowing other consumers to retry.
Deadline *time.Time `json:"deadline,omitempty" bson:"deadline,omitempty"`
// A minimal descriptor of the task to be executed.
// It is not recommended to rely on Ratus as the main storage of tasks.
// Instead, consider storing the complete task record in a database, and
// use a minimal descriptor as the payload to reference the task.
Payload any `json:"payload,omitempty" bson:"payload,omitempty"`
// A duration relative to the time the task is accepted, indicating that
// the task will be scheduled to execute after this duration. When the
// absolute scheduled time is specified, the scheduled time will take
// precedence. It is recommended to use relative durations whenever
// possible to avoid clock synchronization issues. The value must be a
// valid duration string parsable by time.ParseDuration. This field is only
// used when creating a task and will be cleared after converting to an
// absolute scheduled time.
Defer string `json:"defer,omitempty" bson:"-"`
}
// Decode parses the payload of the task and stores the result in the value
// pointed by the specified pointer.
func (t *Task) Decode(v any) error {
// Counterintuitively, the seemingly dumb approach of just marshalling
// input into JSON bytes and decoding it from those bytes is actually both
// 29.5% faster (than reflection) and causes less memory allocations.
// Reference: https://github.com/mitchellh/mapstructure/issues/37
b, err := json.Marshal(t.Payload)
if err != nil {
return err
}
return json.Unmarshal(b, v)
}
// Promise represents a claim on the ownership of an active task.
type Promise struct {
// Unique ID of the promise, which is the same as the target task ID.
// A promise with an empty ID is considered an "wildcard promise", and
// Ratus will assign an appropriate task based on the status of the queue.
// A task can only be owned by a single promise at a given time.
ID string `json:"_id,omitempty" bson:"_id" form:"_id"`
// Identifier of the consumer instance who consumed the task.
Consumer string `json:"consumer,omitempty" bson:"consumer,omitempty" form:"consumer"`
// The deadline for the completion of execution promised by the consumer.
// Consumer code needs to commit the task before this deadline, otherwise
// the task is determined to have timed out and will be reset to the
// "pending" state, allowing other consumers to retry.
Deadline *time.Time `json:"deadline,omitempty" bson:"deadline,omitempty" form:"deadline"`
// Timeout duration for task execution promised by the consumer. When the
// absolute deadline time is specified, the deadline will take precedence.
// It is recommended to use relative durations whenever possible to avoid
// clock synchronization issues. The value must be a valid duration string
// parsable by time.ParseDuration. This field is only used when creating a
// promise and will be cleared after converting to an absolute deadline.
Timeout string `json:"timeout,omitempty" bson:"-" form:"timeout"`
}
// Commit contains a set of updates to be applied to a task.
type Commit struct {
// If not empty, the commit will be accepted only if the value matches the
// corresponding nonce of the target task.
Nonce string `json:"nonce,omitempty" bson:"nonce,omitempty"`
// If not empty, transfer the task to the specified topic.
Topic string `json:"topic,omitempty" bson:"topic,omitempty"`
// If not nil, set the state of the task to the specified value.
// If nil, the state of the task will be set to "completed" by default.
State *TaskState `json:"state,omitempty" bson:"state,omitempty"`
// If not nil, set the scheduled time of the task to the specified value.
Scheduled *time.Time `json:"scheduled,omitempty" bson:"scheduled,omitempty"`
// If not nil, use this value to replace the payload of the task.
Payload any `json:"payload,omitempty" bson:"payload,omitempty"`
// A duration relative to the time the commit is accepted, indicating that
// the task will be scheduled to execute after this duration. When the
// absolute scheduled time is specified, the scheduled time will take
// precedence. It is recommended to use relative durations whenever
// possible to avoid clock synchronization issues. The value must be a
// valid duration string parsable by time.ParseDuration. This field is only
// used when creating a commit and will be cleared after converting to an
// absolute scheduled time.
Defer string `json:"defer,omitempty" bson:"-"`
}
// Topics contains a list of topic resources.
type Topics struct {
Data []*Topic `json:"data"`
}
// Tasks contains a list of task resources.
type Tasks struct {
Data []*Task `json:"data"`
}
// Promises contains a list of promise resources.
type Promises struct {
Data []*Promise `json:"data"`
}
// Updated contains result of an update operation.
type Updated struct {
// Number of resources created by the operation.
Created int64 `json:"created"`
// Number of resources updated by the operation.
Updated int64 `json:"updated"`
}
// Deleted contains result of a delete operation.
type Deleted struct {
// Number of resources deleted by the operation.
Deleted int64 `json:"deleted"`
}
// Error contains an error message.
type Error struct {
// The error object.
Error struct {
// Code of the error.
Code int `json:"code"`
// Message of the error.
Message string `json:"message"`
} `json:"error"`
}
// Err returns an error type from the error message. It will automatically wrap
// sentinel error types based on the code and remove duplicates in the message.
func (e *Error) Err() error {
// Determine the sentinel error to wrap around based on the error code.
var err error
switch e.Error.Code {
case StatusClientClosedRequest:
err = ErrClientClosedRequest
case http.StatusBadRequest:
err = ErrBadRequest
case http.StatusNotFound:
err = ErrNotFound
case http.StatusConflict:
err = ErrConflict
case http.StatusInternalServerError:
err = ErrInternalServerError
case http.StatusServiceUnavailable:
err = ErrServiceUnavailable
default:
return errors.New(e.Error.Message)
}
// Remove duplicated prefix caused by wrapping.
m := strings.TrimPrefix(e.Error.Message, err.Error())
if m != "" {
err = fmt.Errorf("%w%s", err, m)
}
return err
}
// NewError creates an error message from an error type.
func NewError(err error) *Error {
// Determine status code for the error.
var s int
switch {
case errors.Is(err, context.Canceled):
s = StatusClientClosedRequest
case errors.Is(err, io.ErrUnexpectedEOF):
s = StatusClientClosedRequest
case errors.Is(err, ErrClientClosedRequest):
s = StatusClientClosedRequest
case errors.Is(err, ErrBadRequest):
s = http.StatusBadRequest
case errors.Is(err, ErrNotFound):
s = http.StatusNotFound
case errors.Is(err, ErrConflict):
s = http.StatusConflict
case errors.Is(err, ErrServiceUnavailable):
s = http.StatusServiceUnavailable
default:
s = http.StatusInternalServerError
}
// Populate error information.
var e Error
e.Error.Code = s
e.Error.Message = err.Error()
return &e
}