From ba0b329f433f40445d683f3060e8967cc92d968c Mon Sep 17 00:00:00 2001 From: Shiva Prasanth Date: Sat, 6 Mar 2021 17:57:00 +0530 Subject: [PATCH] release 0.0.7 (#30) run target even if other targets properties are unresolved postman import support other methods "TRACE" , "COPY" , "LINK" , "UNLINK", "PURGE" , "LOCK" , "UNLOCK", "PROPFIND" , "VIEW" magic python package is not mandated json variables for json doc ```http json({ {{x}} : 'ram' }} ``` earlier variable x always have to be string. now its accepted --- README.md | 5 +- TASKS.md | 16 +- dotextensions/server.py | 5 +- dotextensions/server/commands.py | 218 +++- dotextensions/server/postman/__init__.py | 1244 +++++++++++++++++++++ dotextensions/server/server.py | 6 +- dotextensions/server/swagger/command.py | 23 + dotextensions/server/swagger/swagger_2.py | 359 ++++++ dotextensions/server/swagger/swagger_3.py | 359 ++++++ dothttp/__init__.py | 183 +-- dothttp/dsl_jsonparser.py | 44 +- dothttp/http.tx | 16 +- dothttp/parse_models.py | 16 +- examples/example.http | 5 +- setup.py | 4 +- test/core/multi/multi.http | 77 ++ test/core/test_multidef_http.py | 62 + test/core/test_request.py | 33 +- test/core/test_substition.py | 18 +- 19 files changed, 2566 insertions(+), 127 deletions(-) create mode 100644 dotextensions/server/postman/__init__.py create mode 100644 dotextensions/server/swagger/command.py create mode 100644 dotextensions/server/swagger/swagger_2.py create mode 100644 dotextensions/server/swagger/swagger_3.py create mode 100644 test/core/multi/multi.http create mode 100644 test/core/test_multidef_http.py diff --git a/README.md b/README.md index 9b7821b..3962733 100644 --- a/README.md +++ b/README.md @@ -32,7 +32,7 @@ Go through this example for better understanding. for babysteps click [here](#fi # makes are get request, with url `https://req.dothttp.dev/user` GET https://req.dothttp.dev/user -# below is an header example, if api_key is not defined, it will be defaulted to `121245454125454121245451` +# below is an header example "Authorization": "Basic dXNlcm5hbWU6cGFzc3dvcmQ=" # below is how you set url params '?' --> signifies url quary param @@ -85,7 +85,7 @@ data({ ### From pypi ```shell -pip install dothttp-req==0.0.5 +pip install dothttp-req==0.0.7 ``` ### From source @@ -93,6 +93,7 @@ pip install dothttp-req==0.0.5 ```shell git clone git@github.com:cedric05/dothttp.git cd dothttp + python3 -m pip install pipenv pipenv install ``` diff --git a/TASKS.md b/TASKS.md index d1fd752..32abde5 100644 --- a/TASKS.md +++ b/TASKS.md @@ -39,18 +39,18 @@ - [x] Better message when connection refused issues - [x] standardise dependency versions - [x] curl print multiline -- [ ] use sub commands for running server - - [ ] [option](https://stackoverflow.com/a/19476216) +- [x] ~~use sub commands for running server~~ (dotextensions if future could be moved to different repository, so don't + have to) - [ ] syntax issues better error reporting - [ ] if it failed to parse certain text, show examples on how to fix it -- [ ] history ? -- [ ] multiple logging options ? -- [ ] output coloring (according to content-type) ? -- [ ] output should have rolling output format ? +- [x] ~~history ?~~ (like curl/httpie it will not remember history) (support will be provided by vscode-extension) +- [x] ~~multiple logging options ?~~ +- [x] ~~output coloring (according to content-type) ?~~ +- [x] ~~output should have rolling output format ?~~ - [ ] curl import - [ ] swagger to http file generator -- [ ] postman import +- [x] postman import # BUGS -- [x] list query parameters are not working ?ram=ranga&ram=rajesh +- [x] ~~list query parameters are not working ?ram=ranga&ram=rajesh~~ diff --git a/dotextensions/server.py b/dotextensions/server.py index d64547e..56b6f27 100644 --- a/dotextensions/server.py +++ b/dotextensions/server.py @@ -6,9 +6,6 @@ from json import JSONDecodeError from typing import Dict -from flask import Flask -from flask import request - from dothttp import RequestCompiler, Config, DotHttpException from dothttp.log_utils import setup_logging as root_logging_setup @@ -106,6 +103,7 @@ def get_command(self, **kwargs): class HttpServer(Base): def __init__(self, port=5000): + from flask import Flask app = Flask("dothttp-server") self.app = app self.port = port @@ -117,6 +115,7 @@ def run_forever(self): self.app.run("localhost", self.port) def get_handler(self, handler): + from flask import request def flask_api_handler(): try: id = int(request.args['id']) diff --git a/dotextensions/server/commands.py b/dotextensions/server/commands.py index 101a39d..b247570 100644 --- a/dotextensions/server/commands.py +++ b/dotextensions/server/commands.py @@ -1,7 +1,22 @@ +import json import mimetypes +import os +from collections import defaultdict +from datetime import datetime +from pathlib import Path +from typing import List, Iterator, Union +from urllib.parse import unquote -from dothttp import RequestCompiler, Config, DotHttpException, dothttp_model, CurlCompiler +import requests +from requests import PreparedRequest + +from dothttp import RequestCompiler, Config, DotHttpException, dothttp_model, CurlCompiler, HttpFileFormatter +from dothttp.parse_models import Http, Allhttp, UrlWrap, BasicAuth, Payload, MultiPartFile, FilesWrap, Query, Header, \ + NameWrap, Line from . import Command, Result, BaseHandler +from .postman import postman_collection_from_dict, Items, URLClass + +DEFAULT_URL = "https://req.dothttp.dev/" class RunHttpFileHandler(BaseHandler): @@ -31,21 +46,51 @@ def run(self, command: Command) -> Result: } }) else: - request = RequestCompiler(config) - resp = request.get_response() + comp = RequestCompiler(config) + request = comp.get_request() + resp = comp.get_response() + response_data = { + "response": { + "headers": + {key: value for key, value in resp.headers.items()}, + "body": resp.text, # for binary out, it will fail, check for alternatives + "status": resp.status_code, } + } + # will be used for response + request_data = {"request": { + "url": request.url, + "body": request.body, + "headers": request.headers, + "method": request.method, + }} + data = {} + data.update(response_data['response']) # deprecated + # data.update(request_data) + data.update(response_data) + # data.update({"http": self.get_http_from_req(request)}) result = Result(id=command.id, - result={ - "headers": - {key: value for key, value in resp.headers.items()}, - "body": resp.text, - "status": resp.status_code, - }) + result=data) except DotHttpException as ex: result = Result(id=command.id, result={ "error_message": ex.message, "error": True}) return result + @staticmethod + def get_http_from_req(request: PreparedRequest): + return HttpFileFormatter.format(Allhttp(allhttps=[Http( + namewrap=None, + urlwrap=UrlWrap(url=request.url, method=request.method), + lines=[ + Line(header=Header(key=key, value=value), query=None) + for key, value in + request.headers.items()], + payload=Payload(data=request.body, datajson=None, + file=None, json=None, fileswrap=None, + type=None), + output=None, basic_auth_wrap=None + )])) + class FormatHttpFileHandler(BaseHandler): method = "/file/format" @@ -96,3 +141,158 @@ def run(self, command: Command) -> Result: result={ "error_message": str(e), "error": True}) return result + + +def slashed_path_to_normal_path(path): + return path + + +class ImportPostmanCollection(BaseHandler): + name = "/import/postman" + + def get_method(self): + return ImportPostmanCollection.name + + @staticmethod + def import_requests_into_dire(items: Iterator[Items], directory: Path, link: str): + collection = Allhttp(allhttps=[]) + for leaf_item in items: + onehttp = ImportPostmanCollection.import_leaf_item(leaf_item) + if onehttp: + collection.allhttps.append(onehttp) + d = {} + if len(collection.allhttps) != 0: + data = HttpFileFormatter.format(collection) + directory.mkdir(parents=True, exist_ok=True) + name = str(directory.joinpath("imported_from_collection.http")) + newline = "\n" + d[name] = f"#!/usr/bin/env dothttp{newline}{newline}" \ + f"# imported from {link}{newline}{newline}" \ + f"{data}\n" + return d + + @staticmethod + def import_leaf_item(item: Items) -> Union[Http, None]: + if not item.request: + return None + # currently comments are not supported + # so ignoring it for now item.description + req = item.request + namewrap = NameWrap(item.name) + urlwrap = UrlWrap(url="https://", method=req.method) + lines = [] + payload = None + basicauthwrap = None + + if isinstance(req.url, URLClass): + host = ".".join(req.url.host) + proto = req.url.protocol + path = "/".join(req.url.path) + url = f"{proto}://{host}/{path}" + urlwrap.url = slashed_path_to_normal_path(url) + for query in req.url.query: + lines.append(Line(query=Query(query.key, unquote(query.value)), header=None)) + else: + urlwrap.url = slashed_path_to_normal_path(req.url) + # if urlwrap.url == "": + # urlwrap.url = DEFAULT_URL + if req.header: + for header in req.header: + lines.append( + Line(header=Header(key=header.key, value=slashed_path_to_normal_path(header.value)), query=None)) + if req.auth and (basic_auth := req.auth.basic): + # TODO don't add creds to http file directly + # add .dothttp.json file + basicauthwrap = BasicAuth(username=basic_auth['username'], password=basic_auth['password']) + if req.body: + # use mode rather than None check + mode = req.body.mode + optins = req.body.options + payload_fileswrap = None + payload_file = None + payload_data = None + payload_json = None + payload_datajson = None + payload_type = None + if formdata := req.body.formdata: + files = [] + for one_form in formdata: + # TODO value can be list + if one_form.type == 'file': + # TODO according to type, you would want to normalize path + files.append(MultiPartFile(one_form.key, slashed_path_to_normal_path(one_form.src), + one_form.content_type)) + else: + files.append(MultiPartFile(one_form.key, one_form.value, one_form.content_type)) + payload_fileswrap = FilesWrap(files) + elif filebody := req.body.file: + # TODO file is back slash escaped + payload_file = slashed_path_to_normal_path(filebody.src) + elif rawbody := req.body.raw: + payload_data = rawbody + if optins and 'raw' in optins and 'language' in optins.get('raw'): + if optins['raw']['language']: + payload_json = json.loads(rawbody) + payload_data = None + elif urlencoded_body := req.body.urlencoded: + encodedbody = defaultdict(default_factory=lambda: []) + for one_form_field in urlencoded_body: + encodedbody[one_form_field.key].append(one_form_field.value) + payload_datajson = encodedbody + elif req.body.graphql: + # TODO currently not supported + pass + payload = Payload(datajson=payload_datajson, data=payload_data, fileswrap=payload_fileswrap, + json=payload_json, file=payload_file, type=payload_type) + http = Http(namewrap=namewrap, urlwrap=urlwrap, payload=payload, lines=lines, basic_auth_wrap=basicauthwrap, + output=None) + return http + + def run(self, command: Command) -> Result: + # params + link: str = command.params.get("link") + directory: str = command.params.get("directory", "") + save = command.params.get("save", False) + overwrite = command.params.get("overwrite", False) + + # input validations + if save: + if not os.path.isdir(directory): + return Result(id=command.id, result={"error_message": "non existent directory", "error": True}) + if not os.access(directory, os.X_OK | os.W_OK): + return Result(id=command.id, + result={"error_message": "insufficient permissions", "error": True}) + if not os.path.isabs(directory): + return Result(id=command.id, + result={"error_message": "expects absolute path, as server is meant to run in background", + "error": True}) + # if not (link.startswith("https://www.postman.com/collections/") or link.startswith( + # "https://www.getpostman.com/collections")): + # return Result(id=command.id, result={"error_message": "not a postman link", "error": True}) + + resp = requests.get(link) + postman_data = resp.json() + if not ("info" in postman_data and 'schema' in postman_data['info'] and postman_data['info'][ + 'schema'] == 'https://schema.getpostman.com/json/collection/v2.0.0/collection.json'): + return Result(id=command.id, result={"error_message": "unsupported postman collection", "error": True}) + + collection = postman_collection_from_dict(postman_data) + d = self.import_items(collection.item, Path(directory).joinpath(collection.info.name), link) + if save: + for path, fileout in d.items(): + if os.path.exists(path) and not overwrite: + p = Path(path) + path = p.with_stem(p.stem + '-' + datetime.now().ctime()) + with open(path, 'w') as f: + f.write(fileout) + return Result(id=command.id, result={"files": d}) + + @staticmethod + def import_items(items: List[Items], directory: Path, link: str = ""): + leaf_folder = filter(lambda item: item.request, items) + d = dict() + d.update(ImportPostmanCollection.import_requests_into_dire(leaf_folder, directory, link)) + folder = map(lambda item: (item.name, item.item), filter(lambda item: item.item, items)) + for sub_folder, subitem in folder: + d.update(ImportPostmanCollection.import_items(subitem, directory.joinpath(sub_folder), link)) + return d diff --git a/dotextensions/server/postman/__init__.py b/dotextensions/server/postman/__init__.py new file mode 100644 index 0000000..6717ef6 --- /dev/null +++ b/dotextensions/server/postman/__init__.py @@ -0,0 +1,1244 @@ +from enum import Enum +from typing import Optional, Dict, Any, Union, List, TypeVar, Callable, Type, cast + +T = TypeVar("T") +EnumT = TypeVar("EnumT", bound=Enum) + + +def from_dict(f: Callable[[Any], T], x: Any) -> Dict[str, T]: + assert isinstance(x, dict) + return {k: f(v) for (k, v) in x.items()} + + +def from_none(x: Any) -> Any: + assert x is None + return x + + +def from_union(fs, x): + for f in fs: + try: + return f(x) + except: + pass + assert False + + +def to_enum(c: Type[EnumT], x: Any) -> EnumT: + assert isinstance(x, c) + return x.value + + +def from_str(x: Any) -> str: + assert isinstance(x, str) + return x + + +def from_bool(x: Any) -> bool: + assert isinstance(x, bool) + return x + + +def to_class(c: Type[T], x: Any) -> dict: + assert isinstance(x, c) + return cast(Any, x).to_dict() + + +def from_list(f: Callable[[Any], T], x: Any) -> List[T]: + assert isinstance(x, list) + return [f(y) for y in x] + + +def from_int(x: Any) -> int: + assert isinstance(x, int) and not isinstance(x, bool) + return x + + +def from_float(x: Any) -> float: + assert isinstance(x, (float, int)) and not isinstance(x, bool) + return float(x) + + +def to_float(x: Any) -> float: + assert isinstance(x, float) + return x + + +class AuthType(Enum): + APIKEY = "apikey" + AWSV4 = "awsv4" + BASIC = "basic" + BEARER = "bearer" + DIGEST = "digest" + EDGEGRID = "edgegrid" + HAWK = "hawk" + NOAUTH = "noauth" + NTLM = "ntlm" + OAUTH1 = "oauth1" + OAUTH2 = "oauth2" + + +class Auth: + """Represents authentication helpers provided by Postman""" + """The attributes for API Key Authentication. e.g. key, value, in.""" + apikey: Optional[Dict[str, Any]] + """The attributes for [AWS + Auth](http://docs.aws.amazon.com/AmazonS3/latest/dev/RESTAuthentication.html). e.g. + accessKey, secretKey, region, service. + """ + awsv4: Optional[Dict[str, Any]] + """The attributes for [Basic + Authentication](https://en.wikipedia.org/wiki/Basic_access_authentication). e.g. + username, password. + """ + basic: Optional[Dict[str, Any]] + """The attributes for [Bearer Token Authentication](https://tools.ietf.org/html/rfc6750). + e.g. token. + """ + bearer: Optional[Dict[str, Any]] + """The attributes for [Digest + Authentication](https://en.wikipedia.org/wiki/Digest_access_authentication). e.g. + username, password, realm, nonce, nonceCount, algorithm, qop, opaque, clientNonce. + """ + digest: Optional[Dict[str, Any]] + """The attributes for [Akamai EdgeGrid + Authentication](https://developer.akamai.com/legacy/introduction/Client_Auth.html). e.g. + accessToken, clientToken, clientSecret, baseURL, nonce, timestamp, headersToSign. + """ + edgegrid: Optional[Dict[str, Any]] + """The attributes for [Hawk Authentication](https://github.com/hueniverse/hawk). e.g. + authId, authKey, algorith, user, nonce, extraData, appId, delegation, timestamp. + """ + hawk: Optional[Dict[str, Any]] + noauth: Any + """The attributes for [NTLM + Authentication](https://msdn.microsoft.com/en-us/library/cc237488.aspx). e.g. username, + password, domain, workstation. + """ + ntlm: Optional[Dict[str, Any]] + """The attributes for [OAuth1](https://oauth.net/1/). e.g. consumerKey, consumerSecret, + token, tokenSecret, signatureMethod, timestamp, nonce, version, realm, encodeOAuthSign. + """ + oauth1: Optional[Dict[str, Any]] + """The attributes for [OAuth2](https://oauth.net/2/). e.g. accessToken, addTokenTo.""" + oauth2: Optional[Dict[str, Any]] + type: AuthType + + def __init__(self, apikey: Optional[Dict[str, Any]], awsv4: Optional[Dict[str, Any]], + basic: Optional[Dict[str, Any]], bearer: Optional[Dict[str, Any]], digest: Optional[Dict[str, Any]], + edgegrid: Optional[Dict[str, Any]], hawk: Optional[Dict[str, Any]], noauth: Any, + ntlm: Optional[Dict[str, Any]], oauth1: Optional[Dict[str, Any]], oauth2: Optional[Dict[str, Any]], + type: AuthType) -> None: + self.apikey = apikey + self.awsv4 = awsv4 + self.basic = basic + self.bearer = bearer + self.digest = digest + self.edgegrid = edgegrid + self.hawk = hawk + self.noauth = noauth + self.ntlm = ntlm + self.oauth1 = oauth1 + self.oauth2 = oauth2 + self.type = type + + @staticmethod + def from_dict(obj: Any) -> 'Auth': + assert isinstance(obj, dict) + apikey = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("apikey")) + awsv4 = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("awsv4")) + basic = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("basic")) + bearer = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("bearer")) + digest = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("digest")) + edgegrid = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("edgegrid")) + hawk = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("hawk")) + noauth = obj.get("noauth") + ntlm = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("ntlm")) + oauth1 = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("oauth1")) + oauth2 = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("oauth2")) + type = AuthType(obj.get("type")) + return Auth(apikey, awsv4, basic, bearer, digest, edgegrid, hawk, noauth, ntlm, oauth1, oauth2, type) + + def to_dict(self) -> dict: + result: dict = {} + result["apikey"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.apikey) + result["awsv4"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.awsv4) + result["basic"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.basic) + result["bearer"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.bearer) + result["digest"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.digest) + result["edgegrid"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.edgegrid) + result["hawk"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.hawk) + result["noauth"] = self.noauth + result["ntlm"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.ntlm) + result["oauth1"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.oauth1) + result["oauth2"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.oauth2) + result["type"] = to_enum(AuthType, self.type) + return result + + +class PathClass: + type: Optional[str] + value: Optional[str] + + def __init__(self, type: Optional[str], value: Optional[str]) -> None: + self.type = type + self.value = value + + @staticmethod + def from_dict(obj: Any) -> 'PathClass': + assert isinstance(obj, dict) + type = from_union([from_str, from_none], obj.get("type")) + value = from_union([from_str, from_none], obj.get("value")) + return PathClass(type, value) + + def to_dict(self) -> dict: + result: dict = {} + result["type"] = from_union([from_str, from_none], self.type) + result["value"] = from_union([from_str, from_none], self.value) + return result + + +class Description: + """The content of the description goes here, as a raw string.""" + content: Optional[str] + """Holds the mime type of the raw description content. E.g: 'text/markdown' or 'text/html'. + The type is used to correctly render the description when generating documentation, or in + the Postman app. + """ + type: Optional[str] + """Description can have versions associated with it, which should be put in this property.""" + version: Any + + def __init__(self, content: Optional[str], type: Optional[str], version: Any) -> None: + self.content = content + self.type = type + self.version = version + + @staticmethod + def from_dict(obj: Any) -> 'Description': + assert isinstance(obj, dict) + content = from_union([from_str, from_none], obj.get("content")) + type = from_union([from_str, from_none], obj.get("type")) + version = obj.get("version") + return Description(content, type, version) + + def to_dict(self) -> dict: + result: dict = {} + result["content"] = from_union([from_str, from_none], self.content) + result["type"] = from_union([from_str, from_none], self.type) + result["version"] = self.version + return result + + +class QueryParam: + description: Union[Description, None, str] + """If set to true, the current query parameter will not be sent with the request.""" + disabled: Optional[bool] + key: Optional[str] + value: Optional[str] + + def __init__(self, description: Union[Description, None, str], disabled: Optional[bool], key: Optional[str], + value: Optional[str]) -> None: + self.description = description + self.disabled = disabled + self.key = key + self.value = value + + @staticmethod + def from_dict(obj: Any) -> 'QueryParam': + assert isinstance(obj, dict) + description = from_union([Description.from_dict, from_none, from_str], obj.get("description")) + disabled = from_union([from_bool, from_none], obj.get("disabled")) + key = from_union([from_none, from_str], obj.get("key")) + value = from_union([from_none, from_str], obj.get("value")) + return QueryParam(description, disabled, key, value) + + def to_dict(self) -> dict: + result: dict = {} + result["description"] = from_union([lambda x: to_class(Description, x), from_none, from_str], self.description) + result["disabled"] = from_union([from_bool, from_none], self.disabled) + result["key"] = from_union([from_none, from_str], self.key) + result["value"] = from_union([from_none, from_str], self.value) + return result + + +class VariableType(Enum): + """A variable may have multiple types. This field specifies the type of the variable.""" + ANY = "any" + BOOLEAN = "boolean" + NUMBER = "number" + STRING = "string" + + +class Variable: + """Collection variables allow you to define a set of variables, that are a *part of the + collection*, as opposed to environments, which are separate entities. + *Note: Collection variables must not contain any sensitive information.* + + Using variables in your Postman requests eliminates the need to duplicate requests, which + can save a lot of time. Variables can be defined, and referenced to from any part of a + request. + """ + description: Union[Description, None, str] + disabled: Optional[bool] + """A variable ID is a unique user-defined value that identifies the variable within a + collection. In traditional terms, this would be a variable name. + """ + id: Optional[str] + """A variable key is a human friendly value that identifies the variable within a + collection. In traditional terms, this would be a variable name. + """ + key: Optional[str] + """Variable name""" + name: Optional[str] + """When set to true, indicates that this variable has been set by Postman""" + system: Optional[bool] + """A variable may have multiple types. This field specifies the type of the variable.""" + type: Optional[VariableType] + """The value that a variable holds in this collection. Ultimately, the variables will be + replaced by this value, when say running a set of requests from a collection + """ + value: Any + + def __init__(self, description: Union[Description, None, str], disabled: Optional[bool], id: Optional[str], + key: Optional[str], name: Optional[str], system: Optional[bool], type: Optional[VariableType], + value: Any) -> None: + self.description = description + self.disabled = disabled + self.id = id + self.key = key + self.name = name + self.system = system + self.type = type + self.value = value + + @staticmethod + def from_dict(obj: Any) -> 'Variable': + assert isinstance(obj, dict) + description = from_union([Description.from_dict, from_none, from_str], obj.get("description")) + disabled = from_union([from_bool, from_none], obj.get("disabled")) + id = from_union([from_str, from_none], obj.get("id")) + key = from_union([from_str, from_none], obj.get("key")) + name = from_union([from_str, from_none], obj.get("name")) + system = from_union([from_bool, from_none], obj.get("system")) + type = from_union([VariableType, from_none], obj.get("type")) + value = obj.get("value") + return Variable(description, disabled, id, key, name, system, type, value) + + def to_dict(self) -> dict: + result: dict = {} + result["description"] = from_union([lambda x: to_class(Description, x), from_none, from_str], self.description) + result["disabled"] = from_union([from_bool, from_none], self.disabled) + result["id"] = from_union([from_str, from_none], self.id) + result["key"] = from_union([from_str, from_none], self.key) + result["name"] = from_union([from_str, from_none], self.name) + result["system"] = from_union([from_bool, from_none], self.system) + result["type"] = from_union([lambda x: to_enum(VariableType, x), from_none], self.type) + result["value"] = self.value + return result + + +class URLClass: + """Contains the URL fragment (if any). Usually this is not transmitted over the network, but + it could be useful to store this in some cases. + """ + hash: Optional[str] + """The host for the URL, E.g: api.yourdomain.com. Can be stored as a string or as an array + of strings. + """ + host: Union[List[str], None, str] + path: Union[List[Union[PathClass, str]], None, str] + """The port number present in this URL. An empty value implies 80/443 depending on whether + the protocol field contains http/https. + """ + port: Optional[str] + """The protocol associated with the request, E.g: 'http'""" + protocol: Optional[str] + """An array of QueryParams, which is basically the query string part of the URL, parsed into + separate variables + """ + query: Optional[List[QueryParam]] + """The string representation of the request URL, including the protocol, host, path, hash, + query parameter(s) and path variable(s). + """ + raw: Optional[str] + """Postman supports path variables with the syntax `/path/:variableName/to/somewhere`. These + variables are stored in this field. + """ + variable: Optional[List[Variable]] + + def __init__(self, hash: Optional[str], host: Union[List[str], None, str], + path: Union[List[Union[PathClass, str]], None, str], port: Optional[str], protocol: Optional[str], + query: Optional[List[QueryParam]], raw: Optional[str], variable: Optional[List[Variable]]) -> None: + self.hash = hash + self.host = host + self.path = path + self.port = port + self.protocol = protocol + self.query = query + self.raw = raw + self.variable = variable + + @staticmethod + def from_dict(obj: Any) -> 'URLClass': + assert isinstance(obj, dict) + hash = from_union([from_str, from_none], obj.get("hash")) + host = from_union([lambda x: from_list(from_str, x), from_str, from_none], obj.get("host")) + path = from_union( + [lambda x: from_list(lambda x: from_union([PathClass.from_dict, from_str], x), x), from_str, from_none], + obj.get("path")) + port = from_union([from_str, from_none], obj.get("port")) + protocol = from_union([from_str, from_none], obj.get("protocol")) + query = from_union([lambda x: from_list(QueryParam.from_dict, x), from_none], obj.get("query")) + raw = from_union([from_str, from_none], obj.get("raw")) + variable = from_union([lambda x: from_list(Variable.from_dict, x), from_none], obj.get("variable")) + return URLClass(hash, host, path, port, protocol, query, raw, variable) + + def to_dict(self) -> dict: + result: dict = {} + result["hash"] = from_union([from_str, from_none], self.hash) + result["host"] = from_union([lambda x: from_list(from_str, x), from_str, from_none], self.host) + result["path"] = from_union( + [lambda x: from_list(lambda x: from_union([lambda x: to_class(PathClass, x), from_str], x), x), from_str, + from_none], self.path) + result["port"] = from_union([from_str, from_none], self.port) + result["protocol"] = from_union([from_str, from_none], self.protocol) + result["query"] = from_union([lambda x: from_list(lambda x: to_class(QueryParam, x), x), from_none], self.query) + result["raw"] = from_union([from_str, from_none], self.raw) + result["variable"] = from_union([lambda x: from_list(lambda x: to_class(Variable, x), x), from_none], + self.variable) + return result + + +class Script: + """A script is a snippet of Javascript code that can be used to to perform setup or teardown + operations on a particular response. + """ + exec: Union[List[str], None, str] + """A unique, user defined identifier that can be used to refer to this script from requests.""" + id: Optional[str] + """Script name""" + name: Optional[str] + src: Union[URLClass, None, str] + """Type of the script. E.g: 'text/javascript'""" + type: Optional[str] + + def __init__(self, exec: Union[List[str], None, str], id: Optional[str], name: Optional[str], + src: Union[URLClass, None, str], type: Optional[str]) -> None: + self.exec = exec + self.id = id + self.name = name + self.src = src + self.type = type + + @staticmethod + def from_dict(obj: Any) -> 'Script': + assert isinstance(obj, dict) + exec = from_union([lambda x: from_list(from_str, x), from_str, from_none], obj.get("exec")) + id = from_union([from_str, from_none], obj.get("id")) + name = from_union([from_str, from_none], obj.get("name")) + src = from_union([URLClass.from_dict, from_str, from_none], obj.get("src")) + type = from_union([from_str, from_none], obj.get("type")) + return Script(exec, id, name, src, type) + + def to_dict(self) -> dict: + result: dict = {} + result["exec"] = from_union([lambda x: from_list(from_str, x), from_str, from_none], self.exec) + result["id"] = from_union([from_str, from_none], self.id) + result["name"] = from_union([from_str, from_none], self.name) + result["src"] = from_union([lambda x: to_class(URLClass, x), from_str, from_none], self.src) + result["type"] = from_union([from_str, from_none], self.type) + return result + + +class Event: + """Postman allows you to configure scripts to run when specific events occur. These scripts + are stored here, and can be referenced in the collection by their ID. + + Defines a script associated with an associated event name + """ + """Indicates whether the event is disabled. If absent, the event is assumed to be enabled.""" + disabled: Optional[bool] + """A unique identifier for the enclosing event.""" + id: Optional[str] + """Can be set to `test` or `prerequest` for test scripts or pre-request scripts respectively.""" + listen: str + script: Optional[Script] + + def __init__(self, disabled: Optional[bool], id: Optional[str], listen: str, script: Optional[Script]) -> None: + self.disabled = disabled + self.id = id + self.listen = listen + self.script = script + + @staticmethod + def from_dict(obj: Any) -> 'Event': + assert isinstance(obj, dict) + disabled = from_union([from_bool, from_none], obj.get("disabled")) + id = from_union([from_str, from_none], obj.get("id")) + listen = from_str(obj.get("listen")) + script = from_union([Script.from_dict, from_none], obj.get("script")) + return Event(disabled, id, listen, script) + + def to_dict(self) -> dict: + result: dict = {} + result["disabled"] = from_union([from_bool, from_none], self.disabled) + result["id"] = from_union([from_str, from_none], self.id) + result["listen"] = from_str(self.listen) + result["script"] = from_union([lambda x: to_class(Script, x), from_none], self.script) + return result + + +class CollectionVersionClass: + """A human friendly identifier to make sense of the version numbers. E.g: 'beta-3'""" + identifier: Optional[str] + """Increment this number if you make changes to the collection that changes its behaviour. + E.g: Removing or adding new test scripts. (partly or completely). + """ + major: int + meta: Any + """You should increment this number if you make changes that will not break anything that + uses the collection. E.g: removing a folder. + """ + minor: int + """Ideally, minor changes to a collection should result in the increment of this number.""" + patch: int + + def __init__(self, identifier: Optional[str], major: int, meta: Any, minor: int, patch: int) -> None: + self.identifier = identifier + self.major = major + self.meta = meta + self.minor = minor + self.patch = patch + + @staticmethod + def from_dict(obj: Any) -> 'CollectionVersionClass': + assert isinstance(obj, dict) + identifier = from_union([from_str, from_none], obj.get("identifier")) + major = from_int(obj.get("major")) + meta = obj.get("meta") + minor = from_int(obj.get("minor")) + patch = from_int(obj.get("patch")) + return CollectionVersionClass(identifier, major, meta, minor, patch) + + def to_dict(self) -> dict: + result: dict = {} + result["identifier"] = from_union([from_str, from_none], self.identifier) + result["major"] = from_int(self.major) + result["meta"] = self.meta + result["minor"] = from_int(self.minor) + result["patch"] = from_int(self.patch) + return result + + +class Information: + """Detailed description of the info block""" + """Every collection is identified by the unique value of this field. The value of this field + is usually easiest to generate using a UID generator function. If you already have a + collection, it is recommended that you maintain the same id since changing the id usually + implies that is a different collection than it was originally. + *Note: This field exists for compatibility reasons with Collection Format V1.* + """ + postman_id: Optional[str] + description: Union[Description, None, str] + """A collection's friendly name is defined by this field. You would want to set this field + to a value that would allow you to easily identify this collection among a bunch of other + collections, as such outlining its usage or content. + """ + name: str + """This should ideally hold a link to the Postman schema that is used to validate this + collection. E.g: https://schema.getpostman.com/collection/v1 + """ + schema: str + version: Union[CollectionVersionClass, None, str] + + def __init__(self, postman_id: Optional[str], description: Union[Description, None, str], name: str, schema: str, + version: Union[CollectionVersionClass, None, str]) -> None: + self.postman_id = postman_id + self.description = description + self.name = name + self.schema = schema + self.version = version + + @staticmethod + def from_dict(obj: Any) -> 'Information': + assert isinstance(obj, dict) + postman_id = from_union([from_str, from_none], obj.get("_postman_id")) + description = from_union([Description.from_dict, from_none, from_str], obj.get("description")) + name = from_str(obj.get("name")) + schema = from_str(obj.get("schema")) + version = from_union([CollectionVersionClass.from_dict, from_str, from_none], obj.get("version")) + return Information(postman_id, description, name, schema, version) + + def to_dict(self) -> dict: + result: dict = {} + result["_postman_id"] = from_union([from_str, from_none], self.postman_id) + result["description"] = from_union([lambda x: to_class(Description, x), from_none, from_str], self.description) + result["name"] = from_str(self.name) + result["schema"] = from_str(self.schema) + result["version"] = from_union([lambda x: to_class(CollectionVersionClass, x), from_str, from_none], + self.version) + return result + + +class File: + content: Optional[str] + src: Optional[str] + + def __init__(self, content: Optional[str], src: Optional[str]) -> None: + self.content = content + self.src = src + + @staticmethod + def from_dict(obj: Any) -> 'File': + assert isinstance(obj, dict) + content = from_union([from_str, from_none], obj.get("content")) + src = from_union([from_none, from_str], obj.get("src")) + return File(content, src) + + def to_dict(self) -> dict: + result: dict = {} + result["content"] = from_union([from_str, from_none], self.content) + result["src"] = from_union([from_none, from_str], self.src) + return result + + +class FormParameterType(Enum): + FILE = "file" + TEXT = "text" + + +class FormParameter: + """Override Content-Type header of this form data entity.""" + content_type: Optional[str] + description: Union[Description, None, str] + """When set to true, prevents this form data entity from being sent.""" + disabled: Optional[bool] + key: str + type: Optional[FormParameterType] + value: Optional[str] + src: Union[List[Any], None, str] + + def __init__(self, content_type: Optional[str], description: Union[Description, None, str], + disabled: Optional[bool], key: str, type: Optional[FormParameterType], value: Optional[str], + src: Union[List[Any], None, str]) -> None: + self.content_type = content_type + self.description = description + self.disabled = disabled + self.key = key + self.type = type + self.value = value + self.src = src + + @staticmethod + def from_dict(obj: Any) -> 'FormParameter': + assert isinstance(obj, dict) + content_type = from_union([from_str, from_none], obj.get("contentType")) + description = from_union([Description.from_dict, from_none, from_str], obj.get("description")) + disabled = from_union([from_bool, from_none], obj.get("disabled")) + key = from_str(obj.get("key")) + type = from_union([FormParameterType, from_none], obj.get("type")) + value = from_union([from_str, from_none], obj.get("value")) + src = from_union([lambda x: from_list(lambda x: x, x), from_none, from_str], obj.get("src")) + return FormParameter(content_type, description, disabled, key, type, value, src) + + def to_dict(self) -> dict: + result: dict = {} + result["contentType"] = from_union([from_str, from_none], self.content_type) + result["description"] = from_union([lambda x: to_class(Description, x), from_none, from_str], self.description) + result["disabled"] = from_union([from_bool, from_none], self.disabled) + result["key"] = from_str(self.key) + result["type"] = from_union([lambda x: to_enum(FormParameterType, x), from_none], self.type) + result["value"] = from_union([from_str, from_none], self.value) + result["src"] = from_union([lambda x: from_list(lambda x: x, x), from_none, from_str], self.src) + return result + + +class Mode(Enum): + """Postman stores the type of data associated with this request in this field.""" + FILE = "file" + FORMDATA = "formdata" + GRAPHQL = "graphql" + RAW = "raw" + URLENCODED = "urlencoded" + + +class URLEncodedParameter: + description: Union[Description, None, str] + disabled: Optional[bool] + key: str + value: Optional[str] + + def __init__(self, description: Union[Description, None, str], disabled: Optional[bool], key: str, + value: Optional[str]) -> None: + self.description = description + self.disabled = disabled + self.key = key + self.value = value + + @staticmethod + def from_dict(obj: Any) -> 'URLEncodedParameter': + assert isinstance(obj, dict) + description = from_union([Description.from_dict, from_none, from_str], obj.get("description")) + disabled = from_union([from_bool, from_none], obj.get("disabled")) + key = from_str(obj.get("key")) + value = from_union([from_str, from_none], obj.get("value")) + return URLEncodedParameter(description, disabled, key, value) + + def to_dict(self) -> dict: + result: dict = {} + result["description"] = from_union([lambda x: to_class(Description, x), from_none, from_str], self.description) + result["disabled"] = from_union([from_bool, from_none], self.disabled) + result["key"] = from_str(self.key) + result["value"] = from_union([from_str, from_none], self.value) + return result + + +class Body: + """This field contains the data usually contained in the request body.""" + """When set to true, prevents request body from being sent.""" + disabled: Optional[bool] + file: Optional[File] + formdata: Optional[List[FormParameter]] + graphql: Optional[Dict[str, Any]] + """Postman stores the type of data associated with this request in this field.""" + mode: Optional[Mode] + """Additional configurations and options set for various body modes.""" + options: Optional[Dict[str, Any]] + raw: Optional[str] + urlencoded: Optional[List[URLEncodedParameter]] + + def __init__(self, disabled: Optional[bool], file: Optional[File], formdata: Optional[List[FormParameter]], + graphql: Optional[Dict[str, Any]], mode: Optional[Mode], options: Optional[Dict[str, Any]], + raw: Optional[str], urlencoded: Optional[List[URLEncodedParameter]]) -> None: + self.disabled = disabled + self.file = file + self.formdata = formdata + self.graphql = graphql + self.mode = mode + self.options = options + self.raw = raw + self.urlencoded = urlencoded + + @staticmethod + def from_dict(obj: Any) -> 'Body': + assert isinstance(obj, dict) + disabled = from_union([from_bool, from_none], obj.get("disabled")) + file = from_union([File.from_dict, from_none], obj.get("file")) + formdata = from_union([lambda x: from_list(FormParameter.from_dict, x), from_none], obj.get("formdata")) + graphql = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("graphql")) + mode = from_union([Mode, from_none], obj.get("mode")) + options = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("options")) + raw = from_union([from_str, from_none], obj.get("raw")) + urlencoded = from_union([lambda x: from_list(URLEncodedParameter.from_dict, x), from_none], + obj.get("urlencoded")) + return Body(disabled, file, formdata, graphql, mode, options, raw, urlencoded) + + def to_dict(self) -> dict: + result: dict = {} + result["disabled"] = from_union([from_bool, from_none], self.disabled) + result["file"] = from_union([lambda x: to_class(File, x), from_none], self.file) + result["formdata"] = from_union([lambda x: from_list(lambda x: to_class(FormParameter, x), x), from_none], + self.formdata) + result["graphql"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.graphql) + result["mode"] = from_union([lambda x: to_enum(Mode, x), from_none], self.mode) + result["options"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.options) + result["raw"] = from_union([from_str, from_none], self.raw) + result["urlencoded"] = from_union( + [lambda x: from_list(lambda x: to_class(URLEncodedParameter, x), x), from_none], self.urlencoded) + return result + + +class CERT: + """An object containing path to file certificate, on the file system""" + """The path to file containing key for certificate, on the file system""" + src: Any + + def __init__(self, src: Any) -> None: + self.src = src + + @staticmethod + def from_dict(obj: Any) -> 'CERT': + assert isinstance(obj, dict) + src = obj.get("src") + return CERT(src) + + def to_dict(self) -> dict: + result: dict = {} + result["src"] = self.src + return result + + +class Key: + """An object containing path to file containing private key, on the file system""" + """The path to file containing key for certificate, on the file system""" + src: Any + + def __init__(self, src: Any) -> None: + self.src = src + + @staticmethod + def from_dict(obj: Any) -> 'Key': + assert isinstance(obj, dict) + src = obj.get("src") + return Key(src) + + def to_dict(self) -> dict: + result: dict = {} + result["src"] = self.src + return result + + +class Certificate: + """A representation of an ssl certificate""" + """An object containing path to file certificate, on the file system""" + cert: Optional[CERT] + """An object containing path to file containing private key, on the file system""" + key: Optional[Key] + """A list of Url match pattern strings, to identify Urls this certificate can be used for.""" + matches: Optional[List[str]] + """A name for the certificate for user reference""" + name: Optional[str] + """The passphrase for the certificate""" + passphrase: Optional[str] + + def __init__(self, cert: Optional[CERT], key: Optional[Key], matches: Optional[List[str]], name: Optional[str], + passphrase: Optional[str]) -> None: + self.cert = cert + self.key = key + self.matches = matches + self.name = name + self.passphrase = passphrase + + @staticmethod + def from_dict(obj: Any) -> 'Certificate': + assert isinstance(obj, dict) + cert = from_union([CERT.from_dict, from_none], obj.get("cert")) + key = from_union([Key.from_dict, from_none], obj.get("key")) + matches = from_union([lambda x: from_list(from_str, x), from_none], obj.get("matches")) + name = from_union([from_str, from_none], obj.get("name")) + passphrase = from_union([from_str, from_none], obj.get("passphrase")) + return Certificate(cert, key, matches, name, passphrase) + + def to_dict(self) -> dict: + result: dict = {} + result["cert"] = from_union([lambda x: to_class(CERT, x), from_none], self.cert) + result["key"] = from_union([lambda x: to_class(Key, x), from_none], self.key) + result["matches"] = from_union([lambda x: from_list(from_str, x), from_none], self.matches) + result["name"] = from_union([from_str, from_none], self.name) + result["passphrase"] = from_union([from_str, from_none], self.passphrase) + return result + + +class Header: + """A representation for a list of headers + + Represents a single HTTP Header + """ + description: Union[Description, None, str] + """If set to true, the current header will not be sent with requests.""" + disabled: Optional[bool] + """This holds the LHS of the HTTP Header, e.g ``Content-Type`` or ``X-Custom-Header``""" + key: str + """The value (or the RHS) of the Header is stored in this field.""" + value: str + + def __init__(self, description: Union[Description, None, str], disabled: Optional[bool], key: str, + value: str) -> None: + self.description = description + self.disabled = disabled + self.key = key + self.value = value + + @staticmethod + def from_dict(obj: Any) -> 'Header': + assert isinstance(obj, dict) + description = from_union([Description.from_dict, from_none, from_str], obj.get("description")) + disabled = from_union([from_bool, from_none], obj.get("disabled")) + key = from_str(obj.get("key")) + value = from_str(obj.get("value")) + return Header(description, disabled, key, value) + + def to_dict(self) -> dict: + result: dict = {} + result["description"] = from_union([lambda x: to_class(Description, x), from_none, from_str], self.description) + result["disabled"] = from_union([from_bool, from_none], self.disabled) + result["key"] = from_str(self.key) + result["value"] = from_str(self.value) + return result + + +class ProxyConfig: + """Using the Proxy, you can configure your custom proxy into the postman for particular url + match + """ + """When set to true, ignores this proxy configuration entity""" + disabled: Optional[bool] + """The proxy server host""" + host: Optional[str] + """The Url match for which the proxy config is defined""" + match: Optional[str] + """The proxy server port""" + port: Optional[int] + """The tunneling details for the proxy config""" + tunnel: Optional[bool] + + def __init__(self, disabled: Optional[bool], host: Optional[str], match: Optional[str], port: Optional[int], + tunnel: Optional[bool]) -> None: + self.disabled = disabled + self.host = host + self.match = match + self.port = port + self.tunnel = tunnel + + @staticmethod + def from_dict(obj: Any) -> 'ProxyConfig': + assert isinstance(obj, dict) + disabled = from_union([from_bool, from_none], obj.get("disabled")) + host = from_union([from_str, from_none], obj.get("host")) + match = from_union([from_str, from_none], obj.get("match")) + port = from_union([from_int, from_none], obj.get("port")) + tunnel = from_union([from_bool, from_none], obj.get("tunnel")) + return ProxyConfig(disabled, host, match, port, tunnel) + + def to_dict(self) -> dict: + result: dict = {} + result["disabled"] = from_union([from_bool, from_none], self.disabled) + result["host"] = from_union([from_str, from_none], self.host) + result["match"] = from_union([from_str, from_none], self.match) + result["port"] = from_union([from_int, from_none], self.port) + result["tunnel"] = from_union([from_bool, from_none], self.tunnel) + return result + + +class RequestClass: + auth: Optional[Auth] + body: Optional[Body] + certificate: Optional[Certificate] + description: Union[Description, None, str] + header: Union[List[Header], None, str] + method: Optional[str] + proxy: Optional[ProxyConfig] + url: Union[URLClass, None, str] + + def __init__(self, auth: Optional[Auth], body: Optional[Body], certificate: Optional[Certificate], + description: Union[Description, None, str], header: Union[List[Header], None, str], + method: Optional[str], proxy: Optional[ProxyConfig], url: Union[URLClass, None, str]) -> None: + self.auth = auth + self.body = body + self.certificate = certificate + self.description = description + self.header = header + self.method = method + self.proxy = proxy + self.url = url + + @staticmethod + def from_dict(obj: Any) -> 'RequestClass': + assert isinstance(obj, dict) + auth = from_union([from_none, Auth.from_dict], obj.get("auth")) + body = from_union([Body.from_dict, from_none], obj.get("body")) + certificate = from_union([Certificate.from_dict, from_none], obj.get("certificate")) + description = from_union([Description.from_dict, from_none, from_str], obj.get("description")) + header = from_union([lambda x: from_list(Header.from_dict, x), from_str, from_none], obj.get("header")) + method = from_union([from_str, from_none], obj.get("method")) + proxy = from_union([ProxyConfig.from_dict, from_none], obj.get("proxy")) + url = from_union([URLClass.from_dict, from_str, from_none], obj.get("url")) + return RequestClass(auth, body, certificate, description, header, method, proxy, url) + + def to_dict(self) -> dict: + result: dict = {} + result["auth"] = from_union([from_none, lambda x: to_class(Auth, x)], self.auth) + result["body"] = from_union([lambda x: to_class(Body, x), from_none], self.body) + result["certificate"] = from_union([lambda x: to_class(Certificate, x), from_none], self.certificate) + result["description"] = from_union([lambda x: to_class(Description, x), from_none, from_str], self.description) + result["header"] = from_union([lambda x: from_list(lambda x: to_class(Header, x), x), from_str, from_none], + self.header) + result["method"] = from_union([from_str, from_none], self.method) + result["proxy"] = from_union([lambda x: to_class(ProxyConfig, x), from_none], self.proxy) + result["url"] = from_union([lambda x: to_class(URLClass, x), from_str, from_none], self.url) + return result + + +class Cookie: + """A Cookie, that follows the [Google Chrome + format](https://developer.chrome.com/extensions/cookies) + """ + """The domain for which this cookie is valid.""" + domain: str + """When the cookie expires.""" + expires: Union[float, None, str] + """Custom attributes for a cookie go here, such as the [Priority + Field](https://code.google.com/p/chromium/issues/detail?id=232693) + """ + extensions: Optional[List[Any]] + """True if the cookie is a host-only cookie. (i.e. a request's URL domain must exactly match + the domain of the cookie). + """ + host_only: Optional[bool] + """Indicates if this cookie is HTTP Only. (if True, the cookie is inaccessible to + client-side scripts) + """ + http_only: Optional[bool] + max_age: Optional[str] + """This is the name of the Cookie.""" + name: Optional[str] + """The path associated with the Cookie.""" + path: str + """Indicates if the 'secure' flag is set on the Cookie, meaning that it is transmitted over + secure connections only. (typically HTTPS) + """ + secure: Optional[bool] + """True if the cookie is a session cookie.""" + session: Optional[bool] + """The value of the Cookie.""" + value: Optional[str] + + def __init__(self, domain: str, expires: Union[float, None, str], extensions: Optional[List[Any]], + host_only: Optional[bool], http_only: Optional[bool], max_age: Optional[str], name: Optional[str], + path: str, secure: Optional[bool], session: Optional[bool], value: Optional[str]) -> None: + self.domain = domain + self.expires = expires + self.extensions = extensions + self.host_only = host_only + self.http_only = http_only + self.max_age = max_age + self.name = name + self.path = path + self.secure = secure + self.session = session + self.value = value + + @staticmethod + def from_dict(obj: Any) -> 'Cookie': + assert isinstance(obj, dict) + domain = from_str(obj.get("domain")) + expires = from_union([from_float, from_str, from_none], obj.get("expires")) + extensions = from_union([lambda x: from_list(lambda x: x, x), from_none], obj.get("extensions")) + host_only = from_union([from_bool, from_none], obj.get("hostOnly")) + http_only = from_union([from_bool, from_none], obj.get("httpOnly")) + max_age = from_union([from_str, from_none], obj.get("maxAge")) + name = from_union([from_str, from_none], obj.get("name")) + path = from_str(obj.get("path")) + secure = from_union([from_bool, from_none], obj.get("secure")) + session = from_union([from_bool, from_none], obj.get("session")) + value = from_union([from_str, from_none], obj.get("value")) + return Cookie(domain, expires, extensions, host_only, http_only, max_age, name, path, secure, session, value) + + def to_dict(self) -> dict: + result: dict = {} + result["domain"] = from_str(self.domain) + result["expires"] = from_union([to_float, from_str, from_none], self.expires) + result["extensions"] = from_union([lambda x: from_list(lambda x: x, x), from_none], self.extensions) + result["hostOnly"] = from_union([from_bool, from_none], self.host_only) + result["httpOnly"] = from_union([from_bool, from_none], self.http_only) + result["maxAge"] = from_union([from_str, from_none], self.max_age) + result["name"] = from_union([from_str, from_none], self.name) + result["path"] = from_str(self.path) + result["secure"] = from_union([from_bool, from_none], self.secure) + result["session"] = from_union([from_bool, from_none], self.session) + result["value"] = from_union([from_str, from_none], self.value) + return result + + +class ResponseClass: + """The raw text of the response.""" + body: Optional[str] + """The numerical response code, example: 200, 201, 404, etc.""" + code: Optional[int] + cookie: Optional[List[Cookie]] + header: Union[List[Union[Header, str]], None, str] + """A unique, user defined identifier that can be used to refer to this response from + requests. + """ + id: Optional[str] + original_request: Union[RequestClass, None, str] + """The time taken by the request to complete. If a number, the unit is milliseconds. If the + response is manually created, this can be set to `null`. + """ + response_time: Union[float, None, str] + """The response status, e.g: '200 OK'""" + status: Optional[str] + """Set of timing information related to request and response in milliseconds""" + timings: Optional[Dict[str, Any]] + + def __init__(self, body: Optional[str], code: Optional[int], cookie: Optional[List[Cookie]], + header: Union[List[Union[Header, str]], None, str], id: Optional[str], + original_request: Union[RequestClass, None, str], response_time: Union[float, None, str], + status: Optional[str], timings: Optional[Dict[str, Any]]) -> None: + self.body = body + self.code = code + self.cookie = cookie + self.header = header + self.id = id + self.original_request = original_request + self.response_time = response_time + self.status = status + self.timings = timings + + @staticmethod + def from_dict(obj: Any) -> 'ResponseClass': + assert isinstance(obj, dict) + body = from_union([from_none, from_str], obj.get("body")) + code = from_union([from_int, from_none], obj.get("code")) + cookie = from_union([lambda x: from_list(Cookie.from_dict, x), from_none], obj.get("cookie")) + header = from_union( + [lambda x: from_list(lambda x: from_union([Header.from_dict, from_str], x), x), from_none, from_str], + obj.get("header")) + id = from_union([from_str, from_none], obj.get("id")) + original_request = from_union([RequestClass.from_dict, from_str, from_none], obj.get("originalRequest")) + response_time = from_union([from_float, from_str, from_none], obj.get("responseTime")) + status = from_union([from_str, from_none], obj.get("status")) + timings = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("timings")) + return ResponseClass(body, code, cookie, header, id, original_request, response_time, status, timings) + + def to_dict(self) -> dict: + result: dict = {} + result["body"] = from_union([from_none, from_str], self.body) + result["code"] = from_union([from_int, from_none], self.code) + result["cookie"] = from_union([lambda x: from_list(lambda x: to_class(Cookie, x), x), from_none], self.cookie) + result["header"] = from_union( + [lambda x: from_list(lambda x: from_union([lambda x: to_class(Header, x), from_str], x), x), from_none, + from_str], self.header) + result["id"] = from_union([from_str, from_none], self.id) + result["originalRequest"] = from_union([lambda x: to_class(RequestClass, x), from_str, from_none], + self.original_request) + result["responseTime"] = from_union([to_float, from_str, from_none], self.response_time) + result["status"] = from_union([from_str, from_none], self.status) + result["timings"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.timings) + return result + + +class Items: + """Items are entities which contain an actual HTTP request, and sample responses attached to + it. + + One of the primary goals of Postman is to organize the development of APIs. To this end, + it is necessary to be able to group requests together. This can be achived using + 'Folders'. A folder just is an ordered set of requests. + """ + description: Union[Description, None, str] + event: Optional[List[Event]] + """A unique ID that is used to identify collections internally""" + id: Optional[str] + """A human readable identifier for the current item. + + A folder's friendly name is defined by this field. You would want to set this field to a + value that would allow you to easily identify this folder. + """ + name: Optional[str] + protocol_profile_behavior: Optional[Dict[str, Any]] + request: Union[RequestClass, None, str] + response: Optional[List[Union[List[Any], bool, ResponseClass, float, int, None, str]]] + variable: Optional[List[Variable]] + auth: Optional[Auth] + """Items are entities which contain an actual HTTP request, and sample responses attached to + it. Folders may contain many items. + """ + item: Optional[List['Items']] + + def __init__(self, description: Union[Description, None, str], event: Optional[List[Event]], id: Optional[str], + name: Optional[str], protocol_profile_behavior: Optional[Dict[str, Any]], + request: Union[RequestClass, None, str], + response: Optional[List[Union[List[Any], bool, ResponseClass, float, int, None, str]]], + variable: Optional[List[Variable]], auth: Optional[Auth], item: Optional[List['Items']]) -> None: + self.description = description + self.event = event + self.id = id + self.name = name + self.protocol_profile_behavior = protocol_profile_behavior + self.request = request + self.response = response + self.variable = variable + self.auth = auth + self.item = item + + @staticmethod + def from_dict(obj: Any) -> 'Items': + assert isinstance(obj, dict) + description = from_union([Description.from_dict, from_none, from_str], obj.get("description")) + event = from_union([lambda x: from_list(Event.from_dict, x), from_none], obj.get("event")) + id = from_union([from_str, from_none], obj.get("id")) + name = from_union([from_str, from_none], obj.get("name")) + protocol_profile_behavior = from_union([lambda x: from_dict(lambda x: x, x), from_none], + obj.get("protocolProfileBehavior")) + request = from_union([RequestClass.from_dict, from_str, from_none], obj.get("request")) + response = from_union([lambda x: from_list(lambda x: from_union( + [from_none, from_float, from_int, from_bool, from_str, lambda x: from_list(lambda x: x, x), + ResponseClass.from_dict], x), x), from_none], obj.get("response")) + variable = from_union([lambda x: from_list(Variable.from_dict, x), from_none], obj.get("variable")) + auth = from_union([from_none, Auth.from_dict], obj.get("auth")) + item = from_union([lambda x: from_list(Items.from_dict, x), from_none], obj.get("item")) + return Items(description, event, id, name, protocol_profile_behavior, request, response, variable, auth, item) + + def to_dict(self) -> dict: + result: dict = {} + result["description"] = from_union([lambda x: to_class(Description, x), from_none, from_str], self.description) + result["event"] = from_union([lambda x: from_list(lambda x: to_class(Event, x), x), from_none], self.event) + result["id"] = from_union([from_str, from_none], self.id) + result["name"] = from_union([from_str, from_none], self.name) + result["protocolProfileBehavior"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], + self.protocol_profile_behavior) + result["request"] = from_union([lambda x: to_class(RequestClass, x), from_str, from_none], self.request) + result["response"] = from_union([lambda x: from_list(lambda x: from_union( + [from_none, to_float, from_int, from_bool, from_str, lambda x: from_list(lambda x: x, x), + lambda x: to_class(ResponseClass, x)], x), x), from_none], self.response) + result["variable"] = from_union([lambda x: from_list(lambda x: to_class(Variable, x), x), from_none], + self.variable) + result["auth"] = from_union([from_none, lambda x: to_class(Auth, x)], self.auth) + result["item"] = from_union([lambda x: from_list(lambda x: to_class(Items, x), x), from_none], self.item) + return result + + +class PostmanCollection: + auth: Optional[Auth] + event: Optional[List[Event]] + info: Information + """Items are the basic unit for a Postman collection. You can think of them as corresponding + to a single API endpoint. Each Item has one request and may have multiple API responses + associated with it. + """ + item: List[Items] + protocol_profile_behavior: Optional[Dict[str, Any]] + variable: Optional[List[Variable]] + + def __init__(self, auth: Optional[Auth], event: Optional[List[Event]], info: Information, item: List[Items], + protocol_profile_behavior: Optional[Dict[str, Any]], variable: Optional[List[Variable]]) -> None: + self.auth = auth + self.event = event + self.info = info + self.item = item + self.protocol_profile_behavior = protocol_profile_behavior + self.variable = variable + + @staticmethod + def from_dict(obj: Any) -> 'PostmanCollection': + assert isinstance(obj, dict) + auth = from_union([from_none, Auth.from_dict], obj.get("auth")) + event = from_union([lambda x: from_list(Event.from_dict, x), from_none], obj.get("event")) + info = Information.from_dict(obj.get("info")) + item = from_list(Items.from_dict, obj.get("item")) + protocol_profile_behavior = from_union([lambda x: from_dict(lambda x: x, x), from_none], + obj.get("protocolProfileBehavior")) + variable = from_union([lambda x: from_list(Variable.from_dict, x), from_none], obj.get("variable")) + return PostmanCollection(auth, event, info, item, protocol_profile_behavior, variable) + + def to_dict(self) -> dict: + result: dict = {} + result["auth"] = from_union([from_none, lambda x: to_class(Auth, x)], self.auth) + result["event"] = from_union([lambda x: from_list(lambda x: to_class(Event, x), x), from_none], self.event) + result["info"] = to_class(Information, self.info) + result["item"] = from_list(lambda x: to_class(Items, x), self.item) + result["protocolProfileBehavior"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], + self.protocol_profile_behavior) + result["variable"] = from_union([lambda x: from_list(lambda x: to_class(Variable, x), x), from_none], + self.variable) + return result + + +def postman_collection_from_dict(s: Any) -> PostmanCollection: + return PostmanCollection.from_dict(s) + + +def postman_to_dict(x: PostmanCollection) -> Any: + return to_class(PostmanCollection, x) diff --git a/dotextensions/server/server.py b/dotextensions/server/server.py index 12aac33..6734b0e 100644 --- a/dotextensions/server/server.py +++ b/dotextensions/server/server.py @@ -9,12 +9,14 @@ from flask import request from . import Command, BaseHandler -from .commands import RunHttpFileHandler, FormatHttpFileHandler, GetNameReferencesHandler +from .commands import RunHttpFileHandler, FormatHttpFileHandler, GetNameReferencesHandler, ImportPostmanCollection logger = logging.getLogger('handler') handlers: Dict[str, BaseHandler] = {handler.get_method(): handler for handler in - (FormatHttpFileHandler(), RunHttpFileHandler(), GetNameReferencesHandler())} + (FormatHttpFileHandler(), RunHttpFileHandler(), GetNameReferencesHandler(), + ImportPostmanCollection(), + )} def run(command: Command) -> Dict: diff --git a/dotextensions/server/swagger/command.py b/dotextensions/server/swagger/command.py new file mode 100644 index 0000000..38ddb03 --- /dev/null +++ b/dotextensions/server/swagger/command.py @@ -0,0 +1,23 @@ +from ...server import BaseHandler, Command, Result + + +class SwaggerImporter(BaseHandler): + name = "/import/2.0/swagger" + + def get_method(self): + return SwaggerImporter.name + + def run(self, command: Command) -> Result: + mode = command.params['mode'] + if mode == 'filename': + self.import_file(command.params) + else: + self.import_link(command.params) + + def import_file(self, params): + filename = params['filename'] + with open(filename) as f: + data = f.read() + + def import_link(self, params): + link = params['url'] diff --git a/dotextensions/server/swagger/swagger_2.py b/dotextensions/server/swagger/swagger_2.py new file mode 100644 index 0000000..ebf3e68 --- /dev/null +++ b/dotextensions/server/swagger/swagger_2.py @@ -0,0 +1,359 @@ +from typing import Optional, Dict, Any, List, TypeVar, Callable, Type, cast + +T = TypeVar("T") + + +def from_dict(f: Callable[[Any], T], x: Any) -> Dict[str, T]: + assert isinstance(x, dict) + return {k: f(v) for (k, v) in x.items()} + + +def from_none(x: Any) -> Any: + assert x is None + return x + + +def from_union(fs, x): + for f in fs: + try: + return f(x) + except: + pass + assert False + + +def from_str(x: Any) -> str: + assert isinstance(x, str) + return x + + +def to_class(c: Type[T], x: Any) -> dict: + assert isinstance(x, c) + return cast(Any, x).to_dict() + + +def from_list(f: Callable[[Any], T], x: Any) -> List[T]: + assert isinstance(x, list) + return [f(y) for y in x] + + +class Components: + callbacks: Optional[Dict[str, Any]] + examples: Optional[Dict[str, Any]] + headers: Optional[Dict[str, Any]] + links: Optional[Dict[str, Any]] + parameters: Optional[Dict[str, Any]] + request_bodies: Optional[Dict[str, Any]] + responses: Optional[Dict[str, Any]] + schemas: Optional[Dict[str, Any]] + security_schemes: Optional[Dict[str, Any]] + + def __init__(self, callbacks: Optional[Dict[str, Any]], examples: Optional[Dict[str, Any]], + headers: Optional[Dict[str, Any]], links: Optional[Dict[str, Any]], + parameters: Optional[Dict[str, Any]], request_bodies: Optional[Dict[str, Any]], + responses: Optional[Dict[str, Any]], schemas: Optional[Dict[str, Any]], + security_schemes: Optional[Dict[str, Any]]) -> None: + self.callbacks = callbacks + self.examples = examples + self.headers = headers + self.links = links + self.parameters = parameters + self.request_bodies = request_bodies + self.responses = responses + self.schemas = schemas + self.security_schemes = security_schemes + + @staticmethod + def from_dict(obj: Any) -> 'Components': + assert isinstance(obj, dict) + callbacks = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("callbacks")) + examples = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("examples")) + headers = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("headers")) + links = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("links")) + parameters = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("parameters")) + request_bodies = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("requestBodies")) + responses = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("responses")) + schemas = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("schemas")) + security_schemes = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("securitySchemes")) + return Components(callbacks, examples, headers, links, parameters, request_bodies, responses, schemas, + security_schemes) + + def to_dict(self) -> dict: + result: dict = {} + result["callbacks"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.callbacks) + result["examples"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.examples) + result["headers"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.headers) + result["links"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.links) + result["parameters"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.parameters) + result["requestBodies"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.request_bodies) + result["responses"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.responses) + result["schemas"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.schemas) + result["securitySchemes"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.security_schemes) + return result + + +class ExternalDocumentation: + description: Optional[str] + url: str + + def __init__(self, description: Optional[str], url: str) -> None: + self.description = description + self.url = url + + @staticmethod + def from_dict(obj: Any) -> 'ExternalDocumentation': + assert isinstance(obj, dict) + description = from_union([from_str, from_none], obj.get("description")) + url = from_str(obj.get("url")) + return ExternalDocumentation(description, url) + + def to_dict(self) -> dict: + result: dict = {} + result["description"] = from_union([from_str, from_none], self.description) + result["url"] = from_str(self.url) + return result + + +class Contact: + email: Optional[str] + name: Optional[str] + url: Optional[str] + + def __init__(self, email: Optional[str], name: Optional[str], url: Optional[str]) -> None: + self.email = email + self.name = name + self.url = url + + @staticmethod + def from_dict(obj: Any) -> 'Contact': + assert isinstance(obj, dict) + email = from_union([from_str, from_none], obj.get("email")) + name = from_union([from_str, from_none], obj.get("name")) + url = from_union([from_str, from_none], obj.get("url")) + return Contact(email, name, url) + + def to_dict(self) -> dict: + result: dict = {} + result["email"] = from_union([from_str, from_none], self.email) + result["name"] = from_union([from_str, from_none], self.name) + result["url"] = from_union([from_str, from_none], self.url) + return result + + +class License: + name: str + url: Optional[str] + + def __init__(self, name: str, url: Optional[str]) -> None: + self.name = name + self.url = url + + @staticmethod + def from_dict(obj: Any) -> 'License': + assert isinstance(obj, dict) + name = from_str(obj.get("name")) + url = from_union([from_str, from_none], obj.get("url")) + return License(name, url) + + def to_dict(self) -> dict: + result: dict = {} + result["name"] = from_str(self.name) + result["url"] = from_union([from_str, from_none], self.url) + return result + + +class Info: + contact: Optional[Contact] + description: Optional[str] + license: Optional[License] + terms_of_service: Optional[str] + title: str + version: str + + def __init__(self, contact: Optional[Contact], description: Optional[str], license: Optional[License], + terms_of_service: Optional[str], title: str, version: str) -> None: + self.contact = contact + self.description = description + self.license = license + self.terms_of_service = terms_of_service + self.title = title + self.version = version + + @staticmethod + def from_dict(obj: Any) -> 'Info': + assert isinstance(obj, dict) + contact = from_union([Contact.from_dict, from_none], obj.get("contact")) + description = from_union([from_str, from_none], obj.get("description")) + license = from_union([License.from_dict, from_none], obj.get("license")) + terms_of_service = from_union([from_str, from_none], obj.get("termsOfService")) + title = from_str(obj.get("title")) + version = from_str(obj.get("version")) + return Info(contact, description, license, terms_of_service, title, version) + + def to_dict(self) -> dict: + result: dict = {} + result["contact"] = from_union([lambda x: to_class(Contact, x), from_none], self.contact) + result["description"] = from_union([from_str, from_none], self.description) + result["license"] = from_union([lambda x: to_class(License, x), from_none], self.license) + result["termsOfService"] = from_union([from_str, from_none], self.terms_of_service) + result["title"] = from_str(self.title) + result["version"] = from_str(self.version) + return result + + +class Paths: + pass + + def __init__(self, ) -> None: + pass + + @staticmethod + def from_dict(obj: Any) -> 'Paths': + assert isinstance(obj, dict) + return Paths() + + def to_dict(self) -> dict: + result: dict = {} + return result + + +class ServerVariable: + default: str + description: Optional[str] + enum: Optional[List[str]] + + def __init__(self, default: str, description: Optional[str], enum: Optional[List[str]]) -> None: + self.default = default + self.description = description + self.enum = enum + + @staticmethod + def from_dict(obj: Any) -> 'ServerVariable': + assert isinstance(obj, dict) + default = from_str(obj.get("default")) + description = from_union([from_str, from_none], obj.get("description")) + enum = from_union([lambda x: from_list(from_str, x), from_none], obj.get("enum")) + return ServerVariable(default, description, enum) + + def to_dict(self) -> dict: + result: dict = {} + result["default"] = from_str(self.default) + result["description"] = from_union([from_str, from_none], self.description) + result["enum"] = from_union([lambda x: from_list(from_str, x), from_none], self.enum) + return result + + +class Server: + description: Optional[str] + url: str + variables: Optional[Dict[str, ServerVariable]] + + def __init__(self, description: Optional[str], url: str, variables: Optional[Dict[str, ServerVariable]]) -> None: + self.description = description + self.url = url + self.variables = variables + + @staticmethod + def from_dict(obj: Any) -> 'Server': + assert isinstance(obj, dict) + description = from_union([from_str, from_none], obj.get("description")) + url = from_str(obj.get("url")) + variables = from_union([lambda x: from_dict(ServerVariable.from_dict, x), from_none], obj.get("variables")) + return Server(description, url, variables) + + def to_dict(self) -> dict: + result: dict = {} + result["description"] = from_union([from_str, from_none], self.description) + result["url"] = from_str(self.url) + result["variables"] = from_union([lambda x: from_dict(lambda x: to_class(ServerVariable, x), x), from_none], + self.variables) + return result + + +class Tag: + description: Optional[str] + external_docs: Optional[ExternalDocumentation] + name: str + + def __init__(self, description: Optional[str], external_docs: Optional[ExternalDocumentation], name: str) -> None: + self.description = description + self.external_docs = external_docs + self.name = name + + @staticmethod + def from_dict(obj: Any) -> 'Tag': + assert isinstance(obj, dict) + description = from_union([from_str, from_none], obj.get("description")) + external_docs = from_union([ExternalDocumentation.from_dict, from_none], obj.get("externalDocs")) + name = from_str(obj.get("name")) + return Tag(description, external_docs, name) + + def to_dict(self) -> dict: + result: dict = {} + result["description"] = from_union([from_str, from_none], self.description) + result["externalDocs"] = from_union([lambda x: to_class(ExternalDocumentation, x), from_none], + self.external_docs) + result["name"] = from_str(self.name) + return result + + +class Swagger2: + """Validation schema for OpenAPI Specification 3.0.X.""" + components: Optional[Components] + external_docs: Optional[ExternalDocumentation] + info: Info + openapi: str + paths: Paths + security: Optional[List[Dict[str, List[str]]]] + servers: Optional[List[Server]] + tags: Optional[List[Tag]] + + def __init__(self, components: Optional[Components], external_docs: Optional[ExternalDocumentation], info: Info, + openapi: str, paths: Paths, security: Optional[List[Dict[str, List[str]]]], + servers: Optional[List[Server]], tags: Optional[List[Tag]]) -> None: + self.components = components + self.external_docs = external_docs + self.info = info + self.openapi = openapi + self.paths = paths + self.security = security + self.servers = servers + self.tags = tags + + @staticmethod + def from_dict(obj: Any) -> 'Swagger2': + assert isinstance(obj, dict) + components = from_union([Components.from_dict, from_none], obj.get("components")) + external_docs = from_union([ExternalDocumentation.from_dict, from_none], obj.get("externalDocs")) + info = Info.from_dict(obj.get("info")) + openapi = from_str(obj.get("openapi")) + paths = Paths.from_dict(obj.get("paths")) + security = from_union( + [lambda x: from_list(lambda x: from_dict(lambda x: from_list(from_str, x), x), x), from_none], + obj.get("security")) + servers = from_union([lambda x: from_list(Server.from_dict, x), from_none], obj.get("servers")) + tags = from_union([lambda x: from_list(Tag.from_dict, x), from_none], obj.get("tags")) + return Swagger2(components, external_docs, info, openapi, paths, security, servers, tags) + + def to_dict(self) -> dict: + result: dict = {} + result["components"] = from_union([lambda x: to_class(Components, x), from_none], self.components) + result["externalDocs"] = from_union([lambda x: to_class(ExternalDocumentation, x), from_none], + self.external_docs) + result["info"] = to_class(Info, self.info) + result["openapi"] = from_str(self.openapi) + result["paths"] = to_class(Paths, self.paths) + result["security"] = from_union( + [lambda x: from_list(lambda x: from_dict(lambda x: from_list(from_str, x), x), x), from_none], + self.security) + result["servers"] = from_union([lambda x: from_list(lambda x: to_class(Server, x), x), from_none], self.servers) + result["tags"] = from_union([lambda x: from_list(lambda x: to_class(Tag, x), x), from_none], self.tags) + return result + + +def swagger2_from_dict(s: Any) -> Swagger2: + return Swagger2.from_dict(s) + + +def swagger2_to_dict(x: Swagger2) -> Any: + return to_class(Swagger2, x) diff --git a/dotextensions/server/swagger/swagger_3.py b/dotextensions/server/swagger/swagger_3.py new file mode 100644 index 0000000..0072aa3 --- /dev/null +++ b/dotextensions/server/swagger/swagger_3.py @@ -0,0 +1,359 @@ +from typing import Optional, Dict, Any, List, TypeVar, Callable, Type, cast + +T = TypeVar("T") + + +def from_dict(f: Callable[[Any], T], x: Any) -> Dict[str, T]: + assert isinstance(x, dict) + return {k: f(v) for (k, v) in x.items()} + + +def from_none(x: Any) -> Any: + assert x is None + return x + + +def from_union(fs, x): + for f in fs: + try: + return f(x) + except: + pass + assert False + + +def from_str(x: Any) -> str: + assert isinstance(x, str) + return x + + +def to_class(c: Type[T], x: Any) -> dict: + assert isinstance(x, c) + return cast(Any, x).to_dict() + + +def from_list(f: Callable[[Any], T], x: Any) -> List[T]: + assert isinstance(x, list) + return [f(y) for y in x] + + +class Components: + callbacks: Optional[Dict[str, Any]] + examples: Optional[Dict[str, Any]] + headers: Optional[Dict[str, Any]] + links: Optional[Dict[str, Any]] + parameters: Optional[Dict[str, Any]] + request_bodies: Optional[Dict[str, Any]] + responses: Optional[Dict[str, Any]] + schemas: Optional[Dict[str, Any]] + security_schemes: Optional[Dict[str, Any]] + + def __init__(self, callbacks: Optional[Dict[str, Any]], examples: Optional[Dict[str, Any]], + headers: Optional[Dict[str, Any]], links: Optional[Dict[str, Any]], + parameters: Optional[Dict[str, Any]], request_bodies: Optional[Dict[str, Any]], + responses: Optional[Dict[str, Any]], schemas: Optional[Dict[str, Any]], + security_schemes: Optional[Dict[str, Any]]) -> None: + self.callbacks = callbacks + self.examples = examples + self.headers = headers + self.links = links + self.parameters = parameters + self.request_bodies = request_bodies + self.responses = responses + self.schemas = schemas + self.security_schemes = security_schemes + + @staticmethod + def from_dict(obj: Any) -> 'Components': + assert isinstance(obj, dict) + callbacks = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("callbacks")) + examples = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("examples")) + headers = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("headers")) + links = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("links")) + parameters = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("parameters")) + request_bodies = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("requestBodies")) + responses = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("responses")) + schemas = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("schemas")) + security_schemes = from_union([lambda x: from_dict(lambda x: x, x), from_none], obj.get("securitySchemes")) + return Components(callbacks, examples, headers, links, parameters, request_bodies, responses, schemas, + security_schemes) + + def to_dict(self) -> dict: + result: dict = {} + result["callbacks"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.callbacks) + result["examples"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.examples) + result["headers"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.headers) + result["links"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.links) + result["parameters"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.parameters) + result["requestBodies"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.request_bodies) + result["responses"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.responses) + result["schemas"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.schemas) + result["securitySchemes"] = from_union([lambda x: from_dict(lambda x: x, x), from_none], self.security_schemes) + return result + + +class ExternalDocumentation: + description: Optional[str] + url: str + + def __init__(self, description: Optional[str], url: str) -> None: + self.description = description + self.url = url + + @staticmethod + def from_dict(obj: Any) -> 'ExternalDocumentation': + assert isinstance(obj, dict) + description = from_union([from_str, from_none], obj.get("description")) + url = from_str(obj.get("url")) + return ExternalDocumentation(description, url) + + def to_dict(self) -> dict: + result: dict = {} + result["description"] = from_union([from_str, from_none], self.description) + result["url"] = from_str(self.url) + return result + + +class Contact: + email: Optional[str] + name: Optional[str] + url: Optional[str] + + def __init__(self, email: Optional[str], name: Optional[str], url: Optional[str]) -> None: + self.email = email + self.name = name + self.url = url + + @staticmethod + def from_dict(obj: Any) -> 'Contact': + assert isinstance(obj, dict) + email = from_union([from_str, from_none], obj.get("email")) + name = from_union([from_str, from_none], obj.get("name")) + url = from_union([from_str, from_none], obj.get("url")) + return Contact(email, name, url) + + def to_dict(self) -> dict: + result: dict = {} + result["email"] = from_union([from_str, from_none], self.email) + result["name"] = from_union([from_str, from_none], self.name) + result["url"] = from_union([from_str, from_none], self.url) + return result + + +class License: + name: str + url: Optional[str] + + def __init__(self, name: str, url: Optional[str]) -> None: + self.name = name + self.url = url + + @staticmethod + def from_dict(obj: Any) -> 'License': + assert isinstance(obj, dict) + name = from_str(obj.get("name")) + url = from_union([from_str, from_none], obj.get("url")) + return License(name, url) + + def to_dict(self) -> dict: + result: dict = {} + result["name"] = from_str(self.name) + result["url"] = from_union([from_str, from_none], self.url) + return result + + +class Info: + contact: Optional[Contact] + description: Optional[str] + license: Optional[License] + terms_of_service: Optional[str] + title: str + version: str + + def __init__(self, contact: Optional[Contact], description: Optional[str], license: Optional[License], + terms_of_service: Optional[str], title: str, version: str) -> None: + self.contact = contact + self.description = description + self.license = license + self.terms_of_service = terms_of_service + self.title = title + self.version = version + + @staticmethod + def from_dict(obj: Any) -> 'Info': + assert isinstance(obj, dict) + contact = from_union([Contact.from_dict, from_none], obj.get("contact")) + description = from_union([from_str, from_none], obj.get("description")) + license = from_union([License.from_dict, from_none], obj.get("license")) + terms_of_service = from_union([from_str, from_none], obj.get("termsOfService")) + title = from_str(obj.get("title")) + version = from_str(obj.get("version")) + return Info(contact, description, license, terms_of_service, title, version) + + def to_dict(self) -> dict: + result: dict = {} + result["contact"] = from_union([lambda x: to_class(Contact, x), from_none], self.contact) + result["description"] = from_union([from_str, from_none], self.description) + result["license"] = from_union([lambda x: to_class(License, x), from_none], self.license) + result["termsOfService"] = from_union([from_str, from_none], self.terms_of_service) + result["title"] = from_str(self.title) + result["version"] = from_str(self.version) + return result + + +class Paths: + pass + + def __init__(self, ) -> None: + pass + + @staticmethod + def from_dict(obj: Any) -> 'Paths': + assert isinstance(obj, dict) + return Paths() + + def to_dict(self) -> dict: + result: dict = {} + return result + + +class ServerVariable: + default: str + description: Optional[str] + enum: Optional[List[str]] + + def __init__(self, default: str, description: Optional[str], enum: Optional[List[str]]) -> None: + self.default = default + self.description = description + self.enum = enum + + @staticmethod + def from_dict(obj: Any) -> 'ServerVariable': + assert isinstance(obj, dict) + default = from_str(obj.get("default")) + description = from_union([from_str, from_none], obj.get("description")) + enum = from_union([lambda x: from_list(from_str, x), from_none], obj.get("enum")) + return ServerVariable(default, description, enum) + + def to_dict(self) -> dict: + result: dict = {} + result["default"] = from_str(self.default) + result["description"] = from_union([from_str, from_none], self.description) + result["enum"] = from_union([lambda x: from_list(from_str, x), from_none], self.enum) + return result + + +class Server: + description: Optional[str] + url: str + variables: Optional[Dict[str, ServerVariable]] + + def __init__(self, description: Optional[str], url: str, variables: Optional[Dict[str, ServerVariable]]) -> None: + self.description = description + self.url = url + self.variables = variables + + @staticmethod + def from_dict(obj: Any) -> 'Server': + assert isinstance(obj, dict) + description = from_union([from_str, from_none], obj.get("description")) + url = from_str(obj.get("url")) + variables = from_union([lambda x: from_dict(ServerVariable.from_dict, x), from_none], obj.get("variables")) + return Server(description, url, variables) + + def to_dict(self) -> dict: + result: dict = {} + result["description"] = from_union([from_str, from_none], self.description) + result["url"] = from_str(self.url) + result["variables"] = from_union([lambda x: from_dict(lambda x: to_class(ServerVariable, x), x), from_none], + self.variables) + return result + + +class Tag: + description: Optional[str] + external_docs: Optional[ExternalDocumentation] + name: str + + def __init__(self, description: Optional[str], external_docs: Optional[ExternalDocumentation], name: str) -> None: + self.description = description + self.external_docs = external_docs + self.name = name + + @staticmethod + def from_dict(obj: Any) -> 'Tag': + assert isinstance(obj, dict) + description = from_union([from_str, from_none], obj.get("description")) + external_docs = from_union([ExternalDocumentation.from_dict, from_none], obj.get("externalDocs")) + name = from_str(obj.get("name")) + return Tag(description, external_docs, name) + + def to_dict(self) -> dict: + result: dict = {} + result["description"] = from_union([from_str, from_none], self.description) + result["externalDocs"] = from_union([lambda x: to_class(ExternalDocumentation, x), from_none], + self.external_docs) + result["name"] = from_str(self.name) + return result + + +class Swagger3: + """Validation schema for OpenAPI Specification 3.0.X.""" + components: Optional[Components] + external_docs: Optional[ExternalDocumentation] + info: Info + openapi: str + paths: Paths + security: Optional[List[Dict[str, List[str]]]] + servers: Optional[List[Server]] + tags: Optional[List[Tag]] + + def __init__(self, components: Optional[Components], external_docs: Optional[ExternalDocumentation], info: Info, + openapi: str, paths: Paths, security: Optional[List[Dict[str, List[str]]]], + servers: Optional[List[Server]], tags: Optional[List[Tag]]) -> None: + self.components = components + self.external_docs = external_docs + self.info = info + self.openapi = openapi + self.paths = paths + self.security = security + self.servers = servers + self.tags = tags + + @staticmethod + def from_dict(obj: Any) -> 'Swagger3': + assert isinstance(obj, dict) + components = from_union([Components.from_dict, from_none], obj.get("components")) + external_docs = from_union([ExternalDocumentation.from_dict, from_none], obj.get("externalDocs")) + info = Info.from_dict(obj.get("info")) + openapi = from_str(obj.get("openapi")) + paths = Paths.from_dict(obj.get("paths")) + security = from_union( + [lambda x: from_list(lambda x: from_dict(lambda x: from_list(from_str, x), x), x), from_none], + obj.get("security")) + servers = from_union([lambda x: from_list(Server.from_dict, x), from_none], obj.get("servers")) + tags = from_union([lambda x: from_list(Tag.from_dict, x), from_none], obj.get("tags")) + return Swagger3(components, external_docs, info, openapi, paths, security, servers, tags) + + def to_dict(self) -> dict: + result: dict = {} + result["components"] = from_union([lambda x: to_class(Components, x), from_none], self.components) + result["externalDocs"] = from_union([lambda x: to_class(ExternalDocumentation, x), from_none], + self.external_docs) + result["info"] = to_class(Info, self.info) + result["openapi"] = from_str(self.openapi) + result["paths"] = to_class(Paths, self.paths) + result["security"] = from_union( + [lambda x: from_list(lambda x: from_dict(lambda x: from_list(from_str, x), x), x), from_none], + self.security) + result["servers"] = from_union([lambda x: from_list(lambda x: to_class(Server, x), x), from_none], self.servers) + result["tags"] = from_union([lambda x: from_list(lambda x: to_class(Tag, x), x), from_none], self.tags) + return result + + +def swagger3_from_dict(s: Any) -> Swagger3: + return Swagger3.from_dict(s) + + +def swagger3_to_dict(x: Swagger3) -> Any: + return to_class(Swagger3, x) diff --git a/dothttp/__init__.py b/dothttp/__init__.py index e8b98f0..70d1b4c 100644 --- a/dothttp/__init__.py +++ b/dothttp/__init__.py @@ -1,3 +1,4 @@ +import functools import logging import os import re @@ -5,10 +6,10 @@ from collections import defaultdict from dataclasses import dataclass, field from http.cookiejar import LWPCookieJar -from typing import Union, Dict, List, DefaultDict, Optional +from typing import DefaultDict, Optional +from typing import Dict, List, Union import jstyleson as json -import magic from jsonschema import validate from requests import PreparedRequest, Session, Response # this is bad, loading private stuff. find a better way @@ -22,6 +23,11 @@ from .parse_models import Allhttp from .property_schema import property_schema +try: + import magic +except ImportError: + magic = None + FORM_URLENCODED = "application/x-www-form-urlencoded" DOTHTTP_COOKIEJAR = os.path.expanduser('~/.dothttp.cookiejar') @@ -66,7 +72,10 @@ class Payload: data: Union[str, bytes, None, Dict] = None json: Union[Dict, None] = None header: Union[str, None] = None - files: Union[Dict, None] = None + # [[ "key", ["filename", "content", "datatype"], + # ["key", ["filename2", "content", "datatype"]], + # ["key2", [None, "content", "datatype"]],] + files: Union[List, None] = None @dataclass @@ -166,10 +175,12 @@ def __init__(self, args: Config): def load(self): self.load_content() + self.load_model() self.load_properties_n_headers() self.load_command_line_props() - self.update_content_with_prop() - self.load_model() + self.validate_names() + self.load_props_needed_for_content() + self.select_target() def load_command_line_props(self): for prop in self.args.properties: @@ -232,13 +243,13 @@ def validate_n_gen(prop, cache: Dict[str, Property]): cache.setdefault(prop, p) return p - @staticmethod - def get_most_possible_val(*args): + def get_most_possible_val(self, var): + args = self.command_line_props.get(var), self.properties.get(var), self.prop_cache[var].value for arg in args: if arg is not None: return arg - def update_content_with_prop(self): + def get_updated_content(self, content) -> str: """ 1. properties defined in file itself ({{a=10}}) allowed values are @@ -251,32 +262,39 @@ def update_content_with_prop(self): 4. properties from files's '*' :return: """ - out = BaseModelProcessor.var_regex.findall(self.content) - base_logger.debug(f'property used in `{self.file}` are `{out}`') - prop_cache: Dict[str, Property] = {} - tuple(self.validate_n_gen(x, prop_cache) for x in out if x) # generates prop_cache, this could be done better - props_needed = set(prop_cache.keys()) + prop_cache = self.prop_cache + + content_prop_needed = self.get_declared_props(content) + props_needed = set(content_prop_needed.keys()) - keys = set(self.properties.keys()).union(set(self.command_line_props.keys())) - missing_props = props_needed - keys - set(key for key in prop_cache if prop_cache[key].value is not None) + missing_props = props_needed - self.get_props_available() if len(missing_props) != 0: raise PropertyNotFoundException( var=missing_props, propertyfile=self.property_file if self.property_file else "not specified") for var in props_needed: - base_logger.debug( - f'using `{self.properties.get(var)}` for property {var} ') # command line props take preference - value = self.get_most_possible_val(self.command_line_props.get(var), self.properties.get(var), - prop_cache[var].value) - for text_to_replace in prop_cache[var].text: - self.content = self.content.replace("{{" + text_to_replace + "}}", value) - + value = self.get_most_possible_val(var) + base_logger.debug(f'using `{value}` for property {var}') + for text_to_replace in content_prop_needed[var].text: + content = content.replace("{{" + text_to_replace + "}}", value) + return content + + @functools.lru_cache + def get_props_available(self): + return set(self.properties.keys()).union(set(self.command_line_props.keys())).union(set( + key for key in self.prop_cache if self.prop_cache[key].value is not None)) + + def load_props_needed_for_content(self): + self.prop_cache = self.get_declared_props(self.content) + + def get_declared_props(self, content): + out = BaseModelProcessor.var_regex.findall(content) + base_logger.debug(f'property used in `{self.file}` are `{out}`') + prop_cache: Dict[str, Property] = {} + tuple(self.validate_n_gen(x, prop_cache) for x in out if x) # generates prop_cache, this could be done better + return prop_cache -class RequestBase(BaseModelProcessor): - def __init__(self, args: Config): - super().__init__(args) - self._cookie: Union[LWPCookieJar, None] = None - self.validate_names() + def select_target(self): if target := self.args.target: if not isinstance(target, str): target = str(target) @@ -307,11 +325,17 @@ def validate_names(self): names.append(name) names.append(str(index + 1)) + +class RequestBase(BaseModelProcessor): + def __init__(self, args: Config): + super().__init__(args) + self._cookie: Union[LWPCookieJar, None] = None + def get_query(self): params: DefaultDict[List] = defaultdict(list) for line in self.http.lines: if query := line.query: - params[query.key].append(query.value) + params[self.get_updated_content(query.key)].append(self.get_updated_content(query.value)) request_logger.debug( f'computed query params from `{self.file}` are `{params}`') return params @@ -337,7 +361,7 @@ def get_headers(self): def get_url(self): request_logger.debug( f'url is {self.http.urlwrap.url}') - return self.http.urlwrap.url + return self.get_updated_content(self.http.urlwrap.url) def get_method(self): if method := self.http.urlwrap.method: @@ -363,41 +387,47 @@ def get_payload(self): if not self.http.payload: return Payload() elif data := self.http.payload.data: - mimetype = self.get_mimetype_from_buffer(data, self.http.payload.type) + content = self.get_updated_content(data) + mimetype = self.get_mimetype_from_buffer(content, + self.get_updated_content(self.http.payload.type)) request_logger.debug( - f'payload for request is `{data}`') - return Payload(data, header=mimetype) + f'payload for request is `{content}`') + return Payload(content, header=mimetype) elif data_json := self.http.payload.datajson: - d = json_or_array_to_json(data_json) + d = json_or_array_to_json(data_json, self.get_updated_content) if isinstance(d, list): raise PayloadDataNotValidException(payload=f"data should be json/str, current: {d}") + # TODO convert all into string + # varstring hanlding return Payload(data=d, header=FORM_URLENCODED) - elif filename := self.http.payload.file: + elif upload_filename := self.http.payload.file: + upload_filename = self.get_updated_content(upload_filename) request_logger.debug( - f'payload will be loaded from `{filename}`') - if not os.path.exists(filename): + f'payload will be loaded from `{upload_filename}`') + if not os.path.exists(upload_filename): request_logger.debug( - f'payload file `{filename}` Not found. ') - raise DataFileNotFoundException(datafile=filename) - mimetype = self.get_mimetype_from_file(filename, self.http.payload.type) - with open(filename, 'rb') as f: + f'payload file `{upload_filename}` Not found. ') + raise DataFileNotFoundException(datafile=upload_filename) + mimetype = self.get_mimetype_from_file(upload_filename, self.http.payload.type) + with open(upload_filename, 'rb') as f: return Payload(data=f.read(), header=mimetype) elif json_data := self.http.payload.json: - d = json_or_array_to_json(json_data) + d = json_or_array_to_json(json_data, self.get_updated_content) return Payload(json=d, header=MIME_TYPE_JSON) elif files_wrap := self.http.payload.fileswrap: - files = {} - for filetype in files_wrap.files: - content = filetype.path - mimetype = filetype.type - if os.path.exists(filetype.path): # probably check valid path, then check for exists - content = open(filetype.path, 'rb') - filename = os.path.basename(filetype.path) - mimetype = self.get_mimetype_from_file(filetype.path, mimetype) - files[filetype.name] = (filename, content, mimetype) + files = [] + for multipart_file in files_wrap.files: + multipart_content = self.get_updated_content(multipart_file.path) + multipart_key = self.get_updated_content(multipart_file.name) + mimetype = self.get_updated_content(multipart_file.type) if multipart_file.type else None + if os.path.exists(multipart_content): # probably check valid path, then check for exists + mimetype = self.get_mimetype_from_file(multipart_content, mimetype) + multipart_filename = os.path.basename(multipart_content) + multipart_content = open(multipart_content, 'rb') + files.append((multipart_key, (multipart_filename, multipart_content, mimetype))) else: - mimetype = self.get_mimetype_from_buffer(content, mimetype) - files[filetype.name] = (None, content, mimetype) + mimetype = self.get_mimetype_from_buffer(multipart_content, mimetype) + files.append((multipart_key, (None, multipart_content, mimetype))) return Payload(files=files) return Payload() @@ -422,11 +452,12 @@ def get_mimetype_from_buffer(data, mimetype: Optional[str]) -> Optional[str]: def get_output(self): if output := self.http.output: - print(f'output will be written to `{os.path.abspath(output.output)}`') + output_file = self.get_updated_content(output.output) + print(f'output will be written to `{os.path.abspath(output_file)}`') request_logger.debug( - f'output will be written into `{self.file}` is `{os.path.abspath(output.output)}`') + f'output will be written into `{self.file}` is `{os.path.abspath(output_file)}`') try: - return open(output.output, 'wb') + return open(output_file, 'wb') except: request_logger.debug( f'not able to open `{output}`. output will be written to stdout') @@ -476,6 +507,7 @@ def get_auth(self): return auth_wrap.username, auth_wrap.password return None + @functools.lru_cache def get_request(self): prep = self.get_request_notbody() payload = self.get_payload() @@ -518,15 +550,15 @@ def get_curl_output(self): parts = [] if self.http.payload: if self.http.payload.file: - parts.append(('--data', "@" + self.http.payload.file)) + parts.append(('--data', "@" + self.get_updated_content(self.http.payload.file))) elif self.http.payload.fileswrap: payload = self.get_payload() if payload.files: for file in payload.files: - if isinstance(payload.files[file][1], str): - parts.append(('--form', file + "=" + payload.files[file][1])) + if isinstance(file[1][1], str): + parts.append(('--form', file[0] + "=" + file[1][1])) else: - parts.append(('--form', file + "=@" + payload.files[file][1].name)) + parts.append(('--form', file[0] + "=@" + file[1][1].name)) else: payload = self.get_payload() prep.prepare_body(data=payload.data, json=payload.json, files=payload.files) @@ -536,15 +568,21 @@ def get_curl_output(self): class HttpFileFormatter(RequestBase): + def get_updated_content(self, content): + return content + def load(self): self.load_content() self.load_model() + self.prop_cache = {} @staticmethod def format(model): output_str = "" for http in model.allhttps: new_line = "\n" + if namewrap := http.namewrap: + output_str += f"@name(\"{namewrap.name}\"){new_line}" method = http.urlwrap.method if http.urlwrap.method else "GET" output_str += f'{method} "{http.urlwrap.url}"' if auth_wrap := http.basic_auth_wrap: @@ -564,24 +602,33 @@ def format(model): p = "" mime_type = payload.type if data := payload.data: - p = f'data("{data}"{(" ," + mime_type) if mime_type else ""})' + if '"' in data and "'" not in data: + data = f"'{data}'" + elif '"' not in data and "'" in data: + data = f'"{data}"' + else: + # TODO not completely works + # url escaping is done wrong + data = "'" + data.replace("'", "\\'") + "'" + p = f'data({data}{(" ," + mime_type) if mime_type else ""})' if datajson := payload.datajson: - parsed_data = json_or_array_to_json(datajson) + parsed_data = json_or_array_to_json(datajson, lambda a: a) p = f'data({json.dumps(parsed_data, indent=4)})' elif filetype := payload.file: p = f'fileinput("{filetype}",{(" ," + mime_type) if mime_type else ""})' elif json_data := payload.json: - parsed_data = json_or_array_to_json(json_data) + parsed_data = json_or_array_to_json(json_data, lambda a: a) p = f'json({json.dumps(parsed_data, indent=4)})' elif files_wrap := payload.fileswrap: - p2 = "\n\t".join(map( - lambda file_type: f'("{file_type.method}", "{(file_type.method)}"' - f'\'{(" ," + file_type.type) if file_type.type else ""}\')' + p2 = ",\n\t".join(map( + lambda file_type: f'("{file_type.name}", "{(file_type.path)}"' + f'{(" ," + file_type.type) if file_type.type else ""})' , files_wrap.files)) - p = f"files(\n\t{p2}\n)" - output_str += f'\n{p}' + p = f"files({new_line}\t{p2}{new_line})" + output_str += f'{new_line}{p}' if output := http.output: - output_str += f'\noutput({output.output})\n' + output_str += f'{new_line}output({output.output})' + output_str += new_line * 3 return output_str def run(self): diff --git a/dothttp/dsl_jsonparser.py b/dothttp/dsl_jsonparser.py index e3fcd9c..c2c264b 100644 --- a/dothttp/dsl_jsonparser.py +++ b/dothttp/dsl_jsonparser.py @@ -1,23 +1,51 @@ -from typing import Dict, List, Union +import json +from typing import Union, Dict, List -def json_or_array_to_json(model) -> Union[Dict, List]: +def json_or_array_to_json(model, update_content_func) -> Union[Dict, List]: + if isinstance(model, Dict) or isinstance(model, List): + # TODO + # this is bad + # hooking here could lead to other issues + return model if array := model.array: - return [jsonmodel_to_json(value) for value in array.values] + return [update_content_func(value, update_content_func) for value in array.values] elif json_object := model.object: - return {member.key: jsonmodel_to_json(member.value) for member in json_object.members} + return { + # TODO i'm confused about key weather it should be string or int or float (value has float, number, + # boolean, null) but key is unsupported by requests + get_key(member, update_content_func): jsonmodel_to_json(member.value, update_content_func) + for + member in + json_object.members} -def jsonmodel_to_json(model): +def get_key(member, update_content_func): + if member.key: + return update_content_func(member.key) + return get_json_data(member.var, update_content_func) + + +def jsonmodel_to_json(model, update_content_func): if str_value := model.str: - return str_value.value + return update_content_func(str_value.value) + elif var_value := model.var: + return get_json_data(var_value, update_content_func) elif flt := model.flt: return flt.value elif bl := model.bl: return bl.value elif json_object := model.object: - return {member.key: jsonmodel_to_json(member.value) for member in json_object.members} + return {member.key: jsonmodel_to_json(member.value, update_content_func) for member in json_object.members} elif array := model.array: - return [jsonmodel_to_json(value) for value in array.values] + return [jsonmodel_to_json(value, update_content_func) for value in array.values] elif model == 'null': return None + + +def get_json_data(var_value, update_content_func): + content: str = update_content_func(var_value) + try: + return json.loads(content) + except ValueError: + return content diff --git a/dothttp/http.tx b/dothttp/http.tx index a5c6a47..4cdc357 100644 --- a/dothttp/http.tx +++ b/dothttp/http.tx @@ -32,7 +32,10 @@ BASICAUTH: METHODTYPE: "GET" | "POST" | "OPTIONS" | "DELETE" | "CONNECT" | "PUT" - | "HEAD" | "TRACE" + | "HEAD" | "TRACE" | "PATCH" + | "COPY" | "LINK" | "UNLINK" + | "PURGE" | "LOCK" | "UNLOCK" + | "PROPFIND" | "VIEW" ; QUERY: @@ -46,9 +49,11 @@ QUERY: PAYLOAD: ( 'data' '(' data=STRING (',' type=STRING)? ','? ')' | 'data' '(' datajson=JSON (',' type=STRING)? ','? ')' + | 'urlencoded' '(' datajson=JSON (',' type=STRING)? ','? ')' | 'fileinput' '(' file=STRING (',' type=STRING)? ','? ')' | ('json' '(' json=JSON ')' ) | (fileswrap=FILES) + | (form=FILES) ) ; @@ -71,7 +76,7 @@ Array: ; Value: - str=String | flt=Float | bl=Bool | object=Object | array=Array | null="null" + str=String | var=VarString | flt=Float | bl=Bool | object=Object | array=Array | null="null" ; @@ -92,7 +97,8 @@ Object: ; Member: - key=STRING ':' value=Value + (key=STRING ':' value=Value) | + (var=VarString ':' value=Value) ; Comment: UnixComment | CommentLine | CommentBlock @@ -108,6 +114,10 @@ CommentBlock: /\/\*(.|\n)*?\*\// ; +VarString: + "{{" /\w+/ ("=" (/\w+/ | STRING))? "}}" +; + DotString: STRING | /\w+/ ; diff --git a/dothttp/parse_models.py b/dothttp/parse_models.py index 2bebe1f..a3acab2 100644 --- a/dothttp/parse_models.py +++ b/dothttp/parse_models.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from typing import List, Optional, Dict +from typing import List, Optional @dataclass @@ -24,11 +24,13 @@ class BasicAuth: password: str +@dataclass class Query: key: str value: str +@dataclass class Header: key: str value: str @@ -55,10 +57,10 @@ class FilesWrap: @dataclass class Payload: data: Optional[str] - datajson: Optional[Dict] + datajson: Optional file: Optional[str] json: Optional - fileswrap: FilesWrap + fileswrap: Optional[FilesWrap] type: Optional[str] @@ -71,10 +73,10 @@ class ToFile: class Http: namewrap: Optional[NameWrap] urlwrap: UrlWrap - basic_auth_wrap: BasicAuth - lines: List[Line] - payload: Payload - output: ToFile + basic_auth_wrap: Optional[BasicAuth] + lines: Optional[List[Line]] + payload: Optional[Payload] + output: Optional[ToFile] @dataclass diff --git a/examples/example.http b/examples/example.http index 921bbb6..21f4786 100644 --- a/examples/example.http +++ b/examples/example.http @@ -37,7 +37,7 @@ basicauth('username', 'password') json --> signifies payload is json data */ json({ - "name": "{{name=adam}}", # name is templated, if spcified via env or property, it will be replaced + "{{name2222}}": "{{name=adam}}", # name is templated, if spcified via env or property, it will be replaced "org": "dothttp", "location": "Hyderabad", # "interests": ["exploring", "listening to music"], @@ -51,7 +51,8 @@ POST https://req.dothttp.dev/post # define headers in .dothttp.json with env basicauth("{{username}}", "{{password}}") -data({ +json({ + "height": {{height}}, "name": "Adam A", "org": "dothttp", "location": "Hyderabad", diff --git a/setup.py b/setup.py index c59e826..9102c05 100644 --- a/setup.py +++ b/setup.py @@ -1,5 +1,5 @@ import os -import sys + from setuptools import setup, find_packages @@ -20,7 +20,7 @@ def requirements(): setup( name="dothttp_req", - version="0.0.5", + version="0.0.7", author="prasanth", author_email="kesavarapu.siva@gmail.com", description=("DotHttp recommended tool for making http requests."), diff --git a/test/core/multi/multi.http b/test/core/multi/multi.http new file mode 100644 index 0000000..1931d35 --- /dev/null +++ b/test/core/multi/multi.http @@ -0,0 +1,77 @@ +@name('request with no vars') +GET https://req.dothttp.dev +? querykey: queryvalue + +@name('request with vars used') +POST https://req.dothttp.dev +? "{{querykey}}": queryvalue +data({ + "{{querykey}}": "queryvalue", + "querykey": "{{querykey}}" +}) + + +@name('request with infile json') +POST https://req.dothttp.dev +? "{{querykey}}": queryvalue +json({ + "{{querykey}}": "queryvalue", + "querykey": "{{querykey}}" +}) + + +@name('request infile with boolean') +POST https://req.dothttp.dev +? "{{querykey}}": queryvalue +json({ + "ram": "ranga", + "itsstring": "{{querykey}}", + "itsstring2": {{itsactualstring=ramchandra}}, + "itsint": {{querykey}}, + "itsint2": {{int=29}}, + "itsfloat": {{float="2.3"}}, + "itstrue": {{true=true}}, + "itsfalse": {{false=false}}, + "itsonemoretrue": {{true}}, + "itsonemorefalse": {{false}}, + "itsnull": {{null=null}}, + "itsarray": [ + '{{querykey}}', + {{itsactualstring}}, + {{querykey}}, + {{int}}, + {{true}}, + {{false}}, + {{null}}, + {{float}}, + ], + // currently float has issue +}) + + +@name('request with vars defined, expects from outside') +POST https://req.dothttp.dev +? "{{querykey=10}}": queryvalue +json({ + "ram": 'ranga', + "itsstring": "{{querykey2}}", + "itsstring2": {{itsactualstring2}}, + "itsint": {{querykey2}}, + "itsint2": {{int2}}, + "itsfloat": {{float2}}, + "itstrue": {{true2}}, + "itsfalse": {{false2}}, + "itsonemoretrue": {{true2}}, + "itsonemorefalse": {{false2}}, + "itsnull": {{null2}}, + "itsarray": [ + '{{querykey2}}', + {{itsactualstring2}}, + {{querykey2}}, + {{int2}}, + {{true2}}, + {{false2}}, + {{null2}}, + {{float2}}, + ], +}) diff --git a/test/core/test_multidef_http.py b/test/core/test_multidef_http.py new file mode 100644 index 0000000..189189f --- /dev/null +++ b/test/core/test_multidef_http.py @@ -0,0 +1,62 @@ +import requests + +from dothttp import PropertyNotFoundException +from test import TestBase +from test.core.test_request import dir_path + +base_dir = f"{dir_path}/multi" + +session = requests.session() + + +class Multiplefiletest(TestBase): + testpostdata = b'{"ram": "ranga", "itsstring": "10", "itsstring2": "ramchandra", "itsint": 10, "itsint2": 29, ' \ + b'"itsfloat": 2.3, "itstrue": true, "itsfalse": false, "itsonemoretrue": true, "itsonemorefalse": false, ' \ + b'"itsnull": null, "itsarray": ["10", "ramchandra", 10, 29, true, false, null, 2.3]}' + + def test_simple_http(self): + # should run as it is not using any vars + req = self.get_request(f"{base_dir}/multi.http", target=1) + self.assertEqual("https://req.dothttp.dev/?querykey=queryvalue", req.url) + + def test_infiledefined_data_prop_http(self): + # should run as it is not using any vars + req = self.get_request(f"{base_dir}/multi.http", target=2) + self.assertEqual("https://req.dothttp.dev/?10=queryvalue", req.url) + self.assertEqual("10=queryvalue&querykey=10", req.body) + + def test_infiledefined_json_prop_http(self): + # should run as it is not using any vars + req = self.get_request(f"{base_dir}/multi.http", target=3) + self.assertEqual("https://req.dothttp.dev/?10=queryvalue", req.url) + self.assertEqual(b'{"10": "queryvalue", "querykey": "10"}', req.body) + + def test_infiledefined_json_complex_prop_http(self): + # should run as it is not using any vars + req = self.get_request(f"{base_dir}/multi.http", target=4) + self.assertEqual("https://req.dothttp.dev/?10=queryvalue", req.url) + self.assertEqual( + self.testpostdata, + req.body) + + def test_infileunresolved_prop_http(self): + # should run as it is not using any vars + with self.assertRaises(PropertyNotFoundException): + # should fail with unresolved properties + self.get_request(f"{base_dir}/multi.http", target=5) + # should pass when required properties are sent + req = self.get_request(f"{base_dir}/multi.http", + target=5, + properties=["querykey2=10", + "itsactualstring2=ramchandra", + "querykey2=10", + "int2=29", + "float2=2.3", + "true2=true", + "false2=false", + "null2=null", + ]) + self.assertEqual("https://req.dothttp.dev/?10=queryvalue", req.url) + self.assertEqual( + self.testpostdata, + req.body) diff --git a/test/core/test_request.py b/test/core/test_request.py index b7e14cb..e882b76 100644 --- a/test/core/test_request.py +++ b/test/core/test_request.py @@ -1,8 +1,9 @@ import os +import sys import tempfile import unittest -from dothttp import HttpFileException, HttpFileFormatter, CurlCompiler, HttpFileNotFoundException +from dothttp import HttpFileException, CurlCompiler from test import TestBase dir_path = os.path.dirname(os.path.realpath(__file__)) @@ -71,14 +72,14 @@ def test_curl_print(self): def test_format_print(self): req = self.get_req_comp(f"{base_dir}/redirect.http", format=True, stdout=True) req.load() - output = HttpFileFormatter.format(req.model) - self.assertEqual('GET "http://endeavour.today/"', output) + output = req.format(req.model) + self.assertEqual('GET "http://endeavour.today/"\n\n\n', output) print(output) def test_format2_print(self): req = self.get_req_comp(f"{sub_dir}/multipleenv.http", format=True, stdout=True) req.load() - output = HttpFileFormatter.format(req.model) + output = req.format(req.model) self.assertEqual("""POST "https://{{host1}}/ram" ? ("{{queryname1}}", "value1") ? ("key2", "{{valuename1}}") @@ -86,8 +87,11 @@ def test_format2_print(self): "{{queryname2}}": "{{valuename2}}" }) output(test) + + """, output) + @unittest.skipUnless(sys.platform.startswith("win"), "requires Windows") def test_multiline_curl(self): with tempfile.NamedTemporaryFile(delete=False) as f: # with files @@ -106,6 +110,27 @@ def test_multiline_curl(self): -H 'Content-Length: 13' \\ -H 'Content-Type: application/json' \\ -d '{"hi": "hi2"}' \\ +https://httpbin.org/post''', self.get_curl_out(f, 3)) + + @unittest.skipUnless(sys.platform.startswith("linux"), "requires linux") + def test_multiline_curl_linux(self): + with tempfile.NamedTemporaryFile(delete=False) as f: + # with files + self.assertEqual(f'''curl -X POST \\ +--form test=@{f.name} \\ +--form hi=hi2 \\ +https://httpbin.org/post''', self.get_curl_out(f)) + + # with file input + self.assertEqual(f'''curl -X POST \\ +--data @{f.name} \\ +https://httpbin.org/post''', self.get_curl_out(f, 2)) + + # with json out + self.assertEqual('''curl -X POST \\ +-H 'Content-Length: 13' \\ +-H 'Content-Type: application/json' \\ +-d '{"hi": "hi2"}' \\ https://httpbin.org/post''', self.get_curl_out(f, 3)) def get_curl_out(self, f, target=1): diff --git a/test/core/test_substition.py b/test/core/test_substition.py index 0ba0051..4b073c9 100644 --- a/test/core/test_substition.py +++ b/test/core/test_substition.py @@ -48,36 +48,36 @@ def test_substitution_infile_with_quotes(self): def test_substitution_infile_with_multiple_suages(self): req: PreparedRequest = self.get_request(f"{base_dir}/infilesinglewithmultipleusages.http") - self.assertEquals("https://google.com/", req.url) - self.assertEquals(b'{"google.com": "google.com"}', req.body) + self.assertEqual("https://google.com/", req.url) + self.assertEqual(b'{"google.com": "google.com"}', req.body) def test_define_on_second_occurence(self): req: PreparedRequest = self.get_request(f"{base_dir}/definevariableonsecond.http") - self.assertEquals("https://dothttp.dev/", req.url) - self.assertEquals('dothttp.dev', req.body) + self.assertEqual("https://dothttp.dev/", req.url) + self.assertEqual('dothttp.dev', req.body) def test_substitution_preference(self): ## command line > env (last env > first env) > infile req: PreparedRequest = self.get_request(f"{base_dir}/simpleinfile.http", prop=f"{base_dir}/simepleinfile.json") - self.assertEquals("https://google.com/", req.url) + self.assertEqual("https://google.com/", req.url) req: PreparedRequest = self.get_request(f"{base_dir}/simpleinfile.http", prop=f"{base_dir}/simepleinfile.json", env=["env1"] ) - self.assertEquals("https://yahoo.com/", req.url) + self.assertEqual("https://yahoo.com/", req.url) req: PreparedRequest = self.get_request(f"{base_dir}/simpleinfile.http", prop=f"{base_dir}/simepleinfile.json", env=["env1", "env2"] ) - self.assertEquals("https://ins.com/", req.url) + self.assertEqual("https://ins.com/", req.url) req: PreparedRequest = self.get_request(f"{base_dir}/simpleinfile.http", prop=f"{base_dir}/simepleinfile.json", env=["env1", "env2", "env3"] ) - self.assertEquals("https://hub.com/", req.url) + self.assertEqual("https://hub.com/", req.url) req: PreparedRequest = self.get_request(f"{base_dir}/simpleinfile.http", prop=f"{base_dir}/simepleinfile.json", env=["env1", "env2", "env3"], properties=["host=ramba.com"] ) - self.assertEquals("https://ramba.com/", req.url) + self.assertEqual("https://ramba.com/", req.url)