Back to Documentation

Real-time Streaming Guide

Learn how to implement real-time currency rate updates using Server-Sent Events, WebSockets, and smart polling strategies.

What You'll Learn

  • • Implement Server-Sent Events for real-time updates
  • • Build WebSocket connections for bi-directional communication
  • • Create smart polling strategies with exponential backoff
  • • Handle connection management and reconnection logic
  • • Optimize performance and minimize API calls

Real-time Strategies Comparison

Server-Sent Events

  • ✅ Simple implementation
  • ✅ Automatic reconnection
  • ✅ HTTP/2 multiplexing
  • ❌ One-way communication
  • ❌ Limited browser support

WebSockets

  • ✅ Bi-directional communication
  • ✅ Low latency
  • ✅ Full browser support
  • ❌ Complex connection management
  • ❌ Firewall/proxy issues

Smart Polling

  • ✅ Universal compatibility
  • ✅ Simple to implement
  • ✅ Reliable
  • ❌ Higher latency
  • ❌ More API calls

Method 1: Server-Sent Events (SSE)

Server-Sent Events provide a simple way to stream real-time updates from server to client.

Backend Implementation (Node.js/Express)

server/sse-server.js
const express = require('express');
const cors = require('cors');
const fetch = require('node-fetch');

const app = express();
app.use(cors());

// Store active SSE connections
const connections = new Set();

// SSE endpoint for currency rate updates
app.get('/api/rates/stream', (req, res) => {
  // Set SSE headers
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Headers': 'Cache-Control'
  });

  // Add connection to active connections
  connections.add(res);

  // Send initial data
  sendRateUpdate(res);

  // Handle client disconnect
  req.on('close', () => {
    connections.delete(res);
  });
});

async function sendRateUpdate(res) {
  try {
    // Fetch latest rates from FXRateSync API
    const response = await fetch('https://api.fxratesync.io/api/v1/latest?base=USD', {
      headers: {
        'X-API-Key': process.env.FXRATESYNC_API_KEY
      }
    });
    
    const data = await response.json();
    
    // Send formatted SSE message
    const sseData = `data: ${JSON.stringify({
      type: 'rate_update',
      timestamp: new Date().toISOString(),
      rates: data.rates
    })}\n\n`;
    
    res.write(sseData);
  } catch (error) {
    console.error('Failed to fetch rates:', error);
    
    // Send error message
    const errorData = `data: ${JSON.stringify({
      type: 'error',
      message: 'Failed to fetch rates'
    })}\n\n`;
    
    res.write(errorData);
  }
}

// Broadcast updates to all connected clients
function broadcastRateUpdates() {
  connections.forEach(res => {
    sendRateUpdate(res);
  });
}

// Update rates every 30 seconds
setInterval(broadcastRateUpdates, 30000);

app.listen(3001, () => {
  console.log('SSE server running on port 3001');
});

Frontend Implementation (React)

components/RealTimeRates.jsx
import React, { useState, useEffect, useRef } from 'react';

