-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcoprocess_python_test.go
230 lines (202 loc) · 6.81 KB
/
coprocess_python_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
// +build coprocess
// +build python
package main
import (
"bytes"
"mime/multipart"
"testing"
"time"
"github.com/TykTechnologies/tyk/config"
"github.com/TykTechnologies/tyk/test"
"github.com/TykTechnologies/tyk/user"
)
// pythonBundleWithAuthCheck holds the files of a Python middleware bundle,
// keyed by file name. TestPythonBundles registers it under the name
// "python_with_auth_check" and attaches it to an API that has coprocess
// authentication enabled.
//
// The embedded hook, MyAuthHook, reads the request's Authorization header and
// compares it with 'valid_token'; it assigns session.rate = 1000.0,
// session.per = 1.0 and metadata["token"] = "valid_token", and returns the
// (request, session, metadata) triple.
//
// NOTE(review): the Python source below carries no indentation in this copy,
// so which statements belong to the `if` body cannot be read off the literal.
// Python requires indented bodies; confirm the literal against the upstream
// file before relying on it.
var pythonBundleWithAuthCheck = map[string]string{
// Bundle manifest: lists middleware.py as the only file, selects the
// "python" driver and names MyAuthHook as the auth_check hook.
"manifest.json": `
{
"file_list": [
"middleware.py"
],
"custom_middleware": {
"driver": "python",
"auth_check": {
"name": "MyAuthHook"
}
}
}
`,
// Python source that defines MyAuthHook.
"middleware.py": `
from tyk.decorators import *
from gateway import TykGateway as tyk
@Hook
def MyAuthHook(request, session, metadata, spec):
auth_header = request.get_header('Authorization')
if auth_header == 'valid_token':
session.rate = 1000.0
session.per = 1.0
metadata["token"] = "valid_token"
return request, session, metadata
`,
}
// pythonBundleWithPostHook holds the files of a Python middleware bundle,
// keyed by file name. TestPythonBundles registers it under the name
// "python_with_post_hook" and calls the API with a session whose metadata
// contains "testkey" (a nested map with "nestedkey") and "stringkey"
// ("testvalue").
//
// The embedded hook, MyPostHook, inspects session.metadata and sets
// return_overrides.response_code = 400 together with a response_error message
// when:
//   - "testkey" is missing from the metadata,
//   - the JSON-decoded "testkey" value has no "nestedkey",
//   - "stringkey" is missing from the metadata, or
//   - the "stringkey" value is not "testvalue".
//
// NOTE(review): the Python source below carries no indentation in this copy;
// Python requires indented bodies, so confirm the literal against the
// upstream file.
var pythonBundleWithPostHook = map[string]string{
// Bundle manifest: lists middleware.py as the only file, selects the
// "python" driver and names MyPostHook as the single "post" hook.
"manifest.json": `
{
"file_list": [
"middleware.py"
],
"custom_middleware": {
"driver": "python",
"post": [{
"name": "MyPostHook"
}]
}
}
`,
// Python source that defines MyPostHook.
"middleware.py": `
from tyk.decorators import *
from gateway import TykGateway as tyk
import json
@Hook
def MyPostHook(request, session, spec):
if "testkey" not in session.metadata.keys():
request.object.return_overrides.response_code = 400
request.object.return_overrides.response_error = "'testkey' not found in metadata"
return request, session
nested_data = json.loads(session.metadata["testkey"])
if "nestedkey" not in nested_data:
request.object.return_overrides.response_code = 400
request.object.return_overrides.response_error = "'nestedkey' not found in nested metadata"
return request, session
if "stringkey" not in session.metadata.keys():
request.object.return_overrides.response_code = 400
request.object.return_overrides.response_error = "'stringkey' not found in metadata"
return request, session
stringkey = session.metadata["stringkey"]
if stringkey != "testvalue":
request.object.return_overrides.response_code = 400
request.object.return_overrides.response_error = "'stringkey' value doesn't match"
return request, session
return request, session
`,
}
// pythonBundleWithPreHook holds the files of a Python middleware bundle,
// keyed by file name. TestPythonBundles registers it under the name
// "python_with_pre_hook" and sends it one multipart request and one JSON
// request.
//
// The embedded hook, MyPreHook, reads the Content-Type header and checks the
// request's body and raw_body fields, setting
// return_overrides.response_code = 400 with a response_error message when a
// check fails:
//   - Content-Type containing "json": raw_body must be non-empty and body
//     must contain "{}".
//   - Content-Type containing "multipart": body must have length 0 and
//     raw_body must be non-empty.
//
// NOTE(review): the Python source below carries no indentation in this copy,
// so the exact nesting of the checks (and of the final return) cannot be read
// off the literal; confirm against the upstream file.
var pythonBundleWithPreHook = map[string]string{
// Bundle manifest: lists middleware.py as the only file, selects the
// "python" driver and names MyPreHook as the single "pre" hook.
"manifest.json": `
{
"file_list": [
"middleware.py"
],
"custom_middleware": {
"driver": "python",
"pre": [{
"name": "MyPreHook"
}]
}
}
`,
// Python source that defines MyPreHook.
"middleware.py": `
from tyk.decorators import *
from gateway import TykGateway as tyk
@Hook
def MyPreHook(request, session, metadata, spec):
content_type = request.get_header("Content-Type")
if "json" in content_type:
if len(request.object.raw_body) <= 0:
request.object.return_overrides.response_code = 400
request.object.return_overrides.response_error = "Raw body field is empty"
return request, session, metadata
if "{}" not in request.object.body:
request.object.return_overrides.response_code = 400
request.object.return_overrides.response_error = "Body field doesn't match"
return request, session, metadata
if "multipart" in content_type:
if len(request.object.body) != 0:
request.object.return_overrides.response_code = 400
request.object.return_overrides.response_error = "Body field isn't empty"
if len(request.object.raw_body) <= 0:
request.object.return_overrides.response_code = 400
request.object.return_overrides.response_error = "Raw body field is empty"
return request, session, metadata
`,
}
// TestPythonBundles starts a test gateway with the coprocess enabled,
// registers the three Python bundles declared in this file, and runs one
// subtest per bundle:
//
//   - auth hook: "valid_token" must yield 200, "invalid_token" must yield 403;
//   - post hook: a session carrying nested and string metadata must yield 200;
//   - pre hook: a multipart upload with binary data and a JSON body must both
//     yield 200.
func TestPythonBundles(t *testing.T) {
	gw := newTykTestServer(tykTestServerConfig{
		coprocessConfig: config.CoProcessConfig{
			EnableCoProcess: true,
		},
	})
	defer gw.Close()

	authBundle := registerBundle("python_with_auth_check", pythonBundleWithAuthCheck)
	postBundle := registerBundle("python_with_post_hook", pythonBundleWithPostHook)
	preBundle := registerBundle("python_with_pre_hook", pythonBundleWithPreHook)

	// settle pauses for one second after an API has been loaded.
	// NOTE(review): presumably this gives the gateway time to fetch and load
	// the bundle before requests are sent — confirm.
	settle := func() { time.Sleep(time.Second) }

	t.Run("Single-file bundle with authentication hook", func(t *testing.T) {
		buildAndLoadAPI(func(api *APISpec) {
			api.Proxy.ListenPath = "/test-api/"
			api.VersionData.NotVersioned = true
			api.UseKeylessAccess = false
			api.EnableCoProcessAuth = true
			api.CustomMiddlewareBundle = authBundle
		})
		settle()

		cases := []test.TestCase{
			{
				Path:    "/test-api/",
				Code:    200,
				Headers: map[string]string{"Authorization": "valid_token"},
			},
			{
				Path:    "/test-api/",
				Code:    403,
				Headers: map[string]string{"Authorization": "invalid_token"},
			},
		}
		gw.Run(t, cases...)
	})

	t.Run("Single-file bundle with post hook", func(t *testing.T) {
		// The post hook looks for exactly these metadata entries.
		sessionKey := createSession(func(sess *user.SessionState) {
			nested := map[string]interface{}{"nestedkey": "nestedvalue"}
			sess.MetaData = map[string]interface{}{
				"testkey":   nested,
				"stringkey": "testvalue",
			}
		})

		buildAndLoadAPI(func(api *APISpec) {
			api.Proxy.ListenPath = "/test-api-2/"
			api.VersionData.NotVersioned = true
			api.UseKeylessAccess = false
			api.EnableCoProcessAuth = false
			api.CustomMiddlewareBundle = postBundle
		})
		settle()

		cases := []test.TestCase{
			{
				Path:    "/test-api-2/",
				Code:    200,
				Headers: map[string]string{"Authorization": sessionKey},
			},
		}
		gw.Run(t, cases...)
	})

	t.Run("Single-file bundle with pre hook and UTF-8/non-UTF-8 request data", func(t *testing.T) {
		// Uses the same listen path as the post-hook subtest.
		buildAndLoadAPI(func(api *APISpec) {
			api.Proxy.ListenPath = "/test-api-2/"
			api.VersionData.NotVersioned = true
			api.UseKeylessAccess = true
			api.EnableCoProcessAuth = false
			api.CustomMiddlewareBundle = preBundle
		})
		settle()

		binaryPayload := generateTestBinaryData()

		// Assemble a multipart body: one file part with the binary payload
		// followed by one plain form field.
		var body bytes.Buffer
		form := multipart.NewWriter(&body)

		filePart, err := form.CreateFormFile("file", "test.bin")
		if err != nil {
			t.Fatalf("Couldn't use multipart writer: %s", err)
		}
		if _, err = binaryPayload.WriteTo(filePart); err != nil {
			t.Fatalf("Couldn't write to multipart file: %s", err)
		}

		fieldPart, err := form.CreateFormField("testfield")
		if err != nil {
			t.Fatalf("Couldn't use multipart writer: %s", err)
		}
		if _, err = fieldPart.Write([]byte("testvalue")); err != nil {
			t.Fatalf("Couldn't write to form field: %s", err)
		}

		// Close writes the trailing boundary, so it must happen before the
		// buffer is sent.
		if err = form.Close(); err != nil {
			t.Fatalf("Couldn't close multipart writer: %s", err)
		}

		cases := []test.TestCase{
			{
				Path:    "/test-api-2/",
				Code:    200,
				Data:    &body,
				Headers: map[string]string{"Content-Type": form.FormDataContentType()},
			},
			{
				Path:    "/test-api-2/",
				Code:    200,
				Data:    "{}",
				Headers: map[string]string{"Content-Type": "application/json"},
			},
		}
		gw.Run(t, cases...)
	})
}