From 6dd36dc9e2ea100d815d5da7be05070b708cce2f Mon Sep 17 00:00:00 2001 From: boringcat Date: Thu, 28 Dec 2023 18:03:47 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E6=96=B0=E6=94=AF=E6=8C=81?= =?UTF-8?q?=20=E6=94=AF=E6=8C=81Argon2Id=E7=AE=97=E6=B3=95=20=E6=94=AF?= =?UTF-8?q?=E6=8C=81=E8=A7=A3=E5=AF=86=E7=BB=84=E7=BB=87=E6=95=B0=E6=8D=AE?= =?UTF-8?q?=20=E9=A2=84=E7=95=99API=E7=99=BB=E5=BD=95=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=EF=BC=88=E4=BD=86=E6=B2=A1=E5=8D=B5=E7=94=A8=EF=BC=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 27 +- requirements.txt | 3 +- setup.py | 3 +- vaultwarden/__init__.py | 4 + vaultwarden/api.py | 475 +++++++++++++++--------------- vaultwarden/errors.py | 5 + vaultwarden/response.py | 634 ++++++++++++++++++++++++++++++++++++++++ vaultwarden/types.py | 31 ++ 8 files changed, 931 insertions(+), 251 deletions(-) create mode 100644 vaultwarden/errors.py create mode 100644 vaultwarden/response.py create mode 100644 vaultwarden/types.py diff --git a/README.md b/README.md index bc3b858..f1d02ce 100644 --- a/README.md +++ b/README.md @@ -6,23 +6,26 @@ ## 依赖要求 注:requirement.txt 附带的版本号为开发使用的版本 -- python >= 3 - 请根据 pycrypto 支持情况自行决定Python版本 +- python>=3 - requests>=2.31.0 - passlib>=1.7.4 - hkdf>=0.0.3 - cryptography>=41.0.5 +- argon2-cffi>=23.1.0 ## 使用方法 ```python -from api import VaultwardenAPI -vapi = VaultwardenAPI(baseUrl, email, masterPassword) # 配置服务器与账号消息 -vapi.fullLogin() # 登录 -vapi.sync(86400) # 如果与上次同步间隔小于86400秒(一天)则不同步 -alist = vapi.searchByName('somename') # 模糊搜索 -blist = vapi.searchByName('JustThisName', equal = True) # 全匹配搜索 -allItemInFolders = vapi.getListByFolder('somefoldername') # 模糊搜索目录 -allItemInOneFolder = vapi.getListByFolder('JustThisFolder', equal = True) # 全匹配目录 -allCiphers = vapi.decryptedCiphers # 拿所有解密后的 Ciphers -allFolders = vapi.decryptedFolders # 拿所有解密后的 Folders +from vaultwarden import * # VaultwardenAPI, UriMatchType +with VaultwardenAPI(baseUrl, email, masterPassword) as vapi: # 配置服务器与账号消息 + vapi.fullLogin() # 登录 + vapi.sync(86400) # 如果与上次同步间隔小于86400秒(一天)则不同步 + alist = list(vapi.searchCiphers(name = 'somename')) # 模糊搜索 + blist = list(vapi.searchCiphers(name = 'JustThisName', equal = True)) # 全匹配搜索 + blist = list(vapi.searchCiphers(domain = 'http://github.com', domain_type = UriMatchType.Host)) + # 匹配域名查询 + allItemInFolders = list(vapi.searchFolders(name = 'somefoldername')) # 模糊搜索目录 + allItemInOneFolder = list(vapi.searchFolders(name = 'JustThisFolder', equal = True)) + # 全匹配目录 + allCiphers = vapi.Ciphers # 拿所有解密后的 Ciphers + allFolders = vapi.Folders # 拿所有解密后的 Folders ``` \ No newline at end of file diff --git a/requirements.txt b/requirements.txt index 5bfd6a0..de805c3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,5 @@ requests>=2.31.0 passlib>=1.7.4 hkdf>=0.0.3 -cryptography>=41.0.5 \ No newline at end of file +cryptography>=41.0.5 +argon2-cffi>=23.1.0 \ No newline at end of file diff --git a/setup.py b/setup.py index e7c25e8..423b7e9 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ setup( name="python-vaultwarden", - version="0.0.3", + version="0.0.4", author="BoringCat", author_email="c654477757@gmail.com", description="Simple VaultWarden API for Python", @@ -17,6 +17,7 @@ 'passlib>=1.7.4', 'hkdf>=0.0.3', 'cryptography>=41.0.5', + 'argon2-cffi>=23.1.0', ], python_requires='>=3', ) \ No newline at end of file diff --git a/vaultwarden/__init__.py b/vaultwarden/__init__.py index e69de29..c00cf4d 100644 --- a/vaultwarden/__init__.py +++ b/vaultwarden/__init__.py @@ -0,0 +1,4 @@ +from .api import VaultwardenAPI +from .types import UriMatchType + +__all__ = ['VaultwardenAPI', 'UriMatchType'] \ No newline at end of file diff --git a/vaultwarden/api.py b/vaultwarden/api.py index 58dfeef..ad679a5 100644 --- a/vaultwarden/api.py +++ b/vaultwarden/api.py @@ -1,24 +1,26 @@ +import re import os import hmac +import gzip import json import uuid import hashlib +import logging import requests +import typing as _t from copy import deepcopy +from traceback import format_exc +from urllib.parse import urlparse from base64 import b64decode, b64encode from datetime import datetime, timedelta -from typing import Callable, Dict, Tuple -from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes +from argon2.low_level import Type, hash_secret_raw from cryptography.hazmat.primitives import hashes, serialization from cryptography.hazmat.primitives.asymmetric import rsa, padding +from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes -TYPE_AESCBC256_B64 = 0 -TYPE_AESCBC128_HMACSHA256_B64 = 1 -TYPE_AESCBC256_HMACSHA256_B64 = 2 -TYPE_RSA2048_OAEPSHA256_B64 = 3 -TYPE_RSA2048_OAEPSHA1_B64 = 4 -TYPE_RSA2048_OAEPSHA256_HMACSHA256_B64 = 5 -TYPE_RSA2048_OAEPSHA1_HMACSHA256_B64 = 6 +from .response import SyncResponse, CipherResponse +from .types import * +from .errors import * class CipherString(): @property @@ -35,47 +37,46 @@ def CT_B64_str(self): return b64encode(self.__ct).decode('UTF-8') def MAC(self): return self.__mac @property def MAC_B64_str(self): return b64encode(self.__mac).decode('UTF-8') - __type:int = None + __type:EncryptionType = None __iv:bytes = None __ct:bytes = None __mac:bytes = None def __init__(self, data:str) -> None: + self.__logger = logging.getLogger(__package__).getChild(type(self).__name__) t, edata = data.split('.', 1) - self.__type = int(t) + self.__logger.debug('EncryptionType: %s', t) + self.__type = EncryptionType(int(t)) datas = edata.split('|') - if self.Type == TYPE_AESCBC256_B64: + self.__logger.debug('datas: %s', datas) + if self.Type == EncryptionType.AesCbc256_B64: self.__iv = b64decode(datas[0].encode('UTF-8')) self.__ct = b64decode(datas[1].encode('UTF-8')) - elif self.Type in [TYPE_AESCBC128_HMACSHA256_B64, TYPE_AESCBC256_HMACSHA256_B64]: + elif self.Type in [EncryptionType.AesCbc128_HmacSha256_B64, EncryptionType.AesCbc256_HmacSha256_B64]: self.__iv = b64decode(datas[0].encode('UTF-8')) self.__ct = b64decode(datas[1].encode('UTF-8')) self.__mac = b64decode(datas[2].encode('UTF-8')) - elif self.Type in [TYPE_RSA2048_OAEPSHA256_B64, TYPE_RSA2048_OAEPSHA1_B64]: + elif self.Type in [EncryptionType.Rsa2048_OaepSha256_B64, EncryptionType.Rsa2048_OaepSha1_B64]: self.__ct = b64decode(datas[0].encode('UTF-8')) + else: + raise DecryptUnSupportTypeError(t) def HasMAC(self) -> bool: - return self.Type not in [TYPE_AESCBC256_B64, TYPE_RSA2048_OAEPSHA256_B64, TYPE_RSA2048_OAEPSHA1_B64] + return self.Type not in [EncryptionType.AesCbc256_B64, EncryptionType.Rsa2048_OaepSha256_B64, EncryptionType.Rsa2048_OaepSha1_B64] def IsZero(self) -> bool: - return bool(self.Type) and bool(self.__iv) and bool(self.__ct) and bool(self.__mac) + return self.Type is None and self.__iv is None and self.__ct is None and self.__mac is None def __str__(self) -> str: if self.IsZero(): return "" - if self.Type == TYPE_AESCBC256_B64: - return '%d.%s.%s' % (self.Type, self.IV_B64_str, self.CT_B64_str) - elif self.Type in [TYPE_AESCBC128_HMACSHA256_B64, TYPE_AESCBC256_HMACSHA256_B64]: - return '%d.%s.%s.%s' % (self.Type, self.IV_B64_str, self.CT_B64_str, self.MAC_B64_str) - elif self.Type in [TYPE_RSA2048_OAEPSHA256_B64, TYPE_RSA2048_OAEPSHA1_B64]: - return '%d.%s' % (self.Type, self.CT_B64_str) + if self.Type == EncryptionType.AesCbc256_B64: + return '%d.%s.%s' % (self.Type.value, self.IV_B64_str, self.CT_B64_str) + elif self.Type in [EncryptionType.AesCbc128_HmacSha256_B64, EncryptionType.AesCbc256_HmacSha256_B64]: + return '%d.%s.%s.%s' % (self.Type.value, self.IV_B64_str, self.CT_B64_str, self.MAC_B64_str) + elif self.Type in [EncryptionType.Rsa2048_OaepSha256_B64, EncryptionType.Rsa2048_OaepSha1_B64]: + return '%d.%s' % (self.Type.value, self.CT_B64_str) return "" - -class HttpStatusErrror(Exception): pass -class UnSupportKdfError(Exception): pass -class DecryptHmacMisMatchError(Exception): pass -class DecryptUnSupportTypeError(Exception): pass - class VaultwardenAPI(): @property def session(self): return self.__session @@ -88,23 +89,38 @@ def Kdf(self): return (self.__ident or self.__kdf)['Kdf'] @property def KdfIterations(self): return (self.__ident or self.__kdf)['KdfIterations'] @property + def KdfMemory(self): return (self.__ident or self.__kdf)['KdfMemory'] * 1024 + @property + def KdfParallelism(self): return (self.__ident or self.__kdf)['KdfParallelism'] + @property def masterKey(self): if not self.__masterKey: - self.__masterKey = hashlib.pbkdf2_hmac( - hash_name = 'sha256', - password = self.__masterPassword.encode('UTF-8'), - salt = self.__email.encode('UTF-8'), - iterations = self.KdfIterations - ) + if self.Kdf == 0: + self.__masterKey = hashlib.pbkdf2_hmac( + hash_name = 'sha256', + password = self.__masterPassword.encode('UTF-8'), + salt = self.__email.encode('UTF-8'), + iterations = self.KdfIterations + ) + elif self.Kdf == 1: + self.__masterKey = hash_secret_raw( + secret = self.__masterPassword.encode('UTF-8'), + salt = hashlib.sha256(self.__email.encode('UTF-8')).digest(), + time_cost = self.KdfIterations, + memory_cost = self.KdfMemory, + parallelism = self.KdfParallelism, + hash_len = 32, + type = Type.ID + ) + else: + raise UnSupportKdfError("hash password only support Kdf == 0 (PBKDF2_SHA256) or Kdf == 1 (Argon2id)") return self.__masterKey @property def deviceId(self): return self.__devInfo['device_id'] @property - def ident(self): return deepcopy(self.__ident) - @property - def decryptKey(self): return self.__decryptKey - @property - def decryptHmacKey(self): return self.__decryptHmacKey + def ident(self): return self.__ident + @ident.deleter + def ident(self): self.__ident = None @property def decryptPrivateKey(self): return self.__decryptPrivateKey @property @@ -112,22 +128,32 @@ def tokenType(self): return self.__ident['token_type'] @property def accessToken(self): return self.__ident['access_token'] @property - def refreshToken(self): return self.__ident['refresh_token'] + def refreshToken(self): return self.__ident.get('refresh_token', None) @property - def decryptedCiphers(self): return deepcopy(self.__decrypt_ciphers) + def Ciphers(self): return self.__sync.ciphers @property - def decryptedFolders(self): return deepcopy(self.__decrypt_folders) - def __init__(self, baseUrl:str, email:str, masterPassword:str, configPath:str = None) -> None: + def Folders(self): return self.__sync.folders + @property + def Collections(self): return self.__sync.collections + @property + def Domains(self): return self.__sync.domains + @property + def Policies(self): return self.__sync.policies + @property + def Sends(self): return self.__sync.sends + def __init__(self, baseUrl:str, email:str, masterPassword:str, api_scope:str = "api" ,configPath:str = None) -> None: self.__init_vars() self.__baseUrl = baseUrl.rstrip("/") self.__email = email self.__masterPassword = masterPassword + self.__scope = api_scope self.__session = requests.session() - configPath = configPath or os.path.join(os.path.expanduser('~'), ".config", "python-vaultwarden") + baseDomain = urlparse(self.baseUrl).hostname + configPath = configPath or os.path.join(os.path.expanduser('~'), ".config", "python-vaultwarden", baseDomain) if not os.path.exists(configPath): os.makedirs(configPath) self.__deviceInfoFile = os.path.join(configPath, "deviceInfo.json") - self.__syncFile = os.path.join(configPath, "syncData.json") + self.__syncFile = os.path.join(configPath, "syncData.json.gz") self.__idnetFile = os.path.join(configPath, "tokenInfo.json") self.loadDevInfo() self.loadIdent() @@ -140,13 +166,18 @@ def __init_vars(self): self.__session:requests.Session = None self.__masterKey:bytes = None self.__ident:dict = None - self.__decryptKey:bytes = None - self.__decryptHmacKey:bytes = None self.__kdf:dict = None - self.__sync:dict = None - self.__decrypt_folders:Dict[str, str] = None - self.__decrypt_ciphers:Dict[str, dict] = None + self.__sync:SyncResponse = None self.__decryptPrivateKey:str = None + self.__decryptKeys:_t.Dict[str, _t.Tuple[bytes, bytes]] = {} + self.__logger = logging.getLogger(__package__).getChild(type(self).__name__) + + def __enter__(self): + self.fullLogin() + self.sync(86400) + return self + + def __exit__(self, *args, **kwargs): pass def saveDevInfo(self): devInfo = { @@ -166,7 +197,7 @@ def loadDevInfo(self): self.saveDevInfo() def saveIdent(self): - data = self.ident + data = deepcopy(self.ident) data['token_expiry'] = goUTCStrftime(self.__ident['token_expiry']) with open(self.__idnetFile, 'w', encoding='UTF-8') as f: json.dump(data, f, ensure_ascii=False) @@ -181,21 +212,21 @@ def loadIdent(self): self.__ident = None else: self.decryptProfileKey(self.__ident['Key']) - self.__decryptPrivateKey = self.decryptUserData(self.__ident['PrivateKey']) + self.__decryptPrivateKey = self.decryptData(self.__ident['PrivateKey']) except: - pass + self.__logger.debug(format_exc()) - def saveSync(self): - with open(self.__syncFile, 'w', encoding='UTF-8') as f: - json.dump(self.__sync, f, ensure_ascii=False) + def saveSync(self, data): + with gzip.open(self.__syncFile, 'wt', compresslevel=9, encoding='UTF-8') as f: + json.dump(data, f, ensure_ascii=False) def loadSync(self): try: - with open(self.__syncFile, 'r', encoding='UTF-8') as f: - self.__sync = json.load(f) - self.__fullDecrypt() + with gzip.open(self.__syncFile, 'rt', encoding='UTF-8') as f: + self.__sync = SyncResponse(json.load(f)) + self.decryptSync() except: - pass + self.__logger.debug(format_exc()) def _persureResponse(self, resp:requests.Response) -> dict: if resp.status_code != 200: @@ -209,12 +240,23 @@ def account_prelogin(self): return self.__kdf.copy() def getHashedPassword(self) -> str: - if self.Kdf != 0: - raise UnSupportKdfError("hash password only support Kdf == 0 (PBKDF2_SHA256)") - hashlibPwd = b64encode(hashlib.pbkdf2_hmac('sha256', self.masterKey, self.__masterPassword.encode('UTF-8'), 1)).decode() - return hashlibPwd + return b64encode(hashlib.pbkdf2_hmac('sha256', self.masterKey, self.__masterPassword.encode('UTF-8'), 1)).decode() + + def identity_token_credentials(self): + uri = self.baseUrl + '/identity/connect/token' + res = self.session.post(uri, data={ + "grant_type": "client_credentials", + "scope": self.__scope, + "client_id": self.__client_id, + "client_secret": self.__client_secret, + "deviceType": "6", + "deviceName": "python-vaultwarden", + "deviceIdentifier": self.deviceId + }) + self.__ident = self._persureResponse(res) + self.decryptIdent() - def identity_token(self, hashedPassword:str) -> dict: + def identity_token_password(self, hashedPassword:str) -> dict: uri = self.baseUrl + '/identity/connect/token' res = self.session.post(uri, data={ "grant_type": "password", @@ -223,52 +265,49 @@ def identity_token(self, hashedPassword:str) -> dict: "scope": "api offline_access", "client_id": "connector", "deviceType": "6", - "deviceName": "bitwarden-http-api", + "deviceName": "python-vaultwarden", "deviceIdentifier": self.deviceId }) self.__ident = self._persureResponse(res) - self.__ident['token_expiry'] = datetime.now() + timedelta(seconds=self.__ident['expires_in']) - self.saveIdent() - self.decryptProfileKey(self.__ident['Key']) - self.__decryptPrivateKey = self.decryptUserData(self.__ident['PrivateKey']) - return self.ident + self.decryptIdent() - def identity_token_refresh(self, refresh_token:str = None) -> dict: + def identity_token_refresh(self, refresh_token:str = None): uri = self.baseUrl + '/identity/connect/token' res = self.session.post(uri, data={ "grant_type": "refresh_token", "refresh_token": refresh_token or self.refreshToken, }) self.__ident = self._persureResponse(res) - self.__ident['token_expiry'] = datetime.now() + timedelta(seconds=self.__ident['expires_in']) + self.decryptIdent() + + def decryptIdent(self): + if 'token_expiry' in self.__ident: + self.__ident['token_expiry'] = goUTCStrptime(self.__ident['token_expiry']) + else: + self.__ident['token_expiry'] = datetime.now() + timedelta(seconds=self.__ident['expires_in']) self.saveIdent() self.decryptProfileKey(self.__ident['Key']) - self.__decryptPrivateKey = self.decryptUserData(self.__ident['PrivateKey']) - return self.ident + self.__decryptPrivateKey = self.decryptData(self.__ident['PrivateKey'], encoding=False) - def decryptProfileKey(self, data:str) -> Tuple[str, str]: + def decryptProfileKey(self, data:str) -> _t.Tuple[str, str]: from hkdf import hkdf_expand cstr = CipherString(data) decKey = hkdf_expand(self.masterKey, info=b'enc', hash=hashlib.sha256) decMacKey = hkdf_expand(self.masterKey, info=b'mac', hash=hashlib.sha256) dst = decryptWith(cstr, decKey, decMacKey) - if len(dst) == 64: - self.__decryptKey = dst[:32] - self.__decryptHmacKey = dst[32:] - elif len(dst) == 32: - self.__decryptKey = dst - return self.decryptKey, self.decryptHmacKey - - def fullLogin(self, token:str = None) -> bool: + self.__decryptKeys[None] = splitKey(cstr.Type, dst) + + def fullLogin(self, token:str = None): if not token and self.__ident: token = self.refreshToken + if self.__ident and self.__ident['token_expiry'] >= datetime.now(): + return if token: - if self.__ident['token_expiry'] < datetime.now(): - self.identity_token_refresh(token) + self.identity_token_refresh(token) else: self.account_prelogin() hpwd = self.getHashedPassword() - self.identity_token(hpwd) + self.identity_token_password(hpwd) def sync(self, after:int = 0): if (datetime.now() - self.__devInfo['last_sync']).total_seconds() < after: @@ -281,149 +320,85 @@ def sync(self, after:int = 0): "Accept": "application/json", "Authorization": "%s %s" % (self.tokenType, self.accessToken), }) - self.__sync = self._persureResponse(res) + sync = self._persureResponse(res) + self.saveSync(sync) + self.__sync = SyncResponse(sync) + self.decryptSync() self.__devInfo['last_sync'] = datetime.now() - self.__fullDecrypt() self.saveDevInfo() - self.saveSync() - def dump(self) -> dict: - return deepcopy(self.__sync) + def decryptSync(self): + self.decryptProfileKey(self.__sync.profile.key) + self.__decryptPrivateKey = self.decryptData(self.__sync.profile.privateKey, encoding=None) + for org in self.__sync.profile.organizations or []: + self.__decryptKeys[org.id] = splitKey(None, self.decryptData(org.key, encoding=None)) + self.__sync.DecryptAll(self.decryptData) - def decryptUserData(self, data:str): - return decryptWith(CipherString(data), self.decryptKey, self.decryptHmacKey) + def dump(self) -> dict: return self.__sync.toJson() - def __fullDecrypt(self): - if not self.__sync: - return - self.__fullFoldersDecrypt() - self.__fullCiphersDecrypt() - - def __fullFoldersDecrypt(self): - self.__decrypt_folders = {} - for f in self.__sync['Folders']: - name = self.decryptUserData(f['Name']).decode('UTF-8') - self.__decrypt_folders[name] = f['Id'] - - def __fullCiphersDecrypt(self): - def decryptData(d:str) -> str: - if not d: - return d - s = CipherString(d) - if s.Type in [TYPE_AESCBC256_B64, TYPE_AESCBC256_HMACSHA256_B64]: - return decryptWith(s, self.decryptKey, self.decryptHmacKey).decode('UTF-8') - elif s.Type in [ - TYPE_RSA2048_OAEPSHA1_B64, TYPE_RSA2048_OAEPSHA1_HMACSHA256_B64, - TYPE_RSA2048_OAEPSHA256_B64, TYPE_RSA2048_OAEPSHA256_HMACSHA256_B64 - ]: - return rsaDecryptWith(s, self.decryptPrivateKey).decode('UTF-8') - else: - raise DecryptUnSupportTypeError(s.Type) - self.__decrypt_ciphers = {} - for c in self.__sync['Ciphers']: - if c['Type'] == 1: # Login - d = self.__decryptLoginData(c, decryptData) - self.__decrypt_ciphers[d['Name']] = d - elif c['Type'] == 2: # SecureNote - d = self.__decryptSecureNoteData(c, decryptData) - self.__decrypt_ciphers[d['Name']] = d - - @staticmethod - def __decryptLoginData(cipher:dict, decryptFunc:Callable[[str], str]) -> dict: - resp = deepcopy(cipher) - resp['Data']['Name'] = decryptFunc(cipher['Data']['Name']) - resp['Data']['Password'] = decryptFunc(cipher['Data']['Password']) - resp['Data']['Totp'] = decryptFunc(cipher['Data']['Totp']) - resp['Data']['Uri'] = decryptFunc(cipher['Data']['Uri']) - for idx, uri in enumerate(cipher['Data']['Uris'] or []): - resp['Data']['Uris'][idx]['Uri'] = decryptFunc(uri['Uri']) - if cipher['Data']['Fields']: - for idx, field in enumerate(cipher['Data']['Fields'] or []): - resp['Data']['Fields'][idx]['Name'] = decryptFunc(field['Name']) - resp['Data']['Fields'][idx]['Value'] = decryptFunc(field['Value']) - if cipher['Data']['PasswordHistory']: - for idx, his in enumerate(cipher['Data']['PasswordHistory'] or []): - resp['Data']['PasswordHistory'][idx]['Password'] = decryptFunc(his['Password']) - resp['Data']['PasswordHistory'][idx]['LastUsedDate'] = datetime.strptime(his['LastUsedDate'], '%Y-%m-%dT%H:%M:%S.%fZ') - resp['Data']['Username'] = decryptFunc(cipher['Data']['Username']) - resp['Login']['Password'] = decryptFunc(cipher['Login']['Password']) - resp['Login']['Totp'] = decryptFunc(cipher['Login']['Totp']) - resp['Login']['Uri'] = decryptFunc(cipher['Login']['Uri']) - resp['Login']['Username'] = decryptFunc(cipher['Login']['Username']) - for idx, uri in enumerate(cipher['Login']['Uris'] or []): - resp['Login']['Uris'][idx]['Uri'] = decryptFunc(uri['Uri']) - if cipher['Fields']: - for idx, field in enumerate(cipher['Fields'] or []): - resp['Fields'][idx]['Name'] = decryptFunc(field['Name']) - resp['Fields'][idx]['Value'] = decryptFunc(field['Value']) - if cipher['PasswordHistory']: - for idx, his in enumerate(cipher['PasswordHistory'] or []): - resp['PasswordHistory'][idx]['Password'] = decryptFunc(his['Password']) - resp['PasswordHistory'][idx]['LastUsedDate'] = datetime.strptime(his['LastUsedDate'], '%Y-%m-%dT%H:%M:%S.%fZ') - resp['Name'] = decryptFunc(cipher['Name']) - return resp - - @staticmethod - def __decryptSecureNoteData(cipher:dict, decryptFunc:Callable[[str], str]) -> dict: - resp = deepcopy(cipher) - resp['Data']['Name'] = decryptFunc(cipher['Data']['Name']) - resp['Data']['Notes'] = decryptFunc(cipher['Data']['Notes']) - if cipher['Data']['Fields']: - for idx, field in enumerate(cipher['Data']['Fields']): - resp['Data']['Fields'][idx]['Name'] = decryptFunc(field['Name']) - resp['Data']['Fields'][idx]['Value'] = decryptFunc(field['Value']) - resp['Name'] = decryptFunc(cipher['Name']) - resp['Notes'] = decryptFunc(cipher['Notes']) - if cipher['Fields']: - for idx, field in enumerate(cipher['Fields']): - resp['Fields'][idx]['Name'] = decryptFunc(field['Name']) - resp['Fields'][idx]['Value'] = decryptFunc(field['Value']) - return resp - - def searchByName(self, name:str, equal:bool = False): - if not name: - return [] - if name in self.__decrypt_ciphers: - return [self.__decrypt_ciphers[name]] - resp = [] - if equal: - return resp - for cname, d in self.__decrypt_ciphers.items(): - if name in cname: - resp.append(deepcopy(d)) - return resp - - def searchByUserName(self, name:str, equal:bool = False): - if not name: - return [] - resp = [] - for d in self.__decrypt_ciphers.values(): - if d['Type'] == 1 and ( # Login - (name in d['Login']['Username'] and not equal) - or (name == d['Login']['Username'] and equal) - ): - resp.append(deepcopy(d)) - return resp - - def getListByFolder(self, name:str, equal:bool = False): - if not name: - return [] - fids = [] - if equal: - fid = self.__decrypt_folders.get(name, None) - if not fid: - return resp - fids.append(fid) + def decryptData(self, src:str, orgId:str = None, encoding:str = 'UTF-8') -> str: + if not src: + return src + try: + cipher = CipherString(src) + except: + return src + if cipher.Type in EncryptionTypeAes: + self.__logger.debug('orgId: %s', orgId) + dst = decryptWith(cipher, *self.__decryptKeys.get(orgId, self.__decryptKeys[None])) + elif cipher.Type in EncryptionTypeRsa: + dst = rsaDecryptWith(cipher, self.decryptPrivateKey) else: - for fname, d in self.__decrypt_folders.items(): - if name in fname: - fids.append(d) - resp = [] - for d in self.__decrypt_ciphers.values(): - if d['FolderId'] in fids: - resp.append(deepcopy(d)) - return resp - + raise DecryptUnSupportTypeError(cipher.Type) + if encoding: + return dst.decode(encoding) + return dst + + def searchCiphers( + self, id:str = None, folder_id:str = None, folder:str = None, name:str = None, username:str = None, + domain:str = None, domain_type:UriMatchType = UriMatchType.Domain, equal:bool = False, deleted:bool = False + ): + iter = filter(lambda x:not bool(x.deletedDate) or deleted, self.Ciphers) + if id is not None: + iter = filter(lambda x:x.id == id, iter) + elif name is not None: + iter = filter(lambda x:x.name == name, iter) if equal else filter(lambda x:name in x.name, iter) + if folder is not None: + try: + folder_id = next(self.searchFolders(id=folder_id)) + except StopIteration: + folder_id = None + if folder_id is not None: + iter = filter(lambda x:x.folderId == folder_id, iter) + if username is not None: + iter = filter(lambda x:x.type == 1 and x.login.username == username, iter) if equal else filter(lambda x:x.type == 1 and name in x.login.username, iter) + if domain is not None: + iter = filter(domainMatch(domain_type, domain), iter) + yield from iter + + def searchFolders(self, id:str = None, name:str = None, equal:bool = False): + iter = self.Folders + if id is not None: + iter = filter(lambda x:x.id == id, iter) + elif name is not None: + iter = filter(lambda x:x.name == name, iter) if equal else filter(lambda x:name in x.name, iter) + yield from iter + +def splitKey(t:EncryptionType, key:bytes) -> _t.Tuple[bytes, _t.Optional[bytes]]: + if t == None: + if len(key) == 32: + t = EncryptionType.AesCbc256_B64 + elif len(key) == 64: + t = EncryptionType.AesCbc256_HmacSha256_B64 + else: + raise DecryptUnSupportTypeError("Unable to determine encType.") + if t == EncryptionType.AesCbc256_B64 and len(key) == 32: + return key, None + elif t == EncryptionType.AesCbc128_HmacSha256_B64 and len(key) == 32: + return key[:16], key[16:] + elif t == EncryptionType.AesCbc256_HmacSha256_B64 and len(key) == 64: + return key[:32], key[32:] + raise DecryptUnSupportTypeError("Unsupported encType/key length.") def unpadPKCS7(src:bytes, size:int) -> bytes: n = src[-1] @@ -434,15 +409,15 @@ def unpadPKCS7(src:bytes, size:int) -> bytes: return src[:-n] def decryptWith(s:CipherString, key:bytes, macKey:bytes) -> bytes: - if s.Type not in [TYPE_AESCBC256_B64, TYPE_AESCBC256_HMACSHA256_B64]: - raise DecryptUnSupportTypeError() - if s.HasMAC(): + if s.Type not in EncryptionTypeAes: + raise DecryptUnSupportTypeError(s.Type) + if s.HasMAC() and macKey: msg = s.IV + s.CT mac = hmac.new(macKey, digestmod=hashlib.sha256) mac.update(msg) expectedMAC = mac.digest() if not hmac.compare_digest(s.MAC, expectedMAC): - raise DecryptHmacMisMatchError() + raise DecryptHmacMisMatchError(f'{b64encode(s.MAC).decode()} != {b64encode(expectedMAC).decode()}') cipher = Cipher(algorithms.AES(key), modes.CBC(s.IV)) dst = cipher.decryptor() block = dst.update(s.CT) + dst.finalize() @@ -450,18 +425,15 @@ def decryptWith(s:CipherString, key:bytes, macKey:bytes) -> bytes: return data def rsaDecryptWith(s:CipherString, privateKey:bytes) -> bytes: - if s.Type not in [ - TYPE_RSA2048_OAEPSHA1_B64, TYPE_RSA2048_OAEPSHA1_HMACSHA256_B64, - TYPE_RSA2048_OAEPSHA256_B64, TYPE_RSA2048_OAEPSHA256_HMACSHA256_B64 - ]: + if s.Type not in EncryptionTypeRsa: raise DecryptUnSupportTypeError() - pri:rsa.RSAPrivateKey = serialization.load_der_private_key(privateKey) + pri:rsa.RSAPrivateKey = serialization.load_der_private_key(privateKey, None) - if s.Type in [TYPE_RSA2048_OAEPSHA256_B64, TYPE_RSA2048_OAEPSHA256_HMACSHA256_B64]: - oaep = padding.OAEP(mgf=padding.PKCS1v15(), algorithm=hashes.SHA256()) - elif s.Type in [TYPE_RSA2048_OAEPSHA1_B64, TYPE_RSA2048_OAEPSHA1_HMACSHA256_B64]: - oaep = padding.OAEP(mgf=padding.PKCS1v15(), algorithm=hashes.SHA1()) + if s.Type in [EncryptionType.Rsa2048_OaepSha256_B64, EncryptionType.Rsa2048_OaepSha256_HmacSha256_B64]: + oaep = padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA256()), algorithm=hashes.SHA256(), label=None) + elif s.Type in [EncryptionType.Rsa2048_OaepSha1_B64, EncryptionType.Rsa2048_OaepSha1_HmacSha256_B64]: + oaep = padding.OAEP(mgf=padding.MGF1(algorithm=hashes.SHA1()), algorithm=hashes.SHA1(), label=None) else: raise DecryptUnSupportTypeError() return pri.decrypt(s.CT, oaep) @@ -473,3 +445,32 @@ def goUTCStrptime(time:str) -> datetime: return datetime.fromtimestamp( datetime.strptime(time[:-3],"%Y-%m-%dT%H:%M:%S.%f").timestamp()+28800 ) + +def domainMatch(mode:int, dst:str) -> _t.Callable[[dict], bool]: + def dmatch(cipher:CipherResponse) -> bool: + if cipher.type != 1: + return False + status = False + for uri in cipher.login.uris or []: + _mode = uri.match or mode + if _mode == UriMatchType.Domain: + srcu = urlparse(uri.uri) + srch = srcu.hostname or srcu.path + src = '.'.join(srch.split('.')[1:] or [srch]) + dstu = urlparse(dst) + dsth = dstu.hostname or dstu.path + _dst = '.'.join(dsth.split('.')[1:] or [dsth]) + status |= _dst == src + elif _mode == UriMatchType.Host: + status |= urlparse(dst).hostname == urlparse(uri.uri).hostname + elif _mode == UriMatchType.Exact: + status |= dst == uri.uri + elif _mode == UriMatchType.StartsWith: + status |= uri.uri.startswith(dst) + elif _mode == UriMatchType.RegularExpression: + status |= re.match(uri.uri, dst) is not None + else: + status |= False + return status + return dmatch + diff --git a/vaultwarden/errors.py b/vaultwarden/errors.py new file mode 100644 index 0000000..9f8ee53 --- /dev/null +++ b/vaultwarden/errors.py @@ -0,0 +1,5 @@ + +class HttpStatusErrror(Exception): pass +class UnSupportKdfError(Exception): pass +class DecryptHmacMisMatchError(Exception): pass +class DecryptUnSupportTypeError(Exception): pass diff --git a/vaultwarden/response.py b/vaultwarden/response.py new file mode 100644 index 0000000..5aafaa2 --- /dev/null +++ b/vaultwarden/response.py @@ -0,0 +1,634 @@ +import typing as _t +from datetime import datetime + +DecryptFunc = _t.Callable[[str, _t.Optional[str]], str] + +def Decrypt(val, fn:DecryptFunc, orgId:str = None): + if isinstance(val, str): + return fn(val, orgId) + elif isinstance(val, list): + for idx, _v in enumerate(val): + val[idx] = Decrypt(_v, fn, orgId) + elif isinstance(val, dict): + for k, v in val.items(): + val[k] = Decrypt(v, fn, orgId) + elif isinstance(val, BaseResponse): + getattr(val, 'DecryptAll')(fn, orgId) + return val + +class BaseResponse(): + __not_decrypt__ = [] + __repr_item__ = [] + def __init__(self, response:dict) -> None: + self.__response = response + + def getResponseProperty(self, propertyName:str, response:dict = None, exactName:bool = False): + if not propertyName: + raise ValueError('propertyName must not be null/empty.') + _response = response or self.__response + if not _response: + return + if not exactName and propertyName not in _response: + otherCasePropertyName:str = None + if propertyName[0] == propertyName[0].upper(): + otherCasePropertyName = propertyName[0].lower() + else: + otherCasePropertyName = propertyName[0].upper() + if len(propertyName) > 1: + otherCasePropertyName += propertyName[1:] + propertyName = otherCasePropertyName + if propertyName not in _response: + propertyName = propertyName.lower() + if propertyName not in _response: + propertyName = propertyName.upper() + return _response.get(propertyName, None) + def toJson(self): return self.__response + def DecryptAll(self, fn:DecryptFunc, orgId:str = None): + _orgId = orgId or self.getResponseProperty('OrganizationId') + for k in dir(self): + if k.startswith('_') or k.endswith('_'): + continue + if k in self.__not_decrypt__: + continue + setattr(self, k, Decrypt(getattr(self, k), fn, _orgId)) + def __str__(self): + return f'<{type(self).__name__}: >' + def __repr__(self): + if not self.__repr_item__: + return super().__repr__() + return f'<{type(self).__name__}: {" ".join(map(lambda x:"%s=%s" % x, map(lambda x:(x,getattr(self, x)), self.__repr_item__)))}>' + +class PermissionsApi(BaseResponse): + accessEventLogs :bool + accessImportExport :bool + accessReports :bool + createNewCollections :bool + editAnyCollection :bool + deleteAnyCollection :bool + editAssignedCollections :bool + deleteAssignedCollections:bool + manageCiphers :bool + manageGroups :bool + manageSso :bool + managePolicies :bool + manageUsers :bool + manageResetPassword :bool + manageScim :bool + def __init__(self, response: dict) -> None: + super().__init__(response) + if not response: + return + self.accessEventLogs = self.getResponseProperty("AccessEventLogs") + self.accessImportExport = self.getResponseProperty("AccessImportExport") + self.accessReports = self.getResponseProperty("AccessReports") + + self.createNewCollections = self.getResponseProperty("CreateNewCollections") + self.editAnyCollection = self.getResponseProperty("EditAnyCollection") + self.deleteAnyCollection = self.getResponseProperty("DeleteAnyCollection") + self.editAssignedCollections = self.getResponseProperty("EditAssignedCollections") + self.deleteAssignedCollections = self.getResponseProperty("DeleteAssignedCollections") + + self.manageCiphers = self.getResponseProperty("ManageCiphers") + self.manageGroups = self.getResponseProperty("ManageGroups") + self.manageSso = self.getResponseProperty("ManageSso") + self.managePolicies = self.getResponseProperty("ManagePolicies") + self.manageUsers = self.getResponseProperty("ManageUsers") + self.manageResetPassword = self.getResponseProperty("ManageResetPassword") + self.manageScim = self.getResponseProperty("ManageScim") + +class ProfileOrganizationResponse(BaseResponse): + id :str + name :str + usePolicies :bool + useGroups :bool + useDirectory :bool + useEvents :bool + useTotp :bool + use2fa :bool + useApi :bool + useSso :bool + useKeyConnector :bool + useScim :bool + useCustomPermissions :bool + useResetPassword :bool + useSecretsManager :bool + usePasswordManager :bool + useActivateAutofillPolicy :bool + selfHost :bool + usersGetPremium :bool + seats :int + maxCollections :int + maxStorageGb :_t.Optional[int] + key :str + hasPublicAndPrivateKeys :bool + status :int + type :int + enabled :bool + ssoBound :bool + identifier :str + permissions :PermissionsApi + resetPasswordEnrolled :bool + userId :str + providerId :str + providerName :str + providerType :_t.Optional[int] + familySponsorshipFriendlyName :str + familySponsorshipAvailable :bool + planProductType :int + keyConnectorEnabled :bool + keyConnectorUrl :str + familySponsorshipLastSyncDate :_t.Optional[datetime] + familySponsorshipValidUntil :_t.Optional[datetime] + familySponsorshipToDelete :_t.Optional[bool] + accessSecretsManager :bool + limitCollectionCreationDeletion :bool + allowAdminAccessToAllCollectionItems:bool + def __init__(self, response: dict) -> None: + super().__init__(response) + self.id = self.getResponseProperty("Id") + self.name = self.getResponseProperty("Name") + self.usePolicies = self.getResponseProperty("UsePolicies") + self.useGroups = self.getResponseProperty("UseGroups") + self.useDirectory = self.getResponseProperty("UseDirectory") + self.useEvents = self.getResponseProperty("UseEvents") + self.useTotp = self.getResponseProperty("UseTotp") + self.use2fa = self.getResponseProperty("Use2fa") + self.useApi = self.getResponseProperty("UseApi") + self.useSso = self.getResponseProperty("UseSso") + self.useKeyConnector = self.getResponseProperty("UseKeyConnector") or False + self.useScim = self.getResponseProperty("UseScim") or False + self.useCustomPermissions = self.getResponseProperty("UseCustomPermissions") or False + self.useResetPassword = self.getResponseProperty("UseResetPassword") + self.useSecretsManager = self.getResponseProperty("UseSecretsManager") + self.usePasswordManager = self.getResponseProperty("UsePasswordManager") + self.useActivateAutofillPolicy = self.getResponseProperty("UseActivateAutofillPolicy") + self.selfHost = self.getResponseProperty("SelfHost") + self.usersGetPremium = self.getResponseProperty("UsersGetPremium") + self.seats = self.getResponseProperty("Seats") + self.maxCollections = self.getResponseProperty("MaxCollections") + self.maxStorageGb = self.getResponseProperty("MaxStorageGb") + self.key = self.getResponseProperty("Key") + self.hasPublicAndPrivateKeys = self.getResponseProperty("HasPublicAndPrivateKeys") + self.status = self.getResponseProperty("Status") + self.type = self.getResponseProperty("Type") + self.enabled = self.getResponseProperty("Enabled") + self.ssoBound = self.getResponseProperty("SsoBound") + self.identifier = self.getResponseProperty("Identifier") + self.permissions = PermissionsApi(self.getResponseProperty("permissions")) + self.resetPasswordEnrolled = self.getResponseProperty("ResetPasswordEnrolled") + self.userId = self.getResponseProperty("UserId") + self.providerId = self.getResponseProperty("ProviderId") + self.providerName = self.getResponseProperty("ProviderName") + self.providerType = self.getResponseProperty("ProviderType") + self.familySponsorshipFriendlyName = self.getResponseProperty("FamilySponsorshipFriendlyName") + self.familySponsorshipAvailable = self.getResponseProperty("FamilySponsorshipAvailable") + self.planProductType = self.getResponseProperty("PlanProductType") + self.keyConnectorEnabled = self.getResponseProperty("KeyConnectorEnabled") or False + self.keyConnectorUrl = self.getResponseProperty("KeyConnectorUrl") + +class ProfileProviderResponse(BaseResponse): + id :str + name :str + key :str + status :int + type :int + enabled :bool + permissions:PermissionsApi + userId :str + useEvents :bool + def __init__(self, response: dict) -> None: + super().__init__(response) + self.id = self.getResponseProperty("Id") + self.name = self.getResponseProperty("Name") + self.key = self.getResponseProperty("Key") + self.status = self.getResponseProperty("Status") + self.type = self.getResponseProperty("Type") + self.enabled = self.getResponseProperty("Enabled") + self.permissions = PermissionsApi(self.getResponseProperty("permissions")) + self.userId = self.getResponseProperty("UserId") + self.useEvents = self.getResponseProperty("UseEvents") + +class ProfileProviderOrganizationResponse(BaseResponse): + def __init__(self, response: dict) -> None: + super().__init__(response) + self.keyConnectorEnabled = False + +class ProfileResponse(BaseResponse): + id :str + name :str + email :str + emailVerified :bool + masterPasswordHint :str + premiumPersonally :bool + premiumFromOrganization:bool + culture :str + twoFactorEnabled :bool + key :str + avatarColor :str + privateKey :str + securityStamp :str + forcePasswordReset :bool + usesKeyConnector :bool + organizations :_t.List[ProfileOrganizationResponse] = [] + providers :_t.List[ProfileProviderResponse] = [] + providerOrganizations :_t.List[ProfileProviderOrganizationResponse] = [] + def __init__(self, response: dict) -> None: + super().__init__(response) + self.id = self.getResponseProperty("Id") + self.name = self.getResponseProperty("Name") + self.email = self.getResponseProperty("Email") + self.emailVerified = self.getResponseProperty("EmailVerified") + self.masterPasswordHint = self.getResponseProperty("MasterPasswordHint") + self.premiumPersonally = self.getResponseProperty("Premium") + self.premiumFromOrganization = self.getResponseProperty("PremiumFromOrganization") + self.culture = self.getResponseProperty("Culture") + self.twoFactorEnabled = self.getResponseProperty("TwoFactorEnabled") + self.key = self.getResponseProperty("Key") + self.avatarColor = self.getResponseProperty("AvatarColor") + self.privateKey = self.getResponseProperty("PrivateKey") + self.securityStamp = self.getResponseProperty("SecurityStamp") + self.forcePasswordReset = self.getResponseProperty("ForcePasswordReset") or False + self.usesKeyConnector = self.getResponseProperty("UsesKeyConnector") or False + organizations = self.getResponseProperty("Organizations") + self.organizations = list(map(ProfileOrganizationResponse, organizations)) if organizations else [] + providers = self.getResponseProperty("Providers") + self.providers = list(map(ProfileProviderResponse, providers)) if providers else [] + providerOrganizations = self.getResponseProperty("ProviderOrganizations") + self.providerOrganizations = list(map(ProfileProviderOrganizationResponse, providerOrganizations)) if providerOrganizations else [] + +class FolderResponse(BaseResponse): + id :str + name :str + revisionDate:str + def __init__(self, response: dict) -> None: + super().__init__(response) + self.id = self.getResponseProperty("Id") + self.name = self.getResponseProperty("Name") + self.revisionDate = self.getResponseProperty("RevisionDate") + +class CollectionResponse(BaseResponse): + id :str + organizationId:str + name :str + externalId :str + def __init__(self, response: dict) -> None: + super().__init__(response) + self.id = self.getResponseProperty("Id") + self.organizationId = self.getResponseProperty("OrganizationId") + self.name = self.getResponseProperty("Name") + self.externalId = self.getResponseProperty("ExternalId") + +class CollectionDetailsResponse(CollectionResponse): + readOnly :bool + manage :bool + hidePasswords:bool + def __init__(self, response: dict) -> None: + super().__init__(response) + self.readOnly = self.getResponseProperty("ReadOnly") or False + self.manage = self.getResponseProperty("Manage") or False + self.hidePasswords = self.getResponseProperty("HidePasswords") or False + +class LoginUriApi(BaseResponse): + uri :str + match:int + __repr_item__ = ['uri'] + def __init__(self, response: dict) -> None: + super().__init__(response) + self.uri = self.getResponseProperty("Uri") + self.match = self.getResponseProperty("Match") + +class Fido2CredentialApi(BaseResponse): + credentialId :str + keyType :str + keyAlgorithm :str + keyCurve :str + keyValue :str + rpId :str + userHandle :str + userName :str + counter :str + rpName :str + userDisplayName:str + discoverable :str + creationDate :str + def __init__(self, response: dict) -> None: + super().__init__(response) + if not response: + return + self.credentialId = self.getResponseProperty("CredentialId") + self.keyType = self.getResponseProperty("KeyType") + self.keyAlgorithm = self.getResponseProperty("KeyAlgorithm") + self.keyCurve = self.getResponseProperty("KeyCurve") + self.keyValue = self.getResponseProperty("keyValue") + self.rpId = self.getResponseProperty("RpId") + self.userHandle = self.getResponseProperty("UserHandle") + self.userName = self.getResponseProperty("UserName") + self.counter = self.getResponseProperty("Counter") + self.rpName = self.getResponseProperty("RpName") + self.userDisplayName = self.getResponseProperty("UserDisplayName") + self.discoverable = self.getResponseProperty("Discoverable") + self.creationDate = self.getResponseProperty("CreationDate") + +class FieldApi(BaseResponse): + type :str + name :str + value :int + linkedId:int + def __init__(self, response: dict) -> None: + super().__init__(response) + if not response: + return + self.type = self.getResponseProperty("Type") + self.name = self.getResponseProperty("Name") + self.value = self.getResponseProperty("Value") + self.linkedId = self.getResponseProperty("linkedId") + +class AttachmentResponse(BaseResponse): + id :str + url :str + fileName:str + key :str + size :str + sizeName:str + def __init__(self, response: dict) -> None: + super().__init__(response) + self.id = self.getResponseProperty("Id") + self.url = self.getResponseProperty("Url") + self.fileName = self.getResponseProperty("FileName") + self.key = self.getResponseProperty("Key") + self.size = self.getResponseProperty("Size") + self.sizeName = self.getResponseProperty("SizeName") + +class PasswordHistoryResponse(BaseResponse): + password :str + lastUsedDate:str + def __init__(self, response: dict) -> None: + super().__init__(response) + self.password = self.getResponseProperty("Password") + self.lastUsedDate = self.getResponseProperty("LastUsedDate") + +class LoginApi(BaseResponse): + uris :_t.List[LoginUriApi] + username :str + password :str + passwordRevisionDate:str + totp :str + autofillOnPageLoad :bool + fido2Credentials:_t.Optional[_t.List[Fido2CredentialApi]] + __repr_item__ = ['username'] + def __init__(self, response: dict) -> None: + super().__init__(response) + self.username = self.getResponseProperty("Username") + self.password = self.getResponseProperty("Password") + self.passwordRevisionDate = self.getResponseProperty("PasswordRevisionDate") + self.totp = self.getResponseProperty("Totp") + self.autofillOnPageLoad = self.getResponseProperty("AutofillOnPageLoad") + uris = self.getResponseProperty("Uris") + self.uris = list(map(LoginUriApi, uris)) if uris else None + fido2Credentials = self.getResponseProperty("Fido2Credentials") + self.fido2Credentials = list(map(Fido2CredentialApi, fido2Credentials)) if fido2Credentials else None + +class CardApi(BaseResponse): + cardholderName:str + brand :str + number :str + expMonth :str + expYear :str + code :str + def __init__(self, response: dict) -> None: + super().__init__(response) + if not response: + return + self.cardholderName = self.getResponseProperty('CardholderName') + self.brand = self.getResponseProperty('Brand') + self.number = self.getResponseProperty('Number') + self.expMonth = self.getResponseProperty('ExpMonth') + self.expYear = self.getResponseProperty('ExpYear') + self.code = self.getResponseProperty('Code') + +class IdentityApi(BaseResponse): + title :str + firstName :str + middleName :str + lastName :str + address1 :str + address2 :str + address3 :str + city :str + state :str + postalCode :str + country :str + company :str + email :str + phone :str + ssn :str + username :str + passportNumber:str + licenseNumber :str + def __init__(self, response: dict) -> None: + super().__init__(response) + if not response: + return + self.title = self.getResponseProperty('Title') + self.firstName = self.getResponseProperty('FirstName') + self.middleName = self.getResponseProperty('MiddleName') + self.lastName = self.getResponseProperty('LastName') + self.address1 = self.getResponseProperty('Address1') + self.address2 = self.getResponseProperty('Address2') + self.address3 = self.getResponseProperty('Address3') + self.city = self.getResponseProperty('City') + self.state = self.getResponseProperty('State') + self.postalCode = self.getResponseProperty('PostalCode') + self.country = self.getResponseProperty('Country') + self.company = self.getResponseProperty('Company') + self.email = self.getResponseProperty('Email') + self.phone = self.getResponseProperty('Phone') + self.ssn = self.getResponseProperty('SSN') + self.username = self.getResponseProperty('Username') + self.passportNumber = self.getResponseProperty('PassportNumber') + self.licenseNumber = self.getResponseProperty('LicenseNumber') + +class SecureNoteApi(BaseResponse): + type:int + def __init__(self, response: dict) -> None: + super().__init__(response) + if not response: + return + self.type = self.getResponseProperty('Type') + +class CipherResponse(BaseResponse): + id :str + organizationId :str + folderId :str + type :int + name :str + notes :str + fields :_t.List[FieldApi] + login :LoginApi + card :CardApi + identity :IdentityApi + secureNote :SecureNoteApi + favorite :bool + edit :bool + viewPassword :bool + organizationUseTotp:bool + revisionDate :str + attachments :_t.List[AttachmentResponse] + passwordHistory :_t.List[PasswordHistoryResponse] + collectionIds :_t.List[str] + creationDate :str + deletedDate :str + reprompt :int + key :str + __repr_item__ = ['name'] + def __init__(self, response:dict) -> None: + super().__init__(response) + self.id = self.getResponseProperty('Id') + self.organizationId = self.getResponseProperty("OrganizationId") + self.folderId = self.getResponseProperty("FolderId") + self.type = self.getResponseProperty("Type") + self.name = self.getResponseProperty("Name") + self.notes = self.getResponseProperty("Notes") + self.favorite = self.getResponseProperty("Favorite") or False + self.edit = not self.getResponseProperty("Edit") + self.viewPassword = self.getResponseProperty("ViewPassword") or True + login = self.getResponseProperty("Login") + self.login = LoginApi(login) if login else None + card = self.getResponseProperty("Card") + self.card = CardApi(card) if card else None + identity = self.getResponseProperty("Identity") + self.identity = IdentityApi(identity) if identity else None + secureNote = self.getResponseProperty("SecureNote") + self.secureNote = SecureNoteApi(secureNote) if secureNote else None + self.organizationUseTotp = self.getResponseProperty("OrganizationUseTotp") + self.revisionDate = self.getResponseProperty("RevisionDate") + self.collectionIds = self.getResponseProperty("CollectionIds") + self.creationDate = self.getResponseProperty("CreationDate") + self.deletedDate = self.getResponseProperty("DeletedDate") + fields = self.getResponseProperty("Fields") + self.fields = list(map(FieldApi, fields)) if fields else None + attachments = self.getResponseProperty("Attachments") + self.attachments = list(map(AttachmentResponse, attachments)) if attachments else None + passwordHistory = self.getResponseProperty("PasswordHistory") + self.passwordHistory = list(map(PasswordHistoryResponse, passwordHistory)) if passwordHistory else None + self.reprompt = self.getResponseProperty("Reprompt") or 0 + self.key = self.getResponseProperty("Key") + +class GlobalDomainResponse(BaseResponse): + type :int + domains :_t.List[str] + excluded:bool + def __init__(self, response: dict) -> None: + super().__init__(response) + self.type = self.getResponseProperty("Type") + self.domains = self.getResponseProperty("Domains") + self.excluded = self.getResponseProperty("Excluded") + +class DomainsResponse(BaseResponse): + equivalentDomains :_t.List[_t.List[str]] + globalEquivalentDomains:_t.List[GlobalDomainResponse] + def __init__(self, response: dict) -> None: + super().__init__(response) + self.equivalentDomains = self.getResponseProperty("EquivalentDomains") + globalEquivalentDomains = self.getResponseProperty("GlobalEquivalentDomains") + self.globalEquivalentDomains = list(map(GlobalDomainResponse, globalEquivalentDomains)) if globalEquivalentDomains else [] + +class PolicyResponse(BaseResponse): + id :str + organizationId:str + type :int + data :_t.Any + enabled :bool + def __init__(self, response: dict) -> None: + super().__init__(response) + self.id = self.getResponseProperty("Id") + self.organizationId = self.getResponseProperty("OrganizationId") + self.type = self.getResponseProperty("Type") + self.data = self.getResponseProperty("Data") + self.enabled = self.getResponseProperty("Enabled") + +class SendFileApi(BaseResponse): + id :str + fileName:str + size :str + sizeName:str + def __init__(self, response: dict) -> None: + super().__init__(response) + self.id = self.getResponseProperty("Id") + self.fileName = self.getResponseProperty("FileName") + self.size = self.getResponseProperty("Size") + self.sizeName = self.getResponseProperty("SizeName") + +class SendTextApi(BaseResponse): + text :str + hidden:bool + def __init__(self, response: dict) -> None: + super().__init__(response) + self.text = self.getResponseProperty("Text") + self.hidden = self.getResponseProperty("Hidden") or False + + +class SendResponse(BaseResponse): + id :str + accessId :str + type :int + name :str + notes :str + file :SendFileApi + text :SendTextApi + key :str + maxAccessCount :_t.Optional[int] + accessCount :int + revisionDate :str + expirationDate :str + deletionDate :str + password :str + disable :bool + hideEmail :bool + def __init__(self, response: dict) -> None: + super().__init__(response) + self.id = self.getResponseProperty("Id") + self.accessId = self.getResponseProperty("AccessId") + self.type = self.getResponseProperty("Type") + self.name = self.getResponseProperty("Name") + self.notes = self.getResponseProperty("Notes") + self.key = self.getResponseProperty("Key") + self.maxAccessCount = self.getResponseProperty("MaxAccessCount") + self.accessCount = self.getResponseProperty("AccessCount") + self.revisionDate = self.getResponseProperty("RevisionDate") + self.expirationDate = self.getResponseProperty("ExpirationDate") + self.deletionDate = self.getResponseProperty("DeletionDate") + self.password = self.getResponseProperty("Password") + self.disable = self.getResponseProperty("Disabled") or False + self.hideEmail = self.getResponseProperty("HideEmail") or False + text = self.getResponseProperty("Text") + self.text = SendTextApi(text) if text else None + + file = self.getResponseProperty("File") + self.file = SendFileApi(file) if file else None + +class SyncResponse(BaseResponse): + profile :_t.Optional[ProfileResponse] + folders :_t.List[FolderResponse] + collections:_t.List[CollectionDetailsResponse] + ciphers :_t.List[CipherResponse] + domains :_t.Optional[DomainsResponse] + policies :_t.List[PolicyResponse] + sends :_t.List[SendResponse] + __not_decrypt__ = ['profile'] + def __init__(self, response: dict) -> None: + super().__init__(response) + profile = self.getResponseProperty("Profile") + self.profile = ProfileResponse(profile) if profile else None + folders = self.getResponseProperty("Folders") + self.folders = list(map(FolderResponse, folders)) if folders else [] + collections = self.getResponseProperty("Collections") + self.collections = list(map(CollectionDetailsResponse, collections)) if collections else [] + ciphers = self.getResponseProperty("Ciphers") + self.ciphers = list(map(CipherResponse, ciphers)) if ciphers else [] + domains = self.getResponseProperty("Domains") + self.domains = DomainsResponse(domains) if domains else None + policies = self.getResponseProperty("Policies") + self.policies = list(map(PolicyResponse, policies)) if policies else [] + sends = self.getResponseProperty("Sends") + self.sends = list(map(SendResponse, sends)) if sends else [] diff --git a/vaultwarden/types.py b/vaultwarden/types.py new file mode 100644 index 0000000..4843099 --- /dev/null +++ b/vaultwarden/types.py @@ -0,0 +1,31 @@ +from enum import Enum + +class EncryptionType(Enum): + AesCbc256_B64 = 0 + AesCbc128_HmacSha256_B64 = 1 + AesCbc256_HmacSha256_B64 = 2 + Rsa2048_OaepSha256_B64 = 3 + Rsa2048_OaepSha1_B64 = 4 + Rsa2048_OaepSha256_HmacSha256_B64 = 5 + Rsa2048_OaepSha1_HmacSha256_B64 = 6 + +EncryptionTypeAes = [ + EncryptionType.AesCbc256_B64, + EncryptionType.AesCbc128_HmacSha256_B64, + EncryptionType.AesCbc256_HmacSha256_B64 +] + +EncryptionTypeRsa = [ + EncryptionType.Rsa2048_OaepSha1_B64, + EncryptionType.Rsa2048_OaepSha1_HmacSha256_B64, + EncryptionType.Rsa2048_OaepSha256_B64, + EncryptionType.Rsa2048_OaepSha256_HmacSha256_B64 +] + +class UriMatchType(Enum): + Domain = 0 + Host = 1 + StartsWith = 2 + Exact = 3 + RegularExpression = 4 + Never = 5 \ No newline at end of file