Skip to content

Commit

Permalink
Merge pull request #50 from Viasat/add-duo-universal-support
Browse files Browse the repository at this point in the history
add duo univ prompt support
  • Loading branch information
bcaselden-viasat committed Mar 13, 2024
2 parents cb8a98c + 7fdc0de commit a82d78b
Show file tree
Hide file tree
Showing 2 changed files with 210 additions and 115 deletions.
2 changes: 1 addition & 1 deletion alohomora/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import sys

__version__ = '3.0.3'
__version__ = '3.1.0'
__author__ = 'Viasat'
__author_email__ = '[email protected]'
__license__ = '(c) 2022 Viasat, Inc. See the LICENSE file for more details.'
Expand Down
323 changes: 209 additions & 114 deletions alohomora/req.py
Original file line number Diff line number Diff line change
Expand Up @@ -366,9 +366,199 @@ def login_one_factor(self, username, password):
return (True, assertion)
return (False, response)

def _get_duo_plugin_payload(self, soup, post_url, auth_device):
''' Get the DUO plugin payload '''
payload = { inputtag.get('name', ''): inputtag.get('value', '')
for inputtag in soup.find_all('input') }

xsrf = payload.get('_xsrf', '')

# Post data to emulate the plugin determination
(response, soup) = self._do_post(post_url, data=payload)

sid = unquote(urlparse.urlparse(response.request.url).query[4:])
new_action = self._get_form_action(soup)
device = self._get_duo_device(soup, auth_device)
factor = self._get_auth_factor(soup, device)

do_wa = device.value.startswith('WA')
plugin_payload = {
'sid': sid,
'device': device.value,
'factor': factor.name,
'out_of_date': ''
}

if factor.name == "Passcode":
plugin_payload['passcode'] = factor.value

if do_wa:
# configure webauthn plugin info
plugin_payload['factor'] = factor.name if (
device.name != "Security Key (U2F)"
and not device.value.startswith('WA')) else "WebAuthn Credential"

return (plugin_payload, xsrf, new_action)

def _wait_for_duo_status(self, allowed, duo_status_endpoint, sid, txid):
''' Poll Duo for MFA status '''
while not allowed:
# call again to get status of request
# for a push notification, this will hang until the user approves/denies
# for a phone call, you need to keep polling until the user approves/denies
(status, _) = self._do_post(duo_status_endpoint,
data={'sid': sid, 'txid': txid}, soup=False)
status_data = json.loads(status.text)

if status_data['stat'] != 'OK':
LOG.error(f"Returned from second status call: {status.text}")
alohomora.die("Sorry, there was a problem talking to Duo.")
if status_data['response']['status_code'] == 'allow':
LOG.info("Login allowed!")
allowed = True
elif status_data['response']['status_code'] == 'deny':
LOG.error(f"Login disallowed: {status.text}")
alohomora.die("The login was blocked!")
else:
print(f"Still waiting... ({status_data['response']['status_code']})")
LOG.info(f"Still waiting... ({status_data['response']['status_code']})")
LOG.debug(str(status_data))
time.sleep(2)

return status_data

def _process_webauthn_request(self, status_data, plugin_payload, duo_host, sid, iframe=False):
''' Wait for webauthn device to be approved '''
if not iframe:
duo_prompt_endpoint = f'https://{duo_host}/frame/v4/prompt'
duo_status_endpoint = f'https://{duo_host}/frame/v4/status'
else:
duo_prompt_endpoint = f'https://{duo_host}/frame/prompt'
duo_status_endpoint = f'https://{duo_host}/frame/status'
opts = status_data['response']['webauthn_credential_request_options']
challenges = [r for r in opts['allowCredentials']
if self._validate_webauthn_request(duo_host, opts['extensions']['appid'])]
if not challenges:
alohomora.die('Sorry, there was a problem talking to Duo.')
resp = self._get_webauthn_response(opts)
LOG.debug(resp)

# include the session ID as passed to us earlier
plugin_payload['sid'] = sid
# webauthn_credential and webauthn_finish are magic strings here
plugin_payload['device'] = 'webauthn_credential'
plugin_payload['factor'] = 'webauthn_finish'
# these are a copy/paste from the duo integration's POST data
plugin_payload['out_of_date'] = None
plugin_payload['days_out_of_date'] = None
plugin_payload['days_to_block'] = 'None'
# finally, the response data itself needs to be a JSON string
plugin_payload['response_data'] = json.dumps(resp,separators=(',', ':'))

LOG.debug(plugin_payload)

(status, _) = self._do_post(duo_prompt_endpoint, data=plugin_payload,
soup=False)
status_data = json.loads(status.text)
# Response is of form
# {"stat": "OK", "response": {"txid": "f95cbacc-151c-43a6-b462-b33420e72633"}}
txid = json.loads(status.text)['response']['txid']
LOG.debug("Received transaction ID %s from response %s", txid, status.text)

