Delphic Alpha

Building a Production-Grade Data Streamer for Hyperliquid

How to aggregate real-time trades into OHLCV bars using the Hyperliquid WebSocket API

oracle
Mar 14, 2026

Why Stream Your Own Data?

If you're running a quantitative strategy on Hyperliquid, you need candle data. You can poll the REST API every N minutes, but that approach has problems:

  1. Latency. You don't get the bar until you ask for it. A 30-second poll delay on a 5-minute bar is a 10% timing error.

  2. Reliability. REST calls fail. Rate limits bite. Your cron job misses a tick and you've got a gap.

  3. Control. You want bars aligned exactly to your strategy's clock, not whatever the exchange decided.

The solution is a WebSocket trade stream that aggregates ticks into bars locally. This post walks through how to build one from scratch, based on a system that has run in production across 20+ crypto perpetuals.


Architecture Overview

The design is simple on purpose:

Hyperliquid WebSocket (wss://api.hyperliquid.xyz/ws)
        │
        │  trade prints (coin, price, size, timestamp)
        ▼
  ┌─────────────┐
  │  Bar State   │  ← one BarState per coin, protected by a lock
  │  (OHLCV)     │
  └──────┬──────┘
         │
         │  flush on period boundary or timer
         ▼
  ┌─────────────┐
  │  CSV Files   │  ← one file per coin, appended with file locks
  │  (append)    │
  └─────────────┘
         │
         ▼
  Downstream consumers (ML pipeline, backtester, live runner)

Three threads:

  • WebSocket thread: maintains the connection, parses trades, updates bar state.

  • Rollover thread: wakes up every few seconds, flushes bars whose time window has elapsed.

  • Main thread: monitors health, triggers reconnections if the feed goes stale.

Why CSV? Because every tool in the quant stack can read a CSV. Pandas, polars, R, Excel, tail -f. No database to operate, no schema migrations, no port conflicts. When your ML pipeline polls a CSV for new rows, it just works.
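
A consumer that polls one of these files can be very small. The sketch below is an assumption about what such a reader might look like, not part of the streamer itself: read_new_rows is a hypothetical helper that returns only the rows newer than the last one already processed. It relies on the "YYYY-MM-DD HH:MM:SS" timestamp format sorting correctly as plain text, and the demo file and its values are made up.

```python
import csv
import os
import tempfile


def read_new_rows(path, last_seen=""):
    """Return rows whose Datetime string sorts after last_seen.

    Assumes the "YYYY-MM-DD HH:MM:SS" format used in this post, which
    sorts correctly as plain text.
    """
    if not os.path.exists(path):
        return []
    with open(path, newline="") as handle:
        reader = csv.DictReader(handle)
        return [row for row in reader if row["Datetime"] > last_seen]


# Demo against a throwaway file (the path and values are made up)
demo_path = os.path.join(tempfile.mkdtemp(), "btc_30m_data.csv")
with open(demo_path, "w", newline="") as handle:
    writer = csv.writer(handle)
    writer.writerow(["Datetime", "Open", "High", "Low", "Close", "Volume"])
    writer.writerow(["2024-12-05 14:00:00", 97000.0, 97500.0, 96900.0, 97432.0, 120.5])
    writer.writerow(["2024-12-05 14:30:00", 97432.0, 97891.0, 97210.5, 97654.0, 142.31])

new_rows = read_new_rows(demo_path, last_seen="2024-12-05 14:00:00")
print([row["Datetime"] for row in new_rows])
```

The consumer keeps the last Datetime it handled and passes it back in on the next poll, so a restart only needs that one string to resume.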


Step 1: Connect to the WebSocket

Hyperliquid's public WebSocket endpoint is wss://api.hyperliquid.xyz/ws. No authentication required for market data.

Install the client:

pip install websocket-client requests

The connection wrapper:

import json
import threading
import random
import time
import traceback
from typing import Optional

import websocket

WS_URL = "wss://api.hyperliquid.xyz/ws"
PING_INTERVAL_SECONDS = 20
MAX_BACKOFF_SECONDS = 60

COINS = ["BTC", "ETH", "SOL", "XRP", "DOGE", "LINK", "HYPE"]


class HyperliquidWsWorker:
    def __init__(self, stop_event: threading.Event):
        self._stop = stop_event
        self._thread: Optional[threading.Thread] = None
        self._app: Optional[websocket.WebSocketApp] = None

    def start(self):
        self._thread = threading.Thread(
            target=self._run_forever,
            name="hyperliquid-ws",
            daemon=True,
        )
        self._thread.start()

    def request_close(self):
        if self._app:
            try:
                self._app.close()
            except Exception:
                pass

    def _run_forever(self):
        backoff = 1.0
        while not self._stop.is_set():
            self._app = websocket.WebSocketApp(
                WS_URL,
                on_open=self._on_open,
                on_message=self._on_message,
                on_error=self._on_error,
                on_close=self._on_close,
            )
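            started = time.monotonic()
            try:
                self._app.run_forever(
                    ping_interval=PING_INTERVAL_SECONDS,
                    ping_timeout=10,
                )
            except Exception as exc:
                print(f"[ws] run_forever error: {exc}")

            if self._stop.is_set():
                break

            # A connection that stayed up for a while was healthy, so start
            # the backoff ladder over instead of keeping an escalated delay
            if time.monotonic() - started > MAX_BACKOFF_SECONDS:
                backoff = 1.0

            # Exponential backoff with jitter
            wait = min(backoff, MAX_BACKOFF_SECONDS)
            jitter = random.uniform(0.0, 1.0)
            print(f"[ws] Reconnecting in {wait + jitter:.1f}s")
            self._stop.wait(wait + jitter)
            backoff = min(wait * 2, MAX_BACKOFF_SECONDS)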

    def _on_open(self, ws):
        print("[ws] Connected, subscribing...")
        for coin in COINS:
            payload = {
                "method": "subscribe",
                "subscription": {"type": "trades", "coin": coin},
            }
            ws.send(json.dumps(payload))

    def _on_message(self, _ws, message: str):
        # We'll fill this in next
        pass

    def _on_error(self, _ws, error):
        print(f"[ws] Error: {error}")

    def _on_close(self, _ws, status_code, msg):
        print(f"[ws] Closed (code={status_code})")

Three things to notice:

  1. run_forever with ping_interval=20. Hyperliquid will drop idle connections. The ping keeps it alive.

  2. Exponential backoff with jitter. When the connection drops (and it will — maintenance windows, network blips, AWS hiccups), you don't want all your streamers hammering the endpoint simultaneously.

  3. Daemon thread. If the main process dies, the thread dies with it. No orphaned connections.
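
To make the backoff behaviour concrete, here is a small standalone sketch of the delay ladder. backoff_delays and with_jitter are hypothetical helper names introduced for illustration; they reproduce the arithmetic in _run_forever (double the wait, cap it at MAX_BACKOFF_SECONDS, add up to a second of jitter) without opening a connection.

```python
import random

MAX_BACKOFF_SECONDS = 60


def backoff_delays(attempts, base=1.0, cap=float(MAX_BACKOFF_SECONDS)):
    """Base delay (before jitter) for each consecutive reconnect attempt."""
    delays = []
    wait = base
    for _ in range(attempts):
        wait = min(wait, cap)
        delays.append(wait)
        wait = min(wait * 2, cap)
    return delays


def with_jitter(delay, rng=random):
    """Add up to one second of random jitter, as the worker does."""
    return delay + rng.uniform(0.0, 1.0)


schedule = backoff_delays(8)
print(schedule)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]
print(round(with_jitter(schedule[0]), 2))  # somewhere between 1.0 and 2.0
```

After six failures in a row the wait settles at the cap, so a long outage costs at most about one attempt per minute per streamer.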


Step 2: Parse Trade Messages

Hyperliquid sends trade data in this format:

{
  "channel": "trades",
  "data": [
    {"coin": "BTC", "side": "A", "px": "97432.0", "sz": "0.001", "time": 1709827200000}
  ]
}

But depending on SDK version and message type, the schema can vary. A production parser needs to handle multiple formats gracefully:

from typing import Iterator, Tuple
from datetime import datetime, timezone

UTC = timezone.utc


def iter_trades(payload: dict) -> Iterator[Tuple[str, float, float, datetime]]:
    """Yield (coin, price, size, trade_dt) from a WebSocket message."""
    data = payload.get("data")
    if not isinstance(data, list):
        return

    for entry in data:
        if not isinstance(entry, dict):
            continue

        coin = entry.get("coin") or entry.get("symbol")
        if not coin:
            continue

        price = _safe_float(entry.get("px") or entry.get("price"))
        size = _safe_float(entry.get("sz") or entry.get("size"))
        if price is None or size is None:
            continue

        ts = entry.get("time") or entry.get("timestamp") or entry.get("ts")
        if isinstance(ts, (int, float)):
            ts_val = float(ts)
            if ts_val > 1e12:  # milliseconds
                ts_val /= 1000.0
            trade_dt = datetime.fromtimestamp(ts_val, tz=UTC).replace(tzinfo=None)
        else:
            # Naive UTC "now"; datetime.utcnow() is deprecated since Python 3.12
            trade_dt = datetime.now(UTC).replace(tzinfo=None)

        yield str(coin).upper(), price, size, trade_dt


def _safe_float(value) -> Optional[float]:
    if value is None:
        return None
    try:
        return float(value)
    except (TypeError, ValueError):
        return None

The _safe_float helper looks trivial but it prevents the #1 crash in production WebSocket code: a None or "" sneaking into a float() call at 2 AM.
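
The timestamp branch in iter_trades is easier to reason about in isolation. The sketch below pulls it into a hypothetical helper, to_naive_utc (a name introduced here, not part of the streamer), and runs it on the time value from the sample message above, once as milliseconds and once as seconds.

```python
from datetime import datetime, timezone


def to_naive_utc(ts):
    """Convert an epoch timestamp in seconds or milliseconds to naive UTC."""
    value = float(ts)
    if value > 1e12:  # too large to be seconds, so treat it as milliseconds
        value /= 1000.0
    return datetime.fromtimestamp(value, tz=timezone.utc).replace(tzinfo=None)


from_ms = to_naive_utc(1709827200000)  # the "time" field from the sample message
from_s = to_naive_utc(1709827200)      # the same instant expressed in seconds
print(from_ms)  # 2024-03-07 16:00:00
print(from_ms == from_s)  # True
```

The 1e12 threshold is a heuristic: it treats anything that would be a date tens of thousands of years away in seconds as milliseconds, which is safe for any present-day exchange timestamp.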


Now wire _on_message to parse trades and feed them to the bar aggregator (shown in Step 3):

def _on_message(self, _ws, message: str):
    global last_message_ts
    last_message_ts = time.time()

    try:
        payload = json.loads(message)
    except json.JSONDecodeError:
        return

    if payload.get("channel") != "trades":
        return

    for coin, price, size, trade_dt in iter_trades(payload):
        update_bar(coin, price, size, trade_dt)

This updates the stale-connection timestamp on every message and delegates trade parsing to iter_trades from Step 2.


Step 3: The Bar Aggregator

This is the core of the system. Each coin gets a BarState object that tracks the current bar's OHLCV:

from dataclasses import dataclass
from typing import Optional


@dataclass
class BarState:
    open: Optional[float] = None
    high: float = float("-inf")
    low: float = float("inf")
    close: Optional[float] = None
    volume: float = 0.0
    window_start: Optional[datetime] = None

    def update(self, price: float, volume: float):
        if self.open is None:
            self.open = self.high = self.low = self.close = price
        if price > self.high:
            self.high = price
        if price < self.low:
            self.low = price
        self.close = price
        self.volume += max(volume, 0.0)

    def reset_for(self, window_start: datetime, price: float, volume: float):
        self.open = self.high = self.low = self.close = price
        self.volume = max(volume, 0.0)
        self.window_start = window_start

    def reset(self):
        self.open = None
        self.high = float("-inf")
        self.low = float("inf")
        self.close = None
        self.volume = 0.0
        self.window_start = None

The window alignment function floors a timestamp to the start of its bar:

BAR_INTERVAL_MINUTES = 30

def floor_to_window(dt_utc: datetime) -> datetime:
    minute = (dt_utc.minute // BAR_INTERVAL_MINUTES) * BAR_INTERVAL_MINUTES
    return dt_utc.replace(minute=minute, second=0, microsecond=0)

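For a 30-minute bar, 14:47:33 becomes 14:30:00. For 5-minute bars, set BAR_INTERVAL_MINUTES = 5 and 14:47:33 becomes 14:45:00. This function only works for intervals that divide 60 evenly (1, 5, 15, 30, 60). If you change the interval, also change the "30m" that is hardcoded in the CSV filenames and in the backfill request in Step 7.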
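
If you need bars longer than an hour, or an interval that does not divide 60, one option is to floor against the Unix epoch instead of the minute field. This is a sketch of that alternative, not what the streamer above uses; floor_to_interval and EPOCH are names introduced here, and the input is assumed to be a naive UTC datetime like the ones iter_trades produces.

```python
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def floor_to_interval(dt_utc, minutes):
    """Floor a naive UTC datetime to a multiple of `minutes` since the epoch."""
    step = minutes * 60
    elapsed = int((dt_utc - EPOCH).total_seconds())
    return EPOCH + timedelta(seconds=elapsed - elapsed % step)


sample = datetime(2024, 12, 5, 14, 47, 33)
print(floor_to_interval(sample, 30))   # 2024-12-05 14:30:00
print(floor_to_interval(sample, 5))    # 2024-12-05 14:45:00
print(floor_to_interval(sample, 240))  # 2024-12-05 12:00:00
```

For intervals that divide 60 this gives the same answer as floor_to_window, so it can be swapped in without changing existing bars.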

Now the update logic that ties it together:

bars_lock = threading.RLock()
bars = {coin: BarState() for coin in COINS}


def update_bar(coin: str, price: float, volume: float, trade_dt: datetime):
    with bars_lock:
        if coin not in bars:
            return
        state = bars[coin]
        window_start = floor_to_window(trade_dt)

        if state.window_start is None:
            # First trade we've ever seen
            state.reset_for(window_start, price, volume)
        elif window_start > state.window_start:
            # New period — flush the old bar, start fresh
            flush_bar(coin, reason="boundary")
            state.reset_for(window_start, price, volume)
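        elif window_start < state.window_start:
            # Late print for a bar that was already flushed; drop it rather
            # than let it distort the current bar's high and low
            return
        else:
            # Same period: just update OHLCV
            state.update(price, volume)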

The key insight: when a trade arrives whose floor_to_window is later than the current window_start, we know the bar is complete. Flush it and start a new one.
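
Here is the same boundary logic as a pure function, which makes it easy to check by hand. This is a simplified sketch for illustration (no locks, no CSV, trades assumed to arrive in time order); aggregate is a hypothetical name and the trade values are invented.

```python
from datetime import datetime


def floor_to_window(dt, minutes=30):
    minute = (dt.minute // minutes) * minutes
    return dt.replace(minute=minute, second=0, microsecond=0)


def aggregate(trades, minutes=30):
    """Fold time-ordered (dt, price, size) tuples into bars.

    Returns (closed_bars, open_bar). A bar closes only when a trade from a
    later window arrives, mirroring the boundary flush in update_bar.
    """
    closed, current = [], None
    for dt, price, size in trades:
        start = floor_to_window(dt, minutes)
        if current is None or start > current["start"]:
            if current is not None:
                closed.append(current)
            current = {"start": start, "open": price, "high": price,
                       "low": price, "close": price, "volume": size}
        else:
            current["high"] = max(current["high"], price)
            current["low"] = min(current["low"], price)
            current["close"] = price
            current["volume"] += size
    return closed, current


trades = [
    (datetime(2024, 12, 5, 14, 31, 0), 100.0, 1.0),
    (datetime(2024, 12, 5, 14, 45, 0), 105.0, 2.0),
    (datetime(2024, 12, 5, 14, 59, 59), 99.0, 1.0),
    (datetime(2024, 12, 5, 15, 0, 1), 101.0, 3.0),  # first print of the next window
]
closed, open_bar = aggregate(trades)
print(closed)    # one bar: 14:30, O=100 H=105 L=99 C=99 V=4
print(open_bar)  # the 15:00 bar, still in progress
```

Note that the 14:30 bar only closes because the 15:00:01 trade shows up. If that trade never arrived, the bar would stay open indefinitely, which is the problem Step 5 addresses.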


Step 4: Writing Bars to CSV

CSV writes need file-level locking because the downstream consumer (your ML pipeline) may be reading the same file:

import csv
import fcntl
import os

OUTPUT_DIR = "/data/hyperliquid"
CSV_HEADERS = ["Datetime", "Open", "High", "Low", "Close", "Volume"]


def append_row_locked(path: str, row):
    with open(path, "a", newline="") as handle:
        fcntl.flock(handle, fcntl.LOCK_EX)
        csv.writer(handle).writerow(list(row))
        handle.flush()
        os.fsync(handle.fileno())
        fcntl.flock(handle, fcntl.LOCK_UN)


def flush_bar(coin: str, reason: str = "period"):
    with bars_lock:
        state = bars[coin]
        if state.window_start is None or state.open is None:
            return

        dt_str = state.window_start.strftime("%Y-%m-%d %H:%M:%S")
        row = [dt_str, state.open, state.high, state.low, state.close, state.volume]
        path = os.path.join(OUTPUT_DIR, f"{coin.lower()}_30m_data.csv")
        append_row_locked(path, row)

        print(
            f"[bar] {coin} {dt_str} "
            f"O:{state.open} H:{state.high} L:{state.low} "
            f"C:{state.close} V:{state.volume} ({reason})"
        )

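Why os.fsync? handle.flush() hands the row from Python's buffer to the OS, which is what makes it visible to other processes reading the file. os.fsync goes one step further and forces the OS to write it to disk, so a completed bar survives a power loss or kernel crash. Doing both before releasing the lock means a reader that takes the lock never sees a half-written row.

Why fcntl.flock? On Linux, this is an advisory lock. Both the writer (this streamer) and any reader that also uses flock will coordinate. Even without flock on the reader side, a single short write in append mode is enough for most use cases. Note that fcntl is Unix-only, so this code will not run on Windows as written.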
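
If the reader is also Python, cooperating with the writer's lock takes only a few lines. The sketch below is one way a reader might do it, assuming a Unix system; read_rows_locked is a hypothetical helper and the demo file is made up. It takes a shared lock (LOCK_SH), which waits while the streamer holds its exclusive lock and lets several readers proceed at once.

```python
import csv
import fcntl  # Unix only
import os
import tempfile


def read_rows_locked(path):
    """Read all data rows while holding a shared advisory lock."""
    with open(path, newline="") as handle:
        fcntl.flock(handle, fcntl.LOCK_SH)  # blocks while a writer holds LOCK_EX
        try:
            reader = csv.reader(handle)
            next(reader, None)  # skip header
            return [row for row in reader if row]
        finally:
            fcntl.flock(handle, fcntl.LOCK_UN)


# Demo against a throwaway file (the path and values are made up)
demo_path = os.path.join(tempfile.mkdtemp(), "eth_30m_data.csv")
with open(demo_path, "w", newline="") as handle:
    writer = csv.writer(handle)
    writer.writerow(["Datetime", "Open", "High", "Low", "Close", "Volume"])
    writer.writerow(["2024-12-05 14:30:00", 3421.5, 3445.2, 3418.0, 3439.8, 2891.4])

rows = read_rows_locked(demo_path)
print(rows)
```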


Step 5: The Rollover Timer

Here's a subtlety. What if a coin doesn't trade for 30 minutes? No trade message arrives, so the bar never flushes. Your downstream pipeline sees a gap.

The solution is a background timer that periodically checks if any bar's window has elapsed:

from datetime import timedelta

ROLLOVER_CHECK_SECONDS = 5


def rollover(now: datetime):
    with bars_lock:
        for coin, state in bars.items():
            if state.window_start is None:
                continue
            while now >= state.window_start + timedelta(minutes=BAR_INTERVAL_MINUTES):
                # Flush the completed bar
                flush_bar(coin, reason="timer")

                # Advance to next window, carrying forward the last close
                next_start = state.window_start + timedelta(minutes=BAR_INTERVAL_MINUTES)
                last_close = state.close
                state.reset()
                state.window_start = next_start
                if last_close is not None:
                    state.open = state.high = state.low = state.close = last_close
                    state.volume = 0.0


def rollover_loop(stop_event: threading.Event):
    while not stop_event.is_set():
        try:
            # Naive UTC "now"; datetime.utcnow() is deprecated since Python 3.12
            rollover(datetime.now(UTC).replace(tzinfo=None))
        except Exception as exc:
            print(f"[rollover] Error: {exc}")
        stop_event.wait(ROLLOVER_CHECK_SECONDS)

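The while loop (not if) handles the case where more than one bar period has elapsed since the last flush, for example a coin that went quiet for hours or a process that was suspended. It catches up by flushing each missed window in sequence, carrying the last known close forward. Bars missed while the process was not running at all are not recovered here, because the bar state lives in memory; the startup backfill in Step 7 covers those.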
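
To see what the catch-up loop does, here is its condition extracted into a tiny standalone function. elapsed_windows is a hypothetical name used only for this illustration; it lists the window starts that rollover would flush for a given window_start and clock time.

```python
from datetime import datetime, timedelta


def elapsed_windows(window_start, now, minutes=30):
    """List the start times of every window that has fully elapsed by `now`."""
    step = timedelta(minutes=minutes)
    starts = []
    while now >= window_start + step:
        starts.append(window_start)
        window_start += step
    return starts


flushed = elapsed_windows(datetime(2024, 12, 5, 14, 0), datetime(2024, 12, 5, 15, 35))
print([s.strftime("%H:%M") for s in flushed])  # ['14:00', '14:30', '15:00']
```

At 15:35 the 15:30 window is still open, so it is not in the list. With an if instead of a while, only the 14:00 bar would be flushed on this pass.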


Step 6: Stale Connection Detection

WebSocket connections can silently die. The server stops sending, but the TCP connection stays open. You'll sit there forever waiting for trades that will never come.

The fix: track when you last received any message, and force a reconnect if it's been too long.

STALE_MESSAGE_SECONDS = 90
last_message_ts = 0.0


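def health_check_loop(stop_event, ws_worker):
    # last_message_ts is assigned below, so it must be declared global;
    # without this, the read in the if statement raises UnboundLocalError
    global last_message_ts
    while not stop_event.is_set():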
        now = time.time()
        if last_message_ts and now - last_message_ts > STALE_MESSAGE_SECONDS:
            print("[health] Feed stale, forcing reconnect")
            ws_worker.request_close()
            last_message_ts = now  # prevent repeated close calls
        stop_event.wait(1.0)

90 seconds is generous. BTC trades every few seconds. But for low-liquidity altcoins on weekends, you might go a minute without a print. Tune this based on your coin list.
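
One possible refinement, offered as a sketch rather than as part of the streamer: wrap the timestamp in a small object and use time.monotonic() instead of time.time(), so a system clock adjustment cannot trigger or suppress a reconnect. FeedWatchdog is a hypothetical class name. Calling mark() on connect as well as on every message also covers a connection that opens but never delivers anything, which a timestamp that starts at 0.0 would never flag.

```python
import time


class FeedWatchdog:
    """Tracks the time of the last message using a monotonic clock."""

    def __init__(self, stale_after=90.0, clock=time.monotonic):
        self._stale_after = stale_after
        self._clock = clock
        self._last = None  # None until mark() is first called

    def mark(self):
        """Call on connect and on every received message."""
        self._last = self._clock()

    def is_stale(self):
        if self._last is None:
            return False
        return self._clock() - self._last > self._stale_after


# Demo with a fake clock so the behaviour is visible without waiting
fake_now = [0.0]
dog = FeedWatchdog(stale_after=90.0, clock=lambda: fake_now[0])
dog.mark()
fake_now[0] = 60.0
fresh = dog.is_stale()
fake_now[0] = 91.0
stale = dog.is_stale()
print(fresh, stale)  # False True
```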


Step 7: Backfilling Gaps on Startup

When the streamer starts (or restarts after a crash), you've missed bars. The REST API can fill those gaps:

import requests

API_URL = "https://api.hyperliquid.xyz/info"


def backfill_recent(coins: list[str], hours: int = 6):
    """Download recent candles from REST API and fill gaps in CSVs."""
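    now_utc = datetime.now(UTC).replace(tzinfo=None)  # naive UTC, matches the CSV timestamps
    # Use time.time() for the epoch value: calling .timestamp() on a naive
    # datetime assumes local time and shifts the range on non-UTC machines
    end_ms = int(time.time() * 1000)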
    start_ms = end_ms - hours * 3600 * 1000

    for coin in coins:
        path = os.path.join(OUTPUT_DIR, f"{coin.lower()}_30m_data.csv")
        existing = _read_existing_timestamps(path)

        body = {
            "type": "candleSnapshot",
            "req": {
                "coin": coin,
                "interval": "30m",
                "startTime": start_ms,
                "endTime": end_ms,
            },
        }
        resp = requests.post(API_URL, json=body, timeout=20)
        resp.raise_for_status()
        candles = resp.json()

        inserted = 0
        for candle in candles:
            ts_ms = candle.get("t") or candle.get("time")
            # Naive UTC; utcfromtimestamp() is deprecated since Python 3.12
            dt = datetime.fromtimestamp(int(ts_ms) / 1000.0, tz=UTC).replace(tzinfo=None)
            dt_str = dt.strftime("%Y-%m-%d %H:%M:%S")

            # Skip current incomplete bar
            if dt >= floor_to_window(now_utc):
                continue
            # Skip already-written bars
            if dt_str in existing:
                continue

            row = [
                dt_str,
                float(candle["o"]),
                float(candle["h"]),
                float(candle["l"]),
                float(candle["c"]),
                float(candle["v"]),
            ]
            append_row_locked(path, row)
            existing.add(dt_str)
            inserted += 1

        print(f"[backfill] {coin}: +{inserted} bars")
        time.sleep(0.25)  # be nice to the API


def _read_existing_timestamps(path: str) -> set[str]:
    """Return set of Datetime strings already in the CSV."""
    timestamps = set()
    if not os.path.exists(path):
        return timestamps
    with open(path) as f:
        reader = csv.reader(f)
        next(reader, None)  # skip header
        for row in reader:
            if row:
                timestamps.add(row[0])
    return timestamps

Run this once at startup: backfill_recent(COINS, hours=6). It's idempotent — existing timestamps are skipped.
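
After a backfill it is worth checking that the file really is continuous. The sketch below is a hypothetical verification helper, find_gaps, which is not part of the streamer: it takes the Datetime strings from a CSV and reports any window starts missing between the first and last row. The timestamps in the demo are made up.

```python
from datetime import datetime, timedelta

FMT = "%Y-%m-%d %H:%M:%S"


def find_gaps(timestamps, minutes=30):
    """Return the window starts missing between the first and last timestamp."""
    if not timestamps:
        return []
    have = {datetime.strptime(ts, FMT) for ts in timestamps}
    step = timedelta(minutes=minutes)
    cursor, last = min(have), max(have)
    missing = []
    while cursor < last:
        cursor += step
        if cursor not in have:
            missing.append(cursor.strftime(FMT))
    return missing


gaps = find_gaps([
    "2024-12-05 14:00:00",
    "2024-12-05 14:30:00",
    "2024-12-05 16:00:00",  # 15:00 and 15:30 were never written
])
print(gaps)  # ['2024-12-05 15:00:00', '2024-12-05 15:30:00']
```

Because the function works on a set, it does not care about row order, which matters here: backfilled rows are appended at the end of the file and may be older than rows already present.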


Step 8: Putting It All Together

import signal


def main():
    stop_event = threading.Event()

    # Ensure CSV files exist with headers before anything appends to them
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    for coin in COINS:
        path = os.path.join(OUTPUT_DIR, f"{coin.lower()}_30m_data.csv")
        if not os.path.exists(path) or os.path.getsize(path) == 0:
            with open(path, "w", newline="") as f:
                csv.writer(f).writerow(CSV_HEADERS)

    # Backfill any gaps from downtime. This runs after the headers are
    # written so a fresh file never starts with a data row where the
    # header should be
    backfill_recent(COINS, hours=6)

    # Start WebSocket
    ws_worker = HyperliquidWsWorker(stop_event)
    ws_worker.start()

    # Start rollover timer
    rollover_thread = threading.Thread(
        target=rollover_loop,
        args=(stop_event,),
        daemon=True,
    )
    rollover_thread.start()

    # Graceful shutdown
    def handle_signal(sig, _frame):
        print(f"[main] Signal {sig}, shutting down...")
        stop_event.set()
        ws_worker.request_close()

    signal.signal(signal.SIGINT, handle_signal)
    signal.signal(signal.SIGTERM, handle_signal)

    # Health monitor
    print("[main] Streamer running. Ctrl+C to stop.")
    while not stop_event.is_set():
        now = time.time()
        if last_message_ts and now - last_message_ts > STALE_MESSAGE_SECONDS:
            print("[main] Feed stale, forcing reconnect")
            ws_worker.request_close()
        stop_event.wait(1.0)

    # Cleanup
    stop_event.set()
    ws_worker.request_close()
    rollover_thread.join(timeout=2.0)
    print("[main] Bye.")


if __name__ == "__main__":
    main()

Run it:

python hyperliquid_streamer.py

You'll see output like:

[backfill] BTC: +12 bars
[backfill] ETH: +12 bars
[ws] Connected, subscribing...
[main] Streamer running. Ctrl+C to stop.
[bar] BTC 2024-12-05 14:30:00 O:97432.0 H:97891.0 L:97210.5 C:97654.0 V:142.31 (boundary)
[bar] ETH 2024-12-05 14:30:00 O:3421.5 H:3445.2 L:3418.0 C:3439.8 V:2891.4 (boundary)

Running as a Systemd Service

For production, you want this to survive reboots:

# /etc/systemd/system/hyperliquid-streamer.service

[Unit]
Description=Hyperliquid 30m Data Streamer
After=network-online.target
Wants=network-online.target

[Service]
Type=simple
User=trader
WorkingDirectory=/home/trader/streamer
# -u disables output buffering so print() lines reach the journal promptly
ExecStart=/home/trader/miniconda3/envs/trading/bin/python -u hyperliquid_streamer.py
Restart=always
RestartSec=10
StandardOutput=journal
StandardError=journal

[Install]
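WantedBy=multi-user.target

Reload systemd so it picks up the new unit, then enable and start the service:

sudo systemctl daemon-reload
sudo systemctl enable hyperliquid-streamer
sudo systemctl start hyperliquid-streamer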

# Check logs
journalctl -u hyperliquid-streamer -f

Bonus: Streaming Order Book Data

If you need Level 2 book data (for market making or fill probability models), subscribe to l2Book instead of trades:

def _on_open(self, ws):
    ws.send(json.dumps({
        "method": "subscribe",
        "subscription": {"type": "l2Book", "coin": "BTC"},
    }))

def _on_message(self, _ws, message: str):
    payload = json.loads(message)
    if payload.get("channel") != "l2Book":
        return

    data = payload["data"]
    levels = data.get("levels", [[], []])
    bids = levels[0]  # [{"px": "97430.0", "sz": "1.5"}, ...]
    asks = levels[1]  # [{"px": "97432.0", "sz": "0.8"}, ...]

    best_bid = float(bids[0]["px"]) if bids else None
    best_ask = float(asks[0]["px"]) if asks else None

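    if best_bid is None or best_ask is None:
        return  # one side of the book is empty, so there is no spread to report

    print(f"BTC bid={best_bid} ask={best_ask} spread={best_ask - best_bid:.1f}")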
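
The top-of-book arithmetic can be tested offline by pulling it out of the callback. This sketch assumes the levels layout shown in the comments above (bids first, asks second, prices as strings); top_of_book is a hypothetical helper and the sample values are made up.

```python
def top_of_book(levels):
    """Return (best_bid, best_ask, spread, mid); a missing side gives None."""
    bids, asks = levels
    best_bid = float(bids[0]["px"]) if bids else None
    best_ask = float(asks[0]["px"]) if asks else None
    if best_bid is None or best_ask is None:
        return best_bid, best_ask, None, None
    return best_bid, best_ask, best_ask - best_bid, (best_bid + best_ask) / 2.0


# Sample shaped like the l2Book payload in this post (values are made up)
levels = [
    [{"px": "97430.0", "sz": "1.5"}, {"px": "97429.0", "sz": "0.4"}],
    [{"px": "97432.0", "sz": "0.8"}, {"px": "97433.5", "sz": "2.0"}],
]
print(top_of_book(levels))              # (97430.0, 97432.0, 2.0, 97431.0)
print(top_of_book([[], levels[1]]))     # (None, 97432.0, None, None)
```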

You can also subscribe to userFills for real-time fill notifications (requires your wallet address):

ws.send(json.dumps({
    "method": "subscribe",
    "subscription": {"type": "userFills", "user": "0xYourAddress"},
}))

Common Pitfalls

1. Forgetting thread safety. The WebSocket callback runs on its own thread. If your bar state isn't protected by a lock, you'll get corrupted OHLCV values. Use threading.RLock() around all bar reads and writes.

2. Not handling timestamp formats. Hyperliquid sometimes returns milliseconds, sometimes seconds. Always check: if ts > 1e12, divide by 1000.

3. Silent WebSocket death. A connection can go "zombie" — TCP stays open but no data flows. The stale message detector is not optional; it's essential.

4. Missing the rollover for illiquid coins. DOGE might trade every second. ASTER might go 20 minutes without a print. Without the timer-based rollover, your bars have gaps. With it, you get bars with volume=0 and OHLC=last_close, which is correct.

5. Not using os.fsync. Without it, a flushed row can sit in OS buffers for seconds before it reaches disk. If the machine loses power or the kernel crashes in that window, you lose bars.


Summary

The complete data streamer is roughly 200 lines of Python. It:

  • Connects to Hyperliquid's public WebSocket

  • Subscribes to trade prints for any set of coins

  • Aggregates trades into time-aligned OHLCV bars

  • Writes completed bars to per-coin CSV files with file locking

  • Handles reconnection with exponential backoff and jitter

  • Detects stale connections and forces reconnects

  • Backfills gaps from the REST API on startup

  • Runs as a systemd service for production reliability

The output CSV files can be consumed by any downstream system — pandas, a live trading runner, a backtester — with nothing more than pd.read_csv().

The boring parts (file locks, fsync, stale connection detection, rollover timers) are what separate a streamer that works in a notebook from one that runs unattended for months.


© 2026 Oracle