Real-Time Analysis

Streaming data processing with Polarway, DuckDB time-series storage, and interactive Streamlit dashboards.

Real-Time Data Pipeline

Data Flow Architecture

Exchange APIs (WebSocket Streams) → Polarway (DataFrame Engine) → DuckDB (Analytics DB) → Streamlit (Live Dashboard)

Core Components

Polarway

Streaming DataFrame Engine

  • Real-time data transformations
  • Lazy evaluation for efficiency
  • Built on Polars for speed
  • Memory-efficient streaming

DuckDB

Analytics Database

  • Columnar storage well suited to time-series data
  • High ingestion rates (>1M rows/s)
  • SQL interface with Arrow integration
  • Low-latency queries

Streamlit

Interactive Dashboards

  • Real-time visualization updates
  • Python-native components
  • No JavaScript required
  • Rapid prototyping

Streaming with Polarway

Polarway extends Polars with streaming capabilities for real-time data processing. The pipeline below is written against the Polars API it builds on:

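import json

import duckdb
import polars as pl
from websockets.sync.client import connect  # assumes websockets >= 11

# Polars cannot scan a WebSocket URL, so buffer raw trade messages first
# (here: a fixed batch of 1,000 trades)
URL = "wss://stream.binance.com:9443/ws/btcusdt@trade"
with connect(URL) as ws:
    raw_trades = [json.loads(ws.recv()) for _ in range(1000)]

ohlcv = (
    pl.LazyFrame(raw_trades)
    # Binance trade payloads use short keys: p = price, q = quantity, T = trade time (ms)
    .select(
        pl.from_epoch("T", time_unit="ms").alias("timestamp"),
        pl.col("p").cast(pl.Float64).alias("price"),
        pl.col("q").cast(pl.Float64).alias("quantity"),
    )
    .sort("timestamp")  # group_by_dynamic needs a sorted index column
    .group_by_dynamic(
        index_column="timestamp",
        every="1s",   # start a new window every second
        period="5s",  # each window spans 5 seconds
    )
    .agg(
        # Volume-weighted average price: sum(price * qty) / sum(qty)
        ((pl.col("price") * pl.col("quantity")).sum() / pl.col("quantity").sum()).alias("vwap"),
        pl.col("quantity").sum().alias("volume"),
        pl.col("price").min().alias("low"),
        pl.col("price").max().alias("high"),
    )
    .collect()
)

# Write to DuckDB via Arrow
conn = duckdb.connect("data/trades.db")
conn.register("stream_df", ohlcv.to_arrow())
conn.execute("INSERT INTO btc_ohlcv SELECT * FROM stream_df")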
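Because `every` (1 s) is shorter than `period` (5 s), consecutive windows overlap and each trade can fall into up to five windows. The plain-Python sketch below, an illustration rather than Polars' implementation, makes that concrete and weights each price by its quantity. It assumes half-open windows `[start, start + period)` whose starts are aligned to multiples of `every`, which is our reading of Polars' defaults (`closed="left"`, `start_by="window"`); `Trade` and `sliding_vwap` are hypothetical names.

```python
from dataclasses import dataclass


@dataclass
class Trade:
    ts_ms: int       # trade time in epoch milliseconds
    price: float
    quantity: float


def sliding_vwap(trades, every_ms=1_000, period_ms=5_000):
    """Return (window_start_ms, vwap, volume) for windows [start, start + period).

    Window starts step by `every_ms`, beginning at the first trade's time
    truncated to a multiple of `every_ms`. Empty windows are skipped.
    """
    if not trades:
        return []
    trades = sorted(trades, key=lambda t: t.ts_ms)
    start = trades[0].ts_ms - trades[0].ts_ms % every_ms
    last = trades[-1].ts_ms
    out = []
    while start <= last:
        in_window = [t for t in trades if start <= t.ts_ms < start + period_ms]
        volume = sum(t.quantity for t in in_window)
        if volume > 0:
            vwap = sum(t.price * t.quantity for t in in_window) / volume
            out.append((start, vwap, volume))
        start += every_ms
    return out


trades = [
    Trade(ts_ms=0, price=100.0, quantity=1.0),
    Trade(ts_ms=1_500, price=110.0, quantity=1.0),
    Trade(ts_ms=6_200, price=120.0, quantity=2.0),
]
windows = sliding_vwap(trades)
for start, vwap, volume in windows:
    print(start, vwap, volume)
```

Each window rescans the trade list, which is fine for illustration; a production version would maintain running sums as trades enter and leave windows.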

DuckDB Integration

DuckDB stores data by column and executes queries in vectorized batches, which keeps time-window scans over recent data fast:

-- Query recent arbitrage opportunities
SELECT 
    timestamp,
    exchange_a,
    exchange_b,
    asset,
    profit_pct,
    volume_usd
FROM arbitrage_signals
WHERE timestamp > current_timestamp - INTERVAL '5 minutes'
ORDER BY profit_pct DESC
LIMIT 100;

-- Real-time aggregation with time windows
SELECT 
    time_bucket(INTERVAL '1 second', timestamp) as bucket,
    avg(price) as avg_price,
    sum(volume) as total_volume
FROM trades
WHERE symbol = 'BTC-USD'
  AND timestamp > current_timestamp - INTERVAL '1 hour'
GROUP BY bucket
ORDER BY bucket DESC;

Streamlit Dashboard

Dashboards re-query DuckDB on a short interval, so charts track new data as it arrives:

import duckdb
import streamlit as st

st.set_page_config(page_title="HFThot Research Lab", layout="wide")

# Query the last minute of trades; cache the result for 1 second
@st.cache_data(ttl=1)
def get_recent_trades():
    # Read-only connection, closed automatically when the block exits
    with duckdb.connect("data/trades.db", read_only=True) as conn:
        return conn.execute("""
            SELECT * FROM trades
            WHERE timestamp > current_timestamp - INTERVAL '1 minute'
            ORDER BY timestamp DESC
        """).pl()

# Re-run only this fragment every second (st.fragment needs Streamlit >= 1.37)
@st.fragment(run_every=1)
def live_view():
    trades = get_recent_trades()

    # Display live data
    st.dataframe(trades, use_container_width=True)

    # Live chart
    st.line_chart(
        trades.to_pandas(),
        x="timestamp",
        y=["bid", "ask"],
        color=["#d4af37", "#f4d03f"],
    )

live_view()
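`@st.cache_data(ttl=1)` lets reruns within the same second reuse the last query result instead of hitting DuckDB again. A simplified plain-Python sketch of that time-to-live idea (not Streamlit's implementation; `ttl_cache` is a hypothetical helper, and the fake clock exists only to make expiry visible):

```python
import time
from functools import wraps


def ttl_cache(ttl_seconds, clock=time.monotonic):
    """Cache a zero-argument function's result for `ttl_seconds`."""
    def decorator(fn):
        state = {"value": None, "expires": float("-inf")}

        @wraps(fn)
        def wrapper():
            now = clock()
            if now >= state["expires"]:
                # Cache is empty or stale: run the real query
                state["value"] = fn()
                state["expires"] = now + ttl_seconds
            return state["value"]
        return wrapper
    return decorator


# Usage with a fake clock so the expiry is easy to follow
fake_now = [0.0]
calls = []


@ttl_cache(1.0, clock=lambda: fake_now[0])
def get_recent_trades():
    calls.append(fake_now[0])  # record each real "query"
    return f"result #{len(calls)}"


print(get_recent_trades())  # query runs
fake_now[0] = 0.5
print(get_recent_trades())  # served from cache
fake_now[0] = 1.0
print(get_recent_trades())  # TTL expired, query runs again
```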

WebSocket Streaming

Connect to exchange WebSocket feeds directly from Rust for low-latency ingestion:

use futures_util::StreamExt;
use tokio_tungstenite::{connect_async, tungstenite::Message};

// OrderBookUpdate (a serde::Deserialize type), detect_arbitrage and
// write_to_duckdb are application code defined elsewhere.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (mut ws_stream, _) =
        connect_async("wss://stream.binance.com:9443/ws/btcusdt@depth").await?;

    // Read-only consumer: no need to split into sink and stream halves
    while let Some(msg) = ws_stream.next().await {
        // Propagate connection errors instead of silently ignoring them
        if let Message::Text(text) = msg? {
            // Parse orderbook update
            let update: OrderBookUpdate = serde_json::from_str(&text)?;

            // Detect arbitrage in Rust on the hot path
            detect_arbitrage(&update);

            // Sink to DuckDB via Arrow
            write_to_duckdb(&update).await?;
        }
    }

    Ok(())
}
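Each `@depth` message is a diff against the current order book rather than a full snapshot. The Python sketch below shows what applying one does to local state, assuming Binance's diff-depth payload (`b`/`a` arrays of `[price, quantity]` strings, where quantity `0` removes a level); `apply_depth_update` and `best_bid_ask` are hypothetical helpers. It skips the REST snapshot and update-ID sequencing a real book needs before diffs can be trusted.

```python
import json


def apply_depth_update(book, message):
    """Apply one diff-depth message to `book` ({"bids"/"asks": {price: qty}}).

    Assumes "b"/"a" hold [price, qty] string pairs and qty 0 deletes a level.
    Float keys keep the sketch short; real code would use Decimal or integer ticks.
    """
    update = json.loads(message)
    for side, key in (("bids", "b"), ("asks", "a")):
        levels = book.setdefault(side, {})
        for price_str, qty_str in update[key]:
            price, qty = float(price_str), float(qty_str)
            if qty == 0.0:
                levels.pop(price, None)  # level removed
            else:
                levels[price] = qty      # level added or replaced
    return book


def best_bid_ask(book):
    return max(book["bids"]), min(book["asks"])


book = {}
apply_depth_update(book, '{"e":"depthUpdate","s":"BTCUSDT","U":1,"u":2,'
                         '"b":[["100.0","1.5"],["99.5","2.0"]],"a":[["100.5","1.0"]]}')
apply_depth_update(book, '{"e":"depthUpdate","s":"BTCUSDT","U":3,"u":3,'
                         '"b":[["100.0","0"]],"a":[]}')
print(best_bid_ask(book))
```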

Performance Characteristics

Use Cases

1. Arbitrage Detection Dashboard

Monitor cross-exchange price discrepancies in real-time with automated alerts.
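The core check compares buying at the best ask on one exchange with selling at the best bid on another: profit_pct = (bid_B - ask_A) / ask_A × 100. A minimal sketch under that definition, ignoring fees, transfer costs and available size; the exchange names and quotes are made up, and the output keys mirror the `arbitrage_signals` columns queried above:

```python
from itertools import permutations


def find_arbitrage(quotes, asset, min_profit_pct=0.0):
    """Rank buy-on-A / sell-on-B opportunities for one asset.

    quotes maps exchange name -> (best_bid, best_ask).
    """
    signals = []
    for exchange_a, exchange_b in permutations(quotes, 2):
        ask_a = quotes[exchange_a][1]  # price paid to buy on exchange_a
        bid_b = quotes[exchange_b][0]  # price received selling on exchange_b
        profit_pct = (bid_b - ask_a) / ask_a * 100
        if profit_pct > min_profit_pct:
            signals.append({
                "exchange_a": exchange_a,
                "exchange_b": exchange_b,
                "asset": asset,
                "profit_pct": profit_pct,
            })
    # Highest profit first, like ORDER BY profit_pct DESC
    return sorted(signals, key=lambda s: s["profit_pct"], reverse=True)


quotes = {
    "binance": (100.0, 100.1),
    "kraken": (100.5, 100.6),
    "coinbase": (99.8, 99.9),
}
signals = find_arbitrage(quotes, "BTC-USD")
for s in signals:
    print(s)
```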

2. Market Microstructure Analysis

Analyze order flow, bid-ask spreads, and liquidity dynamics live.
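Two common top-of-book measures are the bid-ask spread (often quoted in basis points of the mid price) and the size imbalance between the best bid and best ask. A minimal sketch with made-up numbers; `top_of_book_metrics` is a hypothetical helper:

```python
def top_of_book_metrics(bid, bid_qty, ask, ask_qty):
    """Spread, mid price and size imbalance at the best bid/ask."""
    mid = (bid + ask) / 2
    spread = ask - bid
    return {
        "mid": mid,
        "spread": spread,
        "spread_bps": spread / mid * 10_000,
        # +1 = all resting size on the bid, -1 = all on the ask
        "imbalance": (bid_qty - ask_qty) / (bid_qty + ask_qty),
    }


metrics = top_of_book_metrics(bid=99.0, bid_qty=3.0, ask=101.0, ask_qty=1.0)
print(metrics)
```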

3. Strategy Backtesting with Live Data

Test strategies on historical data while monitoring live performance side-by-side.

4. Risk Monitoring

Track portfolio exposure, PnL, and risk metrics updated every second.
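Exposure and unrealized PnL come down to marking each position at the latest price. A minimal sketch, assuming signed quantities (negative = short) and one average entry price per position; `Position` and `portfolio_risk` are hypothetical names and the numbers are made up:

```python
from dataclasses import dataclass


@dataclass
class Position:
    asset: str
    quantity: float   # signed: positive = long, negative = short
    avg_entry: float  # average entry price


def portfolio_risk(positions, marks):
    """Gross/net exposure and unrealized PnL at the given mark prices."""
    gross = net = pnl = 0.0
    for p in positions:
        mark = marks[p.asset]
        notional = p.quantity * mark
        net += notional
        gross += abs(notional)
        pnl += p.quantity * (mark - p.avg_entry)
    return {"gross_exposure": gross, "net_exposure": net, "unrealized_pnl": pnl}


positions = [
    Position("BTC", 0.5, 40_000.0),
    Position("ETH", -10.0, 2_500.0),
]
risk = portfolio_risk(positions, {"BTC": 42_000.0, "ETH": 2_400.0})
print(risk)
```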

Deployment

All services run in Docker containers and start with a single command:

docker-compose up -d

# Services:
# - Polarway data processor: port 50052
# - HFT gRPC server: port 50053
# - Streamlit dashboard: port 8501
# - Jupyter Lab: port 8888

# DuckDB runs embedded in-process (no separate server)