Skip to content

Commit

Permalink
添加全局请求认证header_cookie方法 (#147)
Browse files Browse the repository at this point in the history
  • Loading branch information
wu-clan authored Mar 5, 2024
1 parent 560aa4f commit 75f6375
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 26 deletions.
17 changes: 15 additions & 2 deletions httpfpt/core/auth.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
is_auth: false
# 认证类型,请填写为认证方式的键值,如:bearer_token
auth_type: bearer_token
# token认证
###############
## token 认证
###############
bearer_token:
url: https://api.pity.fun/auth/login
username: tester
Expand All @@ -12,5 +14,16 @@ bearer_token:
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36
# jsonpath表达式,用于从响应中提取token
token_key: $.data.token
# token有效期,单位秒
# token有效期,单位: 秒
timeout: 100000
################
## cookie 认证
################
header_cookie:
url: xxx
username: xxx
password: xxx
headers:
Content-Type: application/json
User-Agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36
timeout: 100000
1 change: 1 addition & 0 deletions httpfpt/db/redis_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def __init__(self) -> None:
)
self.prefix = 'httpfpt'
self.token_prefix = f'{self.prefix}:token'
self.cookie_prefix = f'{self.prefix}:cookie'
self.case_data_prefix = f'{self.prefix}:case_data'
self.case_id_file_prefix = f'{self.prefix}:case_id_file'

Expand Down
3 changes: 2 additions & 1 deletion httpfpt/enums/request/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@


class AuthType(StrEnum):
bearer_token = 'bearer_token'
TOKEN = 'bearer_token'
COOKIE = 'header_cookie'
66 changes: 46 additions & 20 deletions httpfpt/utils/auth_plugins.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json

from functools import lru_cache

import requests

from jsonpath import findall

from httpfpt.common.errors import AuthError
from httpfpt.common.errors import AuthError, SendRequestError
from httpfpt.common.yaml_handler import read_yaml
from httpfpt.core.path_conf import AUTH_CONF_PATH
from httpfpt.db.redis_db import redis_client
Expand All @@ -21,36 +23,60 @@ def __init__(self) -> None:
self.auth_type = self.auth_data['auth_type']
if not getattr(AuthType, self.auth_type, None):
raise AuthError(f'认证类型错误, 允许 {get_enum_values(AuthType)} 之一, 请检查认证配置文件')
self.url = self.auth_data[f'{self.auth_type}']['url']
self.username = self.auth_data[f'{self.auth_type}']['username']
self.password = self.auth_data[f'{self.auth_type}']['password']
self.headers = self.auth_data[f'{self.auth_type}']['headers']
self.timeout = self.auth_data[f'{self.auth_type}']['timeout'] or 86400

@lru_cache
def get_auth_data(self) -> dict:
auth_data = read_yaml(AUTH_CONF_PATH, filename='auth.yaml')
return auth_data

@property
def bearer_token(self) -> str:
url = self.auth_data[f'{self.auth_type}']['url']
username = self.auth_data[f'{self.auth_type}']['username']
password = self.auth_data[f'{self.auth_type}']['password']
headers = self.auth_data[f'{self.auth_type}']['headers']
headers.update({'Connection': 'close'})
timeout = self.auth_data[f'{self.auth_type}']['timeout'] or 86400
cache_bearer_token = redis_client.get(f'{redis_client.token_prefix}:{url}', logging=False)
if cache_bearer_token:
token = cache_bearer_token
else:
def request_auth(self) -> requests.Response:
try:
request_data = {
'url': url,
'data': {'username': username, 'password': password},
'headers': headers,
'url': self.url,
'data': {'username': self.username, 'password': self.password},
'headers': self.headers,
'proxies': {'http': None, 'https': None},
}
if 'application/json' in str(headers):
if 'application/json' in str(self.headers):
request_data.update({'json': request_data.pop('data')})
response = requests.post(**request_data)
jp_token = findall(self.auth_data[f'{self.auth_type}']['token_key'], response.json())
response.raise_for_status()
except Exception as e:
raise SendRequestError(f'授权接口请求响应异常: {e}')
return response

@property
def bearer_token(self) -> str:
self.headers.update({'Connection': 'close'})
cache_bearer_token = redis_client.get(f'{redis_client.token_prefix}:{self.url}', logging=False)
if cache_bearer_token:
token = cache_bearer_token
else:
res = self.request_auth()
jp_token = findall(self.auth_data[f'{self.auth_type}']['token_key'], res.json())
token = jp_token[0]
if not token:
raise AuthError('Token 获取失败,请检查登录接口响应或 token_key 表达式')
redis_client.set(f'{redis_client.token_prefix}:{url}', token, ex=timeout)
raise AuthError('Token 获取失败,请检查登录接口响应或 token 提取表达式')
redis_client.set(f'{redis_client.token_prefix}:{self.url}', token, ex=self.timeout)
return token

@property
def header_cookie(self) -> dict:
cache_cookie = redis_client.get(f'{redis_client.cookie_prefix}:{self.url}', logging=False)
if cache_cookie:
cookies = json.loads(cache_cookie)
else:
res = self.request_auth()
res_cookie = res.cookies
cookies = {k: v for k, v in res_cookie.items()}
if not cookies:
raise AuthError('Cookie 获取失败,请检查登录接口响应')
redis_client.set(
f'{redis_client.cookie_prefix}:{self.url}', json.dumps(cookies, ensure_ascii=False), ex=self.timeout
)
return cookies
10 changes: 7 additions & 3 deletions httpfpt/utils/request/request_data_parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,8 @@ def headers(self) -> dict | None:
raise RequestDataParseError(_error_msg('参数 test_steps:request:headers 为空'))
auth = AuthPlugins()
if auth.is_auth:
if auth.auth_type == AuthType.bearer_token:
token = auth.bearer_token
bearer_token = {'Authorization': f'Bearer {token}'}
if auth.auth_type == AuthType.TOKEN:
bearer_token = {'Authorization': f'Bearer {auth.bearer_token}'}
headers = headers.update(bearer_token) if headers else bearer_token
return headers

Expand All @@ -388,6 +387,11 @@ def cookies(self) -> dict | None:
if cookies is not None:
if not isinstance(cookies, dict):
raise RequestDataParseError(_error_msg('参数 test_steps:request:cookies 不是有效的 dict 类型'))
auth = AuthPlugins()
if auth.is_auth:
if auth.auth_type == AuthType.COOKIE:
header_cookie = auth.header_cookie
cookies = cookies.update(header_cookie) if cookies else header_cookie
return cookies

@property
Expand Down

0 comments on commit 75f6375

Please sign in to comment.