const RealTimeRates = () => {
  const [rates, setRates] = useState({});
  const [connectionStatus, setConnectionStatus] = useState('connecting');
  const [lastUpdate, setLastUpdate] = useState(null);
  const eventSourceRef = useRef(null);
  const reconnectTimeoutRef = useRef(null);

  useEffect(() => {
    connectToStream();
    
    return () => {
      if (eventSourceRef.current) {
        eventSourceRef.current.close();
      }
      if (reconnectTimeoutRef.current) {
        clearTimeout(reconnectTimeoutRef.current);
      }
    };
  }, []);

  const connectToStream = () => {
    try {
      // Create EventSource connection
      const eventSource = new EventSource('http://localhost:3001/api/rates/stream');
      eventSourceRef.current = eventSource;

      eventSource.onopen = () => {
        console.log('SSE connection established');
        setConnectionStatus('connected');
      };

      eventSource.onmessage = (event) => {
        try {
          const data = JSON.parse(event.data);
          
          if (data.type === 'rate_update') {
            setRates(data.rates);
            setLastUpdate(new Date(data.timestamp));
          } else if (data.type === 'error') {
            console.error('SSE error:', data.message);
            setConnectionStatus('error');
          }
        } catch (error) {
          console.error('Failed to parse SSE data:', error);
        }
      };

      eventSource.onerror = (error) => {
        console.error('SSE connection error:', error);
        setConnectionStatus('disconnected');
        
        // Automatic reconnection after 5 seconds
        reconnectTimeoutRef.current = setTimeout(() => {
          connectToStream();
        }, 5000);
      };

    } catch (error) {
      console.error('Failed to create SSE connection:', error);
      setConnectionStatus('error');
    }
  };

  const formatCurrency = (rate) => {
    return rate.toFixed(4);
  };

  const getStatusColor = () => {
    switch (connectionStatus) {
      case 'connected': return 'text-success';
      case 'connecting': return 'text-warning';
      case 'disconnected': return 'text-warning';
      case 'error': return 'text-destructive';
      default: return 'text-muted-foreground';
    }
  };

  return (
    <div className="bg-background rounded-lg shadow-md p-6">
      {/* Connection Status */}
      <div className="flex items-center justify-between mb-6">
        <h2 className="text-2xl font-bold text-foreground">Live Exchange Rates</h2>
        <div className="flex items-center space-x-2">
          <div className={`w-3 h-3 rounded-full ${
            connectionStatus === 'connected' ? 'bg-success animate-pulse' : 
            connectionStatus === 'connecting' ? 'bg-warning' :
            'bg-destructive'
          }`}></div>
          <span className={`text-sm font-medium ${getStatusColor()}`}>
            {connectionStatus.charAt(0).toUpperCase() + connectionStatus.slice(1)}
          </span>
        </div>
      </div>

      {/* Last Update Time */}
      {lastUpdate && (
        <div className="text-sm text-muted-foreground mb-4">
          Last updated: {lastUpdate.toLocaleTimeString()}
        </div>
      )}

      {/* Rates Grid */}
      <div className="grid grid-cols-2 md:grid-cols-3 lg:grid-cols-4 gap-4">
        {Object.entries(rates).slice(0, 12).map(([currency, rate]) => (
          <div 
            key={currency} 
            className="p-4 bg-surface rounded-lg border border-border hover:border-primary/50 transition-colors"
          >
            <div className="flex justify-between items-center">
              <span className="font-semibold text-foreground">{currency}</span>
              <span className="text-lg font-bold text-primary">
                {formatCurrency(rate)}
              </span>
            </div>
          </div>
        ))}
      </div>

      {/* Connection Instructions */}
      {connectionStatus === 'error' && (
        <div className="mt-6 p-4 bg-destructive/10 border border-destructive/30 rounded-lg">
          <p className="text-destructive-foreground">
            Connection failed. Please check your network connection and try refreshing the page.
          </p>
        </div>
      )}
    </div>
  );
};

export default RealTimeRates;

Method 2: WebSocket Implementation

WebSockets provide full-duplex communication for more interactive real-time features.

WebSocket Server (Node.js)

server/websocket-server.js
const WebSocket = require('ws');
const fetch = require('node-fetch');

const wss = new WebSocket.Server({ port: 8080 });

// Store active connections with their subscriptions
const connections = new Map();

wss.on('connection', (ws) => {
  console.log('New WebSocket connection');
  
  // Initialize connection data
  connections.set(ws, {
    subscriptions: new Set(['USD']), // Default to USD rates
    lastPing: Date.now()
  });

  // Send welcome message
  ws.send(JSON.stringify({
    type: 'welcome',
    message: 'Connected to FXRateSync WebSocket'
  }));

  // Handle incoming messages
  ws.on('message', async (data) => {
    try {
      const message = JSON.parse(data.toString());
      await handleMessage(ws, message);
    } catch (error) {
      console.error('Invalid message format:', error);
      ws.send(JSON.stringify({
        type: 'error',
        message: 'Invalid message format'
      }));
    }
  });

  // Handle connection close
  ws.on('close', () => {
    console.log('WebSocket connection closed');
    connections.delete(ws);
  });

  // Handle ping/pong for connection health
  ws.on('pong', () => {
    if (connections.has(ws)) {
      connections.get(ws).lastPing = Date.now();
    }
  });
});

