-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathsdk.go
250 lines (211 loc) · 6.47 KB
/
sdk.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
package golang
import (
"context"
"fmt"
"hash/fnv"
"net"
"os"
"strings"
"github.com/gaia-pipeline/protobuf"
"github.com/pkg/errors"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
healthpb "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection"
)
// coreProtocolVersion is the protocol version of the plugin system itself.
const coreProtocolVersion = 1
// ProtocolVersion currently in use by Gaia
const ProtocolVersion = 2
// ProtocolType is the type used to communicate.
const ProtocolType = "grpc"
// List domain (usually localhost)
const listenIP = "localhost:"
// env variable key names for TLS cert path
const serverCertEnv = "GAIA_PLUGIN_CERT"
const serverKeyEnv = "GAIA_PLUGIN_KEY"
const rootCACertEnv = "GAIA_PLUGIN_CA_CERT"
var (
// ErrorJobNotFound is returned when a given job id was not found
// locally.
ErrorJobNotFound = errors.New("job not found in plugin")
// ErrorExitPipeline is used to safely exit the pipeline (actually not an error).
// Prevents the pipeline to be marked as 'failed'.
ErrorExitPipeline = errors.New("pipeline exit requested by job")
// ErrorDuplicateJob is returned when two jobs have the same title which is restricted.
ErrorDuplicateJob = errors.New("duplicate job found (two jobs with same title)")
// errCertNotAppended is thrown when the root CA cert cannot be appended to the pool.
errCertNotAppended = errors.New("cannot append root CA cert to cert pool")
)
// CachedJobs holds a list of JobsWrapper for later processing
var cachedJobs []jobsWrapper
// GRPCServer is the plugin gRPC implementation.
type GRPCServer struct{}
// GetJobs streams all given jobs back.
func (GRPCServer) GetJobs(empty *proto.Empty, stream proto.Plugin_GetJobsServer) error {
for _, job := range cachedJobs {
err := stream.Send(&job.job)
if err != nil {
return err
}
}
return nil
}
// ExecuteJob receives a job and executes it.
// Returns a JobResult object which gives information about job execution.
func (GRPCServer) ExecuteJob(ctx context.Context, j *proto.Job) (*proto.JobResult, error) {
job := getJob((*j).UniqueId)
if job == nil {
return nil, ErrorJobNotFound
}
// transform arguments
args := Arguments{}
for _, arg := range j.GetArgs() {
a := Argument{
Key: arg.Key,
Value: arg.Value,
}
args = append(args, a)
}
// Execute Job
err := job.funcPointer(args)
// Generate result object only when we got an error
r := &proto.JobResult{}
if err != nil {
// Check if job wants to force exit pipeline.
// We will exit the pipeline but not mark as 'failed'.
if err != ErrorExitPipeline {
// We got an error. Pipeline is now marked as 'failed'.
r.Failed = true
}
// Set log message and job id
r.ExitPipeline = true
r.Message = err.Error()
r.UniqueId = job.job.UniqueId
}
return r, nil
}
// Serve initiates the gRPC Server and listens.
// This method should be last called in the plugin main function.
func Serve(j Jobs) error {
// Cache the jobs list for later processing.
// We first have to translate given jobs to different structure.
cachedJobs = []jobsWrapper{}
for _, job := range j {
// Manual interaction
var ma proto.ManualInteraction
if job.Interaction != nil {
ma = proto.ManualInteraction{
Description: job.Interaction.Description,
Type: job.Interaction.Type.String(),
Value: job.Interaction.Value,
}
}
// Arguments
args := []*proto.Argument{}
if job.Args != nil {
for _, arg := range job.Args {
a := &proto.Argument{
Description: arg.Description,
Type: arg.Type.String(),
Key: arg.Key,
Value: arg.Value,
}
args = append(args, a)
}
}
// Create proto jobs object
p := proto.Job{
UniqueId: hash(job.Title),
Title: job.Title,
Description: job.Description,
Args: args,
Interaction: &ma,
}
// Resolve dependencies
if job.DependsOn != nil {
p.Dependson = []uint32{}
for _, depJob := range job.DependsOn {
var foundDep bool
for _, currJob := range j {
if strings.Compare(strings.ToLower(currJob.Title), strings.ToLower(depJob)) == 0 {
p.Dependson = append(p.Dependson, hash(currJob.Title))
foundDep = true
break
}
}
if !foundDep {
return errors.Errorf("job '%s' has dependency '%s' which is not declared", job.Title, depJob)
}
}
}
// Create jobs wrapper object
w := jobsWrapper{
funcPointer: job.Handler,
job: p,
}
cachedJobs = append(cachedJobs, w)
}
// Check if two jobs have the same title which is restricted
for x, job := range cachedJobs {
for y, innerJob := range cachedJobs {
if x != y && job.job.UniqueId == innerJob.job.UniqueId {
return ErrorDuplicateJob
}
}
}
// Get certificates path from environment variables
certPath := os.Getenv(serverCertEnv)
keyPath := os.Getenv(serverKeyEnv)
caCertPath := os.Getenv(rootCACertEnv)
// Check if all certs are available
if _, err := os.Stat(certPath); os.IsNotExist(err) {
return errors.Wrap(err, "cannot find path to certificate")
}
if _, err := os.Stat(keyPath); os.IsNotExist(err) {
return errors.Wrap(err, "cannot find path to key")
}
if _, err := os.Stat(caCertPath); os.IsNotExist(err) {
return errors.Wrap(err, "cannot find path to root CA certificate")
}
// implement health service
health := health.NewServer()
health.SetServingStatus("plugin", healthpb.HealthCheckResponse_SERVING)
// Generate TLS config
tlsConfig, err := generateTLSConfig(certPath, keyPath, caCertPath)
if err != nil {
return errors.Wrap(err, "cannot create TLS config")
}
// Create new gRPC server and register services
s := grpc.NewServer(grpc.Creds(credentials.NewTLS(tlsConfig)))
proto.RegisterPluginServer(s, &GRPCServer{})
healthpb.RegisterHealthServer(s, health)
// Register reflection service on gRPC server
reflection.Register(s)
// Create TCP Server
lis, err := net.Listen("tcp", listenIP)
if err != nil {
return errors.Wrap(err, "cannot start tcp server")
}
// Output the address and service name to stdout.
// hashicorp go-plugin will use that to establish connection.
fmt.Printf("%d|%d|%s|%s|%s\n",
coreProtocolVersion,
ProtocolVersion,
lis.Addr().Network(),
lis.Addr().String(),
ProtocolType)
os.Stdout.Sync()
// Listen
if err := s.Serve(lis); err != nil {
return errors.Wrap(err, "cannot start grpc server")
}
return nil
}
// hash hashes the given string.
func hash(s string) uint32 {
h := fnv.New32a()
h.Write([]byte(s))
return h.Sum32()
}