Source code

Revision control

Copy as Markdown

Other Tools

# mypy: allow-untyped-defs
import json
import os
import pathlib
from collections import defaultdict
from urllib.parse import quote, unquote, urljoin
from .constants import content_types
from .pipes import Pipeline, template
from .ranges import RangeParser
from .request import Authentication
from .response import MultipartContent
from .utils import HTTPException
from html import escape
# Names exported by ``from <this module> import *``.
__all__ = ["file_handler", "python_script_handler",
           "FunctionHandler", "handler", "json_handler",
           "as_is_handler", "ErrorHandler", "BasicAuthHandler"]
def guess_content_type(path):
    """Return the content type registered for the extension of ``path``.

    The extension, without its leading dot, is looked up in the
    ``content_types`` table. Anything not found there is reported as
    ``application/octet-stream``.
    """
    _, suffix = os.path.splitext(path)
    key = suffix.lstrip(".")
    return content_types[key] if key in content_types else "application/octet-stream"
def filesystem_path(base_path, request, url_base="/"):
    """Map the request URL onto a filesystem path below ``base_path``.

    :param base_path: Directory to resolve against; ``None`` means use
                      ``request.doc_root``.
    :param request: Request object; ``url_parts.path`` (and ``doc_root``
                    when ``base_path`` is ``None``) are read from it.
    :param url_base: URL prefix that is stripped from the request path
                     before it is joined onto ``base_path``.
    :return: ``base_path`` joined with the remaining, percent-decoded path.
    :raises HTTPException: 404 if the path contains ``..`` or resolves to a
                           location outside ``base_path``.
    """
    if base_path is None:
        base_path = request.doc_root

    path = unquote(request.url_parts.path)

    if path.startswith(url_base):
        path = path[len(url_base):]

    if ".." in path:
        raise HTTPException(404)

    new_path = os.path.join(base_path, path)

    # os.path.join discards base_path when ``path`` is absolute (e.g. a
    # request for "//etc/passwd"), so the result must be checked for
    # containment. A plain string-prefix test is not enough: "/srv/www-old"
    # starts with "/srv/www" without being inside it, so compare on a
    # path-separator boundary using absolute, normalised paths.
    root = os.path.normcase(os.path.abspath(base_path))
    candidate = os.path.normcase(os.path.abspath(new_path))
    if candidate != root and not candidate.startswith(root.rstrip(os.sep) + os.sep):
        raise HTTPException(404)

    return new_path
class DirectoryHandler:
    """Handler that renders an HTML listing of a filesystem directory."""

    def __init__(self, base_path=None, url_base="/"):
        """
        :param base_path: Filesystem directory that request URLs are resolved
                          against (passed through to ``filesystem_path``).
        :param url_base: URL prefix that maps onto ``base_path``.
        """
        self.base_path = base_path
        self.url_base = url_base

    def __repr__(self):
        return "<%s base_path:%s url_base:%s>" % (self.__class__.__name__, self.base_path, self.url_base)

    def __call__(self, request, response):
        """Fill ``response`` with a listing of the requested directory.

        A URL without a trailing slash is answered with a 301 redirect to the
        slash-terminated URL instead.

        :raises HTTPException: 404 if the resolved path is not a directory.
        """
        url_path = request.url_parts.path

        if not url_path.endswith("/"):
            response.status = 301
            response.headers = [("Location", "%s/" % request.url)]
            return

        path = filesystem_path(self.base_path, request, self.url_base)

        # Raise instead of asserting: assertions are stripped under
        # ``python -O``, which would let os.listdir fail later instead.
        if not os.path.isdir(path):
            raise HTTPException(404)

        response.headers = [("Content-Type", "text/html")]
        response.content = """<!doctype html>
<meta name="viewport" content="width=device-width">
<title>Directory listing for %(path)s</title>
<h1>Directory listing for %(path)s</h1>
<ul>
%(items)s
</ul>
""" % {"path": escape(url_path),
       "items": "\n".join(self.list_items(url_path, path))}  # noqa: E122

    def list_items(self, base_path, path):
        """Yield one ``<li>`` markup string per entry of the directory.

        :param base_path: URL path of the directory; must end with ``/``.
        :param path: Filesystem path of the directory to list.
        """
        assert base_path.endswith("/")

        # TODO: this won't actually list all routes, only the
        # ones that correspond to a real filesystem path. It's
        # not possible to list every route that will match
        # something, but it should be possible to at least list the
        # statically defined ones

        if base_path != "/":
            link = urljoin(base_path, "..")
            # The link is derived from the request URL, so escape it before
            # placing it inside the href attribute.
            yield ("""<li class="dir"><a href="%(link)s">%(name)s</a></li>""" %
                   {"link": escape(link), "name": ".."})

        def sort_key(name):
            # This ensures that .headers always sorts after the file it
            # provides the headers for. E.g., if we have x, x-y, and
            # x.headers, the order will be x, x.headers, and then x-y.
            if name.endswith(".headers"):
                return (name[:-len(".headers")], name)
            return (name, name)

        # Pair every file with its ".headers" companion (if any) so that both
        # are rendered on a single line of the listing.
        items = []
        prev_item = None
        for item in sorted(os.listdir(path), key=sort_key):
            if prev_item and prev_item + ".headers" == item:
                items[-1][1] = item
                prev_item = None
                continue
            items.append([item, None])
            prev_item = item

        for item, dot_headers in items:
            link = escape(quote(item))
            dot_headers_markup = ""
            if dot_headers is not None:
                dot_headers_markup = (""" (<a href="%(link)s">.headers</a>)""" %
                                      {"link": escape(quote(dot_headers))})
            if os.path.isdir(os.path.join(path, item)):
                link += "/"
                class_ = "dir"
            else:
                class_ = "file"
            yield ("""<li class="%(class)s"><a href="%(link)s">%(name)s</a>%(headers)s</li>""" %
                   {"link": link, "name": escape(item), "class": class_,
                    "headers": dot_headers_markup})
