Back to Documentation
Real-time Streaming Guide
Learn how to implement real-time currency rate updates using Server-Sent Events, WebSockets, and smart polling strategies.
What You'll Learn
- Implement Server-Sent Events for real-time updates
- Build WebSocket connections for bi-directional communication
- Create smart polling strategies with exponential backoff
- Handle connection management and reconnection logic
- Optimize performance and reduce server load
Real-time Strategies Comparison
Server-Sent Events
- ✅ Simple HTTP-based protocol
- ✅ Automatic reconnection
- ✅ Built-in browser support
- ❌ One-way communication only
- ❌ Limited by HTTP connection limits
WebSockets
- ✅ Bi-directional communication
- ✅ Low latency
- ✅ Full-duplex connection
- ❌ More complex to implement
- ❌ Requires connection management
Smart Polling
- ✅ Universal compatibility
- ✅ Simple to implement
- ✅ Easy error handling
- ❌ Higher latency
- ❌ More server requests
Implementation Guide
1. Server-Sent Events (SSE)
Server-Sent Events provide a simple way to push real-time updates from your server to the client.
Backend Implementation (Node.js/Express)
server/routes/streaming.js
const express = require('express');
const router = express.Router();
/**
 * SSE endpoint for real-time rates.
 *
 * Opens a `text/event-stream` response, writes the current rates once
 * immediately, then writes them again every 5 seconds until the client
 * closes the request.
 */
router.get('/rates/stream', (req, res) => {
  // Set SSE headers
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Headers': 'Cache-Control'
  });

  // Writes one `data:` frame containing the current rates as JSON.
  // A failure is logged and the tick is skipped: an exception escaping a
  // setInterval callback is uncaught and would terminate the whole process.
  const sendRates = () => {
    try {
      const rates = getCurrentRates(); // Your rate fetching logic
      res.write(`data: ${JSON.stringify(rates)}\n\n`);
    } catch (error) {
      console.error('Failed to send rates over SSE:', error);
    }
  };

  // Send rates immediately so the client does not wait for the first tick
  sendRates();

  // Set up interval for updates
  const interval = setInterval(sendRates, 5000); // Update every 5 seconds

  // Clean up on client disconnect so the timer does not outlive the request
  req.on('close', () => {
    clearInterval(interval);
    res.end();
  });
});
module.exports = router;

Frontend Implementation (React)
components/RealTimeRates.jsx
import React, { useState, useEffect, useRef } from 'react';
const RealTimeRates = () => {
const [rates, setRates] = useState({});
const [connectionStatus, setConnectionStatus] = useState('connecting');
const [lastUpdate, setLastUpdate] = useState(null);
const eventSourceRef = useRef(null);
useEffect(() => {
const connectSSE = () => {
const eventSource = new EventSource('/api/rates/stream');
eventSourceRef.current = eventSource;
eventSource.onopen = () => {
setConnectionStatus('connected');
console.log('SSE connection opened');
};
eventSource.onmessage = (event) => {
try {
const newRates = JSON.parse(event.data);
setRates(newRates);
setLastUpdate(new Date());
} catch (error) {
console.error('Error parsing SSE data:', error);
}
};
eventSource.onerror = (error) => {
console.error('SSE error:', error);
setConnectionStatus('error');
// Reconnect after 3 seconds
setTimeout(() => {
if (eventSource.readyState === EventSource.CLOSED) {
connectSSE();
}
}, 3000);
};
};
connectSSE();
return () => {
if (eventSourceRef.current) {
eventSourceRef.current.close();
}
};
}, []);
return (
<div className="p-6 bg-white rounded-lg shadow">
<div className="flex items-center justify-between mb-4">
<h3 className="text-lg font-semibold">Live Exchange Rates</h3>
<div className="flex items-center">
<div className={`w-2 h-2 rounded-full mr-2 ${
connectionStatus === 'connected' ? 'bg-green-500' :
connectionStatus === 'error' ? 'bg-red-500' : 'bg-yellow-500'
}`}></div>
<span className="text-sm text-gray-600">{connectionStatus}</span>
</div>
</div>
<div className="grid grid-cols-2 md:grid-cols-4 gap-4">
{Object.entries(rates).map(([currency, rate]) => (
<div key={currency} className="p-3 bg-gray-50 rounded">
<div className="text-sm font-medium text-gray-600">{currency}</div>
<div className="text-lg font-bold">{rate}</div>
</div>
))}
</div>
{lastUpdate && (
<div className="mt-4 text-xs text-gray-500">
Last updated: {lastUpdate.toLocaleTimeString()}
</div>
)}
</div>
);
};
export default RealTimeRates;

2. WebSocket Implementation
WebSockets provide full-duplex communication, allowing both client and server to send messages at any time.
Backend Implementation (Node.js/Socket.io)
server/websocket.js
const socketIo = require('socket.io');
/**
 * Attaches a Socket.io server to the given HTTP server and wires up the
 * rate-streaming events.
 *
 * Per connection: emits `rates-update` with the current rates, handles
 * `subscribe-currencies` (joins a room named after the requested list) and
 * `request-rates` (replies with `rates-update`). Independently of any
 * connection, `rates-update` is broadcast to every client every 5 seconds.
 *
 * @param {object} server - Server instance handed to Socket.io.
 * @returns {object} The Socket.io server instance.
 */
const setupWebSocket = (server) => {
  const io = socketIo(server, {
    cors: {
      origin: process.env.CLIENT_URL || "http://localhost:3000",
      methods: ["GET", "POST"]
    }
  });

  io.on('connection', (socket) => {
    console.log('Client connected:', socket.id);

    // Send initial rates
    socket.emit('rates-update', getCurrentRates());

    // Handle subscription to specific currencies
    socket.on('subscribe-currencies', (currencies) => {
      // The payload comes from the client; calling .join() on a non-array
      // would throw inside the event handler.
      if (!Array.isArray(currencies)) {
        console.warn(`Client ${socket.id} sent an invalid subscription:`, currencies);
        return;
      }
      socket.join(`currencies-${currencies.join('-')}`);
      console.log(`Client ${socket.id} subscribed to:`, currencies);
    });

    // Handle client requests for rate updates
    socket.on('request-rates', (currencies) => {
      const rates = getCurrentRates(currencies);
      socket.emit('rates-update', rates);
    });

    socket.on('disconnect', () => {
      console.log('Client disconnected:', socket.id);
    });
  });

  // Broadcast rate updates every 5 seconds
  setInterval(() => {
    const rates = getCurrentRates();
    io.emit('rates-update', rates);
  }, 5000);

  return io;
};
module.exports = setupWebSocket;

Frontend Implementation (React + Socket.io)
components/WebSocketRates.jsx
import React, { useState, useEffect, useRef } from 'react';
import io from 'socket.io-client';
const WebSocketRates = () => {
const [rates, setRates] = useState({});
const [connectionStatus, setConnectionStatus] = useState('connecting');
const [subscribedCurrencies, setSubscribedCurrencies] = useState(['USD', 'EUR', 'GBP', 'JPY']);
const socketRef = useRef(null);
useEffect(() => {
// Initialize socket connection
socketRef.current = io(process.env.NEXT_PUBLIC_WS_URL || 'http://localhost:3001');
const socket = socketRef.current;
socket.on('connect', () => {
setConnectionStatus('connected');
console.log('WebSocket connected');
// Subscribe to currencies
socket.emit('subscribe-currencies', subscribedCurrencies);
});
socket.on('rates-update', (newRates) => {
setRates(newRates);
});
socket.on('disconnect', () => {
setConnectionStatus('disconnected');
console.log('WebSocket disconnected');
});
socket.on('connect_error', (error) => {
setConnectionStatus('error');
console.error('WebSocket connection error:', error);
});
return () => {
if (socket) {
socket.disconnect();
}
};
}, [subscribedCurrencies]);
const requestRateUpdate = () => {
if (socketRef.current && socketRef.current.connected) {
socketRef.current.emit('request-rates', subscribedCurrencies);
}
};
return (
<div className="p-6 bg-white rounded-lg shadow">
<div className="flex items-center justify-between mb-4">
<h3 className="text-lg font-semibold">WebSocket Live Rates</h3>
<div className="flex items-center space-x-2">
<button
onClick={requestRateUpdate}
className="px-3 py-1 bg-blue-500 text-white rounded text-sm"
disabled={connectionStatus !== 'connected'}
>
Refresh
</button>
<div className="flex items-center">
<div className={`w-2 h-2 rounded-full mr-2 ${
connectionStatus === 'connected' ? 'bg-green-500' :
connectionStatus === 'error' ? 'bg-red-500' : 'bg-yellow-500'
}`}></div>
<span className="text-sm text-gray-600">{connectionStatus}</span>
</div>
</div>
</div>
<div className="grid grid-cols-2 md:grid-cols-4 gap-4">
{Object.entries(rates).map(([currency, rate]) => (
<div key={currency} className="p-3 bg-gray-50 rounded">
<div className="text-sm font-medium text-gray-600">{currency}</div>
<div className="text-lg font-bold">{rate}</div>
</div>
))}
</div>
</div>
);
};
export default WebSocketRates;

3. Smart Polling Strategy
Implement intelligent polling with exponential backoff and adaptive intervals based on market activity.
hooks/useSmartPolling.js
import { useState, useEffect, useRef, useCallback } from 'react';
/**
 * React hook that polls `fetchFunction` on an adaptive schedule.
 *
 * After each successful fetch the delay shrinks (x0.8, never below
 * `initialInterval`) when the data changed by more than `adaptiveThreshold`,
 * and grows (x1.2, never above `maxInterval`) when it did not. After a
 * failed fetch the delay is multiplied by `backoffMultiplier ** retryCount`
 * (capped at `maxInterval`) while fewer than `maxRetries` consecutive
 * failures have occurred; beyond that the delay is left unchanged.
 *
 * Exactly one polling loop runs per mounted hook: the next poll is scheduled
 * only after the previous one settles, and the loop stops on unmount.
 *
 * @param {Function} fetchFunction - Called with no arguments; its awaited
 *   result becomes `data`. The most recently passed function is always used.
 * @param {object} [options]
 * @param {number} [options.initialInterval=5000] - Starting and minimum delay in ms.
 * @param {number} [options.maxInterval=60000] - Maximum delay in ms.
 * @param {number} [options.backoffMultiplier=1.5] - Base of the error backoff.
 * @param {number} [options.maxRetries=5] - Consecutive failures after which
 *   the delay stops growing.
 * @param {number} [options.adaptiveThreshold=0.01] - Relative change passed
 *   to `hasDataChanged`.
 * @returns {{data: *, isLoading: boolean, error: *, interval: number}}
 */
