Building a Production-Grade Data Streamer for Hyperliquid
How to aggregate real-time trades into OHLCV bars using the Hyperliquid WebSocket API
Why Stream Your Own Data?
If you're running a quantitative strategy on Hyperliquid, you need candle data. You can poll the REST API every N minutes, but that approach has problems:
Latency. You don't get the bar until you ask for it. A 30-second poll delay on a 5-minute bar is a 10% timing error.
Reliability. REST calls fail. Rate limits bite. Your cron job misses a tick and you've got a gap.
Control. You want bars aligned exactly to your strategy's clock, not whatever the exchange decided.
The solution is a WebSocket trade stream that aggregates ticks into bars locally. This post walks through how to build one from scratch, based on a system that has run in production across 20+ crypto perpetuals.
Architecture Overview
The design is simple on purpose:
```text
Hyperliquid WebSocket (wss://api.hyperliquid.xyz/ws)
       │
       │  trade prints (coin, price, size, timestamp)
       ▼
┌─────────────┐
│  Bar State  │  ← one BarState per coin, protected by a lock
│   (OHLCV)   │
└──────┬──────┘
       │
       │  flush on period boundary or timer
       ▼
┌─────────────┐
│  CSV Files  │  ← one file per coin, appended with file locks
│  (append)   │
└─────────────┘
       │
       ▼
Downstream consumers (ML pipeline, backtester, live runner)
```
Three threads:
- WebSocket thread: maintains the connection, parses trades, and updates bar state.
- Rollover thread: wakes up every few seconds and flushes bars whose time window has elapsed.
- Main thread: monitors health and triggers reconnections if the feed goes stale.
Why CSV? Because every tool in the quant stack can read a CSV. Pandas, polars, R, Excel, tail -f. No database to operate, no schema migrations, no port conflicts. When your ML pipeline polls a CSV for new rows, it just works.
Step 1: Connect to the WebSocket
Hyperliquid's public WebSocket endpoint is wss://api.hyperliquid.xyz/ws. No authentication required for market data.
Install the client:
```bash
pip install websocket-client requests
```
The connection wrapper:
```python
import json
import random
import threading
import time
import traceback
from typing import Optional

import websocket  # from the websocket-client package

WS_URL = "wss://api.hyperliquid.xyz/ws"
PING_INTERVAL_SECONDS = 20
MAX_BACKOFF_SECONDS = 60
COINS = ["BTC", "ETH", "SOL", "XRP", "DOGE", "LINK", "HYPE"]


class HyperliquidWsWorker:
    def __init__(self, stop_event: threading.Event):
        self._stop = stop_event
        self._thread: Optional[threading.Thread] = None
        self._app: Optional[websocket.WebSocketApp] = None
        self._backoff = 1.0  # seconds; reset after every successful connect

    def start(self):
        self._thread = threading.Thread(
            target=self._run_forever,
            name="hyperliquid-ws",
            daemon=True,
        )
        self._thread.start()

    def request_close(self):
        # Closing the app makes run_forever() return, which triggers a reconnect
        app = self._app
        if app:
            try:
                app.close()
            except Exception:
                pass

    def _run_forever(self):
        while not self._stop.is_set():
            self._app = websocket.WebSocketApp(
                WS_URL,
                on_open=self._on_open,
                on_message=self._on_message,
                on_error=self._on_error,
                on_close=self._on_close,
            )
            try:
                # Blocks until the connection closes
                self._app.run_forever(
                    ping_interval=PING_INTERVAL_SECONDS,
                    ping_timeout=10,
                )
            except Exception as exc:
                print(f"[ws] run_forever error: {exc}")
                traceback.print_exc()
            if self._stop.is_set():
                break
            # Exponential backoff with jitter
            wait = min(self._backoff, MAX_BACKOFF_SECONDS)
            jitter = random.uniform(0.0, 1.0)
            print(f"[ws] Reconnecting in {wait + jitter:.1f}s")
            self._stop.wait(wait + jitter)
            self._backoff = min(wait * 2, MAX_BACKOFF_SECONDS)

    def _on_open(self, ws):
        print("[ws] Connected, subscribing...")
        self._backoff = 1.0  # healthy connection: start the next backoff from scratch
        for coin in COINS:
            payload = {
                "method": "subscribe",
                "subscription": {"type": "trades", "coin": coin},
            }
            ws.send(json.dumps(payload))

    def _on_message(self, _ws, message: str):
        # We'll fill this in next
        pass

    def _on_error(self, _ws, error):
        print(f"[ws] Error: {error}")

    def _on_close(self, _ws, status_code, msg):
        print(f"[ws] Closed (code={status_code})")
```
Three things to notice:
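run_forever with ping_interval=20. Hyperliquid drops idle connections. With this setting, websocket-client sends a WebSocket ping frame every 20 seconds and treats a missing pong within ping_timeout=10 seconds as a dead connection. Hyperliquid's API docs also describe an application-level heartbeat message, {"method": "ping"}, for subscriptions that can stay quiet for long stretches.
Exponential backoff with jitter. When the connection drops (and it will: maintenance windows, network blips, AWS hiccups), you don't want all your streamers hammering the endpoint simultaneously. The wait doubles from 1 second up to a 60-second cap, plus up to 1 second of random jitter.
Daemon thread. Python does not wait for daemon threads at exit, so the WebSocket thread can never keep the process alive on its own: when the main thread finishes, the connection goes down with the process. No orphaned connections.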
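If protocol-level pings turn out not to be enough, the application-level heartbeat can run on its own small loop. A minimal sketch, assuming the {"method": "ping"} message format from Hyperliquid's WebSocket docs and a `send` callable such as the current WebSocketApp's `send` method; the `heartbeat_loop` name and the 30-second interval are illustrative, not part of the original streamer.

```python
import json
import threading
from typing import Callable

HEARTBEAT_SECONDS = 30  # illustrative; keep it well below the server's idle timeout


def heartbeat_loop(
    send: Callable[[str], None],
    stop_event: threading.Event,
    interval: float = HEARTBEAT_SECONDS,
) -> None:
    """Hypothetical helper: send an application-level ping every `interval` seconds."""
    message = json.dumps({"method": "ping"})
    # Event.wait() returns True as soon as the event is set, which ends the loop
    while not stop_event.wait(interval):
        try:
            send(message)
        except Exception as exc:  # e.g. the socket is closed mid-reconnect
            print(f"[heartbeat] send failed: {exc}")
```

One trade-off to keep in mind: because `_on_message` refreshes `last_message_ts` before it filters by channel, the server's pong replies will also keep the stale-feed detector from firing, even if trade messages have stopped.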
Step 2: Parse Trade Messages
Hyperliquid sends trade data in this format:
```json
{
  "channel": "trades",
  "data": [
    {"coin": "BTC", "side": "A", "px": "97432.0", "sz": "0.001", "time": 1709827200000}
  ]
}
```
But depending on SDK version and message type, the schema can vary, so a production parser needs to handle multiple formats gracefully:
```python
from datetime import datetime, timezone
from typing import Iterator, Optional, Tuple

UTC = timezone.utc


def iter_trades(payload: dict) -> Iterator[Tuple[str, float, float, datetime]]:
    """Yield (coin, price, size, trade_dt) from a WebSocket message.

    trade_dt is a naive datetime expressed in UTC.
    """
    data = payload.get("data")
    if not isinstance(data, list):
        return
    for entry in data:
        if not isinstance(entry, dict):
            continue
        coin = entry.get("coin") or entry.get("symbol")
        if not coin:
            continue
        price = _safe_float(entry.get("px") or entry.get("price"))
        size = _safe_float(entry.get("sz") or entry.get("size"))
        if price is None or size is None:
            continue
        ts = entry.get("time") or entry.get("timestamp") or entry.get("ts")
        if isinstance(ts, (int, float)):
            ts_val = float(ts)
            if ts_val > 1e12:  # milliseconds since the epoch
                ts_val /= 1000.0
            trade_dt = datetime.fromtimestamp(ts_val, tz=UTC).replace(tzinfo=None)
        else:
            # No usable timestamp: fall back to the local receive time (UTC)
            trade_dt = datetime.now(UTC).replace(tzinfo=None)
        yield str(coin).upper(), price, size, trade_dt


def _safe_float(value) -> Optional[float]:
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None
```
The _safe_float helper looks trivial, but it prevents one of the most common crashes in production WebSocket code: a None or "" sneaking into a float() call at 2 AM.
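Why is 1e12 a safe dividing line between seconds and milliseconds? 1e12 milliseconds is September 2001, while 1e12 seconds lies tens of thousands of years in the future, so any present-day millisecond timestamp is above the threshold and any present-day second timestamp is far below it. A standalone version of the same normalization that `iter_trades` does inline (`to_utc_naive` is a hypothetical name used only for illustration):

```python
from datetime import datetime, timezone


def to_utc_naive(ts: float) -> datetime:
    """Hypothetical helper: epoch seconds or milliseconds -> naive UTC datetime."""
    seconds = ts / 1000.0 if ts > 1e12 else float(ts)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(tzinfo=None)


# The sample trade above carries "time": 1709827200000 (milliseconds)
print(to_utc_naive(1709827200000))  # 2024-03-07 16:00:00
print(to_utc_naive(1709827200))     # the same instant, given in seconds
```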
Now wire _on_message to parse trades and feed them to the bar aggregator (shown in Step 3):
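```python
# Method of HyperliquidWsWorker, replacing the placeholder from Step 1
def _on_message(self, _ws, message: str):
    global last_message_ts  # module-level stale-feed timestamp (defined in Step 6)
    last_message_ts = time.time()  # any message proves the connection is alive
    try:
        payload = json.loads(message)
    except json.JSONDecodeError:
        return
    if not isinstance(payload, dict) or payload.get("channel") != "trades":
        return  # ignore subscription responses and other channels
    for coin, price, size, trade_dt in iter_trades(payload):
        update_bar(coin, price, size, trade_dt)
```
This updates the stale-connection timestamp (last_message_ts, checked by the health monitor in Step 6) on every message, including non-trade messages such as subscription responses, and delegates trade parsing to iter_trades above.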
Step 3: The Bar Aggregator
This is the core of the system. Each coin gets a BarState object that tracks the current bar's OHLCV:
```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class BarState:
    open: Optional[float] = None
    high: float = float("-inf")
    low: float = float("inf")
    close: Optional[float] = None
    volume: float = 0.0
    window_start: Optional[datetime] = None  # naive UTC start of the current bar

    def update(self, price: float, volume: float):
        """Fold one trade into the current bar."""
        if self.open is None:
            self.open = self.high = self.low = self.close = price
        if price > self.high:
            self.high = price
        if price < self.low:
            self.low = price
        self.close = price
        self.volume += max(volume, 0.0)

    def reset_for(self, window_start: datetime, price: float, volume: float):
        """Start a new bar whose first trade is (price, volume)."""
        self.open = self.high = self.low = self.close = price
        self.volume = max(volume, 0.0)
        self.window_start = window_start

    def reset(self):
        """Clear the bar entirely, including its window."""
        self.open = None
        self.high = float("-inf")
        self.low = float("inf")
        self.close = None
        self.volume = 0.0
        self.window_start = None
```
The window alignment function floors a timestamp to the start of its bar:
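```python
BAR_INTERVAL_MINUTES = 30  # must divide 60 evenly for this minute-based floor


def floor_to_window(dt_utc: datetime) -> datetime:
    minute = (dt_utc.minute // BAR_INTERVAL_MINUTES) * BAR_INTERVAL_MINUTES
    return dt_utc.replace(minute=minute, second=0, microsecond=0)
```
For a 30-minute bar, 14:47:33 becomes 14:30:00. For 5-minute bars, change BAR_INTERVAL_MINUTES = 5 and 14:47:33 becomes 14:45:00. Because it only rewrites the minute field, this floor works only for intervals that divide an hour evenly (1, 5, 15, 30, 60 minutes, and so on).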
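For bars that don't fit inside an hour (2h or 4h, say), a minute-based floor gives the wrong answer: with 120-minute bars, 15:47 would floor to 15:00 instead of 14:00. One option is to floor on seconds since the Unix epoch instead. A sketch, assuming the streamer's naive-UTC convention; `floor_to_interval` is a hypothetical name. Bars come out aligned to 00:00 UTC for any interval that divides a day.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)  # naive UTC, matching the streamer's timestamps


def floor_to_interval(dt_utc: datetime, interval: timedelta) -> datetime:
    """Hypothetical helper: floor a naive UTC datetime to its bar start for any interval."""
    step = int(interval.total_seconds())
    elapsed = int((dt_utc - EPOCH).total_seconds())
    return EPOCH + timedelta(seconds=elapsed - elapsed % step)


t = datetime(2024, 12, 5, 15, 47, 33)
print(floor_to_interval(t, timedelta(minutes=30)))  # 2024-12-05 15:30:00
print(floor_to_interval(t, timedelta(hours=2)))     # 2024-12-05 14:00:00
print(floor_to_interval(t, timedelta(hours=4)))     # 2024-12-05 12:00:00
```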
Now the update logic that ties it together:
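```python
# Re-entrant lock: update_bar() and rollover() call flush_bar(), which locks again
bars_lock = threading.RLock()
bars = {coin: BarState() for coin in COINS}


def update_bar(coin: str, price: float, volume: float, trade_dt: datetime):
    with bars_lock:
        if coin not in bars:
            return
        state = bars[coin]
        window_start = floor_to_window(trade_dt)
        if state.window_start is None:
            # First trade we've ever seen for this coin
            state.reset_for(window_start, price, volume)
        elif window_start > state.window_start:
            # New period: flush the old bar, start fresh
            flush_bar(coin, reason="boundary")
            state.reset_for(window_start, price, volume)
        elif window_start == state.window_start:
            # Same period: just update OHLCV
            state.update(price, volume)
        # else: late trade for a bar that was already flushed; drop it
```
The key insight: when a trade arrives whose floor_to_window is later than the current window_start, we know the bar is complete. Flush it and start a new one. A trade whose window is earlier than the current one (for example, one that arrives after the timer has already flushed its bar) is dropped, since that bar has already been written.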
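Because the boundary rule depends only on trade timestamps, it can be checked offline. Below is a minimal single-threaded sketch that applies the same rule to a list of trades: open a bar on the first trade, extend it while trades stay in the same window, emit it when a later window appears, and drop late trades. `aggregate_trades` is a hypothetical replay helper, not part of the streamer; it assumes 30-minute windows and trades given as (price, size, naive UTC datetime) tuples.

```python
from datetime import datetime, timedelta
from typing import Iterable, List, Tuple

INTERVAL = timedelta(minutes=30)
EPOCH = datetime(1970, 1, 1)

Trade = Tuple[float, float, datetime]                     # (price, size, trade_dt)
Bar = Tuple[datetime, float, float, float, float, float]  # (start, O, H, L, C, V)


def window_of(dt: datetime) -> datetime:
    elapsed = int((dt - EPOCH).total_seconds())
    step = int(INTERVAL.total_seconds())
    return EPOCH + timedelta(seconds=elapsed - elapsed % step)


def aggregate_trades(trades: Iterable[Trade]) -> List[Bar]:
    """Hypothetical helper: return completed bars; the still-open last bar is not emitted."""
    bars: List[Bar] = []
    current = None  # [start, open, high, low, close, volume]
    for price, size, dt in trades:
        start = window_of(dt)
        if current is None or start > current[0]:
            if current is not None:
                bars.append(tuple(current))  # boundary: the previous bar is complete
            current = [start, price, price, price, price, max(size, 0.0)]
        elif start == current[0]:
            current[2] = max(current[2], price)
            current[3] = min(current[3], price)
            current[4] = price
            current[5] += max(size, 0.0)
        # start < current[0]: late trade for an already-emitted bar, dropped
    return bars


trades = [
    (100.0, 1.0, datetime(2024, 12, 5, 14, 31)),
    (102.0, 0.5, datetime(2024, 12, 5, 14, 45)),
    (99.0, 2.0, datetime(2024, 12, 5, 14, 59)),
    (101.0, 1.0, datetime(2024, 12, 5, 15, 2)),  # new window: the 14:30 bar completes
    (50.0, 9.0, datetime(2024, 12, 5, 14, 50)),  # late trade, dropped
]
print(aggregate_trades(trades))
# [(datetime.datetime(2024, 12, 5, 14, 30), 100.0, 102.0, 99.0, 99.0, 3.5)]
```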
Step 4: Writing Bars to CSV
CSV writes need file-level locking because the downstream consumer (your ML pipeline) may be reading the same file:
```python
import csv
import fcntl  # Unix-only (Linux, macOS)
import os

OUTPUT_DIR = "/data/hyperliquid"
CSV_HEADERS = ["Datetime", "Open", "High", "Low", "Close", "Volume"]


def append_row_locked(path: str, row):
    with open(path, "a", newline="") as handle:
        fcntl.flock(handle, fcntl.LOCK_EX)
        try:
            csv.writer(handle).writerow(list(row))
            handle.flush()             # Python buffer -> kernel (visible to readers)
            os.fsync(handle.fileno())  # kernel -> disk (survives power loss)
        finally:
            fcntl.flock(handle, fcntl.LOCK_UN)


def flush_bar(coin: str, reason: str = "period"):
    """Append the coin's current bar to its CSV (callers may already hold bars_lock)."""
    with bars_lock:
        state = bars[coin]
        if state.window_start is None or state.open is None:
            return
        dt_str = state.window_start.strftime("%Y-%m-%d %H:%M:%S")
        row = [dt_str, state.open, state.high, state.low, state.close, state.volume]
        path = os.path.join(OUTPUT_DIR, f"{coin.lower()}_30m_data.csv")
        append_row_locked(path, row)
        print(
            f"[bar] {coin} {dt_str} "
            f"O:{state.open} H:{state.high} L:{state.low} "
            f"C:{state.close} V:{state.volume} ({reason})"
        )
```
Why flush() and os.fsync()? handle.flush() moves the row out of Python's internal buffer and into the kernel; from that point on, any other process reading the file sees it. Without the flush, your ML pipeline could read the file before the row leaves the buffer, see no new rows, and skip a signal. os.fsync() then forces the data to disk before we release the lock, so a completed bar survives a power loss or kernel crash.
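Why fcntl.flock? It is an advisory lock (available on Linux and macOS, not Windows): it only coordinates processes that also call flock, so a reader that takes a shared lock never sees a half-written row. Readers that skip the lock usually work fine, because each bar is a single short line appended in one go, but they may occasionally catch the final line mid-write and should ignore a last line that doesn't end in a newline.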
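On the consumer side, a reader can cooperate with the writer's LOCK_EX by holding a shared lock (fcntl.LOCK_SH) while it reads. A minimal sketch, assuming the reader remembers the byte offset where it last stopped and only wants rows appended since then; `read_new_rows` is a hypothetical helper and, like the writer, Unix-only. The first call (offset 0) also returns the header row.

```python
import csv
import fcntl
import io
from typing import List, Tuple


def read_new_rows(path: str, offset: int) -> Tuple[List[List[str]], int]:
    """Hypothetical helper: return (rows appended since `offset`, new offset)."""
    with open(path, "rb") as handle:
        fcntl.flock(handle, fcntl.LOCK_SH)  # blocks while the writer holds LOCK_EX
        try:
            handle.seek(offset)
            chunk = handle.read()
        finally:
            fcntl.flock(handle, fcntl.LOCK_UN)
    # Consume only complete lines; a trailing partial line is left for next time
    end = chunk.rfind(b"\n") + 1
    text = chunk[:end].decode("utf-8")
    rows = [row for row in csv.reader(io.StringIO(text, newline="")) if row]
    return rows, offset + end
```

Polling then becomes `rows, offset = read_new_rows(path, offset)` in a loop, with `offset` persisted wherever the consumer keeps its state.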
Step 5: The Rollover Timer
Here's a subtlety. What if a coin doesn't trade for 30 minutes? No trade message arrives, so the bar never flushes. Your downstream pipeline sees a gap.
The solution is a background timer that periodically checks if any bar's window has elapsed:
```python
from datetime import timedelta

ROLLOVER_CHECK_SECONDS = 5


def rollover(now: datetime):
    """Flush every bar whose window has ended, even if no trade closed it."""
    interval = timedelta(minutes=BAR_INTERVAL_MINUTES)
    with bars_lock:
        for coin, state in bars.items():
            if state.window_start is None:
                continue  # no trade seen yet for this coin
            while now >= state.window_start + interval:
                # Flush the completed bar
                flush_bar(coin, reason="timer")
                # Advance to the next window, carrying forward the last close
                next_start = state.window_start + interval
                last_close = state.close
                state.reset()
                state.window_start = next_start
                if last_close is not None:
                    state.open = state.high = state.low = state.close = last_close
                state.volume = 0.0


def rollover_loop(stop_event: threading.Event):
    while not stop_event.is_set():
        try:
            # Naive UTC "now", matching the naive UTC trade timestamps
            rollover(datetime.now(UTC).replace(tzinfo=None))
        except Exception as exc:
            print(f"[rollover] Error: {exc}")
        stop_event.wait(ROLLOVER_CHECK_SECONDS)
```
The while loop (not if) handles the case where more than one bar period has passed since the last check (for example, after the host was suspended): it catches up by flushing each missed window in sequence, carrying the last known close forward. Because the timer compares the local clock against windows keyed by exchange timestamps, keep the host clock synced with NTP; a clock running ahead can flush a bar before its last trades arrive.
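To see exactly what the catch-up loop does, here is its window arithmetic pulled out into a pure function. `windows_to_flush` is hypothetical and not part of the streamer: given the current `window_start` and the clock, it lists every window the loop would flush, in order, and the window it leaves open.

```python
from datetime import datetime, timedelta
from typing import List, Tuple


def windows_to_flush(
    window_start: datetime, now: datetime, interval: timedelta
) -> Tuple[List[datetime], datetime]:
    """Hypothetical helper: (windows flushed in order, window left open)."""
    flushed = []
    while now >= window_start + interval:
        flushed.append(window_start)
        window_start += interval
    return flushed, window_start


flushed, open_window = windows_to_flush(
    datetime(2024, 12, 5, 14, 0), datetime(2024, 12, 5, 15, 47), timedelta(minutes=30)
)
print([w.strftime("%H:%M") for w in flushed], open_window.strftime("%H:%M"))
# ['14:00', '14:30', '15:00'] 15:30
```

In the streamer, only the first of those bars can contain real trades; the 14:30 and 15:00 bars are written with volume 0 and OHLC equal to the carried-forward close.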
Step 6: Stale Connection Detection
WebSocket connections can silently die. The server stops sending, but the TCP connection stays open. You'll sit there forever waiting for trades that will never come.
The fix: track when you last received any message, and force a reconnect if it's been too long.
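```python
STALE_MESSAGE_SECONDS = 90
last_message_ts = 0.0  # refreshed by _on_message on every incoming message


def health_check_loop(stop_event: threading.Event, ws_worker: HyperliquidWsWorker):
    global last_message_ts  # required: this function reassigns the module-level value
    while not stop_event.is_set():
        now = time.time()
        if last_message_ts and now - last_message_ts > STALE_MESSAGE_SECONDS:
            print("[health] Feed stale, forcing reconnect")
            ws_worker.request_close()
            last_message_ts = now  # prevent repeated close calls
        stop_event.wait(1.0)
```
Because last_message_ts is refreshed by any message on the connection, the feed only looks stale when every subscribed coin goes quiet at once. 90 seconds is generous: BTC trades every few seconds. But if your list holds only low-liquidity altcoins, a weekend can go a minute without a print, so tune this based on your coin list.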
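time.time() follows the wall clock, which NTP or an operator can step forward or backward, making the feed look stale (or fresh) when it isn't. A variant sketch that measures silence with time.monotonic() instead, wrapped in a small hypothetical `StaleWatchdog` class so the WebSocket thread and the health loop share one object rather than a module-level global. The injectable `clock` parameter exists only to make it easy to test.

```python
import threading
import time
from typing import Callable, Optional


class StaleWatchdog:
    """Hypothetical helper: track time since the last message on a monotonic clock."""

    def __init__(self, max_silence: float, clock: Callable[[], float] = time.monotonic):
        self._max_silence = max_silence
        self._clock = clock
        self._lock = threading.Lock()
        self._last: Optional[float] = None  # None until the first message arrives

    def touch(self) -> None:
        """Call from on_message for every message received."""
        with self._lock:
            self._last = self._clock()

    def check_stale(self) -> bool:
        """True if the feed has been silent too long; re-arms itself when it fires."""
        with self._lock:
            if self._last is None:
                return False
            now = self._clock()
            if now - self._last > self._max_silence:
                self._last = now  # don't fire again every second during the reconnect
                return True
            return False
```

The health loop then reduces to `if watchdog.check_stale(): ws_worker.request_close()`, with `watchdog.touch()` called at the top of `_on_message`.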
Step 7: Backfilling Gaps on Startup
When the streamer starts (or restarts after a crash), you've missed bars. The REST API can fill those gaps:
```python
import requests

API_URL = "https://api.hyperliquid.xyz/info"


def backfill_recent(coins: list[str], hours: int = 6):
    """Download recent candles from the REST API and fill gaps in the CSVs."""
    # Use epoch time directly: calling .timestamp() on a naive utcnow() value
    # would interpret it as local time and shift the window on non-UTC hosts
    end_ms = int(time.time() * 1000)
    start_ms = end_ms - hours * 3600 * 1000
    current_window = floor_to_window(datetime.now(UTC).replace(tzinfo=None))
    for coin in coins:
        path = os.path.join(OUTPUT_DIR, f"{coin.lower()}_30m_data.csv")
        existing = _read_existing_timestamps(path)
        body = {
            "type": "candleSnapshot",
            "req": {
                "coin": coin,
                "interval": "30m",
                "startTime": start_ms,
                "endTime": end_ms,
            },
        }
        try:
            resp = requests.post(API_URL, json=body, timeout=20)
            resp.raise_for_status()
            candles = resp.json()
        except (requests.RequestException, ValueError) as exc:
            # One failing coin shouldn't block startup for the others
            print(f"[backfill] {coin}: request failed: {exc}")
            continue
        inserted = 0
        for candle in candles:
            ts_ms = candle.get("t") or candle.get("time")
            if ts_ms is None:
                continue
            dt = datetime.fromtimestamp(int(ts_ms) / 1000.0, tz=UTC).replace(tzinfo=None)
            dt_str = dt.strftime("%Y-%m-%d %H:%M:%S")
            # Skip the current, still-incomplete bar
            if dt >= current_window:
                continue
            # Skip already-written bars
            if dt_str in existing:
                continue
            row = [
                dt_str,
                float(candle["o"]),
                float(candle["h"]),
                float(candle["l"]),
                float(candle["c"]),
                float(candle["v"]),
            ]
            append_row_locked(path, row)
            existing.add(dt_str)
            inserted += 1
        print(f"[backfill] {coin}: +{inserted} bars")
        time.sleep(0.25)  # be nice to the API


def _read_existing_timestamps(path: str) -> set[str]:
    """Return the set of Datetime strings already in the CSV."""
    timestamps = set()
    if not os.path.exists(path):
        return timestamps
    with open(path, newline="") as f:
        reader = csv.reader(f)
        next(reader, None)  # skip header
        for row in reader:
            if row:
                timestamps.add(row[0])
    return timestamps
```
Run this once at startup: backfill_recent(COINS, hours=6). It's idempotent: existing timestamps are skipped. Two caveats. Backfilled rows are appended to the end of the file, so if a gap sits before the newest row, the file is no longer in time order (sort by Datetime when loading). And the first bar the streamer writes after a restart only contains trades received since it came back up.
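After a backfill it can be worth checking that a file really has no holes. A small sketch, assuming the Datetime format and 30-minute spacing used above; `find_missing_windows` is a hypothetical helper. It parses and sorts the timestamps (backfilled rows may be out of order in the file) and lists every expected window start that is absent.

```python
from datetime import datetime, timedelta
from typing import Iterable, List

DT_FORMAT = "%Y-%m-%d %H:%M:%S"


def find_missing_windows(
    timestamps: Iterable[str], interval: timedelta = timedelta(minutes=30)
) -> List[datetime]:
    """Hypothetical helper: window starts missing between the first and last timestamp."""
    times = sorted({datetime.strptime(ts, DT_FORMAT) for ts in timestamps})
    missing = []
    for prev, nxt in zip(times, times[1:]):
        expected = prev + interval
        while expected < nxt:
            missing.append(expected)
            expected += interval
    return missing


print(find_missing_windows([
    "2024-12-05 15:00:00",
    "2024-12-05 14:00:00",
    "2024-12-05 14:30:00",
    "2024-12-05 16:30:00",
]))
# [datetime.datetime(2024, 12, 5, 15, 30), datetime.datetime(2024, 12, 5, 16, 0)]
```

It pairs naturally with the existing helper: `find_missing_windows(_read_existing_timestamps(path))`.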
Step 8: Putting It All Together
```python
import signal


def main():
    global last_message_ts  # reassigned by the health monitor below
    stop_event = threading.Event()

    # Ensure CSV files exist with headers before anything appends to them
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    for coin in COINS:
        path = os.path.join(OUTPUT_DIR, f"{coin.lower()}_30m_data.csv")
        if not os.path.exists(path) or os.path.getsize(path) == 0:
            with open(path, "w", newline="") as f:
                csv.writer(f).writerow(CSV_HEADERS)

    # Backfill any gaps from downtime
    backfill_recent(COINS, hours=6)

    # Start WebSocket
    ws_worker = HyperliquidWsWorker(stop_event)
    ws_worker.start()

    # Start rollover timer
    rollover_thread = threading.Thread(
        target=rollover_loop,
        args=(stop_event,),
        name="rollover",
        daemon=True,
    )
    rollover_thread.start()

    # Graceful shutdown
    def handle_signal(sig, _frame):
        print(f"[main] Signal {sig}, shutting down...")
        stop_event.set()
        ws_worker.request_close()

    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    # Health monitor (same logic as health_check_loop in Step 6)
    print("[main] Streamer running. Ctrl+C to stop.")
    while not stop_event.is_set():
        now = time.time()
        if last_message_ts and now - last_message_ts > STALE_MESSAGE_SECONDS:
            print("[main] Feed stale, forcing reconnect")
            ws_worker.request_close()
            last_message_ts = now  # give the reconnect time before checking again
        stop_event.wait(1.0)

    # Cleanup
    stop_event.set()
    ws_worker.request_close()
    rollover_thread.join(timeout=2.0)
    print("[main] Bye.")


if __name__ == "__main__":
    main()
```
Run it:
```bash
python hyperliquid_streamer.py
```
You'll see output like:
```text
[backfill] BTC: +12 bars
[backfill] ETH: +12 bars
[ws] Connected, subscribing...
[main] Streamer running. Ctrl+C to stop.
[bar] BTC 2024-12-05 14:30:00 O:97432.0 H:97891.0 L:97210.5 C:97654.0 V:142.31 (boundary)
[bar] ETH 2024-12-05 14:30:00 O:3421.5 H:3445.2 L:3418.0 C:3439.8 V:2891.4 (boundary)
```
Running as a Systemd Service
For production, you want this to survive reboots:
```ini
# /etc/systemd/system/hyperliquid-streamer.service
[Unit]
Description=Hyperliquid 30m Data Streamer
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
User=trader
WorkingDirectory=/home/trader/streamer
ExecStart=/home/trader/miniconda3/envs/trading/bin/python hyperliquid_streamer.py
# Unbuffered stdout, so print() lines reach the journal immediately
Environment=PYTHONUNBUFFERED=1
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal

[Install]
WantedBy=multi-user.target
```
```bash
sudo systemctl daemon-reload   # pick up the new or edited unit file
sudo systemctl enable hyperliquid-streamer
sudo systemctl start hyperliquid-streamer
# Check logs
journalctl -u hyperliquid-streamer -f
```
Bonus: Streaming Order Book Data
If you need Level 2 book data (for market making or fill probability models), subscribe to l2Book instead of trades:
```python
import json


# Methods of a worker class like HyperliquidWsWorker, replacing the trades versions
def _on_open(self, ws):
    ws.send(json.dumps({
        "method": "subscribe",
        "subscription": {"type": "l2Book", "coin": "BTC"},
    }))


def _on_message(self, _ws, message: str):
    payload = json.loads(message)
    if payload.get("channel") != "l2Book":
        return
    data = payload["data"]
    levels = data.get("levels", [[], []])
    bids = levels[0]  # [{"px": "97430.0", "sz": "1.5", ...}, ...]
    asks = levels[1]  # [{"px": "97432.0", "sz": "0.8", ...}, ...]
    best_bid = float(bids[0]["px"]) if bids else None
    best_ask = float(asks[0]["px"]) if asks else None
    if best_bid is None or best_ask is None:
        return  # one side of the book is empty: no spread to report
    print(f"BTC bid={best_bid} ask={best_ask} spread={best_ask - best_bid:.1f}")
```
You can also subscribe to userFills for real-time fill notifications (requires your wallet address):
```python
ws.send(json.dumps({
    "method": "subscribe",
    "subscription": {"type": "userFills", "user": "0xYourAddress"},
}))
```
Common Pitfalls
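1. Forgetting thread safety. The WebSocket callback runs on its own thread, and the rollover timer runs on another. If your bar state isn't protected by a lock, you'll get corrupted OHLCV values. Use threading.RLock() around all bar reads and writes; it has to be re-entrant because update_bar() and rollover() call flush_bar(), which acquires the same lock again.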
2. Not handling timestamp formats. Depending on the source, timestamps can arrive in milliseconds or in seconds. Always check: if ts > 1e12, divide by 1000.
3. Silent WebSocket death. A connection can go "zombie" — TCP stays open but no data flows. The stale message detector is not optional; it's essential.
4. Missing the rollover for illiquid coins. DOGE might trade every second. ASTER might go 20 minutes without a print. Without the timer-based rollover, your bars have gaps. With it, you get bars with volume=0 and OHLC=last_close, which is correct.
5. Not using os.fsync. Without it, written data can sit in the kernel's page cache for a while before it reaches disk. A process crash (including the OOM killer) won't lose it, but if the machine loses power or the kernel crashes, you lose bars.
Summary
The complete data streamer is roughly 200 lines of Python. It:
Connects to Hyperliquid's public WebSocket
Subscribes to trade prints for any set of coins
Aggregates trades into time-aligned OHLCV bars
Writes completed bars to per-coin CSV files with file locking
Handles reconnection with exponential backoff and jitter
Detects stale connections and forces reconnects
Backfills gaps from the REST API on startup
Runs as a systemd service for production reliability
The output CSV files can be consumed by any downstream system — pandas, a live trading runner, a backtester — with nothing more than pd.read_csv().
The boring parts (file locks, fsync, stale connection detection, rollover timers) are what separate a streamer that works in a notebook from one that runs unattended for months.


