CrackedRuby CrackedRuby

Overview

The WebSocket Protocol provides a standardized method for establishing persistent, bidirectional communication channels between clients and servers over a single TCP connection. Unlike traditional HTTP request-response cycles, WebSocket maintains an open connection that allows both parties to send data at any time without the overhead of repeatedly establishing new connections.

The protocol originated from the need to address limitations in existing technologies for real-time web applications. Traditional approaches like long polling, server-sent events, and HTTP streaming each presented drawbacks in efficiency, latency, or bidirectional capability. WebSocket was standardized in RFC 6455 (2011) to provide a unified solution.

WebSocket operates through an initial HTTP handshake that upgrades the connection to the WebSocket protocol. Once established, the connection remains open for bidirectional message exchange until either party closes it. This approach eliminates the latency and overhead associated with establishing new HTTP connections for each message.

The protocol defines its own framing format, distinct from HTTP, that includes mechanisms for message fragmentation, control frames, and connection management. Messages can be transmitted as text (UTF-8 encoded) or binary data, with the protocol handling frame composition and decomposition transparently.

require 'faye/websocket'

# Basic WebSocket server
EM.run {
  server = Faye::WebSocket::Client.new('ws://example.com/socket')
  
  server.on :open do |event|
    puts "Connection established"
    server.send("Hello from Ruby client")
  end
  
  server.on :message do |event|
    puts "Received: #{event.data}"
  end
  
  server.on :close do |event|
    puts "Connection closed: #{event.code} #{event.reason}"
  end
}

WebSocket connections use the ws:// scheme for unencrypted connections and wss:// for TLS-encrypted connections, mirroring the HTTP/HTTPS convention. The protocol operates on the same ports as HTTP (80 and 443), facilitating deployment through existing infrastructure without requiring additional firewall configuration.

Key Principles

The WebSocket Protocol operates on several core mechanisms that distinguish it from traditional HTTP communication. Understanding these principles clarifies how the protocol achieves efficient, real-time bidirectional communication.

Connection Establishment Through HTTP Upgrade

WebSocket connections begin as standard HTTP requests with specific headers indicating an upgrade request. The client sends an HTTP GET request containing Upgrade: websocket and Connection: Upgrade headers, along with a base64-encoded random key. The server responds with HTTP 101 Switching Protocols if it accepts the upgrade, including a hashed version of the client's key to confirm the handshake. This HTTP-based initialization allows WebSocket to traverse proxies and firewalls that permit HTTP traffic.

Message Framing Structure

After handshake completion, communication shifts to WebSocket's binary framing protocol. Each frame contains a header specifying frame type (text, binary, or control), payload length, masking information, and fragmentation details. Data frames carry application messages, while control frames manage connection state through ping, pong, and close operations. The protocol mandates that client-to-server frames must be masked using a random 32-bit key, while server-to-client frames remain unmasked. This masking requirement prevents certain cache poisoning attacks.

Full-Duplex Communication Model

WebSocket supports simultaneous bidirectional data flow. Either party can send messages at any time without waiting for a response, contrasting with HTTP's request-response pattern. This characteristic eliminates the need for techniques like long polling or multiple connections. The server can push data to clients immediately upon availability, reducing latency for time-sensitive applications.

Connection Lifecycle Management

A WebSocket connection progresses through distinct states: CONNECTING, OPEN, CLOSING, and CLOSED. The CONNECTING state exists during the HTTP handshake. Upon successful handshake, the connection enters OPEN state where message exchange occurs. Either party can initiate closure by sending a close frame, transitioning to CLOSING state. The connection reaches CLOSED state after both parties exchange close frames or when an error forces connection termination. The protocol defines status codes (1000-4999) to communicate closure reasons.

Fragmentation and Message Assembly

Large messages can be split into multiple frames, with each frame marked as either a continuation frame or a final frame. The FIN bit in the frame header indicates whether additional frames follow. This fragmentation allows applications to begin transmitting messages before the complete payload is available and prevents a single large message from monopolizing the connection. The receiving side reassembles fragments in the order received before delivering the complete message to the application.

Control Frame Operations

The protocol defines three control frame types for connection management. Ping frames test connection liveness; the recipient must respond with a pong frame containing identical payload data. Close frames initiate connection termination and may include a status code and UTF-8 encoded reason. Control frames must not be fragmented and take priority over data frames, ensuring timely connection management even during large message transfers.

Extension and Subprotocol Negotiation

WebSocket includes mechanisms for negotiating extensions and subprotocols during the handshake. Extensions modify the framing protocol itself, enabling features like message compression (permessage-deflate). Subprotocols define application-level protocols carried over WebSocket framing, such as STOMP or WAMP. The client proposes supported extensions and subprotocols in handshake headers; the server selects from these options in its handshake response.

