CrackedRuby

Overview

TCP flow control manages the rate at which data is transmitted between the two endpoints of a connection. The receiver controls this rate by advertising its available buffer space to the sender, which keeps the sender from transmitting more data than the receiver can buffer. This avoids drops at a full receive buffer, reduces retransmissions, and keeps the connection efficient.

The sliding window protocol forms the foundation of TCP flow control. The receiver maintains a receive window that indicates how many bytes it can accept. The sender tracks this value and limits its transmission accordingly. As the receiver processes data and frees buffer space, it updates the window size in acknowledgment packets, allowing the sender to transmit additional data.

Flow control operates independently from congestion control. While flow control prevents overwhelming the receiver, congestion control prevents overwhelming the network. Both mechanisms work together to ensure reliable data delivery, but they address different problems with different solutions.

require 'socket'

# Basic TCP connection showing buffer sizes
server = TCPServer.new(8080)
client_socket = server.accept  # blocks until a client connects

# Display receive buffer size (relates to flow control window)
recv_buffer = client_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF)
puts "Receive buffer size: #{recv_buffer.int} bytes"
# => Receive buffer size: 131072 bytes (example; defaults vary by OS)

# Display send buffer size
send_buffer = client_socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF)
puts "Send buffer size: #{send_buffer.int} bytes"
# => Send buffer size: 16384 bytes (example; defaults vary by OS)

The window size appears in every TCP segment's header, allowing dynamic adjustment during the connection lifetime. When the receiver's buffer fills, it advertises a smaller window or even a zero window, pausing transmission until space becomes available.

Key Principles

The receive window represents the fundamental unit of flow control. This value, measured in bytes, appears in the TCP header of every segment the receiver sends. The sender must not transmit more unacknowledged data than this window allows. The receiver calculates the window based on available buffer space: receive_window = buffer_size - unprocessed_data.

Window scaling extends the maximum window size beyond the 16-bit header field limit of 65,535 bytes. Networks with high bandwidth-delay products require larger windows to maintain throughput. The scaling factor, negotiated during connection establishment, multiplies the advertised window value by a power of two, up to 2^14, for a maximum window size of just under 1 gigabyte (65,535 * 2^14 bytes).
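The scaling arithmetic can be sketched in a few lines. The helper names below (scaled_window, min_scale_for) are hypothetical and exist only for illustration; the real negotiation happens inside the kernel's TCP stack during the handshake.

```ruby
# Illustrative helpers (hypothetical names) for window scaling arithmetic.
MAX_UNSCALED_WINDOW = 65_535  # largest value the 16-bit window field can hold
MAX_SCALE_SHIFT = 14          # largest shift count window scaling allows

# Effective window = header value shifted left by the negotiated scale.
def scaled_window(header_value, shift)
  raise ArgumentError, 'shift must be 0..14' unless (0..MAX_SCALE_SHIFT).cover?(shift)
  raise ArgumentError, 'header value must fit in 16 bits' unless (0..MAX_UNSCALED_WINDOW).cover?(header_value)

  header_value << shift
end

# Smallest shift that lets the header express at least target_bytes,
# or nil if even the maximum shift is not enough.
def min_scale_for(target_bytes)
  (0..MAX_SCALE_SHIFT).find { |s| scaled_window(MAX_UNSCALED_WINDOW, s) >= target_bytes }
end

puts scaled_window(65_535, 0)
# => 65535
puts scaled_window(65_535, 14)
# => 1073725440
puts min_scale_for(625_000)
# => 4
```

The last call shows that a 625,000-byte window needs a shift of at least 4, since a shift of 3 tops out at 524,280 bytes.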

Zero window probing prevents deadlock when the receiver advertises a zero window. The sender periodically transmits zero window probe segments to check if buffer space has become available. The receiver responds with the current window size, allowing transmission to resume when space exists. Without probing, the connection could stall indefinitely if the window update acknowledgment was lost.

Silly window syndrome occurs when either sender or receiver operates with very small window sizes, creating inefficient data transfer. The sender might transmit tiny segments, or the receiver might advertise small windows, both reducing throughput due to overhead. TCP implementations include algorithms to prevent this:

  • Nagle's algorithm (sender side) delays sending small segments while previously sent data remains unacknowledged
  • Clark's algorithm (receiver side) withholds small window increases, advertising more space only once a significant amount (typically a full segment or half the buffer) has been freed
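The two avoidance rules can be sketched as simple predicates. This is a simplified model rather than kernel code: the method names are hypothetical, and the 1460-byte MSS is an assumed value typical of Ethernet paths.

```ruby
# Simplified sender-side and receiver-side silly window avoidance rules.
MSS = 1460  # assumed maximum segment size in bytes

# Sender rule: transmit now if a full segment is queued or nothing is in
# flight; otherwise hold small data until outstanding bytes are acknowledged.
def nagle_send_now?(queued_bytes, unacked_bytes, mss: MSS)
  queued_bytes >= mss || unacked_bytes.zero?
end

# Receiver rule: advertise a larger window only when the gain is at least
# one MSS or half the buffer, whichever is smaller.
def next_advertised_window(current_advertised, free_space, buffer_size, mss: MSS)
  gain = free_space - current_advertised
  threshold = [mss, buffer_size / 2].min
  gain >= threshold ? free_space : current_advertised
end

puts nagle_send_now?(100, 500)
# => false
puts next_advertised_window(0, 200, 65_536)
# => 0
puts next_advertised_window(0, 2000, 65_536)
# => 2000
```

In the second call the receiver has freed only 200 bytes, so it keeps advertising zero instead of inviting a tiny segment.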

The cumulative acknowledgment mechanism supports flow control by confirming receipt of data. The receiver sends acknowledgments containing the sequence number of the next expected byte. This number implicitly acknowledges all prior bytes, allowing the sender to advance its sending window and transmit new data.
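A small receiver model illustrates how one acknowledgment number covers all earlier bytes. The class name CumulativeAckReceiver is hypothetical, and the sketch assumes segments never partially overlap, which a real stack must also handle.

```ruby
# Toy receiver that tracks the next expected byte and emits cumulative ACKs.
class CumulativeAckReceiver
  attr_reader :next_expected

  def initialize(initial_seq = 0)
    @next_expected = initial_seq
    @out_of_order = {}  # seq => length for segments not yet consumed
  end

  # Accept a segment and return the ACK number to send back.
  def receive(seq, length)
    @out_of_order[seq] = length if seq >= @next_expected

    # Advance across every contiguous segment that is now available.
    while (len = @out_of_order.delete(@next_expected))
      @next_expected += len
    end

    @next_expected
  end
end

rx = CumulativeAckReceiver.new
puts rx.receive(0, 1000)
# => 1000
puts rx.receive(2000, 1000)
# => 1000
puts rx.receive(1000, 1000)
# => 3000
```

The second segment arrives ahead of a gap, so the ACK stays at 1000. Once the missing segment arrives, a single ACK of 3000 confirms both.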

# Simulating window calculation
class FlowControlWindow
  attr_reader :buffer_size, :unprocessed_data
  
  def initialize(buffer_size)
    @buffer_size = buffer_size
    @unprocessed_data = 0
  end
  
  def receive_data(bytes)
    @unprocessed_data += bytes
  end
  
  def process_data(bytes)
    @unprocessed_data = [@unprocessed_data - bytes, 0].max
  end
  
  def advertised_window
    [@buffer_size - @unprocessed_data, 0].max
  end
end

window = FlowControlWindow.new(65536)
puts window.advertised_window
# => 65536

window.receive_data(32768)
puts window.advertised_window
# => 32768

window.process_data(16384)
puts window.advertised_window
# => 49152

The sender maintains its own window tracking independent of the receiver. The sending window represents the range of sequence numbers the sender can transmit. This window advances as acknowledgments arrive, allowing new data transmission. The amount of unacknowledged data the sender may have outstanding is capped at the minimum of the advertised receive window and the congestion window; what it can send at any moment is that cap minus the bytes already in flight.
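As a rough sketch of the sender's calculation: the smaller of the receive window and the congestion window caps the unacknowledged data, and the bytes already in flight count against that cap. The method name usable_window and its keyword arguments are illustrative, not part of any API.

```ruby
# Bytes the sender may still put on the wire right now.
def usable_window(receive_window:, congestion_window:, bytes_in_flight:)
  limit = [receive_window, congestion_window].min
  [limit - bytes_in_flight, 0].max
end

# Congestion window is the tighter limit here.
puts usable_window(receive_window: 65_535, congestion_window: 14_600, bytes_in_flight: 4_380)
# => 10220

# Receive window is fully consumed by data in flight.
puts usable_window(receive_window: 8_192, congestion_window: 29_200, bytes_in_flight: 8_192)
# => 0
```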

Ruby Implementation

Ruby applications interact with TCP flow control through socket operations and buffer management. The operating system's TCP stack handles flow control automatically, but applications influence behavior through socket options and I/O patterns.

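Socket buffer sizes directly affect flow control window sizes. The SO_RCVBUF option requests a receive buffer size, which bounds the maximum window the receiver can advertise. The SO_SNDBUF option requests a send buffer size, affecting how much data the sender can buffer before blocking. The operating system may adjust either request: Linux, for example, doubles the value to cover bookkeeping overhead and caps it at a system-wide limit.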

require 'socket'

# Configuring socket buffers
socket = TCPSocket.new('example.com', 80)

# Set receive buffer to 256KB
socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, 262144)

# Set send buffer to 128KB
socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, 131072)

# Verify settings (the OS may adjust the requested sizes)
recv_buf = socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).int
send_buf = socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF).int

puts "Configured receive buffer: #{recv_buf} bytes"
puts "Configured send buffer: #{send_buf} bytes"
# Output depends on the OS and its limits; on Linux the reported values are
# typically double the request, capped by the system-wide maximums

Non-blocking I/O operations reveal flow control effects. When the send buffer fills because the receiver cannot accept more data, write_nonblock raises IO::WaitWritable (the underlying error is EAGAIN or EWOULDBLOCK) instead of blocking. Applications must handle this condition by buffering data locally or slowing production.

require 'socket'

socket = TCPSocket.new('example.com', 80)
socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, 8192)

# write_nonblock puts the descriptor in non-blocking mode itself,
# so no explicit fcntl call is needed

data = 'x' * 100000  # 100KB of data
bytes_sent = 0

loop do
  begin
    # write_nonblock returns a byte count, so slice the remainder by bytes
    sent = socket.write_nonblock(data.byteslice(bytes_sent..-1))
    bytes_sent += sent
    puts "Sent #{sent} bytes (#{bytes_sent} total)"
    break if bytes_sent >= data.bytesize
  rescue IO::WaitWritable
    puts "Socket not writable - send buffer is full"
    # Wait for socket to become writable
    IO.select(nil, [socket])
    retry
  end
end

The IO.select method monitors socket readiness without busy-waiting. A socket selects as writable once its local send buffer has free space. The link to flow control is indirect: the send buffer drains only as the receiver acknowledges data and its advertised window (together with the congestion window) lets the kernel transmit more.

require 'socket'

class FlowControlAwareWriter
  def initialize(socket)
    @socket = socket
    @buffer = ""
  end
  
  def write(data)
    @buffer << data
    flush
  end
  
  def flush
    until @buffer.empty?
      begin
        sent = @socket.write_nonblock(@buffer)
        # write_nonblock returns a byte count, so slice by bytes
        @buffer = @buffer.byteslice(sent..-1)
      rescue IO::WaitWritable
        # Send buffer is full (IO::EAGAINWaitWritable is covered by this
        # rescue as well); wait up to 1s for space, then try again
        IO.select(nil, [@socket], nil, 1.0)
      end
    end
  end
  
  def pending_bytes
    @buffer.bytesize
  end
end

socket = TCPSocket.new('example.com', 80)
writer = FlowControlAwareWriter.new(socket)

# Write large amount of data
writer.write('x' * 1_000_000)
puts "Pending: #{writer.pending_bytes} bytes"
# => Pending: 0 bytes (after flush completes)

TCP keepalive and timeout settings interact with flow control. When a receiver stops accepting data but doesn't close the connection, the sender's buffer fills and write operations block. Keepalive probes detect dead connections, while timeouts prevent indefinite blocking.

require 'socket'

socket = TCPSocket.new('example.com', 80)

# Enable keepalive
socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_KEEPALIVE, 1)

# Set keepalive parameters (Linux-specific)
if Socket.const_defined?(:TCP_KEEPIDLE)
  socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPIDLE, 60)   # 60s idle before probing
  socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPINTVL, 10)  # 10s between probes
  socket.setsockopt(Socket::IPPROTO_TCP, Socket::TCP_KEEPCNT, 3)     # 3 failed probes before timeout
end

# Bound blocking writes. Ruby waits for writability internally, so SO_SNDTIMEO
# is generally not honored by IO#write; IO#timeout= (Ruby 3.2+) is used instead
socket.timeout = 30

begin
  socket.write('x' * 1_000_000)
rescue IO::TimeoutError
  puts "Write timed out - possible flow control stall"
end

Practical Examples

A file transfer server demonstrates flow control behavior with large data volumes. The server reads file data and writes to the socket, experiencing flow control back-pressure when the receiver processes data slowly.

require 'socket'

class FileTransferServer
  def initialize(port)
    @server = TCPServer.new(port)
  end
  
  def start
    puts "Server listening on port #{@server.local_address.ip_port}"
    
    loop do
      client = @server.accept
      Thread.new(client) { |c| handle_client(c) }
    end
  end
  
  def handle_client(socket)
    puts "Client connected: #{socket.peeraddr[2]}"
    
    # Configure socket for monitoring flow control
    socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, 65536)
    
    File.open('large_file.dat', 'rb') do |file|
      total_sent = 0
      write_blocks = 0
      
      while (chunk = file.read(8192))
        # write_nonblock may accept only part of the chunk,
        # so keep writing until the whole chunk is gone
        until chunk.empty?
          begin
            sent = socket.write_nonblock(chunk)
            total_sent += sent
            chunk = chunk.byteslice(sent..-1)
          rescue IO::WaitWritable
            # Send buffer full: the receiver is not draining data fast enough
            write_blocks += 1
            IO.select(nil, [socket])
            retry
          end
        end
      end
      
      puts "Transfer complete: #{total_sent} bytes"
      puts "Write blocked #{write_blocks} times due to flow control"
    end
  ensure
    # Close the connection even if the transfer fails
    socket.close
  end
end

# Start server
server = FileTransferServer.new(9000)
server.start

A rate-limited data producer shows application-level flow control working with TCP flow control. The application generates data faster than the network can transmit, requiring coordination between production rate and transmission rate.

require 'socket'

class RateLimitedProducer
  def initialize(socket, rate_limit_mbps)
    @socket = socket
    @rate_limit_bytes_per_sec = (rate_limit_mbps * 1_000_000) / 8
    @window_size = 1.0  # 1 second window
    @sent_this_window = 0
    @window_start = Time.now
  end
  
  def send_data(data)
    bytes_to_send = data.bytesize
    offset = 0
    
    while offset < bytes_to_send
      # Check if we've exceeded rate limit for this window
      enforce_rate_limit
      
      # Calculate how much we can send in this window
      remaining_quota = (@rate_limit_bytes_per_sec * @window_size) - @sent_this_window
      chunk_size = [remaining_quota, bytes_to_send - offset, 8192].min.to_i
      
      if chunk_size > 0
        # Slice by bytes, since write_nonblock counts bytes
        chunk = data.byteslice(offset, chunk_size)
        
        begin
          sent = @socket.write_nonblock(chunk)
          offset += sent
          @sent_this_window += sent
        rescue IO::WaitWritable
          # Send buffer full - TCP flow control is holding the sender back
          puts "TCP flow control active - waiting for receiver"
          IO.select(nil, [@socket])
          retry
        end
      else
        # Rate limit hit, wait for next window
        sleep(0.01)
      end
    end
  end
  
  private
  
  def enforce_rate_limit
    elapsed = Time.now - @window_start
    
    if elapsed >= @window_size
      # Reset window
      @window_start = Time.now
      @sent_this_window = 0
    elsif @sent_this_window >= @rate_limit_bytes_per_sec * @window_size
      # Wait until window resets
      sleep_time = @window_size - elapsed
      sleep(sleep_time) if sleep_time > 0
      @window_start = Time.now
      @sent_this_window = 0
    end
  end
end

# Usage
socket = TCPSocket.new('example.com', 80)
producer = RateLimitedProducer.new(socket, 10)  # 10 Mbps limit

data = 'x' * 10_000_000  # 10MB
producer.send_data(data)

A streaming media server illustrates handling multiple concurrent connections with different flow control states. Some clients consume data quickly while others lag, requiring per-connection buffer management.

require 'socket'

class StreamingServer
  def initialize(port)
    @server = TCPServer.new(port)
    @clients = []
    @mutex = Mutex.new
  end
  
  def start
    # Accept connections in background
    Thread.new { accept_clients }
    
    # Broadcast data to all clients
    broadcast_stream
  end
  
  def accept_clients
    loop do
      client = @server.accept
      client.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, 131072)
      
      client_info = {
        socket: client,
        buffer: "",
        slow_client: false,
        blocked_count: 0
      }
      
      @mutex.synchronize { @clients << client_info }
      puts "Client connected: #{client.peeraddr[2]}"
    end
  end
  
  def broadcast_stream
    sequence = 0
    
    loop do
      # Generate data chunk
      chunk = "Frame #{sequence}: " + ('x' * 1024) + "\n"
      sequence += 1
      
      @mutex.synchronize do
        @clients.each do |client_info|
          # Add to client's buffer
          client_info[:buffer] << chunk
          
          # Try to flush buffer
          flush_client_buffer(client_info)
        end
        
        # Remove disconnected or persistently slow clients
        @clients.reject! do |info|
          info[:socket].closed? || info[:blocked_count] > 100
        end
      end
      
      sleep(0.01)  # 100 frames per second
    end
  end
  
  def flush_client_buffer(client_info)
    return if client_info[:buffer].empty?
    
    begin
      sent = client_info[:socket].write_nonblock(client_info[:buffer])
      client_info[:buffer] = client_info[:buffer].byteslice(sent..-1)
      client_info[:blocked_count] = 0
      
    rescue IO::WaitWritable
      # Flow control blocking this client
      client_info[:blocked_count] += 1
      
      if client_info[:blocked_count] > 50 && !client_info[:slow_client]
        puts "Client #{client_info[:socket].peeraddr[2]} marked as slow"
        client_info[:slow_client] = true
      end
      
      # Don't block server, continue to next client
    rescue => e
      puts "Error writing to client: #{e.message}"
      client_info[:socket].close rescue nil
    end
  end
end

server = StreamingServer.new(8080)
server.start

Implementation Approaches

The blocking I/O approach handles flow control implicitly by blocking write calls when the send buffer fills. This simplifies application logic but limits concurrency to one operation per thread. Applications must use thread pools or process pools to handle multiple concurrent connections.

# Blocking approach - simple but limits concurrency
socket = TCPSocket.new('example.com', 80)

# This blocks automatically when flow control limits transmission
socket.write('x' * 1_000_000)

# Thread-based concurrency for multiple connections
threads = 10.times.map do |i|
  Thread.new do
    socket = TCPSocket.new('example.com', 80)
    socket.write("Data from thread #{i}: " + ('x' * 100000))
    socket.close
  end
end

threads.each(&:join)

The non-blocking I/O with event loop approach handles flow control explicitly through readiness notifications. Applications monitor socket writability and buffer data locally when flow control prevents transmission. This enables high concurrency with single-threaded or limited-thread architectures.

require 'socket'

class NonBlockingWriter
  def initialize
    @sockets = {}  # socket => pending_data
    @read_list = []
    @write_list = []
  end
  
  def add_connection(host, port, data)
    socket = TCPSocket.new(host, port)
    # write_nonblock switches the socket to non-blocking mode on its own
    @sockets[socket] = data
    @write_list << socket
  end
  
  def run
    until @sockets.empty?
      readable, writable = IO.select(@read_list, @write_list, nil, 1.0)
      
      writable&.each do |socket|
        data = @sockets[socket]
        
        begin
          sent = socket.write_nonblock(data)
          remaining = data.byteslice(sent..-1)
          
          if remaining.empty?
            socket.close
            @sockets.delete(socket)
            @write_list.delete(socket)
          else
            @sockets[socket] = remaining
          end
          
        rescue IO::WaitWritable
          # Flow control prevents writing, will retry on next select
        rescue => e
          puts "Error: #{e.message}"
          socket.close rescue nil
          @sockets.delete(socket)
          @write_list.delete(socket)
        end
      end
    end
  end
end

writer = NonBlockingWriter.new
10.times do |i|
  writer.add_connection('example.com', 80, "Request #{i}: " + ('x' * 50000))
end
writer.run

The buffered queue approach decouples data production from transmission by maintaining an application-level queue. Producer threads add data to the queue while dedicated writer threads drain the queue to sockets, handling flow control independently.

require 'socket'
require 'thread'

class BufferedQueueWriter
  def initialize(socket, queue_size_limit = 1000)
    @socket = socket
    @queue = Queue.new
    @queue_size_limit = queue_size_limit
    @running = true
    @bytes_queued = 0
    @mutex = Mutex.new
    
    @writer_thread = Thread.new { write_loop }
  end
  
  def write(data)
    # Block if queue is too large (application-level flow control)
    loop do
      @mutex.synchronize do
        if @bytes_queued < @queue_size_limit
          @queue << data
          @bytes_queued += data.size
          return
        end
      end
      sleep(0.01)
    end
  end
  
  def close
    @running = false
    @writer_thread.join
    @socket.close
  end
  
  private
  
  def write_loop
    while @running || !@queue.empty?
      begin
        data = @queue.pop(true)  # non-blocking pop
        
        # A blocking write returns only after the whole string has been handed
        # to the kernel, so it stalls here while TCP flow control holds the
        # sender back
        @socket.write(data)
        
        @mutex.synchronize do
          @bytes_queued -= data.size
        end
        
      rescue ThreadError
        # Queue empty
        sleep(0.01)
      rescue => e
        puts "Write error: #{e.message}"
        break
      end
    end
  end
end

socket = TCPSocket.new('example.com', 80)
writer = BufferedQueueWriter.new(socket, 500_000)

# Producer threads
producers = 5.times.map do |i|
  Thread.new do
    100.times do |j|
      writer.write("Producer #{i}, message #{j}: " + ('x' * 1000))
    end
  end
end

producers.each(&:join)
writer.close

Performance Considerations

Window size directly affects throughput, especially on high-latency connections. The bandwidth-delay product determines the optimal window size: optimal_window = bandwidth * round_trip_time. Smaller windows underutilize available bandwidth, while larger windows consume more memory without improving throughput once the bandwidth-delay product is satisfied.

# Calculate optimal window size (bandwidth-delay product) in whole bytes
def optimal_window_size(bandwidth_mbps, rtt_ms)
  bandwidth_bytes_per_sec = (bandwidth_mbps * 1_000_000) / 8
  rtt_seconds = rtt_ms / 1000.0
  (bandwidth_bytes_per_sec * rtt_seconds).round
end

# Example: 100 Mbps link with 50ms RTT
window = optimal_window_size(100, 50)
puts "Optimal window size: #{window} bytes (#{window / 1024}KB)"
# => Optimal window size: 625000 bytes (610KB)

# Configure socket accordingly
socket = TCPSocket.new('example.com', 80)
socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, window.to_i)
socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, window.to_i)

Receive buffer processing latency impacts window advertisement. When the application reads data slowly from the receive buffer, the available window shrinks, throttling the sender. Applications should read data promptly to maintain large windows and high throughput.

require 'socket'
require 'benchmark'

# Fast reader - maintains large window
def fast_reader(socket)
  bytes_received = 0
  
  time = Benchmark.measure do
    while chunk = socket.read(8192)
      bytes_received += chunk.size
      # Process immediately
      break if bytes_received >= 1_000_000
    end
  end
  
  puts "Fast reader: #{bytes_received} bytes in #{time.real}s"
  puts "Throughput: #{(bytes_received / time.real / 1024).to_i} KB/s"
end

# Slow reader - reduces window frequently
def slow_reader(socket)
  bytes_received = 0
  
  time = Benchmark.measure do
    while chunk = socket.read(8192)
      bytes_received += chunk.size
      # Simulate slow processing
      sleep(0.01)
      break if bytes_received >= 1_000_000
    end
  end
  
  puts "Slow reader: #{bytes_received} bytes in #{time.real}s"
  puts "Throughput: #{(bytes_received / time.real / 1024).to_i} KB/s"
end

Write buffering affects flow control efficiency. Small, frequent writes increase system call overhead and may trigger silly window syndrome. Buffering writes at the application level improves efficiency by transmitting larger segments less frequently.

class BufferedSocketWriter
  def initialize(socket, buffer_size = 8192)
    @socket = socket
    @buffer_size = buffer_size
    @buffer = ""
  end
  
  def write(data)
    @buffer << data
    
    # Flush when buffer reaches threshold
    flush if @buffer.size >= @buffer_size
  end
  
  def flush
    return if @buffer.empty?
    
    @socket.write(@buffer)
    @buffer = ""
  end
  
  def close
    flush
    @socket.close
  end
end

# Compare buffered vs unbuffered writes
require 'benchmark'

socket1 = TCPSocket.new('example.com', 80)
time1 = Benchmark.measure do
  10000.times { socket1.write("x" * 100) }  # Many small writes
end
puts "Unbuffered: #{time1.real}s"

socket2 = TCPSocket.new('example.com', 80)
writer = BufferedSocketWriter.new(socket2)
time2 = Benchmark.measure do
  10000.times { writer.write("x" * 100) }
  writer.flush
end
puts "Buffered: #{time2.real}s"

Common Pitfalls

Ignoring partial writes causes data loss when flow control limits transmission. A blocking IO#write keeps writing until the whole string has been handed to the kernel, but write_nonblock (like syswrite) may accept only part of it. Applications must check the returned byte count and resend the remainder. Assuming all data is transmitted in a single non-blocking call leads to silent data truncation.

# Wrong - ignores partial writes
socket.write_nonblock(data)  # May accept only part of the data

# Correct - ensures all data transmits
def write_all(socket, data)
  offset = 0
  while offset < data.bytesize
    begin
      offset += socket.write_nonblock(data.byteslice(offset..-1))
    rescue IO::WaitWritable
      IO.select(nil, [socket])  # Wait until the send buffer has space
    end
  end
end

# Correct with error handling
def write_all_safe(socket, data)
  offset = 0
  retries = 0
  max_retries = 3
  
  while offset < data.bytesize
    begin
      sent = socket.write_nonblock(data.byteslice(offset..-1))
      offset += sent
      retries = 0  # Reset on successful write
      
    rescue IO::WaitWritable
      # Wait up to 5s for writability; count only the waits that time out
      next if IO.select(nil, [socket], nil, 5.0)
      
      retries += 1
      raise "Write failed after #{max_retries} retries" if retries > max_retries
      
    rescue Errno::EPIPE, Errno::ECONNRESET
      raise "Connection lost after sending #{offset} of #{data.bytesize} bytes"
    end
  end
  
  offset
end

Blocking indefinitely on writes creates unresponsive applications. When the receiver stops consuming data but doesn't close the connection, write calls block forever. Applications need timeouts and monitoring to detect stuck connections.

# Enforce a deadline so a stalled receiver cannot block the writer forever
def write_with_timeout(socket, data, timeout_seconds)
  deadline = Time.now + timeout_seconds
  offset = 0
  
  while offset < data.bytesize
    remaining_time = deadline - Time.now
    raise Errno::ETIMEDOUT if remaining_time <= 0
    
    ready = IO.select(nil, [socket], nil, remaining_time)
    raise Errno::ETIMEDOUT unless ready
    
    begin
      sent = socket.write_nonblock(data.byteslice(offset..-1))
      offset += sent
    rescue IO::WaitWritable
      # Loop continues, will check timeout
    end
  end
end

socket = TCPSocket.new('example.com', 80)
large_data = 'x' * 1_000_000  # Placeholder payload

begin
  write_with_timeout(socket, large_data, 30)
rescue Errno::ETIMEDOUT
  puts "Write timed out - receiver not consuming data"
  socket.close
end

Mismatching buffer sizes between sender and receiver reduces efficiency. When the send buffer significantly exceeds the receive buffer, the sender buffers excess data locally without benefit. When the receive buffer is larger than necessary, memory is wasted. Applications should coordinate buffer sizes based on bandwidth-delay product and processing capabilities.

# Configure balanced buffer sizes
def configure_connection(socket, target_throughput_mbps, rtt_ms)
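  # optimal_window_size is the helper defined under Performance Considerations;
  # setsockopt expects an Integer, so coerce the result
  optimal_size = optimal_window_size(target_throughput_mbps, rtt_ms).to_i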
  
  # Set buffers to optimal size
  socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, optimal_size)
  socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF, optimal_size)
  
  # Verify actual sizes (OS may adjust)
  actual_rcv = socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).int
  actual_snd = socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_SNDBUF).int
  
  puts "Requested: #{optimal_size} bytes"
  puts "Actual RCV: #{actual_rcv} bytes"
  puts "Actual SND: #{actual_snd} bytes"
end

socket = TCPSocket.new('example.com', 80)
configure_connection(socket, 100, 50)

Not handling zero window conditions properly causes stalls. When the receiver's buffer fills completely, it advertises a zero window. The kernel's TCP stack then sends zero window probes on the sender's behalf; the application only sees its send buffer fill and its writes stop making progress. Applications that aggressively retry writes during zero window periods waste CPU, so they should wait for writability with a bounded timeout instead.

class ZeroWindowAwareWriter
  def initialize(socket)
    @socket = socket
    @zero_window_count = 0
    @backoff_ms = 10
  end
  
  def write(data)
    offset = 0
    
    while offset < data.bytesize
      begin
        sent = @socket.write_nonblock(data.byteslice(offset..-1))
        offset += sent
        
        # Successful write, reset backoff
        @zero_window_count = 0
        @backoff_ms = 10
        
      rescue IO::WaitWritable
        # Send buffer full, possibly because the peer advertises a zero window.
        # Wait for writability, but no longer than the current backoff.
        next if IO.select(nil, [@socket], nil, @backoff_ms / 1000.0)
        
        # Still blocked: lengthen the next wait (capped at 1 second)
        @zero_window_count += 1
        @backoff_ms = [@backoff_ms * 2, 1000].min
        
        # Give up if the connection stays stalled
        if @zero_window_count > 100
          raise "Connection stalled - zero window for #{@zero_window_count} attempts"
        end
      end
    end
  end
end

Reference

Flow Control Components

Component | Description | Function
Receive Window | Bytes available in receiver buffer | Advertised to sender in TCP header
Send Window | Bytes sender can transmit | Limited by receive window and congestion window
Window Scale | Multiplier for window size | Negotiated during connection setup
Zero Window Probe | Segment checking for window updates | Prevents deadlock when window is zero
Sequence Number | Position in byte stream | Tracks data transmission progress
Acknowledgment Number | Next expected byte | Confirms received data

Socket Buffer Options

Option | Level | Description | Typical Values
SO_RCVBUF | SOL_SOCKET | Receive buffer size in bytes | 65536-262144
SO_SNDBUF | SOL_SOCKET | Send buffer size in bytes | 16384-131072
SO_KEEPALIVE | SOL_SOCKET | Enable connection keepalive | 0 or 1
SO_SNDTIMEO | SOL_SOCKET | Send timeout in seconds | 0 (infinite) or 10-300
TCP_NODELAY | IPPROTO_TCP | Disable Nagle algorithm | 0 or 1
TCP_KEEPIDLE | IPPROTO_TCP | Seconds before keepalive probes | 60-7200

Window Calculation Formulas

Formula | Purpose | Variables
receive_window = buffer_size - unprocessed_data | Available receive window | buffer_size: total buffer, unprocessed_data: bytes awaiting application
bandwidth_delay_product = bandwidth * RTT | Optimal window size | bandwidth: bytes/sec, RTT: round-trip time
effective_window = min(receive_window, congestion_window) | Sender's usable window | receive_window: advertised, congestion_window: from congestion control
scaled_window = advertised_window * 2^scale_factor | Actual window with scaling | scale_factor: 0-14 from negotiation

Flow Control States

State | Condition | Sender Behavior | Application Action
Normal Operation | Window size > 0 | Transmits up to window | Continue sending data
Window Closing | Window size decreasing | Reduces transmission rate | Monitor buffer consumption
Zero Window | Window size = 0 | Sends only probes | Buffer data, check connection health
Window Update | ACK with larger window | Resumes transmission | Flush pending data
Persist Timer Active | Probing zero window | Periodic probe segments | Implement timeout and retry logic

Common Buffer Size Recommendations

Scenario | Send Buffer | Receive Buffer | Notes
Local network | 64KB | 64KB | Low latency, default sizes sufficient
Internet connection | 128-256KB | 128-256KB | Accounts for typical latency
High bandwidth | 512KB-1MB | 512KB-1MB | Calculated from bandwidth-delay product
Slow receiver | 64KB | 256KB+ | Larger receive buffer for burst handling
Many connections | 32-64KB | 32-64KB | Memory conservation with many sockets
Bulk transfer | 256KB-1MB | 256KB-1MB | Maximizes throughput for large files

Error Conditions

Error | Cause | Detection | Recovery
EAGAIN / EWOULDBLOCK | Send buffer full | write_nonblock raises exception | Wait for writability with IO.select
ETIMEDOUT | Write timeout exceeded | write raises Errno::ETIMEDOUT | Close connection or retry after backoff
EPIPE | Connection closed by receiver | write raises Errno::EPIPE | Handle gracefully, log error
ECONNRESET | Connection reset | Any I/O operation fails | Clean up resources, reconnect if appropriate

Ruby Methods for Flow Control

Method | Purpose | Example
write_nonblock | Non-blocking write | socket.write_nonblock(data)
read_nonblock | Non-blocking read | socket.read_nonblock(8192)
setsockopt | Configure socket option | socket.setsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF, 131072)
getsockopt | Query socket option | socket.getsockopt(Socket::SOL_SOCKET, Socket::SO_RCVBUF).int
IO.select | Wait for I/O readiness | IO.select(read_list, write_list, nil, timeout)
fcntl | Set non-blocking mode | socket.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK)