Real-time Streaming Guide
Learn how to implement real-time currency rate updates using Server-Sent Events, WebSockets, and smart polling strategies.
What You'll Learn
- Implement Server-Sent Events for real-time updates
- Build WebSocket connections for bi-directional communication
- Create smart polling strategies with exponential backoff
- Handle connection management and reconnection logic
- Optimize performance and minimize API calls
Real-time Strategies Comparison
Server-Sent Events
- ✅ Simple implementation
- ✅ Automatic reconnection
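- ✅ Connections are multiplexed over HTTP/2
- ❌ One-way communication (server to client only)
- ❌ No Internet Explorer support; over HTTP/1.1, browsers cap concurrent connections per domain (typically six)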
WebSockets
- ✅ Bi-directional communication
- ✅ Low latency
- ✅ Full browser support
- ❌ Complex connection management
- ❌ Firewall/proxy issues
Smart Polling
- ✅ Universal compatibility
- ✅ Simple to implement
- ✅ Reliable
- ❌ Higher latency
- ❌ More API calls
Method 1: Server-Sent Events (SSE)
Server-Sent Events provide a simple way to stream real-time updates from server to client.
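On the wire, an SSE stream is plain text: each event is a group of `field: value` lines terminated by a blank line. The sketch below shows one way to build such a frame. `formatSseEvent` is a hypothetical helper name (it is not part of Express or any library), and the sample rate value is made up for illustration.

```javascript
// Hypothetical helper: serialize one Server-Sent Event frame.
function formatSseEvent({ data, event, id, retry }) {
  const lines = [];
  if (event) lines.push(`event: ${event}`);
  if (id !== undefined) lines.push(`id: ${id}`);
  // retry: tells the browser how many milliseconds to wait before reconnecting
  if (retry !== undefined) lines.push(`retry: ${retry}`);
  // A payload that contains newlines must be split across several data: lines
  const payload = typeof data === 'string' ? data : JSON.stringify(data);
  for (const line of payload.split('\n')) {
    lines.push(`data: ${line}`);
  }
  // A blank line terminates the event
  return lines.join('\n') + '\n\n';
}

// Illustrative payload only; the EUR value is not real data
const frame = formatSseEvent({
  id: 1,
  retry: 5000,
  data: { type: 'rate_update', rates: { EUR: 0.92 } }
});
console.log(frame);
```

If an `event:` name is set, the browser delivers the message to listeners registered with `addEventListener` for that name rather than to `onmessage`, so this sketch leaves it unset by default.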
Backend Implementation (Node.js/Express)
server/sse-server.js
const express = require('express');
const cors = require('cors');
// node-fetch v2 supports require(); on Node.js 18+ the built-in global fetch works too
const fetch = require('node-fetch');

const app = express();
app.use(cors());

// Store active SSE connections
const connections = new Set();

// Fetch the latest rates once and format them as a single SSE message
async function buildRateMessage() {
  try {
    // Fetch latest rates from FXRateSync API
    const response = await fetch('https://api.fxratesync.io/api/v1/latest?base=USD', {
      headers: { 'X-API-Key': process.env.FXRATESYNC_API_KEY }
    });
    if (!response.ok) {
      throw new Error(`HTTP ${response.status}`);
    }
    const data = await response.json();

    // Formatted SSE message
    return `data: ${JSON.stringify({
      type: 'rate_update',
      timestamp: new Date().toISOString(),
      rates: data.rates
    })}\n\n`;
  } catch (error) {
    console.error('Failed to fetch rates:', error);
    // Error message for the client
    return `data: ${JSON.stringify({ type: 'error', message: 'Failed to fetch rates' })}\n\n`;
  }
}

// SSE endpoint for currency rate updates
app.get('/api/rates/stream', async (req, res) => {
  // Set SSE headers (the cors middleware already adds the CORS headers)
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive'
  });

  // Add connection to active connections
  connections.add(res);

  // Handle client disconnect
  req.on('close', () => {
    connections.delete(res);
  });

  // Send initial data, unless the client disconnected while we were fetching
  const message = await buildRateMessage();
  if (connections.has(res)) {
    res.write(message);
  }
});

// Broadcast updates to all connected clients using one upstream request
async function broadcastRateUpdates() {
  if (connections.size === 0) return;
  const message = await buildRateMessage();
  connections.forEach((res) => res.write(message));
}

// Update rates every 30 seconds
setInterval(broadcastRateUpdates, 30000);

app.listen(3001, () => {
  console.log('SSE server running on port 3001');
});
Frontend Implementation (React)
components/RealTimeRates.jsx
import React, { useState, useEffect, useRef } from 'react';

const RealTimeRates = () => {
  const [rates, setRates] = useState({});
  const [connectionStatus, setConnectionStatus] = useState('connecting');
  const [lastUpdate, setLastUpdate] = useState(null);
  const eventSourceRef = useRef(null);
  const reconnectTimeoutRef = useRef(null);

  useEffect(() => {
    connectToStream();

    return () => {
      if (eventSourceRef.current) {
        eventSourceRef.current.close();
      }
      if (reconnectTimeoutRef.current) {
        clearTimeout(reconnectTimeoutRef.current);
      }
    };
  }, []);

  const connectToStream = () => {
    try {
      // Create EventSource connection
      const eventSource = new EventSource('http://localhost:3001/api/rates/stream');
      eventSourceRef.current = eventSource;

      eventSource.onopen = () => {
        console.log('SSE connection established');
        setConnectionStatus('connected');
      };

      eventSource.onmessage = (event) => {
        try {
          const data = JSON.parse(event.data);
          if (data.type === 'rate_update') {
            setRates(data.rates);
            setLastUpdate(new Date(data.timestamp));
            setConnectionStatus('connected');
          } else if (data.type === 'error') {
            console.error('SSE error:', data.message);
            setConnectionStatus('error');
          }
        } catch (error) {
          console.error('Failed to parse SSE data:', error);
        }
      };

      eventSource.onerror = (error) => {
        console.error('SSE connection error:', error);
        setConnectionStatus('disconnected');

        // EventSource retries on its own. Close it first so the manual
        // reconnect below does not leave a second connection open.
        eventSource.close();

        // Reconnect after 5 seconds
        reconnectTimeoutRef.current = setTimeout(() => {
          setConnectionStatus('connecting');
          connectToStream();
        }, 5000);
      };
    } catch (error) {
      console.error('Failed to create SSE connection:', error);
      setConnectionStatus('error');
    }
  };

  const formatCurrency = (rate) => {
    return rate.toFixed(4);
  };

  const getStatusColor = () => {
    switch (connectionStatus) {
      case 'connected': return 'text-success';
      case 'connecting': return 'text-warning';
      case 'disconnected': return 'text-warning';
      case 'error': return 'text-destructive';
      default: return 'text-muted-foreground';
    }
  };

  return (
    <div className="bg-background rounded-lg shadow-md p-6">
      {/* Connection Status */}
      <div className="flex items-center justify-between mb-6">
        <h2 className="text-2xl font-bold text-foreground">Live Exchange Rates</h2>
        <div className="flex items-center space-x-2">
          <div className={`w-3 h-3 rounded-full ${
            connectionStatus === 'connected' ? 'bg-success animate-pulse' :
            connectionStatus === 'connecting' ? 'bg-warning' : 'bg-destructive'
          }`}></div>
          <span className={`text-sm font-medium ${getStatusColor()}`}>
            {connectionStatus.charAt(0).toUpperCase() + connectionStatus.slice(1)}
          </span>
        </div>
      </div>

      {/* Last Update Time */}
      {lastUpdate && (
        <div className="text-sm text-muted-foreground mb-4">
          Last updated: {lastUpdate.toLocaleTimeString()}
        </div>
      )}

      {/* Rates Grid */}
      <div className="grid grid-cols-2 md:grid-cols-3 lg:grid-cols-4 gap-4">
        {Object.entries(rates).slice(0, 12).map(([currency, rate]) => (
          <div
            key={currency}
            className="p-4 bg-surface rounded-lg border border-border hover:border-primary/50 transition-colors"
          >
            <div className="flex justify-between items-center">
              <span className="font-semibold text-foreground">{currency}</span>
              <span className="text-lg font-bold text-primary">
                {formatCurrency(rate)}
              </span>
            </div>
          </div>
        ))}
      </div>

      {/* Connection Instructions */}
      {connectionStatus === 'error' && (
        <div className="mt-6 p-4 bg-destructive/10 border border-destructive/30 rounded-lg">
          <p className="text-destructive-foreground">
            Connection failed. Please check your network connection and try refreshing the page.
          </p>
        </div>
      )}
    </div>
  );
};

export default RealTimeRates;
Method 2: WebSocket Implementation
WebSockets provide full-duplex communication for more interactive real-time features.
WebSocket Server (Node.js)
server/websocket-server.js
const WebSocket = require('ws');
// node-fetch v2 supports require(); on Node.js 18+ the built-in global fetch works too
const fetch = require('node-fetch');

const wss = new WebSocket.Server({ port: 8080 });

// Store active connections with their subscriptions
const connections = new Map();

wss.on('connection', (ws) => {
  console.log('New WebSocket connection');

  // Initialize connection data
  connections.set(ws, {
    subscriptions: new Set(['USD']), // Default to USD rates
    lastPing: Date.now()
  });

  // Send welcome message
  ws.send(JSON.stringify({
    type: 'welcome',
    message: 'Connected to FXRateSync WebSocket'
  }));

  // Handle incoming messages
  ws.on('message', async (data) => {
    try {
      const message = JSON.parse(data.toString());
      await handleMessage(ws, message);
    } catch (error) {
      console.error('Invalid message format:', error);
      ws.send(JSON.stringify({ type: 'error', message: 'Invalid message format' }));
    }
  });

  // Handle connection close
  ws.on('close', () => {
    console.log('WebSocket connection closed');
    connections.delete(ws);
  });

  // Log socket errors; an 'error' event without a listener would crash the process
  ws.on('error', (error) => {
    console.error('WebSocket error:', error);
  });

  // Handle ping/pong for connection health
  ws.on('pong', () => {
    if (connections.has(ws)) {
      connections.get(ws).lastPing = Date.now();
    }
  });
});

async function handleMessage(ws, message) {
  const connectionData = connections.get(ws);
  if (!connectionData) return;

  switch (message.type) {
    case 'subscribe':
      // Subscribe to specific currency rates
      if (message.currencies && Array.isArray(message.currencies)) {
        connectionData.subscriptions.clear();
        message.currencies.forEach((currency) => {
          connectionData.subscriptions.add(currency.toUpperCase());
        });

        ws.send(JSON.stringify({
          type: 'subscribed',
          currencies: Array.from(connectionData.subscriptions)
        }));

        // Send immediate update for subscribed currencies
        await sendRateUpdate(ws);
      }
      break;

    case 'get_rates':
      // Send current rates for subscribed currencies
      await sendRateUpdate(ws);
      break;

    case 'ping':
      // Respond to application-level ping
      ws.send(JSON.stringify({ type: 'pong' }));
      break;

    default:
      ws.send(JSON.stringify({
        type: 'error',
        message: `Unknown message type: ${message.type}`
      }));
  }
}

async function sendRateUpdate(ws) {
  const connectionData = connections.get(ws);
  if (!connectionData) return;

  try {
    // Fetch rates for each subscribed base currency (one API call per base)
    const ratePromises = Array.from(connectionData.subscriptions).map(async (base) => {
      const response = await fetch(
        `https://api.fxratesync.io/api/v1/latest?base=${encodeURIComponent(base)}`,
        { headers: { 'X-API-Key': process.env.FXRATESYNC_API_KEY } }
      );
      if (!response.ok) {
        throw new Error(`HTTP ${response.status} for base ${base}`);
      }
      const data = await response.json();
      return { base, rates: data.rates };
    });

    const allRates = await Promise.all(ratePromises);

    // The socket may have closed while the requests were in flight
    if (ws.readyState !== WebSocket.OPEN) return;

    ws.send(JSON.stringify({
      type: 'rate_update',
      timestamp: new Date().toISOString(),
      data: allRates
    }));
  } catch (error) {
    console.error('Failed to fetch rates:', error);
    if (ws.readyState === WebSocket.OPEN) {
      ws.send(JSON.stringify({ type: 'error', message: 'Failed to fetch rate updates' }));
    }
  }
}

// Broadcast updates to all connected clients
async function broadcastUpdates() {
  const updatePromises = Array.from(connections.keys())
    .filter((ws) => ws.readyState === WebSocket.OPEN)
    .map((ws) => sendRateUpdate(ws));
  await Promise.all(updatePromises);
}

// Send updates every 30 seconds
setInterval(broadcastUpdates, 30000);

// Health check - ping connections every 60 seconds
setInterval(() => {
  connections.forEach((data, ws) => {
    if (ws.readyState === WebSocket.OPEN) {
      // Check if connection is stale (no pong for 2 minutes)
      if (Date.now() - data.lastPing > 120000) {
        console.log('Closing stale connection');
        ws.terminate();
      } else {
        ws.ping();
      }
    }
  });
}, 60000);

console.log('WebSocket server running on port 8080');
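Because the server's `sendRateUpdate` issues one upstream API request per subscribed base currency, it can help to validate and cap the `currencies` array of a `subscribe` message before storing it. The following is a minimal sketch under stated assumptions: `normalizeCurrencies` and `MAX_SUBSCRIPTIONS` are hypothetical names, and the cap of 5 is an arbitrary illustrative value rather than a documented FXRateSync limit.

```javascript
// Assumed cap on bases per connection; tune it to your own API quota
const MAX_SUBSCRIPTIONS = 5;

// Hypothetical helper: keep only well-formed, de-duplicated currency codes.
function normalizeCurrencies(input) {
  if (!Array.isArray(input)) return [];
  const seen = new Set();
  for (const item of input) {
    // Skip non-strings instead of letting toUpperCase() throw
    if (typeof item !== 'string') continue;
    const code = item.trim().toUpperCase();
    // Accept only three-letter alphabetic codes (ISO 4217 style)
    if (/^[A-Z]{3}$/.test(code)) seen.add(code);
    if (seen.size >= MAX_SUBSCRIPTIONS) break;
  }
  return Array.from(seen);
}

console.log(normalizeCurrencies(['usd', ' eur ', 42, 'EUR', 'bitcoin']));
// → [ 'USD', 'EUR' ]
```

In the `subscribe` branch, the result of this helper could replace the direct loop over `message.currencies`, with an error reply when the returned array is empty.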
WebSocket Client (React Hook)
hooks/useWebSocketRates.js
import { useState, useEffect, useRef, useCallback } from 'react';

const useWebSocketRates = (currencies = ['USD']) => {
  // Array of { base, rates } objects, as sent by the server
  const [rates, setRates] = useState([]);
  const [connectionStatus, setConnectionStatus] = useState('disconnected');
  const [lastUpdate, setLastUpdate] = useState(null);
  const [error, setError] = useState(null);
  const wsRef = useRef(null);
  const reconnectTimeoutRef = useRef(null);
  const pingIntervalRef = useRef(null);

  // Keep the latest currencies in a ref so that connect() stays stable even
  // when the caller passes a new array instance on every render
  const currenciesRef = useRef(currencies);
  currenciesRef.current = currencies;
  const currencyKey = currencies.join(',');

  const connect = useCallback(() => {
    try {
      const ws = new WebSocket('ws://localhost:8080');
      wsRef.current = ws;
      setConnectionStatus('connecting');

      ws.onopen = () => {
        console.log('WebSocket connected');
        setConnectionStatus('connected');
        setError(null);

        // Subscribe to currencies
        ws.send(JSON.stringify({
          type: 'subscribe',
          currencies: currenciesRef.current
        }));

        // Set up ping interval
        pingIntervalRef.current = setInterval(() => {
          if (ws.readyState === WebSocket.OPEN) {
            ws.send(JSON.stringify({ type: 'ping' }));
          }
        }, 30000);
      };

      ws.onmessage = (event) => {
        try {
          const data = JSON.parse(event.data);
          switch (data.type) {
            case 'welcome':
              console.log('WebSocket welcome:', data.message);
              break;
            case 'subscribed':
              console.log('Subscribed to currencies:', data.currencies);
              break;
            case 'rate_update':
              setRates(data.data);
              setLastUpdate(new Date(data.timestamp));
              break;
            case 'error':
              console.error('WebSocket error:', data.message);
              setError(data.message);
              break;
            case 'pong':
              // Connection is alive
              break;
            default:
              console.log('Unknown message type:', data.type);
          }
        } catch (error) {
          console.error('Failed to parse WebSocket message:', error);
        }
      };

      ws.onerror = (error) => {
        console.error('WebSocket error:', error);
        setError('Connection error occurred');
      };

      ws.onclose = (event) => {
        console.log('WebSocket closed:', event.code, event.reason);
        setConnectionStatus('disconnected');

        // Clear ping interval
        if (pingIntervalRef.current) {
          clearInterval(pingIntervalRef.current);
          pingIntervalRef.current = null;
        }

        // Attempt to reconnect after delay
        if (!event.wasClean) {
          reconnectTimeoutRef.current = setTimeout(() => {
            connect();
          }, 5000);
        }
      };
    } catch (error) {
      console.error('Failed to create WebSocket connection:', error);
      setConnectionStatus('error');
      setError('Failed to establish connection');
    }
  }, []);

  const disconnect = useCallback(() => {
    if (wsRef.current) {
      // Detach the close handler so an intentional close never schedules a reconnect
      wsRef.current.onclose = null;
      wsRef.current.close(1000, 'User disconnected');
      wsRef.current = null;
    }
    if (reconnectTimeoutRef.current) {
      clearTimeout(reconnectTimeoutRef.current);
      reconnectTimeoutRef.current = null;
    }
    if (pingIntervalRef.current) {
      clearInterval(pingIntervalRef.current);
      pingIntervalRef.current = null;
    }
    setConnectionStatus('disconnected');
  }, []);

  const subscribeToCurrencies = useCallback((newCurrencies) => {
    if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN) {
      wsRef.current.send(JSON.stringify({
        type: 'subscribe',
        currencies: newCurrencies
      }));
    }
  }, []);

  // Open the connection once on mount and close it on unmount
  useEffect(() => {
    connect();
    return () => {
      disconnect();
    };
  }, [connect, disconnect]);

  // Resubscribe when the requested currencies change
  useEffect(() => {
    subscribeToCurrencies(currenciesRef.current);
  }, [currencyKey, subscribeToCurrencies]);

  return {
    rates,
    connectionStatus,
    lastUpdate,
    error,
    subscribeToCurrencies,
    reconnect: connect,
    disconnect
  };
};

export default useWebSocketRates;
Method 3: Smart Polling with Exponential Backoff
A robust polling implementation with intelligent retry logic and rate limiting awareness.
hooks/useSmartPolling.js
import { useState, useEffect, useRef, useCallback } from 'react';

const useSmartPolling = (apiEndpoint, options = {}) => {
  const {
    interval = 30000, // Default 30 seconds
    maxRetries = 5,
    backoffMultiplier = 2,
    maxBackoffTime = 300000, // 5 minutes max
    enabled = true
  } = options;

  const [data, setData] = useState(null);
  const [loading, setLoading] = useState(false);
  const [error, setError] = useState(null);
  const [retryCount, setRetryCount] = useState(0);
  const [nextPollTime, setNextPollTime] = useState(null);

  const timeoutRef = useRef(null);
  const abortControllerRef = useRef(null);
  // Mirror of retryCount kept in a ref so that fetchData keeps a stable identity.
  // With retryCount in its dependency list, every retry would restart polling
  // immediately and defeat the backoff delay.
  const retryCountRef = useRef(0);

  const calculateBackoffDelay = useCallback((retryAttempt) => {
    const exponentialDelay = interval * Math.pow(backoffMultiplier, retryAttempt);
    return Math.min(exponentialDelay, maxBackoffTime);
  }, [interval, backoffMultiplier, maxBackoffTime]);

  const fetchData = useCallback(async () => {
    if (abortControllerRef.current) {
      abortControllerRef.current.abort();
    }
    const controller = new AbortController();
    abortControllerRef.current = controller;
    setLoading(true);

    try {
      const response = await fetch(apiEndpoint, {
        signal: controller.signal,
        headers: { 'Accept': 'application/json' }
      });

      if (!response.ok) {
        // Handle different HTTP status codes
        switch (response.status) {
          case 429: {
            // Rate limited - honor a Retry-After header given in seconds
            const retryAfter = parseInt(response.headers.get('retry-after'), 10);
            const retryDelay = Number.isNaN(retryAfter)
              ? calculateBackoffDelay(retryCountRef.current)
              : retryAfter * 1000;
            throw Object.assign(
              new Error(`Rate limited. Retry after ${retryDelay}ms`),
              { retryDelay }
            );
          }
          case 401:
            throw Object.assign(new Error('Unauthorized - check your API key'), { retryable: false });
          case 403:
            throw Object.assign(new Error('Forbidden - insufficient permissions'), { retryable: false });
          case 500:
          case 502:
          case 503:
          case 504:
            throw new Error('Server error - will retry automatically');
          default:
            throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }
      }

      const result = await response.json();
      setData(result);
      setError(null);
      retryCountRef.current = 0; // Reset retry count on success
      setRetryCount(0);

      // Schedule next poll
      setNextPollTime(Date.now() + interval);
      timeoutRef.current = setTimeout(fetchData, interval);
    } catch (err) {
      if (err.name === 'AbortError') {
        // Request was aborted, don't treat as error
        return;
      }

      console.error('Polling error:', err);
      setError(err.message);

      // Retry with exponential backoff, unless the error cannot be fixed by retrying
      if (err.retryable !== false && retryCountRef.current < maxRetries) {
        const retryDelay = err.retryDelay ?? calculateBackoffDelay(retryCountRef.current);
        retryCountRef.current += 1;
        setRetryCount(retryCountRef.current);
        setNextPollTime(Date.now() + retryDelay);
        timeoutRef.current = setTimeout(fetchData, retryDelay);
      } else if (err.retryable !== false) {
        console.error('Max retries exceeded. Stopping polling.');
        setError('Max retries exceeded. Please refresh to try again.');
      }
    } finally {
      // Only the most recent request may clear the loading flag
      if (abortControllerRef.current === controller) {
        setLoading(false);
      }
    }
  }, [apiEndpoint, interval, maxRetries, calculateBackoffDelay]);

  const startPolling = useCallback(() => {
    if (enabled && !timeoutRef.current) {
      fetchData();
    }
  }, [enabled, fetchData]);

  const stopPolling = useCallback(() => {
    if (timeoutRef.current) {
      clearTimeout(timeoutRef.current);
      timeoutRef.current = null;
    }
    if (abortControllerRef.current) {
      abortControllerRef.current.abort();
      abortControllerRef.current = null;
    }
    setLoading(false);
    setNextPollTime(null);
  }, []);

  const forceRefresh = useCallback(() => {
    stopPolling();
    retryCountRef.current = 0;
    setRetryCount(0);
    setError(null);
    startPolling();
  }, [stopPolling, startPolling]);

  // Start or stop polling when `enabled` changes; the cleanup also runs on unmount
  useEffect(() => {
    if (enabled) {
      startPolling();
    } else {
      stopPolling();
    }
    return stopPolling;
  }, [enabled, startPolling, stopPolling]);

  // Computed at render time; it does not tick on its own between renders
  const getTimeUntilNextPoll = () => {
    if (!nextPollTime) return 0;
    return Math.max(0, nextPollTime - Date.now());
  };

  return {
    data,
    loading,
    error,
    retryCount,
    nextPollTime,
    timeUntilNextPoll: getTimeUntilNextPoll(),
    forceRefresh,
    startPolling,
    stopPolling
  };
};

export default useSmartPolling;
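To make the retry schedule concrete, the formula in `calculateBackoffDelay` can be evaluated as a pure function outside React. The sketch below uses the hook's default options. The jitter variant is an addition that the hook itself does not implement: randomizing the delay is a common way to keep many clients from retrying in lockstep, and `backoffDelayWithJitter` is a hypothetical name.

```javascript
// Same formula as calculateBackoffDelay in the hook, written as a pure function
function backoffDelay(attempt, { interval = 30000, multiplier = 2, maxDelay = 300000 } = {}) {
  return Math.min(interval * Math.pow(multiplier, attempt), maxDelay);
}

// Optional "full jitter" variant (not part of the hook): pick a random delay
// between 0 and the computed backoff. `random` is injectable for testing.
function backoffDelayWithJitter(attempt, options, random = Math.random) {
  return Math.floor(random() * backoffDelay(attempt, options));
}

// Delays for retry attempts 0 through 4 with the default options
const schedule = [0, 1, 2, 3, 4].map((attempt) => backoffDelay(attempt));
console.log(schedule);
// → [ 30000, 60000, 120000, 240000, 300000 ]
```

With the defaults, the fifth retry is capped at `maxBackoffTime` (5 minutes) instead of growing to 8 minutes.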
Using the Smart Polling Hook in a Component
components/PollingRates.jsx
import React from 'react';
import useSmartPolling from '../hooks/useSmartPolling';

const PollingRates = ({ baseCurrency = 'USD' }) => {
  const {
    data,
    loading,
    error,
    retryCount,
    timeUntilNextPoll,
    forceRefresh
  } = useSmartPolling(
    `/api/rates?base=${baseCurrency}`,
    {
      interval: 30000,      // Poll every 30 seconds
      maxRetries: 5,        // Max 5 retries
      backoffMultiplier: 2, // Double delay each retry
      enabled: true
    }
  );

  const formatTimeUntilNext = (ms) => {
    const seconds = Math.ceil(ms / 1000);
    return seconds > 0 ? `${seconds}s` : 'Now';
  };

  return (
    <div className="bg-background rounded-lg shadow-md p-6">
      {/* Header with status */}
      <div className="flex items-center justify-between mb-6">
        <h2 className="text-2xl font-bold text-foreground">
          {baseCurrency} Exchange Rates
        </h2>
        <div className="flex items-center space-x-4">
          {retryCount > 0 && (
            <span className="text-sm text-warning">
              Retry {retryCount}/5
            </span>
          )}
          <span className="text-sm text-muted-foreground">
            Next update: {formatTimeUntilNext(timeUntilNextPoll)}
          </span>
          <button
            onClick={forceRefresh}
            disabled={loading}
            className="px-3 py-1 bg-primary text-primary-foreground rounded text-sm hover:bg-primary/90 disabled:bg-muted"
          >
            {loading ? 'Updating...' : 'Refresh'}
          </button>
        </div>
      </div>

      {/* Error Display */}
      {error && (
        <div className="mb-6 p-4 bg-destructive/10 border border-destructive/30 rounded-lg">
          <p className="text-destructive-foreground">{error}</p>
          <button
            onClick={forceRefresh}
            className="mt-2 text-sm text-destructive hover:text-destructive-foreground underline"
          >
            Try Again
          </button>
        </div>
      )}

      {/* Loading Indicator */}
      {loading && !data && (
        <div className="flex items-center justify-center py-8">
          <div className="animate-spin rounded-full h-8 w-8 border-b-2 border-primary"></div>
          <span className="ml-2 text-muted-foreground">Loading rates...</span>
        </div>
      )}

      {/* Rates Display */}
      {data && data.rates && (
        <div className="grid grid-cols-2 md:grid-cols-3 lg:grid-cols-4 gap-4">
          {Object.entries(data.rates).slice(0, 12).map(([currency, rate]) => (
            <div
              key={currency}
              className={`p-4 rounded-lg border transition-colors ${
                loading ? 'border-primary/30 bg-primary/10' : 'border-border bg-surface'
              }`}
            >
              <div className="flex justify-between items-center">
                <span className="font-semibold text-foreground">{currency}</span>
                <span className="text-lg font-bold text-primary">
                  {rate.toFixed(4)}
                </span>
              </div>
            </div>
          ))}
        </div>
      )}
    </div>
  );
};

export default PollingRates;
Best Practices
- Connection Management: Always handle reconnection logic gracefully
- Rate Limiting: Respect API rate limits and implement exponential backoff
- Error Handling: Provide clear error messages and recovery options
- Resource Cleanup: Always clean up connections and timers
- User Feedback: Show connection status and next update times
- Performance: Use debouncing and avoid unnecessary updates
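One way to avoid unnecessary updates is to skip a state change when no rate has moved by a visible amount. The sketch below assumes rates are plain `{ code: number }` objects like the `rates` field used throughout this guide. `ratesChanged` is a hypothetical helper, and the default `epsilon` is chosen as half of the last digit shown by `toFixed(4)`.

```javascript
// Hypothetical helper: report whether any rate moved by more than `epsilon`
// or whether the set of currencies itself changed.
function ratesChanged(prev, next, epsilon = 0.00005) {
  const prevKeys = Object.keys(prev);
  const nextKeys = Object.keys(next);
  if (prevKeys.length !== nextKeys.length) return true;
  return nextKeys.some((code) => {
    if (!(code in prev)) return true;
    return Math.abs(next[code] - prev[code]) > epsilon;
  });
}

// Illustrative values only
const previous = { EUR: 0.92, GBP: 0.79 };
console.log(ratesChanged(previous, { EUR: 0.92, GBP: 0.79 }));
console.log(ratesChanged(previous, { EUR: 0.93, GBP: 0.79 }));
```

In a component this could be applied as `setRates(prev => ratesChanged(prev, incoming) ? incoming : prev)`. Returning the previous object unchanged lets React skip the re-render.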
Choosing the Right Method
Recommendations:
- Server-Sent Events: Best for simple one-way data streams, minimal setup required
- WebSockets: Use when you need bi-directional communication or very low latency
- Smart Polling: Most reliable option, works everywhere, good for most use cases
- Hybrid Approach: Use WebSockets with polling fallback for maximum compatibility
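The recommendations above can be condensed into a small decision function. This is a sketch of one possible selection policy, not a prescribed one: `chooseTransport` and its option names are hypothetical, and the returned strings are just labels for the three methods in this guide.

```javascript
// Hypothetical helper: choose a transport from detected capabilities.
function chooseTransport({
  needsBidirectional = false,
  hasWebSocket = false,
  hasEventSource = false
} = {}) {
  // Two-way messaging requires WebSockets when they are available
  if (needsBidirectional && hasWebSocket) return 'websocket';
  // One-way streams are simplest over SSE
  if (!needsBidirectional && hasEventSource) return 'sse';
  // One-way stream without SSE support: a WebSocket still works
  if (!needsBidirectional && hasWebSocket) return 'websocket';
  // Polling works everywhere and is the final fallback
  return 'polling';
}

// Capabilities can be detected from globals at runtime
const capabilities = {
  hasWebSocket: typeof WebSocket !== 'undefined',
  hasEventSource: typeof EventSource !== 'undefined'
};
console.log(chooseTransport({ ...capabilities, needsBidirectional: true }));
```

Capability detection alone does not catch proxies or firewalls that block an otherwise supported transport, so a hybrid client would also switch to polling after repeated connection failures.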