def parse_qs(qs):
    """Decode an ``application/x-www-form-urlencoded`` query string.

    The result is a plain dictionary that maps each distinct variable name
    to the list of values given for it, in order of appearance. Fields that
    carry no value (``name`` or ``name=``) are dropped.

    Only ``&`` separates fields. This implementation is used instead of
    Python's built-in `parse_qs` in order to support the semicolon character
    (which the built-in method interprets as a parameter delimiter)."""
    collected = defaultdict(list)
    for field in qs.split("&"):
        raw_name, _, raw_value = field.partition("=")
        if not raw_value:
            # Covers empty fields, bare names and "name=" alike.
            continue
        name = unquote(raw_name.replace("+", " "))
        collected[name].append(unquote(raw_value.replace("+", " ")))
    return dict(collected)
def wrap_pipeline(path, request, response):
    """Run the response through any pipes requested for it.

    Pipes come from two places: a ``.sub.`` component in ``path``, which
    requests the ``sub`` pipe, and the ``pipe`` query parameter (the last
    occurrence is used). When both are present the ``sub`` pipe comes first.

    :return: The result of applying the pipeline, or ``response`` unchanged
             when no pipe was requested.
    """
    query = parse_qs(request.url_parts.query)
    stages = []

    if ".sub." in path:
        extension = os.path.splitext(path)[1]
        markup_extensions = {".html", ".htm", ".xht", ".xhtml", ".xml", ".svg"}
        # Markup files get HTML escaping of substituted values.
        stages.append("sub(%s)" % ("html" if extension in markup_extensions else "none"))

    if "pipe" in query:
        stages.append(query["pipe"][-1])

    pipe_string = "|".join(stages)
    if not pipe_string:
        return response
    return Pipeline(pipe_string)(request, response)
def load_headers(request, path):
    """Loads headers from files for a given path.

    Attempts to load both the neighbouring __dir__{.sub}.headers and
    PATH{.sub}.headers (applying template substitution if needed); results are
    concatenated in that order.

    :param request: Request object, passed to ``template`` when a
                    ``.sub.headers`` file is used.
    :param path: Filesystem path of the resource being served.
    :return: List of ``(name, value)`` tuples of bytes; empty if neither
             headers file can be read.
    """
    def _load(request, path):
        # Prefer the templated variant when it exists.
        headers_path = path + ".sub.headers"
        if os.path.exists(headers_path):
            use_sub = True
        else:
            headers_path = path + ".headers"
            use_sub = False

        try:
            with open(headers_path, "rb") as headers_file:
                data = headers_file.read()
        except OSError:
            # A missing or unreadable headers file simply means no headers.
            return []

        if use_sub:
            data = template(request, data, escape_type="none")
        # Skip lines that are empty once stripped: a whitespace-only line
        # would otherwise produce a one-element tuple that callers cannot
        # unpack as (name, value).
        return [tuple(item.strip() for item in line.split(b":", 1))
                for line in data.splitlines() if line.strip()]

    return (_load(request, os.path.join(os.path.dirname(path), "__dir__")) +
            _load(request, path))
