diff --git a/src/dioptra/restapi/v1/plugins/schema.py b/src/dioptra/restapi/v1/plugins/schema.py index 3eacc964a..6bf41baa7 100644 --- a/src/dioptra/restapi/v1/plugins/schema.py +++ b/src/dioptra/restapi/v1/plugins/schema.py @@ -32,7 +32,7 @@ ) ALLOWED_PLUGIN_NAME_REGEX = re.compile(r"^([A-Z]|[A-Z_][A-Z0-9_]+)$", flags=re.IGNORECASE) # noqa: B950; fmt: skip -ALLOWED_PLUGIN_FILENAME_REGEX = re.compile(r"^([A-Za-z]|[A-Za-z_][A-Za-z0-9_]+)\.py$") +ALLOWED_PLUGIN_FILENAME_REGEX = re.compile(r"^([A-Za-z]|[A-Za-z_][A-Za-z0-9_]*(/[A-Za-z_][A-Za-z0-9_]*)*)\.py$") # noqa: B950; fmt: skip ALLOWED_PLUGIN_TASK_REGEX = re.compile(r"^([A-Z]|[A-Z_][A-Z0-9_]+)$", flags=re.IGNORECASE) # noqa: B950; fmt: skip ALLOWED_PLUGIN_TASK_PARAMETER_REGEX = re.compile(r"^([A-Z]|[A-Z_][A-Z0-9_]+)$", flags=re.IGNORECASE) # noqa: B950; fmt: skip diff --git a/tests/unit/restapi/v1/conftest.py b/tests/unit/restapi/v1/conftest.py index 6363ee273..f7fd53086 100644 --- a/tests/unit/restapi/v1/conftest.py +++ b/tests/unit/restapi/v1/conftest.py @@ -328,7 +328,7 @@ def hello_world(name: str) -> str: plugin_file3_response = actions.register_plugin_file( client, plugin_id=plugin_response["id"], - filename="plugin_file_three.py", + filename="path/to/plugin_file_three.py", description="Not Retrieved.", contents=contents, ).get_json() diff --git a/tests/unit/restapi/v1/test_plugin.py b/tests/unit/restapi/v1/test_plugin.py index f69017afd..9a0ae50b3 100644 --- a/tests/unit/restapi/v1/test_plugin.py +++ b/tests/unit/restapi/v1/test_plugin.py @@ -524,12 +524,13 @@ def assert_retrieving_plugin_files_works( assert response.status_code == 200 and response.get_json()["data"] == expected -def assert_registering_existing_plugin_filename_fails( +def assert_registering_plugin_file_fails( client: FlaskClient, plugin_id: int, - existing_filename: str, + filename: str, contents: str, description: str, + tasks: list[dict[str, Any]] | None = None ) -> None: """Assert that registering a plugin file with an existing name fails. @@ -545,9 +546,10 @@ def assert_registering_existing_plugin_filename_fails( response = actions.register_plugin_file( client, plugin_id=plugin_id, - filename=existing_filename, + filename=filename, contents=contents, description=description, + tasks=tasks, ) assert response.status_code == 400 @@ -993,6 +995,55 @@ def hello_world(given_name: str, family_name: str) -> str: expected=plugin_file_expected, ) + valid_files = ["file.py", "_file.py", "f_ile.py", "file_.py", "_file_.py", "_f_ile_.py", "_.py"] # noqa: B950; fmt: skip + single_letter_case = [letter+file for letter in "aA_" for file in valid_files] # noqa: B950; fmt: skip + valid_paths = ["path/", "_path/", "p_ath/", "path_/", "_path_/", "_p_ath_/", "pa/th/", "path0/", "p0ath/", "p/a/th/"] # noqa: B950; fmt: skip + valid_directories = [path+file for path in valid_paths for file in valid_files] # noqa: B950; fmt: skip + valid = single_letter_case + valid_directories + for valid_directory in valid: + plugin_file_response = actions.register_plugin_file( + client, + plugin_id=registered_plugin["id"], + filename=valid_directory, + description=description, + contents=contents, + tasks=tasks, + ) + plugin_file_expected = plugin_file_response.get_json() + assert_plugin_file_response_contents_matches_expectations( + response=plugin_file_expected, + expected_contents={ + "filename": valid_directory, + "description": description, + "contents": contents, + "user_id": user_id, + "group_id": registered_plugin["group"]["id"], + "tasks": expected_tasks, + }, + ) + assert_retrieving_plugin_file_by_id_works( + client, + plugin_id=registered_plugin["id"], + plugin_file_id=plugin_file_expected["id"], + expected=plugin_file_expected, + ) + + invalid_paths = ["0/", "/path/", "path ", "@/", "/path", "/"] # noqa: B950; fmt: skip + invalid_files = ["file.p", "file.c", "filepy", "0file.py"] # noqa: B950; fmt: skip + invalid_paths_with_valid_files = [path+file for path in invalid_paths for file in valid_files] # noqa: B950; fmt: skip + valid_paths_with_invalid_files = [path+file for path in valid_paths for file in invalid_files] # noqa: B950; fmt: skip + invalid_paths_with_invalid_files = [path+file for path in invalid_paths for file in invalid_files] # noqa: B950; fmt: skip + invalid = invalid_paths_with_valid_files + valid_paths_with_invalid_files + invalid_paths_with_invalid_files # noqa: B950; fmt: skip + for invalid_directory in invalid: + assert_registering_plugin_file_fails( + client, + plugin_id=registered_plugin["id"], + filename=invalid_directory, + contents=contents, + description=description, + tasks=tasks, + ) + def test_plugin_file_get_all( client: FlaskClient, @@ -1073,10 +1124,10 @@ def test_cannot_register_existing_plugin_filename( registered_plugin = registered_plugin_with_files["plugin"] existing_plugin_file = registered_plugin_with_files["plugin_file1"] - assert_registering_existing_plugin_filename_fails( + assert_registering_plugin_file_fails( client, plugin_id=registered_plugin["id"], - existing_filename=existing_plugin_file["filename"], + filename=existing_plugin_file["filename"], contents=existing_plugin_file["contents"], description=existing_plugin_file["description"], ) @@ -1815,4 +1866,4 @@ def test_tag_plugin_file( ) response = actions.get_tags( client, resource_route=resource_route, resource_id=plugin_file["id"] - ) + ) \ No newline at end of file