Ruby Implementation

Ruby provides several libraries for WebSocket implementation, each suited to different architectural requirements. The ecosystem includes low-level drivers, high-level frameworks, and integrated solutions for web applications.

Core WebSocket Libraries

The websocket-driver gem implements the WebSocket protocol at a low level, handling frame parsing, masking, and protocol compliance. This driver operates as a streaming parser that processes buffers incrementally, making it suitable for integration into various I/O models:

require 'websocket/driver'

# Server-side driver
driver = WebSocket::Driver.server(socket)

driver.on :connect do |event|
  driver.start if WebSocket::Driver.websocket?(event.env)
end

driver.on :message do |event|
  data = event.data
  # Process message
  driver.text("Echo: #{data}")
end

driver.on :close do |event|
  puts "Client disconnected: #{event.code} #{event.reason}"
end

# Feed data to driver
socket.on_data { |data| driver.parse(data) }

The faye-websocket gem builds on websocket-driver to provide a higher-level API with EventMachine integration. It handles connection lifecycle, automatic reconnection, and provides a cleaner interface for application code:

require 'faye/websocket'
require 'eventmachine'

EM.run {
  Faye::WebSocket.load_adapter('thin')
  
  App = lambda do |env|
    if Faye::WebSocket.websocket?(env)
      ws = Faye::WebSocket.new(env)
      
      ws.on :message do |event|
        # Broadcast to all connected clients
        clients.each { |client| client.send(event.data) }
      end
      
      ws.on :close do |event|
        ws = nil
      end
      
      ws.rack_response
    else
      [200, {'Content-Type' => 'text/plain'}, ['Non-WebSocket Request']]
    end
  end
  
  Rack::Handler.get('thin').run(App, Port: 9292)
}

ActionCable Integration

Rails applications use ActionCable, a framework built on top of WebSocket that integrates with the Rails stack. ActionCable introduces channels as abstractions for logical WebSocket connections, providing pub/sub semantics and automatic reconnection:

# app/channels/chat_channel.rb
class ChatChannel < ApplicationCable::Channel
  def subscribed
    stream_from "chat_#{params[:room_id]}"
  end
  
  def receive(data)
    # Message from client
    message = Message.create!(
      content: data['message'],
      room_id: params[:room_id],
      user: current_user
    )
    
    # Broadcast to all subscribers
    ActionCable.server.broadcast(
      "chat_#{params[:room_id]}",
      message: render_message(message)
    )
  end
  
  def unsubscribed
    # Cleanup when channel unsubscribed
    stop_all_streams
  end
  
  private
  
  def render_message(message)
    ApplicationController.render(
      partial: 'messages/message',
      locals: { message: message }
    )
  end
end

ActionCable handles authentication through the connection class:

# app/channels/application_cable/connection.rb
module ApplicationCable
  class Connection < ActionCable::Connection::Base
    identified_by :current_user
    
    def connect
      self.current_user = find_verified_user
    end
    
    private
    
    def find_verified_user
      if verified_user = User.find_by(id: cookies.encrypted[:user_id])
        verified_user
      else
        reject_unauthorized_connection
      end
    end
  end
end

Async and Fiber-Based Implementations

The async-websocket gem provides WebSocket support for the async framework, using Ruby's Fiber scheduler for concurrent I/O operations:

require 'async'
require 'async/http/endpoint'
require 'async/websocket/client'

Async do
  endpoint = Async::HTTP::Endpoint.parse('wss://example.com/socket')
  
  Async::WebSocket::Client.connect(endpoint) do |connection|
    # Send message
    connection.write({type: 'subscribe', channel: 'updates'})
    connection.flush
    
    # Receive messages
    while message = connection.read
      data = JSON.parse(message)
      puts "Received: #{data}"
      
      # Respond to ping
      if data['type'] == 'ping'
        connection.write({type: 'pong'})
        connection.flush
      end
    end
  rescue Async::WebSocket::ProtocolError => e
    puts "Protocol error: #{e.message}"
  end
end

Connection Pooling and Management

Production applications often require connection pooling to manage multiple WebSocket connections efficiently:

class WebSocketPool
  def initialize(size: 5)
    @pool = ConnectionPool.new(size: size, timeout: 5) do
      create_connection
    end
  end
  
  def with_connection(&block)
    @pool.with(&block)
  end
  
  private
  
  def create_connection
    ws = Faye::WebSocket::Client.new('wss://api.example.com')
    
    ws.on :open do
      puts "Pool connection established"
    end
    
    ws.on :error do |event|
      puts "Pool connection error: #{event.message}"
    end
    
    ws
  end
end

