CrackedRuby CrackedRuby

Overview

Server-Sent Events is a web standard that allows servers to push updates to clients over a single, long-lived HTTP connection. The protocol defines a simple text-based message format transmitted over HTTP with the content type text/event-stream. Clients connect using the EventSource API in browsers, which automatically handles connection management, reconnection logic, and message parsing.

SSE operates on standard HTTP infrastructure, passing through proxies, firewalls, and load balancers without special configuration. The server maintains an open connection and writes data chunks as events occur. Unlike polling, where clients repeatedly request updates, SSE maintains a persistent connection that remains open until explicitly closed by either party.

The protocol emerged as a simpler alternative to WebSockets for scenarios requiring only server-to-client communication. The W3C standardized SSE as part of HTML5, with widespread browser support since 2011. Modern applications use SSE for notifications, live dashboards, real-time analytics, progress indicators, and activity feeds.

# Basic SSE endpoint structure
def stream
  response.headers['Content-Type'] = 'text/event-stream'
  response.headers['Cache-Control'] = 'no-cache'
  response.headers['X-Accel-Buffering'] = 'no'
  
  sse = SSE.new(response.stream)
  
  begin
    5.times do |n|
      sse.write({ message: "Update #{n}" })
      sleep 1
    end
  ensure
    sse.close
  end
end

SSE differs from WebSockets in protocol complexity and directionality. WebSockets require a protocol upgrade handshake and support bidirectional communication, while SSE uses standard HTTP and flows data only from server to client. This simplicity makes SSE easier to implement, debug, and deploy across existing infrastructure.

Key Principles

The SSE protocol defines a text-based format where each message consists of one or more fields. Each field begins with a field name, followed by a colon, optional space, and the field value. Messages terminate with a blank line (two consecutive newlines). The protocol recognizes four field types: data, event, id, and retry.

The data field contains the message payload. Multiple data fields concatenate together with newlines, allowing multiline messages. The protocol transmits data as UTF-8 text, typically JSON for structured information. Each data field adds content to the current message buffer.

# Message format structure
"data: Simple message\n\n"

"data: First line\ndata: Second line\n\n"

"event: userLoggedIn\ndata: {\"userId\": 123}\nid: 456\n\n"

The event field specifies a custom event type. Without an event field, messages arrive as generic "message" events. Custom event types allow clients to register different handlers for different message categories. The client can listen for specific events or catch all messages with the default handler.

The id field sets the last event ID, which the browser stores and sends in the Last-Event-ID header when reconnecting. This mechanism enables the server to resume streaming from the correct position after connection interruptions. The ID remains constant until explicitly changed by a new id field.

# Event ID for resumable connections
def stream
  last_id = request.headers['Last-Event-ID'].to_i
  
  Event.where('id > ?', last_id).each do |event|
    sse.write(event.data, event: event.type, id: event.id)
  end
end

The retry field instructs the browser how many milliseconds to wait before attempting reconnection. The default reconnection delay is 3000ms. Servers can adjust this based on expected recovery time or server load patterns.

Connection lifecycle management occurs automatically. The browser's EventSource object handles connection establishment, message parsing, automatic reconnection on failure, and cleanup on explicit closure. When the connection drops, the browser waits the specified retry interval and reconnects, sending the Last-Event-ID header to resume from the correct position.

Comments in the event stream begin with a colon and extend to the line end. Servers use comments as keepalive signals to prevent intermediary timeouts. Most proxies and load balancers close idle connections after 30-60 seconds, so servers send periodic comments to maintain the connection.

# Keepalive with comments
loop do
  if data_available?
    sse.write(data)
  else
    sse.write(": keepalive\n")
  end
  sleep 15
end

Ruby Implementation

Ruby implements SSE through Rack's streaming interface, available in frameworks like Rails and Sinatra. The response stream object provides write and close methods for sending events. Rails wraps this functionality in ActionController::Live, which enables streaming responses in controller actions.

class EventsController < ApplicationController
  include ActionController::Live
  
  def stream
    response.headers['Content-Type'] = 'text/event-stream'
    response.headers['Cache-Control'] = 'no-cache'
    response.headers['Connection'] = 'keep-alive'
    response.headers['X-Accel-Buffering'] = 'no'
    
    begin
      loop do
        message = fetch_next_message
        response.stream.write("data: #{message.to_json}\n\n")
        sleep 1
      end
    rescue IOError
      # Client disconnected
    ensure
      response.stream.close
    end
  end
