-
Notifications
You must be signed in to change notification settings - Fork 16
/
Copy pathoptions.go
165 lines (137 loc) · 4.85 KB
/
options.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package semaphore
import (
"github.com/jexia/semaphore/v2/pkg/broker"
"github.com/jexia/semaphore/v2/pkg/broker/logger"
"github.com/jexia/semaphore/v2/pkg/codec"
"github.com/jexia/semaphore/v2/pkg/flow"
"github.com/jexia/semaphore/v2/pkg/functions"
"github.com/jexia/semaphore/v2/pkg/providers"
"github.com/jexia/semaphore/v2/pkg/specs"
"github.com/jexia/semaphore/v2/pkg/transport"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
// Option is a functional option: a function that receives the broker context
// and the Options value being built, and mutates that Options value in place.
// Options are applied in order by SetOptions.
type Option func(*broker.Context, *Options)
// Options represents all the available options.
//
// NewOptions allocates Codec and FlowResolvers; the remaining fields start at
// their zero value and are filled in by the Option constructors in this package
// or by options returned from middleware.
type Options struct {
// Codec holds the registered codec constructors; WithCodec stores each one under its Name().
Codec codec.Constructors
// Callers collects the callers produced by the constructors passed to WithCaller.
Callers transport.Callers
// FlowResolvers collects the resolvers appended through WithFlows.
FlowResolvers providers.FlowsResolvers
// Middleware is iterated by SetOptions once the given options have been applied.
Middleware []Middleware
// BeforeConstructor is a hook of type BeforeConstructor; it is not invoked in this file.
BeforeConstructor BeforeConstructor
// The Before*/After* Manager and Node fields below are hooks typed by the flow
// package. NOTE(review): presumably they run around a flow manager's / node's
// Do and Rollback — confirm against the flow package, they are not used here.
BeforeManagerDo flow.BeforeManager
BeforeManagerRollback flow.BeforeManager
AfterManagerDo flow.AfterManager
AfterManagerRollback flow.AfterManager
BeforeNodeDo flow.BeforeNode
BeforeNodeRollback flow.BeforeNode
AfterNodeDo flow.AfterNode
AfterNodeRollback flow.AfterNode
// AfterFlowConstruction is a hook of type AfterFlowConstruction; it is not invoked in this file.
AfterFlowConstruction AfterFlowConstruction
// Functions holds the custom functions merged in through WithFunctions.
Functions functions.Custom
}
// Middleware is called once the options have been initialised.
//
// SetOptions calls Use on every middleware stored in Options.Middleware after
// the directly supplied options have been applied, and then applies the options
// that Use returns. A non-nil error from Use aborts SetOptions.
type Middleware interface {
Use(*broker.Context) ([]Option, error)
}
// middleware adapts a plain function to the Middleware interface.
// Values are created through MiddlewareFunc.
type middleware struct {
// handle is the wrapped function that Use delegates to.
handle func(*broker.Context) ([]Option, error)
}
// Use implements Middleware by calling the wrapped handle with ctx and
// returning its result unchanged.
func (m *middleware) Use(ctx *broker.Context) ([]Option, error) {
return m.handle(ctx)
}
// MiddlewareFunc adapts a plain function to the Middleware interface. The
// returned Middleware forwards every Use call to handle.
func MiddlewareFunc(handle func(*broker.Context) ([]Option, error)) Middleware {
	wrapped := &middleware{handle: handle}
	return wrapped
}
// BeforeConstructor is called before the specifications are constructed.
// It receives the broker context, the functions collection and a copy of the
// options, and may return an error.
type BeforeConstructor func(*broker.Context, functions.Collection, Options) error
// BeforeConstructorHandler wraps a BeforeConstructor function and returns a
// new one, which allows middleware to be chained.
type BeforeConstructorHandler func(BeforeConstructor) BeforeConstructor
// AfterFlowConstruction is a hook that receives the broker context, the flow
// specification and the flow manager, and may return an error.
// NOTE(review): the name and the *flow.Manager argument suggest it runs after a
// flow manager has been constructed; the previous comment said "before" —
// confirm against the call site.
type AfterFlowConstruction func(*broker.Context, specs.FlowInterface, *flow.Manager) error
// AfterFlowConstructionHandler wraps an AfterFlowConstruction function and
// returns a new one, which allows middleware to be chained.
type AfterFlowConstructionHandler func(AfterFlowConstruction) AfterFlowConstruction
// NewOptions builds an Options value with an allocated codec map and flow
// resolver slice and applies the given Option constructors to it.
//
// When no options are passed the freshly initialised value is returned
// directly. Otherwise the options are applied through SetOptions; if that
// fails, the error is returned together with the Options value as it stood at
// that point.
func NewOptions(ctx *broker.Context, options ...Option) (Options, error) {
	var result Options
	result.FlowResolvers = make([]providers.FlowsResolver, 0)
	result.Codec = make(map[string]codec.Constructor)

	// Only run SetOptions when the caller actually supplied options.
	if options != nil {
		if err := SetOptions(ctx, &result, options...); err != nil {
			return result, err
		}
	}

	return result, nil
}
// SetOptions applies the given options to parent and then runs every
// middleware registered on parent.
//
// Each option is called with ctx and parent in the order given. Afterwards
// Use(ctx) is called on each entry of parent.Middleware and the options it
// returns are applied to parent as well. Nil options are skipped in both
// phases, so a middleware returning a nil entry cannot cause a panic.
//
// The first error returned by a middleware aborts processing and is returned
// unchanged; options applied up to that point remain in effect on parent.
func SetOptions(ctx *broker.Context, parent *Options, options ...Option) error {
	for _, option := range options {
		if option == nil {
			continue
		}

		option(ctx, parent)
	}

	// The range expression is evaluated once, so middleware appended by an
	// option returned from another middleware is not visited in this call.
	for _, mw := range parent.Middleware {
		extra, err := mw.Use(ctx)
		if err != nil {
			return err
		}

		for _, option := range extra {
			// Mirror the nil handling applied to the directly supplied options.
			if option == nil {
				continue
			}

			option(ctx, parent)
		}
	}

	return nil
}
// NewCollection constructs a new options collection.
// It returns the variadic arguments as a slice without copying or filtering
// them; with no arguments the result is a nil slice.
func NewCollection(options ...Option) []Option {
return options
}
// WithFlows returns an Option that adds the given flows resolver to the end of
// Options.FlowResolvers.
func WithFlows(definition providers.FlowsResolver) Option {
	register := func(_ *broker.Context, opts *Options) {
		opts.FlowResolvers = append(opts.FlowResolvers, definition)
	}

	return register
}
// WithCodec returns an Option that registers the given codec constructor in
// Options.Codec under the name reported by its Name method. Registering a
// second constructor with the same name replaces the first.
//
// The Codec map is allocated on first use, so the option also works on an
// Options value whose Codec map has not been initialised (for example one that
// was not created through NewOptions) instead of panicking on a nil map write.
//
// The parameter is named constructor so it does not shadow the codec package
// inside the function body.
func WithCodec(constructor codec.Constructor) Option {
	return func(ctx *broker.Context, options *Options) {
		if options.Codec == nil {
			options.Codec = codec.Constructors{}
		}

		options.Codec[constructor.Name()] = constructor
	}
}
// WithCaller returns an Option that invokes the given caller constructor with
// the broker context and adds the resulting caller to the end of
// Options.Callers. The constructor runs when the option is applied, not when
// WithCaller is called.
func WithCaller(caller transport.NewCaller) Option {
	return func(ctx *broker.Context, opts *Options) {
		constructed := caller(ctx)
		opts.Callers = append(opts.Callers, constructed)
	}
}
// WithFunctions returns an Option that merges the given custom functions into
// Options.Functions. The target map is allocated when it is still nil, and an
// entry in custom overwrites an existing entry with the same key.
func WithFunctions(custom functions.Custom) Option {
	return func(_ *broker.Context, opts *Options) {
		target := opts.Functions
		if target == nil {
			// Maps are reference values, so writes through target below are
			// visible through opts.Functions once it has been assigned.
			target = functions.Custom{}
			opts.Functions = target
		}

		for name, handler := range custom {
			target[name] = handler
		}
	}
}
// WithLogLevel returns an Option that parses value as a zap log level and sets
// it for the loggers matching pattern.
//
// Failures are not returned to the caller: when value cannot be parsed, or the
// level cannot be set, an error is logged through the broker logger and the
// option leaves the log level untouched.
func WithLogLevel(pattern string, value string) Option {
	return func(ctx *broker.Context, _ *Options) {
		parsed := zapcore.InfoLevel

		if err := parsed.UnmarshalText([]byte(value)); err != nil {
			logger.Error(ctx, "unable to unmarshal log level", zap.String("level", value))
			return
		}

		if err := logger.SetLevel(ctx, pattern, parsed); err != nil {
			logger.Error(ctx, "unable to set log level", zap.Error(err))
		}
	}
}