Streaming data processing with Polarway, DuckDB time-series storage, and interactive Streamlit dashboards.
- Streaming DataFrame Engine
- Analytics Database
- Interactive Dashboards
Polarway extends Polars with streaming capabilities for real-time data processing:
```python
import polars as pl

# Stream from a WebSocket and process in real time.
# Note: plain Polars' scan_ndjson reads files; the wss:// source here
# relies on Polarway's streaming extension.
stream = (
    pl.scan_ndjson("wss://stream.binance.com:9443/ws/btcusdt@trade")
    .with_columns([
        pl.col("price").cast(pl.Float64),
        pl.col("quantity").cast(pl.Float64),
        pl.col("timestamp").cast(pl.Datetime),
    ])
    .sort("timestamp")  # group_by_dynamic requires a sorted index column
    .group_by_dynamic(
        index_column="timestamp",
        every="1s",   # 1-second windows
        period="5s",  # 5-second lookback
    )
    .agg([
        pl.col("price").mean().alias("vwap"),
        pl.col("quantity").sum().alias("volume"),
        pl.col("price").min().alias("low"),
        pl.col("price").max().alias("high"),
    ])
)

# Write to DuckDB via Arrow
import duckdb

conn = duckdb.connect("data/trades.db")
conn.register("stream_df", stream.collect().to_arrow())
# Create the target table from the stream's schema on first run
conn.execute("CREATE TABLE IF NOT EXISTS btc_ohlcv AS SELECT * FROM stream_df LIMIT 0")
conn.execute("INSERT INTO btc_ohlcv SELECT * FROM stream_df")
```
DuckDB provides columnar storage optimized for time-series queries:
```sql
-- Query recent arbitrage opportunities
SELECT
    timestamp,
    exchange_a,
    exchange_b,
    asset,
    profit_pct,
    volume_usd
FROM arbitrage_signals
WHERE timestamp > current_timestamp - INTERVAL '5 minutes'
ORDER BY profit_pct DESC
LIMIT 100;

-- Real-time aggregation with time windows
SELECT
    time_bucket(INTERVAL '1 second', timestamp) AS bucket,
    avg(price) AS avg_price,
    sum(volume) AS total_volume
FROM trades
WHERE symbol = 'BTC-USD'
AND timestamp > current_timestamp - INTERVAL '1 hour'
GROUP BY bucket
ORDER BY bucket DESC;
```
Interactive dashboards update in real-time as new data arrives:
```python
import time

import duckdb
import polars as pl
import streamlit as st

st.set_page_config(page_title="HFThot Research Lab", layout="wide")

# Real-time data query
@st.cache_data(ttl=1)  # cache for 1 second
def get_recent_trades() -> pl.DataFrame:
    conn = duckdb.connect("data/trades.db", read_only=True)
    return conn.execute("""
        SELECT * FROM trades
        WHERE timestamp > current_timestamp - INTERVAL '1 minute'
        ORDER BY timestamp DESC
    """).pl()

# Display live data
trades = get_recent_trades()
st.dataframe(trades, use_container_width=True)

# Live chart
st.line_chart(
    trades.to_pandas(),
    x="timestamp",
    y=["bid", "ask"],
    color=["#d4af37", "#f4d03f"],
)

# Auto-refresh every second
time.sleep(1)
st.rerun()
```
Connect to exchange WebSocket feeds for ultra-low latency data:
```rust
use futures_util::StreamExt;
use serde::Deserialize;
use tokio_tungstenite::{connect_async, tungstenite::Message};

/// Minimal shape of a Binance depth message; the fields shown here are
/// illustrative -- extend the struct to match the full payload.
#[derive(Deserialize)]
struct OrderBookUpdate {
    #[serde(rename = "b")]
    bids: Vec<(String, String)>, // (price, quantity) pairs
    #[serde(rename = "a")]
    asks: Vec<(String, String)>,
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let (ws_stream, _) = connect_async(
        "wss://stream.binance.com:9443/ws/btcusdt@depth"
    ).await?;
    let (_write, mut read) = ws_stream.split();
    while let Some(msg) = read.next().await {
        if let Ok(Message::Text(text)) = msg {
            // Parse the orderbook update
            let update: OrderBookUpdate = serde_json::from_str(&text)?;
            // Process in Rust (ultra-fast); detect_arbitrage and
            // write_to_duckdb are defined elsewhere in the crate
            detect_arbitrage(&update);
            // Sink to DuckDB via Arrow
            write_to_duckdb(&update).await?;
        }
    }
    Ok(())
}
```
- Monitor cross-exchange price discrepancies in real time with automated alerts.
- Analyze order flow, bid-ask spreads, and liquidity dynamics live.
- Test strategies on historical data while monitoring live performance side by side.
- Track portfolio exposure, PnL, and risk metrics updated every second.
All components run in Docker containers for easy deployment:
```shell
docker-compose up -d
# Services:
# - Polarway data processor: port 50052
# - HFT gRPC server: port 50053
# - Streamlit dashboard: port 8501
# - Jupyter Lab: port 8888
# DuckDB runs embedded in-process (no separate server)
```
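A compose file for the services above might look like the following sketch; the service names, build contexts, and image are assumptions, only the ports come from the list above.

```yaml
# docker-compose.yml sketch -- service/image names are illustrative
services:
  polarway:
    build: ./polarway
    ports:
      - "50052:50052"
  hft-grpc:
    build: ./hft
    ports:
      - "50053:50053"
  dashboard:
    build: ./dashboard
    command: streamlit run app.py --server.port 8501
    ports:
      - "8501:8501"
    volumes:
      - ./data:/app/data  # shared DuckDB file; DuckDB itself is embedded
  jupyter:
    image: jupyter/base-notebook
    ports:
      - "8888:8888"
```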