end

The X-Accel-Buffering header disables buffering in Nginx, which otherwise buffers responses before sending to clients. This header has no effect in other web servers but causes no harm. Setting Cache-Control prevents proxy caching of the event stream.

Rails requires a concurrent web server to handle SSE effectively. Puma and Unicorn in threaded mode support concurrent requests, while traditional forking servers like Passenger block other requests when streaming. Each SSE connection occupies a thread or process for its duration, limiting concurrent connections based on server configuration.

# config/puma.rb
workers 2
threads 5, 32  # Increase max threads for SSE connections

preload_app!

The rack-stream gem provides a cleaner interface for SSE responses, handling message formatting and connection management. The gem wraps Rack's streaming interface with methods for writing events, comments, and managing the connection lifecycle.

require 'rack/stream'

class EventsController < ApplicationController
  include ActionController::Live
  
  def stream
    response.headers.merge!({
      'Content-Type' => 'text/event-stream',
      'Cache-Control' => 'no-cache',
      'X-Accel-Buffering' => 'no'
    })
    
    stream = Rack::Stream.new(response.stream)
    
    begin
      Event.on_create do |event|
        stream.write(event.data, event: event.name, id: event.id)
      end
      
      stream.keepalive(interval: 15)
    ensure
      stream.close
    end
  end
end

Thread safety becomes critical in streaming responses. Instance variables in controllers can cause race conditions when multiple requests access the same controller instance. Use local variables or thread-local storage for request-specific data.

class NotificationController < ApplicationController
  include ActionController::Live
  
  def stream
    # Thread-safe: using local variable
    user_id = current_user.id
    
    response.headers['Content-Type'] = 'text/event-stream'
    
    begin
      Notification.where(user_id: user_id).on_create do |notification|
        response.stream.write("data: #{notification.to_json}\n\n")
      end
    ensure
      response.stream.close
    end
  end
end

Redis pub/sub integrates naturally with SSE for distributing events across application servers. Each SSE connection subscribes to relevant Redis channels, receiving events from any application instance. This pattern enables horizontal scaling of SSE endpoints.

class ActivityStreamController < ApplicationController
  include ActionController::Live
  
  def stream
    response.headers['Content-Type'] = 'text/event-stream'
    
    redis = Redis.new
    
    begin
      redis.subscribe("activity:#{current_user.id}") do |on|
        on.message do |channel, message|
          response.stream.write("data: #{message}\n\n")
        end
      end
    ensure
      redis.quit
      response.stream.close
    end
  end
end

The redis-rb subscribe method blocks indefinitely, maintaining the SSE connection until the client disconnects or an error occurs. When the client disconnects, Ruby raises an IOError that the rescue clause catches, triggering cleanup.

ActionCable, Rails' WebSocket framework, can also broadcast to SSE connections through adapters. However, this adds unnecessary complexity when WebSocket functionality is not required. Direct Redis integration provides better performance and simpler code for SSE-only scenarios.

Design Considerations

SSE excels at scenarios requiring unidirectional server-to-client communication. Applications that only push updates from server to client gain SSE's simplicity without WebSocket's protocol overhead. The decision between SSE and alternatives depends on communication patterns, browser requirements, infrastructure constraints, and operational complexity.

WebSockets provide bidirectional communication through a single persistent connection. Applications requiring client-to-server real-time messaging need WebSockets, as SSE only supports server-to-client flow. Chat applications, collaborative editing, and gaming require WebSocket's full-duplex communication. However, many real-time features like notifications, dashboards, and feeds only need server-to-client updates, making SSE sufficient.

# SSE appropriate: server pushes updates
class DashboardController < ApplicationController
  def metrics_stream
    # Server sends metrics, client only displays
    sse.write({ cpu: 45, memory: 2.1, requests: 830 })
  end
end

# WebSocket necessary: bidirectional chat
class ChatChannel < ApplicationCable::Channel
  def receive(data)
    # Client sends messages, server broadcasts
    ActionCable.server.broadcast("chat", data)
  end
end