const useSmartPolling = (fetchFunction, options = {}) => {
  const {
    initialInterval = 5000,
    maxInterval = 60000,
    backoffMultiplier = 1.5,
    maxRetries = 5,
    adaptiveThreshold = 0.01
  } = options;

  const [data, setData] = useState(null);
  const [isLoading, setIsLoading] = useState(false);
  const [error, setError] = useState(null);
  // The setter is deliberately not called `setInterval`, which would shadow
  // the global timer function inside this hook.
  const [interval, setPollInterval] = useState(initialInterval);

  const timeoutRef = useRef(null);
  const retryCountRef = useRef(0);
  const lastDataRef = useRef(null);
  // The loop reads the delay from a ref so that changing it does not restart
  // the effect (which would trigger an immediate extra poll).
  const intervalRef = useRef(initialInterval);
  const fetchRef = useRef(fetchFunction);

  // Keep the latest fetch function available without restarting the loop
  // when callers pass a new function identity on every render.
  useEffect(() => {
    fetchRef.current = fetchFunction;
  }, [fetchFunction]);

  useEffect(() => {
    let cancelled = false;

    // Runs one fetch and computes the delay before the next one.
    const pollOnce = async () => {
      setIsLoading(true);
      setError(null);
      let nextInterval = intervalRef.current;

      try {
        const newData = await fetchRef.current();
        if (cancelled) {
          return;
        }

        // The first result always counts as a significant change.
        const hasSignificantChange = lastDataRef.current
          ? hasDataChanged(lastDataRef.current, newData, adaptiveThreshold)
          : true;

        setData(newData);
        lastDataRef.current = newData;
        retryCountRef.current = 0;

        nextInterval = hasSignificantChange
          ? Math.max(initialInterval, nextInterval * 0.8) // changing: poll sooner
          : Math.min(maxInterval, nextInterval * 1.2); // stable: poll later
      } catch (err) {
        if (cancelled) {
          return;
        }
        setError(err);
        retryCountRef.current += 1;

        // Exponential backoff on errors
        if (retryCountRef.current < maxRetries) {
          const backoffInterval = nextInterval * backoffMultiplier ** retryCountRef.current;
          nextInterval = Math.min(maxInterval, backoffInterval);
        }
      } finally {
        if (!cancelled) {
          setIsLoading(false);
        }
      }

      intervalRef.current = nextInterval;
      setPollInterval(nextInterval);
    };

    // Poll, then schedule the next poll only once this one has settled.
    const runLoop = async () => {
      if (cancelled) {
        return;
      }
      await pollOnce();
      if (cancelled) {
        return;
      }
      timeoutRef.current = setTimeout(() => {
        void runLoop(); // pollOnce handles its own errors
      }, intervalRef.current);
    };

    // Initial poll
    void runLoop();

    return () => {
      // Stops both the pending timer and any in-flight poll from rescheduling.
      cancelled = true;
      if (timeoutRef.current) {
        clearTimeout(timeoutRef.current);
        timeoutRef.current = null;
      }
    };
  }, [initialInterval, maxInterval, backoffMultiplier, maxRetries, adaptiveThreshold]);

  return { data, isLoading, error, interval };
};
/**
 * Helper function to detect significant data changes between two rate maps.
 *
 * Reports a change when either argument is missing, when the two maps do not
 * contain the same currencies (added or removed), or when any rate moved by
 * more than `threshold` relative to its old value.
 *
 * @param {Object<string, number>|null|undefined} oldData - Previous rates.
 * @param {Object<string, number>|null|undefined} newData - Latest rates.
 * @param {number} threshold - Relative change (e.g. 0.01 for 1%) above which
 *   a rate counts as changed.
 * @returns {boolean} `true` when the data changed significantly.
 */
const hasDataChanged = (oldData, newData, threshold) => {
  if (!oldData || !newData) return true;

  const newEntries = Object.entries(newData);

  // A different number of currencies means one was added or removed. With
  // equal counts, the per-key check below catches any renamed currency.
  if (newEntries.length !== Object.keys(oldData).length) return true;

  for (const [currency, newRate] of newEntries) {
    const oldRate = oldData[currency];
    if (oldRate == null) return true;

    // A relative change from zero is undefined (division by zero), so fall
    // back to an exact comparison instead of always reporting a change.
    if (oldRate === 0) {
      if (newRate !== 0) return true;
      continue;
    }

    const changePercent = Math.abs((newRate - oldRate) / oldRate);
    if (changePercent > threshold) return true;
  }

  return false;
};
export default useSmartPolling;

Best Practices
Performance Tips
- Use connection pooling for WebSocket connections
- Implement proper error handling and reconnection logic
- Use compression for data transmission
- Implement rate limiting to prevent abuse
- Monitor connection health and performance metrics
- Use adaptive polling intervals based on market volatility