# Initial call will NOT block
(status, _) = self._do_post(duo_status_endpoint,
data={'sid': sid, 'txid': txid}, soup=False)
status_data = json.loads(status.text)
LOG.info(str(status_data))
if status_data['stat'] != 'OK':
LOG.error("Returned from inital status call: %s", status.text)
alohomora.die("Sorry, there was a problem talking to Duo.")
allowed = status_data['response']['status_code'] == 'allow'
if not allowed:
alohomora.die("Sorry, there was a problem with your security key, try again.")

factor_name = 'webauthn_finish'
return (factor_name, txid, allowed, status_data)

def login_two_factor(self, response_1fa, auth_device=None):
"""Log in with the second factor, borrowing first factor data if necessary"""
# If redirected to duosecurity we are doing the oidc flow
if urlparse.urlparse(response_1fa.url).netloc.endswith('duosecurity.com'):
LOG.debug("Using DUO universal prompt")
return self._login_two_factor_duo_univ(response_1fa, auth_device)
else:
LOG.debug("Using DUO iframe prompt")
return self._login_two_factor_iframe(response_1fa, auth_device)

def _login_two_factor_duo_univ(self, response_1fa, auth_device=None):
"""Log in with the second factor, borrowing first factor data if necessary"""
soup = BeautifulSoup(response_1fa.content, 'html.parser')
post_url = response_1fa.url
duo_host = urlparse.urlparse(post_url).netloc

(plugin_payload, xsrf, _) = self._get_duo_plugin_payload(soup, post_url, auth_device)

sid = plugin_payload['sid']
factor_name = plugin_payload['factor']

# Start the Duo Auth request
(status, _) = self._do_post(f'https://{duo_host}/frame/v4/prompt',
data=plugin_payload,
soup=False)

# Response is of form
# {"stat": "OK", "response": {"txid": "f95cbacc-151c-43a6-b462-b33420e72633"}}
response = json.loads(status.text)['response']
txid = response['txid']

LOG.debug("Received response: %s", status.text)
LOG.debug("Received response %s", response)
LOG.debug("Received transaction ID %s", txid)

# Initial call will NOT block
(status, _) = self._do_post(f'https://{duo_host}/frame/v4/status',
data={'sid': sid, 'txid': txid}, soup=False)

status_data = json.loads(status.text)
if status_data['stat'] != 'OK':
LOG.error("Returned from inital status call: %s", status.text)
alohomora.die("Sorry, there was a problem talking to Duo.")
allowed = status_data['response']['status_code'] == 'allow'

# If not immediately approved, poll for status
if not allowed:
# there should never be a case where `allowed` is True if the user picked Security Key
if status_data['response']['status_code'] == 'webauthn_sent':
(factor_name, txid, allowed, _) = self._process_webauthn_request(
status_data,
plugin_payload,
duo_host,
sid)
else:
self._wait_for_duo_status(allowed,
f'https://{duo_host}/frame/v4/status',
sid,
txid)

payload = {
"sid": sid,
"txid": txid,
"factor": factor_name,
"device_key": "",
"_xsrf": xsrf,
"dampen_choice": False,
}

duo_exit_url = f'https://{duo_host}/frame/v4/oidc/exit'

(response, soup) = self._do_post(duo_exit_url,
data=payload, soup=True)

assertion = self._get_assertion(soup)
return (True, assertion)

def _login_two_factor_iframe(self, response_1fa, auth_device=None):
"""Log in with the second factor, borrowing first factor data if necessary"""
soup_1fa = BeautifulSoup(response_1fa.text, 'html.parser')
duo_host = None
sig_request = None
Expand All @@ -394,68 +584,23 @@ def login_two_factor(self, response_1fa, auth_device=None):
'Origin': origin_duo_host
})

payload = {}
for inputtag in soup.find_all('input'):
name = inputtag.get('name', '')
value = inputtag.get('value', '')
# Populate all parameters with the existing value (picks up hidden fields too)
payload[name] = value

# Post data to emulate the plugin determination
LOG.info('Posting plugin information to Duo')
(response, soup) = self._do_post(frame_url, data=payload)

sid = unquote(urlparse.urlparse(response.request.url).query[4:])
new_action = self._get_form_action(soup)
device = self._get_duo_device(soup, auth_device)
factor = self._get_auth_factor(soup, device)

do_wa = device.value.startswith('WA')
# Finally send the POST request for an auth to Duo
payload = {
'sid': sid,
'device': device.value,
'factor': factor.name,
'out_of_date': ''
}
LOG.debug("Payload: %s", payload)
if factor.name == "Passcode":
payload['passcode'] = factor.value
prompt_sid_url = response.url
headers = {'Referer': prompt_sid_url,
'Origin': origin_duo_host}
if do_wa:
# pull in the webauthn prompt
popup_url = (f'https://{duo_host}{new_action}/'
f'webauthn_auth_popup?sid={sid}&wkey={device.value}')
LOG.debug("Popup URL: %s", popup_url)
(status, soup) = self._do_get(
popup_url,
headers=headers)
payload = {
'sid': sid,
'device': device.value,
'factor': factor.name if (
device.name != "Security Key (U2F)"
and not device.value.startswith('WA')) else "WebAuthn Credential",
}
headers = {'Referer': popup_url, 'Origin': origin_duo_host}
(payload, _, new_action) = self._get_duo_plugin_payload(soup, frame_url, auth_device)