Long polling works on older browsers without SSE support but creates more overhead. Each poll requires a complete HTTP request-response cycle with headers and connection establishment. SSE maintains one connection and sends multiple messages over it. Long polling generates more server load, network traffic, and adds latency between updates.

Polling intervals create trade-offs between latency and resource usage. Frequent polling reduces update latency but increases server load and bandwidth consumption. SSE delivers updates immediately without polling overhead. Applications with high update frequency or low latency requirements benefit significantly from SSE over polling.

Infrastructure compatibility favors SSE over WebSockets in certain environments. Proxies, load balancers, and firewalls handle HTTP traffic without special configuration, while WebSocket protocol upgrades often require specific support. Corporate networks frequently block non-HTTP protocols but allow standard HTTP. SSE passes through these restrictions transparently.

# SSE works through HTTP proxies
location /events {
  proxy_pass http://app_server;
  proxy_set_header Connection '';
  proxy_http_version 1.1;
  proxy_buffering off;
  proxy_cache off;
}

# WebSocket requires upgrade support
location /cable {
  proxy_pass http://app_server;
  proxy_http_version 1.1;
  proxy_set_header Upgrade $http_upgrade;
  proxy_set_header Connection "upgrade";
}

Browser support differs between SSE and WebSockets. Internet Explorer never implemented SSE, though Edge supports it. Polyfills exist for older browsers, typically falling back to long polling. WebSocket support is broader but requires more complex implementation. Mobile browsers on iOS and Android support both, with SSE having better battery efficiency for receive-only scenarios.

Connection limits affect architecture decisions. Browsers limit concurrent HTTP connections per domain (typically 6-8), and SSE connections count toward this limit. Applications with many concurrent SSE connections per page may exhaust the connection pool, blocking other HTTP requests. WebSockets don't count toward HTTP connection limits, allowing more concurrent connections.

Automatic reconnection distinguishes SSE from polling implementations. The EventSource API handles reconnection automatically with exponential backoff, reducing client-side code complexity. Custom polling or WebSocket implementations must handle reconnection logic explicitly, including backoff strategies and connection state management.

State recovery through event IDs enables resumable streams. When connections drop and reconnect, the server can skip already-delivered events using the Last-Event-ID header. This mechanism reduces duplicate processing and ensures clients receive complete event sequences. Polling and WebSocket implementations must build equivalent functionality.

Practical Examples

Real-time notification delivery demonstrates SSE's primary use case. Applications stream notifications to connected users as events occur, displaying alerts, badges, or inbox updates without page refreshes or polling.

# Notification streaming endpoint
class NotificationsController < ApplicationController
  include ActionController::Live
  
  def stream
    response.headers['Content-Type'] = 'text/event-stream'
    response.headers['Cache-Control'] = 'no-cache'
    response.headers['X-Accel-Buffering'] = 'no'
    
    redis = Redis.new
    user_id = current_user.id
    
    begin
      redis.subscribe("notifications:#{user_id}") do |on|
        on.message do |channel, json|
          notification = JSON.parse(json)
          
          response.stream.write(
            "event: notification\n" \
            "data: #{json}\n" \
            "id: #{notification['id']}\n\n"
          )
        end
      end
    rescue IOError, Redis::ConnectionError
      # Client disconnected or Redis error
    ensure
      redis.quit
      response.stream.close
    end
  end
end

# Publishing notifications
class NotificationService
  def self.publish(user_id, notification)
    redis = Redis.new
    json = notification.to_json
    
    redis.publish("notifications:#{user_id}", json)
    redis.quit
  end
end

# Client-side handler
# const events = new EventSource('/notifications/stream');
# events.addEventListener('notification', (e) => {
#   const notification = JSON.parse(e.data);
#   displayNotification(notification);
# });

Progress tracking for long-running operations keeps users informed during asynchronous processing. Background jobs publish progress updates through SSE as they execute, providing real-time feedback on uploads, exports, data migrations, or report generation.

# Progress tracking controller
class ProgressController < ApplicationController
  include ActionController::Live
  
  def track
    response.headers['Content-Type'] = 'text/event-stream'
    
    job_id = params[:job_id]
    redis = Redis.new
    
    begin
      redis.subscribe("progress:#{job_id}") do |on|
        on.message do |channel, message|
          data = JSON.parse(message)
          
          response.stream.write("data: #{message}\n\n")
          
          # Close stream when job completes
          break if data['status'] == 'completed' || data['status'] == 'failed'
        end
      end
    ensure
      redis.quit
      response.stream.close
    end
  end
