Streaming Data with Tornado and WebSockets
A lot of data science and machine learning practice assumes a static dataset, maybe with some MLOps tooling for rerunning a model pipeline with the freshest version of the dataset.
Working with streaming data is an entirely different ball game, and it wasn’t clear to me what tools a data scientist might reach for when dealing with streaming data.[1]
I recently came across a pretty straightforward and robust solution: WebSockets and Tornado. Tornado is a Python web framework with strong support for asynchronous networking. WebSockets are a way for two processes (or apps) to communicate with each other (similar to HTTP requests with REST endpoints). Of course, Tornado has pretty good support for WebSockets as well.
In this blog post I’ll give a minimal example of using Tornado and WebSockets to handle streaming data. The toy example I have is one app (server.py) writing samples of a Bernoulli to a WebSocket, and another app (client.py) listening to the WebSocket and keeping track of the posterior distribution for a Beta-Binomial conjugate model.
After walking through the code, I’ll discuss these tools, and why they’re good choices for working with streaming data.
For another tutorial on this same topic, you can check out proft’s blog post.
Server
- When WebSocketServer is registered to an endpoint (in main), it keeps track of any clients that are listening to that endpoint, and pushes messages to them when send_message is called.
  - Note that clients is a class variable, so send_message is a class method.
  - This class could be extended to also listen to the endpoint, instead of just blindly pushing messages out. After all, WebSockets allow for bidirectional data flow.
- The RandomBernoulli and PeriodicCallback make a pretty crude example, but you could write a class that transmits data in real-time to suit your use case. For example, you could watch a file for any modifications using watchdog, and dump the changes into the WebSocket.
- The websocket_ping_interval and websocket_ping_timeout arguments to tornado.web.Application configure periodic pings of WebSocket connections, keeping connections alive and allowing dropped connections to be detected and closed.
- It’s also worth noting that there’s a websocket_max_message_size application setting (exposed on the handler as tornado.websocket.WebSocketHandler.max_message_size), which caps the size of incoming messages. While this defaults to a generous 10 MiB, it’s important that the WebSocket messages don’t exceed this limit!
""" Every 100ms, sample from a Bernoulli and write the value to a WebSocket. """
import random

import tornado.ioloop
import tornado.web
import tornado.websocket


class WebSocketServer(tornado.websocket.WebSocketHandler):
    """Simple WebSocket handler to serve clients."""

    # Note that `clients` is a class variable and `send_message` is a
    # classmethod.
    clients = set()

    def open(self):
        WebSocketServer.clients.add(self)

    def on_close(self):
        WebSocketServer.clients.remove(self)

    @classmethod
    def send_message(cls, message: str):
        print(f"Sending message {message} to {len(cls.clients)} client(s).")
        for client in cls.clients:
            client.write_message(message)


class RandomBernoulli:
    def __init__(self):
        self.p = 0.72
        print(f"True p = {self.p}")

    def sample(self):
        return int(random.uniform(0, 1) <= self.p)


def main():
    # Create a web app whose only endpoint is a WebSocket, and start the web
    # app on port 8888.
    app = tornado.web.Application(
        [(r"/websocket/", WebSocketServer)],
        websocket_ping_interval=10,
        websocket_ping_timeout=30,
    )
    app.listen(8888)

    # Create an event loop (what Tornado calls an IOLoop).
    io_loop = tornado.ioloop.IOLoop.current()

    # Before starting the event loop, instantiate a RandomBernoulli and
    # register a periodic callback to write a sampled value to the WebSocket
    # every 100ms.
    random_bernoulli = RandomBernoulli()
    periodic_callback = tornado.ioloop.PeriodicCallback(
        lambda: WebSocketServer.send_message(str(random_bernoulli.sample())), 100
    )
    periodic_callback.start()

    # Start the event loop.
    io_loop.start()


if __name__ == "__main__":
    main()
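The message size limit mentioned in the notes above is easy to guard against on the sending side. Here is a minimal sketch, assuming the receiver keeps the 10 MiB default; the constant MAX_MESSAGE_BYTES and the helper fits_in_one_message are hypothetical names of mine, not part of Tornado.

```python
# Assumed to mirror the 10 MiB default limit discussed above.
MAX_MESSAGE_BYTES = 10 * 1024 * 1024


def fits_in_one_message(message: str, limit: int = MAX_MESSAGE_BYTES) -> bool:
    """Return True if the UTF-8 encoding of `message` is at most `limit` bytes.

    The limit is measured in bytes, not characters, so non-ASCII text
    can be larger on the wire than len(message) suggests.
    """
    return len(message.encode("utf-8")) <= limit
```

A sender could call this before send_message and then decide whether to split, compress, or drop an oversized payload.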
Client
- WebSocketClient is a class that:
  - Can be started and stopped to connect to (or disconnect from) the WebSocket and start/stop listening to it
  - Can process every message (on_message) it hears from the WebSocket: in this case it simply maintains a count of the number of trials and successes, but this processing could theoretically be anything. For example, you could do some further processing of the message and then dump that into a separate WebSocket for other apps (or even users!) to subscribe to.
- To connect to the WebSocket, we need to use a WebSocket library: thankfully Tornado has built-in WebSocket functionality (tornado.websocket), but we’re also free to use other libraries such as the creatively named websockets or websocket-client.
- Note that we run on_message on the same thread as we run connect_and_read. This isn’t a problem so long as on_message is fast enough, but a potentially wiser choice would be to offload slow on_message processing to a separate thread by instantiating a concurrent.futures.ThreadPoolExecutor and calling tornado.ioloop.IOLoop.run_in_executor, so as not to block the thread that reads from the WebSocket.
- The io_loop instantiated in main (as well as in server.py) is important: it’s how Tornado schedules tasks (a.k.a. callbacks) for delayed (a.k.a. asynchronous) execution. To add a callback, we simply call io_loop.add_callback().
- The ping_interval and ping_timeout arguments to websocket_connect configure periodic pings of the WebSocket connection, keeping connections alive and allowing dropped connections to be detected and closed.
- The callback=self.maybe_retry_connection is run on a future WebSocketClientConnection. websocket_connect doesn’t actually establish the connection directly, but rather returns a future. Hence, we try to get the future.result() itself (i.e. the WebSocket client connection). I don’t actually do anything with self.connection, but you could if you wanted. In the event of an exception while doing that, we assume there’s a problem with the WebSocket connection and retry connect_and_read after 3 seconds. This all has the effect of recovering gracefully if the WebSocket is dropped or server.py experiences a brief outage for whatever reason (both of which are probably inevitable for long-running apps using WebSockets).
""" Stream data from the WebSocket and update the Beta posterior parameters online. """
import tornado.ioloop
import tornado.websocket


class WebSocketClient:
    def __init__(self, io_loop):
        self.connection = None
        self.io_loop = io_loop
        self.num_successes = 0
        self.num_trials = 0

    def start(self):
        self.connect_and_read()

    def stop(self):
        self.io_loop.stop()

    def connect_and_read(self):
        print("Reading...")
        tornado.websocket.websocket_connect(
            url="ws://localhost:8888/websocket/",
            callback=self.maybe_retry_connection,
            on_message_callback=self.on_message,
            ping_interval=10,
            ping_timeout=30,
        )

    def maybe_retry_connection(self, future) -> None:
        try:
            self.connection = future.result()
        except Exception:
            print("Could not reconnect, retrying in 3 seconds...")
            self.io_loop.call_later(3, self.connect_and_read)

    def on_message(self, message):
        # Tornado passes None to the callback when the connection closes.
        if message is None:
            print("Disconnected, reconnecting...")
            self.connect_and_read()
            # Nothing to parse: int(None) below would raise a TypeError.
            return

        message = int(message)
        self.num_successes += message
        self.num_trials += 1

        # Beta(2, 2) prior updated with the observed successes and failures.
        alpha = 2 + self.num_successes
        beta = 2 + self.num_trials - self.num_successes
        # Note: this is the sample mean, not the posterior mean.
        mean = self.num_successes / self.num_trials
        print(f"α = {alpha}; β = {beta}; mean = {mean}")


def main():
    # Create an event loop (what Tornado calls an IOLoop).
    io_loop = tornado.ioloop.IOLoop.current()

    # Before starting the event loop, instantiate a WebSocketClient and add a
    # callback to the event loop to start it. This way the first thing the
    # event loop does is to start the client.
    client = WebSocketClient(io_loop)
    io_loop.add_callback(client.start)

    # Start the event loop.
    io_loop.start()


if __name__ == "__main__":
    main()
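The counting done in on_message is the usual Beta-Binomial conjugate update, and it can be factored out of the client. Below is a minimal sketch, assuming the same Beta(2, 2) prior that client.py hard-codes. The BetaPosterior class and its method names are hypothetical, not part of the code above. It reports the posterior mean α / (α + β), which differs slightly from the sample mean that client.py prints.

```python
from dataclasses import dataclass


@dataclass
class BetaPosterior:
    """Beta(alpha, beta) posterior over the Bernoulli parameter p."""

    alpha: float = 2.0  # prior pseudo-count of successes (assumed, as in client.py)
    beta: float = 2.0  # prior pseudo-count of failures (assumed, as in client.py)

    def update(self, outcome: int) -> None:
        """Fold a single Bernoulli outcome (0 or 1) into the posterior."""
        if outcome not in (0, 1):
            raise ValueError(f"expected 0 or 1, got {outcome!r}")
        self.alpha += outcome
        self.beta += 1 - outcome

    @property
    def mean(self) -> float:
        """Posterior mean of p."""
        return self.alpha / (self.alpha + self.beta)

    @property
    def variance(self) -> float:
        """Posterior variance of p."""
        total = self.alpha + self.beta
        return self.alpha * self.beta / (total**2 * (total + 1))


posterior = BetaPosterior()
for outcome in [1, 1, 0, 1]:
    posterior.update(outcome)
print(posterior.alpha, posterior.beta, posterior.mean)  # 5.0 3.0 0.625
```

With something like this in place, on_message would only need to parse the message and call update, which keeps the networking and the statistics separate.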
Why Tornado?
Tornado is a Python web framework, but unlike the more popular Python web frameworks like Flask or Django, it has strong support for asynchronous networking and non-blocking calls. Essentially, Tornado apps have one (single-threaded) event loop (tornado.ioloop.IOLoop), which handles all requests asynchronously, dispatching incoming requests to the relevant non-blocking function as the request comes in. As far as I know, Tornado was one of the first Python web frameworks to work this way, though newer asyncio-based frameworks such as aiohttp now take a similar approach.
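To make the single-threaded event loop idea concrete, here is a minimal sketch using only the standard library. Since version 5, Tornado's IOLoop is a wrapper around the asyncio event loop, so I am assuming asyncio is a fair stand-in here. The slow_processing function is hypothetical and stands in for an expensive message handler. The point is that blocking work handed to a thread pool with run_in_executor does not freeze the loop: the heartbeat keeps ticking while the work runs.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def slow_processing(message: str) -> int:
    """Hypothetical blocking work, standing in for a slow message handler."""
    time.sleep(0.2)
    return int(message)


async def main() -> tuple[int, int]:
    loop = asyncio.get_running_loop()
    ticks = 0

    async def heartbeat() -> None:
        # Counts how often the event loop gets a chance to run this task.
        nonlocal ticks
        while True:
            ticks += 1
            await asyncio.sleep(0.02)

    beat = asyncio.create_task(heartbeat())
    with ThreadPoolExecutor(max_workers=1) as executor:
        # The blocking call runs in a worker thread; awaiting it hands
        # control back to the event loop instead of freezing it.
        result = await loop.run_in_executor(executor, slow_processing, "1")
    beat.cancel()
    await asyncio.gather(beat, return_exceptions=True)
    return result, ticks


if __name__ == "__main__":
    result, ticks = asyncio.run(main())
    print(f"result = {result}; heartbeat ticks while waiting = {ticks}")
```

Calling slow_processing directly inside a coroutine instead would leave the tick count stuck at its initial value until the sleep finished, which is exactly the blocking behavior an event loop is meant to avoid.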
As an aside, Tornado seems to be more popular in finance, where streaming real-time data (e.g. market data) is very common.
Why WebSockets?
A sharper question might be, why WebSockets over HTTP requests to a REST endpoint? After all, both theoretically allow a client to stream data in real-time from a server.
A lot can be said when comparing WebSockets and RESTful services, but I think the main points are accurately summarized by Kumar Chandrakant on Baeldung:
[A] WebSocket is more suitable for cases where a push-based and real-time communication defines the requirement more appropriately. Additionally, WebSocket works well for scenarios where a message needs to be pushed to multiple clients simultaneously. These are the cases where client and server communication over RESTful services will find it difficult if not prohibitive.
Tangentially, there’s one alternative that seems to be better than WebSockets from a protocol standpoint, but unfortunately doesn’t seem to have support from many Python web frameworks, and that is Server-Sent Events (a.k.a. SSE): it seems to be a cleaner protocol for unidirectional data flow, which is really all that we need.
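For a sense of why SSE looks cleaner for one-way streams, here is a minimal sketch of its wire format: each event is a few "field: value" text lines sent over a plain HTTP response with the text/event-stream content type, terminated by a blank line. The format_sse helper is a hypothetical name of mine, and this only covers the data and event fields.

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None) -> str:
    """Encode one message in the text/event-stream wire format."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    # Each line of the payload gets its own "data:" field.
    lines.extend(f"data: {line}" for line in data.split("\n"))
    # A blank line marks the end of the event.
    return "\n".join(lines) + "\n\n"


print(repr(format_sse("1")))  # 'data: 1\n\n'
```

A Bernoulli sample from server.py would therefore travel as the eight bytes "data: 1" followed by two newlines, with no handshake upgrade or framing layer involved.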
Additionally, Armin Ronacher has a much starker view of WebSockets, seeing no value in using WebSockets over TCP/IP sockets for this application:
Websockets make you sad. […] Websockets are complex, way more complex than I anticipated. I can understand that they work that way but I definitely don’t see a value in using websockets instead of regular TCP connections if all you want is to exchange data between different endpoints and neither is a browser.
My thought after reading these criticisms is that perhaps WebSockets aren’t the ideal technology for handling streaming data (from a maintainability or architectural point of view), but that doesn’t mean that they aren’t good scalable technologies when they do work.
[1] There is technically a difference between “real-time” and “streaming”: “real-time” refers to data that comes in as it is created, whereas “streaming” refers to a system that processes data continuously. You stream your TV show from Netflix, but since the show was created long before you watched it, you aren’t viewing it in real-time.