Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[python] add support for SHi and CMDi tests for python #3791

Merged
merged 4 commits into from
Jan 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions manifests/python.yml
Original file line number Diff line number Diff line change
Expand Up @@ -245,17 +245,17 @@ tests/:
TestURI: missing_feature
rasp/:
test_cmdi.py:
Test_Cmdi_BodyJson: missing_feature
Test_Cmdi_BodyUrlEncoded: missing_feature
Test_Cmdi_BodyXml: missing_feature
Test_Cmdi_Capability: missing_feature
Test_Cmdi_Mandatory_SpanTags: missing_feature
Test_Cmdi_Optional_SpanTags: missing_feature
Test_Cmdi_BodyJson: v2.20.0.dev
Test_Cmdi_BodyUrlEncoded: v2.20.0.dev
Test_Cmdi_BodyXml: v2.20.0.dev
Test_Cmdi_Capability: v2.20.0.dev
Test_Cmdi_Mandatory_SpanTags: v2.20.0.dev
Test_Cmdi_Optional_SpanTags: v2.20.0.dev
Test_Cmdi_Rules_Version: v2.18.0.dev
Test_Cmdi_StackTrace: missing_feature
Test_Cmdi_Telemetry: missing_feature
Test_Cmdi_Telemetry_Variant_Tag: missing_feature
Test_Cmdi_UrlQuery: missing_feature
Test_Cmdi_StackTrace: v2.20.0.dev
Test_Cmdi_Telemetry: v2.20.0.dev
Test_Cmdi_Telemetry_Variant_Tag: v2.20.0.dev
Test_Cmdi_UrlQuery: v2.20.0.dev
Test_Cmdi_Waf_Version: v2.18.0.dev
test_lfi.py:
Test_Lfi_BodyJson: v2.10.0
Expand All @@ -280,7 +280,7 @@ tests/:
Test_Shi_Rules_Version: v2.15.0
Test_Shi_StackTrace: v2.11.0-rc2
Test_Shi_Telemetry: v2.11.0-rc2
Test_Shi_Telemetry_Variant_Tag: missing_feature
Test_Shi_Telemetry_Variant_Tag: v2.20.0.dev
Test_Shi_UrlQuery: v2.11.0-rc2
Test_Shi_Waf_Version: v2.15.0
test_sqli.py:
Expand Down
27 changes: 27 additions & 0 deletions utils/build/docker/python/django/app/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,32 @@ def rasp_shi(request, *args, **kwargs):
return HttpResponse(f"Shell command failure: {e!r}", status=201)


@csrf_exempt
def rasp_cmdi(request, *args, **kwargs):
cmd = None
if request.method == "GET":
cmd = request.GET.get("command")
elif request.method == "POST":
try:
cmd = (request.POST or json.loads(request.body)).get("command")
except Exception as e:
print(repr(e), file=sys.stderr)
try:
if cmd is None:
cmd = xmltodict.parse(request.body).get("command").get("cmd")
except Exception as e:
print(repr(e), file=sys.stderr)
pass

if cmd is None:
return HttpResponse("missing command parameter", status=400)
try:
res = subprocess.run(cmd, capture_output=True)
return HttpResponse(f"Exec command [{cmd}] with result: {res}")
except Exception as e:
return HttpResponse(f"Shell command [{cmd}] failure: {e!r}", status=201)


### END EXPLOIT PREVENTION


Expand Down Expand Up @@ -853,6 +879,7 @@ def s3_multipart_upload(request):
path("returnheaders", return_headers),
path("returnheaders/", return_headers),
path("set_cookie", set_cookie),
path("rasp/cmdi", rasp_cmdi),
path("rasp/lfi", rasp_lfi),
path("rasp/shi", rasp_shi),
path("rasp/sqli", rasp_sqli),
Expand Down
28 changes: 28 additions & 0 deletions utils/build/docker/python/fastapi/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,34 @@ async def rasp_shi(request: Request):
return PlainTextResponse(f"Shell command failure: {e!r}", status_code=201)


@app.get("/rasp/cmdi")
@app.post("/rasp/cmdi")
async def rasp_cmdi(request: Request):
cmd = None
if request.method == "GET":
cmd = request.query_params.get("command")
elif request.method == "POST":
body = await request.body()
try:
cmd = ((await request.form()) or json.loads(body) or {}).get("command")
except Exception as e:
print(repr(e), file=sys.stderr)
try:
if cmd is None:
cmd = xmltodict.parse(body).get("command").get("cmd")
except Exception as e:
print(repr(e), file=sys.stderr)
pass

if cmd is None:
return PlainTextResponse("missing command parameter", status_code=400)
try:
res = subprocess.run(cmd, capture_output=True)
return PlainTextResponse(f"Exec command [{cmd}] with result: {res}")
except Exception as e:
return PlainTextResponse(f"Exec command [{cmd}] failure: {e!r}", status_code=201)


### END EXPLOIT PREVENTION


Expand Down
28 changes: 27 additions & 1 deletion utils/build/docker/python/flask/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -345,7 +345,7 @@ def rasp_shi(*args, **kwargs):
pass

if list_dir is None:
return "missing user_id parameter", 400
return "missing list_dir parameter", 400
try:
command = f"ls {list_dir}"
res = os.system(command)
Expand All @@ -355,6 +355,32 @@ def rasp_shi(*args, **kwargs):
return f"Shell command failure: {e!r}", 201


@app.route("/rasp/cmdi", methods=["GET", "POST"])
def rasp_cmdi(*args, **kwargs):
cmd = None
if request.method == "GET":
cmd = flask_request.args.get("command")
elif request.method == "POST":
try:
cmd = (request.form or request.json or {}).get("command")
except Exception as e:
print(repr(e), file=sys.stderr)
try:
if cmd is None:
cmd = xmltodict.parse(flask_request.data).get("command").get("cmd")
except Exception as e:
print(repr(e), file=sys.stderr)
pass

if cmd is None:
return "missing cmd parameter", 400
try:
res = subprocess.run(cmd, capture_output=True)
return f"Exec command [{cmd}] with result: [{res.returncode}]: {res.stdout}", 200
except Exception as e:
return f"Exec command [{cmd}] yfailure: {e!r}", 201


### END EXPLOIT PREVENTION


Expand Down
2 changes: 1 addition & 1 deletion utils/interfaces/_library/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ def validator(_, appsec_data):

trigger = triggers[0]
obtained_rule_id = trigger["rule"]["id"]
assert obtained_rule_id == rule, f"incorrect rule id, expected {rule}"
assert obtained_rule_id == rule, f"incorrect rule id, expected {rule}, got {obtained_rule_id}"

if parameters is not None:
rule_matches = trigger["rule_matches"]
Expand Down
Loading