end

# Background job publishing progress
class DataExportJob
  def perform(export_id)
    redis = Redis.new
    channel = "progress:export-#{export_id}"
    
    total_records = Record.count
    
    Record.find_each.with_index do |record, index|
      process_record(record)
      
      if index % 100 == 0
        progress = ((index.to_f / total_records) * 100).round(1)
        
        redis.publish(channel, {
          status: 'processing',
          progress: progress,
          processed: index,
          total: total_records
        }.to_json)
      end
    end
    
    redis.publish(channel, {
      status: 'completed',
      progress: 100,
      processed: total_records,
      total: total_records
    }.to_json)
    
    redis.quit
  end
end

Live dashboards stream metrics and analytics to connected clients. Monitoring systems, admin panels, and analytics dashboards display real-time data without polling. This pattern reduces database load while keeping dashboards current.

# Metrics streaming for dashboard
class MetricsController < ApplicationController
  include ActionController::Live
  
  def stream
    response.headers['Content-Type'] = 'text/event-stream'
    
    begin
      loop do
        metrics = {
          timestamp: Time.now.to_i,
          active_users: User.active.count,
          requests_per_minute: RequestCounter.current_rate,
          response_time: ResponseTime.average,
          error_rate: ErrorTracker.rate
        }
        
        response.stream.write("data: #{metrics.to_json}\n\n")
        
        sleep 5
      end
    rescue IOError
      # Client disconnected
    ensure
      response.stream.close
    end
  end
end

Activity feeds broadcast user actions to followers in real-time. Social networks, collaboration tools, and community platforms stream updates about followed users, projects, or topics. The server publishes actions to relevant SSE connections immediately.

# Activity feed streaming
class ActivityController < ApplicationController
  include ActionController::Live
  
  def feed
    response.headers['Content-Type'] = 'text/event-stream'
    
    redis = Redis.new
    channels = current_user.following_ids.map { |id| "activity:user:#{id}" }
    channels << "activity:user:#{current_user.id}"  # Own activity
    
    begin
      redis.subscribe(*channels) do |on|
        on.message do |channel, message|
          activity = JSON.parse(message)
          
          response.stream.write(
            "event: activity\n" \
            "data: #{message}\n" \
            "id: #{activity['id']}\n\n"
          )
        end
      end
    ensure
      redis.quit
      response.stream.close
    end
  end
end

# Publishing activity
class Activity < ApplicationRecord
  after_create :broadcast
  
  def broadcast
    redis = Redis.new
    
    user.followers.find_each do |follower|
      redis.publish(
        "activity:user:#{follower.id}",
        to_json
      )
    end
    
    redis.quit
  end
end

Stock tickers and real-time pricing demonstrate SSE for financial data. Trading platforms, price comparison sites, and auction systems stream price updates to connected clients as market data changes.

# Price update streaming
class PricesController < ApplicationController
  include ActionController::Live
  
  def stream
    response.headers['Content-Type'] = 'text/event-stream'
    
    symbols = params[:symbols].split(',')
    redis = Redis.new
    channels = symbols.map { |s| "prices:#{s}" }
    
    begin
      # Send initial prices
      symbols.each do |symbol|
        price = Price.current(symbol)
        response.stream.write("data: #{price.to_json}\n\n")
      end
      
      # Stream updates
      redis.subscribe(*channels) do |on|
        on.message do |channel, message|
          response.stream.write("data: #{message}\n\n")
        end
      end
    ensure
      redis.quit
      response.stream.close
    end
  end
end

Error Handling & Edge Cases

Connection interruptions occur frequently in SSE implementations. Network failures, mobile connectivity changes, server restarts, and client navigation all break connections. The EventSource API automatically reconnects, but servers must handle broken pipes and cleanup resources appropriately.

