Overview
gRPC is a high-performance remote procedure call framework that enables client applications to directly call methods on server applications across different machines as if they were local objects. Developed by Google and released as open source in 2015, gRPC uses HTTP/2 for transport, Protocol Buffers as the interface definition language, and provides features including authentication, bidirectional streaming, flow control, and blocking or nonblocking bindings.
Protocol Buffers (protobuf) is a language-neutral, platform-neutral extensible mechanism for serializing structured data. Unlike JSON or XML, Protocol Buffers encodes data in a compact binary format that is smaller and faster to parse. The schema is defined in .proto files, which describe the structure of data and services. A compiler then generates source code for multiple programming languages from these definitions.
The combination of gRPC and Protocol Buffers addresses several challenges in distributed systems: type-safe communication contracts, efficient network utilization, automatic code generation for multiple languages, and support for both synchronous and asynchronous communication patterns. These technologies find extensive use in microservices architectures, internal APIs, and systems requiring high throughput and low latency.
A basic Protocol Buffers definition:
syntax = "proto3";
message User {
int32 id = 1;
string name = 2;
string email = 3;
}
A gRPC service definition:
syntax = "proto3";
service UserService {
rpc GetUser(GetUserRequest) returns (User);
rpc ListUsers(ListUsersRequest) returns (stream User);
}
message GetUserRequest {
int32 id = 1;
}
// ListUsers takes no parameters in this example
message ListUsersRequest {}
message User {
int32 id = 1;
string name = 2;
string email = 3;
}
Key Principles
Protocol Buffers operates on a schema-first approach where data structures are defined in .proto files using a simple interface definition language. Each field in a message has a unique numeric tag that identifies it in the binary encoding. The tag numbers 1-15 require one byte to encode, while higher numbers require more bytes, making lower numbers preferable for frequently used fields. The language comes in three variants: proto2, proto3, and the newer editions model. proto3, used throughout this document, simplifies the language and removes some proto2 features.
Field types in Protocol Buffers include scalar types (int32, int64, uint32, uint64, sint32, sint64, fixed32, fixed64, sfixed32, sfixed64, float, double, bool, string, bytes), enumerations, and nested messages. The proto3 syntax eliminates required fields, making all fields optional by default. Repeated fields create arrays or lists, while map fields provide key-value associations. The oneof keyword groups fields where only one can be set at a time, similar to a union type.
message Product {
int32 id = 1;
string name = 2;
double price = 3;
repeated string tags = 4;
map<string, string> metadata = 5;
oneof discount {
double percentage = 6;
double fixed_amount = 7;
}
}
The binary encoding format uses variable-length integers (varints) to encode numeric values efficiently. Small numbers use fewer bytes, making the format space-efficient for typical data. Each field in the binary format is a key-value pair where the key is the field number and wire type encoded together. The wire type indicates how to parse the value: varint, 64-bit, length-delimited, or 32-bit.
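To make the key layout concrete, the following sketch decodes a short hand-written byte sequence. It is a minimal illustration under stated assumptions, not the library's parser: `read_varint` and `decode_key` are hypothetical helper names, and only the varint wire type is handled.

```ruby
# Minimal sketch of protobuf key/varint decoding (illustrative helpers, not the
# google-protobuf API). Handles only wire type 0 (varint).

# Reads one varint from an array of bytes starting at pos.
# Returns [value, next_pos].
def read_varint(bytes, pos = 0)
  value = 0
  shift = 0
  loop do
    byte = bytes.fetch(pos)
    pos += 1
    value |= (byte & 0x7F) << shift   # low 7 bits carry data
    break if (byte & 0x80).zero?      # high bit clear marks the last byte
    shift += 7
  end
  [value, pos]
end

# Splits a field key into [field_number, wire_type].
def decode_key(key)
  [key >> 3, key & 0x07]
end

# Bytes for a message whose field 1 (a varint) holds 150.
bytes = [0x08, 0x96, 0x01]
key, pos = read_varint(bytes)
field_number, wire_type = decode_key(key)
value, _next_pos = read_varint(bytes, pos)
puts "field=#{field_number} wire_type=#{wire_type} value=#{value}"
```

The key byte 0x08 splits into field number 1 and wire type 0, and the two following bytes carry the value 150 in 7-bit groups, least significant group first.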
gRPC builds on Protocol Buffers by defining services with RPC methods. Each method specifies request and response message types. gRPC supports four communication patterns: unary (single request, single response), server streaming (single request, stream of responses), client streaming (stream of requests, single response), and bidirectional streaming (streams in both directions). The framework handles serialization, network transport, and deserialization automatically.
service OrderService {
// Unary RPC
rpc CreateOrder(Order) returns (OrderConfirmation);
// Server streaming
rpc TrackOrder(OrderId) returns (stream OrderStatus);
// Client streaming
rpc BatchCreateOrders(stream Order) returns (BatchResult);
// Bidirectional streaming
rpc ProcessOrders(stream Order) returns (stream OrderResult);
}
HTTP/2 serves as the transport protocol, providing multiplexing (multiple requests over a single connection), header compression, and binary framing. Multiplexing lets many RPC calls share one connection without the request-level head-of-line blocking of HTTP/1.1, although packet loss on the underlying TCP connection still stalls every stream on it. gRPC does not rely on HTTP/2 server push. Flow control operates at both connection and stream levels, preventing fast senders from overwhelming slow receivers.
Metadata in gRPC passes additional information with requests and responses, such as authentication tokens, request IDs, or tracing information. Metadata appears as HTTP/2 headers and can be sent at the beginning of calls or as trailing metadata after responses. Deadlines specify how long a client waits for an RPC to complete, propagating through service call chains to prevent resource exhaustion. Status codes indicate success or various error conditions, following a standard set defined by gRPC.
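Deadline propagation can be sketched in plain Ruby. The example assumes the handler knows the absolute deadline of the incoming call. `downstream_deadline`, `DeadlineExpired`, and the `margin` parameter are illustrative names, not part of the grpc gem.

```ruby
# Sketch of deadline propagation: a service that receives a call with an
# absolute deadline passes the same deadline, minus a small safety margin,
# to the calls it makes downstream. All names here are illustrative.

DeadlineExpired = Class.new(StandardError)

# Returns the absolute deadline to use for a downstream call.
# incoming_deadline: Time at which the original caller gives up.
# margin: seconds reserved for this service's own work after the call returns.
def downstream_deadline(incoming_deadline, now: Time.now, margin: 0.05)
  remaining = incoming_deadline - now - margin
  raise DeadlineExpired, 'no time left for downstream call' if remaining <= 0

  now + remaining
end

start = Time.at(1_000)
deadline = start + 2.0            # the caller allows 2 seconds in total
later = start + 0.5               # 0.5 s already spent in this service
child = downstream_deadline(deadline, now: later)
puts format('downstream budget: %.2fs', child - later)
```

Failing fast once the budget is used up avoids starting downstream work whose result nobody is still waiting for, which is how deadlines limit resource exhaustion along a call chain.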
Ruby Implementation
Ruby implements gRPC through the grpc gem, which provides both client and server capabilities. The google-protobuf gem provides the Protocol Buffers runtime, and the grpc-tools gem supplies the protoc compiler and the Ruby gRPC plugin used for code generation. A typical Gemfile lists all three:
# Gemfile
gem 'grpc'
gem 'google-protobuf'
gem 'grpc-tools' # For protoc compiler
Protocol Buffer definitions compile to Ruby code using the grpc_tools_ruby_protoc compiler. The compiler generates two files from each .proto file: one containing message classes (_pb.rb) and one containing service stubs (_services_pb.rb). The message classes provide constructors, field accessors, and serialization methods.
# Generated from user.proto
# user_pb.rb
class User
include Google::Protobuf::MessageExts
extend Google::Protobuf::MessageExts::ClassMethods
end
# user_services_pb.rb
module UserService
class Service
include GRPC::GenericService
self.marshal_class_method = :encode
self.unmarshal_class_method = :decode
self.service_name = 'userservice.UserService'
rpc :GetUser, GetUserRequest, User
rpc :ListUsers, ListUsersRequest, stream(User)
end
Stub = Service.rpc_stub_class
end
Creating messages uses keyword arguments matching the proto field names. The generated classes handle type conversion and validation:
user = User.new(
id: 123,
name: "Alice Smith",
email: "alice@example.com"
)
# Access fields
puts user.name # => "Alice Smith"
# Modify fields
user.email = "alice.smith@example.com"
# Serialize to binary
binary_data = User.encode(user)
# Deserialize from binary
decoded_user = User.decode(binary_data)
Repeated fields in Ruby generate array accessors. Map fields generate hash accessors. The oneof keyword creates methods to check which field is set:
product = Product.new(
id: 1,
name: "Widget",
price: 29.99,
tags: ["electronics", "gadget"],
metadata: { "manufacturer" => "ACME", "warranty" => "1 year" }
)
# Add to repeated field
product.tags << "new-arrival"
# Access map
puts product.metadata["manufacturer"] # => "ACME"
# Set oneof field
product.percentage = 15.0
# Check which oneof field is set
puts product.discount # => :percentage
Implementing a gRPC server requires inheriting from the generated service class and implementing the RPC methods. The server runs on a specified port and handles concurrent requests:
require 'grpc'
require_relative 'user_services_pb'
class UserServiceImpl < UserService::Service
def get_user(request, _call)
# request is a GetUserRequest instance
user_id = request.id
# Database lookup (simplified)
user_data = find_user_by_id(user_id)
if user_data
User.new(
id: user_data[:id],
name: user_data[:name],
email: user_data[:email]
)
else
raise GRPC::NotFound.new("User #{user_id} not found")
end
end
def list_users(request, _call)
# Return an Enumerator for streaming
Enumerator.new do |yielder|
users = fetch_all_users
users.each do |user_data|
yielder << User.new(
id: user_data[:id],
name: user_data[:name],
email: user_data[:email]
)
end
end
end
private
def find_user_by_id(id)
# Database query implementation
end
def fetch_all_users
# Database query implementation
end
end
# Start the server
def main
server = GRPC::RpcServer.new
server.add_http2_port('0.0.0.0:50051', :this_port_is_insecure)
server.handle(UserServiceImpl.new)
puts "Server listening on 0.0.0.0:50051"
server.run_till_terminated_or_interrupted([1, 'int', 'SIGTERM'])
end
main
Creating a client uses the generated stub class. The stub provides methods corresponding to each RPC. Clients can pass metadata, set deadlines, and handle errors:
require 'grpc'
require_relative 'user_services_pb'
def create_client
stub = UserService::Stub.new(
'localhost:50051',
:this_channel_is_insecure
)
end
def get_user_example
stub = create_client
request = GetUserRequest.new(id: 123)
begin
response = stub.get_user(request, deadline: Time.now + 5)
puts "User: #{response.name} (#{response.email})"
rescue GRPC::NotFound => e
puts "User not found: #{e.message}"
rescue GRPC::DeadlineExceeded => e
puts "Request timed out: #{e.message}"
rescue GRPC::Unavailable => e
puts "Service unavailable: #{e.message}"
end
end
def list_users_example
stub = create_client
request = ListUsersRequest.new
begin
stream = stub.list_users(request)
stream.each do |user|
puts "User: #{user.name}"
end
rescue GRPC::BadStatus => e
puts "Error: #{e.message}"
end
end
Client streaming sends multiple requests and receives a single response. Bidirectional streaming allows concurrent sending and receiving:
def batch_create_orders(orders)
stub = OrderService::Stub.new('localhost:50051', :this_channel_is_insecure)
# Create an enumerable for requests
requests = orders.map { |order_data| Order.new(order_data) }
begin
result = stub.batch_create_orders(requests)
puts "Created #{result.count} orders"
rescue GRPC::BadStatus => e
puts "Batch creation failed: #{e.message}"
end
end
def process_orders_bidirectional
stub = OrderService::Stub.new('localhost:50051', :this_channel_is_insecure)
# Create request enumerator
requests = Enumerator.new do |yielder|
10.times do |i|
yielder << Order.new(id: i, amount: 100.0 * (i + 1))
sleep 0.1 # Simulate processing time
end
end
# Call bidirectional streaming method
stream = stub.process_orders(requests)
# Process responses as they arrive
stream.each do |result|
puts "Order #{result.order_id}: #{result.status}"
end
end
Interceptors in Ruby allow middleware-like functionality for cross-cutting concerns. Server interceptors process requests before they reach service methods. Client interceptors modify requests before sending or responses after receiving:
class LoggingInterceptor < GRPC::ServerInterceptor
def request_response(request:, call:, method:)
start_time = Time.now
puts "Request: #{method} - #{request.inspect}"
begin
response = yield
duration = Time.now - start_time
puts "Response: #{method} - Success (#{duration}s)"
response
rescue => e
duration = Time.now - start_time
puts "Response: #{method} - Error: #{e.message} (#{duration}s)"
raise
end
end
end
# Add interceptor to server
server = GRPC::RpcServer.new(interceptors: [LoggingInterceptor.new])
Practical Examples
Consider a real-time order processing system where clients submit orders and receive status updates. The service definition includes unary, server streaming, and bidirectional streaming:
syntax = "proto3";
package orderprocessing;
service OrderProcessor {
rpc SubmitOrder(Order) returns (OrderConfirmation);
rpc TrackOrderStatus(OrderId) returns (stream StatusUpdate);
rpc BatchProcessOrders(stream Order) returns (stream OrderResult);
}
message Order {
string customer_id = 1;
repeated OrderItem items = 2;
string delivery_address = 3;
PaymentInfo payment = 4;
}
message OrderItem {
string product_id = 1;
int32 quantity = 2;
double price = 3;
}
message PaymentInfo {
string method = 1;
string token = 2;
}
message OrderConfirmation {
string order_id = 1;
double total_amount = 2;
string estimated_delivery = 3;
}
message OrderId {
string id = 1;
}
message StatusUpdate {
string order_id = 1;
string status = 2;
string timestamp = 3;
string location = 4;
}
message OrderResult {
string order_id = 1;
bool success = 2;
string error_message = 3;
}
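Server implementation handles order validation, inventory checking, and status tracking. State lives in plain in-memory hashes without locking, so treat it as a teaching sketch rather than production code:
require 'grpc'
require 'securerandom' # SecureRandom.uuid
require 'time' # Time#iso8601
require_relative 'order_processing_services_pb'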
class OrderProcessorImpl < OrderProcessor::Service
def initialize
@orders = {}
@status_subscribers = {}
end
def submit_order(order, _call)
# Validate order
raise GRPC::InvalidArgument.new("No items in order") if order.items.empty?
raise GRPC::InvalidArgument.new("No delivery address") if order.delivery_address.empty?
# Calculate total
total = order.items.sum { |item| item.price * item.quantity }
# Generate order ID
order_id = SecureRandom.uuid
# Store order
@orders[order_id] = {
customer_id: order.customer_id,
items: order.items,
total: total,
status: 'pending',
created_at: Time.now
}
# Start background processing
Thread.new { process_order(order_id) }
# Return confirmation
OrderConfirmation.new(
order_id: order_id,
total_amount: total,
estimated_delivery: (Time.now + 86400 * 3).strftime('%Y-%m-%d')
)
end
def track_order_status(order_id_msg, _call)
order_id = order_id_msg.id
raise GRPC::NotFound.new("Order not found") unless @orders.key?(order_id)
Enumerator.new do |yielder|
# Send initial status
order = @orders[order_id]
yielder << StatusUpdate.new(
order_id: order_id,
status: order[:status],
timestamp: order[:created_at].iso8601,
location: "Warehouse"
)
# Subscribe to future updates
queue = Queue.new
@status_subscribers[order_id] ||= []
@status_subscribers[order_id] << queue
# Stream updates as they arrive
loop do
update = queue.pop
break if update == :done
yielder << update
end
end
end
def batch_process_orders(requests, _call)
Enumerator.new do |yielder|
requests.each do |order|
begin
confirmation = submit_order(order, nil)
yielder << OrderResult.new(
order_id: confirmation.order_id,
success: true
)
rescue GRPC::BadStatus => e
yielder << OrderResult.new(
order_id: '',
success: false,
error_message: e.message
)
end
end
end
end
private
def process_order(order_id)
statuses = ['processing', 'shipped', 'out_for_delivery', 'delivered']
locations = ['Warehouse', 'Distribution Center', 'Local Facility', 'Delivered']
statuses.each_with_index do |status, index|
sleep 5 # Simulate processing time
@orders[order_id][:status] = status
update = StatusUpdate.new(
order_id: order_id,
status: status,
timestamp: Time.now.iso8601,
location: locations[index]
)
# Notify subscribers
@status_subscribers[order_id]&.each { |queue| queue << update }
end
# Signal completion
@status_subscribers[order_id]&.each { |queue| queue << :done }
@status_subscribers.delete(order_id)
end
end
Client implementation submits orders and tracks their status in real time:
require 'grpc'
require_relative 'order_processing_services_pb'
class OrderClient
def initialize(address)
@stub = OrderProcessor::Stub.new(address, :this_channel_is_insecure)
end
def submit_and_track_order
# Create order
order = Order.new(
customer_id: 'CUST123',
delivery_address: '123 Main St, City, State 12345',
items: [
OrderItem.new(product_id: 'PROD1', quantity: 2, price: 29.99),
OrderItem.new(product_id: 'PROD2', quantity: 1, price: 49.99)
],
payment: PaymentInfo.new(method: 'credit_card', token: 'tok_12345')
)
# Submit order
begin
confirmation = @stub.submit_order(order, deadline: Time.now + 10)
puts "Order submitted: #{confirmation.order_id}"
puts "Total: $#{confirmation.total_amount}"
puts "Estimated delivery: #{confirmation.estimated_delivery}"
# Track status
track_order(confirmation.order_id)
rescue GRPC::DeadlineExceeded
puts "Order submission timed out"
rescue GRPC::InvalidArgument => e
puts "Invalid order: #{e.message}"
end
end
def track_order(order_id)
request = OrderId.new(id: order_id)
begin
stream = @stub.track_order_status(request)
puts "\nTracking order #{order_id}:"
stream.each do |update|
puts " [#{update.timestamp}] #{update.status} - #{update.location}"
end
puts "Tracking completed"
rescue GRPC::NotFound
puts "Order not found"
rescue GRPC::Cancelled
puts "Tracking cancelled"
end
end
def batch_submit_orders(orders_data)
requests = orders_data.map { |data| create_order(data) }
puts "Submitting #{requests.size} orders..."
stream = @stub.batch_process_orders(requests.to_enum)
results = { success: 0, failed: 0 }
stream.each do |result|
if result.success
results[:success] += 1
puts " Order #{result.order_id}: Success"
else
results[:failed] += 1
puts " Order failed: #{result.error_message}"
end
end
puts "\nBatch complete: #{results[:success]} succeeded, #{results[:failed]} failed"
end
private
def create_order(data)
Order.new(
customer_id: data[:customer_id],
delivery_address: data[:address],
items: data[:items].map do |item|
OrderItem.new(
product_id: item[:product_id],
quantity: item[:quantity],
price: item[:price]
)
end,
payment: PaymentInfo.new(
method: data[:payment_method],
token: data[:payment_token]
)
)
end
end
# Usage
client = OrderClient.new('localhost:50051')
client.submit_and_track_order
Another example demonstrates a notification service with metadata for authentication and distributed tracing:
require 'grpc'
require 'securerandom' # SecureRandom.uuid
class AuthInterceptor < GRPC::ClientInterceptor
def initialize(auth_token)
@auth_token = auth_token
end
def request_response(request:, call:, method:, metadata:)
metadata['authorization'] = "Bearer #{@auth_token}"
metadata['x-request-id'] = SecureRandom.uuid
metadata['x-client-version'] = '1.0.0'
yield
end
end
class AuthenticatedClient
def initialize(address, auth_token)
interceptor = AuthInterceptor.new(auth_token)
@stub = NotificationService::Stub.new(
address,
:this_channel_is_insecure,
interceptors: [interceptor]
)
end
def send_notification(user_id, message)
request = Notification.new(
user_id: user_id,
message: message,
timestamp: Time.now.to_i
)
@stub.send_notification(request)
end
end
# Server-side metadata handling
class NotificationServiceImpl < NotificationService::Service
def send_notification(request, call)
# Extract metadata
metadata = call.metadata
auth_header = metadata['authorization']
request_id = metadata['x-request-id']
# Validate authentication
unless valid_token?(auth_header)
raise GRPC::Unauthenticated.new('Invalid authentication token')
end
# Process with tracing context
puts "Processing request #{request_id}"
# Implementation
NotificationResponse.new(success: true)
end
private
def valid_token?(header)
return false unless header&.start_with?('Bearer ')
token = header.split(' ', 2)[1]
# Token validation logic
!token.nil? && token.length > 0
end
end
Performance Considerations
Protocol Buffers binary encoding is typically more compact than text-based formats. As a rough illustration, a small record that takes 80-100 bytes as JSON often fits in 30-40 bytes as a Protocol Buffers message, because field names are replaced by numeric tags and numbers are stored in binary. The exact ratio depends on the data. The compact representation reduces network bandwidth, and the binary format usually parses faster because it avoids string parsing and conversion.
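The size difference can be checked by hand for the User message from the overview. The sketch below encodes the three fields manually and compares the result with the JSON form. The helpers `varint`, `int_field`, and `string_field` are illustrative stand-ins for what the generated classes do, not library API.

```ruby
require 'json'

# Sketch: hand-encode the User message and compare its size with JSON.
# Helper names are illustrative, not part of google-protobuf.

# Encodes a non-negative integer as a varint (7 data bits per byte).
def varint(n)
  bytes = []
  loop do
    byte = n & 0x7F
    n >>= 7
    if n.zero?
      bytes << byte
      break
    end
    bytes << (byte | 0x80)   # set the continuation bit
  end
  bytes.pack('C*')
end

# Wire type 0: varint field.
def int_field(number, value)
  varint(number << 3) + varint(value)
end

# Wire type 2: length-delimited field (strings, bytes, nested messages).
def string_field(number, value)
  data = value.b
  varint((number << 3) | 2) + varint(data.bytesize) + data
end

user = { id: 123, name: 'Alice Smith', email: 'alice@example.com' }
proto_bytes = int_field(1, user[:id]) +
              string_field(2, user[:name]) +
              string_field(3, user[:email])
json_bytes = JSON.generate(user)

puts "protobuf: #{proto_bytes.bytesize} bytes, JSON: #{json_bytes.bytesize} bytes"
```

Most of the saving in this small case comes from dropping the field names and punctuation. The string contents themselves take the same space in both formats.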
Field numbering affects encoding size. Fields numbered 1-15 require one byte for the tag, while fields 16-2047 require two bytes. The tag is written every time the field appears on the wire, so the fields that are set most often should use the lower numbers. The varint encoding for integers stores small numbers efficiently: values 0-127 use one byte, 128-16,383 use two bytes, and so on. Fixed-size types (fixed32, fixed64) use constant space, which is larger for small values but can be smaller for values that are consistently large.
// Inefficient field ordering
message UserProfile {
string bio = 1; // Large field with low number
repeated string hobbies = 2; // Large repeated field
int32 id = 16; // Small, frequently accessed field with high number
string name = 17; // Frequently accessed with high number
}
// Optimized field ordering
message UserProfile {
int32 id = 1; // Small, frequent field gets low number
string name = 2; // Frequent field gets low number
string bio = 3;
repeated string hobbies = 4;
}
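The byte counts quoted above follow directly from the varint rules. This short sketch computes them. `varint_size` and `tag_size` are illustrative helper names.

```ruby
# Sketch: number of bytes needed for a varint value and for a field tag.

# Bytes needed to store a non-negative integer as a varint.
def varint_size(n)
  size = 1
  while n >= 0x80
    n >>= 7
    size += 1
  end
  size
end

# The tag is the varint encoding of (field_number << 3) | wire_type.
# The three wire-type bits never change the byte count, so they are omitted.
def tag_size(field_number)
  varint_size(field_number << 3)
end

[1, 15, 16, 2047, 2048].each do |number|
  puts "field #{number}: #{tag_size(number)}-byte tag"
end

[127, 128, 16_383, 16_384].each do |value|
  puts "value #{value}: #{varint_size(value)}-byte varint"
end
```

Because three bits of the tag are taken by the wire type, only four bits of the first byte remain for the field number, which is why the one-byte range ends at 15 rather than 127.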
HTTP/2 multiplexing allows multiple concurrent RPC calls over a single TCP connection. Connection pooling becomes less critical compared to HTTP/1.1, but maintaining a reasonable pool size (5-10 connections) still provides benefits for very high throughput scenarios. Each connection handles multiple streams, and the HTTP/2 flow control prevents any single stream from monopolizing bandwidth.
# Connection pool implementation (round-robin over a fixed set of stubs)
class ConnectionPool
  def initialize(address, pool_size: 5)
    @address = address
    @pool = Array.new(pool_size) do
      UserService::Stub.new(address, :this_channel_is_insecure)
    end
    @index = 0
    @mutex = Mutex.new
  end

  def with_client
    # The mutex makes "read index, then increment" a single atomic step
    client = @mutex.synchronize do
      stub = @pool[@index % @pool.size]
      @index += 1
      stub
    end
    yield client
  end
end
# Usage
pool = ConnectionPool.new('localhost:50051', pool_size: 8)
threads = 100.times.map do
Thread.new do
pool.with_client do |stub|
request = GetUserRequest.new(id: rand(1000))
stub.get_user(request)
end
end
end
threads.each(&:join)
Streaming RPCs reduce latency for scenarios requiring continuous data flow. Server streaming sends results as they become available rather than buffering all responses. This improves time-to-first-byte and reduces memory usage for large result sets. Client streaming reduces round trips by sending multiple requests in a single call. Bidirectional streaming enables full-duplex communication where both sides send and receive concurrently.
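The streaming handlers shown earlier return an Enumerator, and the reason this avoids buffering can be seen without any gRPC code. In the sketch below, `build_stream` is a hypothetical helper that records when each item is actually generated.

```ruby
# Sketch: an Enumerator produces items on demand, so a consumer can receive
# the first item before the rest of the result set has been generated.

# Returns an Enumerator over three records and appends to production_log
# at the moment each record is generated.
def build_stream(production_log)
  Enumerator.new do |yielder|
    1.upto(3) do |i|
      production_log << i
      yielder << "record-#{i}"
    end
  end
end

log = []
stream = build_stream(log)
first = stream.next   # pulls only the first item
puts "first=#{first}, produced so far=#{log.inspect}"
```

Only the first record has been generated when `next` returns. The remaining records are produced as the consumer asks for them.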
Message size affects performance. Very large messages (megabytes) increase serialization time and memory usage. Splitting large payloads into smaller chunks and using streaming improves throughput. By default gRPC limits received messages to 4 MB, and the limit is configurable through server and client channel arguments. Exceeding it results in RESOURCE_EXHAUSTED errors.
# Chunked file transfer using streaming
def upload_large_file(stub, file_path, chunk_size: 1024 * 1024)
chunks = Enumerator.new do |yielder|
File.open(file_path, 'rb') do |file|
while chunk = file.read(chunk_size)
yielder << FileChunk.new(
data: chunk,
offset: file.pos - chunk.size
)
end
end
end
stub.upload_file(chunks)
end
# Configure larger message size limits through channel arguments
server = GRPC::RpcServer.new(
  server_args: {
    'grpc.max_send_message_length' => 10 * 1024 * 1024,
    'grpc.max_receive_message_length' => 10 * 1024 * 1024
  }
)
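The upload code above covers only the sending side. As a rough sketch of both ends, the example below splits an in-memory payload into chunks and reassembles it while checking offsets. `Chunk` is a plain struct standing in for the FileChunk message, and `each_chunk` and `reassemble` are hypothetical helpers.

```ruby
require 'stringio'

# Sketch of both ends of a chunked transfer, using a plain struct in place of
# the generated FileChunk message class.
Chunk = Struct.new(:data, :offset, keyword_init: true)

# Sender side: lazily split an IO into chunks of at most chunk_size bytes.
def each_chunk(io, chunk_size)
  Enumerator.new do |yielder|
    offset = 0
    while (data = io.read(chunk_size))
      yielder << Chunk.new(data: data, offset: offset)
      offset += data.bytesize
    end
  end
end

# Receiver side: append chunks in order, rejecting any gap or reordering.
def reassemble(chunks)
  buffer = String.new(encoding: Encoding::BINARY)
  chunks.each do |chunk|
    unless chunk.offset == buffer.bytesize
      raise ArgumentError, "unexpected chunk at offset #{chunk.offset}"
    end

    buffer << chunk.data
  end
  buffer
end

payload = ('a'..'z').to_a.join * 100   # 2,600 bytes of sample data
chunks = each_chunk(StringIO.new(payload), 1024).to_a
restored = reassemble(chunks)
puts "chunks: #{chunks.size}, restored #{restored.bytesize} bytes"
```

Sending the offset with every chunk lets the receiver detect gaps cheaply. A real service would probably also verify a checksum of the complete payload.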
Keep-alive settings maintain connections during idle periods. Without keep-alive, intermediate proxies may close idle connections. The keep-alive interval should balance connection maintenance against unnecessary traffic. Typical values range from 30 seconds to 5 minutes depending on infrastructure.
# Client with keep-alive configuration
stub = UserService::Stub.new(
'localhost:50051',
:this_channel_is_insecure,
channel_args: {
'grpc.keepalive_time_ms' => 30000,
'grpc.keepalive_timeout_ms' => 10000,
'grpc.keepalive_permit_without_calls' => 1,
'grpc.http2.max_pings_without_data' => 0
}
)
Compression reduces network transfer size at the cost of CPU usage. gRPC supports gzip compression, enabled per-call or globally. Compression benefits vary by data characteristics: structured data with repeated values compresses well, while binary data may not. Measure the trade-off between bandwidth savings and CPU overhead.
# Enable compression for a call
response = stub.get_user(
request,
metadata: { 'grpc-internal-encoding-request' => 'gzip' }
)
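# Server-side compression, configured through channel arguments
compression = GRPC::Core::CompressionOptions.new(
  default_algorithm: :gzip,
  default_level: :high
)
server = GRPC::RpcServer.new(server_args: compression.to_channel_arg_hash)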
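Before enabling compression, it is worth measuring how well representative payloads compress. The following sketch uses only Ruby's standard zlib library, not gRPC itself. `gzip_ratio` is an illustrative helper, and the sample payloads are made up.

```ruby
require 'zlib'
require 'securerandom'

# Sketch: gzip shrinks repetitive payloads and does little for random bytes.

# Returns compressed size divided by original size (lower is better).
def gzip_ratio(data)
  Zlib.gzip(data).bytesize.to_f / data.bytesize
end

repetitive = '{"status":"shipped","location":"Warehouse"}' * 200
random = SecureRandom.random_bytes(repetitive.bytesize)

puts format('repetitive: %.2f of original size', gzip_ratio(repetitive))
puts format('random:     %.2f of original size', gzip_ratio(random))
```

Payloads that are already compressed or encrypted behave like the random case. For those, compression costs CPU time without saving bandwidth.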
Thread pool sizing affects concurrency. The Ruby gRPC server uses a thread pool to handle requests. Pool size should match expected concurrent request load. Too few threads cause queueing delays. Too many threads increase context switching overhead and memory usage. A starting point is 2-4 threads per CPU core, adjusted based on workload characteristics.
# Configure server thread pool
server = GRPC::RpcServer.new(
pool_size: 32,
max_waiting_requests: 100
)
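The per-core rule of thumb can be turned into a starting value as follows. `suggested_pool_size` and its defaults are illustrative assumptions, and the result is only an initial setting to refine under load.

```ruby
require 'etc'

# Sketch: derive a starting thread pool size from the CPU core count.
# threads_per_core follows the 2-4 range given above; max caps the result.
def suggested_pool_size(cores: Etc.nprocessors, threads_per_core: 3, max: 64)
  [cores * threads_per_core, max].min
end

puts suggested_pool_size             # depends on the machine
puts suggested_pool_size(cores: 8)   # => 24
```

Handlers that mostly wait on I/O usually tolerate the upper end of the range, while CPU-bound handlers gain little from extra threads.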
Security Implications
Transport security in gRPC uses TLS for encryption and authentication. Production deployments should always use TLS to protect data in transit and verify server identity. Self-signed certificates work for development, but production systems require certificates from trusted certificate authorities.
# Server with TLS
require 'grpc'
def load_certificates
root_cert = File.read('ca.crt')
private_key = File.read('server.key')
cert_chain = File.read('server.crt')
[private_key, cert_chain, root_cert]
end
def create_secure_server
server = GRPC::RpcServer.new
private_key, cert_chain, root_cert = load_certificates
creds = GRPC::Core::ServerCredentials.new(
root_cert,
[{ private_key: private_key, cert_chain: cert_chain }],
true # Require client certificates
)
server.add_http2_port('0.0.0.0:50051', creds)
server.handle(UserServiceImpl.new)
server
end
# Client with TLS
def create_secure_client
root_cert = File.read('ca.crt')
client_key = File.read('client.key')
client_cert = File.read('client.crt')
call_creds = GRPC::Core::CallCredentials.new(proc do |context|
# Add authentication metadata
{ 'authorization' => "Bearer #{get_token}" }
end)
channel_creds = GRPC::Core::ChannelCredentials.new(
root_cert,
client_key,
client_cert
)
combined_creds = channel_creds.compose(call_creds)
UserService::Stub.new('localhost:50051', combined_creds)
end
Authentication mechanisms include TLS client certificates, token-based authentication, and OAuth2. Token-based authentication passes credentials in metadata, typically using bearer tokens. Interceptors centralize authentication logic and token refresh.
# Requires the jwt gem; public_key is assumed to be provided by the application
class AuthenticationInterceptor < GRPC::ServerInterceptor
  def request_response(request:, call:, method:)
    auth_header = call.metadata['authorization']
    unless auth_header&.start_with?('Bearer ')
      raise GRPC::Unauthenticated.new('Missing authentication token')
    end
    token = auth_header.split(' ', 2)[1]
    begin
      claims = verify_token(token)
    rescue => e
      raise GRPC::Unauthenticated.new("Invalid token: #{e.message}")
    end
    # Keep the verified claims on the server. Writing them to
    # call.output_metadata would return them to the client, and reading roles
    # back from request metadata would let a client forge them. A thread-local
    # is one simple way to hand them to later interceptors and the handler.
    Thread.current[:auth_claims] = claims
    begin
      yield
    ensure
      Thread.current[:auth_claims] = nil
    end
  end

  private

  def verify_token(token)
    # JWT verification, OAuth validation, etc.
    JWT.decode(token, public_key, true, algorithm: 'RS256')[0]
  end
end
# Add to server
server = GRPC::RpcServer.new(
  interceptors: [AuthenticationInterceptor.new]
)
Authorization determines what authenticated users can access. Service methods check permissions before processing requests. Role-based access control (RBAC) maps users to roles and roles to permissions.
class AuthorizationInterceptor < GRPC::ServerInterceptor
  # Server interceptors receive the handler as a Ruby Method object, so
  # permissions are keyed by handler method name.
  PERMISSIONS = {
    get_user: ['user', 'admin'],
    delete_user: ['admin'],
    list_users: ['admin']
  }.freeze

  def request_response(request:, call:, method:)
    required_roles = PERMISSIONS[method.name]
    if required_roles
      # Roles come from the claims verified by AuthenticationInterceptor,
      # which must run first. Without claims the request is denied.
      user_roles = Array(Thread.current[:auth_claims]&.fetch('roles', nil))
      unless (required_roles & user_roles).any?
        raise GRPC::PermissionDenied.new(
          "Insufficient permissions for #{method.name}"
        )
      end
    end
    yield
  end
end
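The two-level mapping that RBAC describes (users to roles, roles to permissions) can be sketched independently of gRPC. All role names, user names, and permission strings below are made up for illustration.

```ruby
require 'set'

# Sketch of the user -> roles -> permissions lookup behind RBAC.
# All names and data here are illustrative.
ROLE_PERMISSIONS = {
  'admin' => Set['users.read', 'users.list', 'users.delete'],
  'user' => Set['users.read']
}.freeze

USER_ROLES = {
  'alice' => ['admin'],
  'bob' => ['user']
}.freeze

# True when any of the user's roles grants the permission.
# Unknown users and unknown roles are denied.
def allowed?(user_id, permission)
  USER_ROLES.fetch(user_id, []).any? do |role|
    ROLE_PERMISSIONS.fetch(role, Set.new).include?(permission)
  end
end

puts allowed?('alice', 'users.delete')   # => true
puts allowed?('bob', 'users.delete')     # => false
puts allowed?('mallory', 'users.read')   # => false
```

Denying by default for unknown users and roles keeps a missing table entry from silently granting access.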
Input validation prevents injection attacks and ensures data integrity. The Protocol Buffers type system provides basic validation, but application-level checks remain necessary. Validate string lengths, numeric ranges, and business logic constraints. Reject malformed data early to prevent resource exhaustion.
def create_user(request, _call)
# Validate required fields
raise GRPC::InvalidArgument.new('Name required') if request.name.empty?
raise GRPC::InvalidArgument.new('Email required') if request.email.empty?
# Validate formats
unless request.email.match?(/\A[\w+\-.]+@[a-z\d\-]+(\.[a-z\d\-]+)*\.[a-z]+\z/i)
raise GRPC::InvalidArgument.new('Invalid email format')
end
# Validate lengths
if request.name.length > 100
raise GRPC::InvalidArgument.new('Name too long (max 100 characters)')
end
# Validate ranges
if request.age && (request.age < 0 || request.age > 150)
raise GRPC::InvalidArgument.new('Invalid age')
end
# Business logic validation
if User.exists?(email: request.email)
raise GRPC::AlreadyExists.new('Email already registered')
end
# Proceed with creation
end
Rate limiting prevents abuse and resource exhaustion. Implement rate limiting at the interceptor level, tracking requests per client or per user. Use token bucket or sliding window algorithms.
class RateLimitInterceptor < GRPC::ServerInterceptor
def initialize(limit: 100, window: 60)
@limit = limit
@window = window
@requests = Hash.new { |h, k| h[k] = [] }
@mutex = Mutex.new
end
def request_response(request:, call:, method:)
client_id = call.metadata['client-id'] || call.peer
@mutex.synchronize do
now = Time.now
@requests[client_id].reject! { |time| now - time > @window }
if @requests[client_id].size >= @limit
raise GRPC::ResourceExhausted.new(
"Rate limit exceeded: #{@limit} requests per #{@window} seconds"
)
end
@requests[client_id] << now
end
yield
end
end
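The interceptor above implements a sliding window. For comparison, a token bucket can be sketched as follows. `TokenBucket` is an illustrative class rather than a library type, and the injectable `clock` parameter exists only to make the behavior easy to demonstrate.

```ruby
# Sketch of a token bucket: the bucket holds up to `capacity` tokens and
# refills at `refill_rate` tokens per second. A request is allowed only if
# a whole token is available.
class TokenBucket
  def initialize(capacity:, refill_rate:,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @capacity = capacity
    @refill_rate = refill_rate
    @clock = clock
    @tokens = capacity.to_f
    @updated_at = clock.call
    @mutex = Mutex.new
  end

  # Returns true and consumes one token if the request may proceed.
  def allow?
    @mutex.synchronize do
      now = @clock.call
      refilled = @tokens + (now - @updated_at) * @refill_rate
      @tokens = [@capacity, refilled].min
      @updated_at = now
      return false if @tokens < 1

      @tokens -= 1
      true
    end
  end
end

fake_time = 0.0
bucket = TokenBucket.new(capacity: 2, refill_rate: 1.0, clock: -> { fake_time })
puts [bucket.allow?, bucket.allow?, bucket.allow?].inspect   # => [true, true, false]
fake_time += 1.0                                             # one second passes
puts bucket.allow?                                           # => true
```

Unlike the sliding window, a token bucket stores only two numbers per client and permits short bursts up to its capacity. An interceptor would keep one bucket per client ID.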
Secrets management requires secure storage and transmission of sensitive data. Never hardcode credentials in source code. Use environment variables, secret management services (HashiCorp Vault, AWS Secrets Manager), or secure configuration files with restricted permissions.
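# Load credentials from environment or secret manager
# (fetch_secret assumes the aws-sdk-secretsmanager gem is installed)
require 'aws-sdk-secretsmanager'
require 'json'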
def get_database_credentials
{
host: ENV['DB_HOST'],
username: ENV['DB_USERNAME'],
password: fetch_secret('db-password'),
database: ENV['DB_NAME']
}
end
def fetch_secret(key)
# Integration with secret management service
secrets_manager = Aws::SecretsManager::Client.new
response = secrets_manager.get_secret_value(secret_id: key)
JSON.parse(response.secret_string)['password']
end
Tools & Ecosystem
The Protocol Buffers compiler (protoc) generates code from .proto files. Install through package managers or download from GitHub releases. The compiler requires language plugins for code generation. Ruby uses the grpc-tools gem which includes the necessary plugins.
# Install protoc and Ruby plugin
gem install grpc-tools
# Generate Ruby code
grpc_tools_ruby_protoc \
--ruby_out=./lib \
--grpc_out=./lib \
--proto_path=./proto \
./proto/user.proto
# Generate with custom options
grpc_tools_ruby_protoc \
-I ./proto \
-I ./third_party/proto \
--ruby_out=./lib \
--grpc_out=./lib \
./proto/*.proto
Protocol Buffers supports importing other proto files for code reuse. Common message types and service definitions can be shared across services. The proto_path flag specifies import directories.
// common/types.proto
syntax = "proto3";
package common;
message Address {
string street = 1;
string city = 2;
string state = 3;
string zip_code = 4;
}
// user/user.proto
syntax = "proto3";
package user;
import "common/types.proto";
message User {
string id = 1;
string name = 2;
common.Address address = 3;
}
Well-known types provide standard messages for common scenarios. The google/protobuf package includes types like Timestamp, Duration, Any, and Struct. These types ensure consistency across services and languages.
syntax = "proto3";
import "google/protobuf/timestamp.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/struct.proto";
message Event {
string id = 1;
google.protobuf.Timestamp created_at = 2;
google.protobuf.Duration duration = 3;
google.protobuf.Struct metadata = 4;
}
require 'google/protobuf/well_known_types'
event = Event.new(
id: 'evt_123',
created_at: Google::Protobuf::Timestamp.new(seconds: Time.now.to_i),
duration: Google::Protobuf::Duration.new(seconds: 3600),
metadata: Google::Protobuf::Struct.from_hash({
'priority' => 'high',
'tags' => ['important', 'urgent']
})
)
Reflection allows services to be introspected at runtime. The gRPC reflection protocol enables tools like grpcurl and grpcui to interact with services without compiled client code. Enable reflection on development servers for testing and debugging.
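# Assumes a library providing GRPC::Reflection::ReflectionService is installed;
# verify that one is available for your grpc version
require 'grpc/reflection'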
server = GRPC::RpcServer.new
server.add_http2_port('0.0.0.0:50051', :this_port_is_insecure)
server.handle(UserServiceImpl.new)
# Enable reflection
reflection_service = GRPC::Reflection::ReflectionService.new([
UserService::Service
])
server.handle(reflection_service)
server.run_till_terminated
Command-line tools aid development and debugging. grpcurl makes gRPC requests from the terminal, similar to curl for HTTP. grpcui provides a web interface for exploring and testing services.
# List services using reflection
grpcurl -plaintext localhost:50051 list
# List methods for a service
grpcurl -plaintext localhost:50051 list userservice.UserService
# Call a method
grpcurl -plaintext \
-d '{"id": 123}' \
localhost:50051 \
userservice.UserService/GetUser
# Use grpcui for web interface
grpcui -plaintext localhost:50051
Logging and monitoring track service health and performance. Structured logging with request IDs enables tracing requests across service boundaries. Metrics collection tracks request rates, latencies, and error rates. Distributed tracing systems like Jaeger or Zipkin provide end-to-end visibility.
require 'grpc'
require 'json'
require 'logger'
require 'securerandom'
require 'time' # provides Time#iso8601

class LoggingInterceptor < GRPC::ServerInterceptor
  def initialize
    super()
    @logger = Logger.new($stdout)
    # Emit one JSON object per line so log aggregators can parse it.
    @logger.formatter = proc do |severity, datetime, _progname, msg|
      JSON.generate({
        timestamp: datetime.iso8601,
        severity: severity,
        message: msg
      }) + "\n"
    end
  end

  # Wraps unary RPCs; streaming RPCs need the corresponding *_streamer methods.
  def request_response(request:, call:, method:)
    # Reuse the caller's request ID when present so logs correlate across services.
    request_id = call.metadata['x-request-id'] || SecureRandom.uuid
    start_time = Time.now
    @logger.info({
      event: 'request_start',
      request_id: request_id,
      method: method.name, # method is the handler's Method object
      peer: call.peer
    })
    begin
      response = yield
      duration = Time.now - start_time
      @logger.info({
        event: 'request_complete',
        request_id: request_id,
        method: method.name,
        duration: duration,
        status: 'success'
      })
      response
    rescue => e
      duration = Time.now - start_time
      @logger.error({
        event: 'request_error',
        request_id: request_id,
        method: method.name,
        duration: duration,
        error: e.class.name,
        message: e.message
      })
      raise # re-raise so gRPC still reports the failure to the client
    end
  end
end
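The logging pattern can be exercised without a running server. The sketch below is an illustration under stated assumptions, not the interceptor itself: json_logger and with_request_logging are hypothetical helpers, the log fields are merged into the top-level JSON object instead of being nested under message, and durations use the monotonic clock so they are unaffected by wall-clock adjustments.

```ruby
require 'json'
require 'logger'
require 'securerandom'
require 'stringio'
require 'time'

# Build a logger that writes one JSON object per line to the given IO.
def json_logger(io)
  logger = Logger.new(io)
  logger.formatter = proc do |severity, datetime, _progname, msg|
    fields = msg.is_a?(Hash) ? msg : { message: msg.to_s }
    JSON.generate({ timestamp: datetime.iso8601, severity: severity }.merge(fields)) + "\n"
  end
  logger
end

def elapsed_since(started)
  Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
end

# Hypothetical helper that mirrors the interceptor's yield pattern:
# run the block, log the outcome, and re-raise any error.
def with_request_logging(logger, method_name, request_id: SecureRandom.uuid)
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  result = yield
  logger.info({ event: 'request_complete', request_id: request_id,
                method: method_name, duration: elapsed_since(started) })
  result
rescue StandardError => e
  logger.error({ event: 'request_error', request_id: request_id,
                 method: method_name, duration: elapsed_since(started),
                 error: e.class.name, message: e.message })
  raise
end

io = StringIO.new
logger = json_logger(io)
value = with_request_logging(logger, 'GetUser', request_id: 'req-1') { 42 }
success_line = JSON.parse(io.string.lines.last)

begin
  with_request_logging(logger, 'GetUser', request_id: 'req-2') { raise ArgumentError, 'bad id' }
rescue ArgumentError
  # The helper re-raises after logging, as the interceptor does.
end
error_line = JSON.parse(io.string.lines.last)

puts success_line['event'] # request_complete
puts error_line['error']   # ArgumentError
```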
Health checking services respond to health check requests from load balancers and orchestration systems. The standard health check protocol (grpc.health.v1.Health) defines a Check method for one-off queries and a Watch method that streams status updates.
syntax = "proto3";
package grpc.health.v1;
service Health {
  rpc Check(HealthCheckRequest) returns (HealthCheckResponse);
  rpc Watch(HealthCheckRequest) returns (stream HealthCheckResponse);
}
message HealthCheckRequest {
  string service = 1;
}
message HealthCheckResponse {
  enum ServingStatus {
    UNKNOWN = 0;
    SERVING = 1;
    NOT_SERVING = 2;
    SERVICE_UNKNOWN = 3;
  }
  ServingStatus status = 1;
}
require 'grpc'
require 'grpc/health/v1/health_pb'
require 'grpc/health/v1/health_services_pb'

class HealthServiceImpl < Grpc::Health::V1::Health::Service
  def initialize
    @status = {}
    @mutex = Mutex.new # handlers run on multiple server threads
  end

  def check(request, _call)
    Grpc::Health::V1::HealthCheckResponse.new(status: status_for(request.service))
  end

  # Re-sends the current status every 5 seconds. Each open Watch stream
  # occupies a server pool thread for as long as the client stays connected.
  def watch(request, _call)
    service = request.service
    Enumerator.new do |yielder|
      loop do
        yielder << Grpc::Health::V1::HealthCheckResponse.new(status: status_for(service))
        sleep 5
      end
    end
  end

  def set_status(service, status)
    @mutex.synchronize { @status[service] = status }
  end

  private

  # Unregistered services are reported as SERVING, a simplification of this example.
  def status_for(service)
    @mutex.synchronize { @status[service] || :SERVING }
  end
end

# Add to server
health_service = HealthServiceImpl.new
server.handle(health_service)

# Update health status
health_service.set_status('userservice.UserService', :SERVING)
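Health handlers are called from several server threads at once, so the status map needs synchronization, and a service that was never registered deserves a deliberate answer (the health checking protocol describes NOT_FOUND for Check and SERVICE_UNKNOWN for Watch in that case). The following is a minimal sketch of a standalone registry that a health service could delegate to. HealthRegistry and its methods are hypothetical names, and plain symbols stand in for the generated enum values.

```ruby
# Hypothetical thread-safe status store; not part of the grpc gem.
class HealthRegistry
  def initialize
    @mutex = Mutex.new
    @statuses = {}
  end

  def set(service, status)
    @mutex.synchronize { @statuses[service] = status }
  end

  # Returns nil for services that were never registered, so the caller can
  # decide how to report them instead of silently answering SERVING.
  def get(service)
    @mutex.synchronize { @statuses[service] }
  end

  # Mark every registered service, for example during shutdown.
  def set_all(status)
    @mutex.synchronize { @statuses.each_key { |service| @statuses[service] = status } }
  end
end

registry = HealthRegistry.new
registry.set('userservice.UserService', :SERVING)

# Concurrent writers do not corrupt the map.
threads = 10.times.map do |i|
  Thread.new { registry.set("service-#{i}", :SERVING) }
end
threads.each(&:join)

registry.set_all(:NOT_SERVING)
puts registry.get('userservice.UserService') # NOT_SERVING
puts registry.get('missing').inspect         # nil
```

The grpc gem also appears to ship a ready-made implementation (required as grpc/health/checker), which may be preferable to a hand-written service where it fits.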
Docker containers simplify deployment. Package the server application with its dependencies into a container image. Use multi-stage builds to minimize image size.
# Multi-stage build for Ruby gRPC service
FROM ruby:3.1 AS builder
WORKDIR /app
COPY Gemfile Gemfile.lock ./
RUN bundle install --jobs 4
FROM ruby:3.1-slim
WORKDIR /app
COPY --from=builder /usr/local/bundle /usr/local/bundle
COPY . .
EXPOSE 50051
CMD ["ruby", "server.rb"]
Reference
Protocol Buffers Scalar Types
| Proto Type | Ruby Type | Default Value | Notes |
|---|---|---|---|
| double | Float | 0.0 | 64-bit floating point |
| float | Float | 0.0 | 32-bit floating point |
| int32 | Integer | 0 | Varint; negative values always take 10 bytes, prefer sint32 for those |
| int64 | Integer | 0 | Varint; negative values always take 10 bytes, prefer sint64 for those |
| uint32 | Integer | 0 | Unsigned varint |
| uint64 | Integer | 0 | Unsigned varint |
| sint32 | Integer | 0 | ZigZag-encoded varint; compact for negative values |
| sint64 | Integer | 0 | ZigZag-encoded varint; compact for negative values |
| fixed32 | Integer | 0 | Unsigned, always 4 bytes |
| fixed64 | Integer | 0 | Unsigned, always 8 bytes |
| sfixed32 | Integer | 0 | Signed, always 4 bytes |
| sfixed64 | Integer | 0 | Signed, always 8 bytes |
| bool | TrueClass/FalseClass | false | Ruby has no Boolean class; values are true or false |
| string | String | "" | UTF-8 or 7-bit ASCII text |
| bytes | String | "" | Arbitrary byte sequence (binary-encoded String) |
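The difference between int64 and sint64 in the table comes down to how negative numbers are mapped before varint encoding. The sketch below implements base-128 varints and the ZigZag mapping in plain Ruby to make the size difference visible. The function names are illustrative and are not part of the protobuf gem, which performs this encoding internally.

```ruby
# Encode a non-negative integer as a base-128 varint: 7 bits per byte,
# least significant group first, high bit set on every byte except the last.
def encode_varint(value)
  raise ArgumentError, 'value must be non-negative' if value.negative?

  bytes = []
  loop do
    byte = value & 0x7F
    value >>= 7
    if value.zero?
      bytes << byte
      break
    end
    bytes << (byte | 0x80)
  end
  bytes.pack('C*')
end

# ZigZag interleaves signed values: 0 -> 0, -1 -> 1, 1 -> 2, -2 -> 3, ...
def zigzag64(value)
  (value << 1) ^ (value >> 63)
end

# int64 treats a negative number as its 64-bit two's complement bit pattern.
def encode_int64(value)
  encode_varint(value & 0xFFFF_FFFF_FFFF_FFFF)
end

# sint64 applies ZigZag first, so small negative numbers stay small.
def encode_sint64(value)
  encode_varint(zigzag64(value))
end

puts encode_varint(300).unpack('C*').inspect # [172, 2]
puts encode_int64(-1).bytesize               # 10
puts encode_sint64(-1).bytesize              # 1
```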
gRPC Status Codes
| Code | Name | HTTP Equivalent | Description |
|---|---|---|---|
| 0 | OK | 200 | Success |
| 1 | CANCELLED | 499 | Client cancelled operation |
| 2 | UNKNOWN | 500 | Unknown error |
| 3 | INVALID_ARGUMENT | 400 | Invalid parameters |
| 4 | DEADLINE_EXCEEDED | 504 | Operation timeout |
| 5 | NOT_FOUND | 404 | Resource not found |
| 6 | ALREADY_EXISTS | 409 | Resource already exists |
| 7 | PERMISSION_DENIED | 403 | Insufficient permissions |
| 8 | RESOURCE_EXHAUSTED | 429 | Quota or resource exhausted (for example, a rate limit) |
| 9 | FAILED_PRECONDITION | 400 | Operation not allowed in current state |
| 10 | ABORTED | 409 | Operation aborted, typically due to a concurrency conflict |
| 11 | OUT_OF_RANGE | 400 | Value out of range |
| 12 | UNIMPLEMENTED | 501 | Method not implemented |
| 13 | INTERNAL | 500 | Internal server error |
| 14 | UNAVAILABLE | 503 | Service unavailable |
| 15 | DATA_LOSS | 500 | Data corruption |
| 16 | UNAUTHENTICATED | 401 | Authentication required |
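A gateway or logging layer that translates gRPC results into HTTP responses can use the table as a lookup. The sketch below copies the mapping into a frozen hash. GRPC_STATUS and http_status_for are hypothetical names, and falling back to 500 for unrecognized codes is an assumption of this example.

```ruby
# Code => [name, HTTP status], transcribed from the status code table.
GRPC_STATUS = {
  0 => ['OK', 200],
  1 => ['CANCELLED', 499],
  2 => ['UNKNOWN', 500],
  3 => ['INVALID_ARGUMENT', 400],
  4 => ['DEADLINE_EXCEEDED', 504],
  5 => ['NOT_FOUND', 404],
  6 => ['ALREADY_EXISTS', 409],
  7 => ['PERMISSION_DENIED', 403],
  8 => ['RESOURCE_EXHAUSTED', 429],
  9 => ['FAILED_PRECONDITION', 400],
  10 => ['ABORTED', 409],
  11 => ['OUT_OF_RANGE', 400],
  12 => ['UNIMPLEMENTED', 501],
  13 => ['INTERNAL', 500],
  14 => ['UNAVAILABLE', 503],
  15 => ['DATA_LOSS', 500],
  16 => ['UNAUTHENTICATED', 401]
}.freeze

# Hypothetical helper: unrecognized codes are treated like UNKNOWN.
def http_status_for(grpc_code)
  _name, http_status = GRPC_STATUS.fetch(grpc_code, ['UNKNOWN', 500])
  http_status
end

puts http_status_for(5)  # 404
puts http_status_for(16) # 401
puts http_status_for(99) # 500
```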
Ruby gRPC Configuration Options
| Option | Type | Default | Description |
|---|---|---|---|
| pool_size | Integer | 30 | Server thread pool size |
| max_waiting_requests | Integer | 20 | Maximum queued requests (deprecated in current releases of the grpc gem) |
| poll_period | Integer | 1 | Polling interval in seconds |
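| max_send_message_size | Integer | -1 (unlimited) | Maximum send message size in bytes; set through the grpc.max_send_message_length channel argument |
| max_receive_message_size | Integer | 4194304 | Maximum receive message size in bytes (4 MiB); set through the grpc.max_receive_message_length channel argument |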
| default_compression_algorithm | Symbol | nil | Compression algorithm (:gzip) |
| default_compression_level | Symbol | :default | Compression level (:high, :medium, :low) |
Channel Arguments
| Argument | Type | Description |
|---|---|---|
| grpc.keepalive_time_ms | Integer | Keep-alive ping interval |
| grpc.keepalive_timeout_ms | Integer | Keep-alive timeout |
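| grpc.keepalive_permit_without_calls | Integer | Set to 1 to allow keep-alive pings when no calls are active, 0 to disallow |
| grpc.http2.max_pings_without_data | Integer | Maximum pings sent without data in between; 0 allows unlimited pings |
| grpc.max_connection_idle_ms | Integer | Server-side: idle time after which a connection is closed |
| grpc.max_connection_age_ms | Integer | Server-side: maximum connection age before a graceful close |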
| grpc.initial_reconnect_backoff_ms | Integer | Initial reconnect backoff |
| grpc.max_reconnect_backoff_ms | Integer | Maximum reconnect backoff |
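Channel arguments are passed as a plain hash of string keys to integer values. The sketch below shows such a hash and illustrates how the two reconnect settings bound an exponential backoff. The numbers are example values rather than recommendations, backoff_schedule_ms is a hypothetical helper, and the 1.6 multiplier follows the gRPC connection backoff document while the jitter it also describes is omitted to keep the output deterministic.

```ruby
# Example values only; tune them for the deployment at hand.
CHANNEL_ARGS = {
  'grpc.keepalive_time_ms' => 30_000,
  'grpc.keepalive_timeout_ms' => 10_000,
  'grpc.keepalive_permit_without_calls' => 1,
  'grpc.initial_reconnect_backoff_ms' => 1_000,
  'grpc.max_reconnect_backoff_ms' => 30_000
}.freeze

# Hypothetical helper: the delay before each reconnect attempt, growing
# exponentially from the initial backoff and capped at the maximum.
def backoff_schedule_ms(args, attempts, multiplier: 1.6)
  max_delay = args.fetch('grpc.max_reconnect_backoff_ms')
  delay = args.fetch('grpc.initial_reconnect_backoff_ms').to_f
  Array.new(attempts) do
    current = [delay, max_delay].min
    delay *= multiplier
    current.round
  end
end

schedule = backoff_schedule_ms(CHANNEL_ARGS, 10)
puts schedule.first # 1000
puts schedule.last  # 30000
```

With the grpc gem, a hash like this would typically be handed to a client stub through its channel_args: keyword.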
Service Implementation Methods
| Method Pattern | Description | Return Type |
|---|---|---|
| unary | Single request, single response | Message instance |
| server_stream | Single request, multiple responses | Enumerator |
| client_stream | Multiple requests, single response | Message instance |
| bidi_stream | Multiple requests, multiple responses | Enumerator |
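The return types in the table can be illustrated without the grpc gem. In the sketch below, Struct classes stand in for generated messages, and UserHandlers, lookup_users, and the sample data are hypothetical. Client streaming is left out because, in the Ruby gem, that handler reads its requests from the call object and not from an argument.

```ruby
# Stand-ins for generated message classes so the sketch runs without gems.
GetUserRequest = Struct.new(:id)
UserRecord = Struct.new(:id, :name)

class UserHandlers
  USERS = [UserRecord.new(1, 'Ada'), UserRecord.new(2, 'Grace')].freeze

  # Unary: one request in, one message out.
  def get_user(request, _call = nil)
    USERS.find { |user| user.id == request.id }
  end

  # Server streaming: return an Enumerator; each yielded value is one response.
  def list_users(_request, _call = nil)
    Enumerator.new do |yielder|
      USERS.each { |user| yielder << user }
    end
  end

  # Bidirectional streaming: consume the request stream and answer each
  # request as it is read.
  def lookup_users(requests, _call = nil)
    Enumerator.new do |yielder|
      requests.each { |request| yielder << get_user(request) }
    end
  end
end

handlers = UserHandlers.new
single = handlers.get_user(GetUserRequest.new(2))
listed = handlers.list_users(nil).map(&:name)
looked_up = handlers.lookup_users([GetUserRequest.new(2), GetUserRequest.new(1)]).map(&:name)

puts single.name       # Grace
puts listed.inspect    # ["Ada", "Grace"]
puts looked_up.inspect # ["Grace", "Ada"]
```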
Proto Compiler Options
| Option | Description | Example |
|---|---|---|
| --ruby_out | Output directory for message classes | --ruby_out=./lib |
| --grpc_out | Output directory for service stubs | --grpc_out=./lib |
| --proto_path (-I) | Import search path | -I ./proto |
| --descriptor_set_out | Generate descriptor set | --descriptor_set_out=desc.pb |
| --include_imports | Include imported files in descriptor | --include_imports |
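The options combine into a single compiler invocation. The sketch below assembles that command as an argument array, which is convenient inside a Rake task or build script. protoc_command is a hypothetical helper, the paths are placeholders, and grpc_tools_ruby_protoc is the compiler wrapper installed by the grpc-tools gem.

```ruby
# Hypothetical helper: build the argument list for the Ruby protoc wrapper.
def protoc_command(proto_files, proto_path:, out_dir:, descriptor_set: nil)
  args = ['grpc_tools_ruby_protoc', '-I', proto_path,
          "--ruby_out=#{out_dir}", "--grpc_out=#{out_dir}"]
  if descriptor_set
    # Bundle imported files so the descriptor set is self-contained.
    args << "--descriptor_set_out=#{descriptor_set}" << '--include_imports'
  end
  args + proto_files
end

command = protoc_command(['user/user.proto'], proto_path: './proto', out_dir: './lib')
puts command.join(' ')
# grpc_tools_ruby_protoc -I ./proto --ruby_out=./lib --grpc_out=./lib user/user.proto

# To run the compiler for real:
#   system(*command) or raise 'protoc failed'
```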
Common Interceptor Methods
| Method | Description | Client Interceptor Parameters | Server Interceptor Parameters |
|---|---|---|---|
| request_response | Unary RPC interception | request, call, method, metadata | request, call, method |
| client_streamer | Client streaming interception | requests, call, method, metadata | call, method |
| server_streamer | Server streaming interception | request, call, method, metadata | request, call, method |
| bidi_streamer | Bidirectional streaming interception | requests, call, method, metadata | requests, call, method |
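Each interceptor method receives its parameters as keywords and yields to continue the call, so interceptors nest around the handler like middleware. The sketch below reproduces that nesting in plain Ruby to show the execution order. RecordingInterceptor and run_chain are hypothetical, and this is an illustration of the pattern, not the library's internal dispatch code.

```ruby
# Hypothetical interceptor that records when it runs. It mirrors the
# request_response(request:, call:, method:) shape of a server interceptor.
class RecordingInterceptor
  def initialize(name, events)
    @name = name
    @events = events
  end

  def request_response(request:, call:, method:)
    @events << "#{@name}:before"
    response = yield # continue to the next interceptor or the handler
    @events << "#{@name}:after"
    response
  end
end

# Wrap the handler so the first interceptor in the list runs outermost.
def run_chain(interceptors, request:, call:, method:, &handler)
  chain = interceptors.reverse.reduce(handler) do |inner, interceptor|
    proc do
      interceptor.request_response(request: request, call: call, method: method, &inner)
    end
  end
  chain.call
end

events = []
interceptors = [
  RecordingInterceptor.new('auth', events),
  RecordingInterceptor.new('logging', events)
]

response = run_chain(interceptors, request: { id: 1 }, call: nil, method: :get_user) do
  events << 'handler'
  'ok'
end

puts events.inspect
# ["auth:before", "logging:before", "handler", "logging:after", "auth:after"]
puts response # ok
```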
TLS Certificate Types
| Type | Purpose | File Extension |
|---|---|---|
| Root CA Certificate | Verify server/client identity | .crt, .pem |
| Server Certificate | Server identity | .crt, .pem |
| Server Private Key | Server authentication | .key |
| Client Certificate | Client identity | .crt, .pem |
| Client Private Key | Client authentication | .key |