async function handleMessage(ws, message) {
  const connectionData = connections.get(ws);
  
  switch (message.type) {
    case 'subscribe':
      // Subscribe to specific currency rates
      if (message.currencies && Array.isArray(message.currencies)) {
        connectionData.subscriptions.clear();
        message.currencies.forEach(currency => {
          connectionData.subscriptions.add(currency.toUpperCase());
        });
        
        ws.send(JSON.stringify({
          type: 'subscribed',
          currencies: Array.from(connectionData.subscriptions)
        }));
        
        // Send immediate update for subscribed currencies
        await sendRateUpdate(ws);
      }
      break;
      
    case 'get_rates':
      // Send current rates for subscribed currencies
      await sendRateUpdate(ws);
      break;
      
    case 'ping':
      // Respond to ping
      ws.send(JSON.stringify({ type: 'pong' }));
      break;
      
    default:
      ws.send(JSON.stringify({
        type: 'error',
        message: `Unknown message type: ${message.type}`
      }));
  }
}

async function sendRateUpdate(ws) {
  const connectionData = connections.get(ws);
  if (!connectionData) return;

  try {
    // Fetch rates for each subscribed base currency
    const ratePromises = Array.from(connectionData.subscriptions).map(async (base) => {
      const response = await fetch(`https://api.fxratesync.io/api/v1/latest?base=${base}`, {
        headers: {
          'X-API-Key': process.env.FXRATESYNC_API_KEY
        }
      });
      
      const data = await response.json();
      return { base, rates: data.rates };
    });

    const allRates = await Promise.all(ratePromises);
    
    ws.send(JSON.stringify({
      type: 'rate_update',
      timestamp: new Date().toISOString(),
      data: allRates
    }));
    
  } catch (error) {
    console.error('Failed to fetch rates:', error);
    ws.send(JSON.stringify({
      type: 'error',
      message: 'Failed to fetch rate updates'
    }));
  }
}

// Broadcast updates to all connected clients
async function broadcastUpdates() {
  const updatePromises = Array.from(connections.keys()).map(ws => {
    if (ws.readyState === WebSocket.OPEN) {
      return sendRateUpdate(ws);
    }
  });
  
  await Promise.all(updatePromises);
}

// Send updates every 30 seconds
setInterval(broadcastUpdates, 30000);

// Health check - ping connections every 60 seconds
setInterval(() => {
  connections.forEach((data, ws) => {
    if (ws.readyState === WebSocket.OPEN) {
      // Check if connection is stale
      if (Date.now() - data.lastPing > 120000) { // 2 minutes
        console.log('Closing stale connection');
        ws.terminate();
      } else {
        ws.ping();
      }
    }
  });
}, 60000);

console.log('WebSocket server running on port 8080');

WebSocket Client (React Hook)

hooks/useWebSocketRates.js
import { useState, useEffect, useRef, useCallback } from 'react';