# Usage
pool = WebSocketPool.new(size: 10)
pool.with_connection do |ws|
  ws.send(JSON.generate(data))
end

Practical Examples

Real-world WebSocket applications demonstrate the protocol's capabilities across various use cases requiring real-time communication.

Real-Time Chat Application

A chat system illustrates bidirectional messaging, presence tracking, and message broadcasting:

# Server implementation
class ChatServer
  def initialize
    @clients = {}
    @rooms = Hash.new { |h, k| h[k] = [] }
  end
  
  def handle_connection(ws, user_id)
    @clients[user_id] = ws
    
    ws.on :message do |event|
      data = JSON.parse(event.data)
      
      case data['type']
      when 'join_room'
        join_room(user_id, data['room_id'])
        broadcast_presence(data['room_id'])
        
      when 'message'
        broadcast_message(
          data['room_id'],
          user_id,
          data['content']
        )
        
      when 'typing'
        broadcast_typing(data['room_id'], user_id, data['typing'])
      end
    end
    
    ws.on :close do
      leave_all_rooms(user_id)
      @clients.delete(user_id)
    end
  end
  
  private
  
  def join_room(user_id, room_id)
    @rooms[room_id] << user_id unless @rooms[room_id].include?(user_id)
  end
  
  def broadcast_message(room_id, sender_id, content)
    message = {
      type: 'message',
      sender: sender_id,
      content: content,
      timestamp: Time.now.iso8601
    }
    
    @rooms[room_id].each do |user_id|
      @clients[user_id]&.send(JSON.generate(message))
    end
  end
  
  def broadcast_typing(room_id, user_id, typing)
    event = {
      type: 'typing',
      user: user_id,
      typing: typing
    }
    
    @rooms[room_id].each do |recipient_id|
      next if recipient_id == user_id
      @clients[recipient_id]&.send(JSON.generate(event))
    end
  end
  
  def broadcast_presence(room_id)
    users = @rooms[room_id].map { |uid| user_info(uid) }
    event = {
      type: 'presence',
      users: users
    }
    
    @rooms[room_id].each do |user_id|
      @clients[user_id]&.send(JSON.generate(event))
    end
  end
end

Live Data Dashboard

A monitoring dashboard demonstrates server push for real-time metrics updates:

class MetricsPublisher
  def initialize
    @subscribers = {}
    start_metrics_collection
  end
  
  def subscribe(ws, metrics)
    subscription_id = SecureRandom.uuid
    @subscribers[subscription_id] = {
      connection: ws,
      metrics: metrics,
      last_sent: {}
    }
    
    ws.on :close do
      @subscribers.delete(subscription_id)
    end
    
    subscription_id
  end
  
  private
  
  def start_metrics_collection
    Thread.new do
      loop do
        metrics_data = collect_current_metrics
        publish_updates(metrics_data)
        sleep 1
      end
    end
  end
  
  def collect_current_metrics
    {
      cpu_usage: SystemMetrics.cpu_percent,
      memory_usage: SystemMetrics.memory_percent,
      active_users: UserSession.active_count,
      request_rate: RequestCounter.rate_per_second,
      error_rate: ErrorCounter.rate_per_minute
    }
  end
  
  def publish_updates(current_metrics)
    @subscribers.each do |id, subscriber|
      updates = {}
      
      subscriber[:metrics].each do |metric|
        current_value = current_metrics[metric.to_sym]
        last_value = subscriber[:last_sent][metric]
        
        # Only send if changed or 10 seconds elapsed
        if should_send_update?(metric, current_value, last_value)
          updates[metric] = current_value
          subscriber[:last_sent][metric] = current_value
        end
      end
      
      if updates.any?
        subscriber[:connection].send(JSON.generate({
          type: 'metrics_update',
          data: updates,
          timestamp: Time.now.to_i
        }))
      end
    end
  end
  
  def should_send_update?(metric, current, last)
    return true if last.nil?
    return true if (current - last).abs > threshold_for(metric)
    false
  end
end

Multiplayer Game State Synchronization

Game applications require low-latency state updates and conflict resolution:

