Source code

Revision control

Copy as Markdown

Other Tools

from base64 import urlsafe_b64encode, b64decode
from collections import namedtuple
import logging
import re
import six
from .base import Resource
from .util import (calculate_mac,
utc_now)
from .exc import (CredentialsLookupError,
InvalidBewit,
MacMismatch,
TokenExpired)
log = logging.getLogger(__name__)
def get_bewit(resource):
"""
Returns a bewit identifier for the resource as a string.
:param resource:
Resource to generate a bewit for
:type resource: `mohawk.base.Resource`
"""
if resource.method != 'GET':
raise ValueError('bewits can only be generated for GET requests')
if resource.nonce != '':
raise ValueError('bewits must use an empty nonce')
mac = calculate_mac(
'bewit',
resource,
None,
)
if isinstance(mac, six.binary_type):
mac = mac.decode('ascii')
if resource.ext is None:
ext = ''
else:
ext = resource.ext
# Strip out \ from the client id
# since that can break parsing the response
# NB that the canonical implementation does not do this as of
# Oct 28, 2015, so this could break compat.
# We can leave \ in ext since validators can limit how many \ they split
# on (although again, the canonical implementation does not do this)
client_id = six.text_type(resource.credentials['id'])
if "\\" in client_id:
log.warn("Stripping backslash character(s) '\\' from client_id")
client_id = client_id.replace("\\", "")
# b64encode works only with bytes in python3, but all of our parameters are
# in unicode, so we need to encode them. The cleanest way to do this that
# works in both python 2 and 3 is to use string formatting to get a
# unicode string, and then explicitly encode it to bytes.
inner_bewit = u"{id}\\{exp}\\{mac}\\{ext}".format(
id=client_id,
exp=resource.timestamp,
mac=mac,
ext=ext,
)
inner_bewit_bytes = inner_bewit.encode('ascii')
bewit_bytes = urlsafe_b64encode(inner_bewit_bytes)
# Now decode the resulting bytes back to a unicode string
return bewit_bytes.decode('ascii')
bewittuple = namedtuple('bewittuple', 'id expiration mac ext')
def parse_bewit(bewit):
"""
Returns a `bewittuple` representing the parts of an encoded bewit string.
This has the following named attributes:
(id, expiration, mac, ext)
:param bewit:
A base64 encoded bewit string
:type bewit: str
"""
decoded_bewit = b64decode(bewit).decode('ascii')
bewit_parts = decoded_bewit.split("\\", 3)
if len(bewit_parts) != 4:
raise InvalidBewit('Expected 4 parts to bewit: %s' % decoded_bewit)
return bewittuple(*decoded_bewit.split("\\", 3))
def strip_bewit(url):
"""
Strips the bewit parameter out of a url.
Returns (encoded_bewit, stripped_url)
Raises InvalidBewit if no bewit found.
:param url:
The url containing a bewit parameter
:type url: str
"""
m = re.search('[?&]bewit=([^&]+)', url)
if not m:
raise InvalidBewit('no bewit data found')
bewit = m.group(1)
stripped_url = url[:m.start()] + url[m.end():]
return bewit, stripped_url
def check_bewit(url, credential_lookup, now=None):
"""
Validates the given bewit.
Returns True if the resource has a valid bewit parameter attached,
or raises a subclass of HawkFail otherwise.
:param credential_lookup:
Callable to look up the credentials dict by sender ID.
The credentials dict must have the keys:
``id``, ``key``, and ``algorithm``.
See :ref:`receiving-request` for an example.
:type credential_lookup: callable
:param now=None:
Unix epoch time for the current time to determine if bewit has expired.
If None, then the current time as given by utc_now() is used.
:type now=None: integer
"""
raw_bewit, stripped_url = strip_bewit(url)
bewit = parse_bewit(raw_bewit)
try:
credentials = credential_lookup(bewit.id)
except LookupError:
raise CredentialsLookupError('Could not find credentials for ID {0}'
.format(bewit.id))
res = Resource(url=stripped_url,
method='GET',
credentials=credentials,
timestamp=bewit.expiration,
nonce='',
ext=bewit.ext,
)
mac = calculate_mac('bewit', res, None)
mac = mac.decode('ascii')
if mac != bewit.mac:
raise MacMismatch('bewit with mac {bewit_mac} did not match expected mac {expected_mac}'
.format(bewit_mac=bewit.mac,
expected_mac=mac))
# Check that the timestamp isn't expired
if now is None:
# TODO: Add offset/skew
now = utc_now()
if int(bewit.expiration) < now:
# TODO: Refactor TokenExpired to handle this better
raise TokenExpired('bewit with UTC timestamp {ts} has expired; '
'it was compared to {now}'
.format(ts=bewit.expiration, now=now),
localtime_in_seconds=now,
www_authenticate=''
)
return True