const useWebSocketRates = (currencies = ['USD']) => {
  const [rates, setRates] = useState({});
  const [connectionStatus, setConnectionStatus] = useState('disconnecting');
  const [lastUpdate, setLastUpdate] = useState(null);
  const [error, setError] = useState(null);
  
  const wsRef = useRef(null);
  const reconnectTimeoutRef = useRef(null);
  const pingIntervalRef = useRef(null);

  const connect = useCallback(() => {
    try {
      const ws = new WebSocket('ws://localhost:8080');
      wsRef.current = ws;
      setConnectionStatus('connecting');

      ws.onopen = () => {
        console.log('WebSocket connected');
        setConnectionStatus('connected');
        setError(null);
        
        // Subscribe to currencies
        ws.send(JSON.stringify({
          type: 'subscribe',
          currencies: currencies
        }));
        
        // Set up ping interval
        pingIntervalRef.current = setInterval(() => {
          if (ws.readyState === WebSocket.OPEN) {
            ws.send(JSON.stringify({ type: 'ping' }));
          }
        }, 30000);
      };

      ws.onmessage = (event) => {
        try {
          const data = JSON.parse(event.data);
          
          switch (data.type) {
            case 'welcome':
              console.log('WebSocket welcome:', data.message);
              break;
              
            case 'subscribed':
              console.log('Subscribed to currencies:', data.currencies);
              break;
              
            case 'rate_update':
              setRates(data.data);
              setLastUpdate(new Date(data.timestamp));
              break;
              
            case 'error':
              console.error('WebSocket error:', data.message);
              setError(data.message);
              break;
              
            case 'pong':
              // Connection is alive
              break;
              
            default:
              console.log('Unknown message type:', data.type);
          }
        } catch (error) {
          console.error('Failed to parse WebSocket message:', error);
        }
      };

      ws.onerror = (error) => {
        console.error('WebSocket error:', error);
        setError('Connection error occurred');
      };

      ws.onclose = (event) => {
        console.log('WebSocket closed:', event.code, event.reason);
        setConnectionStatus('disconnected');
        
        // Clear ping interval
        if (pingIntervalRef.current) {
          clearInterval(pingIntervalRef.current);
          pingIntervalRef.current = null;
        }
        
        // Attempt to reconnect after delay
        if (!event.wasClean) {
          reconnectTimeoutRef.current = setTimeout(() => {
            connect();
          }, 5000);
        }
      };

    } catch (error) {
      console.error('Failed to create WebSocket connection:', error);
      setConnectionStatus('error');
      setError('Failed to establish connection');
    }
  }, [currencies]);

  const disconnect = useCallback(() => {
    if (wsRef.current) {
      wsRef.current.close(1000, 'User disconnected');
      wsRef.current = null;
    }
    
    if (reconnectTimeoutRef.current) {
      clearTimeout(reconnectTimeoutRef.current);
      reconnectTimeoutRef.current = null;
    }
    
    if (pingIntervalRef.current) {
      clearInterval(pingIntervalRef.current);
      pingIntervalRef.current = null;
    }
    
    setConnectionStatus('disconnected');
  }, []);

  const subscribeToCurrencies = useCallback((newCurrencies) => {
    if (wsRef.current && wsRef.current.readyState === WebSocket.OPEN) {
      wsRef.current.send(JSON.stringify({
        type: 'subscribe',
        currencies: newCurrencies
      }));
    }
  }, []);

  useEffect(() => {
    connect();
    
    return () => {
      disconnect();
    };
  }, [connect, disconnect]);

  return {
    rates,
    connectionStatus,
    lastUpdate,
    error,
    subscribeToCurrencies,
    reconnect: connect,
    disconnect
  };
};

export default useWebSocketRates;

Method 3: Smart Polling with Exponential Backoff

A robust polling implementation with intelligent retry logic and rate limiting awareness.

hooks/useSmartPolling.js
import { useState, useEffect, useRef, useCallback } from 'react';