class GameStateSync
  def initialize
    @game_states = {}
    @player_connections = {}
  end
  
  def handle_player(ws, game_id, player_id)
    @game_states[game_id] ||= GameState.new(game_id)
    @player_connections[player_id] = ws
    
    # Send initial state
    ws.send(serialize_state(@game_states[game_id]))
    
    ws.on :message do |event|
      action = deserialize_action(event.data)
      process_action(game_id, player_id, action)
    end
    
    ws.on :close do
      handle_disconnect(game_id, player_id)
    end
  end
  
  private
  
  def process_action(game_id, player_id, action)
    state = @game_states[game_id]
    
    # Validate action
    unless state.valid_action?(player_id, action)
      send_error(player_id, "Invalid action")
      return
    end
    
    # Apply action with timestamp
    action[:timestamp] = Time.now.to_f
    state.apply_action(player_id, action)
    
    # Broadcast state delta
    delta = state.last_delta
    broadcast_to_game(game_id, {
      type: 'state_delta',
      delta: delta,
      sequence: state.sequence_number
    })
    
    # Check win conditions
    if winner = state.check_winner
      broadcast_to_game(game_id, {
        type: 'game_over',
        winner: winner
      })
    end
  end
  
  def broadcast_to_game(game_id, message)
    state = @game_states[game_id]
    state.players.each do |player_id|
      @player_connections[player_id]&.send(JSON.generate(message))
    end
  end
  
  def handle_disconnect(game_id, player_id)
    @player_connections.delete(player_id)
    state = @game_states[game_id]
    
    state.mark_player_inactive(player_id)
    
    # Notify other players
    broadcast_to_game(game_id, {
      type: 'player_disconnected',
      player_id: player_id
    })
    
    # Clean up if all players gone
    if state.all_players_inactive?
      @game_states.delete(game_id)
    end
  end
end

Financial Data Streaming

Stock ticker applications demonstrate high-frequency updates with throttling:

class TickerStream
  def initialize
    @subscriptions = Hash.new { |h, k| h[k] = [] }
    start_market_data_feed
  end
  
  def subscribe(ws, symbols)
    subscription = {
      connection: ws,
      symbols: symbols.map(&:upcase),
      buffer: [],
      last_flush: Time.now
    }
    
    symbols.each do |symbol|
      @subscriptions[symbol] << subscription
    end
    
    ws.on :message do |event|
      handle_subscription_change(ws, event.data)
    end
    
    ws.on :close do
      remove_subscription(subscription)
    end
  end
  
  private
  
  def start_market_data_feed
    Thread.new do
      loop do
        # Receive market data updates
        updates = MarketDataFeed.get_updates
        
        updates.each do |symbol, quote|
          publish_quote(symbol, quote)
        end
        
        flush_buffers
        sleep 0.1 # 10 updates per second maximum
      end
    end
  end
  
  def publish_quote(symbol, quote)
    @subscriptions[symbol].each do |sub|
      sub[:buffer] << {
        symbol: symbol,
        price: quote.price,
        volume: quote.volume,
        timestamp: quote.timestamp
      }
    end
  end
  
  def flush_buffers
    @subscriptions.values.flatten.uniq.each do |sub|
      next if sub[:buffer].empty?
      
      # Coalesce updates - send only latest for each symbol
      latest = sub[:buffer].group_by { |u| u[:symbol] }
                           .transform_values(&:last)
      
      sub[:connection].send(JSON.generate({
        type: 'quotes',
        data: latest.values
      }))
      
      sub[:buffer].clear
      sub[:last_flush] = Time.now
    end
  end
end

Security Implications

WebSocket connections introduce security considerations that differ from traditional HTTP request-response patterns. The persistent nature of WebSocket connections and their ability to bypass same-origin policy in certain contexts require careful security implementation.

Origin Validation

The WebSocket handshake includes an Origin header that indicates the source of the connection request. Servers must validate this origin to prevent unauthorized cross-site WebSocket connections. Unlike HTTP cookies, WebSocket connections do not automatically enforce same-origin policy:

class SecureWebSocketServer
  ALLOWED_ORIGINS = [
    'https://app.example.com',
    'https://admin.example.com'
  ].freeze
  
  def validate_connection(env)
    origin = env['HTTP_ORIGIN']
    
    unless origin_allowed?(origin)
      return [403, {}, ['Forbidden: Invalid origin']]
    end
    
    # Additional validation
    unless authenticated?(env)
      return [401, {}, ['Unauthorized']]
    end
    
    # Origin and auth valid
    true
  end
  
  private
  
  def origin_allowed?(origin)
    return false if origin.nil?
    
    # Exact match or subdomain check
    ALLOWED_ORIGINS.any? do |allowed|
      origin == allowed || origin.end?(".#{allowed}")
    end
  end
  
  def authenticated?(env)
    token = extract_token(env)
    return false unless token
    
    # Validate JWT or session token
    TokenValidator.valid?(token)
  end
  
  def extract_token(env)
    # Check query parameter
    params = Rack::Utils.parse_query(env['QUERY_STRING'])
    return params['token'] if params['token']
    
    # Check cookie
    cookies = Rack::Utils.parse_cookies(env)
    cookies['auth_token']
  end
end

Transport Layer Security