class StreamController < ApplicationController
  include ActionController::Live
  
  def stream
    response.headers['Content-Type'] = 'text/event-stream'
    redis = Redis.new
    
    begin
      redis.subscribe('events') do |on|
        on.message do |channel, message|
          begin
            response.stream.write("data: #{message}\n\n")
          rescue IOError => e
            # Client disconnected mid-write
            Rails.logger.info("Client disconnected: #{e.message}")
            raise  # Exit subscribe block
          end
        end
      end
    rescue IOError, Errno::EPIPE
      # Connection broken
      Rails.logger.info("Stream connection closed")
    rescue Redis::ConnectionError => e
      Rails.logger.error("Redis connection failed: #{e.message}")
    ensure
      begin
        redis.quit
      rescue Redis::ConnectionError
        # Redis already disconnected
      end
      
      begin
        response.stream.close
      rescue IOError
        # Stream already closed
      end
    end
  end
end

The IOError exception signals client disconnection in Ruby. Rails raises this when writing to a closed stream. Handlers must catch this exception to prevent error logs from normal disconnections. Wrapping stream.write in a rescue block allows graceful handling of broken connections.

Timeouts from intermediary proxies close idle connections after configured intervals. Load balancers and reverse proxies typically timeout after 30-60 seconds without activity. Servers prevent timeouts by sending periodic keepalive messages, usually comments that clients ignore.

# Keepalive pattern
def stream_with_keepalive
  response.headers['Content-Type'] = 'text/event-stream'
  
  keepalive_thread = Thread.new do
    loop do
      sleep 15
      begin
        response.stream.write(": keepalive\n\n")
      rescue IOError
        break
      end
    end
  end
  
  begin
    stream_events
  ensure
    keepalive_thread.kill
    response.stream.close
  end
end

Event ID tracking enables resumable streams but requires careful implementation. The server must store event IDs persistently and handle replay requests efficiently. Missing event IDs cause data loss when clients reconnect after the server restarts.

# Resumable event stream with ID tracking
class EventsController < ApplicationController
  include ActionController::Live
  
  def stream
    response.headers['Content-Type'] = 'text/event-stream'
    
    last_id = request.headers['HTTP_LAST_EVENT_ID'].to_i
    
    begin
      # Send missed events
      Event.where('id > ?', last_id).order(:id).each do |event|
        write_event(event)
      end
      
      # Stream new events
      Event.on_create do |event|
        write_event(event)
      end
    ensure
      response.stream.close
    end
  end
  
  private
  
  def write_event(event)
    response.stream.write(
      "id: #{event.id}\n" \
      "event: #{event.type}\n" \
      "data: #{event.payload}\n\n"
    )
  rescue IOError
    raise  # Propagate to outer handler
  end
end

Malformed events cause parsing failures in clients. The EventSource API silently ignores malformed messages, making debugging difficult. Servers must ensure proper message formatting, including correct newline sequences and field structure.

# Proper message formatting
def format_event(data, event: nil, id: nil, retry: nil)
  message = ""
  message << "id: #{id}\n" if id
  message << "event: #{event}\n" if event
  message << "retry: #{retry}\n" if retry
  
  # Handle multiline data
  data_lines = data.to_s.split("\n")
  data_lines.each do |line|
    message << "data: #{line}\n"
  end
  
  message << "\n"  # Blank line terminates message
end

Character encoding issues arise with non-ASCII characters. SSE requires UTF-8 encoding, and servers must ensure response content uses UTF-8. Ruby strings default to UTF-8 in modern versions, but external data sources may use different encodings.

# Ensure UTF-8 encoding
def write_event(data)
  json = data.to_json.encode('UTF-8', invalid: :replace, undef: :replace)
  response.stream.write("data: #{json}\n\n")
end

Memory leaks occur when SSE connections accumulate without cleanup. Each connection holds resources including threads, Redis subscriptions, and memory buffers. Failed cleanup causes resource exhaustion over time, degrading server performance and eventually causing failures.

# Resource cleanup tracking
class ConnectionManager
  @connections = Concurrent::Hash.new
  
  def self.register(connection_id)
    @connections[connection_id] = Time.now
    Rails.logger.info("Active connections: #{@connections.size}")
  end
  
  def self.unregister(connection_id)
    @connections.delete(connection_id)
    Rails.logger.info("Active connections: #{@connections.size}")
  end
end

class StreamController < ApplicationController
  include ActionController::Live
  
  def stream
    connection_id = SecureRandom.uuid
    ConnectionManager.register(connection_id)
    
    begin
      stream_events
    ensure
      cleanup_resources
      ConnectionManager.unregister(connection_id)
    end
  end
end

