diff --git a/src/keria/app/agenting.py b/src/keria/app/agenting.py index 1fc8a210..b21b38e5 100644 --- a/src/keria/app/agenting.py +++ b/src/keria/app/agenting.py @@ -884,6 +884,22 @@ def __init__(self, agency: Agency, username: str | None = None, password: str | self.password = password self.agency = agency + def parseBasicAuth(self, req: falcon.Request): + schemePrefix = 'Basic ' + if req.auth is None or not req.auth.startswith(schemePrefix): + return None, None + + token = b64decode(req.auth[len(schemePrefix):]).decode('utf-8') + splitIndex = token.find(':') + if splitIndex == -1: + return None, None + + username = token[:splitIndex] + password = token[splitIndex + 1:] + + return username, password + + def authenticate(self, req: falcon.Request): # Username AND Password is not set, so no need to authenticate if self.username is None and self.password is None: @@ -892,15 +908,8 @@ def authenticate(self, req: falcon.Request): if req.auth is None: raise falcon.HTTPUnauthorized(title="Unauthorized") - scheme, token = req.auth.split(' ') - if scheme != 'Basic': - raise falcon.HTTPUnauthorized(title="Unauthorized") - try: - username, password = b64decode(token).decode('utf-8').split(':') - - if username is None or password is None: - raise falcon.HTTPUnauthorized(title="Unauthorized") + username, password = self.parseBasicAuth(req) if username == self.username and password == self.password: return diff --git a/src/keria/testing/testing_helper.py b/src/keria/testing/testing_helper.py index 2337be45..7464d489 100644 --- a/src/keria/testing/testing_helper.py +++ b/src/keria/testing/testing_helper.py @@ -323,8 +323,8 @@ def server(agency, httpPort=3902): return httpServerDoer @staticmethod - def controller(): - serder, signers = Helpers.incept(bran=b'0123456789abcdefghijk', stem="signify:controller", pidx=0) + def controller(bran=b'0123456789abcdefghijk'): + serder, signers = Helpers.incept(bran=bran, stem="signify:controller", pidx=0) sigers = [signers[0].sign(ser=serder.raw, index=0)] return serder, sigers diff --git a/tests/app/test_agenting.py b/tests/app/test_agenting.py index e0338f38..996b0fda 100644 --- a/tests/app/test_agenting.py +++ b/tests/app/test_agenting.py @@ -283,52 +283,59 @@ def test_unprotected_boot_ends(helpers): } def test_protected_boot_ends(helpers): - agency = agenting.Agency(name="agency", bran=None, temp=True) - doist = doing.Doist(limit=1.0, tock=0.03125, real=True) - doist.enter(doers=[agency]) + credentials = [ + dict(bran=b'0123456789abcdefghija', username="user", password="secret"), + dict(bran=b'0123456789abcdefghijb', username="admin", password="secret with spaces"), + dict(bran=b'0123456789abcdefghijc', username="admin", password="secret : with colon") + ] - serder, sigers = helpers.controller() - assert serder.pre == helpers.controllerAID + for credential in credentials: + bran = credential["bran"] + username = credential["username"] + password = credential["password"] - app = falcon.App() - client = testing.TestClient(app) + agency = agenting.Agency(name="agency", bran=None, temp=True) + doist = doing.Doist(limit=1.0, tock=0.03125, real=True) + doist.enter(doers=[agency]) - username = "user" - password = "secret" + serder, sigers = helpers.controller(bran=bran) - bootEnd = agenting.BootEnd(agency, username=username, password=password) - app.add_route("/boot", bootEnd) + app = falcon.App() + client = testing.TestClient(app) - body = dict( - icp=serder.ked, - sig=sigers[0].qb64, - salty=dict( - stem='signify:aid', pidx=0, tier='low', sxlt='OBXYZ', - icodes=[MtrDex.Ed25519_Seed], ncodes=[MtrDex.Ed25519_Seed] + bootEnd = agenting.BootEnd(agency, username=username, password=password) + app.add_route("/boot", bootEnd) + + body = dict( + icp=serder.ked, + sig=sigers[0].qb64, + salty=dict( + stem='signify:aid', pidx=0, tier='low', sxlt='OBXYZ', + icodes=[MtrDex.Ed25519_Seed], ncodes=[MtrDex.Ed25519_Seed] + ) ) - ) - rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8")) - assert rep.status_code == 401 + rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8")) + assert rep.status_code == 401 - rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": "Something test"}) - assert rep.status_code == 401 + rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": "Something test"}) + assert rep.status_code == 401 - rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": "Basic user:secret"}) - assert rep.status_code == 401 + rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {username}:{password}"}) + assert rep.status_code == 401 - rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(b'test:secret').decode('utf-8')}"} ) - assert rep.status_code == 401 + rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(bytes(f'test:{password}', 'utf-8')).decode('utf-8')}"} ) + assert rep.status_code == 401 - rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(b'user').decode('utf-8')}"} ) - assert rep.status_code == 401 + rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(bytes(f'{username}', 'utf-8')).decode('utf-8')}"} ) + assert rep.status_code == 401 - rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(b'user:test').decode('utf-8')}"} ) - assert rep.status_code == 401 + rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(bytes(f'{username}:test', 'utf-8')).decode('utf-8')}"} ) + assert rep.status_code == 401 - authorization = f"Basic {b64encode(b'user:secret').decode('utf-8')}" - rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": authorization}) - assert rep.status_code == 202 + authorization = f"Basic {b64encode(bytes(f'{username}:{password}', 'utf-8')).decode('utf-8')}" + rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": authorization}) + assert rep.status_code == 202 def test_misconfigured_protected_boot_ends(helpers): agency = agenting.Agency(name="agency", bran=None, temp=True)