const useSmartPolling = (apiEndpoint, options = {}) => {
  const {
    interval = 30000,        // Default 30 seconds
    maxRetries = 5,
    backoffMultiplier = 2,
    maxBackoffTime = 300000, // 5 minutes max
    enabled = true
  } = options;

  const [data, setData] = useState(null);
  const [loading, setLoading] = useState(false);
  const [error, setError] = useState(null);
  const [retryCount, setRetryCount] = useState(0);
  const [nextPollTime, setNextPollTime] = useState(null);

  const timeoutRef = useRef(null);
  const abortControllerRef = useRef(null);

  const calculateBackoffDelay = useCallback((retryAttempt) => {
    const baseDelay = interval;
    const exponentialDelay = baseDelay * Math.pow(backoffMultiplier, retryAttempt);
    return Math.min(exponentialDelay, maxBackoffTime);
  }, [interval, backoffMultiplier, maxBackoffTime]);

  const fetchData = useCallback(async () => {
    if (abortControllerRef.current) {
      abortControllerRef.current.abort();
    }

    abortControllerRef.current = new AbortController();
    setLoading(true);

    try {
      const response = await fetch(apiEndpoint, {
        signal: abortControllerRef.current.signal,
        headers: {
          'Content-Type': 'application/json',
        }
      });

      if (!response.ok) {
        // Handle different HTTP status codes
        switch (response.status) {
          case 429:
            // Rate limited - check retry-after header
            const retryAfter = response.headers.get('retry-after');
            const retryDelay = retryAfter ? parseInt(retryAfter) * 1000 : calculateBackoffDelay(retryCount);
            throw new Error(`Rate limited. Retry after ${retryDelay}ms`);
            
          case 401:
            throw new Error('Unauthorized - check your API key');
            
          case 403:
            throw new Error('Forbidden - insufficient permissions');
            
          case 500:
          case 502:
          case 503:
          case 504:
            throw new Error('Server error - will retry automatically');
            
          default:
            throw new Error(`HTTP ${response.status}: ${response.statusText}`);
        }
      }

      const result = await response.json();
      setData(result);
      setError(null);
      setRetryCount(0); // Reset retry count on success
      
      // Schedule next poll
      const nextPoll = Date.now() + interval;
      setNextPollTime(nextPoll);
      
      timeoutRef.current = setTimeout(fetchData, interval);

    } catch (err) {
      if (err.name === 'AbortError') {
        // Request was aborted, don't treat as error
        return;
      }

      console.error('Polling error:', err);
      setError(err.message);

      // Implement retry logic with exponential backoff
      if (retryCount < maxRetries) {
        const retryDelay = calculateBackoffDelay(retryCount);
        setRetryCount(prev => prev + 1);
        
        const nextRetry = Date.now() + retryDelay;
        setNextPollTime(nextRetry);
        
        timeoutRef.current = setTimeout(fetchData, retryDelay);
      } else {
        console.error('Max retries exceeded. Stopping polling.');
        setError('Max retries exceeded. Please refresh to try again.');
      }
    } finally {
      setLoading(false);
    }
  }, [apiEndpoint, interval, retryCount, maxRetries, calculateBackoffDelay]);

  const startPolling = useCallback(() => {
    if (enabled && !timeoutRef.current) {
      fetchData();
    }
  }, [enabled, fetchData]);

  const stopPolling = useCallback(() => {
    if (timeoutRef.current) {
      clearTimeout(timeoutRef.current);
      timeoutRef.current = null;
    }
    
    if (abortControllerRef.current) {
      abortControllerRef.current.abort();
      abortControllerRef.current = null;
    }
    
    setNextPollTime(null);
  }, []);

  const forceRefresh = useCallback(() => {
    stopPolling();
    setRetryCount(0);
    setError(null);
    startPolling();
  }, [stopPolling, startPolling]);

  useEffect(() => {
    if (enabled) {
      startPolling();
    } else {
      stopPolling();
    }

    return stopPolling;
  }, [enabled, startPolling, stopPolling]);

  // Cleanup on unmount
  useEffect(() => {
    return () => {
      stopPolling();
    };
  }, [stopPolling]);

  const getTimeUntilNextPoll = () => {
    if (!nextPollTime) return 0;
    return Math.max(0, nextPollTime - Date.now());
  };

  return {
    data,
    loading,
    error,
    retryCount,
    nextPollTime,
    timeUntilNextPoll: getTimeUntilNextPoll(),
    forceRefresh,
    startPolling,
    stopPolling
  };
};