Production WebSocket deployments must use TLS (wss://) to prevent eavesdropping and man-in-the-middle attacks. Unencrypted WebSocket connections (ws://) expose all transmitted data, including authentication credentials and application data:

# Rack configuration for secure WebSocket
app = Rack::Builder.new do
  use Rack::SSL, exclude: -> (env) { 
    # Allow health checks without SSL
    env['PATH_INFO'] == '/health'
  }
  
  map '/cable' do
    run ActionCable.server
  end
end

# Enforce secure WebSocket URLs in client code
WebSocket::Client.configure do |config|
  config.enforce_wss = Rails.env.production?
end

Message Validation and Sanitization

All incoming WebSocket messages must be validated and sanitized. Message content should be treated as untrusted user input, subject to the same validation as HTTP request parameters:

class MessageValidator
  MAX_MESSAGE_SIZE = 64.kilobytes
  RATE_LIMIT = 100 # messages per minute
  
  def initialize
    @rate_tracker = Hash.new { |h, k| h[k] = [] }
  end
  
  def validate(user_id, raw_message)
    # Size check
    if raw_message.bytesize > MAX_MESSAGE_SIZE
      raise SecurityError, "Message exceeds size limit"
    end
    
    # Rate limiting
    unless within_rate_limit?(user_id)
      raise SecurityError, "Rate limit exceeded"
    end
    
    # Parse and validate structure
    message = JSON.parse(raw_message)
    
    unless valid_message_structure?(message)
      raise SecurityError, "Invalid message structure"
    end
    
    # Sanitize content
    sanitize_message(message)
  end
  
  private
  
  def within_rate_limit?(user_id)
    now = Time.now
    cutoff = now - 60
    
    # Remove old entries
    @rate_tracker[user_id].reject! { |t| t < cutoff }
    
    # Check limit
    if @rate_tracker[user_id].size >= RATE_LIMIT
      return false
    end
    
    @rate_tracker[user_id] << now
    true
  end
  
  def sanitize_message(message)
    if message['content'].is_a?(String)
      message['content'] = sanitize_html(message['content'])
    end
    
    # Remove potentially dangerous keys
    message.delete('__proto__')
    message.delete('constructor')
    
    message
  end
  
  def sanitize_html(content)
    # Use appropriate sanitization for your use case
    Sanitize.fragment(content, Sanitize::Config::BASIC)
  end
end

Cross-Site WebSocket Hijacking Prevention

CSWSH attacks exploit the fact that browsers send cookies with WebSocket handshake requests. Attackers can trick users into establishing WebSocket connections from malicious sites. Defense requires combining origin validation, CSRF tokens, and custom authentication:

class CSWSHProtection
  def validate_handshake(env, cookies)
    # Origin check
    unless valid_origin?(env['HTTP_ORIGIN'])
      raise SecurityError, "Invalid origin"
    end
    
    # CSRF token in initial handshake
    token = parse_query_params(env)['csrf_token']
    unless valid_csrf_token?(token, cookies['session_id'])
      raise SecurityError, "Invalid CSRF token"
    end
    
    # Additional authentication
    unless authenticated_session?(cookies['session_id'])
      raise SecurityError, "Unauthenticated"
    end
    
    true
  end
  
  private
  
  def valid_csrf_token?(token, session_id)
    expected = generate_csrf_token(session_id)
    ActiveSupport::SecurityUtils.secure_compare(token, expected)
  end
  
  def generate_csrf_token(session_id)
    # Use your CSRF token generation strategy
    OpenSSL::HMAC.hexdigest('SHA256', Rails.application.secret_key_base, session_id)
  end
end

Denial of Service Prevention

WebSocket connections remain open indefinitely, making servers vulnerable to resource exhaustion. Implement connection limits, idle timeouts, and message size restrictions:

class ConnectionLimiter
  MAX_CONNECTIONS_PER_IP = 10
  IDLE_TIMEOUT = 300 # seconds
  
  def initialize
    @connections = Hash.new(0)
    @last_activity = {}
    start_timeout_monitor
  end
  
  def accept_connection?(ip_address)
    @connections[ip_address] < MAX_CONNECTIONS_PER_IP
  end
  
  def track_connection(connection_id, ip_address)
    @connections[ip_address] += 1
    @last_activity[connection_id] = Time.now
  end
  
  def update_activity(connection_id)
    @last_activity[connection_id] = Time.now
  end
  
  private
  
  def start_timeout_monitor
    Thread.new do
      loop do
        check_timeouts
        sleep 60
      end
    end
  end
  
  def check_timeouts
    now = Time.now
    @last_activity.each do |connection_id, last_time|
      if now - last_time > IDLE_TIMEOUT
        close_connection(connection_id)
      end
    end
  end
end

Performance Considerations

WebSocket performance characteristics differ significantly from HTTP due to persistent connections and different framing overhead. Understanding these factors enables efficient implementation and scaling.

Connection Overhead and Scaling

WebSocket connections consume server resources for their entire duration. Each connection requires memory for buffers, file descriptors for sockets, and processing capacity for message handling. A single server typically handles 10,000-100,000 concurrent connections depending on message frequency and processing complexity:

class ConnectionMetrics
  def initialize
    @connection_count = 0
    @message_rates = Hash.new(0)
    @bandwidth_usage = Hash.new(0)
  end
  
  def track_connection
    @connection_count += 1
    log_metrics if @connection_count % 1000 == 0
  end
  
  def track_message(connection_id, size)
    @message_rates[connection_id] += 1
    @bandwidth_usage[connection_id] += size
  end
  
  def log_metrics
    avg_message_rate = @message_rates.values.sum.to_f / @connection_count
    total_bandwidth = @bandwidth_usage.values.sum
    
    puts "Connections: #{@connection_count}"
    puts "Avg messages/sec: #{avg_message_rate}"
    puts "Total bandwidth: #{total_bandwidth / 1024}KB"
    puts "Memory per connection: #{memory_per_connection}KB"
  end
  
  private
  
  def memory_per_connection
    # Approximate memory usage
    process_memory = `ps -o rss= -p #{Process.pid}`.to_i
    process_memory / @connection_count
  end
end

Message Batching and Coalescing

High-frequency updates benefit from batching multiple messages into single frames. This reduces framing overhead and system call frequency:

class MessageBatcher
  def initialize(flush_interval: 0.1)
    @batches = Hash.new { |h, k| h[k] = [] }
    @flush_interval = flush_interval
    start_flusher
  end
  
  def queue_message(connection_id, message)
    @batches[connection_id] << message
    
    # Flush immediately if batch large enough
    flush_batch(connection_id) if @batches[connection_id].size >= 10
  end
  
  private
  
  def start_flusher
    Thread.new do
      loop do
        sleep @flush_interval
        flush_all_batches
      end
    end
  end
  
  def flush_all_batches
    @batches.each do |connection_id, messages|
      flush_batch(connection_id) unless messages.empty?
    end
  end
  
  def flush_batch(connection_id)
    messages = @batches[connection_id]
    return if messages.empty?
    
    connection = get_connection(connection_id)
    connection.send(JSON.generate({
      type: 'batch',
      messages: messages,
      count: messages.size
    }))
    
    @batches[connection_id].clear
  end
end

Compression and Frame Optimization

The permessage-deflate extension compresses individual messages, reducing bandwidth for text-heavy payloads. Binary protocols using MessagePack or Protocol Buffers offer better performance than JSON for structured data:

class OptimizedSerializer
  def initialize(use_compression: true)
    @use_compression = use_compression
  end
  
  def serialize(data)
    # Use MessagePack for binary efficiency
    packed = MessagePack.pack(data)
    
    if @use_compression && packed.bytesize > 1024
      compressed = Zlib::Deflate.deflate(packed)
      
      # Only use compressed if actually smaller
      if compressed.bytesize < packed.bytesize
        return { compressed: true, data: compressed }
      end
    end
    
    { compressed: false, data: packed }
  end
  
  def deserialize(payload)
    if payload[:compressed]
      packed = Zlib::Inflate.inflate(payload[:data])
    else
      packed = payload[:data]
    end
    
    MessagePack.unpack(packed)
  end
end

# Benchmark comparison
Benchmark.bm do |x|
  data = { users: 1000.times.map { |i| { id: i, name: "User#{i}" } } }
  
  x.report("JSON") { 
    1000.times { JSON.generate(data) }
  }
  
  x.report("MessagePack") {
    1000.times { MessagePack.pack(data) }
  }
end

Connection Pooling and Load Distribution

Distributing WebSocket connections across multiple processes or servers prevents resource exhaustion. Redis pub/sub enables message routing between processes:

class DistributedWebSocket
  def initialize
    @redis = Redis.new
    @local_connections = {}
    subscribe_to_broadcasts
  end
  
  def register_connection(connection_id, ws)
    @local_connections[connection_id] = ws
    
    # Store connection server mapping in Redis
    @redis.hset('connection_servers', connection_id, server_id)
  end
  
  def send_to_connection(connection_id, message)
    if @local_connections.key?(connection_id)
      # Local connection
      @local_connections[connection_id].send(message)
    else
      # Remote connection - publish to Redis
      server = @redis.hget('connection_servers', connection_id)
      @redis.publish("server:#{server}", {
        connection_id: connection_id,
        message: message
      }.to_json)
    end
  end
  
  def broadcast_to_all(message)
    # Publish once - all servers receive
    @redis.publish('broadcast', message)
  end
  
  private
  
  def subscribe_to_broadcasts
    Thread.new do
      Redis.new.subscribe("server:#{server_id}", 'broadcast') do |on|
        on.message do |channel, data|
          handle_redis_message(data)
        end
      end
    end
  end
  
  def handle_redis_message(data)
    parsed = JSON.parse(data)
    
    if parsed['connection_id']
      # Targeted message
      conn = @local_connections[parsed['connection_id']]
      conn&.send(parsed['message'])
    else
      # Broadcast to all local connections
      @local_connections.each_value do |ws|
        ws.send(parsed['message'])
      end
    end
  end
  
  def server_id
    @server_id ||= "#{Socket.gethostname}:#{Process.pid}"
  end
end

Heartbeat and Keep-Alive Optimization

Ping/pong frames maintain connection liveness but add overhead. Balance detection speed against network traffic:

class HeartbeatManager
  PING_INTERVAL = 30 # seconds
  PONG_TIMEOUT = 10  # seconds
  
  def monitor_connection(ws)
    last_pong = Time.now
    
    Thread.new do
      loop do
        sleep PING_INTERVAL
        
        ws.ping
        pong_received = false
        
        # Wait for pong
        timeout_at = Time.now + PONG_TIMEOUT
        until pong_received || Time.now > timeout_at
          sleep 0.1
          pong_received = check_pong_received(ws)
        end
        
        unless pong_received
          ws.close(1001, "Ping timeout")
          break
        end
      end
    end
    
    ws.on :pong do
      mark_pong_received(ws)
    end
  end
end

Tools & Ecosystem

Ruby's WebSocket ecosystem includes libraries for various use cases, from low-level protocol implementation to high-level application frameworks.

Core Protocol Libraries

The websocket-driver gem provides RFC 6455 compliant protocol implementation. It operates as a streaming parser handling frame encoding, decoding, and validation. Applications using custom I/O systems or integration with non-standard event loops use this library directly.

The faye-websocket gem builds on websocket-driver to provide EventMachine and EventLoop integration. It offers a cleaner API for standard WebSocket operations and handles connection lifecycle management automatically.

Framework Integration

ActionCable integrates WebSocket functionality into Rails applications. It provides channel abstractions, authentication hooks, and pub/sub mechanisms. ActionCable adapters support various backends including Redis, PostgreSQL, and async adapters.

The websocket-rails gem offers an alternative Rails integration focused on event-driven architecture. It provides controller-style routing for WebSocket messages and data store synchronization.

Client Libraries

The websocket-client-simple gem implements WebSocket client functionality with a straightforward API:

require 'websocket-client-simple'

ws = WebSocket::Client::Simple.connect 'wss://example.com/socket'

ws.on :message do |msg|
  puts msg.data
end

ws.on :open do
  ws.send 'hello'
end

ws.on :close do |e|
  puts "Connection closed: #{e.code} #{e.reason}"
end

ws.on :error do |e|
  puts "Error: #{e.message}"
end

Testing Tools

The websocket-client-simple gem facilitates WebSocket testing in RSpec and Minitest:

# RSpec WebSocket testing
RSpec.describe 'WebSocket Chat' do
  let(:ws_url) { 'ws://localhost:9292/chat' }
  
  it 'broadcasts messages to all clients' do
    clients = []
    received_messages = []
    
    # Connect multiple clients
    3.times do
      ws = WebSocket::Client::Simple.connect(ws_url)
      ws.on(:message) { |msg| received_messages << msg.data }
      clients << ws
      sleep 0.1 # Allow connection
    end
    
    # Send message from first client
    clients.first.send(JSON.generate({
      type: 'message',
      content: 'Hello everyone'
    }))
    
    sleep 0.5 # Allow broadcast
    
    expect(received_messages.size).to eq(3)
    clients.each(&:close)
  end
end

Performance Monitoring

The scout_apm and skylight gems provide WebSocket connection monitoring and performance insights:

class MonitoredWebSocket
  def handle_connection(ws)
    ScoutApm::Transaction.start_layer(
      ScoutApm::Layer.new('WebSocket', 'Connection')
    )
    
    ws.on :message do |event|
      ScoutApm::Transaction.start_layer(
        ScoutApm::Layer.new('WebSocket', 'Message')
      )
      
      begin
        process_message(event.data)
      ensure
        ScoutApm::Transaction.stop_layer
      end
    end
    
    ws.on :close do
      ScoutApm::Transaction.stop_layer
    end
  end
end

Proxy and Load Balancer Support

Nginx and HAProxy support WebSocket proxying with specific configuration. The rack-timeout gem requires special handling for long-lived connections:

# config/initializers/rack_timeout.rb
Rack::Timeout.unregister_state_change_observer(:logger)

# Exclude WebSocket paths from timeout
Rack::Timeout::Logger.disable_for(%r{^/cable})

Message Queue Integration

WebSocket applications integrate with message queues for reliability and scaling:

class QueuedWebSocketPublisher
  def initialize
    @rabbitmq = Bunny.new
    @rabbitmq.start
    @channel = @rabbitmq.create_channel
    @exchange = @channel.fanout('websocket_events')
  end
  
  def publish(event_type, data)
    message = {
      type: event_type,
      data: data,
      timestamp: Time.now.iso8601
    }
    
    @exchange.publish(
      JSON.generate(message),
      routing_key: event_type,
      persistent: true
    )
  end
  
  def subscribe(&block)
    queue = @channel.queue('', exclusive: true)
    queue.bind(@exchange)
    
    queue.subscribe do |delivery_info, properties, body|
      event = JSON.parse(body)
      block.call(event)
    end
  end
end

Reference

WebSocket URL Schemes

Scheme Description Default Port
ws Unencrypted WebSocket 80
wss TLS-encrypted WebSocket 443

Handshake Headers

Header Direction Purpose
Upgrade Client Requests protocol upgrade to websocket
Connection Client Must contain Upgrade
Sec-WebSocket-Key Client Base64-encoded random 16-byte value
Sec-WebSocket-Accept Server SHA-1 hash of key + magic string
Sec-WebSocket-Version Client WebSocket protocol version (13)
Sec-WebSocket-Protocol Both Subprotocol negotiation
Sec-WebSocket-Extensions Both Extension negotiation
Origin Client Origin of connection request

Frame Types

Opcode Type Purpose
0x0 Continuation Continuation frame for fragmented message
0x1 Text UTF-8 text message
0x2 Binary Binary data message
0x8 Close Connection close with optional status
0x9 Ping Liveness check request
0xA Pong Response to ping frame

Close Status Codes

Code Meaning Usage
1000 Normal Closure Successful completion
1001 Going Away Endpoint removed (server shutdown, browser navigation)
1002 Protocol Error Protocol violation detected
1003 Unsupported Data Endpoint cannot process data type
1006 Abnormal Closure No close frame received (connection lost)
1007 Invalid Frame Payload Data Inconsistent message data
1008 Policy Violation Message violates endpoint policy
1009 Message Too Big Message exceeds size limit
1010 Mandatory Extension Client expected extension not negotiated
1011 Internal Server Error Server encountered unexpected condition
1015 TLS Handshake TLS handshake failure

Ruby WebSocket Gems Comparison

Gem Concurrency Model Use Case
websocket-driver Any Low-level protocol implementation
faye-websocket EventMachine EventMachine applications
async-websocket Fiber-based Async framework applications
websocket-client-simple Thread-based Simple client implementations
actioncable Thread pool Rails applications

ActionCable Channel Methods

Method Purpose When Called
subscribed Handle new subscription Client subscribes to channel
unsubscribed Handle subscription removal Client unsubscribes
receive Process incoming message Client sends data to channel
stream_from Subscribe to broadcast stream Setup streaming during subscribed
stream_for Subscribe to model-based stream Setup model-specific streaming
transmit Send message to subscriber Any channel method

Connection States

State Description Valid Transitions
CONNECTING Handshake in progress OPEN, CLOSED
OPEN Connection established CLOSING
CLOSING Close frame sent CLOSED
CLOSED Connection terminated None

Extension Parameters

Extension Parameter Purpose
permessage-deflate server_no_context_takeover Server resets compression context
permessage-deflate client_no_context_takeover Client resets compression context
permessage-deflate server_max_window_bits Server compression window size
permessage-deflate client_max_window_bits Client compression window size

Frame Header Structure

Field Bits Description
FIN 1 Final fragment flag
RSV1-3 3 Reserved for extensions
Opcode 4 Frame type identifier
MASK 1 Payload masking flag
Payload Length 7, 7+16, or 7+64 Payload size in bytes
Masking Key 0 or 32 XOR mask for payload
Payload Data Variable Application data

Ruby Implementation Patterns

Pattern Code Approach
Basic Connection Faye::WebSocket.new(env)
Client Connection WebSocket::Client::Simple.connect(url)
Authentication Override ApplicationCable::Connection#connect
Broadcasting ActionCable.server.broadcast(channel, message)
Streaming stream_from(channel_name) in subscribed method
Message Validation Validate in receive(data) method
Connection Tracking Store connections in instance variable hash
Heartbeat Implement custom ping/pong in connection class