diff --git a/README.rst b/README.rst index 8e1def1..a5c54ab 100644 --- a/README.rst +++ b/README.rst @@ -138,7 +138,7 @@ environment --environment Output command to set ENV v ------------- --------------- ---------------------- ---------------------------------------- silent --silent Silence Info output ------------- --------------- ---------------------- ---------------------------------------- -factor --factor AWS_OKTA_FACTOR MFA type. `push:okta` and `token:software:totp:okta` supported. +factor --factor AWS_OKTA_FACTOR MFA type. `push:okta`, `token:software:totp:okta`, `token:software:totp:google` and `token:hardware:yubico` are supported. ------------- --------------- ---------------------- ---------------------------------------- no_okta_cache --no-okta-cache AWS_OKTA_NO_OKTA_CACHE Do not read okta cache ------------- --------------- ---------------------- ---------------------------------------- diff --git a/src/aws_okta_processor/__init__.py b/src/aws_okta_processor/__init__.py index 0bb84ff..bcd8d54 100644 --- a/src/aws_okta_processor/__init__.py +++ b/src/aws_okta_processor/__init__.py @@ -1 +1 @@ -__version__ = '1.5.3' +__version__ = '1.6.0' diff --git a/src/aws_okta_processor/core/okta.py b/src/aws_okta_processor/core/okta.py index a1c0c97..d49c42f 100644 --- a/src/aws_okta_processor/core/okta.py +++ b/src/aws_okta_processor/core/okta.py @@ -326,6 +326,7 @@ def get_supported_factors(factors=None): for factor in factors: try: supported_factor = FactorBase.factory(factor["factorType"]) + key = '{}:{}'.format( factor["factorType"], factor["provider"]).lower() matching_factors[key] = supported_factor( @@ -362,6 +363,7 @@ def send_error(response=None, json=True, exit=True): class FactorType: PUSH = "push" TOTP = "token:software:totp" + HARDWARE = "token:hardware" @add_metaclass(abc.ABCMeta) @@ -418,3 +420,18 @@ def payload(): def retry(self, response): return False + + +class FactorHardwareToken(FactorBase): + factor = FactorType.HARDWARE + + def __init__(self, link=None): + super(FactorHardwareToken, self).__init__(link=link) + + @staticmethod + def payload(): + print_tty("Hardware Token: ", newline=False) + return {"passCode": input()} + + def retry(self, response): + return False diff --git a/tests/AUTH_MFA_MULTIPLE_RESPONSE b/tests/AUTH_MFA_MULTIPLE_RESPONSE index d00cbf1..aee42e5 100644 --- a/tests/AUTH_MFA_MULTIPLE_RESPONSE +++ b/tests/AUTH_MFA_MULTIPLE_RESPONSE @@ -1,5 +1,6 @@ {"status":"MFA_REQUIRED", "stateToken": "state_token", "_embedded": {"factors": [ {"factorType": "token:software:totp", "provider": "GOOGLE", "_links": {"verify": {"href": "https://organization.okta.com/api/v1/authn/factors/id/verify"}}}, {"factorType": "push", "provider": "OKTA", "_links": {"verify": {"href": "https://organization.okta.com/api/v1/authn/factors/id/verify"}}}, - {"factorType": "token:software:totp", "provider": "OKTA", "_links": {"verify": {"href": "https://organization.okta.com/api/v1/authn/factors/id/verify"}}} + {"factorType": "token:software:totp", "provider": "OKTA", "_links": {"verify": {"href": "https://organization.okta.com/api/v1/authn/factors/id/verify"}}}, + {"factorType": "token:hardware", "provider": "YUBICO", "_links": {"verify": {"href": "https://organization.okta.com/api/v1/authn/factors/id/verify"}}} ]}} \ No newline at end of file diff --git a/tests/AUTH_MFA_YUBICO_HARDWARE_RESPONSE b/tests/AUTH_MFA_YUBICO_HARDWARE_RESPONSE new file mode 100644 index 0000000..38e3a7a --- /dev/null +++ b/tests/AUTH_MFA_YUBICO_HARDWARE_RESPONSE @@ -0,0 +1 @@ +{"status":"MFA_REQUIRED", "stateToken": "state_token", "_embedded": {"factors": [{"factorType": "token:hardware", "provider": "YUBICO", "_links": {"verify": {"href": "https://organization.okta.com/api/v1/authn/factors/id/verify"}}}]}} \ No newline at end of file diff --git a/tests/core/test_okta.py b/tests/core/test_okta.py index e30b98d..a787238 100644 --- a/tests/core/test_okta.py +++ b/tests/core/test_okta.py @@ -4,6 +4,7 @@ from tests.test_base import AUTH_MFA_PUSH_RESPONSE from tests.test_base import AUTH_MFA_TOTP_RESPONSE from tests.test_base import AUTH_MFA_MULTIPLE_RESPONSE +from tests.test_base import AUTH_MFA_YUBICO_HARDWARE_RESPONSE from tests.test_base import MFA_WAITING_RESPONSE from tests.test_base import APPLICATIONS_RESPONSE from tests.test_base import SAML_RESPONSE @@ -288,6 +289,50 @@ def test_okta_mfa_totp_challenge( self.assertEqual(okta.organization, "organization.okta.com") self.assertEqual(okta.okta_session_id, "session_token") + @patch('aws_okta_processor.core.okta.input') + @patch('aws_okta_processor.core.okta.os.chmod') + @patch('aws_okta_processor.core.okta.open') + @patch('aws_okta_processor.core.okta.os.makedirs') + @patch('aws_okta_processor.core.okta.print_tty') + @responses.activate + def test_okta_mfa_hardware_token_challenge( + self, + mock_print_tty, + mock_makedirs, + mock_open, + mock_chmod, + mock_input + ): + mock_input.return_value = "123456" + + responses.add( + responses.POST, + 'https://organization.okta.com/api/v1/authn', + json=json.loads(AUTH_MFA_YUBICO_HARDWARE_RESPONSE) + ) + + responses.add( + responses.POST, + 'https://organization.okta.com/api/v1/authn/factors/id/verify', + json=json.loads(AUTH_TOKEN_RESPONSE) + ) + + responses.add( + responses.POST, + 'https://organization.okta.com/api/v1/sessions', + json=json.loads(SESSION_RESPONSE) + ) + + okta = Okta( + user_name="user_name", + user_pass="user_pass", + organization="organization.okta.com" + ) + + self.assertEqual(okta.okta_single_use_token, "single_use_token") + self.assertEqual(okta.organization, "organization.okta.com") + self.assertEqual(okta.okta_session_id, "session_token") + @patch('aws_okta_processor.core.prompt.input') @patch('aws_okta_processor.core.okta.os.chmod') @patch('aws_okta_processor.core.okta.open') diff --git a/tests/test_base.py b/tests/test_base.py index 44b9907..5111ce4 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -16,6 +16,8 @@ AUTH_MFA_PUSH_RESPONSE = open(AUTH_MFA_PUSH_RESPONSE_PATH, 'r').read() AUTH_MFA_TOTP_RESPONSE_PATH = os.path.join(ABS_PATH, "AUTH_MFA_TOTP_RESPONSE") AUTH_MFA_TOTP_RESPONSE = open(AUTH_MFA_TOTP_RESPONSE_PATH, 'r').read() +AUTH_MFA_YUBICO_HARDWARE_RESPONSE_PATH = os.path.join(ABS_PATH, "AUTH_MFA_YUBICO_HARDWARE_RESPONSE") +AUTH_MFA_YUBICO_HARDWARE_RESPONSE = open(AUTH_MFA_YUBICO_HARDWARE_RESPONSE_PATH, 'r').read() AUTH_MFA_MULTIPLE_RESPONSE_PATH = os.path.join(ABS_PATH, "AUTH_MFA_MULTIPLE_RESPONSE") AUTH_MFA_MULTIPLE_RESPONSE = open(AUTH_MFA_MULTIPLE_RESPONSE_PATH, 'r').read() MFA_WAITING_RESPONSE_PATH = os.path.join(ABS_PATH, "MFA_WAITING_RESPONSE")