(status, _) = self._do_post(f'https://{duo_host}{new_action}', data=payload,
headers=headers, soup=False)
soup=False)

# Response is of form
# {"stat": "OK", "response": {"txid": "f95cbacc-151c-43a6-b462-b33420e72633"}}
LOG.debug("Received response: %s", status.text)
response = json.loads(status.text)['response']
txid = response['txid']
sid = payload['sid']
LOG.debug("Received response %s", response)
LOG.debug("Received transaction ID %s", txid)

headers = {'Referer': prompt_sid_url, 'Origin': origin_duo_host}
# Initial call will NOT block
(status, _) = self._do_post(f'https://{duo_host}/frame/status',
data={'sid': sid, 'txid': txid}, headers=headers, soup=False)
data={'sid': sid, 'txid': txid}, soup=False)
# text from this will be something like
# {
# "stat": "OK",
Expand Down Expand Up @@ -502,72 +647,22 @@ def login_two_factor(self, response_1fa, auth_device=None):
print(status_data['response']['status'])
allowed = status_data['response']['status_code'] == 'allow'

# there should never be a case where `allowed` is True if the user picked Security Key
if status_data['response']['status_code'] == 'webauthn_sent':
opts = status_data['response']['webauthn_credential_request_options']
challenges = [r for r in opts['allowCredentials']
if self._validate_webauthn_request(duo_host, opts['extensions']['appid'])]
if not challenges:
alohomora.die('Sorry, there was a problem talking to Duo.')
resp = self._get_webauthn_response(opts)
LOG.debug(resp)

# include the session ID as passed to us earlier
payload['sid'] = sid
# webauthn_credential and webauthn_finish are magic strings here
payload['device'] = 'webauthn_credential'
payload['factor'] = 'webauthn_finish'
# these are a copy/paste from the duo integration's POST data
payload['out_of_date'] = None
payload['days_out_of_date'] = None
payload['days_to_block'] = 'None'
# finally, the response data itself needs to be a JSON string
payload['response_data'] = json.dumps(resp,separators=(',', ':'))

LOG.debug(payload)

(status, _) = self._do_post(f'https://{duo_host}{new_action}', data=payload,
headers=headers, soup=False)
status_data = json.loads(status.text)
# Response is of form
# {"stat": "OK", "response": {"txid": "f95cbacc-151c-43a6-b462-b33420e72633"}}
txid = json.loads(status.text)['response']['txid']
LOG.debug("Received transaction ID %s from response %s", txid, status.text)

# Initial call will NOT block
(status, _) = self._do_post(f'https://{duo_host}/frame/status',
data={'sid': sid, 'txid': txid}, headers=headers, soup=False)
status_data = json.loads(status.text)
LOG.info(str(status_data))
if status_data['stat'] != 'OK':
LOG.error("Returned from inital status call: %s", status.text)
alohomora.die("Sorry, there was a problem talking to Duo.")
print(status_data['response']['status'])
allowed = status_data['response']['status_code'] == 'allow'
if not allowed:
alohomora.die("Sorry, there was a problem with your security key, try again.")

while not allowed:
# call again to get status of request
# for a push notification, this will hang until the user approves/denies
# for a phone call, you need to keep polling until the user approves/denies
(status, _) = self._do_post(f'https://{duo_host}/frame/status',
data={'sid': sid, 'txid': txid}, soup=False)
status_data = json.loads(status.text)

if status_data['stat'] != 'OK':
LOG.error("Returned from second status call: %s", status.text)
alohomora.die("Sorry, there was a problem talking to Duo.")
if status_data['response']['status_code'] == 'allow':
LOG.info("Login allowed!")
allowed = True
elif status_data['response']['status_code'] == 'deny':
LOG.error("Login disallowed: %s", status.text)
alohomora.die("The login was blocked!")
# If not immediately approved, poll for status
if not allowed:
# there should never be a case where `allowed` is True if the user picked Security Key
if status_data['response']['status_code'] == 'webauthn_sent':
(factor_name, txid, allowed, status_data) = self._process_webauthn_request(
status_data,
payload,
duo_host,
sid,
iframe=True)
else:
LOG.info("Still waiting... (%s)", status_data['response']['status_code'])
LOG.debug(str(status_data))
time.sleep(2)
status_data = self._wait_for_duo_status(
allowed,
f'https://{duo_host}/frame/status',
sid,
txid)

signed_auth = ''
if 'result_url' in status_data['response']:
Expand Down

0 comments on commit a82d78b

Please sign in to comment.