ZeroMQ is a lightweight, fast, and scalable socket programming library.
ZeroMQ supports different patterns for socket communication. One of those patterns is the so-called publisher-subscriber (PUB-SUB) pattern where a single socket publishes data and multiple sockets simultaneously retrieve the data.
Running a Simple Tick Data Server
Euler discretisation of geometric Brownian motion
S is the instrument price
r is the constant short rate
σ is the constant volatility factor
z is a standard normal random variable
Δt is the interval between two discrete observations of the instrument price
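Written out with the symbols defined above, the Euler discretisation can be stated as follows. This is a reconstruction from the symbol definitions, so the exact notation (time subscripts in particular) is an assumption:

```latex
S_t = S_{t-\Delta t} \exp\left( \left( r - \frac{\sigma^2}{2} \right) \Delta t + \sigma \sqrt{\Delta t}\, z \right)
```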
import zmq
import math
import time
import random
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind('tcp://0.0.0.0:5555')
The class InstrumentPrice simulates the value of an instrument price over time.
class InstrumentPrice(object):
    def __init__(self):
        self.symbol = 'SYMBOL'
        self.t = time.time()
        self.value = 100.
        self.sigma = 0.4
        self.r = 0.01

    def simulate_value(self):
        # Generates a new, random instrument price.
        t = time.time()
        # Elapsed time as a fraction of a trading year (255 days of 8 hours)
        dt = (t - self.t) / (255 * 8 * 60 * 60)
        dt *= 500  # scale the time step to get larger price movements
        self.t = t
        self.value *= math.exp((self.r - 0.5 * self.sigma ** 2) * dt +
                               self.sigma * math.sqrt(dt) * random.gauss(0, 1))
        return self.value
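To see the update rule in isolation, the following is a minimal sketch of a single Euler step as a pure function, without wall-clock timing. The names gbm_step, rng and path, as well as the step size of 0.001, are hypothetical and chosen only for illustration:

```python
import math
import random


def gbm_step(value, r, sigma, dt, z):
    """Return the next price under the Euler scheme for geometric Brownian motion.

    value: current price, r: short rate, sigma: volatility,
    dt: time step as a year fraction, z: standard normal draw.
    """
    drift = (r - 0.5 * sigma ** 2) * dt
    diffusion = sigma * math.sqrt(dt) * z
    return value * math.exp(drift + diffusion)


# With z = 0 only the drift term acts on the price.
no_shock = gbm_step(100.0, r=0.01, sigma=0.4, dt=1.0, z=0.0)

# A short path from a seeded generator, so that a run is repeatable.
rng = random.Random(42)
path = [100.0]
for _ in range(5):
    path.append(gbm_step(path[-1], 0.01, 0.4, 0.001, rng.gauss(0, 1)))
```

Because the price is multiplied by an exponential, every value on the path stays strictly positive regardless of the random draws.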
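In the main loop, each newly simulated price is printed and published as a message. Finally, the execution pauses for a random amount of time of up to two seconds.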
ip = InstrumentPrice()
while True:
    msg = '{} {:.2f}'.format(ip.symbol, ip.simulate_value())
    print(msg)
    socket.send_string(msg)
    time.sleep(random.random() * 2)
Connecting a Simple Tick Data Client
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://0.0.0.0:5555')
socket.setsockopt_string(zmq.SUBSCRIBE, 'SYMBOL')
The while loop simply retrieves the messages sent by the server socket and prints them:
while True:
    data = socket.recv_string()
    print(data)
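The subscription set via socket.setsockopt_string(zmq.SUBSCRIBE, 'SYMBOL') acts as a prefix filter: a SUB socket only receives messages that start with one of its subscribed prefixes, and an empty string subscribes to everything. The following pure-Python sketch mimics that rule without any networking. The function matches_subscription and the sample messages are hypothetical:

```python
def matches_subscription(message, subscriptions):
    """Mimic the SUB-side filter: keep a message if it starts with any prefix."""
    return any(message.startswith(prefix) for prefix in subscriptions)


ticks = ['SYMBOL 100.12', 'OTHER 55.40', 'SYMBOL2 101.00']

# Subscribing to 'SYMBOL' also lets 'SYMBOL2 ...' through (prefix match).
only_symbol = [m for m in ticks if matches_subscription(m, ['SYMBOL'])]
# An empty prefix matches every message.
everything = [m for m in ticks if matches_subscription(m, [''])]
# Without any subscription, nothing is received.
nothing = [m for m in ticks if matches_subscription(m, [])]
```

One consequence of prefix matching is that symbols sharing a common prefix are not separated by the filter alone; subscribing to 'SYMBOL ' with a trailing space would be one way to avoid that in this message format.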
Signal Generation in Real Time
An online algorithm is an algorithm based on data that is received incrementally (bit by bit) over time.
To generate signals in real time on the basis of an online algorithm, data need to be collected and processed over time.
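As a minimal illustration of the online idea, the sketch below updates a mean one observation at a time without storing past data. The class name RunningMean and the sample prices are hypothetical:

```python
class RunningMean:
    """Online mean: each new observation updates the estimate in O(1) time."""

    def __init__(self):
        self.count = 0
        self.mean = 0.0

    def update(self, x):
        # Incremental update; no history of past observations is kept.
        self.count += 1
        self.mean += (x - self.mean) / self.count
        return self.mean


rm = RunningMean()
estimates = [rm.update(price) for price in (100.0, 102.0, 101.0)]
# estimates == [100.0, 101.0, 101.0]
```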
import datetime
import numpy as np
import pandas as pd

df = pd.DataFrame()
mom = 3
min_length = mom + 1
while True:
    data = socket.recv_string()
    t = datetime.datetime.now()
    sym, value = data.split()
    # DataFrame.append was removed in pandas 2.0; pd.concat replaces it
    df = pd.concat((df, pd.DataFrame({sym: float(value)}, index=[t])))
    dr = df.resample('5s', label='right').last()
    dr['returns'] = np.log(dr / dr.shift(1))
    if len(dr) > min_length:
        min_length += 1
        dr['momentum'] = np.sign(dr['returns'].rolling(mom).mean())
        print('\n' + '=' * 51)
        print('NEW SIGNAL | {}'.format(datetime.datetime.now()))
        print('=' * 51)
        print(dr.iloc[:-1].tail())
        if dr['momentum'].iloc[-1] == 1.0:
            print('\nLong market position.')
        elif dr['momentum'].iloc[-1] == -1.0:
            print('\nShort market position.')
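The signal logic itself can be checked offline on a fixed list of prices. The following standard-library sketch takes the sign of the mean of the last mom log returns; it is an illustration rather than a replacement for the pandas code, and the function name momentum_signal and the rule of returning None for short histories are assumptions:

```python
import math


def momentum_signal(prices, mom=3):
    """Return +1, -1 or 0 from the mean of the last `mom` log returns.

    Returns None while fewer than `mom` + 1 prices are available.
    """
    if len(prices) < mom + 1:
        return None
    recent = prices[-(mom + 1):]
    log_returns = [math.log(b / a) for a, b in zip(recent, recent[1:])]
    mean_return = sum(log_returns) / mom
    return (mean_return > 0) - (mean_return < 0)  # sign as an integer


rising = momentum_signal([100.0, 101.0, 102.0, 103.0])   # 1 (long)
falling = momentum_signal([103.0, 102.0, 101.0, 100.0])  # -1 (short)
too_short = momentum_signal([100.0, 101.0])              # None
```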
Visualising Streaming Data with Plotly
Install Jupyter Lab extensions
conda install plotly ipywidgets
jupyter labextension install jupyterlab-plotly
jupyter labextension install @jupyter-widgets/jupyterlab-manager
jupyter labextension install plotlywidget
Create a Plotly figure widget
import zmq
from datetime import datetime
import plotly.graph_objects as go
symbol = 'SYMBOL'
fig = go.FigureWidget()
fig.add_scatter()
Set up the socket communication with the sample tick data server, which needs to run on the same machine in a separate Python process
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://0.0.0.0:5555')
socket.setsockopt_string(zmq.SUBSCRIBE, 'SYMBOL')
times = list()
prices = list()
for _ in range(50):
    msg = socket.recv_string()
    t = datetime.now()
    times.append(t)
    _, price = msg.split()
    prices.append(float(price))
    fig.data[0].x = times
    fig.data[0].y = prices
Three Real-Time Streams
The tick data from the sample tick data server is collected in a pandas DataFrame object.
The two SMAs are calculated after each update from the socket.
fig = go.FigureWidget()
fig.add_scatter(name='SYMBOL')
fig.add_scatter(name='SMA1', line=dict(width=1, dash='dot'), mode='lines+markers')
fig.add_scatter(name='SMA2', line=dict(width=1, dash='dash'), mode='lines+markers')
import pandas as pd
df = pd.DataFrame()
for _ in range(75):
    msg = socket.recv_string()
    t = datetime.now()
    sym, price = msg.split()
    # DataFrame.append was removed in pandas 2.0; pd.concat replaces it
    df = pd.concat((df, pd.DataFrame({sym: float(price)}, index=[t])))
    df['SMA1'] = df[sym].rolling(5).mean()
    df['SMA2'] = df[sym].rolling(10).mean()
    fig.data[0].x = df.index
    fig.data[1].x = df.index
    fig.data[2].x = df.index
    fig.data[0].y = df[sym]
    fig.data[1].y = df['SMA1']
    fig.data[2].y = df['SMA2']
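The loop above recalculates both rolling means over the whole DataFrame after every tick. For longer streams, one alternative is to keep only the last few prices per window. The sketch below shows that idea with the standard library; the class name StreamingSMA and the window of 3 are hypothetical:

```python
from collections import deque


class StreamingSMA:
    """Simple moving average over the last `window` observations."""

    def __init__(self, window):
        self.window = window
        self.values = deque(maxlen=window)  # old values drop out automatically

    def update(self, x):
        self.values.append(x)
        if len(self.values) < self.window:
            return None  # plays the role of NaN before the window is full
        return sum(self.values) / self.window


sma = StreamingSMA(3)
out = [sma.update(p) for p in (10.0, 11.0, 12.0, 13.0)]
# out == [None, None, 11.0, 12.0]
```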
Three Sub-Plots for Three Streams
The first plots the real-time tick data.
The second plots the log returns data.
The third plots the time series momentum based on the log returns data.
from plotly.subplots import make_subplots
f = make_subplots(rows=3, cols=1, shared_xaxes=True)
f.append_trace(go.Scatter(name='SYMBOL'), row=1, col=1)
f.append_trace(go.Scatter(name='RETURN', line=dict(width=1, dash='dot'), mode='lines+markers', marker={'symbol': 'triangle-up'}), row=2, col=1)
f.append_trace(go.Scatter(name='MOMENTUM', line=dict(width=1, dash='dash'), mode='lines+markers', marker={'symbol': 'x'}), row=3, col=1)
fig = go.FigureWidget(f)
import numpy as np
df = pd.DataFrame()
for _ in range(75):
    msg = socket.recv_string()
    t = datetime.now()
    sym, price = msg.split()
    # DataFrame.append was removed in pandas 2.0; pd.concat replaces it
    df = pd.concat((df, pd.DataFrame({sym: float(price)}, index=[t])))
    df['RET'] = np.log(df[sym] / df[sym].shift(1))
    df['MOM'] = df['RET'].rolling(10).mean()
    fig.data[0].x = df.index
    fig.data[1].x = df.index
    fig.data[2].x = df.index
    fig.data[0].y = df[sym]
    fig.data[1].y = df['RET']
    fig.data[2].y = df['MOM']
Streaming Data as Bars
socket = context.socket(zmq.SUB)
socket.connect('tcp://0.0.0.0:5556')
socket.setsockopt_string(zmq.SUBSCRIBE, '')
for _ in range(5):
    msg = socket.recv_string()
    print(msg)
fig = go.FigureWidget()
fig.add_bar()
x = list('abcdefgh')
fig.data[0].x = x
for _ in range(25):
    msg = socket.recv_string()
    y = msg.split()
    y = [float(n) for n in y]
    fig.data[0].y = y
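The client above expects a publisher on port 5556 that sends eight space-separated numbers per message, one for each of the bars a to h. That server is not shown in this section, so the following is only a sketch of what it could look like. The function names make_bar_message and serve, the value range of 0 to 100, and the pause of one second are all assumptions:

```python
import random
import time


def make_bar_message(n=8, rng=random):
    """Build one message of n space-separated values, as the client expects."""
    return ' '.join('{:.3f}'.format(rng.random() * 100) for _ in range(n))


def serve(address='tcp://0.0.0.0:5556', pause=1.0):
    """Publish bar messages forever (hypothetical server; requires pyzmq)."""
    import zmq  # imported here so the helper above works without pyzmq

    context = zmq.Context()
    socket = context.socket(zmq.PUB)
    socket.bind(address)
    while True:
        socket.send_string(make_bar_message())
        time.sleep(pause)


# Offline check of the message format with a seeded generator.
sample = make_bar_message(rng=random.Random(0))
values = [float(v) for v in sample.split()]
```

Calling serve() in a separate Python process would then feed the bar chart client; because the client subscribes with an empty string, no topic prefix is needed in the messages.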