class FileHandler:
    """Handler that serves files from disk, including byte-range requests.

    Requests that resolve to a directory are passed on to a
    ``DirectoryHandler`` created with the same ``base_path`` and ``url_base``.
    """

    def __init__(self, base_path=None, url_base="/"):
        self.base_path = base_path
        self.url_base = url_base
        self.directory_handler = DirectoryHandler(self.base_path, self.url_base)

    def __repr__(self):
        cls_name = self.__class__.__name__
        return "<%s base_path:%s url_base:%s>" % (cls_name, self.base_path, self.url_base)

    def __call__(self, request, response):
        """Serve the file (or directory listing) the request resolves to.

        :raises HTTPException: 404 when the file cannot be accessed; errors
            from range parsing are re-raised, with a ``Content-Range`` header
            added first when the code is 416.
        """
        path = filesystem_path(self.base_path, request, self.url_base)

        if os.path.isdir(path):
            return self.directory_handler(request, response)

        try:
            # This is probably racy with some other process trying to change
            # the file: the size is read here, the contents later.
            file_size = os.stat(path).st_size
            response.headers.update(self.get_headers(request, path))

            byte_ranges = None
            if "Range" in request.headers:
                try:
                    byte_ranges = RangeParser()(request.headers['Range'], file_size)
                except HTTPException as e:
                    if e.code == 416:
                        response.headers.set("Content-Range", "bytes */%i" % file_size)
                    raise

            response.content = self.get_data(response, path, byte_ranges)
            return wrap_pipeline(path, request, response)
        except OSError:
            raise HTTPException(404)

    def get_headers(self, request, path):
        """Return the headers loaded for ``path``, guaranteeing a Content-Type.

        When none of the loaded headers is a Content-Type, one guessed from
        the file name is inserted at the front of the list.
        """
        headers = load_headers(request, path)
        for name, _ in headers:
            if name.lower() == b"content-type":
                break
        else:
            guessed = guess_content_type(path).encode("ascii")
            headers.insert(0, (b"Content-Type", guessed))
        return headers

    def get_data(self, response, path, byte_ranges):
        """Return either the handle to a file, or a string containing
        the content of a chunk of the file, if we have a range request.

        For range requests the response status is set to 206; several ranges
        produce multipart content, a single range sets ``Content-Range``."""
        if byte_ranges is None:
            # Returned open; this method does not close the handle.
            return open(path, 'rb')

        with open(path, 'rb') as f:
            response.status = 206

            if len(byte_ranges) <= 1:
                response.headers.set("Content-Range", byte_ranges[0].header_value())
                return self.get_range_data(f, byte_ranges[0])

            parts_content_type, content = self.set_response_multipart(
                response, byte_ranges, f)
            for byte_range in byte_ranges:
                content.append_part(self.get_range_data(f, byte_range),
                                    parts_content_type,
                                    [("Content-Range", byte_range.header_value())])
            return content

    def set_response_multipart(self, response, ranges, f):
        """Switch the response to ``multipart/byteranges``.

        :return: ``(parts_content_type, content)`` where the first item is the
                 last Content-Type value previously set on the response (or
                 ``None``) and the second a new ``MultipartContent``.
        """
        existing = response.headers.get("Content-Type")
        parts_content_type = existing[-1] if existing else None

        content = MultipartContent()
        boundary_header = "multipart/byteranges; boundary=%s" % content.boundary
        response.headers.set("Content-Type", boundary_header)
        return parts_content_type, content

    def get_range_data(self, f, byte_range):
        """Read the bytes from ``byte_range.lower`` up to, but not including,
        ``byte_range.upper`` out of the open file ``f``."""
        start = byte_range.lower
        f.seek(start)
        return f.read(byte_range.upper - start)
