CrackedRuby

gRPC and Protocol Buffers

Overview

gRPC is a high-performance remote procedure call framework that enables client applications to directly call methods on server applications across different machines as if they were local objects. Developed by Google and released as open source in 2015, gRPC uses HTTP/2 for transport, Protocol Buffers as the interface definition language, and provides features including authentication, bidirectional streaming, flow control, and blocking or nonblocking bindings.

Protocol Buffers (protobuf) is a language-neutral, platform-neutral extensible mechanism for serializing structured data. Unlike JSON or XML, Protocol Buffers encodes data in a compact binary format that is smaller and faster to parse. The schema is defined in .proto files, which describe the structure of data and services. A compiler then generates source code for multiple programming languages from these definitions.

The combination of gRPC and Protocol Buffers addresses several challenges in distributed systems: type-safe communication contracts, efficient network utilization, automatic code generation for multiple languages, and support for both synchronous and asynchronous communication patterns. These technologies find extensive use in microservices architectures, internal APIs, and systems requiring high throughput and low latency.

A basic Protocol Buffers definition:

syntax = "proto3";

message User {
  int32 id = 1;
  string name = 2;
  string email = 3;
}

A gRPC service definition:

syntax = "proto3";

service UserService {
  rpc GetUser(GetUserRequest) returns (User);
  rpc ListUsers(ListUsersRequest) returns (stream User);
}

message GetUserRequest {
  int32 id = 1;
}

// Empty request; filter or paging fields can be added later
message ListUsersRequest {}

message User {
  int32 id = 1;
  string name = 2;
  string email = 3;
}

Key Principles

Protocol Buffers operates on a schema-first approach where data structures are defined in .proto files using a simple interface definition language. Each field in a message has a unique numeric tag that identifies it in the binary encoding. The tag numbers 1-15 require one byte to encode, while higher numbers require more bytes, making lower numbers preferable for frequently used fields. The protocol supports three syntaxes: proto2, proto3, and editions, with proto3 being the current standard that simplifies the language and removes some proto2 features.

Field types in Protocol Buffers include scalar types (int32, int64, uint32, uint64, sint32, sint64, fixed32, fixed64, sfixed32, sfixed64, float, double, bool, string, bytes), enumerations, and nested messages. The proto3 syntax eliminates required fields, making all fields optional by default. Repeated fields create arrays or lists, while map fields provide key-value associations. The oneof keyword groups fields where only one can be set at a time, similar to a union type.

message Product {
  int32 id = 1;
  string name = 2;
  double price = 3;
  repeated string tags = 4;
  map<string, string> metadata = 5;
  
  oneof discount {
    double percentage = 6;
    double fixed_amount = 7;
  }
}

The binary encoding format uses variable-length integers (varints) to encode numeric values efficiently. Small numbers use fewer bytes, making the format space-efficient for typical data. Each field in the binary format is a key-value pair where the key is the field number and wire type encoded together. The wire type indicates how to parse the value: varint, 64-bit, length-delimited, or 32-bit.
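
The key and varint layout described above can be sketched in plain Ruby. This is an illustrative encoder only, not the google-protobuf implementation; the helper names encode_varint and encode_key and the WIRE_TYPES table are assumptions made for this example.

```ruby
# Encode a non-negative integer as a varint: 7 payload bits per byte,
# with the high bit set on every byte except the last.
def encode_varint(value)
  raise ArgumentError, 'value must be non-negative' if value.negative?

  bytes = []
  loop do
    byte = value & 0x7F
    value >>= 7
    if value.zero?
      bytes << byte
      break
    end
    bytes << (byte | 0x80)
  end
  bytes.pack('C*')
end

# Wire type numbers for the four types named in the text (hypothetical table name)
WIRE_TYPES = { varint: 0, fixed64: 1, length_delimited: 2, fixed32: 5 }.freeze

# A field key is the varint of (field_number << 3) | wire_type.
def encode_key(field_number, wire_type)
  encode_varint((field_number << 3) | WIRE_TYPES.fetch(wire_type))
end

# Field number 1 holding the integer 150
encoded = encode_key(1, :varint) + encode_varint(150)
puts encoded.unpack1('H*')  # => 089601
```

The first byte (08) carries field number 1 with wire type 0, and the remaining two bytes are the varint for 150.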

gRPC builds on Protocol Buffers by defining services with RPC methods. Each method specifies request and response message types. gRPC supports four communication patterns: unary (single request, single response), server streaming (single request, stream of responses), client streaming (stream of requests, single response), and bidirectional streaming (streams in both directions). The framework handles serialization, network transport, and deserialization automatically.

service OrderService {
  // Unary RPC
  rpc CreateOrder(Order) returns (OrderConfirmation);
  
  // Server streaming
  rpc TrackOrder(OrderId) returns (stream OrderStatus);
  
  // Client streaming
  rpc BatchCreateOrders(stream Order) returns (BatchResult);
  
  // Bidirectional streaming
  rpc ProcessOrders(stream Order) returns (stream OrderResult);
}

HTTP/2 serves as the transport protocol, providing multiplexing (multiple requests over a single connection), header compression, and binary framing. HTTP/2 also defines server push, but gRPC does not use it. Multiplexing lets multiple RPC calls share a connection without head-of-line blocking at the HTTP layer, although packet loss on the underlying TCP connection can still stall every stream on it. Flow control operates at both connection and stream levels, preventing fast senders from overwhelming slow receivers.

Metadata in gRPC passes additional information with requests and responses, such as authentication tokens, request IDs, or tracing information. Metadata appears as HTTP/2 headers and can be sent at the beginning of calls or as trailing metadata after responses. Deadlines specify how long a client waits for an RPC to complete, propagating through service call chains to prevent resource exhaustion. Status codes indicate success or various error conditions, following a standard set defined by gRPC.
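
Deadline propagation can be sketched as follows: a service that receives a call with an absolute deadline passes only the remaining time to the calls it makes downstream. This is a minimal illustration in plain Ruby; remaining_budget, DeadlineExpired, and the reserve parameter are hypothetical names and are not part of the grpc gem.

```ruby
# Hypothetical error raised when no time is left for further work
class DeadlineExpired < StandardError; end

# Returns the number of seconds a downstream call may take, given the
# absolute deadline of the incoming call. `reserve` holds back a little
# time for the caller's own processing.
def remaining_budget(deadline, now: Time.now, reserve: 0.05)
  budget = (deadline - now) - reserve
  raise DeadlineExpired, 'no time left for downstream call' if budget <= 0

  budget
end

incoming_deadline = Time.now + 2.0
budget = remaining_budget(incoming_deadline)

# A downstream stub call would then be given `deadline: Time.now + budget`
puts format('Downstream call may take up to %.2f seconds', budget)
```

Failing fast when the budget is exhausted keeps a slow request from tying up resources further down the call chain.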

Ruby Implementation

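Ruby implements gRPC through the grpc gem, which provides both client and server capabilities. The google-protobuf gem provides the Protocol Buffers runtime (message classes, encoding, and decoding), and the grpc-tools gem bundles the protoc compiler together with the Ruby gRPC plugin. A typical Gemfile lists all three: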

# Gemfile
gem 'grpc'
gem 'google-protobuf'
gem 'grpc-tools'  # For protoc compiler

Protocol Buffer definitions compile to Ruby code using the grpc_tools_ruby_protoc compiler. The compiler generates two files from each .proto file: one containing message classes (_pb.rb) and one containing service stubs (_services_pb.rb). The message classes provide constructors, field accessors, and serialization methods.

# Simplified view of the code generated from user.proto; the real files
# build these classes from a descriptor pool
# user_pb.rb
class User
  include Google::Protobuf::MessageExts
  extend Google::Protobuf::MessageExts::ClassMethods
end

# user_services_pb.rb
module UserService
  class Service
    include GRPC::GenericService
    self.marshal_class_method = :encode
    self.unmarshal_class_method = :decode
    self.service_name = 'userservice.UserService'
    
    rpc :GetUser, GetUserRequest, User
    rpc :ListUsers, ListUsersRequest, stream(User)
  end
  
  Stub = Service.rpc_stub_class
end

Creating messages uses keyword arguments matching the proto field names. The generated classes handle type conversion and validation:

user = User.new(
  id: 123,
  name: "Alice Smith",
  email: "alice@example.com"
)

# Access fields
puts user.name  # => "Alice Smith"

# Modify fields
user.email = "alice.smith@example.com"

# Serialize to binary
binary_data = User.encode(user)

# Deserialize from binary
decoded_user = User.decode(binary_data)

Repeated fields in Ruby generate array accessors. Map fields generate hash accessors. The oneof keyword creates methods to check which field is set:

product = Product.new(
  id: 1,
  name: "Widget",
  price: 29.99,
  tags: ["electronics", "gadget"],
  metadata: { "manufacturer" => "ACME", "warranty" => "1 year" }
)

# Add to repeated field
product.tags << "new-arrival"

# Access map
puts product.metadata["manufacturer"]  # => "ACME"

# Set oneof field
product.percentage = 15.0

# Check which oneof field is set
puts product.discount  # => :percentage

Implementing a gRPC server requires inheriting from the generated service class and implementing the RPC methods. The server runs on a specified port and handles concurrent requests:

require 'grpc'
require_relative 'user_services_pb'

class UserServiceImpl < UserService::Service
  def get_user(request, _call)
    # request is a GetUserRequest instance
    user_id = request.id
    
    # Database lookup (simplified)
    user_data = find_user_by_id(user_id)
    
    if user_data
      User.new(
        id: user_data[:id],
        name: user_data[:name],
        email: user_data[:email]
      )
    else
      raise GRPC::NotFound.new("User #{user_id} not found")
    end
  end
  
  def list_users(request, _call)
    # Return an Enumerator for streaming
    Enumerator.new do |yielder|
      users = fetch_all_users
      users.each do |user_data|
        yielder << User.new(
          id: user_data[:id],
          name: user_data[:name],
          email: user_data[:email]
        )
      end
    end
  end
  
  private
  
  def find_user_by_id(id)
    # Database query implementation
  end
  
  def fetch_all_users
    # Database query implementation
  end
end

# Start the server
def main
  server = GRPC::RpcServer.new
  server.add_http2_port('0.0.0.0:50051', :this_port_is_insecure)
  server.handle(UserServiceImpl.new)
  
  puts "Server listening on 0.0.0.0:50051"
  server.run_till_terminated_or_interrupted([1, 'int', 'SIGTERM'])
end

main

Creating a client uses the generated stub class. The stub provides methods corresponding to each RPC. Clients can pass metadata, set deadlines, and handle errors:

require 'grpc'
require_relative 'user_services_pb'

def create_client
  stub = UserService::Stub.new(
    'localhost:50051',
    :this_channel_is_insecure
  )
end

def get_user_example
  stub = create_client
  
  request = GetUserRequest.new(id: 123)
  
  begin
    response = stub.get_user(request, deadline: Time.now + 5)
    puts "User: #{response.name} (#{response.email})"
  rescue GRPC::NotFound => e
    puts "User not found: #{e.message}"
  rescue GRPC::DeadlineExceeded => e
    puts "Request timed out: #{e.message}"
  rescue GRPC::Unavailable => e
    puts "Service unavailable: #{e.message}"
  end
end

def list_users_example
  stub = create_client
  
  request = ListUsersRequest.new
  
  begin
    stream = stub.list_users(request)
    stream.each do |user|
      puts "User: #{user.name}"
    end
  rescue GRPC::BadStatus => e
    puts "Error: #{e.message}"
  end
end

Client streaming sends multiple requests and receives a single response. Bidirectional streaming allows concurrent sending and receiving:

def batch_create_orders(orders)
  stub = OrderService::Stub.new('localhost:50051', :this_channel_is_insecure)
  
  # Create an enumerable for requests
  requests = orders.map { |order_data| Order.new(order_data) }
  
  begin
    result = stub.batch_create_orders(requests)
    puts "Created #{result.count} orders"
  rescue GRPC::BadStatus => e
    puts "Batch creation failed: #{e.message}"
  end
end

def process_orders_bidirectional
  stub = OrderService::Stub.new('localhost:50051', :this_channel_is_insecure)
  
  # Create request enumerator
  requests = Enumerator.new do |yielder|
    10.times do |i|
      yielder << Order.new(id: i, amount: 100.0 * (i + 1))
      sleep 0.1  # Simulate processing time
    end
  end
  
  # Call bidirectional streaming method
  stream = stub.process_orders(requests)
  
  # Process responses as they arrive
  stream.each do |result|
    puts "Order #{result.order_id}: #{result.status}"
  end
end

Interceptors in Ruby allow middleware-like functionality for cross-cutting concerns. Server interceptors process requests before they reach service methods. Client interceptors modify requests before sending or responses after receiving:

class LoggingInterceptor < GRPC::ServerInterceptor
  def request_response(request:, call:, method:)
    start_time = Time.now
    puts "Request: #{method} - #{request.inspect}"
    
    begin
      response = yield
      duration = Time.now - start_time
      puts "Response: #{method} - Success (#{duration}s)"
      response
    rescue => e
      duration = Time.now - start_time
      puts "Response: #{method} - Error: #{e.message} (#{duration}s)"
      raise
    end
  end
end

# Add interceptor to server
server = GRPC::RpcServer.new(interceptors: [LoggingInterceptor.new])

Practical Examples

Consider a real-time order processing system where clients submit orders and receive status updates. The service definition includes unary, server streaming, and bidirectional streaming:

syntax = "proto3";

package orderprocessing;

service OrderProcessor {
  rpc SubmitOrder(Order) returns (OrderConfirmation);
  rpc TrackOrderStatus(OrderId) returns (stream StatusUpdate);
  rpc BatchProcessOrders(stream Order) returns (stream OrderResult);
}

message Order {
  string customer_id = 1;
  repeated OrderItem items = 2;
  string delivery_address = 3;
  PaymentInfo payment = 4;
}

message OrderItem {
  string product_id = 1;
  int32 quantity = 2;
  double price = 3;
}

message PaymentInfo {
  string method = 1;
  string token = 2;
}

message OrderConfirmation {
  string order_id = 1;
  double total_amount = 2;
  string estimated_delivery = 3;
}

message OrderId {
  string id = 1;
}

message StatusUpdate {
  string order_id = 1;
  string status = 2;
  string timestamp = 3;
  string location = 4;
}

message OrderResult {
  string order_id = 1;
  bool success = 2;
  string error_message = 3;
}

Server implementation handles order validation, inventory checking, and status tracking:

require 'grpc'
require 'securerandom'  # SecureRandom.uuid
require 'time'          # Time#iso8601
require_relative 'order_processing_services_pb'

class OrderProcessorImpl < OrderProcessor::Service
  def initialize
    @orders = {}
    @status_subscribers = {}
  end
  
  def submit_order(order, _call)
    # Validate order
    raise GRPC::InvalidArgument.new("No items in order") if order.items.empty?
    raise GRPC::InvalidArgument.new("No delivery address") if order.delivery_address.empty?
    
    # Calculate total
    total = order.items.sum { |item| item.price * item.quantity }
    
    # Generate order ID
    order_id = SecureRandom.uuid
    
    # Store order
    @orders[order_id] = {
      customer_id: order.customer_id,
      items: order.items,
      total: total,
      status: 'pending',
      created_at: Time.now
    }
    
    # Start background processing
    Thread.new { process_order(order_id) }
    
    # Return confirmation
    OrderConfirmation.new(
      order_id: order_id,
      total_amount: total,
      estimated_delivery: (Time.now + 86400 * 3).strftime('%Y-%m-%d')
    )
  end
  
  def track_order_status(order_id_msg, _call)
    order_id = order_id_msg.id
    
    raise GRPC::NotFound.new("Order not found") unless @orders.key?(order_id)
    
    Enumerator.new do |yielder|
      # Send initial status
      order = @orders[order_id]
      yielder << StatusUpdate.new(
        order_id: order_id,
        status: order[:status],
        timestamp: order[:created_at].iso8601,
        location: "Warehouse"
      )
      
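      # An order that has already reached its final status gets no further
      # updates, so end the stream instead of waiting on the queue forever
      next if order[:status] == 'delivered'
      
      # Subscribe to future updates
      queue = Queue.new
      @status_subscribers[order_id] ||= []
      @status_subscribers[order_id] << queue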
      
      # Stream updates as they arrive
      loop do
        update = queue.pop
        break if update == :done
        yielder << update
      end
    end
  end
  
  def batch_process_orders(requests, _call)
    Enumerator.new do |yielder|
      requests.each do |order|
        begin
          confirmation = submit_order(order, nil)
          yielder << OrderResult.new(
            order_id: confirmation.order_id,
            success: true
          )
        rescue GRPC::BadStatus => e
          yielder << OrderResult.new(
            order_id: '',
            success: false,
            error_message: e.message
          )
        end
      end
    end
  end
  
  private
  
  def process_order(order_id)
    statuses = ['processing', 'shipped', 'out_for_delivery', 'delivered']
    locations = ['Warehouse', 'Distribution Center', 'Local Facility', 'Delivered']
    
    statuses.each_with_index do |status, index|
      sleep 5  # Simulate processing time
      
      @orders[order_id][:status] = status
      
      update = StatusUpdate.new(
        order_id: order_id,
        status: status,
        timestamp: Time.now.iso8601,
        location: locations[index]
      )
      
      # Notify subscribers
      @status_subscribers[order_id]&.each { |queue| queue << update }
    end
    
    # Signal completion
    @status_subscribers[order_id]&.each { |queue| queue << :done }
    @status_subscribers.delete(order_id)
  end
end

Client implementation submits orders and tracks their status in real time:

require 'grpc'
require_relative 'order_processing_services_pb'

class OrderClient
  def initialize(address)
    @stub = OrderProcessor::Stub.new(address, :this_channel_is_insecure)
  end
  
  def submit_and_track_order
    # Create order
    order = Order.new(
      customer_id: 'CUST123',
      delivery_address: '123 Main St, City, State 12345',
      items: [
        OrderItem.new(product_id: 'PROD1', quantity: 2, price: 29.99),
        OrderItem.new(product_id: 'PROD2', quantity: 1, price: 49.99)
      ],
      payment: PaymentInfo.new(method: 'credit_card', token: 'tok_12345')
    )
    
    # Submit order
    begin
      confirmation = @stub.submit_order(order, deadline: Time.now + 10)
      puts "Order submitted: #{confirmation.order_id}"
      puts "Total: $#{confirmation.total_amount}"
      puts "Estimated delivery: #{confirmation.estimated_delivery}"
      
      # Track status
      track_order(confirmation.order_id)
    rescue GRPC::DeadlineExceeded
      puts "Order submission timed out"
    rescue GRPC::InvalidArgument => e
      puts "Invalid order: #{e.message}"
    end
  end
  
  def track_order(order_id)
    request = OrderId.new(id: order_id)
    
    begin
      stream = @stub.track_order_status(request)
      puts "\nTracking order #{order_id}:"
      
      stream.each do |update|
        puts "  [#{update.timestamp}] #{update.status} - #{update.location}"
      end
      
      puts "Tracking completed"
    rescue GRPC::NotFound
      puts "Order not found"
    rescue GRPC::Cancelled
      puts "Tracking cancelled"
    end
  end
  
  def batch_submit_orders(orders_data)
    requests = orders_data.map { |data| create_order(data) }
    
    puts "Submitting #{requests.size} orders..."
    
    stream = @stub.batch_process_orders(requests.to_enum)
    results = { success: 0, failed: 0 }
    
    stream.each do |result|
      if result.success
        results[:success] += 1
        puts "  Order #{result.order_id}: Success"
      else
        results[:failed] += 1
        puts "  Order failed: #{result.error_message}"
      end
    end
    
    puts "\nBatch complete: #{results[:success]} succeeded, #{results[:failed]} failed"
  end
  
  private
  
  def create_order(data)
    Order.new(
      customer_id: data[:customer_id],
      delivery_address: data[:address],
      items: data[:items].map do |item|
        OrderItem.new(
          product_id: item[:product_id],
          quantity: item[:quantity],
          price: item[:price]
        )
      end,
      payment: PaymentInfo.new(
        method: data[:payment_method],
        token: data[:payment_token]
      )
    )
  end
end

# Usage
client = OrderClient.new('localhost:50051')
client.submit_and_track_order

Another example demonstrates a notification service with metadata for authentication and distributed tracing:

require 'grpc'
require 'securerandom'

class AuthInterceptor < GRPC::ClientInterceptor
  def initialize(auth_token)
    @auth_token = auth_token
  end
  
  def request_response(request:, call:, method:, metadata:)
    metadata['authorization'] = "Bearer #{@auth_token}"
    metadata['x-request-id'] = SecureRandom.uuid
    metadata['x-client-version'] = '1.0.0'
    
    yield
  end
end

class AuthenticatedClient
  def initialize(address, auth_token)
    interceptor = AuthInterceptor.new(auth_token)
    @stub = NotificationService::Stub.new(
      address,
      :this_channel_is_insecure,
      interceptors: [interceptor]
    )
  end
  
  def send_notification(user_id, message)
    request = Notification.new(
      user_id: user_id,
      message: message,
      timestamp: Time.now.to_i
    )
    
    @stub.send_notification(request)
  end
end

# Server-side metadata handling
class NotificationServiceImpl < NotificationService::Service
  def send_notification(request, call)
    # Extract metadata
    metadata = call.metadata
    auth_header = metadata['authorization']
    request_id = metadata['x-request-id']
    
    # Validate authentication
    unless valid_token?(auth_header)
      raise GRPC::Unauthenticated.new('Invalid authentication token')
    end
    
    # Process with tracing context
    puts "Processing request #{request_id}"
    
    # Implementation
    NotificationResponse.new(success: true)
  end
  
  private
  
  def valid_token?(header)
    return false unless header&.start_with?('Bearer ')
    token = header.split(' ', 2)[1]
    # Token validation logic
    !token.nil? && token.length > 0
  end
end

Performance Considerations

Protocol Buffers binary encoding is typically smaller and faster to process than text-based formats. As a rough illustration, a small record that takes 80-100 bytes as JSON often takes 30-40 bytes as a Protocol Buffers message, because field names are replaced by numeric tags and integers are stored as varints; the exact ratio depends on the data. The compact representation reduces network bandwidth, and the binary format parses faster because it avoids text scanning and number conversion.
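
To make the size difference concrete, the sketch below hand-encodes the User message from the Overview and compares it with its JSON form. It is an illustration only and does not use the google-protobuf gem; varint_field and string_field are hypothetical helpers that rely on every field number being below 16 and every integer and length being below 128, so each key, length, and value fits in one byte.

```ruby
require 'json'

user = { id: 123, name: 'Alice Smith', email: 'alice@example.com' }

# Key byte plus a single-byte varint value (wire type 0 = varint)
def varint_field(number, value)
  [(number << 3) | 0, value].pack('C2')
end

# Key byte, single-byte length, then the raw string bytes
# (wire type 2 = length-delimited)
def string_field(number, text)
  bytes = text.b
  [(number << 3) | 2, bytes.bytesize].pack('C2') + bytes
end

proto_bytes = varint_field(1, user[:id]) +
              string_field(2, user[:name]) +
              string_field(3, user[:email])
json_bytes = JSON.generate(user)

puts "protobuf: #{proto_bytes.bytesize} bytes"  # => protobuf: 34 bytes
puts "json:     #{json_bytes.bytesize} bytes"   # => json:     59 bytes
```

Most of the saving here comes from dropping the field names and punctuation; the string payloads themselves are the same size in both formats.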

Field numbering affects encoding size. Fields numbered 1-15 require one byte for the tag, while fields 16-2047 require two bytes. Frequently accessed fields should use lower numbers. The varint encoding for integers stores small numbers efficiently: values 0-127 use one byte, 128-16,383 use two bytes, and so on. Fixed-size types (fixed32, fixed64) use constant space but may be larger for small values.

# Inefficient field ordering
message UserProfile {
  string bio = 1;              // Large field with low number
  repeated string hobbies = 2; // Large repeated field
  int32 id = 16;              // Small, frequently accessed field with high number
  string name = 17;           // Frequently accessed with high number
}

# Optimized field ordering
message UserProfile {
  int32 id = 1;               // Small, frequent field gets low number
  string name = 2;            // Frequent field gets low number
  string bio = 3;
  repeated string hobbies = 4;
}
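
The byte counts quoted above follow from the varint rule of 7 payload bits per byte. A short Ruby sketch of that arithmetic is shown here; varint_size and tag_size are hypothetical helper names used only for this example.

```ruby
# Bytes needed to store a non-negative integer as a varint (7 bits per byte)
def varint_size(value)
  [(value.bit_length + 6) / 7, 1].max
end

# Bytes needed for a field tag: the field number is shifted left by
# 3 bits to make room for the wire type before being varint-encoded
def tag_size(field_number)
  varint_size(field_number << 3)
end

[1, 15, 16, 2047, 2048].each do |number|
  puts "field #{number}: #{tag_size(number)} byte(s) for the tag"
end

[0, 127, 128, 16_383, 16_384].each do |value|
  puts "value #{value}: #{varint_size(value)} byte(s) as a varint"
end
```

Because three bits of the tag are taken by the wire type, the one-byte range ends at field number 15 rather than 127.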

HTTP/2 multiplexing allows multiple concurrent RPC calls over a single TCP connection. Connection pooling becomes less critical compared to HTTP/1.1, but maintaining a reasonable pool size (5-10 connections) still provides benefits for very high throughput scenarios. Each connection handles multiple streams, and the HTTP/2 flow control prevents any single stream from monopolizing bandwidth.

# Connection pool implementation
class ConnectionPool
  def initialize(address, pool_size: 5)
    @address = address
    @pool = Array.new(pool_size) do
      UserService::Stub.new(address, :this_channel_is_insecure)
    end
    @index = 0
    @mutex = Mutex.new
  end
  
  def with_client
    # Pick stubs round-robin; the mutex keeps the counter consistent across threads
    client = @mutex.synchronize do
      stub = @pool[@index % @pool.size]
      @index += 1
      stub
    end
    yield client
  end
end

# Usage
pool = ConnectionPool.new('localhost:50051', pool_size: 8)

threads = 100.times.map do
  Thread.new do
    pool.with_client do |stub|
      request = GetUserRequest.new(id: rand(1000))
      stub.get_user(request)
    end
  end
end

threads.each(&:join)

Streaming RPCs reduce latency for scenarios requiring continuous data flow. Server streaming sends results as they become available rather than buffering all responses. This improves time-to-first-byte and reduces memory usage for large result sets. Client streaming reduces round trips by sending multiple requests in a single call. Bidirectional streaming enables full-duplex communication where both sides send and receive concurrently.
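
The difference between streaming and buffering can be seen without any network code. In this sketch a plain Ruby Enumerator stands in for a server-streaming handler and the each loop stands in for the client; the log shows that each item is consumed as soon as it is produced. The names results and log are illustrative only.

```ruby
log = []

# Stand-in for a server-streaming handler: each result is yielded as soon
# as it is produced instead of being collected into an array first
results = Enumerator.new do |yielder|
  3.times do |i|
    log << "produced #{i}"
    yielder << "row #{i}"
  end
end

# Stand-in for the client reading the stream
results.each do |row|
  log << "consumed #{row}"
end

puts log
```

The log alternates between produced and consumed entries, which is why the first result is available before the last one has been computed.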

Message size affects performance. Very large messages (megabytes) increase serialization time and memory usage. Splitting large payloads into smaller chunks and using streaming improves throughput. The default limit on received messages is 4 MB (outgoing messages are unlimited by default), and both limits are configurable through server and client channel arguments. Exceeding a limit results in a RESOURCE_EXHAUSTED error.

# Chunked file transfer using streaming
def upload_large_file(stub, file_path, chunk_size: 1024 * 1024)
  chunks = Enumerator.new do |yielder|
    File.open(file_path, 'rb') do |file|
      while chunk = file.read(chunk_size)
        yielder << FileChunk.new(
          data: chunk,
          offset: file.pos - chunk.size
        )
      end
    end
  end
  
  stub.upload_file(chunks)
end

# Configure larger message size (limits are passed as channel arguments)
server = GRPC::RpcServer.new(
  server_args: {
    'grpc.max_send_message_length' => 10 * 1024 * 1024,
    'grpc.max_receive_message_length' => 10 * 1024 * 1024
  }
)

Keep-alive settings maintain connections during idle periods. Without keep-alive, intermediate proxies may close idle connections. The keep-alive interval should balance connection maintenance against unnecessary traffic. Typical values range from 30 seconds to 5 minutes depending on infrastructure.

# Client with keep-alive configuration
stub = UserService::Stub.new(
  'localhost:50051',
  :this_channel_is_insecure,
  channel_args: {
    'grpc.keepalive_time_ms' => 30000,
    'grpc.keepalive_timeout_ms' => 10000,
    'grpc.keepalive_permit_without_calls' => 1,
    'grpc.http2.max_pings_without_data' => 0
  }
)

Compression reduces network transfer size at the cost of CPU usage. gRPC supports gzip compression, enabled per-call or globally. Compression benefits vary by data characteristics: structured data with repeated values compresses well, while binary data may not. Measure the trade-off between bandwidth savings and CPU overhead.

# Enable compression for a call
response = stub.get_user(
  request,
  metadata: { 'grpc-internal-encoding-request' => 'gzip' }
)

# Server-side compression (compression settings are passed as channel arguments)
server = GRPC::RpcServer.new(
  server_args: GRPC::Core::CompressionOptions.new(
    default_algorithm: :gzip,
    default_level: :high
  ).to_hash
)
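
One way to measure the trade-off before enabling compression is to gzip representative payloads offline. The sketch below uses only Ruby's standard library and made-up sample data: a repetitive JSON payload stands in for structured data and random bytes stand in for already-compressed binary content. The gzip_ratio helper is a hypothetical name.

```ruby
require 'zlib'
require 'json'
require 'securerandom'

# Repetitive structured data (sample values are invented for illustration)
structured = JSON.generate(
  Array.new(200) { |i| { id: i, status: 'pending', region: 'us-east' } }
)

# High-entropy data of the same size, similar to images or encrypted blobs
random = SecureRandom.random_bytes(structured.bytesize)

# Compressed size divided by original size; lower is better
def gzip_ratio(data)
  Zlib.gzip(data).bytesize.to_f / data.bytesize
end

puts format('structured: %.0f%% of original size', gzip_ratio(structured) * 100)
puts format('random:     %.0f%% of original size', gzip_ratio(random) * 100)
```

If the ratio for real traffic is close to 100%, compression mostly adds CPU cost without saving bandwidth.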

Thread pool sizing affects concurrency. The Ruby gRPC server uses a thread pool to handle requests. Pool size should match expected concurrent request load. Too few threads cause queueing delays. Too many threads increase context switching overhead and memory usage. A starting point is 2-4 threads per CPU core, adjusted based on workload characteristics.

# Configure server thread pool
server = GRPC::RpcServer.new(
  pool_size: 32,
  max_waiting_requests: 100
)

Security Implications

Transport security in gRPC uses TLS for encryption and authentication. Production deployments should always use TLS to protect data in transit and verify server identity. Self-signed certificates work for development, but production systems require certificates from trusted certificate authorities.

# Server with TLS
require 'grpc'

def load_certificates
  root_cert = File.read('ca.crt')
  private_key = File.read('server.key')
  cert_chain = File.read('server.crt')
  
  [private_key, cert_chain, root_cert]
end

def create_secure_server
  server = GRPC::RpcServer.new
  
  private_key, cert_chain, root_cert = load_certificates
  
  creds = GRPC::Core::ServerCredentials.new(
    root_cert,
    [{ private_key: private_key, cert_chain: cert_chain }],
    true  # Require client certificates
  )
  
  server.add_http2_port('0.0.0.0:50051', creds)
  server.handle(UserServiceImpl.new)
  
  server
end

# Client with TLS
def create_secure_client
  root_cert = File.read('ca.crt')
  client_key = File.read('client.key')
  client_cert = File.read('client.crt')
  
  call_creds = GRPC::Core::CallCredentials.new(proc do |context|
    # Add authentication metadata
    { 'authorization' => "Bearer #{get_token}" }
  end)
  
  channel_creds = GRPC::Core::ChannelCredentials.new(
    root_cert,
    client_key,
    client_cert
  )
  
  combined_creds = channel_creds.compose(call_creds)
  
  UserService::Stub.new('localhost:50051', combined_creds)
end

Authentication mechanisms include TLS client certificates, token-based authentication, and OAuth2. Token-based authentication passes credentials in metadata, typically using bearer tokens. Interceptors centralize authentication logic and token refresh.

require 'jwt'  # JWT gem, assumed to be in the Gemfile

class AuthenticationInterceptor < GRPC::ServerInterceptor
  # Thread-local key used to hand verified claims to later interceptors
  # and to the service method, which run on the same thread for a unary call
  CLAIMS_KEY = :grpc_auth_claims
  
  def request_response(request:, call:, method:)
    auth_header = call.metadata['authorization']
    
    unless auth_header&.start_with?('Bearer ')
      raise GRPC::Unauthenticated.new('Missing authentication token')
    end
    
    token = auth_header.split(' ', 2)[1]
    
    begin
      claims = verify_token(token)
    rescue => e
      raise GRPC::Unauthenticated.new("Invalid token: #{e.message}")
    end
    
    # Keep verified claims on the server side. Writing them to
    # call.output_metadata would send them to the client as response headers.
    Thread.current[CLAIMS_KEY] = claims
    begin
      yield
    ensure
      Thread.current[CLAIMS_KEY] = nil
    end
  end
  
  private
  
  def verify_token(token)
    # JWT verification, OAuth validation, etc.
    # public_key is a placeholder for the token issuer's RSA public key
    JWT.decode(token, public_key, true, algorithm: 'RS256')[0]
  end
end

# Add to server
server = GRPC::RpcServer.new(
  interceptors: [AuthenticationInterceptor.new]
)

Authorization determines what authenticated users can access. Service methods check permissions before processing requests. Role-based access control (RBAC) maps users to roles and roles to permissions. The interceptor below relies on the claims stored by AuthenticationInterceptor, so it must be registered after it in the interceptors list.

class AuthorizationInterceptor < GRPC::ServerInterceptor
  # Server interceptors receive the handler as a Ruby Method object,
  # so permissions are keyed by the handler's method name
  PERMISSIONS = {
    get_user: ['user', 'admin'],
    delete_user: ['admin'],
    list_users: ['admin']
  }.freeze
  
  def request_response(request:, call:, method:)
    required_roles = PERMISSIONS[method.name]
    
    if required_roles
      # Roles come from the verified token claims, never from
      # client-supplied request metadata
      claims = Thread.current[AuthenticationInterceptor::CLAIMS_KEY] || {}
      user_roles = Array(claims['roles'])
      
      unless (required_roles & user_roles).any?
        raise GRPC::PermissionDenied.new(
          "Insufficient permissions for #{method.name}"
        )
      end
    end
    
    yield
  end
end

Input validation prevents injection attacks and ensures data integrity. Protocol Buffers type system provides basic validation, but application-level checks remain necessary. Validate string lengths, numeric ranges, and business logic constraints. Reject malformed data early to prevent resource exhaustion.

def create_user(request, _call)
  # Validate required fields
  raise GRPC::InvalidArgument.new('Name required') if request.name.empty?
  raise GRPC::InvalidArgument.new('Email required') if request.email.empty?
  
  # Validate formats
  unless request.email.match?(/\A[\w+\-.]+@[a-z\d\-]+(\.[a-z\d\-]+)*\.[a-z]+\z/i)
    raise GRPC::InvalidArgument.new('Invalid email format')
  end
  
  # Validate lengths
  if request.name.length > 100
    raise GRPC::InvalidArgument.new('Name too long (max 100 characters)')
  end
  
  # Validate ranges
  if request.age && (request.age < 0 || request.age > 150)
    raise GRPC::InvalidArgument.new('Invalid age')
  end
  
  # Business logic validation
  if User.exists?(email: request.email)
    raise GRPC::AlreadyExists.new('Email already registered')
  end
  
  # Proceed with creation
end

Rate limiting prevents abuse and resource exhaustion. Implement rate limiting at the interceptor level, tracking requests per client or per user. Use token bucket or sliding window algorithms.

class RateLimitInterceptor < GRPC::ServerInterceptor
  def initialize(limit: 100, window: 60)
    @limit = limit
    @window = window
    @requests = Hash.new { |h, k| h[k] = [] }
    @mutex = Mutex.new
  end
  
  def request_response(request:, call:, method:)
    client_id = call.metadata['client-id'] || call.peer
    
    @mutex.synchronize do
      now = Time.now
      @requests[client_id].reject! { |time| now - time > @window }
      
      if @requests[client_id].size >= @limit
        raise GRPC::ResourceExhausted.new(
          "Rate limit exceeded: #{@limit} requests per #{@window} seconds"
        )
      end
      
      @requests[client_id] << now
    end
    
    yield
  end
end
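
The interceptor above implements a sliding window. For comparison, a token bucket can be sketched as follows; it allows short bursts up to a fixed capacity and refills at a steady rate. This is a minimal single-process illustration, and the TokenBucket class, its parameters, and the injectable clock are assumptions made for this example rather than part of the grpc gem.

```ruby
class TokenBucket
  # capacity:    maximum number of tokens (burst size)
  # refill_rate: tokens added per second
  # clock:       callable returning the current time in seconds (injectable for tests)
  def initialize(capacity:, refill_rate:,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @capacity = capacity.to_f
    @refill_rate = refill_rate.to_f
    @clock = clock
    @tokens = @capacity
    @last_refill = @clock.call
    @mutex = Mutex.new
  end

  # Returns true and spends `cost` tokens if enough are available
  def allow?(cost = 1)
    @mutex.synchronize do
      refill
      return false if @tokens < cost

      @tokens -= cost
      true
    end
  end

  private

  # Adds tokens for the time elapsed since the last call, up to capacity
  def refill
    now = @clock.call
    @tokens = [@capacity, @tokens + (now - @last_refill) * @refill_rate].min
    @last_refill = now
  end
end

# Usage with a fake clock so the behavior is deterministic
fake_time = 0.0
bucket = TokenBucket.new(capacity: 2, refill_rate: 1, clock: -> { fake_time })

puts bucket.allow?  # => true
puts bucket.allow?  # => true
puts bucket.allow?  # => false
fake_time += 1.0    # one second later, one token has been refilled
puts bucket.allow?  # => true
```

Inside an interceptor, one bucket per client identifier would replace the per-client timestamp arrays, and a false result would map to GRPC::ResourceExhausted.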

Secrets management requires secure storage and transmission of sensitive data. Never hardcode credentials in source code. Use environment variables, secret management services (HashiCorp Vault, AWS Secrets Manager), or secure configuration files with restricted permissions.

require 'aws-sdk-secretsmanager'
require 'json'

# Load credentials from environment or secret manager
def get_database_credentials
  {
    host: ENV['DB_HOST'],
    username: ENV['DB_USERNAME'],
    password: fetch_secret('db-password'),
    database: ENV['DB_NAME']
  }
end

def fetch_secret(key)
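  # Integration with secret management service (AWS credentials and region
  # must already be configured in the environment)
  secrets_manager = Aws::SecretsManager::Client.new
  response = secrets_manager.get_secret_value(secret_id: key)
  # Assumes the secret is stored as a JSON document with a "password" key
  JSON.parse(response.secret_string)['password']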
end
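
When configuration comes from environment variables, it can help to fail at startup if a required value is missing rather than connecting with a nil host later. The following is a small sketch of that idea; load_db_settings and REQUIRED_DB_KEYS are hypothetical names, and the env parameter exists only so the helper can be exercised with a plain Hash.

```ruby
# Hypothetical helper: read required settings and fail fast when one is missing.
REQUIRED_DB_KEYS = %w[DB_HOST DB_USERNAME DB_NAME].freeze

def load_db_settings(env = ENV)
  # Treat unset and empty values the same way
  missing = REQUIRED_DB_KEYS.select { |key| env[key].nil? || env[key].empty? }
  unless missing.empty?
    raise KeyError, "Missing required settings: #{missing.join(', ')}"
  end

  {
    host: env['DB_HOST'],
    username: env['DB_USERNAME'],
    database: env['DB_NAME']
  }
end
```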

Tools & Ecosystem

The Protocol Buffers compiler (protoc) generates code from .proto files. Install it through a package manager or download it from the GitHub releases page. Generating gRPC service code additionally requires a language plugin. For Ruby, the grpc-tools gem bundles protoc together with the Ruby gRPC plugin and exposes them as the grpc_tools_ruby_protoc command.

# Install protoc and Ruby plugin
gem install grpc-tools

# Generate Ruby code
grpc_tools_ruby_protoc \
  --ruby_out=./lib \
  --grpc_out=./lib \
  --proto_path=./proto \
  ./proto/user.proto

# Generate with custom options
grpc_tools_ruby_protoc \
  -I ./proto \
  -I ./third_party/proto \
  --ruby_out=./lib \
  --grpc_out=./lib \
  ./proto/*.proto

Protocol Buffers supports importing other proto files for code reuse. Common message types and service definitions can be shared across services. The proto_path flag specifies import directories.

// common/types.proto
syntax = "proto3";
package common;

message Address {
  string street = 1;
  string city = 2;
  string state = 3;
  string zip_code = 4;
}

// user/user.proto
syntax = "proto3";
package user;

import "common/types.proto";

message User {
  string id = 1;
  string name = 2;
  common.Address address = 3;
}
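
The way an import such as "common/types.proto" is located can be illustrated with a few lines of Ruby. This is only a model of the lookup, not protoc's implementation: resolve_import is a hypothetical helper that tries each import directory in order and returns the first match.

```ruby
require 'tmpdir'
require 'fileutils'

# Model of import resolution: try each search directory in the order given
# and return the first file that exists, or nil when nothing matches.
def resolve_import(import_path, proto_paths)
  proto_paths.each do |dir|
    candidate = File.join(dir, import_path)
    return candidate if File.file?(candidate)
  end
  nil
end

# Usage: build a throwaway directory layout and resolve an import against it
Dir.mktmpdir do |root|
  proto_dir = File.join(root, 'proto')
  FileUtils.mkdir_p(File.join(proto_dir, 'common'))
  File.write(File.join(proto_dir, 'common', 'types.proto'), 'syntax = "proto3";')

  puts resolve_import('common/types.proto', [proto_dir])
end
```

Because lookup is relative to the search directories, the import statement uses the path below the import root ("common/types.proto"), not a path relative to the importing file.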

Well-known types provide standard messages for common scenarios. The google/protobuf package includes types like Timestamp, Duration, Any, and Struct. These types ensure consistency across services and languages.

syntax = "proto3";

import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/struct.proto";

message Event {
  string id = 1;
  google.protobuf.Timestamp created_at = 2;
  google.protobuf.Duration duration = 3;
  google.protobuf.Struct metadata = 4;
}

require 'google/protobuf/well_known_types'

event = Event.new(
  id: 'evt_123',
  created_at: Google::Protobuf::Timestamp.new(seconds: Time.now.to_i),
  duration: Google::Protobuf::Duration.new(seconds: 3600),
  metadata: Google::Protobuf::Struct.from_hash({
    'priority' => 'high',
    'tags' => ['important', 'urgent']
  })
)
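
A Timestamp stores whole seconds and a separate nanoseconds field, so passing only Time.now.to_i drops the sub-second part. The sketch below shows the split and the reverse conversion in plain Ruby; time_to_timestamp_fields and timestamp_fields_to_time are hypothetical helper names, and the resulting hash could be passed as keyword arguments to Google::Protobuf::Timestamp.new.

```ruby
# Split a Time into the seconds/nanos pair used by google.protobuf.Timestamp.
def time_to_timestamp_fields(time)
  { seconds: time.to_i, nanos: time.nsec }
end

# Rebuild a UTC Time from a seconds/nanos pair.
def timestamp_fields_to_time(seconds:, nanos:)
  Time.at(seconds, nanos, :nsec).utc
end

# Usage
fields = time_to_timestamp_fields(Time.now)
restored = timestamp_fields_to_time(**fields)
```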

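Reflection allows services to be introspected at runtime. The gRPC reflection protocol enables tools like grpcurl and grpcui to interact with services without compiled client code. Enable reflection on development servers for testing and debugging. The example below assumes a library that provides GRPC::Reflection::ReflectionService; confirm that your installed gems supply it before relying on this require.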

require 'grpc/reflection'

server = GRPC::RpcServer.new
server.add_http2_port('0.0.0.0:50051', :this_port_is_insecure)
server.handle(UserServiceImpl.new)

# Enable reflection
reflection_service = GRPC::Reflection::ReflectionService.new([
  UserService::Service
])
server.handle(reflection_service)

server.run_till_terminated

Command-line tools aid development and debugging. grpcurl makes gRPC requests from the terminal, similar to curl for HTTP. grpcui provides a web interface for exploring and testing services.

# List services using reflection
grpcurl -plaintext localhost:50051 list

# List methods for a service
grpcurl -plaintext localhost:50051 list userservice.UserService

# Call a method
grpcurl -plaintext \
  -d '{"id": 123}' \
  localhost:50051 \
  userservice.UserService/GetUser

# Use grpcui for web interface
grpcui -plaintext localhost:50051

Logging and monitoring track service health and performance. Structured logging with request IDs enables tracing requests across service boundaries. Metrics collection tracks request rates, latencies, and error rates. Distributed tracing systems like Jaeger or Zipkin provide end-to-end visibility.

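require 'json'
require 'logger'
require 'securerandom'
require 'time' # provides Time#iso8601 on older Ruby versions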

class LoggingInterceptor < GRPC::ServerInterceptor
  def initialize
    @logger = Logger.new(STDOUT)
    @logger.formatter = proc do |severity, datetime, progname, msg|
      JSON.generate({
        timestamp: datetime.iso8601,
        severity: severity,
        message: msg
      }) + "\n"
    end
  end
  
  def request_response(request:, call:, method:)
    request_id = call.metadata['x-request-id'] || SecureRandom.uuid
    start_time = Time.now
    
    @logger.info({
      event: 'request_start',
      request_id: request_id,
      method: method,
      peer: call.peer
    })
    
    begin
      response = yield
      duration = Time.now - start_time
      
      @logger.info({
        event: 'request_complete',
        request_id: request_id,
        method: method,
        duration: duration,
        status: 'success'
      })
      
      response
    rescue => e
      duration = Time.now - start_time
      
      @logger.error({
        event: 'request_error',
        request_id: request_id,
        method: method,
        duration: duration,
        error: e.class.name,
        message: e.message
      })
      
      raise
    end
  end
end
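
Metrics collection can start as a small in-memory recorder before a full metrics system is wired in. The following sketch tracks request counts, error rates, and a 95th percentile latency per method. RequestMetrics is a hypothetical class, not part of the grpc gem; an interceptor like the one above could call record after each request.

```ruby
# In-memory metrics sketch: counts requests and errors per method and keeps
# latencies so a percentile can be computed on demand. The latency list grows
# without bound, so a real deployment would export or truncate it.
class RequestMetrics
  def initialize
    @mutex = Mutex.new
    @stats = Hash.new { |h, k| h[k] = { count: 0, errors: 0, latencies: [] } }
  end

  def record(method_name, duration, error: false)
    @mutex.synchronize do
      entry = @stats[method_name]
      entry[:count] += 1
      entry[:errors] += 1 if error
      entry[:latencies] << duration
    end
  end

  def summary(method_name)
    @mutex.synchronize do
      entry = @stats[method_name]
      count = entry[:count]
      {
        count: count,
        error_rate: count.zero? ? 0.0 : entry[:errors].to_f / count,
        p95: percentile(entry[:latencies].sort, 0.95)
      }
    end
  end

  private

  # Nearest-rank percentile over an already sorted array
  def percentile(sorted, fraction)
    return nil if sorted.empty?

    rank = (fraction * sorted.size).ceil
    sorted[[rank, 1].max - 1]
  end
end

# Usage
metrics = RequestMetrics.new
metrics.record('GetUser', 0.012)
metrics.record('GetUser', 0.250, error: true)
metrics.summary('GetUser')
```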

Health checking services respond to health check requests from load balancers and orchestration systems. The standard health check protocol defines a Health service with a unary Check method and a server-streaming Watch method.

syntax = "proto3";

package grpc.health.v1;

service Health {
  rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
  rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}

message HealthCheckRequest {
  string service = 1;
}

message HealthCheckResponse {
  enum ServingStatus {
    UNKNOWN = 0;
    SERVING = 1;
    NOT_SERVING = 2;
    SERVICE_UNKNOWN = 3;
  }
  ServingStatus status = 1;
}

require 'grpc/health/v1/health_pb'
require 'grpc/health/v1/health_services_pb'

class HealthServiceImpl < Grpc::Health::V1::Health::Service
  def initialize
    @status = {}
  end
  
  def check(request, _call)
    service = request.service
    status = @status[service] || :SERVING
    
    Grpc::Health::V1::HealthCheckResponse.new(status: status)
  end
  
  def watch(request, _call)
    service = request.service
    
    Enumerator.new do |yielder|
      loop do
        status = @status[service] || :SERVING
        yielder << Grpc::Health::V1::HealthCheckResponse.new(status: status)
        sleep 5
      end
    end
  end
  
  def set_status(service, status)
    @status[service] = status
  end
end

# Add to server
health_service = HealthServiceImpl.new
server.handle(health_service)

# Update health status
health_service.set_status('userservice.UserService', :SERVING)
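
The implementation above reports SERVING for any service name it has never heard of and reads the status hash from several threads without locking. A stricter variant could keep statuses in a small thread-safe registry and report SERVICE_UNKNOWN for unregistered names. The sketch below shows only that registry in plain Ruby; HealthRegistry is a hypothetical class, and choosing SERVICE_UNKNOWN as the fallback is a design assumption rather than something the code above requires.

```ruby
# Thread-safe status registry sketch for a health service.
class HealthRegistry
  def initialize
    @mutex = Mutex.new
    @statuses = {}
  end

  # Record the current status (:SERVING, :NOT_SERVING, ...) for a service
  def set(service, status)
    @mutex.synchronize { @statuses[service] = status }
  end

  # Unregistered services report :SERVICE_UNKNOWN instead of :SERVING,
  # so a misspelled service name does not look healthy.
  def status_for(service)
    @mutex.synchronize { @statuses.fetch(service, :SERVICE_UNKNOWN) }
  end
end

# Usage
registry = HealthRegistry.new
registry.set('userservice.UserService', :SERVING)
registry.status_for('userservice.UserService')
```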

Docker containers simplify deployment. Package the server application with its dependencies into a container image. Use multi-stage builds to minimize image size.

# Multi-stage build for Ruby gRPC service
FROM ruby:3.1 AS builder

WORKDIR /app
COPY Gemfile Gemfile.lock ./
RUN bundle install --jobs 4

FROM ruby:3.1-slim

WORKDIR /app
COPY --from=builder /usr/local/bundle /usr/local/bundle
COPY . .

EXPOSE 50051

CMD ["ruby", "server.rb"]

Reference

Protocol Buffers Scalar Types

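| Proto Type | Ruby Type | Default Value | Notes |
| --- | --- | --- | --- |
| double | Float | 0.0 | 64-bit floating point |
| float | Float | 0.0 | 32-bit floating point |
| int32 | Integer | 0 | Variable-length encoding; inefficient for negative values |
| int64 | Integer | 0 | Variable-length encoding; inefficient for negative values |
| uint32 | Integer | 0 | Unsigned, variable-length encoding |
| uint64 | Integer | 0 | Unsigned, variable-length encoding |
| sint32 | Integer | 0 | Signed, ZigZag variable-length encoding; efficient for negative values |
| sint64 | Integer | 0 | Signed, ZigZag variable-length encoding; efficient for negative values |
| fixed32 | Integer | 0 | Unsigned, always 4 bytes |
| fixed64 | Integer | 0 | Unsigned, always 8 bytes |
| sfixed32 | Integer | 0 | Signed, always 4 bytes |
| sfixed64 | Integer | 0 | Signed, always 8 bytes |
| bool | true / false | false | Ruby has no Boolean class (TrueClass / FalseClass) |
| string | String | "" | UTF-8 or 7-bit ASCII text |
| bytes | String | "" | Arbitrary byte sequence |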
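
The difference between the variable-length types in the table can be seen by encoding a few values by hand. This is an illustrative sketch of the wire encoding, not the protobuf library's code; encode_varint and zigzag32 are hypothetical helper names.

```ruby
# Encode a non-negative integer as a varint: 7 payload bits per byte,
# least significant group first, high bit set on every byte except the last.
def encode_varint(value)
  raise ArgumentError, 'value must be non-negative' if value.negative?

  bytes = []
  loop do
    byte = value & 0x7F
    value >>= 7
    if value.zero?
      bytes << byte
      break
    end
    bytes << (byte | 0x80)
  end
  bytes.pack('C*')
end

# ZigZag-map a signed 32-bit integer to an unsigned one, as sint32 does,
# so that small negative numbers become small unsigned numbers.
def zigzag32(value)
  ((value << 1) ^ (value >> 31)) & 0xFFFF_FFFF
end

# int32 encodes -1 as a 64-bit two's complement varint; sint32 ZigZags it first
as_int32  = encode_varint(-1 & 0xFFFF_FFFF_FFFF_FFFF)
as_sint32 = encode_varint(zigzag32(-1))
puts "int32: #{as_int32.bytesize} bytes, sint32: #{as_sint32.bytesize} byte"
```

This is why sint32 and sint64 are the better choice for fields that often hold negative numbers, while int32 and int64 suit values that are almost always non-negative.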

gRPC Status Codes

| Code | Name | HTTP Equivalent | Description |
| --- | --- | --- | --- |
| 0 | OK | 200 | Success |
| 1 | CANCELLED | 499 | Client cancelled operation |
| 2 | UNKNOWN | 500 | Unknown error |
| 3 | INVALID_ARGUMENT | 400 | Invalid parameters |
| 4 | DEADLINE_EXCEEDED | 504 | Operation timeout |
| 5 | NOT_FOUND | 404 | Resource not found |
| 6 | ALREADY_EXISTS | 409 | Resource already exists |
| 7 | PERMISSION_DENIED | 403 | Insufficient permissions |
| 8 | RESOURCE_EXHAUSTED | 429 | Rate limit exceeded |
| 9 | FAILED_PRECONDITION | 400 | Operation not allowed in current state |
| 10 | ABORTED | 409 | Concurrent conflict |
| 11 | OUT_OF_RANGE | 400 | Value out of range |
| 12 | UNIMPLEMENTED | 501 | Method not implemented |
| 13 | INTERNAL | 500 | Internal server error |
| 14 | UNAVAILABLE | 503 | Service unavailable |
| 15 | DATA_LOSS | 500 | Data corruption |
| 16 | UNAUTHENTICATED | 401 | Authentication required |

Ruby gRPC Configuration Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| pool_size | Integer | 30 | Server thread pool size |
| max_waiting_requests | Integer | 20 | Maximum queued requests |
| poll_period | Integer | 1 | Polling interval in seconds |
| max_send_message_size | Integer | -1 (unlimited) | Maximum send message size in bytes |
| max_receive_message_size | Integer | 4194304 | Maximum receive message size in bytes |
| default_compression_algorithm | Symbol | nil | Compression algorithm (:gzip) |
| default_compression_level | Symbol | :default | Compression level (:high, :medium, :low) |

Channel Arguments

| Argument | Type | Description |
| --- | --- | --- |
| grpc.keepalive_time_ms | Integer | Keep-alive ping interval |
| grpc.keepalive_timeout_ms | Integer | Keep-alive timeout |
| grpc.keepalive_permit_without_calls | Integer | Allow keep-alive without active calls |
| grpc.http2.max_pings_without_data | Integer | Maximum pings without data |
| grpc.max_connection_idle_ms | Integer | Connection idle timeout |
| grpc.max_connection_age_ms | Integer | Maximum connection age |
| grpc.initial_reconnect_backoff_ms | Integer | Initial reconnect backoff |
| grpc.max_reconnect_backoff_ms | Integer | Maximum reconnect backoff |

Service Implementation Methods

| Method Pattern | Description | Return Type |
| --- | --- | --- |
| unary | Single request, single response | Message instance |
| server_stream | Single request, multiple responses | Enumerator |
| client_stream | Multiple requests, single response | Message instance |
| bidi_stream | Multiple requests, multiple responses | Enumerator |
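
The four patterns can be sketched in plain Ruby to show what each handler receives and returns. Request, Reply, FakeCall, and EchoHandlers are hypothetical stand-ins so the example runs without the grpc gem; in a real service the methods live in a class derived from the generated Service class, and the exact handler signatures should be checked against your grpc version.

```ruby
Request = Struct.new(:value)
Reply = Struct.new(:value)

# Stand-in for the call object that a client-streaming handler reads from
class FakeCall
  def initialize(requests)
    @requests = requests
  end

  def each_remote_read(&block)
    @requests.each(&block)
  end
end

class EchoHandlers
  # Unary: one request in, one message out
  def unary(request, _call)
    Reply.new(request.value)
  end

  # Server streaming: one request in, an Enumerator of messages out
  def server_stream(request, _call)
    Enumerator.new do |yielder|
      request.value.times { |i| yielder << Reply.new(i) }
    end
  end

  # Client streaming: read every request from the call, return one message
  def client_stream(call)
    total = 0
    call.each_remote_read { |req| total += req.value }
    Reply.new(total)
  end

  # Bidirectional: an enumerable of requests in, a lazy enumerable of replies out
  def bidi_stream(requests, _call)
    requests.lazy.map { |req| Reply.new(req.value * 2) }
  end
end

# Usage
handlers = EchoHandlers.new
handlers.server_stream(Request.new(3), nil).map(&:value)
```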

Proto Compiler Options

| Option | Description | Example |
| --- | --- | --- |
| --ruby_out | Output directory for message classes | --ruby_out=./lib |
| --grpc_out | Output directory for service stubs | --grpc_out=./lib |
| --proto_path (-I) | Import search path | -I ./proto |
| --descriptor_set_out | Generate descriptor set | --descriptor_set_out=desc.pb |
| --include_imports | Include imported files in descriptor | --include_imports |

Common Interceptor Methods

| Method | Description | Parameters |
| --- | --- | --- |
| request_response | Unary RPC interception | request, call, method, metadata |
| client_streamer | Client streaming interception | requests, call, method, metadata |
| server_streamer | Server streaming interception | request, call, method, metadata |
| bidi_streamer | Bidirectional streaming interception | requests, call, method, metadata |

TLS Certificate Types

| Type | Purpose | File Extension |
| --- | --- | --- |
| Root CA Certificate | Verify server/client identity | .crt, .pem |
| Server Certificate | Server identity | .crt, .pem |
| Server Private Key | Server authentication | .key |
| Client Certificate | Client identity | .crt, .pem |
| Client Private Key | Client authentication | .key |