Thundering herd problems emerge when many clients reconnect simultaneously after server restarts. All clients attempt reconnection at once, overwhelming the server. Implementing jitter in retry intervals distributes reconnection attempts over time.

# Jittered retry interval
def stream
  response.headers['Content-Type'] = 'text/event-stream'
  
  # Base retry + random jitter
  retry_ms = 3000 + rand(2000)
  response.stream.write("retry: #{retry_ms}\n\n")
  
  begin
    stream_events
  ensure
    response.stream.close
  end
end

Security Implications

Authentication mechanisms must secure SSE endpoints like any HTTP endpoint. The EventSource API sends cookies and authentication headers automatically, enabling standard session-based or token-based authentication. However, the API doesn't support custom headers, limiting authentication options.

class SecureStreamController < ApplicationController
  before_action :authenticate_user!
  
  def stream
    # Authentication verified by before_action
    response.headers['Content-Type'] = 'text/event-stream'
    
    user_id = current_user.id
    
    begin
      stream_events_for_user(user_id)
    ensure
      response.stream.close
    end
  end
end

Token-based authentication for SSE requires tokens in URLs or cookies since custom headers are unavailable. URL tokens expose credentials in logs and browser history. Cookie-based tokens provide better security but require CSRF protection.

# Token-based SSE authentication
class EventStreamController < ApplicationController
  skip_before_action :verify_authenticity_token
  
  def stream
    token = params[:token]
    
    unless valid_token?(token)
      render plain: "Unauthorized", status: 401
      return
    end
    
    response.headers['Content-Type'] = 'text/event-stream'
    
    user = User.find_by_token(token)
    stream_events_for_user(user.id)
  end
  
  private
  
  def valid_token?(token)
    return false if token.blank?
    User.exists?(token: token, token_expires_at: Time.now..)
  end
end

Authorization checks prevent users from accessing unauthorized event streams. The server must verify each client's permission to receive specific events, especially in multi-tenant applications or systems with granular access controls.

class NotificationStreamController < ApplicationController
  def stream
    user_id = current_user.id
    organization_id = params[:organization_id]
    
    unless current_user.member_of?(organization_id)
      render plain: "Forbidden", status: 403
      return
    end
    
    response.headers['Content-Type'] = 'text/event-stream'
    
    redis = Redis.new
    
    begin
      redis.subscribe("org:#{organization_id}:notifications") do |on|
        on.message do |channel, message|
          # Filter messages for user's permissions
          notification = JSON.parse(message)
          
          if current_user.can_view?(notification)
            response.stream.write("data: #{message}\n\n")
          end
        end
      end
    ensure
      redis.quit
      response.stream.close
    end
  end
end

Information disclosure risks arise when broadcasting sensitive data to multiple clients. Servers must filter events based on recipient permissions before streaming. Broadcasting unfiltered data through pub/sub channels leaks information to unauthorized users.

# Secure message filtering
class SecureStreamService
  def initialize(user, stream)
    @user = user
    @stream = stream
  end
  
  def send_event(event)
    return unless authorized?(event)
    
    filtered_data = filter_sensitive_fields(event.data)
    @stream.write("data: #{filtered_data.to_json}\n\n")
  end
  
  private
  
  def authorized?(event)
    event.public? || @user.can_access?(event)
  end
  
  def filter_sensitive_fields(data)
    return data if @user.admin?
    
    data.except(:ssn, :credit_card, :internal_notes)
  end
end

Cross-Site Request Forgery (CSRF) attacks target SSE endpoints when authentication relies on cookies. Attackers embed EventSource connections in malicious sites, and browsers send authentication cookies automatically. CSRF tokens in URLs provide protection but expose tokens in logs.

# CSRF protection for SSE
class ProtectedStreamController < ApplicationController
  skip_before_action :verify_authenticity_token
  
  def stream
    unless valid_csrf_token?(params[:csrf_token])
      render plain: "Forbidden", status: 403
      return
    end
    
    response.headers['Content-Type'] = 'text/event-stream'
    stream_events
  end
  
  private
  
  def valid_csrf_token?(token)
    return false if token.blank?
    
    # Verify token matches session
    valid_authenticity_token?(session, token)
  end
end

# Client includes CSRF token in URL
# const token = document.querySelector('meta[name="csrf-token"]').content;
# const events = new EventSource(`/stream?csrf_token=${token}`);