export default useSmartPolling;

Using Smart Polling Component

components/PollingRates.jsx
import React from 'react';
import useSmartPolling from '../hooks/useSmartPolling';

const PollingRates = ({ baseCurrency = 'USD' }) => {
  const {
    data,
    loading,
    error,
    retryCount,
    timeUntilNextPoll,
    forceRefresh
  } = useSmartPolling(
    `/api/rates?base=${baseCurrency}`,
    {
      interval: 30000,     // Poll every 30 seconds
      maxRetries: 5,       // Max 5 retries
      backoffMultiplier: 2, // Double delay each retry
      enabled: true
    }
  );

  const formatTimeUntilNext = (ms) => {
    const seconds = Math.ceil(ms / 1000);
    return seconds > 0 ? `${seconds}s` : 'Now';
  };

  return (
    <div className="bg-background rounded-lg shadow-md p-6">
      {/* Header with status */}
      <div className="flex items-center justify-between mb-6">
        <h2 className="text-2xl font-bold text-foreground">
          {baseCurrency} Exchange Rates
        </h2>
        <div className="flex items-center space-x-4">
          {retryCount > 0 && (
            <span className="text-sm text-warning">
              Retry {retryCount}/5
            </span>
          )}
          <span className="text-sm text-muted-foreground">
            Next update: {formatTimeUntilNext(timeUntilNextPoll)}
          </span>
          <button
            onClick={forceRefresh}
            disabled={loading}
            className="px-3 py-1 bg-primary text-primary-foreground rounded text-sm hover:bg-primary/90 disabled:bg-muted"
          >
            {loading ? 'Updating...' : 'Refresh'}
          </button>
        </div>
      </div>

      {/* Error Display */}
      {error && (
        <div className="mb-6 p-4 bg-destructive/10 border border-destructive/30 rounded-lg">
          <p className="text-destructive-foreground">{error}</p>
          <button
            onClick={forceRefresh}
            className="mt-2 text-sm text-destructive hover:text-destructive-foreground underline"
          >
            Try Again
          </button>
        </div>
      )}

      {/* Loading Indicator */}
      {loading && !data && (
        <div className="flex items-center justify-center py-8">
          <div className="animate-spin rounded-full h-8 w-8 border-b-2 border-primary"></div>
          <span className="ml-2 text-muted-foreground">Loading rates...</span>
        </div>
      )}

      {/* Rates Display */}
      {data && data.rates && (
        <div className="grid grid-cols-2 md:grid-cols-3 lg:grid-cols-4 gap-4">
          {Object.entries(data.rates).slice(0, 12).map(([currency, rate]) => (
            <div 
              key={currency}
              className={`p-4 rounded-lg border transition-colors ${
                loading ? 'border-primary/30 bg-primary/10' : 'border-border bg-surface'
              }`}
            >
              <div className="flex justify-between items-center">
                <span className="font-semibold text-foreground">{currency}</span>
                <span className="text-lg font-bold text-primary">
                  {rate.toFixed(4)}
                </span>
              </div>
            </div>
          ))}
        </div>
      )}
    </div>
  );
};

export default PollingRates;

Best Practices

  • Connection Management: Always handle reconnection logic gracefully
  • Rate Limiting: Respect API rate limits and implement exponential backoff
  • Error Handling: Provide clear error messages and recovery options
  • Resource Cleanup: Always clean up connections and timers
  • User Feedback: Show connection status and next update times
  • Performance: Use debouncing and avoid unnecessary updates

Choosing the Right Method

Recommendations:

  • Server-Sent Events: Best for simple one-way data streams, minimal setup required
  • WebSockets: Use when you need bi-directional communication or very low latency
  • Smart Polling: Most reliable option, works everywhere, good for most use cases
  • Hybrid Approach: Use WebSockets with polling fallback for maximum compatibility

Build Real-time Currency Apps

Ready to implement real-time features in your currency application?