Skip to content
Snippets Groups Projects
Select Git revision
  • 05ec38f6f8334f31748e612d30c8e269a30795fb
  • master default protected
2 results

hello.arduino.328P.blink.c.hex

Blame
  • microdot.py NaN GiB
    """
    microdot
    --------
    
    The ``microdot`` module defines a few classes that help implement HTTP-based
    servers for MicroPython and standard Python, with multithreading support for
    Python interpreters that support it.
    """
    try:
        from sys import print_exception
    except ImportError:  # pragma: no cover
        import traceback
    
        def print_exception(exc):
            traceback.print_exc()
    try:
        import uerrno as errno
    except ImportError:
        import errno
    
    concurrency_mode = 'threaded'
    
    try:  # pragma: no cover
        import threading
    
        def create_thread(f, *args, **kwargs):
            # use the threading module
            threading.Thread(target=f, args=args, kwargs=kwargs).start()
    except ImportError:  # pragma: no cover
        def create_thread(f, *args, **kwargs):
            # no threads available, call function synchronously
            f(*args, **kwargs)
    
        concurrency_mode = 'sync'
    
    try:
        import ujson as json
    except ImportError:
        import json
    
    try:
        import ure as re
    except ImportError:
        import re
    
    socket_timeout_error = OSError
    try:
        import usocket as socket
    except ImportError:
        try:
            import socket
            socket_timeout_error = socket.timeout
        except ImportError:  # pragma: no cover
            socket = None
    
    MUTED_SOCKET_ERRORS = [
        32,  # Broken pipe
        54,  # Connection reset by peer
        104,  # Connection reset by peer
        128,  # Operation on closed socket
    ]
    
    
    def urldecode_str(s):
        s = s.replace('+', ' ')
        parts = s.split('%')
        if len(parts) == 1:
            return s
        result = [parts[0]]
        for item in parts[1:]:
            if item == '':
                result.append('%')
            else:
                code = item[:2]
                result.append(chr(int(code, 16)))
                result.append(item[2:])
        return ''.join(result)
    
    
    def urldecode_bytes(s):
        s = s.replace(b'+', b' ')
        parts = s.split(b'%')
        if len(parts) == 1:
            return s.decode()
        result = [parts[0]]
        for item in parts[1:]:
            if item == b'':
                result.append(b'%')
            else:
                code = item[:2]
                result.append(bytes([int(code, 16)]))
                result.append(item[2:])
        return b''.join(result).decode()
    
    
    def urlencode(s):
        return s.replace('+', '%2B').replace(' ', '+').replace(
            '%', '%25').replace('?', '%3F').replace('#', '%23').replace(
                '&', '%26').replace('=', '%3D')
    
    
    class NoCaseDict(dict):
        """A subclass of dictionary that holds case-insensitive keys.
    
        :param initial_dict: an initial dictionary of key/value pairs to
                             initialize this object with.
    
        Example::
    
            >>> d = NoCaseDict()
            >>> d['Content-Type'] = 'text/html'
            >>> print(d['Content-Type'])
            text/html
            >>> print(d['content-type'])
            text/html
            >>> print(d['CONTENT-TYPE'])
            text/html
            >>> del d['cOnTeNt-TyPe']
            >>> print(d)
            {}
        """
        def __init__(self, initial_dict=None):
            super().__init__(initial_dict or {})
            self.keymap = {k.lower(): k for k in self.keys() if k.lower() != k}
    
        def __setitem__(self, key, value):
            kl = key.lower()
            key = self.keymap.get(kl, key)
            if kl != key:
                self.keymap[kl] = key
            super().__setitem__(key, value)
    
        def __getitem__(self, key):
            kl = key.lower()
            return super().__getitem__(self.keymap.get(kl, kl))
    
        def __delitem__(self, key):
            kl = key.lower()
            super().__delitem__(self.keymap.get(kl, kl))
    
        def __contains__(self, key):
            kl = key.lower()
            return self.keymap.get(kl, kl) in self.keys()
    
        def get(self, key, default=None):
            kl = key.lower()
            return super().get(self.keymap.get(kl, kl), default)
    
        def update(self, other_dict):
            for key, value in other_dict.items():
                self[key] = value
    
    
    def mro(cls):  # pragma: no cover
        """Return the method resolution order of a class.
    
        This is a helper function that returns the method resolution order of a
        class. It is used by Microdot to find the best error handler to invoke for
        the raised exception.
    
        In CPython, this function returns the ``__mro__`` attribute of the class.
        In MicroPython, this function implements a recursive depth-first scanning
        of the class hierarchy.
        """
        if hasattr(cls, 'mro'):
            return cls.__mro__
    
        def _mro(cls):
            m = [cls]
            for base in cls.__bases__:
                m += _mro(base)
            return m
    
        mro_list = _mro(cls)
    
        # If a class appears multiple times (due to multiple inheritance) remove
        # all but the last occurence. This matches the method resolution order
        # of MicroPython, but not CPython.
        mro_pruned = []
        for i in range(len(mro_list)):
            base = mro_list.pop(0)
            if base not in mro_list:
                mro_pruned.append(base)
        return mro_pruned
    
    
    class MultiDict(dict):
        """A subclass of dictionary that can hold multiple values for the same
        key. It is used to hold key/value pairs decoded from query strings and
        form submissions.
    
        :param initial_dict: an initial dictionary of key/value pairs to
                             initialize this object with.
    
        Example::
    
            >>> d = MultiDict()
            >>> d['sort'] = 'name'
            >>> d['sort'] = 'email'
            >>> print(d['sort'])
            'name'
            >>> print(d.getlist('sort'))
            ['name', 'email']
        """
        def __init__(self, initial_dict=None):
            super().__init__()
            if initial_dict:
                for key, value in initial_dict.items():
                    self[key] = value
    
        def __setitem__(self, key, value):
            if key not in self:
                super().__setitem__(key, [])
            super().__getitem__(key).append(value)
    
        def __getitem__(self, key):
            return super().__getitem__(key)[0]
    
        def get(self, key, default=None, type=None):
            """Return the value for a given key.
    
            :param key: The key to retrieve.
            :param default: A default value to use if the key does not exist.
            :param type: A type conversion callable to apply to the value.
    
            If the multidict contains more than one value for the requested key,
            this method returns the first value only.
    
            Example::
    
                >>> d = MultiDict()
                >>> d['age'] = '42'
                >>> d.get('age')
                '42'
                >>> d.get('age', type=int)
                42
                >>> d.get('name', default='noname')
                'noname'
            """
            if key not in self:
                return default
            value = self[key]
            if type is not None:
                value = type(value)
            return value
    
        def getlist(self, key, type=None):
            """Return all the values for a given key.
    
            :param key: The key to retrieve.
            :param type: A type conversion callable to apply to the values.
    
            If the requested key does not exist in the dictionary, this method
            returns an empty list.
    
            Example::
    
                >>> d = MultiDict()
                >>> d.getlist('items')
                []
                >>> d['items'] = '3'
                >>> d.getlist('items')
                ['3']
                >>> d['items'] = '56'
                >>> d.getlist('items')
                ['3', '56']
                >>> d.getlist('items', type=int)
                [3, 56]
            """
            if key not in self:
                return []
            values = super().__getitem__(key)
            if type is not None:
                values = [type(value) for value in values]
            return values
    
    
    class Request():
        """An HTTP request."""
        #: Specify the maximum payload size that is accepted. Requests with larger
        #: payloads will be rejected with a 413 status code. Applications can
        #: change this maximum as necessary.
        #:
        #: Example::
        #:
        #:    Request.max_content_length = 1 * 1024 * 1024  # 1MB requests allowed
        max_content_length = 16 * 1024
    
        #: Specify the maximum payload size that can be stored in ``body``.
        #: Requests with payloads that are larger than this size and up to
        #: ``max_content_length`` bytes will be accepted, but the application will
        #: only be able to access the body of the request by reading from
        #: ``stream``. Set to 0 if you always access the body as a stream.
        #:
        #: Example::
        #:
        #:    Request.max_body_length = 4 * 1024  # up to 4KB bodies read
        max_body_length = 16 * 1024
    
        #: Specify the maximum length allowed for a line in the request. Requests
        #: with longer lines will not be correctly interpreted. Applications can
        #: change this maximum as necessary.
        #:
        #: Example::
        #:
        #:    Request.max_readline = 16 * 1024  # 16KB lines allowed
        max_readline = 2 * 1024
    
        #: Specify a suggested read timeout to use when reading the request. Set to
        #: 0 to disable the use of a timeout. This timeout should be considered a
        #: suggestion only, as some platforms may not support it. The default is
        #: 1 second.
        socket_read_timeout = 1
    
        class G:
            pass
    
        def __init__(self, app, client_addr, method, url, http_version, headers,
                     body=None, stream=None, sock=None):
            #: The application instance to which this request belongs.
            self.app = app
            #: The address of the client, as a tuple (host, port).
            self.client_addr = client_addr
            #: The HTTP method of the request.
            self.method = method
            #: The request URL, including the path and query string.
            self.url = url
            #: The path portion of the URL.
            self.path = url
            #: The query string portion of the URL.
            self.query_string = None
            #: The parsed query string, as a
            #: :class:`MultiDict <microdot.MultiDict>` object.
            self.args = {}
            #: A dictionary with the headers included in the request.
            self.headers = headers
            #: A dictionary with the cookies included in the request.
            self.cookies = {}
            #: The parsed ``Content-Length`` header.
            self.content_length = 0
            #: The parsed ``Content-Type`` header.
            self.content_type = None
            #: A general purpose container for applications to store data during
            #: the life of the request.
            self.g = Request.G()
    
            self.http_version = http_version
            if '?' in self.path:
                self.path, self.query_string = self.path.split('?', 1)
                self.args = self._parse_urlencoded(self.query_string)
    
            if 'Content-Length' in self.headers:
                self.content_length = int(self.headers['Content-Length'])
            if 'Content-Type' in self.headers:
                self.content_type = self.headers['Content-Type']
            if 'Cookie' in self.headers:
                for cookie in self.headers['Cookie'].split(';'):
                    name, value = cookie.strip().split('=', 1)
                    self.cookies[name] = value
    
            self._body = body
            self.body_used = False
            self._stream = stream
            self.stream_used = False
            self.sock = sock
            self._json = None
            self._form = None
            self.after_request_handlers = []
    
        @staticmethod
        def create(app, client_stream, client_addr, client_sock=None):
            """Create a request object.
    
    
            :param app: The Microdot application instance.
            :param client_stream: An input stream from where the request data can
                                  be read.
            :param client_addr: The address of the client, as a tuple.
            :param client_sock: The low-level socket associated with the request.
    
            This method returns a newly created ``Request`` object.
            """
            # request line
            line = Request._safe_readline(client_stream).strip().decode()
            if not line:
                return None
            method, url, http_version = line.split()
            http_version = http_version.split('/', 1)[1]
    
            # headers
            headers = NoCaseDict()
            while True:
                line = Request._safe_readline(client_stream).strip().decode()
                if line == '':
                    break
                header, value = line.split(':', 1)
                value = value.strip()
                headers[header] = value
    
            return Request(app, client_addr, method, url, http_version, headers,
                           stream=client_stream, sock=client_sock)
    
        def _parse_urlencoded(self, urlencoded):
            data = MultiDict()
            if len(urlencoded) > 0:
                if isinstance(urlencoded, str):
                    for kv in [pair.split('=', 1)
                               for pair in urlencoded.split('&') if pair]:
                        data[urldecode_str(kv[0])] = urldecode_str(kv[1]) \
                            if len(kv) > 1 else ''
                elif isinstance(urlencoded, bytes):  # pragma: no branch
                    for kv in [pair.split(b'=', 1)
                               for pair in urlencoded.split(b'&') if pair]:
                        data[urldecode_bytes(kv[0])] = urldecode_bytes(kv[1]) \
                            if len(kv) > 1 else b''
            return data
    
        @property
        def body(self):
            """The body of the request, as bytes."""
            if self.stream_used:
                raise RuntimeError('Cannot use both stream and body')
            if self._body is None:
                self._body = b''
                if self.content_length and \
                        self.content_length <= Request.max_body_length:
                    while len(self._body) < self.content_length:
                        data = self._stream.read(
                            self.content_length - len(self._body))
                        if len(data) == 0:  # pragma: no cover
                            raise EOFError()
                        self._body += data
                    self.body_used = True
            return self._body
    
        @property
        def stream(self):
            """The input stream, containing the request body."""
            if self.body_used:
                raise RuntimeError('Cannot use both stream and body')
            self.stream_used = True
            return self._stream
    
        @property
        def json(self):
            """The parsed JSON body, or ``None`` if the request does not have a
            JSON body."""
            if self._json is None:
                if self.content_type is None:
                    return None
                mime_type = self.content_type.split(';')[0]
                if mime_type != 'application/json':
                    return None
                self._json = json.loads(self.body.decode())
            return self._json
    
        @property
        def form(self):
            """The parsed form submission body, as a
            :class:`MultiDict <microdot.MultiDict>` object, or ``None`` if the
            request does not have a form submission."""
            if self._form is None:
                if self.content_type is None:
                    return None
                mime_type = self.content_type.split(';')[0]
                if mime_type != 'application/x-www-form-urlencoded':
                    return None
                self._form = self._parse_urlencoded(self.body)
            return self._form
    
        def after_request(self, f):
            """Register a request-specific function to run after the request is
            handled. Request-specific after request handlers run at the very end,
            after the application's own after request handlers. The function must
            take two arguments, the request and response objects. The return value
            of the function must be the updated response object.
    
            Example::
    
                @app.route('/')
                def index(request):
                    # register a request-specific after request handler
                    @req.after_request
                    def func(request, response):
                        # ...
                        return response
    
                    return 'Hello, World!'
    
            Note that the function is not called if the request handler raises an
            exception and an error response is returned instead.
            """
            self.after_request_handlers.append(f)
            return f
    
        @staticmethod
        def _safe_readline(stream):
            line = stream.readline(Request.max_readline + 1)
            if len(line) > Request.max_readline:
                raise ValueError('line too long')
            return line
    
    
    class Response():
        """An HTTP response class.
    
        :param body: The body of the response. If a dictionary or list is given,
                     a JSON formatter is used to generate the body. If a file-like
                     object or a generator is given, a streaming response is used.
                     If a string is given, it is encoded from UTF-8. Else, the
                     body should be a byte sequence.
        :param status_code: The numeric HTTP status code of the response. The
                            default is 200.
        :param headers: A dictionary of headers to include in the response.
        :param reason: A custom reason phrase to add after the status code. The
                       default is "OK" for responses with a 200 status code and
                       "N/A" for any other status codes.
        """
        types_map = {
            'css': 'text/css',
            'gif': 'image/gif',
            'html': 'text/html',
            'jpg': 'image/jpeg',
            'js': 'application/javascript',
            'json': 'application/json',
            'png': 'image/png',
            'txt': 'text/plain',
        }
        send_file_buffer_size = 1024
    
        #: The content type to use for responses that do not explicitly define a
        #: ``Content-Type`` header.
        default_content_type = 'text/plain'
    
        #: The default cache control max age used by :meth:`send_file`. A value
        #: of ``None`` means that no ``Cache-Control`` header is added.
        default_send_file_max_age = None
    
        #: Special response used to signal that a response does not need to be
        #: written to the client. Used to exit WebSocket connections cleanly.
        already_handled = None
    
        def __init__(self, body='', status_code=200, headers=None, reason=None):
            if body is None and status_code == 200:
                body = ''
                status_code = 204
            self.status_code = status_code
            self.headers = NoCaseDict(headers or {})
            self.reason = reason
            if isinstance(body, (dict, list)):
                self.body = json.dumps(body).encode()
                self.headers['Content-Type'] = 'application/json; charset=UTF-8'
            elif isinstance(body, str):
                self.body = body.encode()
            else:
                # this applies to bytes, file-like objects or generators
                self.body = body
            self.is_head = False
    
        def set_cookie(self, cookie, value, path=None, domain=None, expires=None,
                       max_age=None, secure=False, http_only=False):
            """Add a cookie to the response.
    
            :param cookie: The cookie's name.
            :param value: The cookie's value.
            :param path: The cookie's path.
            :param domain: The cookie's domain.
            :param expires: The cookie expiration time, as a ``datetime`` object
                            or a correctly formatted string.
            :param max_age: The cookie's ``Max-Age`` value.
            :param secure: The cookie's ``secure`` flag.
            :param http_only: The cookie's ``HttpOnly`` flag.
            """
            http_cookie = '{cookie}={value}'.format(cookie=cookie, value=value)
            if path:
                http_cookie += '; Path=' + path
            if domain:
                http_cookie += '; Domain=' + domain
            if expires:
                if isinstance(expires, str):
                    http_cookie += '; Expires=' + expires
                else:
                    http_cookie += '; Expires=' + expires.strftime(
                        '%a, %d %b %Y %H:%M:%S GMT')
            if max_age:
                http_cookie += '; Max-Age=' + str(max_age)
            if secure:
                http_cookie += '; Secure'
            if http_only:
                http_cookie += '; HttpOnly'
            if 'Set-Cookie' in self.headers:
                self.headers['Set-Cookie'].append(http_cookie)
            else:
                self.headers['Set-Cookie'] = [http_cookie]
    
        def complete(self):
            if isinstance(self.body, bytes) and \
                    'Content-Length' not in self.headers:
                self.headers['Content-Length'] = str(len(self.body))
            if 'Content-Type' not in self.headers:
                self.headers['Content-Type'] = self.default_content_type
                if 'charset=' not in self.headers['Content-Type']:
                    self.headers['Content-Type'] += '; charset=UTF-8'
    
        def write(self, stream):
            self.complete()
    
            # status code
            reason = self.reason if self.reason is not None else \
                ('OK' if self.status_code == 200 else 'N/A')
            stream.write('HTTP/1.0 {status_code} {reason}\r\n'.format(
                status_code=self.status_code, reason=reason).encode())
    
            # headers
            for header, value in self.headers.items():
                values = value if isinstance(value, list) else [value]
                for value in values:
                    stream.write('{header}: {value}\r\n'.format(
                        header=header, value=value).encode())
            stream.write(b'\r\n')
    
            # body
            if not self.is_head:
                can_flush = hasattr(stream, 'flush')
                try:
                    for body in self.body_iter():
                        if isinstance(body, str):  # pragma: no cover
                            body = body.encode()
                        stream.write(body)
                        if can_flush:  # pragma: no cover
                            stream.flush()
                except OSError as exc:  # pragma: no cover
                    if exc.errno in MUTED_SOCKET_ERRORS:
                        pass
                    else:
                        raise
    
        def body_iter(self):
            if self.body:
                if hasattr(self.body, 'read'):
                    while True:
                        buf = self.body.read(self.send_file_buffer_size)
                        if len(buf):
                            yield buf
                        if len(buf) < self.send_file_buffer_size:
                            break
                    if hasattr(self.body, 'close'):  # pragma: no cover
                        self.body.close()
                elif hasattr(self.body, '__next__'):
                    yield from self.body
                else:
                    yield self.body
    
        @classmethod
        def redirect(cls, location, status_code=302):
            """Return a redirect response.
    
            :param location: The URL to redirect to.
            :param status_code: The 3xx status code to use for the redirect. The
                                default is 302.
            """
            if '\x0d' in location or '\x0a' in location:
                raise ValueError('invalid redirect URL')
            return cls(status_code=status_code, headers={'Location': location})
    
        @classmethod
        def send_file(cls, filename, status_code=200, content_type=None,
                      stream=None, max_age=None, compressed=False,
                      file_extension=''):
            """Send file contents in a response.
    
            :param filename: The filename of the file.
            :param status_code: The 3xx status code to use for the redirect. The
                                default is 302.
            :param content_type: The ``Content-Type`` header to use in the
                                 response. If omitted, it is generated
                                 automatically from the file extension of the
                                 ``filename`` parameter.
            :param stream: A file-like object to read the file contents from. If
                           a stream is given, the ``filename`` parameter is only
                           used when generating the ``Content-Type`` header.
            :param max_age: The ``Cache-Control`` header's ``max-age`` value in
                            seconds. If omitted, the value of the
                            :attr:`Response.default_send_file_max_age` attribute is
                            used.
            :param compressed: Whether the file is compressed. If ``True``, the
                               ``Content-Encoding`` header is set to ``gzip``. A
                               string with the header value can also be passed.
                               Note that when using this option the file must have
                               been compressed beforehand. This option only sets
                               the header.
            :param file_extension: A file extension to append to the ``filename``
                                   parameter when opening the file, including the
                                   dot. The extension given here is not considered
                                   when generating the ``Content-Type`` header.
    
            Security note: The filename is assumed to be trusted. Never pass
            filenames provided by the user without validating and sanitizing them
            first.
            """
            if content_type is None:
                ext = filename.split('.')[-1]
                if ext in Response.types_map:
                    content_type = Response.types_map[ext]
                else:
                    content_type = 'application/octet-stream'
            headers = {'Content-Type': content_type}
    
            if max_age is None:
                max_age = cls.default_send_file_max_age
            if max_age is not None:
                headers['Cache-Control'] = 'max-age={}'.format(max_age)
    
            if compressed:
                headers['Content-Encoding'] = compressed \
                    if isinstance(compressed, str) else 'gzip'
    
            f = stream or open(filename + file_extension, 'rb')
            return cls(body=f, status_code=status_code, headers=headers)
    
    
    class URLPattern():
        def __init__(self, url_pattern):
            self.url_pattern = url_pattern
            self.pattern = ''
            self.args = []
            use_regex = False
            for segment in url_pattern.lstrip('/').split('/'):
                if segment and segment[0] == '<':
                    if segment[-1] != '>':
                        raise ValueError('invalid URL pattern')
                    segment = segment[1:-1]
                    if ':' in segment:
                        type_, name = segment.rsplit(':', 1)
                    else:
                        type_ = 'string'
                        name = segment
                    if type_ == 'string':
                        pattern = '[^/]+'
                    elif type_ == 'int':
                        pattern = '-?\\d+'
                    elif type_ == 'path':
                        pattern = '.+'
                    elif type_.startswith('re:'):
                        pattern = type_[3:]
                    else:
                        raise ValueError('invalid URL segment type')
                    use_regex = True
                    self.pattern += '/({pattern})'.format(pattern=pattern)
                    self.args.append({'type': type_, 'name': name})
                else:
                    self.pattern += '/{segment}'.format(segment=segment)
            if use_regex:
                self.pattern = re.compile('^' + self.pattern + '$')
    
        def match(self, path):
            if isinstance(self.pattern, str):
                if path != self.pattern:
                    return
                return {}
            g = self.pattern.match(path)
            if not g:
                return
            args = {}
            i = 1
            for arg in self.args:
                value = g.group(i)
                if arg['type'] == 'int':
                    value = int(value)
                args[arg['name']] = value
                i += 1
            return args
    
    
    class HTTPException(Exception):
        def __init__(self, status_code, reason=None):
            self.status_code = status_code
            self.reason = reason or str(status_code) + ' error'
    
        def __repr__(self):  # pragma: no cover
            return 'HTTPException: {}'.format(self.status_code)
    
    
    class Microdot():
        """An HTTP application class.
    
        This class implements an HTTP application instance and is heavily
        influenced by the ``Flask`` class of the Flask framework. It is typically
        declared near the start of the main application script.
    
        Example::
    
            from microdot import Microdot
    
            app = Microdot()
        """
    
        def __init__(self):
            self.url_map = []
            self.before_request_handlers = []
            self.after_request_handlers = []
            self.after_error_request_handlers = []
            self.error_handlers = {}
            self.shutdown_requested = False
            self.options_handler = self.default_options_handler
            self.debug = False
            self.server = None
    
        def route(self, url_pattern, methods=None):
            """Decorator that is used to register a function as a request handler
            for a given URL.
    
            :param url_pattern: The URL pattern that will be compared against
                                incoming requests.
            :param methods: The list of HTTP methods to be handled by the
                            decorated function. If omitted, only ``GET`` requests
                            are handled.
    
            The URL pattern can be a static path (for example, ``/users`` or
            ``/api/invoices/search``) or a path with dynamic components enclosed
            in ``<`` and ``>`` (for example, ``/users/<id>`` or
            ``/invoices/<number>/products``). Dynamic path components can also
            include a type prefix, separated from the name with a colon (for
            example, ``/users/<int:id>``). The type can be ``string`` (the
            default), ``int``, ``path`` or ``re:[regular-expression]``.
    
            The first argument of the decorated function must be
            the request object. Any path arguments that are specified in the URL
            pattern are passed as keyword arguments. The return value of the
            function must be a :class:`Response` instance, or the arguments to
            be passed to this class.
    
            Example::
    
                @app.route('/')
                def index(request):
                    return 'Hello, world!'
            """
            def decorated(f):
                self.url_map.append(
                    ([m.upper() for m in (methods or ['GET'])],
                     URLPattern(url_pattern), f))
                return f
            return decorated
    
        def get(self, url_pattern):
            """Decorator that is used to register a function as a ``GET`` request
            handler for a given URL.
    
            :param url_pattern: The URL pattern that will be compared against
                                incoming requests.
    
            This decorator can be used as an alias to the ``route`` decorator with
            ``methods=['GET']``.
    
            Example::
    
                @app.get('/users/<int:id>')
                def get_user(request, id):
                    # ...
            """
            return self.route(url_pattern, methods=['GET'])
    
        def post(self, url_pattern):
            """Decorator that is used to register a function as a ``POST`` request
            handler for a given URL.
    
            :param url_pattern: The URL pattern that will be compared against
                                incoming requests.
    
            This decorator can be used as an alias to the``route`` decorator with
            ``methods=['POST']``.
    
            Example::
    
                @app.post('/users')
                def create_user(request):
                    # ...
            """
            return self.route(url_pattern, methods=['POST'])
    
        def put(self, url_pattern):
            """Decorator that is used to register a function as a ``PUT`` request
            handler for a given URL.
    
            :param url_pattern: The URL pattern that will be compared against
                                incoming requests.
    
            This decorator can be used as an alias to the ``route`` decorator with
            ``methods=['PUT']``.
    
            Example::
    
                @app.put('/users/<int:id>')
                def edit_user(request, id):
                    # ...
            """
            return self.route(url_pattern, methods=['PUT'])
    
        def patch(self, url_pattern):
            """Decorator that is used to register a function as a ``PATCH`` request
            handler for a given URL.
    
            :param url_pattern: The URL pattern that will be compared against
                                incoming requests.
    
            This decorator can be used as an alias to the ``route`` decorator with
            ``methods=['PATCH']``.
    
            Example::
    
                @app.patch('/users/<int:id>')
                def edit_user(request, id):
                    # ...
            """
            return self.route(url_pattern, methods=['PATCH'])
    
        def delete(self, url_pattern):
            """Decorator that is used to register a function as a ``DELETE``
            request handler for a given URL.
    
            :param url_pattern: The URL pattern that will be compared against
                                incoming requests.
    
            This decorator can be used as an alias to the ``route`` decorator with
            ``methods=['DELETE']``.
    
            Example::
    
                @app.delete('/users/<int:id>')
                def delete_user(request, id):
                    # ...
            """
            return self.route(url_pattern, methods=['DELETE'])
    
        def before_request(self, f):
            """Decorator to register a function to run before each request is
            handled. The decorated function must take a single argument, the
            request object.
    
            Example::
    
                @app.before_request
                def func(request):
                    # ...
            """
            self.before_request_handlers.append(f)
            return f
    
        def after_request(self, f):
            """Decorator to register a function to run after each request is
            handled. The decorated function must take two arguments, the request
            and response objects. The return value of the function must be an
            updated response object.
    
            Example::
    
                @app.after_request
                def func(request, response):
                    # ...
                    return response
            """
            self.after_request_handlers.append(f)
            return f
    
        def after_error_request(self, f):
            """Decorator to register a function to run after an error response is
            generated. The decorated function must take two arguments, the request
            and response objects. The return value of the function must be an
            updated response object. The handler is invoked for error responses
            generated by Microdot, as well as those returned by application-defined
            error handlers.
    
            Example::
    
                @app.after_error_request
                def func(request, response):
                    # ...
                    return response
            """
            self.after_error_request_handlers.append(f)
            return f
    
        def errorhandler(self, status_code_or_exception_class):
            """Decorator to register a function as an error handler. Error handler
            functions for numeric HTTP status codes must accept a single argument,
            the request object. Error handler functions for Python exceptions
            must accept two arguments, the request object and the exception
            object.
    
            :param status_code_or_exception_class: The numeric HTTP status code or
                                                   Python exception class to
                                                   handle.
    
            Examples::
    
                @app.errorhandler(404)
                def not_found(request):
                    return 'Not found'
    
                @app.errorhandler(RuntimeError)
                def runtime_error(request, exception):
                    return 'Runtime error'
            """
            def decorated(f):
                self.error_handlers[status_code_or_exception_class] = f
                return f
            return decorated
    
        def mount(self, subapp, url_prefix=''):
            """Mount a sub-application, optionally under the given URL prefix.
    
            :param subapp: The sub-application to mount.
            :param url_prefix: The URL prefix to mount the application under.
            """
            for methods, pattern, handler in subapp.url_map:
                self.url_map.append(
                    (methods, URLPattern(url_prefix + pattern.url_pattern),
                     handler))
            for handler in subapp.before_request_handlers:
                self.before_request_handlers.append(handler)
            for handler in subapp.after_request_handlers:
                self.after_request_handlers.append(handler)
            for handler in subapp.after_error_request_handlers:
                self.after_error_request_handlers.append(handler)
            for status_code, handler in subapp.error_handlers.items():
                self.error_handlers[status_code] = handler
    
        @staticmethod
        def abort(status_code, reason=None):
            """Abort the current request and return an error response with the
            given status code.
    
            :param status_code: The numeric status code of the response.
            :param reason: The reason for the response, which is included in the
                           response body.
    
            Example::
    
                from microdot import abort
    
                @app.route('/users/<int:id>')
                def get_user(id):
                    user = get_user_by_id(id)
                    if user is None:
                        abort(404)
                    return user.to_dict()
            """
            raise HTTPException(status_code, reason)
    
        def run(self, host='0.0.0.0', port=5000, debug=False, ssl=None):
            """Start the web server. This function does not normally return, as
            the server enters an endless listening loop. The :func:`shutdown`
            function provides a method for terminating the server gracefully.
    
            :param host: The hostname or IP address of the network interface that
                         will be listening for requests. A value of ``'0.0.0.0'``
                         (the default) indicates that the server should listen for
                         requests on all the available interfaces, and a value of
                         ``127.0.0.1`` indicates that the server should listen
                         for requests only on the internal networking interface of
                         the host.
            :param port: The port number to listen for requests. The default is
                         port 5000.
            :param debug: If ``True``, the server logs debugging information. The
                          default is ``False``.
            :param ssl: An ``SSLContext`` instance or ``None`` if the server should
                        not use TLS. The default is ``None``.
    
            Example::
    
                from microdot import Microdot
    
                app = Microdot()
    
                @app.route('/')
                def index(request):
                    return 'Hello, world!'
    
                app.run(debug=True)
            """
            self.debug = debug
            self.shutdown_requested = False
    
            self.server = socket.socket()
            ai = socket.getaddrinfo(host, port)
            addr = ai[0][-1]
    
            if self.debug:  # pragma: no cover
                print('Starting {mode} server on {host}:{port}...'.format(
                    mode=concurrency_mode, host=host, port=port))
            self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            self.server.bind(addr)
            self.server.listen(5)
    
            if ssl:
                self.server = ssl.wrap_socket(self.server, server_side=True)
    
            while not self.shutdown_requested:
                try:
                    sock, addr = self.server.accept()
                except OSError as exc:  # pragma: no cover
                    if exc.errno == errno.ECONNABORTED:
                        break
                    else:
                        print_exception(exc)
                except Exception as exc:  # pragma: no cover
                    print_exception(exc)
                else:
                    create_thread(self.handle_request, sock, addr)
    
        def shutdown(self):
            """Request a server shutdown. The server will then exit its request
            listening loop and the :func:`run` function will return. This function
            can be safely called from a route handler, as it only schedules the
            server to terminate as soon as the request completes.
    
            Example::
    
                @app.route('/shutdown')
                def shutdown(request):
                    request.app.shutdown()
                    return 'The server is shutting down...'
            """
            self.shutdown_requested = True
    
        def find_route(self, req):
            method = req.method.upper()
            if method == 'OPTIONS' and self.options_handler:
                return self.options_handler(req)
            if method == 'HEAD':
                method = 'GET'
            f = 404
            for route_methods, route_pattern, route_handler in self.url_map:
                req.url_args = route_pattern.match(req.path)
                if req.url_args is not None:
                    if method in route_methods:
                        f = route_handler
                        break
                    else:
                        f = 405
            return f
    
        def default_options_handler(self, req):
            allow = []
            for route_methods, route_pattern, route_handler in self.url_map:
                if route_pattern.match(req.path) is not None:
                    allow.extend(route_methods)
            if 'GET' in allow:
                allow.append('HEAD')
            allow.append('OPTIONS')
            return {'Allow': ', '.join(allow)}
    
        def handle_request(self, sock, addr):
            if Request.socket_read_timeout and \
                    hasattr(sock, 'settimeout'):  # pragma: no cover
                sock.settimeout(Request.socket_read_timeout)
            if not hasattr(sock, 'readline'):  # pragma: no cover
                stream = sock.makefile("rwb")
            else:
                stream = sock
    
            req = None
            res = None
            try:
                req = Request.create(self, stream, addr, sock)
                res = self.dispatch_request(req)
            except socket_timeout_error as exc:  # pragma: no cover
                if exc.errno and exc.errno != errno.ETIMEDOUT:
                    print_exception(exc)  # not a timeout
            except Exception as exc:  # pragma: no cover
                print_exception(exc)
            try:
                if res and res != Response.already_handled:  # pragma: no branch
                    res.write(stream)
                stream.close()
            except OSError as exc:  # pragma: no cover
                if exc.errno in MUTED_SOCKET_ERRORS:
                    pass
                else:
                    print_exception(exc)
            except Exception as exc:  # pragma: no cover
                print_exception(exc)
            if stream != sock:  # pragma: no cover
                sock.close()
            if self.shutdown_requested:  # pragma: no cover
                self.server.close()
            if self.debug and req:  # pragma: no cover
                print('{method} {path} {status_code}'.format(
                    method=req.method, path=req.path,
                    status_code=res.status_code))
    
        def dispatch_request(self, req):
            after_request_handled = False
            if req:
                if req.content_length > req.max_content_length:
                    if 413 in self.error_handlers:
                        res = self.error_handlers[413](req)
                    else:
                        res = 'Payload too large', 413
                else:
                    f = self.find_route(req)
                    try:
                        res = None
                        if callable(f):
                            for handler in self.before_request_handlers:
                                res = handler(req)
                                if res:
                                    break
                            if res is None:
                                res = f(req, **req.url_args)
                            if isinstance(res, tuple):
                                body = res[0]
                                if isinstance(res[1], int):
                                    status_code = res[1]
                                    headers = res[2] if len(res) > 2 else {}
                                else:
                                    status_code = 200
                                    headers = res[1]
                                res = Response(body, status_code, headers)
                            elif not isinstance(res, Response):
                                res = Response(res)
                            for handler in self.after_request_handlers:
                                res = handler(req, res) or res
                            for handler in req.after_request_handlers:
                                res = handler(req, res) or res
                            after_request_handled = True
                        elif isinstance(f, dict):
                            res = Response(headers=f)
                        elif f in self.error_handlers:
                            res = self.error_handlers[f](req)
                        else:
                            res = 'Not found', f
                    except HTTPException as exc:
                        if exc.status_code in self.error_handlers:
                            res = self.error_handlers[exc.status_code](req)
                        else:
                            res = exc.reason, exc.status_code
                    except Exception as exc:
                        print_exception(exc)
                        exc_class = None
                        res = None
                        if exc.__class__ in self.error_handlers:
                            exc_class = exc.__class__
                        else:
                            for c in mro(exc.__class__)[1:]:
                                if c in self.error_handlers:
                                    exc_class = c
                                    break
                        if exc_class:
                            try:
                                res = self.error_handlers[exc_class](req, exc)
                            except Exception as exc2:  # pragma: no cover
                                print_exception(exc2)
                        if res is None:
                            if 500 in self.error_handlers:
                                res = self.error_handlers[500](req)
                            else:
                                res = 'Internal server error', 500
            else:
                if 400 in self.error_handlers:
                    res = self.error_handlers[400](req)
                else:
                    res = 'Bad request', 400
    
            if isinstance(res, tuple):
                res = Response(*res)
            elif not isinstance(res, Response):
                res = Response(res)
            if not after_request_handled:
                for handler in self.after_error_request_handlers:
                    res = handler(req, res) or res
            res.is_head = (req and req.method == 'HEAD')
            return res
    
    
    abort = Microdot.abort
    Response.already_handled = Response()
    redirect = Response.redirect
    send_file = Response.send_file