Rate limiting prevents abuse of SSE connections. Without limits, attackers can exhaust server resources by opening many connections. Connection limits per IP or user prevent resource exhaustion attacks.

# Connection rate limiting
class RateLimitedStreamController < ApplicationController
  before_action :check_connection_limit
  
  def stream
    ConnectionTracker.increment(current_user.id)
    
    begin
      stream_events
    ensure
      ConnectionTracker.decrement(current_user.id)
    end
  end
  
  private
  
  def check_connection_limit
    if ConnectionTracker.count(current_user.id) >= 5
      render plain: "Too many connections", status: 429
      return
    end
  end
end

class ConnectionTracker
  @counters = Concurrent::Hash.new(0)
  
  def self.increment(user_id)
    @counters[user_id] += 1
  end
  
  def self.decrement(user_id)
    @counters[user_id] -= 1
  end
  
  def self.count(user_id)
    @counters[user_id]
  end
end

Content injection vulnerabilities exist when streaming user-generated content without sanitization. Malicious users can inject event stream control characters to manipulate message boundaries or inject fake events. Proper escaping prevents injection attacks.

# Safe event streaming
def stream_user_content(content)
  # Remove or escape SSE control characters
  safe_content = content
    .gsub("\n", "\\n")    # Escape newlines
    .gsub("\r", "")       # Remove carriage returns
    .gsub(":", "\\:")     # Escape colons
  
  response.stream.write("data: #{safe_content}\n\n")
end

Reference

Event Stream Message Format

Field Purpose Format Example
data Message payload UTF-8 text, typically JSON data: {"message":"hello"}
event Custom event type String identifier event: userUpdate
id Last event identifier for resumption String or number id: 123
retry Reconnection delay in milliseconds Integer retry: 5000
comment Keepalive or debugging info Line starting with colon : keepalive ping

Ruby Response Headers

Header Value Purpose
Content-Type text/event-stream Identifies SSE stream
Cache-Control no-cache Prevents proxy caching
Connection keep-alive Maintains persistent connection
X-Accel-Buffering no Disables Nginx buffering

EventSource Client States

State Value Description
CONNECTING 0 Connection being established
OPEN 1 Connection open and receiving events
CLOSED 2 Connection closed, no reconnection

Common Event Types

Event Type Use Case Payload Example
message Default event type Generic data
notification User notifications Alert data with title and body
update Data updates Changed records or fields
delete Resource deletion Deleted item identifier
error Error conditions Error message and code
keepalive Connection maintenance Empty or timestamp

Connection Lifecycle

Phase Description Client Behavior Server Behavior
Connect Initial connection Opens EventSource Accepts connection, sends headers
Stream Active streaming Receives and processes events Writes events as they occur
Disconnect Connection lost Waits retry interval Detects broken pipe, cleans up
Reconnect Resume connection Sends Last-Event-ID header Sends events since last ID

Error Handling Patterns

Scenario Detection Recovery
Client disconnect IOError exception Close stream, cleanup resources
Redis failure Redis::ConnectionError Log error, close stream
Timeout Proxy closes connection Client auto-reconnects
Write failure Errno::EPIPE Exit stream loop
Malformed JSON JSON::ParserError Skip event, log error

Performance Characteristics

Metric Typical Value Notes
Connection overhead 1-2 KB/connection HTTP headers and initial setup
Idle bandwidth 0.1-1 KB/min Keepalive messages only
Message latency 10-100 ms Network dependent
Concurrent connections 100-10000/server Limited by threads/processes
Reconnection delay 3000 ms Configurable via retry field

Security Checklist

Concern Mitigation
Authentication Verify user identity before streaming
Authorization Check permissions for each event
CSRF Include token in URL or use Origin checks
Rate limiting Limit connections per user/IP
Information disclosure Filter events by recipient permissions
Resource exhaustion Implement connection limits and timeouts
Injection attacks Sanitize user content in events

Infrastructure Configuration

Component Configuration Purpose
Nginx proxy_buffering off Disable response buffering
Puma threads 5, 32 Support concurrent connections
Redis timeout 0 No timeout for pub/sub connections
Load balancer Connection timeout 300s Prevent premature disconnection
Application ActionController::Live Enable streaming responses