# Shared default instance: with no base_path given, files are resolved
# against each request's doc_root.
file_handler = FileHandler() # type: ignore
class PythonScriptHandler:
    """Handler that executes a Python file and dispatches to functions it defines."""

    def __init__(self, base_path=None, url_base="/"):
        self.base_path = base_path
        self.url_base = url_base

    def __repr__(self):
        cls_name = self.__class__.__name__
        return "<%s base_path:%s url_base:%s>" % (cls_name, self.base_path, self.url_base)

    def _load_file(self, request, response, func):
        """
        This loads the requested python file as an environ variable.

        If the requested file is a directory, this instead loads the first
        lexicographically sorted file found in that directory that matches
        "default*.py".

        Once the environ is loaded, the passed `func` is run with this loaded environ.

        :param request: The request object
        :param response: The response object
        :param func: The function to be run with the loaded environ with the modified filepath. Signature: (request, response, environ, path)
        :return: The return of func
        :raises HTTPException: 404 if an OSError occurs while loading or running.
        """
        path = filesystem_path(self.base_path, request, self.url_base)

        # Find a default Python file if the specified path is a directory
        if os.path.isdir(path):
            candidates = sorted(
                entry for entry in pathlib.Path(path).glob("default*.py")
                if entry.is_file())
            if candidates:
                path = str(candidates[0])

        try:
            environ = {"__file__": path}
            # NOTE: this runs the script's code in-process; the resulting
            # globals end up in ``environ``.
            with open(path, 'rb') as f:
                exec(compile(f.read(), path, 'exec'), environ, environ)

            if func is not None:
                return func(request, response, environ, path)
        except OSError:
            raise HTTPException(404)

    def __call__(self, request, response):
        """Execute the script and invoke its ``main`` function.

        :raises HTTPException: 500 if the script defines no ``main``.
        """
        def run_main(request, response, environ, path):
            if "main" not in environ:
                raise HTTPException(500, "No main function in script %s" % path)
            script_handler = FunctionHandler(environ["main"])
            script_handler(request, response)
            wrap_pipeline(path, request, response)

        self._load_file(request, response, run_main)

    def frame_handler(self, request):
        """
        This creates a FunctionHandler with one or more of the handling functions.

        Used by the H2 server.

        :param request: The request object used to generate the handler.
        :return: A FunctionHandler object with one or more of these functions: `handle_headers`, `handle_data` or `main`
        """
        def build_handler(request, response, environ, path):
            def _main(req, resp):
                pass

            # ``_main`` is a placeholder used to detect a missing ``main``.
            script_handler = FunctionHandler(environ.get("main", _main))
            for hook in ("handle_headers", "handle_data"):
                if hook in environ:
                    setattr(script_handler, hook, environ[hook])

            has_hooks = (hasattr(script_handler, "handle_headers") or
                         hasattr(script_handler, "handle_data"))
            if script_handler.func is _main and not has_hooks:
                raise HTTPException(500, "No main function or handlers in script %s" % path)

            return script_handler

        return self._load_file(request, None, build_handler)
# Shared default instance: with no base_path given, scripts are resolved
# against each request's doc_root.
python_script_handler = PythonScriptHandler() # type: ignore
class FunctionHandler:
    """Handler that delegates to a plain ``func(request, response)`` callable.

    The callable may return ``None``, the response content, a
    ``(headers, content)`` tuple or a ``(status, headers, content)`` tuple.
    """

    def __init__(self, func):
        self.func = func

    def __call__(self, request, response):
        """Call the wrapped function and apply its return value to ``response``.

        :raises HTTPException: re-raised unchanged from the function; any
            other exception, or a tuple of unsupported length, becomes a 500.
        """
        try:
            rv = self.func(request, response)
        except HTTPException:
            raise
        except Exception as e:
            raise HTTPException(500) from e

        if rv is not None:
            content = rv
            if isinstance(rv, tuple):
                if len(rv) not in (2, 3):
                    raise HTTPException(500)
                if len(rv) == 3:
                    response.status = rv[0]
                headers, content = rv[-2:]
                response.headers.update(headers)
            response.content = content

        # No file name here, so only the ``pipe`` query parameter can apply.
        wrap_pipeline('', request, response)
# The generic name here is so that this can be used as a decorator
def handler(func):
    """Wrap ``func`` in a ``FunctionHandler`` and return the wrapper."""
    wrapped = FunctionHandler(func)
    return wrapped
class JsonHandler:
    """Handler that serialises the wrapped function's result as JSON."""

    def __init__(self, func):
        self.func = func

    def __call__(self, request, response):
        return FunctionHandler(self.handle_request)(request, response)

    def handle_request(self, request, response):
        """Call the wrapped function and JSON-encode its result.

        When the function returns a tuple only its last element is encoded;
        the leading elements are passed through unchanged. ``Content-Type``
        and ``Content-Length`` (the length of the encoded string) are set on
        the response.
        """
        rv = self.func(request, response)
        response.headers.set("Content-Type", "application/json")

        if isinstance(rv, tuple):
            body = json.dumps(rv[-1])
            value = tuple(rv[:-1]) + (body,)
        else:
            body = value = json.dumps(rv)

        response.headers.set("Content-Length", len(body))
        return value
def json_handler(func):
    """Wrap ``func`` in a ``JsonHandler`` and return the wrapper."""
    wrapped = JsonHandler(func)
    return wrapped
