Overview
The WebSocket Protocol provides a standardized method for establishing persistent, bidirectional communication channels between clients and servers over a single TCP connection. Unlike traditional HTTP request-response cycles, WebSocket maintains an open connection that allows both parties to send data at any time without the overhead of repeatedly establishing new connections.
The protocol originated from the need to address limitations in existing technologies for real-time web applications. Traditional approaches like long polling, server-sent events, and HTTP streaming each presented drawbacks in efficiency, latency, or bidirectional capability. WebSocket was standardized in RFC 6455 (2011) to provide a unified solution.
WebSocket operates through an initial HTTP handshake that upgrades the connection to the WebSocket protocol. Once established, the connection remains open for bidirectional message exchange until either party closes it. This approach eliminates the latency and overhead associated with establishing new HTTP connections for each message.
The protocol defines its own framing format, distinct from HTTP, that includes mechanisms for message fragmentation, control frames, and connection management. Messages can be transmitted as text (UTF-8 encoded) or binary data, with the protocol handling frame composition and decomposition transparently.
require 'faye/websocket'
require 'eventmachine'
# Basic WebSocket client
EM.run {
  ws = Faye::WebSocket::Client.new('ws://example.com/socket')
  ws.on :open do |event|
    puts "Connection established"
    ws.send("Hello from Ruby client")
  end
  ws.on :message do |event|
    puts "Received: #{event.data}"
  end
  ws.on :close do |event|
    puts "Connection closed: #{event.code} #{event.reason}"
    # Stop the reactor once the connection is gone
    EM.stop
  end
}
WebSocket connections use the ws:// scheme for unencrypted connections and wss:// for TLS-encrypted connections, mirroring the HTTP/HTTPS convention. By default the protocol uses the same ports as HTTP and HTTPS (80 and 443), so it can usually be deployed through existing infrastructure without additional firewall configuration.
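The scheme and port rules can be sketched with the standard library. This is a minimal illustration; `websocket_endpoint` and `DEFAULT_PORTS` are hypothetical names chosen for this example, not part of any gem.

```ruby
require 'uri'

# Default ports per scheme, mirroring HTTP (80) and HTTPS (443)
DEFAULT_PORTS = { 'ws' => 80, 'wss' => 443 }.freeze

# Hypothetical helper: split a WebSocket URL into host, port, and TLS flag
def websocket_endpoint(url)
  uri = URI.parse(url)
  scheme = uri.scheme.to_s.downcase
  default_port = DEFAULT_PORTS.fetch(scheme) do
    raise ArgumentError, "not a WebSocket URL: #{url}"
  end
  # Older Ruby versions return no port for ws/wss, so fall back to the default
  { host: uri.host, port: uri.port || default_port, secure: scheme == 'wss' }
end

p websocket_endpoint('ws://example.com/socket')       # port 80, secure false
p websocket_endpoint('wss://example.com:8443/socket') # port 8443, secure true
```

An explicit port in the URL always wins over the scheme default.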
Key Principles
The WebSocket Protocol operates on several core mechanisms that distinguish it from traditional HTTP communication. Understanding these principles clarifies how the protocol achieves efficient, real-time bidirectional communication.
Connection Establishment Through HTTP Upgrade
WebSocket connections begin as standard HTTP requests with specific headers indicating an upgrade request. The client sends an HTTP GET request containing Upgrade: websocket and Connection: Upgrade headers, along with a base64-encoded random key. The server responds with HTTP 101 Switching Protocols if it accepts the upgrade, including a hashed version of the client's key to confirm the handshake. This HTTP-based initialization allows WebSocket to traverse proxies and firewalls that permit HTTP traffic.
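The "hashed version of the client's key" can be computed in a few lines. The sketch below follows the RFC 6455 rule (append a fixed GUID to the key, take the SHA-1 digest, Base64-encode it); the helper name `websocket_accept` is an assumption made for this example.

```ruby
require 'digest'

# GUID fixed by RFC 6455 for computing Sec-WebSocket-Accept
WEBSOCKET_GUID = '258EAFA5-E914-47DA-95CA-C5AB0DC85B11'

# Hypothetical helper: derive the Sec-WebSocket-Accept value
# from the client's Sec-WebSocket-Key header
def websocket_accept(client_key)
  Digest::SHA1.base64digest(client_key + WEBSOCKET_GUID)
end

# Sample key taken from the RFC 6455 handshake example
puts websocket_accept('dGhlIHNhbXBsZSBub25jZQ==')
# → s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
```

The client performs the same calculation and rejects the connection if the server's value does not match, which confirms the server actually understood the WebSocket handshake.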
Message Framing Structure
After handshake completion, communication shifts to WebSocket's binary framing protocol. Each frame contains a header specifying frame type (text, binary, or control), payload length, masking information, and fragmentation details. Data frames carry application messages, while control frames manage connection state through ping, pong, and close operations. The protocol mandates that client-to-server frames must be masked using a random 32-bit key, while server-to-client frames remain unmasked. This masking requirement prevents certain cache poisoning attacks.
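Masking itself is a byte-wise XOR against the 4-byte key. A minimal sketch, assuming a hypothetical helper named `mask_payload`; real clients pick a fresh random key per frame (for example with `SecureRandom.random_bytes(4)`), while the fixed key here comes from the RFC 6455 sample frame.

```ruby
# XOR each payload byte with the masking key byte at (index mod 4).
# Applying the function a second time with the same key restores the input.
def mask_payload(payload, masking_key)
  key = masking_key.bytes
  payload.bytes.each_with_index.map { |byte, i| byte ^ key[i % 4] }.pack('C*')
end

# Key and payload from the masked "Hello" example in RFC 6455
key = [0x37, 0xfa, 0x21, 0x3d].pack('C*')
masked = mask_payload('Hello', key)

puts masked.unpack1('H*')      # → 7f9f4d5158
puts mask_payload(masked, key) # → Hello
```

Because the operation is symmetric, the server unmasks with the same function and the key carried in the frame header.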
Full-Duplex Communication Model
WebSocket supports simultaneous bidirectional data flow. Either party can send messages at any time without waiting for a response, contrasting with HTTP's request-response pattern. This characteristic eliminates the need for techniques like long polling or multiple connections. The server can push data to clients immediately upon availability, reducing latency for time-sensitive applications.
Connection Lifecycle Management
A WebSocket connection progresses through distinct states: CONNECTING, OPEN, CLOSING, and CLOSED. The CONNECTING state exists during the HTTP handshake. Upon successful handshake, the connection enters OPEN state where message exchange occurs. Either party can initiate closure by sending a close frame, transitioning to CLOSING state. The connection reaches CLOSED state after both parties exchange close frames or when an error forces connection termination. Close frames can carry a status code in the range 1000-4999 to communicate the closure reason.
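The lifecycle can be modeled as a small state machine. This is an illustrative sketch rather than any library's implementation: the class name `ConnectionState` and its methods are made up for this example, and it assumes an abrupt connection loss may take OPEN directly to CLOSED.

```ruby
# Minimal model of the WebSocket connection lifecycle
class ConnectionState
  # Allowed next states for each state
  TRANSITIONS = {
    connecting: [:open, :closed],   # handshake succeeds or fails
    open:       [:closing, :closed], # close frame sent, or connection lost
    closing:    [:closed],
    closed:     []
  }.freeze

  attr_reader :state

  def initialize
    @state = :connecting
  end

  # Move to new_state, raising if the lifecycle does not allow it
  def transition_to(new_state)
    unless TRANSITIONS.fetch(@state).include?(new_state)
      raise ArgumentError, "cannot move from #{@state} to #{new_state}"
    end
    @state = new_state
  end
end

connection = ConnectionState.new
connection.transition_to(:open)
connection.transition_to(:closing)
connection.transition_to(:closed)
puts connection.state # → closed
```

Rejecting invalid transitions early makes bugs such as sending on a closed connection easier to spot.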
Fragmentation and Message Assembly
Large messages can be split into multiple frames. The first frame carries the message's opcode (text or binary), each subsequent frame uses the continuation opcode, and the FIN bit is set only on the last frame to signal that the message is complete. This fragmentation allows applications to begin transmitting messages before the complete payload is available, and it lets control frames be interleaved instead of waiting behind a single large message. The receiving side reassembles fragments in the order received before delivering the complete message to the application.
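A sketch of fragmentation and reassembly at the frame level, ignoring headers and masking. `Frame`, `fragment`, and `reassemble` are hypothetical names for this example; the opcode values are the ones the protocol assigns to continuation and text frames.

```ruby
Frame = Struct.new(:fin, :opcode, :payload)

OPCODE_CONTINUATION = 0x0
OPCODE_TEXT = 0x1

# Split a text message into frames of at most max_size payload bytes.
# Only the first frame carries the text opcode; only the last has FIN set.
# Splitting on bytes may cut a multi-byte character, which is fine because
# the receiver joins all fragments before decoding.
def fragment(message, max_size)
  chunks = message.bytes.each_slice(max_size).map { |slice| slice.pack('C*') }
  chunks = [''.b] if chunks.empty?
  chunks.each_with_index.map do |chunk, index|
    opcode = index.zero? ? OPCODE_TEXT : OPCODE_CONTINUATION
    Frame.new(index == chunks.size - 1, opcode, chunk)
  end
end

# Join fragments in arrival order once the final frame has been seen
def reassemble(frames)
  raise ArgumentError, 'message is incomplete' unless frames.last&.fin
  frames.map(&:payload).join.force_encoding(Encoding::UTF_8)
end

frames = fragment('Hello, WebSocket', 6)
frames.each { |f| puts "fin=#{f.fin} opcode=#{f.opcode} bytes=#{f.payload.bytesize}" }
puts reassemble(frames) # → Hello, WebSocket
```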
Control Frame Operations
The protocol defines three control frame types for connection management: close, ping, and pong. Ping frames test connection liveness; the recipient must respond with a pong frame containing identical payload data. Close frames initiate connection termination and may include a status code and UTF-8 encoded reason. Control frames must not be fragmented, carry at most 125 bytes of payload, and may be sent between the fragments of a data message, ensuring timely connection management even during large message transfers.
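The control frame rules translate into a short validation routine. This is a sketch under the assumption that frames are represented as plain keyword arguments and hashes; `validate_control_frame!` and `pong_for` are hypothetical helpers, not library calls.

```ruby
CONTROL_OPCODES = { 0x8 => :close, 0x9 => :ping, 0xA => :pong }.freeze
MAX_CONTROL_PAYLOAD = 125 # bytes

# Return the control frame type, or raise if the frame breaks a protocol rule
def validate_control_frame!(fin:, opcode:, payload:)
  raise ArgumentError, 'not a control opcode' unless CONTROL_OPCODES.key?(opcode)
  raise ArgumentError, 'control frames must not be fragmented' unless fin
  if payload.bytesize > MAX_CONTROL_PAYLOAD
    raise ArgumentError, 'control payload exceeds 125 bytes'
  end
  CONTROL_OPCODES[opcode]
end

# A pong must echo the payload of the ping it answers
def pong_for(ping_payload)
  { fin: true, opcode: 0xA, payload: ping_payload.dup }
end

puts validate_control_frame!(fin: true, opcode: 0x9, payload: 'are-you-there')
# → ping
puts pong_for('are-you-there')[:payload]
# → are-you-there
```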
Extension and Subprotocol Negotiation
WebSocket includes mechanisms for negotiating extensions and subprotocols during the handshake. Extensions modify the framing protocol itself, enabling features like message compression (permessage-deflate). Subprotocols define application-level protocols carried over WebSocket framing, such as STOMP or WAMP. The client proposes supported extensions and subprotocols in handshake headers; the server selects from these options in its handshake response.
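Server-side subprotocol selection can be sketched as picking one entry from the client's comma-separated Sec-WebSocket-Protocol header. `select_subprotocol` is a hypothetical helper, and this version assumes the client's order expresses its preference; a server is free to apply its own ordering instead.

```ruby
# Pick the first client-proposed subprotocol that the server supports.
# Returns nil when there is no overlap, in which case the server omits
# the Sec-WebSocket-Protocol header from its response.
def select_subprotocol(header_value, supported)
  requested = header_value.to_s.split(',').map(&:strip).reject(&:empty?)
  requested.find { |name| supported.include?(name) }
end

puts select_subprotocol('wamp.2.json, v12.stomp', ['v12.stomp', 'mqtt'])
# → v12.stomp
```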
Ruby Implementation
Ruby provides several libraries for WebSocket implementation, each suited to different architectural requirements. The ecosystem includes low-level drivers, high-level frameworks, and integrated solutions for web applications.
Core WebSocket Libraries
The websocket-driver gem implements the WebSocket protocol at a low level, handling frame parsing, masking, and protocol compliance. This driver operates as a streaming parser that processes buffers incrementally, making it suitable for integration into various I/O models:
require 'websocket/driver'
# Server-side driver
driver = WebSocket::Driver.server(socket)
driver.on :connect do |event|
  # The parsed request environment is exposed on the driver, not the event
  driver.start if WebSocket::Driver.websocket?(driver.env)
end
driver.on :message do |event|
data = event.data
# Process message
driver.text("Echo: #{data}")
end
driver.on :close do |event|
puts "Client disconnected: #{event.code} #{event.reason}"
end
# Feed data to driver
socket.on_data { |data| driver.parse(data) }
The faye-websocket gem builds on websocket-driver to provide a higher-level API with EventMachine integration. It handles the connection lifecycle and provides a cleaner interface for application code; reconnection logic is left to the application:
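require 'faye/websocket'
require 'eventmachine'
require 'rack'
Faye::WebSocket.load_adapter('thin')
# Connected sockets, shared by all handlers
clients = []
App = lambda do |env|
  if Faye::WebSocket.websocket?(env)
    ws = Faye::WebSocket.new(env)
    ws.on :open do |event|
      clients << ws
    end
    ws.on :message do |event|
      # Broadcast to all connected clients
      clients.each { |client| client.send(event.data) }
    end
    ws.on :close do |event|
      clients.delete(ws)
      ws = nil
    end
    # Return the async Rack response
    ws.rack_response
  else
    [200, {'Content-Type' => 'text/plain'}, ['Non-WebSocket Request']]
  end
end
EM.run {
  # Rack::Handler ships with Rack 2; Rack 3 moved handlers to the rackup gem
  Rack::Handler.get('thin').run(App, Port: 9292)
}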
ActionCable Integration
Rails applications use ActionCable, a framework built on top of WebSocket that integrates with the Rails stack. ActionCable introduces channels as abstractions for logical WebSocket connections, providing pub/sub semantics and automatic reconnection:
# app/channels/chat_channel.rb
class ChatChannel < ApplicationCable::Channel
def subscribed
stream_from "chat_#{params[:room_id]}"
end
def receive(data)
# Message from client
message = Message.create!(
content: data['message'],
room_id: params[:room_id],
user: current_user
)
# Broadcast to all subscribers
# Pass the payload as an explicit hash so it is not parsed as keyword arguments
ActionCable.server.broadcast(
  "chat_#{params[:room_id]}",
  { message: render_message(message) }
)
end
def unsubscribed
# Cleanup when channel unsubscribed
stop_all_streams
end
private
def render_message(message)
ApplicationController.render(
partial: 'messages/message',
locals: { message: message }
)
end
end
ActionCable handles authentication through the connection class:
# app/channels/application_cable/connection.rb
module ApplicationCable
class Connection < ActionCable::Connection::Base
identified_by :current_user
def connect
self.current_user = find_verified_user
end
private
def find_verified_user
if verified_user = User.find_by(id: cookies.encrypted[:user_id])
verified_user
else
reject_unauthorized_connection
end
end
end
end
Async and Fiber-Based Implementations
The async-websocket gem provides WebSocket support for the async framework, using Ruby's Fiber scheduler for concurrent I/O operations:
require 'async'
require 'async/http/endpoint'
require 'async/websocket/client'
require 'json'
Async do
  endpoint = Async::HTTP::Endpoint.parse('wss://example.com/socket')
  Async::WebSocket::Client.connect(endpoint) do |connection|
    # Send message, serialized to JSON text
    connection.write(JSON.generate({type: 'subscribe', channel: 'updates'}))
    connection.flush
    # Receive messages
    while message = connection.read
      # to_str covers plain strings and message objects, depending on gem version
      data = JSON.parse(message.to_str)
      puts "Received: #{data}"
      # Respond to application-level ping
      if data['type'] == 'ping'
        connection.write(JSON.generate({type: 'pong'}))
        connection.flush
      end
    end
  rescue Async::WebSocket::ProtocolError => e
    puts "Protocol error: #{e.message}"
  end
end
Connection Pooling and Management
Production applications often require connection pooling to manage multiple WebSocket connections efficiently:
class WebSocketPool
def initialize(size: 5)
@pool = ConnectionPool.new(size: size, timeout: 5) do
create_connection
end
end
def with_connection(&block)
@pool.with(&block)
end
private
def create_connection
ws = Faye::WebSocket::Client.new('wss://api.example.com')
ws.on :open do
puts "Pool connection established"
end
ws.on :error do |event|
puts "Pool connection error: #{event.message}"
end
ws
end
end
# Usage
pool = WebSocketPool.new(size: 10)
pool.with_connection do |ws|
ws.send(JSON.generate(data))
end
Practical Examples
Real-world WebSocket applications demonstrate the protocol's capabilities across various use cases requiring real-time communication.
Real-Time Chat Application
A chat system illustrates bidirectional messaging, presence tracking, and message broadcasting:
# Server implementation
class ChatServer
def initialize
@clients = {}
@rooms = Hash.new { |h, k| h[k] = [] }
end
def handle_connection(ws, user_id)
@clients[user_id] = ws
ws.on :message do |event|
data = JSON.parse(event.data)
case data['type']
when 'join_room'
join_room(user_id, data['room_id'])
broadcast_presence(data['room_id'])
when 'message'
broadcast_message(
data['room_id'],
user_id,
data['content']
)
when 'typing'
broadcast_typing(data['room_id'], user_id, data['typing'])
end
end
ws.on :close do
leave_all_rooms(user_id)
@clients.delete(user_id)
end
end
private
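def join_room(user_id, room_id)
  @rooms[room_id] << user_id unless @rooms[room_id].include?(user_id)
end
# Remove the user from every room (called from the :close handler)
def leave_all_rooms(user_id)
  @rooms.each_value { |members| members.delete(user_id) }
end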
def broadcast_message(room_id, sender_id, content)
message = {
type: 'message',
sender: sender_id,
content: content,
timestamp: Time.now.iso8601
}
@rooms[room_id].each do |user_id|
@clients[user_id]&.send(JSON.generate(message))
end
end
def broadcast_typing(room_id, user_id, typing)
event = {
type: 'typing',
user: user_id,
typing: typing
}
@rooms[room_id].each do |recipient_id|
next if recipient_id == user_id
@clients[recipient_id]&.send(JSON.generate(event))
end
end
def broadcast_presence(room_id)
users = @rooms[room_id].map { |uid| user_info(uid) }
event = {
type: 'presence',
users: users
}
@rooms[room_id].each do |user_id|
@clients[user_id]&.send(JSON.generate(event))
end
end
end
Live Data Dashboard
A monitoring dashboard demonstrates server push for real-time metrics updates:
class MetricsPublisher
def initialize
@subscribers = {}
start_metrics_collection
end
def subscribe(ws, metrics)
subscription_id = SecureRandom.uuid
@subscribers[subscription_id] = {
connection: ws,
metrics: metrics,
last_sent: {}
}
ws.on :close do
@subscribers.delete(subscription_id)
end
subscription_id
end
private
def start_metrics_collection
Thread.new do
loop do
metrics_data = collect_current_metrics
publish_updates(metrics_data)
sleep 1
end
end
end
def collect_current_metrics
{
cpu_usage: SystemMetrics.cpu_percent,
memory_usage: SystemMetrics.memory_percent,
active_users: UserSession.active_count,
request_rate: RequestCounter.rate_per_second,
error_rate: ErrorCounter.rate_per_minute
}
end
def publish_updates(current_metrics)
@subscribers.each do |id, subscriber|
updates = {}
subscriber[:metrics].each do |metric|
current_value = current_metrics[metric.to_sym]
last_value = subscriber[:last_sent][metric]
# Only send on the first update or when the value changed beyond the threshold
if should_send_update?(metric, current_value, last_value)
updates[metric] = current_value
subscriber[:last_sent][metric] = current_value
end
end
if updates.any?
subscriber[:connection].send(JSON.generate({
type: 'metrics_update',
data: updates,
timestamp: Time.now.to_i
}))
end
end
end
def should_send_update?(metric, current, last)
return true if last.nil?
return true if (current - last).abs > threshold_for(metric)
false
end
end
Multiplayer Game State Synchronization
Game applications require low-latency state updates and conflict resolution:
class GameStateSync
def initialize
@game_states = {}
@player_connections = {}
end
def handle_player(ws, game_id, player_id)
@game_states[game_id] ||= GameState.new(game_id)
@player_connections[player_id] = ws
# Send initial state
ws.send(serialize_state(@game_states[game_id]))
ws.on :message do |event|
action = deserialize_action(event.data)
process_action(game_id, player_id, action)
end
ws.on :close do
handle_disconnect(game_id, player_id)
end
end
private
def process_action(game_id, player_id, action)
state = @game_states[game_id]
# Validate action
unless state.valid_action?(player_id, action)
send_error(player_id, "Invalid action")
return
end
# Apply action with timestamp
action[:timestamp] = Time.now.to_f
state.apply_action(player_id, action)
# Broadcast state delta
delta = state.last_delta
broadcast_to_game(game_id, {
type: 'state_delta',
delta: delta,
sequence: state.sequence_number
})
# Check win conditions
if winner = state.check_winner
broadcast_to_game(game_id, {
type: 'game_over',
winner: winner
})
end
end
def broadcast_to_game(game_id, message)
state = @game_states[game_id]
state.players.each do |player_id|
@player_connections[player_id]&.send(JSON.generate(message))
end
end
def handle_disconnect(game_id, player_id)
@player_connections.delete(player_id)
state = @game_states[game_id]
state.mark_player_inactive(player_id)
# Notify other players
broadcast_to_game(game_id, {
type: 'player_disconnected',
player_id: player_id
})
# Clean up if all players gone
if state.all_players_inactive?
@game_states.delete(game_id)
end
end
end
Financial Data Streaming
Stock ticker applications demonstrate high-frequency updates with throttling:
class TickerStream
def initialize
@subscriptions = Hash.new { |h, k| h[k] = [] }
start_market_data_feed
end
def subscribe(ws, symbols)
subscription = {
connection: ws,
symbols: symbols.map(&:upcase),
buffer: [],
last_flush: Time.now
}
# Index by the normalized (upcased) symbols stored in the subscription
subscription[:symbols].each do |symbol|
  @subscriptions[symbol] << subscription
end
ws.on :message do |event|
handle_subscription_change(ws, event.data)
end
ws.on :close do
remove_subscription(subscription)
end
end
private
def start_market_data_feed
Thread.new do
loop do
# Receive market data updates
updates = MarketDataFeed.get_updates
updates.each do |symbol, quote|
publish_quote(symbol, quote)
end
flush_buffers
sleep 0.1 # 10 updates per second maximum
end
end
end
def publish_quote(symbol, quote)
@subscriptions[symbol].each do |sub|
sub[:buffer] << {
symbol: symbol,
price: quote.price,
volume: quote.volume,
timestamp: quote.timestamp
}
end
end
def flush_buffers
@subscriptions.values.flatten.uniq.each do |sub|
next if sub[:buffer].empty?
# Coalesce updates - send only latest for each symbol
latest = sub[:buffer].group_by { |u| u[:symbol] }
.transform_values(&:last)
sub[:connection].send(JSON.generate({
type: 'quotes',
data: latest.values
}))
sub[:buffer].clear
sub[:last_flush] = Time.now
end
end
end
Security Implications
WebSocket connections introduce security considerations that differ from traditional HTTP request-response patterns. Connections are long-lived, and browsers do not apply the same-origin policy to the WebSocket handshake, so the server must enforce access control itself.
Origin Validation
The WebSocket handshake includes an Origin header that indicates the source of the connection request. Servers must validate this origin to prevent unauthorized cross-site WebSocket connections. Unlike XMLHttpRequest or fetch calls, WebSocket connections are not restricted by the browser's same-origin policy, so a page on any site can attempt a handshake:
class SecureWebSocketServer
ALLOWED_ORIGINS = [
'https://app.example.com',
'https://admin.example.com'
].freeze
def validate_connection(env)
origin = env['HTTP_ORIGIN']
unless origin_allowed?(origin)
return [403, {}, ['Forbidden: Invalid origin']]
end
# Additional validation
unless authenticated?(env)
return [401, {}, ['Unauthorized']]
end
# Origin and auth valid
true
end
private
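def origin_allowed?(origin)
  return false if origin.nil?
  # Exact match, or a subdomain of an allowed host on the same scheme
  # (uses URI from the standard library: require 'uri')
  candidate = URI.parse(origin)
  ALLOWED_ORIGINS.any? do |allowed|
    next true if origin == allowed
    allowed_uri = URI.parse(allowed)
    candidate.scheme == allowed_uri.scheme &&
      candidate.host.to_s.end_with?(".#{allowed_uri.host}")
  end
rescue URI::InvalidURIError
  false
end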
def authenticated?(env)
token = extract_token(env)
return false unless token
# Validate JWT or session token
TokenValidator.valid?(token)
end
def extract_token(env)
# Check query parameter
params = Rack::Utils.parse_query(env['QUERY_STRING'])
return params['token'] if params['token']
# Check cookie
cookies = Rack::Utils.parse_cookies(env)
cookies['auth_token']
end
end
Transport Layer Security
Production WebSocket deployments must use TLS (wss://) to prevent eavesdropping and man-in-the-middle attacks. Unencrypted WebSocket connections (ws://) expose all transmitted data, including authentication credentials and application data:
# Rack configuration for secure WebSocket
app = Rack::Builder.new do
use Rack::SSL, exclude: -> (env) {
# Allow health checks without SSL
env['PATH_INFO'] == '/health'
}
map '/cable' do
run ActionCable.server
end
end
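# Enforce secure WebSocket URLs in client code.
# secure_websocket_url is an illustrative helper, not part of any gem.
def secure_websocket_url(url)
  if Rails.env.production? && !url.start_with?('wss://')
    raise ArgumentError, "Insecure WebSocket URL: #{url}"
  end
  url
end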
Message Validation and Sanitization
All incoming WebSocket messages must be validated and sanitized. Message content should be treated as untrusted user input, subject to the same validation as HTTP request parameters:
class MessageValidator
MAX_MESSAGE_SIZE = 64 * 1024 # bytes (64 KB), no ActiveSupport needed
RATE_LIMIT = 100 # messages per minute
def initialize
@rate_tracker = Hash.new { |h, k| h[k] = [] }
end
def validate(user_id, raw_message)
# Size check
if raw_message.bytesize > MAX_MESSAGE_SIZE
raise SecurityError, "Message exceeds size limit"
end
# Rate limiting
unless within_rate_limit?(user_id)
raise SecurityError, "Rate limit exceeded"
end
# Parse and validate structure
message = JSON.parse(raw_message)
unless valid_message_structure?(message)
raise SecurityError, "Invalid message structure"
end
# Sanitize content
sanitize_message(message)
end
private
def within_rate_limit?(user_id)
now = Time.now
cutoff = now - 60
# Remove old entries
@rate_tracker[user_id].reject! { |t| t < cutoff }
# Check limit
if @rate_tracker[user_id].size >= RATE_LIMIT
return false
end
@rate_tracker[user_id] << now
true
end
def sanitize_message(message)
if message['content'].is_a?(String)
message['content'] = sanitize_html(message['content'])
end
# Remove potentially dangerous keys
message.delete('__proto__')
message.delete('constructor')
message
end
def sanitize_html(content)
# Use appropriate sanitization for your use case
Sanitize.fragment(content, Sanitize::Config::BASIC)
end
end
Cross-Site WebSocket Hijacking Prevention
CSWSH attacks exploit the fact that browsers send cookies with WebSocket handshake requests. Attackers can trick users into establishing WebSocket connections from malicious sites. Defense requires combining origin validation, CSRF tokens, and custom authentication:
class CSWSHProtection
def validate_handshake(env, cookies)
# Origin check
unless valid_origin?(env['HTTP_ORIGIN'])
raise SecurityError, "Invalid origin"
end
# CSRF token in initial handshake
token = parse_query_params(env)['csrf_token']
unless valid_csrf_token?(token, cookies['session_id'])
raise SecurityError, "Invalid CSRF token"
end
# Additional authentication
unless authenticated_session?(cookies['session_id'])
raise SecurityError, "Unauthenticated"
end
true
end
private
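def valid_csrf_token?(token, session_id)
  # Reject missing values before comparing; secure_compare expects strings
  return false if token.nil? || session_id.nil?
  expected = generate_csrf_token(session_id)
  ActiveSupport::SecurityUtils.secure_compare(token, expected)
end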
def generate_csrf_token(session_id)
# Use your CSRF token generation strategy
OpenSSL::HMAC.hexdigest('SHA256', Rails.application.secret_key_base, session_id)
end
end
Denial of Service Prevention
WebSocket connections can remain open indefinitely, which makes servers vulnerable to resource exhaustion. Implement connection limits, idle timeouts, and message size restrictions:
class ConnectionLimiter
MAX_CONNECTIONS_PER_IP = 10
IDLE_TIMEOUT = 300 # seconds
def initialize
@connections = Hash.new(0)
@last_activity = {}
start_timeout_monitor
end
def accept_connection?(ip_address)
@connections[ip_address] < MAX_CONNECTIONS_PER_IP
end
def track_connection(connection_id, ip_address)
@connections[ip_address] += 1
@last_activity[connection_id] = Time.now
end
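def update_activity(connection_id)
  @last_activity[connection_id] = Time.now
end
# Call when a connection closes so the per-IP count does not grow forever
def release_connection(connection_id, ip_address)
  @connections[ip_address] -= 1 if @connections[ip_address] > 0
  @last_activity.delete(connection_id)
end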
private
def start_timeout_monitor
Thread.new do
loop do
check_timeouts
sleep 60
end
end
end
def check_timeouts
now = Time.now
@last_activity.each do |connection_id, last_time|
if now - last_time > IDLE_TIMEOUT
close_connection(connection_id)
end
end
end
end
Performance Considerations
WebSocket performance characteristics differ significantly from HTTP due to persistent connections and different framing overhead. Understanding these factors enables efficient implementation and scaling.
Connection Overhead and Scaling
WebSocket connections consume server resources for their entire duration. Each connection requires memory for buffers, file descriptors for sockets, and processing capacity for message handling. A well-tuned server can often hold tens of thousands of concurrent connections, but the practical limit depends on message frequency, processing complexity, and operating system limits, so measure it for your workload:
class ConnectionMetrics
def initialize
@connection_count = 0
@message_rates = Hash.new(0)
@bandwidth_usage = Hash.new(0)
end
def track_connection
@connection_count += 1
log_metrics if @connection_count % 1000 == 0
end
def track_message(connection_id, size)
@message_rates[connection_id] += 1
@bandwidth_usage[connection_id] += size
end
def log_metrics
avg_message_rate = @message_rates.values.sum.to_f / @connection_count
total_bandwidth = @bandwidth_usage.values.sum
puts "Connections: #{@connection_count}"
puts "Avg messages per connection: #{avg_message_rate}"
puts "Total bandwidth: #{total_bandwidth / 1024}KB"
puts "Memory per connection: #{memory_per_connection}KB"
end
private
def memory_per_connection
# Approximate memory usage
process_memory = `ps -o rss= -p #{Process.pid}`.to_i
process_memory / @connection_count
end
end
Message Batching and Coalescing
High-frequency updates benefit from batching multiple messages into single frames. This reduces framing overhead and system call frequency:
class MessageBatcher
def initialize(flush_interval: 0.1)
@batches = Hash.new { |h, k| h[k] = [] }
@flush_interval = flush_interval
start_flusher
end
def queue_message(connection_id, message)
@batches[connection_id] << message
# Flush immediately if batch large enough
flush_batch(connection_id) if @batches[connection_id].size >= 10
end
private
def start_flusher
Thread.new do
loop do
sleep @flush_interval
flush_all_batches
end
end
end
def flush_all_batches
@batches.each do |connection_id, messages|
flush_batch(connection_id) unless messages.empty?
end
end
def flush_batch(connection_id)
messages = @batches[connection_id]
return if messages.empty?
connection = get_connection(connection_id)
connection.send(JSON.generate({
type: 'batch',
messages: messages,
count: messages.size
}))
@batches[connection_id].clear
end
end
Compression and Frame Optimization
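The permessage-deflate extension compresses individual messages, reducing bandwidth for text-heavy payloads. Binary formats such as MessagePack or Protocol Buffers are typically more compact than JSON and often faster to encode and decode. The class below applies compression at the application level with Zlib, which is separate from the permessage-deflate extension negotiated during the handshake: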
class OptimizedSerializer
def initialize(use_compression: true)
@use_compression = use_compression
end
def serialize(data)
# Use MessagePack for binary efficiency
packed = MessagePack.pack(data)
if @use_compression && packed.bytesize > 1024
compressed = Zlib::Deflate.deflate(packed)
# Only use compressed if actually smaller
if compressed.bytesize < packed.bytesize
return { compressed: true, data: compressed }
end
end
{ compressed: false, data: packed }
end
def deserialize(payload)
if payload[:compressed]
packed = Zlib::Inflate.inflate(payload[:data])
else
packed = payload[:data]
end
MessagePack.unpack(packed)
end
end
# Benchmark comparison (MessagePack comes from the msgpack gem)
require 'benchmark'
require 'json'
require 'msgpack'
Benchmark.bm do |x|
data = { users: 1000.times.map { |i| { id: i, name: "User#{i}" } } }
x.report("JSON") {
1000.times { JSON.generate(data) }
}
x.report("MessagePack") {
1000.times { MessagePack.pack(data) }
}
end
Connection Pooling and Load Distribution
Distributing WebSocket connections across multiple processes or servers prevents resource exhaustion. Redis pub/sub enables message routing between processes:
class DistributedWebSocket
def initialize
@redis = Redis.new
@local_connections = {}
subscribe_to_broadcasts
end
def register_connection(connection_id, ws)
@local_connections[connection_id] = ws
# Store connection server mapping in Redis
@redis.hset('connection_servers', connection_id, server_id)
end
def send_to_connection(connection_id, message)
if @local_connections.key?(connection_id)
# Local connection
@local_connections[connection_id].send(message)
else
# Remote connection - publish to Redis
server = @redis.hget('connection_servers', connection_id)
@redis.publish("server:#{server}", {
connection_id: connection_id,
message: message
}.to_json)
end
end
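def broadcast_to_all(message)
  # Publish once - all servers receive. Wrap the payload in JSON so
  # handle_redis_message can parse it the same way as a targeted message.
  @redis.publish('broadcast', { message: message }.to_json)
end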
private
def subscribe_to_broadcasts
Thread.new do
Redis.new.subscribe("server:#{server_id}", 'broadcast') do |on|
on.message do |channel, data|
handle_redis_message(data)
end
end
end
end
def handle_redis_message(data)
parsed = JSON.parse(data)
if parsed['connection_id']
# Targeted message
conn = @local_connections[parsed['connection_id']]
conn&.send(parsed['message'])
else
# Broadcast to all local connections
@local_connections.each_value do |ws|
ws.send(parsed['message'])
end
end
end
def server_id
@server_id ||= "#{Socket.gethostname}:#{Process.pid}"
end
end
Heartbeat and Keep-Alive Optimization
Ping/pong frames maintain connection liveness but add overhead. Balance detection speed against network traffic:
class HeartbeatManager
  PING_INTERVAL = 30 # seconds
  PONG_TIMEOUT = 10 # seconds
  def monitor_connection(ws)
    pong_received = false
    # Register the handler before the first ping goes out.
    # Assumes the connection object emits a :pong event; some libraries
    # report pongs differently (for example through a block passed to ping).
    ws.on :pong do
      pong_received = true
    end
    Thread.new do
      loop do
        sleep PING_INTERVAL
        pong_received = false
        ws.ping
        # Poll for the pong until the timeout expires
        timeout_at = Time.now + PONG_TIMEOUT
        sleep 0.1 until pong_received || Time.now > timeout_at
        unless pong_received
          ws.close(1001, "Ping timeout")
          break
        end
      end
    end
  end
end
Tools & Ecosystem
Ruby's WebSocket ecosystem includes libraries for various use cases, from low-level protocol implementation to high-level application frameworks.
Core Protocol Libraries
The websocket-driver gem provides RFC 6455 compliant protocol implementation. It operates as a streaming parser handling frame encoding, decoding, and validation. Applications using custom I/O systems or integration with non-standard event loops use this library directly.
The faye-websocket gem builds on websocket-driver to provide EventMachine integration and adapters for Rack servers such as Thin and Puma. It offers a cleaner API for standard WebSocket operations and handles connection lifecycle management automatically.
Framework Integration
ActionCable integrates WebSocket functionality into Rails applications. It provides channel abstractions, authentication hooks, and pub/sub mechanisms. ActionCable ships subscription adapters for Redis and PostgreSQL, plus an in-process async adapter intended for development and testing.
The websocket-rails gem offered an alternative Rails integration focused on event-driven architecture, with controller-style routing for WebSocket messages and data store synchronization. It has not been actively maintained for years, so evaluate it carefully before using it in new projects.
Client Libraries
The websocket-client-simple gem implements WebSocket client functionality with a straightforward API:
require 'websocket-client-simple'
ws = WebSocket::Client::Simple.connect 'wss://example.com/socket'
ws.on :message do |msg|
puts msg.data
end
ws.on :open do
ws.send 'hello'
end
ws.on :close do |e|
puts "Connection closed: #{e.code} #{e.reason}"
end
ws.on :error do |e|
puts "Error: #{e.message}"
end
Testing Tools
The websocket-client-simple gem facilitates WebSocket testing in RSpec and Minitest:
# RSpec WebSocket testing
RSpec.describe 'WebSocket Chat' do
let(:ws_url) { 'ws://localhost:9292/chat' }
it 'broadcasts messages to all clients' do
clients = []
received_messages = []
# Connect multiple clients
3.times do
ws = WebSocket::Client::Simple.connect(ws_url)
ws.on(:message) { |msg| received_messages << msg.data }
clients << ws
sleep 0.1 # Allow connection
end
# Send message from first client
clients.first.send(JSON.generate({
type: 'message',
content: 'Hello everyone'
}))
sleep 0.5 # Allow broadcast
expect(received_messages.size).to eq(3)
clients.each(&:close)
end
end
Performance Monitoring
APM gems such as scout_apm and skylight can record timings for WebSocket message handlers through their custom instrumentation hooks. A sketch using scout_apm:
class MonitoredWebSocket
  def handle_connection(ws)
    ws.on :message do |event|
      # Wrap each message in its own instrumented block so the layer is
      # always closed, even if processing raises.
      # ScoutApm::Tracer.instrument is scout_apm's block-based custom
      # instrumentation helper; confirm the signature for your gem version.
      ScoutApm::Tracer.instrument('WebSocket', 'Message') do
        process_message(event.data)
      end
    end
  end
end
Proxy and Load Balancer Support
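Nginx and HAProxy can proxy WebSocket connections, but they need explicit configuration to forward the Upgrade and Connection headers and to raise idle timeouts. Request-timeout middleware such as rack-timeout is designed for short request-response cycles, so check how it interacts with long-lived connections in your stack:
# config/initializers/rack_timeout.rb
# Silence rack-timeout's per-request state-change logging
Rack::Timeout.unregister_state_change_observer(:logger)
# This sketch assumes rack-timeout offers no per-path exclusion; one option
# is to mount the WebSocket endpoint ahead of the timeout middleware.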
Message Queue Integration
WebSocket applications integrate with message queues for reliability and scaling:
class QueuedWebSocketPublisher
def initialize
@rabbitmq = Bunny.new
@rabbitmq.start
@channel = @rabbitmq.create_channel
@exchange = @channel.fanout('websocket_events')
end
def publish(event_type, data)
message = {
type: event_type,
data: data,
timestamp: Time.now.iso8601
}
@exchange.publish(
JSON.generate(message),
routing_key: event_type,
persistent: true
)
end
def subscribe(&block)
queue = @channel.queue('', exclusive: true)
queue.bind(@exchange)
queue.subscribe do |delivery_info, properties, body|
event = JSON.parse(body)
block.call(event)
end
end
end
Reference
WebSocket URL Schemes
| Scheme | Description | Default Port |
|---|---|---|
| ws | Unencrypted WebSocket | 80 |
| wss | TLS-encrypted WebSocket | 443 |
Handshake Headers
| Header | Direction | Purpose |
|---|---|---|
| Upgrade | Both | Requests (client) or confirms (server) the upgrade to websocket |
| Connection | Both | Must contain Upgrade |
| Sec-WebSocket-Key | Client | Base64-encoded random 16-byte value |
| Sec-WebSocket-Accept | Server | Base64-encoded SHA-1 hash of key + fixed GUID |
| Sec-WebSocket-Version | Client | WebSocket protocol version (13) |
| Sec-WebSocket-Protocol | Both | Subprotocol negotiation |
| Sec-WebSocket-Extensions | Both | Extension negotiation |
| Origin | Client | Origin of connection request |
Frame Types
| Opcode | Type | Purpose |
|---|---|---|
| 0x0 | Continuation | Continuation frame for fragmented message |
| 0x1 | Text | UTF-8 text message |
| 0x2 | Binary | Binary data message |
| 0x8 | Close | Connection close with optional status |
| 0x9 | Ping | Liveness check request |
| 0xA | Pong | Response to ping frame |
Close Status Codes
| Code | Meaning | Usage |
|---|---|---|
| 1000 | Normal Closure | Successful completion |
| 1001 | Going Away | Endpoint removed (server shutdown, browser navigation) |
| 1002 | Protocol Error | Protocol violation detected |
| 1003 | Unsupported Data | Endpoint cannot process data type |
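| 1005 | No Status Received | Reserved; reported locally when a close frame carried no status code |
| 1006 | Abnormal Closure | Reserved; reported locally when no close frame was received (connection lost) |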
| 1007 | Invalid Frame Payload Data | Inconsistent message data |
| 1008 | Policy Violation | Message violates endpoint policy |
| 1009 | Message Too Big | Message exceeds size limit |
| 1010 | Mandatory Extension | Client expected extension not negotiated |
| 1011 | Internal Server Error | Server encountered unexpected condition |
| 1015 | TLS Handshake | Reserved; reported locally on TLS handshake failure |
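Close codes fall into ranges with different owners, which can be sketched as a lookup. `close_code_category` and the category names are hypothetical labels for this example; the ranges follow RFC 6455.

```ruby
# Classify a close status code by the range it belongs to
def close_code_category(code)
  case code
  when 1005, 1006, 1015 then :reserved_not_sent # never placed in a close frame
  when 1000..2999 then :protocol     # defined by the protocol and its extensions
  when 3000..3999 then :registered   # libraries and frameworks, registered with IANA
  when 4000..4999 then :private_use  # application-specific
  else :invalid
  end
end

puts close_code_category(1000) # → protocol
puts close_code_category(1006) # → reserved_not_sent
puts close_code_category(4001) # → private_use
```

Applications that need their own closure reasons should pick codes from the 4000-4999 range.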
Ruby WebSocket Gems Comparison
| Gem | Concurrency Model | Use Case |
|---|---|---|
| websocket-driver | Any | Low-level protocol implementation |
| faye-websocket | EventMachine | EventMachine applications |
| async-websocket | Fiber-based | Async framework applications |
| websocket-client-simple | Thread-based | Simple client implementations |
| actioncable | Thread pool | Rails applications |
ActionCable Channel Methods
| Method | Purpose | When Called |
|---|---|---|
| subscribed | Handle new subscription | Client subscribes to channel |
| unsubscribed | Handle subscription removal | Client unsubscribes |
| receive | Process incoming message | Client sends data to channel |
| stream_from | Subscribe to broadcast stream | Setup streaming during subscribed |
| stream_for | Subscribe to model-based stream | Setup model-specific streaming |
| transmit | Send message to subscriber | Any channel method |
Connection States
| State | Description | Valid Transitions |
|---|---|---|
| CONNECTING | Handshake in progress | OPEN, CLOSED |
| OPEN | Connection established | CLOSING, CLOSED |
| CLOSING | Closing handshake in progress | CLOSED |
| CLOSED | Connection terminated | None |
Extension Parameters
| Extension | Parameter | Purpose |
|---|---|---|
| permessage-deflate | server_no_context_takeover | Server resets compression context |
| permessage-deflate | client_no_context_takeover | Client resets compression context |
| permessage-deflate | server_max_window_bits | Server compression window size |
| permessage-deflate | client_max_window_bits | Client compression window size |
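These parameters travel in the Sec-WebSocket-Extensions header as semicolon-separated tokens. The parser below is a deliberately naive sketch: `parse_extensions` is a hypothetical helper, and it does not handle quoted values that contain commas or semicolons.

```ruby
# Parse a Sec-WebSocket-Extensions header value into a list of offers.
# Parameters without a value (flags) are stored as true.
def parse_extensions(header_value)
  offers = header_value.to_s.split(',').map(&:strip).reject(&:empty?)
  offers.map do |offer|
    name, *params = offer.split(';').map(&:strip)
    options = params.each_with_object({}) do |param, acc|
      key, value = param.split('=', 2)
      acc[key] = value ? value.delete('"') : true
    end
    { name: name, params: options }
  end
end

offers = parse_extensions('permessage-deflate; client_max_window_bits; server_max_window_bits=10')
puts offers.first[:name]                             # → permessage-deflate
puts offers.first[:params]['server_max_window_bits'] # → 10
```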
Frame Header Structure
| Field | Bits | Description |
|---|---|---|
| FIN | 1 | Final fragment flag |
| RSV1-3 | 3 | Reserved for extensions |
| Opcode | 4 | Frame type identifier |
| MASK | 1 | Payload masking flag |
| Payload Length | 7, 7+16, or 7+64 | Payload size in bytes |
| Masking Key | 0 or 32 | XOR mask for payload |
| Payload Data | Variable | Application data |
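The header layout can be decoded with a few bit operations. This sketch assumes the whole header is already buffered in a binary string; `parse_frame_header` is a hypothetical helper, and the sample bytes are the unmasked "Hello" text frame from RFC 6455.

```ruby
# Decode the fixed and variable parts of a WebSocket frame header
def parse_frame_header(bytes)
  first, second = bytes.unpack('CC')
  header = {
    fin: (first & 0x80) != 0,     # final fragment flag
    rsv: (first >> 4) & 0x07,     # RSV1-3 packed into three bits
    opcode: first & 0x0F,
    masked: (second & 0x80) != 0
  }
  length = second & 0x7F
  offset = 2
  if length == 126
    # Next 2 bytes hold the length as a big-endian 16-bit integer
    length = bytes.byteslice(offset, 2).unpack1('n')
    offset += 2
  elsif length == 127
    # Next 8 bytes hold the length as a big-endian 64-bit integer
    length = bytes.byteslice(offset, 8).unpack1('Q>')
    offset += 8
  end
  if header[:masked]
    header[:masking_key] = bytes.byteslice(offset, 4)
    offset += 4
  end
  header.merge(payload_length: length, payload_offset: offset)
end

frame = [0x81, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f].pack('C*')
header = parse_frame_header(frame)
puts header[:opcode]                                                   # → 1
puts frame.byteslice(header[:payload_offset], header[:payload_length]) # → Hello
```

A production parser would also need to handle partial reads, which is the job of streaming parsers such as websocket-driver.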
Ruby Implementation Patterns
| Pattern | Code Approach |
|---|---|
| Basic Connection | Faye::WebSocket.new(env) |
| Client Connection | WebSocket::Client::Simple.connect(url) |
| Authentication | Override ApplicationCable::Connection#connect |
| Broadcasting | ActionCable.server.broadcast(channel, message) |
| Streaming | stream_from(channel_name) in subscribed method |
| Message Validation | Validate in receive(data) method |
| Connection Tracking | Store connections in instance variable hash |
| Heartbeat | Implement custom ping/pong in connection class |