From 893d3542e6cb75a6ae99d8e5fb7d9f9b2f5a0512 Mon Sep 17 00:00:00 2001 From: bcaselden-viasat Date: Mon, 11 Mar 2024 16:09:30 -0400 Subject: [PATCH 1/3] add duo univ prompt support --- alohomora/req.py | 324 ++++++++++++++++++++++++++++++----------------- 1 file changed, 210 insertions(+), 114 deletions(-) diff --git a/alohomora/req.py b/alohomora/req.py index 85602ff..01dc45d 100644 --- a/alohomora/req.py +++ b/alohomora/req.py @@ -366,9 +366,199 @@ def login_one_factor(self, username, password): return (True, assertion) return (False, response) + def _get_duo_plugin_payload(self, soup, post_url, auth_device): + ''' Get the DUO plugin payload ''' + payload = { inputtag.get('name', ''): inputtag.get('value', '') + for inputtag in soup.find_all('input') } + + xsrf = payload.get('_xsrf', '') + + # Post data to emulate the plugin determination + (response, soup) = self._do_post(post_url, data=payload) + + sid = unquote(urlparse.urlparse(response.request.url).query[4:]) + new_action = self._get_form_action(soup) + device = self._get_duo_device(soup, auth_device) + factor = self._get_auth_factor(soup, device) + + do_wa = device.value.startswith('WA') + plugin_payload = { + 'sid': sid, + 'device': device.value, + 'factor': factor.name, + 'out_of_date': '' + } + + if factor.name == "Passcode": + plugin_payload['passcode'] = factor.value + + if do_wa: + # configure webauthn plugin info + plugin_payload['factor'] = factor.name if ( + device.name != "Security Key (U2F)" + and not device.value.startswith('WA')) else "WebAuthn Credential" + + return (plugin_payload, xsrf, new_action) + + def _wait_for_duo_status(self, allowed, duo_status_endpoint, sid, txid): + ''' Poll Duo for MFA status ''' + while not allowed: + # call again to get status of request + # for a push notification, this will hang until the user approves/denies + # for a phone call, you need to keep polling until the user approves/denies + (status, _) = self._do_post(duo_status_endpoint, + data={'sid': sid, 'txid': txid}, soup=False) + status_data = json.loads(status.text) + + if status_data['stat'] != 'OK': + LOG.error(f"Returned from second status call: {status.text}") + alohomora.die("Sorry, there was a problem talking to Duo.") + if status_data['response']['status_code'] == 'allow': + LOG.info("Login allowed!") + allowed = True + elif status_data['response']['status_code'] == 'deny': + LOG.error(f"Login disallowed: {status.text}") + alohomora.die("The login was blocked!") + else: + print(f"Still waiting... ({status_data['response']['status_code']})") + LOG.info(f"Still waiting... ({status_data['response']['status_code']})") + LOG.debug(str(status_data)) + time.sleep(2) + + return status_data + + def _process_webauthn_request(self, status_data, plugin_payload, duo_host, sid, iframe=False): + ''' Wait for webauthn device to be approved ''' + if not iframe: + duo_prompt_endpoint = f'https://{duo_host}/frame/v4/prompt' + duo_status_endpoint = f'https://{duo_host}/frame/v4/status' + else: + duo_prompt_endpoint = f'https://{duo_host}/frame/prompt' + duo_status_endpoint = f'https://{duo_host}/frame/status' + opts = status_data['response']['webauthn_credential_request_options'] + challenges = [r for r in opts['allowCredentials'] + if self._validate_webauthn_request(duo_host, opts['extensions']['appid'])] + if not challenges: + alohomora.die('Sorry, there was a problem talking to Duo.') + resp = self._get_webauthn_response(opts) + LOG.debug(resp) + + # include the session ID as passed to us earlier + plugin_payload['sid'] = sid + # webauthn_credential and webauthn_finish are magic strings here + plugin_payload['device'] = 'webauthn_credential' + plugin_payload['factor'] = 'webauthn_finish' + # these are a copy/paste from the duo integration's POST data + plugin_payload['out_of_date'] = None + plugin_payload['days_out_of_date'] = None + plugin_payload['days_to_block'] = 'None' + # finally, the response data itself needs to be a JSON string + plugin_payload['response_data'] = json.dumps(resp,separators=(',', ':')) + + LOG.debug(plugin_payload) + + (status, _) = self._do_post(duo_prompt_endpoint, data=plugin_payload, + soup=False) + status_data = json.loads(status.text) + # Response is of form + # {"stat": "OK", "response": {"txid": "f95cbacc-151c-43a6-b462-b33420e72633"}} + txid = json.loads(status.text)['response']['txid'] + LOG.debug("Received transaction ID %s from response %s", txid, status.text) + + # Initial call will NOT block + (status, _) = self._do_post(duo_status_endpoint, + data={'sid': sid, 'txid': txid}, soup=False) + status_data = json.loads(status.text) + LOG.info(str(status_data)) + if status_data['stat'] != 'OK': + LOG.error("Returned from inital status call: %s", status.text) + alohomora.die("Sorry, there was a problem talking to Duo.") + allowed = status_data['response']['status_code'] == 'allow' + if not allowed: + alohomora.die("Sorry, there was a problem with your security key, try again.") + + factor_name = 'webauthn_finish' + return (factor_name, txid, allowed, status_data) + def login_two_factor(self, response_1fa, auth_device=None): """Log in with the second factor, borrowing first factor data if necessary""" + # If redirected to duosecurity we are doing the oidc flow + if urlparse.urlparse(response_1fa.url).netloc.endswith('duosecurity.com'): + LOG.debug("Using DUO universal prompt") + return self._login_two_factor_duo_univ(response_1fa, auth_device) + else: + LOG.debug("Using DUO iframe prompt") + return self._login_two_factor_iframe(response_1fa, auth_device) + + def _login_two_factor_duo_univ(self, response_1fa, auth_device=None): + """Log in with the second factor, borrowing first factor data if necessary""" + soup = BeautifulSoup(response_1fa.content, 'html.parser') + post_url = response_1fa.url + duo_host = urlparse.urlparse(post_url).netloc + + (plugin_payload, xsrf, _) = self._get_duo_plugin_payload(soup, post_url, auth_device) + sid = plugin_payload['sid'] + factor_name = plugin_payload['factor'] + + # Start the Duo Auth request + (status, _) = self._do_post(f'https://{duo_host}/frame/v4/prompt', + data=plugin_payload, + soup=False) + + # Response is of form + # {"stat": "OK", "response": {"txid": "f95cbacc-151c-43a6-b462-b33420e72633"}} + response = json.loads(status.text)['response'] + txid = response['txid'] + + LOG.debug("Received response: %s", status.text) + LOG.debug("Received response %s", response) + LOG.debug("Received transaction ID %s", txid) + + # Initial call will NOT block + (status, _) = self._do_post(f'https://{duo_host}/frame/v4/status', + data={'sid': sid, 'txid': txid}, soup=False) + + status_data = json.loads(status.text) + if status_data['stat'] != 'OK': + LOG.error("Returned from inital status call: %s", status.text) + alohomora.die("Sorry, there was a problem talking to Duo.") + allowed = status_data['response']['status_code'] == 'allow' + + # If not immediately approved, poll for status + if not allowed: + # there should never be a case where `allowed` is True if the user picked Security Key + if status_data['response']['status_code'] == 'webauthn_sent': + (factor_name, txid, allowed, _) = self._process_webauthn_request( + status_data, + plugin_payload, + duo_host, + sid) + else: + self._wait_for_duo_status(allowed, + f'https://{duo_host}/frame/v4/status', + sid, + txid) + + payload = { + "sid": sid, + "txid": txid, + "factor": factor_name, + "device_key": "", + "_xsrf": xsrf, + "dampen_choice": False, + } + + duo_exit_url = f'https://{duo_host}/frame/v4/oidc/exit' + + (response, soup) = self._do_post(duo_exit_url, + data=payload, soup=True) + + assertion = self._get_assertion(soup) + return (True, assertion) + + def _login_two_factor_iframe(self, response_1fa, auth_device=None): + """Log in with the second factor, borrowing first factor data if necessary""" soup_1fa = BeautifulSoup(response_1fa.text, 'html.parser') duo_host = None sig_request = None @@ -394,68 +584,24 @@ def login_two_factor(self, response_1fa, auth_device=None): 'Origin': origin_duo_host }) - payload = {} - for inputtag in soup.find_all('input'): - name = inputtag.get('name', '') - value = inputtag.get('value', '') - # Populate all parameters with the existing value (picks up hidden fields too) - payload[name] = value - - # Post data to emulate the plugin determination - LOG.info('Posting plugin information to Duo') - (response, soup) = self._do_post(frame_url, data=payload) - - sid = unquote(urlparse.urlparse(response.request.url).query[4:]) - new_action = self._get_form_action(soup) - device = self._get_duo_device(soup, auth_device) - factor = self._get_auth_factor(soup, device) - - do_wa = device.value.startswith('WA') - # Finally send the POST request for an auth to Duo - payload = { - 'sid': sid, - 'device': device.value, - 'factor': factor.name, - 'out_of_date': '' - } - LOG.debug("Payload: %s", payload) - if factor.name == "Passcode": - payload['passcode'] = factor.value - prompt_sid_url = response.url - headers = {'Referer': prompt_sid_url, - 'Origin': origin_duo_host} - if do_wa: - # pull in the webauthn prompt - popup_url = (f'https://{duo_host}{new_action}/' - f'webauthn_auth_popup?sid={sid}&wkey={device.value}') - LOG.debug("Popup URL: %s", popup_url) - (status, soup) = self._do_get( - popup_url, - headers=headers) - payload = { - 'sid': sid, - 'device': device.value, - 'factor': factor.name if ( - device.name != "Security Key (U2F)" - and not device.value.startswith('WA')) else "WebAuthn Credential", - } - headers = {'Referer': popup_url, 'Origin': origin_duo_host} + (payload, _, new_action) = self._get_duo_plugin_payload(soup, frame_url, auth_device) (status, _) = self._do_post(f'https://{duo_host}{new_action}', data=payload, - headers=headers, soup=False) + # headers=headers, soup=False) + soup=False) # Response is of form # {"stat": "OK", "response": {"txid": "f95cbacc-151c-43a6-b462-b33420e72633"}} LOG.debug("Received response: %s", status.text) response = json.loads(status.text)['response'] txid = response['txid'] + sid = payload['sid'] LOG.debug("Received response %s", response) LOG.debug("Received transaction ID %s", txid) - headers = {'Referer': prompt_sid_url, 'Origin': origin_duo_host} # Initial call will NOT block (status, _) = self._do_post(f'https://{duo_host}/frame/status', - data={'sid': sid, 'txid': txid}, headers=headers, soup=False) + data={'sid': sid, 'txid': txid}, soup=False) # text from this will be something like # { # "stat": "OK", @@ -502,72 +648,22 @@ def login_two_factor(self, response_1fa, auth_device=None): print(status_data['response']['status']) allowed = status_data['response']['status_code'] == 'allow' - # there should never be a case where `allowed` is True if the user picked Security Key - if status_data['response']['status_code'] == 'webauthn_sent': - opts = status_data['response']['webauthn_credential_request_options'] - challenges = [r for r in opts['allowCredentials'] - if self._validate_webauthn_request(duo_host, opts['extensions']['appid'])] - if not challenges: - alohomora.die('Sorry, there was a problem talking to Duo.') - resp = self._get_webauthn_response(opts) - LOG.debug(resp) - - # include the session ID as passed to us earlier - payload['sid'] = sid - # webauthn_credential and webauthn_finish are magic strings here - payload['device'] = 'webauthn_credential' - payload['factor'] = 'webauthn_finish' - # these are a copy/paste from the duo integration's POST data - payload['out_of_date'] = None - payload['days_out_of_date'] = None - payload['days_to_block'] = 'None' - # finally, the response data itself needs to be a JSON string - payload['response_data'] = json.dumps(resp,separators=(',', ':')) - - LOG.debug(payload) - - (status, _) = self._do_post(f'https://{duo_host}{new_action}', data=payload, - headers=headers, soup=False) - status_data = json.loads(status.text) - # Response is of form - # {"stat": "OK", "response": {"txid": "f95cbacc-151c-43a6-b462-b33420e72633"}} - txid = json.loads(status.text)['response']['txid'] - LOG.debug("Received transaction ID %s from response %s", txid, status.text) - - # Initial call will NOT block - (status, _) = self._do_post(f'https://{duo_host}/frame/status', - data={'sid': sid, 'txid': txid}, headers=headers, soup=False) - status_data = json.loads(status.text) - LOG.info(str(status_data)) - if status_data['stat'] != 'OK': - LOG.error("Returned from inital status call: %s", status.text) - alohomora.die("Sorry, there was a problem talking to Duo.") - print(status_data['response']['status']) - allowed = status_data['response']['status_code'] == 'allow' - if not allowed: - alohomora.die("Sorry, there was a problem with your security key, try again.") - - while not allowed: - # call again to get status of request - # for a push notification, this will hang until the user approves/denies - # for a phone call, you need to keep polling until the user approves/denies - (status, _) = self._do_post(f'https://{duo_host}/frame/status', - data={'sid': sid, 'txid': txid}, soup=False) - status_data = json.loads(status.text) - - if status_data['stat'] != 'OK': - LOG.error("Returned from second status call: %s", status.text) - alohomora.die("Sorry, there was a problem talking to Duo.") - if status_data['response']['status_code'] == 'allow': - LOG.info("Login allowed!") - allowed = True - elif status_data['response']['status_code'] == 'deny': - LOG.error("Login disallowed: %s", status.text) - alohomora.die("The login was blocked!") + # If not immediately approved, poll for status + if not allowed: + # there should never be a case where `allowed` is True if the user picked Security Key + if status_data['response']['status_code'] == 'webauthn_sent': + (factor_name, txid, allowed, status_data) = self._process_webauthn_request( + status_data, + payload, + duo_host, + sid, + iframe=True) else: - LOG.info("Still waiting... (%s)", status_data['response']['status_code']) - LOG.debug(str(status_data)) - time.sleep(2) + status_data = self._wait_for_duo_status( + allowed, + f'https://{duo_host}/frame/status', + sid, + txid) signed_auth = '' if 'result_url' in status_data['response']: From d80bf4ff5bd1ed6d533dd143bd29bd8bf9e0f656 Mon Sep 17 00:00:00 2001 From: bcaselden-viasat Date: Mon, 11 Mar 2024 18:42:33 -0400 Subject: [PATCH 2/3] remove comment --- alohomora/req.py | 1 - 1 file changed, 1 deletion(-) diff --git a/alohomora/req.py b/alohomora/req.py index 01dc45d..4060a08 100644 --- a/alohomora/req.py +++ b/alohomora/req.py @@ -587,7 +587,6 @@ def _login_two_factor_iframe(self, response_1fa, auth_device=None): (payload, _, new_action) = self._get_duo_plugin_payload(soup, frame_url, auth_device) (status, _) = self._do_post(f'https://{duo_host}{new_action}', data=payload, - # headers=headers, soup=False) soup=False) # Response is of form From 7fdc0de673aa0a9be3366c774201723ced97344a Mon Sep 17 00:00:00 2001 From: bcaselden-viasat Date: Tue, 12 Mar 2024 09:59:50 -0400 Subject: [PATCH 3/3] bump version --- alohomora/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/alohomora/__init__.py b/alohomora/__init__.py index a22507c..58073cb 100644 --- a/alohomora/__init__.py +++ b/alohomora/__init__.py @@ -16,7 +16,7 @@ import sys -__version__ = '3.0.3' +__version__ = '3.1.0' __author__ = 'Viasat' __author_email__ = 'vice-support@viasat.com' __license__ = '(c) 2022 Viasat, Inc. See the LICENSE file for more details.'