From 75f63758e2136c5e45f9f088c56fd13d2aac1e09 Mon Sep 17 00:00:00 2001 From: Wu Clan Date: Tue, 5 Mar 2024 19:15:46 +0800 Subject: [PATCH] =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=85=A8=E5=B1=80=E8=AF=B7?= =?UTF-8?q?=E6=B1=82=E8=AE=A4=E8=AF=81header=5Fcookie=E6=96=B9=E6=B3=95=20?= =?UTF-8?q?(#147)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- httpfpt/core/auth.yaml | 17 +++++- httpfpt/db/redis_db.py | 1 + httpfpt/enums/request/auth.py | 3 +- httpfpt/utils/auth_plugins.py | 66 ++++++++++++++------- httpfpt/utils/request/request_data_parse.py | 10 +++- 5 files changed, 71 insertions(+), 26 deletions(-) diff --git a/httpfpt/core/auth.yaml b/httpfpt/core/auth.yaml index 0031134..0889c7c 100644 --- a/httpfpt/core/auth.yaml +++ b/httpfpt/core/auth.yaml @@ -2,7 +2,9 @@ is_auth: false # 认证类型,请填写为认证方式的键值,如:bearer_token auth_type: bearer_token -# token认证 +############### +## token 认证 +############### bearer_token: url: https://api.pity.fun/auth/login username: tester @@ -12,5 +14,16 @@ bearer_token: User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 # jsonpath表达式,用于从响应中提取token token_key: $.data.token - # token有效期,单位秒 + # token有效期,单位: 秒 + timeout: 100000 +################ +## cookie 认证 +################ +header_cookie: + url: xxx + username: xxx + password: xxx + headers: + Content-Type: application/json + User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 timeout: 100000 diff --git a/httpfpt/db/redis_db.py b/httpfpt/db/redis_db.py index 6c909a5..e71fe63 100644 --- a/httpfpt/db/redis_db.py +++ b/httpfpt/db/redis_db.py @@ -22,6 +22,7 @@ def __init__(self) -> None: ) self.prefix = 'httpfpt' self.token_prefix = f'{self.prefix}:token' + self.cookie_prefix = f'{self.prefix}:cookie' self.case_data_prefix = f'{self.prefix}:case_data' self.case_id_file_prefix = f'{self.prefix}:case_id_file' diff --git a/httpfpt/enums/request/auth.py b/httpfpt/enums/request/auth.py index 156887a..730dab5 100644 --- a/httpfpt/enums/request/auth.py +++ b/httpfpt/enums/request/auth.py @@ -4,4 +4,5 @@ class AuthType(StrEnum): - bearer_token = 'bearer_token' + TOKEN = 'bearer_token' + COOKIE = 'header_cookie' diff --git a/httpfpt/utils/auth_plugins.py b/httpfpt/utils/auth_plugins.py index ff75eff..4670f80 100644 --- a/httpfpt/utils/auth_plugins.py +++ b/httpfpt/utils/auth_plugins.py @@ -1,12 +1,14 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- +import json + from functools import lru_cache import requests from jsonpath import findall -from httpfpt.common.errors import AuthError +from httpfpt.common.errors import AuthError, SendRequestError from httpfpt.common.yaml_handler import read_yaml from httpfpt.core.path_conf import AUTH_CONF_PATH from httpfpt.db.redis_db import redis_client @@ -21,36 +23,60 @@ def __init__(self) -> None: self.auth_type = self.auth_data['auth_type'] if not getattr(AuthType, self.auth_type, None): raise AuthError(f'认证类型错误, 允许 {get_enum_values(AuthType)} 之一, 请检查认证配置文件') + self.url = self.auth_data[f'{self.auth_type}']['url'] + self.username = self.auth_data[f'{self.auth_type}']['username'] + self.password = self.auth_data[f'{self.auth_type}']['password'] + self.headers = self.auth_data[f'{self.auth_type}']['headers'] + self.timeout = self.auth_data[f'{self.auth_type}']['timeout'] or 86400 @lru_cache def get_auth_data(self) -> dict: auth_data = read_yaml(AUTH_CONF_PATH, filename='auth.yaml') return auth_data - @property - def bearer_token(self) -> str: - url = self.auth_data[f'{self.auth_type}']['url'] - username = self.auth_data[f'{self.auth_type}']['username'] - password = self.auth_data[f'{self.auth_type}']['password'] - headers = self.auth_data[f'{self.auth_type}']['headers'] - headers.update({'Connection': 'close'}) - timeout = self.auth_data[f'{self.auth_type}']['timeout'] or 86400 - cache_bearer_token = redis_client.get(f'{redis_client.token_prefix}:{url}', logging=False) - if cache_bearer_token: - token = cache_bearer_token - else: + def request_auth(self) -> requests.Response: + try: request_data = { - 'url': url, - 'data': {'username': username, 'password': password}, - 'headers': headers, + 'url': self.url, + 'data': {'username': self.username, 'password': self.password}, + 'headers': self.headers, 'proxies': {'http': None, 'https': None}, } - if 'application/json' in str(headers): + if 'application/json' in str(self.headers): request_data.update({'json': request_data.pop('data')}) response = requests.post(**request_data) - jp_token = findall(self.auth_data[f'{self.auth_type}']['token_key'], response.json()) + response.raise_for_status() + except Exception as e: + raise SendRequestError(f'授权接口请求响应异常: {e}') + return response + + @property + def bearer_token(self) -> str: + self.headers.update({'Connection': 'close'}) + cache_bearer_token = redis_client.get(f'{redis_client.token_prefix}:{self.url}', logging=False) + if cache_bearer_token: + token = cache_bearer_token + else: + res = self.request_auth() + jp_token = findall(self.auth_data[f'{self.auth_type}']['token_key'], res.json()) token = jp_token[0] if not token: - raise AuthError('Token 获取失败,请检查登录接口响应或 token_key 表达式') - redis_client.set(f'{redis_client.token_prefix}:{url}', token, ex=timeout) + raise AuthError('Token 获取失败,请检查登录接口响应或 token 提取表达式') + redis_client.set(f'{redis_client.token_prefix}:{self.url}', token, ex=self.timeout) return token + + @property + def header_cookie(self) -> dict: + cache_cookie = redis_client.get(f'{redis_client.cookie_prefix}:{self.url}', logging=False) + if cache_cookie: + cookies = json.loads(cache_cookie) + else: + res = self.request_auth() + res_cookie = res.cookies + cookies = {k: v for k, v in res_cookie.items()} + if not cookies: + raise AuthError('Cookie 获取失败,请检查登录接口响应') + redis_client.set( + f'{redis_client.cookie_prefix}:{self.url}', json.dumps(cookies, ensure_ascii=False), ex=self.timeout + ) + return cookies diff --git a/httpfpt/utils/request/request_data_parse.py b/httpfpt/utils/request/request_data_parse.py index ac2725e..e71bf3b 100644 --- a/httpfpt/utils/request/request_data_parse.py +++ b/httpfpt/utils/request/request_data_parse.py @@ -373,9 +373,8 @@ def headers(self) -> dict | None: raise RequestDataParseError(_error_msg('参数 test_steps:request:headers 为空')) auth = AuthPlugins() if auth.is_auth: - if auth.auth_type == AuthType.bearer_token: - token = auth.bearer_token - bearer_token = {'Authorization': f'Bearer {token}'} + if auth.auth_type == AuthType.TOKEN: + bearer_token = {'Authorization': f'Bearer {auth.bearer_token}'} headers = headers.update(bearer_token) if headers else bearer_token return headers @@ -388,6 +387,11 @@ def cookies(self) -> dict | None: if cookies is not None: if not isinstance(cookies, dict): raise RequestDataParseError(_error_msg('参数 test_steps:request:cookies 不是有效的 dict 类型')) + auth = AuthPlugins() + if auth.is_auth: + if auth.auth_type == AuthType.COOKIE: + header_cookie = auth.header_cookie + cookies = cookies.update(header_cookie) if cookies else header_cookie return cookies @property