class AsIsHandler:
    """Handler that writes a file's bytes to the response writer as raw content."""

    def __init__(self, base_path=None, url_base="/"):
        self.base_path = base_path
        self.url_base = url_base

    def __call__(self, request, response):
        """Write the requested file verbatim and mark the connection for closing.

        :raises HTTPException: 500 if the path is a directory, 404 if an
            OSError occurs while handling the file.
        """
        path = filesystem_path(self.base_path, request, self.url_base)

        if os.path.isdir(path):
            message = "AsIsHandler cannot process directory, %s" % path
            raise HTTPException(500, message)

        try:
            with open(path, 'rb') as f:
                raw = f.read()
            response.writer.write_raw_content(raw)
            wrap_pipeline(path, request, response)
            response.close_connection = True
        except OSError:
            raise HTTPException(404)
# Shared default instance: with no base_path given, files are resolved
# against each request's doc_root.
as_is_handler = AsIsHandler() # type: ignore
class BasicAuthHandler:
    def __init__(self, handler, user, password):
        """
        A Basic Auth handler

        :Args:
        - handler: a secondary handler for the request after authentication is successful (example file_handler)
        - user: string of the valid user name or None if any / all credentials are allowed
        - password: string of the password required
        """
        self.user = user
        self.password = password
        self.handler = handler

    def __call__(self, request, response):
        """Authenticate the request, then hand it to the secondary handler.

        Without an ``authorization`` header the response becomes a 401
        challenge; with credentials that do not match it becomes a 403.
        """
        if "authorization" not in request.headers:
            response.status = 401
            response.headers.set("WWW-Authenticate", "Basic")
            return response

        auth = Authentication(request.headers)
        if self.user is not None:
            # Credentials are only compared when a user name was configured.
            accepted = (self.user == auth.username and
                        self.password == auth.password)
            if not accepted:
                response.set_error(403, "Invalid username or password")
                return response

        return self.handler(request, response)
# Default instance wrapping file_handler. The user is None, so any supplied
# credentials are accepted, but an authorization header is still required.
basic_auth_handler = BasicAuthHandler(file_handler, None, None) # type: ignore
class ErrorHandler:
    """Handler that answers every request with a fixed error status."""

    def __init__(self, status):
        self.status = status

    def __call__(self, request, response):
        """Set the configured error status on ``response``."""
        status = self.status
        response.set_error(status)
class StringHandler:
    def __init__(self, data, content_type, **headers):
        """Handler that returns a fixed data string and headers

        :param data: String to use
        :param content_type: Content type header to serve the response with
        :param headers: Extra headers to send with responses, given as keyword
                        arguments; underscores in the names become hyphens"""
        self.data = data
        self.resp_headers = [("Content-Type", content_type)]
        self.resp_headers.extend(
            (name.replace("_", "-"), value) for name, value in headers.items())
        self.handler = handler(self.handle_request)

    def handle_request(self, request, response):
        """Return the fixed ``(headers, data)`` pair for every request."""
        return self.resp_headers, self.data

    def __call__(self, request, response):
        return self.handler(request, response)
class StaticHandler:
    def __init__(self, path, format_args, content_type, **headers):
        """Handler that reads a file from a path and substitutes some fixed data

        Note that *.headers files have no effect in this handler.

        :param path: Path to the template file to use
        :param format_args: Dictionary of values to substitute into the template file
        :param content_type: Content type header to serve the response with
        :param headers: Extra headers to send with responses, given as keyword
                        arguments"""
        self._path = path
        self._format_args = format_args
        self._content_type = content_type
        self._headers = headers
        # Created on first request; see __call__.
        self._handler = None

    def __getnewargs_ex__(self):
        # Constructor arguments used to re-create the instance on unpickling.
        args = self._path, self._format_args, self._content_type
        return args, self._headers

    def __getstate__(self):
        # Do not pickle `self._handler`, which can be arbitrarily large.
        # Pickle serialises the whole instance dictionary by default, so the
        # cached handler has to be dropped from the state explicitly; the
        # unpickled copy reloads the file lazily on its first request.
        state = self.__dict__.copy()
        state["_handler"] = None
        return state

    def __call__(self, request, response):
        # Load the static file contents lazily so that this handler can be
        # pickled and sent to child processes efficiently. Transporting file
        # contents across processes can slow `wptserve` startup by several
        # seconds (crbug.com/1479850).
        if self._handler is None:
            with open(self._path) as f:
                data = f.read()
            if self._format_args:
                data = data % self._format_args
            self._handler = StringHandler(data, self._content_type, **self._headers)
        return self._handler(request, response)