Poor man’s autoreload server in a single Python file on Linux



Introduction

In this post I want to explore creating a simple Python server that automatically reloads the browser page when content changes. There are many livereload servers in the wild and I have used quite a few of them. For example, Hugo has a local server with auto reload, and JavaScript developers have pretty good local servers when using React and similar tools.

But all these servers require setup, download a lot of files or runtimes or, to put it simply, have some friction. This is fine for a bigger project, and I do use them, but I quite often do some crazy experiments with js/css/html and don’t want to do any setup, configure settings or do anything not related to the idea. My favourite tool for this is python -m http.server.

It is not the best tool, but it happens to be available on all my computers and it just works. (A long time ago, when I did PHP development, ‘php -S’ was also good enough.) I waited for several years and, after not finding anything that scratches my itch, during one of these experiments I decided to spend a couple of hours creating it myself for my own needs. Of course it took more than a couple of hours, but that is the regular programmer self-lie.

Disclaimer: The code in this project is bad, the architecture is bad, the error handling is bad, so don’t use this in production. But I used it for my own local projects and it made MY development time happier.

Setup

For this to work we need three things: a simple server, websockets and inotify. Websockets are needed because web browsers do not give access to regular sockets, so any full duplex communication has to go through websockets. For this project we do not need full duplex; one-way communication like SSE (Server-Sent Events) would be enough, but I chose websockets and will stick with them.

Inotify is a Linux API for monitoring file system events. Using inotify is faster than manually polling file modification dates, and it is not that hard to use directly without any additional packages. Even though it may not be the fastest solution, it is widespread and is probably already present on the system.
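For comparison, here is a rough sketch of the manual alternative mentioned above: taking snapshots of modification times and comparing them. The function names snapshot_mtimes and has_changed are made up for illustration and are not used anywhere else in this post.

```python
import os

def snapshot_mtimes(root):
    """Map every file path under root to its modification time in nanoseconds."""
    mtimes = {}
    for current_root, _dirs, files in os.walk(root):
        for name in files:
            path = os.path.join(current_root, name)
            try:
                mtimes[path] = os.stat(path).st_mtime_ns
            except OSError:
                pass  # File vanished between listing and stat.
    return mtimes

def has_changed(before, after):
    """True if any file was created, deleted or modified between two snapshots."""
    return before != after
```

A polling loop would call snapshot_mtimes every second or so and compare with the previous result, which means walking the whole tree each time even when nothing happened. That is the cost inotify avoids.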

Inotify

When I was building this I left inotify for last, but here we will start with it because it is simpler and the rest will be easier to explain in this order. First let’s create a class SmallINotify and an inner class Flags for encapsulation.

class SmallINotify():
    class Flags():
        # NOTE: Values from inotify.h. There are more values but these are the ones we can wait on
        ACCESS        = 0x00000001    # File was accessed.
        MODIFY        = 0x00000002    # File was modified.
        ATTRIB        = 0x00000004    # Metadata changed.
        CLOSE_WRITE   = 0x00000008    # Writable file was closed.
        CLOSE_NOWRITE = 0x00000010    # Unwritable file closed.
        OPEN          = 0x00000020    # File was opened.
        MOVED_FROM    = 0x00000040    # File was moved from X.
        MOVED_TO      = 0x00000080    # File was moved to Y.
        CREATE        = 0x00000100    # Subfile was created.
        DELETE        = 0x00000200    # Subfile was deleted.
        DELETE_SELF   = 0x00000400    # Self was deleted.
        MOVE_SELF     = 0x00000800    # Self was moved.

We won’t need all these event types, but I added them for later experiments. For this task we need only the MODIFY, CREATE and DELETE flags.
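Since the flags are plain bit values, they are combined with bitwise OR and tested with bitwise AND. A small sketch of that idea; the FLAG_NAMES table and the flag_names helper are hypothetical and exist only for this illustration.

```python
# Flag values copied from the Flags class above (originally from inotify.h).
MODIFY = 0x00000002
CREATE = 0x00000100
DELETE = 0x00000200

FLAG_NAMES = {MODIFY: 'MODIFY', CREATE: 'CREATE', DELETE: 'DELETE'}

# Combine the flags we want to watch with bitwise OR.
watch_mask = MODIFY | CREATE | DELETE

def flag_names(mask):
    """Return the names of all known flags that are set in mask."""
    return [name for flag, name in FLAG_NAMES.items() if mask & flag]
```

The combined watch_mask is what later gets passed as the mask argument when adding a watch.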

From inotify we will need only two functions: inotify_init and inotify_add_watch. There is also an inotify_init1 function for more control, but it won’t be needed in our example. Since these are not built into Python we need to get them from libc manually.

In the instance initialization we first manually load libc and then get an inotify event queue. At the same time we set up the system to wait for data on that event queue and notify us once it has some data for us to process.

def __init__(self):
    try: libc_so = ctypes.util.find_library('c')
    except: libc_so = None
    self._libc = ctypes.CDLL(libc_so or 'libc.so.6', use_errno=True)
    self._inotify_event_queue_fd = self._libc_call(self._libc.inotify_init)
    self._poller = select.poll()
    self._poller.register(self._inotify_event_queue_fd)

def _libc_call(self, function, *args):
    rc = function(*args)
    return rc
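The _libc_call wrapper above returns whatever libc gives back, including -1 on failure. Here is a hedged sketch of how error checking could look. The name checked_libc_call is hypothetical and not part of the final code, and it assumes the library was loaded with use_errno=True as in __init__, so that ctypes.get_errno() sees the error code.

```python
import ctypes
import errno
import os

def checked_libc_call(function, *args):
    """Call a libc function, retry on EINTR and raise OSError on other failures."""
    while True:
        rc = function(*args)
        if rc != -1:
            return rc
        err = ctypes.get_errno()
        if err == errno.EINTR:
            continue  # Interrupted by a signal, just try again.
        raise OSError(err, os.strerror(err))
```

With something like this, a bad path passed to inotify_add_watch would fail loudly instead of silently returning -1.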

Now we create a method to read data when it is available. read() will try to read everything from the queue and, if there was no data, wait/block until there are some changes to the filesystem. In this example, once there is a change it will just return the size of the read data without any processing, because we are not interested (currently) in differentiating events. Any change in the watched folders will result in a later reload. (If you want to have different actions for different event types then you would have to parse the data and return the event types.)

def read(self):
    data = self._readall()
    timeout = -1
    if not data  and self._poller.poll(timeout):
        data = self._readall()
    return len(data) # TODO: Actually parse data and return list of events?

def _readall(self):
    bytes_avail = ctypes.c_int()
    fcntl.ioctl(self._inotify_event_queue_fd, termios.FIONREAD, bytes_avail)
    if not bytes_avail.value:
        return b''
    return os.read(self._inotify_event_queue_fd, bytes_avail.value)
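If you do want to differentiate events, the bytes returned by _readall can be parsed. The sketch below assumes the struct inotify_event layout from inotify.h (an int watch descriptor, then three 32-bit unsigned fields: mask, cookie and name length, followed by a NUL-padded name). The parse_events function is a hypothetical helper, not part of the final code.

```python
import os
import struct

# wd (int), mask, cookie, name length (all uint32), in native byte order.
EVENT_HEADER = struct.Struct('iIII')

def parse_events(data):
    """Split a raw inotify buffer into (wd, mask, cookie, name) tuples."""
    events = []
    offset = 0
    while offset + EVENT_HEADER.size <= len(data):
        wd, mask, cookie, name_len = EVENT_HEADER.unpack_from(data, offset)
        offset += EVENT_HEADER.size
        raw_name = data[offset:offset + name_len]
        offset += name_len
        # The name is padded with NUL bytes, keep only the part before them.
        name = os.fsdecode(raw_name.split(b'\0', 1)[0])
        events.append((wd, mask, cookie, name))
    return events
```

The mask of each event can then be tested against the Flags values to tell a MODIFY from a CREATE or DELETE.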

And now the methods to set up watching directories with inotify_add_watch. For my initial setup I used just a single watched directory as my structure was flat, but if you have folders and subfolders then you need to watch them all recursively. Since we are talking about small ’toy’ projects it should not be a problem. inotify_add_watch will add the specified directory to the list of directories to be watched, and all changes will come back to us through a single event queue.

    def add_watch(self, path, mask):
        return self._libc_call(self._libc.inotify_add_watch, self._inotify_event_queue_fd, os.fsencode(path), mask)

    def add_watch_recursive(self, root, mask):
        # TODO: Add exclude dirs. Remove '.git', maybe all hidden files and just add a param to filter
        self.add_watch(root, mask)
        for current_root, dirs, files in os.walk(root):
            for _dir in dirs:
                dir_path = os.path.join(current_root, _dir)
                self.add_watch(dir_path, mask)

And this is pretty much it for inotify. Of course there is more to be done, like removing deleted directories from the watch list, but for our use case this is more than enough.
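The TODO in add_watch_recursive about excluding directories such as '.git' could be handled by pruning the dirs list while walking. A minimal sketch, assuming that skipping every hidden directory is acceptable; iter_watch_dirs is a made-up name for illustration.

```python
import os

def iter_watch_dirs(root, skip_hidden=True):
    """Yield root and every directory below it, optionally skipping hidden ones."""
    yield root
    for current_root, dirs, _files in os.walk(root):
        if skip_hidden:
            # Editing dirs in place stops os.walk from descending into them.
            dirs[:] = [d for d in dirs if not d.startswith('.')]
        for name in dirs:
            yield os.path.join(current_root, name)
```

Each yielded path could then be passed to add_watch with the same mask.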

Server

Initially, during the experiment phase, I split the file server and the websocket server into different servers which listened on different ports, but I quickly came to the conclusion that a better approach is to have one server. The server will be just a regular file server but will serve websocket messages over a specified path.

class Server:
    def __init__(self, listen_port = 5000):
        self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.listen_socket.bind(('', listen_port)) 
        self.listen_socket.listen(10)

        self.event_obj = threading.Event()

        inotify = SmallINotify()
        inotify.add_watch_recursive('.', inotify.Flags.CREATE | inotify.Flags.MODIFY  | inotify.Flags.DELETE)
        thread = threading.Thread(target = self.check_filechanges_loop, args=(inotify,))
        thread.start()

        while True:
            client_socket, client_address = self.listen_socket.accept()
            thread = threading.Thread(
                target = self.process_request,
                args = (client_socket, client_address, self.event_obj),
            )
            thread.start()

First of all we create a listening socket that will listen on port 5000 by default and broker all other connections. event_obj is a thread event object for thread synchronization, used to fire the update event to all client-serving threads. Threading might not be the best approach to serving many clients, but it is good enough for a local setup: more than a week of testing didn’t show any obvious limitations. And using just one event object is good enough since we care about only one event type.

Then we set up filesystem monitoring and pass it to a separate thread whose sole job is to monitor file system events and notify other threads about changes. Again, since we have only one event type that we care about, it is pretty simple and does not require any complex management.

And at the very end we just start an endless loop waiting for new connections, and when a client connects we allocate a separate thread for it. Each client processing thread is responsible for understanding what the client actually wanted and for setting up a websocket connection if needed.
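With a single shared Event, each client thread clears the flag after sending, so a thread that is not waiting at that exact moment can in principle miss a change. One possible alternative, shown here only as a sketch and not what the final code does, is a Condition with a generation counter. ChangeNotifier and its method names are hypothetical.

```python
import threading

class ChangeNotifier:
    """Lets many threads wait for 'something changed' without clearing a flag."""

    def __init__(self):
        self._cond = threading.Condition()
        self._generation = 0

    def notify_change(self):
        # Called by the file watching thread on every change.
        with self._cond:
            self._generation += 1
            self._cond.notify_all()

    def wait_for_change(self, last_seen, timeout=None):
        # Blocks until the generation differs from last_seen (or timeout).
        with self._cond:
            self._cond.wait_for(lambda: self._generation != last_seen, timeout)
            return self._generation
```

Each client thread remembers the last generation it saw, so nothing needs to be cleared and a change that happened while the thread was busy is still noticed on the next wait.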

def process_request(self, client_socket, client_address, event_obj):
    request_str = client_socket.recv(MAX_REQUEST_LENGTH)
    request_str = request_str.decode('utf-8')

    parts = request_str.split('\r\n\r\n')

    raw_headers = parts[0]
    raw_body = parts[1]

    header_parts =  raw_headers.split('\r\n')
    request_line = header_parts[0]

    method, path, http_version = request_line.split(' ')

    if path == WEBSOCKET_PATH:
        headers = self.headers_str_to_map(raw_headers)
        self.websocket_init_and_process(client_socket, headers)
    else:
        file_path = self.get_file_path(path)
        self.process_file_request(client_socket, file_path)

    client_socket.close()

Here we do manual request processing by getting all the HTTP request data and splitting it into the request line, headers and body. Of course we should check for errors and for bad requests from the client, but to reduce code size I decided against it.
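For reference, a more defensive version of the same parsing could look roughly like this. It is only a sketch: parse_request is a hypothetical helper that returns None for anything it cannot understand, and it is not used in the final code.

```python
def parse_request(raw_bytes):
    """Return (method, path, headers) or None if the request is malformed."""
    try:
        text = raw_bytes.decode('utf-8')
    except UnicodeDecodeError:
        return None
    head, separator, _body = text.partition('\r\n\r\n')
    if not separator:
        return None  # No blank line, so the headers are incomplete.
    lines = head.split('\r\n')
    request_parts = lines[0].split(' ')
    if len(request_parts) != 3:
        return None
    method, path, _version = request_parts
    headers = {}
    for line in lines[1:]:
        key, colon, value = line.partition(':')
        if not colon:
            return None
        headers[key.strip().lower()] = value.strip()
    return method, path, headers
```

The caller would close the socket (or send a 400 response) when None comes back, instead of crashing the thread with an exception.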

So when a request comes in we check the request path. If it is /websocket (which comes from a specific piece of js code shown later) then it will be processed by websocket_init_and_process, which will initialize the connection and wait in a loop to send events. If the path is anything else then we assume that it is a regular “file” request and process it accordingly.

def process_file_request(self, client_socket, file_path):
    if os.path.exists(file_path):
        with open(file_path, 'rb') as f:
            data = f.read()
        if file_path == 'index.html':
            data += EXTRA_INDEX_DATA.encode()
        self.send_response(client_socket, 200, 'OK', "text/html", data)
        print('served: {}'.format(file_path))
    else:
        output_data = "File {} not found".format(file_path)
        message = "File not found"
        self.send_response(client_socket, 404, message, "text/html", output_data.encode())
        print('Not found: {}'.format(file_path))

The file handling part of the code is very straightforward. Take the path, check if it exists on the file system and, if it does, just load it as bytes and send it over the network to the client. Otherwise assume that it is missing and send back a 404 error response.
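Two things the file handler above does not do are guessing a content type and refusing paths that point outside the served directory. A hedged sketch of both using only the standard library; guess_content_type and resolve_inside_root are hypothetical helpers that are not part of the final code.

```python
import mimetypes
import os

def guess_content_type(file_path):
    """Guess a MIME type from the file extension, with a generic fallback."""
    content_type, _encoding = mimetypes.guess_type(file_path)
    return content_type or 'application/octet-stream'

def resolve_inside_root(root, request_path):
    """Return the absolute file path, or None if it would escape root."""
    root = os.path.realpath(root)
    candidate = os.path.realpath(os.path.join(root, request_path.lstrip('/')))
    if os.path.commonpath([root, candidate]) != root:
        return None  # Something like /../../etc/passwd
    return candidate
```

For a tool that only ever runs on localhost this is arguably optional, but it matters more once the server is reachable from other devices on the network.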

Now comes websocket request processing.

WEBSOCKET_MAGIC_KEY = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'

def websocket_init_and_process(self, client_socket, headers):
    client_key = headers.get('sec-websocket-key', '')
    combined_key = client_key + WEBSOCKET_MAGIC_KEY
    hashed_combined_key = hashlib.sha1(combined_key.encode())
    encoded_key = base64.b64encode(hashed_combined_key.digest())

    output = "HTTP/1.1 {} {}\r\n".format(101, "Switching protocols")
    output += "Upgrade: Websocket\r\n"
    output += "Connection: Upgrade\r\n"
    output += "Sec-WebSocket-Accept: {}\r\n".format(encoded_key.decode())
    output += "\r\n"

    client_socket.send(output.encode())

    message = 'reload'
    message_data = bytes([
        0b10000001,  # Type TEXT
        len(message),
    ])
    message_data += message.encode()

    try:
        while self.event_obj.wait():
            client_socket.send(message_data)
            self.event_obj.clear() # TODO: This synchronization method is stupid. But good enough for now. Move everything to select(...)
    except:
        pass # NOTE: Client probably refreshed and closed the socket.

It is a bit more involved but not too much. Initially a websocket request starts as a regular HTTP request but sends us a websocket client key, which we combine with a special magic key, hash with SHA-1, encode with base64 and send back to the client to indicate that we are ready to do websocket communication. We then prepare a special binary header which indicates that we will be sending a short text, and extend it with the ‘reload’ message. This ‘reload’ does not mean anything; it is just arbitrary text that I chose to send to the client every time our files are modified.
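The key calculation from the handshake can be pulled out into a small function and checked against the sample values given in the WebSocket specification (RFC 6455). The function name websocket_accept_key is made up for this sketch; the logic is the same as in websocket_init_and_process.

```python
import base64
import hashlib

WEBSOCKET_MAGIC_KEY = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'

def websocket_accept_key(client_key):
    """Compute the Sec-WebSocket-Accept value for a given Sec-WebSocket-Key."""
    combined_key = client_key + WEBSOCKET_MAGIC_KEY
    digest = hashlib.sha1(combined_key.encode()).digest()
    return base64.b64encode(digest).decode()

# Sample key from RFC 6455.
print(websocket_accept_key('dGhlIHNhbXBsZSBub25jZQ=='))
# prints "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="
```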

There is a special Python package to do websocket communication properly, but since our goal is to not use pip or other tools and to have just one file, I chose to use this handmade binary header. Good enough for this purpose.
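The handmade two-byte header only works for payloads shorter than 126 bytes, which is plenty for ‘reload’. If longer messages were ever needed, the frame format has extended length fields. A sketch of an unmasked server-to-client text frame under that assumption; encode_text_frame is a hypothetical helper that is not part of the final code.

```python
import struct

def encode_text_frame(message):
    """Build a single unmasked websocket text frame (server to client)."""
    payload = message.encode('utf-8')
    header = bytes([0b10000001])  # FIN bit set, opcode 0x1 (text)
    length = len(payload)
    if length < 126:
        header += bytes([length])
    elif length < 65536:
        header += bytes([126]) + struct.pack('!H', length)  # 16-bit length
    else:
        header += bytes([127]) + struct.pack('!Q', length)  # 64-bit length
    return header + payload
```

For the ‘reload’ message this produces the same bytes as the handmade header in the server code.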

The only question left is who starts the websocket session? Well, back when we described the file serving part of the server there was an EXTRA_INDEX_DATA that was appended to the end of the data if the path was index.html. EXTRA_INDEX_DATA contains a little JavaScript snippet that does all the websocket connection setup and processing. It does so on every connection restart. Here is how it looks.

<script>
addEventListener('DOMContentLoaded', function() {
    const socket = new WebSocket('ws://localhost:5000/websocket');
    socket.addEventListener('message', function (event) {
        socket.close();
        window.location.reload();
    });
});
</script>

It’s been a while since I looked into the HTML spec, but appending a script after the end of the document is probably against the specification and the ‘spec police’ will be very unhappy, but hey, if it works it works. I had a limited budget and squeezed out what I could.

Final code

Here is the final code that you can copy into the project root or put into some folder that is included in PATH so you can run it anytime.

#!/usr/bin/env python3
import socket
import threading
import os
import hashlib
import base64
import time

import ctypes
import ctypes.util  # NOTE: find_library lives in this submodule and must be imported explicitly
import select
import fcntl
import termios
import signal


WEBSOCKET_PATH = "/websocket"
MAX_REQUEST_LENGTH = 8*1024
WEBSOCKET_MAGIC_KEY = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'

EXTRA_INDEX_DATA = '''
<script>
addEventListener('DOMContentLoaded', function() {
    const socket = new WebSocket('ws://LISTENING_HOST_REPLACE:LISTENING_PORT_REPLACE/websocket');
    socket.addEventListener('message', function (event) {
        socket.close();
        window.location.reload();
    });
});
</script>
'''

class SmallINotify():
    class Flags():
        # NOTE: Values from inotify.h. There are more values but these are the ones we can wait on
        ACCESS        = 0x00000001    # File was accessed.
        MODIFY        = 0x00000002    # File was modified.
        ATTRIB        = 0x00000004    # Metadata changed.
        CLOSE_WRITE   = 0x00000008    # Writable file was closed.
        CLOSE_NOWRITE = 0x00000010    # Unwritable file closed.
        OPEN          = 0x00000020    # File was opened.
        MOVED_FROM    = 0x00000040    # File was moved from X.
        MOVED_TO      = 0x00000080    # File was moved to Y.
        CREATE        = 0x00000100    # Subfile was created.
        DELETE        = 0x00000200    # Subfile was deleted.
        DELETE_SELF   = 0x00000400    # Self was deleted.
        MOVE_SELF     = 0x00000800    # Self was moved.

    def __init__(self):
        try: libc_so = ctypes.util.find_library('c')
        except: libc_so = None
        self._libc = ctypes.CDLL(libc_so or 'libc.so.6', use_errno=True)
        self._inotify_event_queue_fd = self._libc_call(self._libc.inotify_init)
        self._poller = select.poll()
        self._poller.register(self._inotify_event_queue_fd)

    def _libc_call(self, function, *args):
        # TODO: error check for interrupts? EINTR
        return_code = function(*args)
        return return_code

    def read(self):
        data = self._readall()
        timeout = -1
        if not data  and self._poller.poll(timeout):
            data = self._readall()
        return len(data) # TODO: Actually parse data and return list of events?

    def _readall(self):
        bytes_avail = ctypes.c_int()
        fcntl.ioctl(self._inotify_event_queue_fd, termios.FIONREAD, bytes_avail)
        if not bytes_avail.value:
            return b''
        return os.read(self._inotify_event_queue_fd, bytes_avail.value)

    def add_watch(self, path, mask):
        return self._libc_call(self._libc.inotify_add_watch, self._inotify_event_queue_fd, os.fsencode(path), mask)

    def add_watch_recursive(self, root, mask):
        # TODO: Add exclude dirs. Remove '.git', maybe all hidden files and just add a param to filter
        self.add_watch(root, mask)
        for current_root, dirs, files in os.walk(root):
            for _dir in dirs:
                dir_path = os.path.join(current_root, _dir)
                self.add_watch(dir_path, mask)



class Server:
    def __init__(self, host = "localhost", listen_port = 5000):
        self.host = host
        self.listen_port = listen_port
        self.listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.listen_socket.bind((host, self.listen_port)) 
        self.listen_socket.listen(10)
        self.event_obj = threading.Event()


        print("serving content on {}:{}".format(host, listen_port))

        inotify = SmallINotify()
        watch_flags = inotify.Flags.CREATE | inotify.Flags.MODIFY  | inotify.Flags.DELETE
        wd = inotify.add_watch('.', watch_flags)

        thread = threading.Thread(target = self.check_filechanges_loop, args=(inotify,))
        thread.start()

        while True:
            client_socket, client_address = self.listen_socket.accept()
            thread = threading.Thread(
                target = self.process_request,
                args = (client_socket, client_address, self.event_obj),
            )
            thread.start()


    def check_filechanges_loop(self, inotify):
        while True:
            inotify.read() # NOTE: Read all events
            print('--------: Event')
            self.event_obj.set()

        # while True:
        #     time.sleep(2)
        #     self.event_obj.set()

    def process_file_request(self, client_socket, file_path):
        if os.path.exists(file_path):
            with open(file_path, 'rb') as f:
                data = f.read()
            if file_path == 'index.html':
                new_data = EXTRA_INDEX_DATA
                new_data = new_data.replace('LISTENING_HOST_REPLACE', str(self.host))
                new_data = new_data.replace('LISTENING_PORT_REPLACE', str(self.listen_port))
                data += new_data.encode()
            self.send_response(client_socket, 200, 'OK', "text/html", data)
            print('served: {}'.format(file_path))
        else:
            output_data = "File {} not found".format(file_path)
            message = "File not found"
            self.send_response(client_socket, 404, message, "text/html", output_data.encode())
            print('Not found: {}'.format(file_path))

    def websocket_init_and_process(self, client_socket, headers):
        # TODO: Verify that request is correct websocket initialization request

        client_key = headers.get('sec-websocket-key', '')
        combined_key = client_key + WEBSOCKET_MAGIC_KEY
        hashed_combined_key = hashlib.sha1(combined_key.encode())
        encoded_key = base64.b64encode(hashed_combined_key.digest())

        output = "HTTP/1.1 {} {}\r\n".format(101, "Switching protocols")
        output += "Upgrade: Websocket\r\n"
        output += "Connection: Upgrade\r\n"
        output += "Sec-WebSocket-Accept: {}\r\n".format(encoded_key.decode())
        output += "\r\n"

        client_socket.send(output.encode())

        message = 'reload'
        message_data = bytes([
            0b10000001,  # Type TEXT
            len(message),
        ])
        message_data += message.encode()

        try:
            while self.event_obj.wait():
                client_socket.send(message_data)
                self.event_obj.clear() # TODO: This synchronization method is stupid. But good enough for now. Move everything to select(...)
        except:
            pass # NOTE: Client probably refreshed and closed the socket.

    def headers_str_to_map(self, headers_str):
        lines = headers_str.split('\r\n')
        result = {}
        for line in lines[1:]:
            key, value = line.split(':', 1)
            result[key.lower()] = value.strip()
        return result


    def process_request(self, client_socket, client_address, event_obj):
        # TODO: Do it in a loop
        request_str = client_socket.recv(MAX_REQUEST_LENGTH)
        request_str = request_str.decode('utf-8')

        #TODO: break if no parts available
        parts = request_str.split('\r\n\r\n')

        raw_headers = parts[0]
        raw_body = parts[1]

        header_parts =  raw_headers.split('\r\n')
        request_line = header_parts[0]

        method, path, http_version = request_line.split(' ')

        if path == WEBSOCKET_PATH:
            headers = self.headers_str_to_map(raw_headers)
            self.websocket_init_and_process(client_socket, headers)
        else:
            file_path = self.get_file_path(path)
            self.process_file_request(client_socket, file_path)

        client_socket.close()

    def send_response(self, client_socket, code, message, content_type, data):
        output = "HTTP/1.1 {} {}\r\n".format(code, message)
        # output += b"Content-Type: {}\r\n".format(content_type,)
        output += "\r\n"
        output = output.encode()
        output += data

        client_socket.sendall(output) # NOTE: send() may write only part of a large file

    def get_file_path(self, path):
        file_path = path[1:]
        if path == '/':
            file_path = 'index.html'
        return file_path

def signal_handler(sig, frame):
    os._exit(1)

if __name__ == '__main__':
    signal.signal(signal.SIGINT, signal_handler)
    Server()

Here is another video showing this script serving content on different devices on local network.

Conclusion

The final result turned out to be pretty good and very fun to use. I have built several small toys with it and it increased my productivity. Of course this solution is not for everybody and not for production, but it is pretty good and fun to use for local development.

I want to remind you that this code will work only on Linux, because I built it for myself; I don’t use Windows much and my Mac is used only for iOS development. There will probably be at most 10 people interested in this approach, and if it helps you save some time and have some fun then this post has reached its goal. If for some reason you want to do this on a Mac or Windows and don’t know how to approach it yourself, you can ping me and I will look at it on one of the weekends.