Skip to content

Commit

Permalink
fix: basic boot password should be able to contain colon characters
Browse files Browse the repository at this point in the history
  • Loading branch information
lenkan committed Jan 13, 2025
1 parent f4a21b3 commit 4f311cd
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 43 deletions.
25 changes: 17 additions & 8 deletions src/keria/app/agenting.py
Original file line number Diff line number Diff line change
Expand Up @@ -884,6 +884,22 @@ def __init__(self, agency: Agency, username: str | None = None, password: str |
self.password = password
self.agency = agency

def parseBasicAuth(self, req: falcon.Request):
schemePrefix = 'Basic '
if req.auth is None or not req.auth.startswith(schemePrefix):
return None, None

token = b64decode(req.auth[len(schemePrefix):]).decode('utf-8')
splitIndex = token.find(':')
if splitIndex == -1:
return None, None

username = token[:splitIndex]
password = token[splitIndex + 1:]

return username, password


def authenticate(self, req: falcon.Request):
# Username AND Password is not set, so no need to authenticate
if self.username is None and self.password is None:
Expand All @@ -892,15 +908,8 @@ def authenticate(self, req: falcon.Request):
if req.auth is None:
raise falcon.HTTPUnauthorized(title="Unauthorized")

scheme, token = req.auth.split(' ')
if scheme != 'Basic':
raise falcon.HTTPUnauthorized(title="Unauthorized")

try:
username, password = b64decode(token).decode('utf-8').split(':')

if username is None or password is None:
raise falcon.HTTPUnauthorized(title="Unauthorized")
username, password = self.parseBasicAuth(req)

if username == self.username and password == self.password:
return
Expand Down
4 changes: 2 additions & 2 deletions src/keria/testing/testing_helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,8 @@ def server(agency, httpPort=3902):
return httpServerDoer

@staticmethod
def controller():
serder, signers = Helpers.incept(bran=b'0123456789abcdefghijk', stem="signify:controller", pidx=0)
def controller(bran=b'0123456789abcdefghijk'):
serder, signers = Helpers.incept(bran=bran, stem="signify:controller", pidx=0)
sigers = [signers[0].sign(ser=serder.raw, index=0)]
return serder, sigers

Expand Down
73 changes: 40 additions & 33 deletions tests/app/test_agenting.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,52 +283,59 @@ def test_unprotected_boot_ends(helpers):
}

def test_protected_boot_ends(helpers):
agency = agenting.Agency(name="agency", bran=None, temp=True)
doist = doing.Doist(limit=1.0, tock=0.03125, real=True)
doist.enter(doers=[agency])
credentials = [
dict(bran=b'0123456789abcdefghija', username="user", password="secret"),
dict(bran=b'0123456789abcdefghijb', username="admin", password="secret with spaces"),
dict(bran=b'0123456789abcdefghijc', username="admin", password="secret : with colon")
]

serder, sigers = helpers.controller()
assert serder.pre == helpers.controllerAID
for credential in credentials:
bran = credential["bran"]
username = credential["username"]
password = credential["password"]

app = falcon.App()
client = testing.TestClient(app)
agency = agenting.Agency(name="agency", bran=None, temp=True)
doist = doing.Doist(limit=1.0, tock=0.03125, real=True)
doist.enter(doers=[agency])

username = "user"
password = "secret"
serder, sigers = helpers.controller(bran=bran)

bootEnd = agenting.BootEnd(agency, username=username, password=password)
app.add_route("/boot", bootEnd)
app = falcon.App()
client = testing.TestClient(app)

body = dict(
icp=serder.ked,
sig=sigers[0].qb64,
salty=dict(
stem='signify:aid', pidx=0, tier='low', sxlt='OBXYZ',
icodes=[MtrDex.Ed25519_Seed], ncodes=[MtrDex.Ed25519_Seed]
bootEnd = agenting.BootEnd(agency, username=username, password=password)
app.add_route("/boot", bootEnd)

body = dict(
icp=serder.ked,
sig=sigers[0].qb64,
salty=dict(
stem='signify:aid', pidx=0, tier='low', sxlt='OBXYZ',
icodes=[MtrDex.Ed25519_Seed], ncodes=[MtrDex.Ed25519_Seed]
)
)
)

rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"))
assert rep.status_code == 401
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"))
assert rep.status_code == 401

rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": "Something test"})
assert rep.status_code == 401
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": "Something test"})
assert rep.status_code == 401

rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": "Basic user:secret"})
assert rep.status_code == 401
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {username}:{password}"})
assert rep.status_code == 401

rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(b'test:secret').decode('utf-8')}"} )
assert rep.status_code == 401
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(bytes(f'test:{password}', 'utf-8')).decode('utf-8')}"} )
assert rep.status_code == 401

rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(b'user').decode('utf-8')}"} )
assert rep.status_code == 401
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(bytes(f'{username}', 'utf-8')).decode('utf-8')}"} )
assert rep.status_code == 401

rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(b'user:test').decode('utf-8')}"} )
assert rep.status_code == 401
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": f"Basic {b64encode(bytes(f'{username}:test', 'utf-8')).decode('utf-8')}"} )
assert rep.status_code == 401

authorization = f"Basic {b64encode(b'user:secret').decode('utf-8')}"
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": authorization})
assert rep.status_code == 202
authorization = f"Basic {b64encode(bytes(f'{username}:{password}', 'utf-8')).decode('utf-8')}"
rep = client.simulate_post("/boot", body=json.dumps(body).encode("utf-8"), headers={"Authorization": authorization})
assert rep.status_code == 202

def test_misconfigured_protected_boot_ends(helpers):
agency = agenting.Agency(name="agency", bran=None, temp=True)
Expand Down

0 comments on commit 4f311cd

Please sign in to comment.