Overview
BASE represents a consistency model for distributed systems that prioritizes availability and partition tolerance over immediate consistency. The acronym stands for Basically Available, Soft state, and Eventually consistent. This model emerged as a response to the limitations of ACID (Atomicity, Consistency, Isolation, Durability) properties when applied to large-scale distributed systems.
The CAP theorem states that a distributed system cannot simultaneously guarantee Consistency, Availability, and Partition tolerance. Because network partitions cannot be ruled out in practice, the real choice arises during a partition: either refuse some requests to stay consistent, or keep answering and risk divergence. BASE takes the second option, accepting that data may temporarily differ across nodes but will eventually converge to a consistent state.
Unlike ACID systems where transactions complete fully or not at all with immediate consistency, BASE systems accept temporary inconsistencies. A write operation might succeed on one node while other nodes remain unaware of the change until replication occurs. During this window, different clients reading from different nodes may observe different values.
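The inconsistency window described above can be sketched with two in-memory replicas. This is an illustration only: `Replica`, `pending`, and the key names are hypothetical, and replication is triggered by hand rather than by a background process.

```ruby
# Two in-memory "replicas" with explicit, delayed replication.
class Replica
  def initialize
    @data = {}
  end

  def write(key, value)
    @data[key] = value
  end

  def read(key)
    @data[key]
  end
end

node_a = Replica.new
node_b = Replica.new
pending = [] # changes accepted by node_a but not yet applied to node_b

# A client writes to node_a; the change is only queued for node_b.
node_a.write("name", "Ada")
pending << ["name", "Ada"]

# Clients reading from different nodes now disagree.
before = [node_a.read("name"), node_b.read("name")]

# Replication runs later and closes the inconsistency window.
pending.each { |key, value| node_b.write(key, value) }
pending.clear

after = [node_a.read("name"), node_b.read("name")]
puts "before replication: #{before.inspect}"
puts "after replication:  #{after.inspect}"
```

Until the queued change is applied, a read from `node_b` returns nothing for the key even though the write has already succeeded on `node_a`.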
BASE originated from work on large-scale web applications where strict consistency requirements would severely limit scalability. Systems like Amazon's Dynamo, and later Dynamo-inspired stores such as Apache Cassandra, demonstrated that many applications could tolerate temporary inconsistencies in exchange for better availability and performance.
# ACID example - strong consistency
ActiveRecord::Base.transaction do
account.withdraw(100)
other_account.deposit(100)
# Both operations commit together, or an exception rolls both back
# After commit, every read sees the new balances
end
# BASE example - eventual consistency
account.async_withdraw(100)
other_account.async_deposit(100)
# Operations may complete at different times
# Temporary inconsistency exists until convergence
The distinction matters significantly in system design. ACID systems block operations during conflicts, maintaining consistency at the cost of availability. BASE systems allow operations to proceed, accepting temporary divergence in exchange for continued service during network partitions or node failures.
Key Principles
BASE consists of three fundamental properties that work together to define the consistency model.
Basically Available means the system guarantees availability of data even during partial failures. The system responds to requests even when some nodes are unreachable or data is not fully consistent. This contrasts sharply with ACID systems that may block or fail requests when consistency cannot be guaranteed.
Availability in BASE does not mean every request receives the most recent write. Instead, it means the system provides a response, which might be stale data, a timeout indicator, or a best-effort result. The system remains operational even when network partitions occur or nodes fail.
class DistributedCache
def get(key)
# Try primary node first; treat an unreachable primary as a miss
value = primary_node.read(key) rescue nil
return value if value
# Fall back to replica nodes
replica_nodes.each do |node|
value = node.read(key) rescue nil
return value if value
end
# Return cached/stale value rather than failing
stale_cache.read(key)
end
end
Soft State indicates that the state of the system may change over time without input due to eventual consistency. Data does not need to be consistent across all nodes at all times. The system state is "soft" rather than "hard" because it can evolve as replication and synchronization occur.
Soft state requires applications to handle scenarios where data changes between reads or where different reads return different values. This demands careful application design to handle inconsistencies gracefully rather than assuming strong consistency guarantees.
class EventuallyConsistentCounter
attr_reader :value, :last_updated
def initialize
@value = 0
@last_updated = Time.now
@pending_increments = []
end
def increment(amount)
# Record increment without immediate global update
@pending_increments << { amount: amount, timestamp: Time.now }
@value += amount
@last_updated = Time.now
# Asynchronously propagate to other nodes
propagate_async
end
def reconcile(remote_increments)
# Merge increments from other nodes
remote_increments.each do |inc|
unless @pending_increments.include?(inc)
@value += inc[:amount]
@pending_increments << inc
end
end
@last_updated = Time.now
end
end
Eventual Consistency guarantees that if no new updates are made to an object, all reads will eventually return the same value. The system converges toward consistency over time through replication and synchronization mechanisms. The convergence time is not specified and may vary based on network conditions, load, and system configuration.
Eventual consistency allows for multiple conflict resolution strategies. Systems may use timestamps to determine which write wins, use vector clocks to track causality, or employ application-specific conflict resolution logic. The choice of strategy significantly impacts how the system behaves during and after conflicts.
class EventuallyConsistentStore
def write(key, value, timestamp = Time.now)
# Write to local node immediately
local_store[key] = { value: value, timestamp: timestamp }
# Queue replication to other nodes
replication_queue << {
action: :write,
key: key,
value: value,
timestamp: timestamp
}
process_replication_queue_async
end
def read(key)
# Optionally read from quorum for stronger consistency
if read_quorum_enabled?
values = read_from_multiple_nodes(key)
resolve_conflicts(values)
else
# Local read: fast, possibly stale; nil when the key is unknown
local_store[key]&.fetch(:value)
end
end
private
def resolve_conflicts(values)
# Last-write-wins conflict resolution
# compact drops nodes that returned nothing; nil when no node has the key
values.compact.max_by { |v| v[:timestamp] }&.fetch(:value)
end
end
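Last-write-wins depends on the timestamps being comparable across nodes. The sketch below, which uses a hypothetical `Version` struct and made-up clock values, shows how a node whose clock runs behind can lose a write that actually happened later.

```ruby
Version = Struct.new(:value, :timestamp)

# Same rule as resolve_conflicts above: the highest timestamp wins.
def last_write_wins(versions)
  versions.max_by(&:timestamp)
end

# Assumed scenario: node B's clock runs 2 seconds behind node A's.
real_time_a = 1000.0 # first write, made on node A
real_time_b = 1001.0 # second write, made one second later on node B
skew_b = -2.0

write_a = Version.new("draft", real_time_a)
write_b = Version.new("final", real_time_b + skew_b)

winner = last_write_wins([write_a, write_b])
puts "winner: #{winner.value}"
```

The later write ("final") carries the smaller timestamp and is discarded. This is one reason systems that cannot tolerate such loss track causality with vector clocks or use application-specific merges instead.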
The three properties work together to create a system that remains available during failures but trades strong consistency for this availability. The system accepts that data may be temporarily inconsistent but provides mechanisms for eventual convergence.
Design Considerations
Choosing BASE over ACID requires careful analysis of application requirements, acceptable trade-offs, and operational characteristics.
Consistency Requirements determine whether BASE is appropriate. Applications that require strong consistency guarantees, such as financial transactions or inventory management, may not suit BASE properties. Reading a bank account balance must reflect all completed transactions immediately. However, applications like social media feeds, caching layers, or analytics systems tolerate temporary inconsistencies.
Consider the impact of stale reads on application correctness. If a user updates their profile photo, can other users see the old photo briefly? If a product inventory decreases, can two users simultaneously purchase the last item? Under BASE both outcomes are possible, so the application must either tolerate them or detect and compensate for them afterward.
# Inappropriate for BASE - financial transaction
class BankTransfer
def transfer(from_account, to_account, amount)
# Must maintain strong consistency
# Cannot allow temporary state where money disappears or duplicates
ActiveRecord::Base.transaction do
from_account.withdraw(amount)
to_account.deposit(amount)
end
end
end
# Appropriate for BASE - social media like
class PostLike
def like_post(user_id, post_id)
# Temporary inconsistency acceptable
# Like count may be slightly off briefly
# A set keeps one entry per user, so repeated likes are idempotent
redis.sadd("post:#{post_id}:likes", user_id)
# Asynchronously update persistent store
LikeWorker.perform_async(user_id, post_id)
end
end
Scalability Requirements often drive BASE adoption. Systems handling millions of operations per second across global deployments pay a steep price for strong consistency: every write must be coordinated across replicas, and coordination between geographically distributed data centers adds at least one cross-region round trip to each operation. For many workloads that latency is unacceptable.
BASE systems scale horizontally by adding nodes without requiring complex coordination. Each node operates independently, accepting writes and reads without waiting for global consensus. Replication occurs asynchronously, allowing the system to handle increased load by distributing requests across more nodes.
Partition Tolerance determines how the system behaves during network failures. ACID systems typically block or fail operations when partitions occur to maintain consistency. BASE systems continue operating, accepting that different partitions may temporarily diverge.
During a partition, BASE systems make writes available on both sides. When the partition heals, the system must reconcile conflicting updates. The conflict resolution strategy depends on the application. Some systems use timestamps (last-write-wins), others use vector clocks or CRDTs (Conflict-free Replicated Data Types).
class PartitionTolerantCache
def initialize
@local_data = {}
@vector_clock = {}
@partition_detected = false
# Keys written during a partition that still need reconciliation
@pending_reconciliation = []
end
def write(key, value)
if @partition_detected
# Continue accepting writes during partition
@local_data[key] = value
@vector_clock[key] = increment_clock(key)
# Mark as needing reconciliation
@pending_reconciliation << key
else
# Normal replication
@local_data[key] = value
replicate_to_nodes(key, value)
end
end
def reconcile_after_partition
@pending_reconciliation.each do |key|
remote_values = fetch_from_all_nodes(key)
merged_value = merge_using_vector_clocks(key, remote_values)
@local_data[key] = merged_value
end
@partition_detected = false
@pending_reconciliation.clear
end
end
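As one illustration of the CRDT option mentioned above, a grow-only set (G-Set) merges by set union, so both sides of a healed partition reach the same state regardless of merge order. The `GSet` class and the shopping-list values are hypothetical; a G-Set is the simplest CRDT and does not support removal.

```ruby
require "set"

# Grow-only set (G-Set): elements can be added but never removed.
class GSet
  attr_reader :elements

  def initialize(elements = [])
    @elements = Set.new(elements)
  end

  def add(element)
    @elements.add(element)
    self
  end

  # Merge is set union: commutative, associative, and idempotent.
  def merge(other)
    GSet.new(@elements | other.elements)
  end

  def to_a
    @elements.to_a.sort
  end
end

# Both sides start from the same state, then diverge during a partition.
left = GSet.new(["milk"])
right = GSet.new(["milk"])
left.add("eggs")
right.add("bread")

# After the partition heals, each side merges the other's state.
healed_left = left.merge(right)
healed_right = right.merge(left)
puts healed_left.to_a.inspect
```

Because union is idempotent, receiving the same remote state twice is harmless, which matters when reconciliation messages are retried.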
Read and Write Patterns influence BASE suitability. Systems with high read-to-write ratios benefit from BASE because read operations can be distributed across replicas without coordination. Write-heavy systems may experience more conflicts requiring resolution.
Applications with temporal tolerance for consistency, such as analytics dashboards or recommendation systems, work well with BASE. The data need not be perfectly current to provide value. Users understand that metrics might be slightly delayed.
Operational Complexity increases with BASE systems. Monitoring eventual consistency requires tracking replication lag, conflict rates, and convergence time. Debugging issues becomes harder when different nodes hold different data. Operations teams need tools and procedures for resolving split-brain scenarios and data conflicts.
Ruby Implementation
Ruby applications implement BASE properties through various patterns and libraries, particularly in web applications using Rails or other frameworks.
Asynchronous Processing forms the foundation of BASE implementations in Ruby. Background job frameworks like Sidekiq, Resque, or ActiveJob handle eventual consistency by processing updates asynchronously.
class UserProfileUpdateJob < ApplicationJob
queue_as :default
def perform(user_id, attributes)
user = User.find(user_id)
# Update primary database; update! raises on failure so the rescue below retries
user.update!(attributes)
# Eventually update search index
UpdateSearchIndexJob.perform_later(user_id)
# Eventually update cache
InvalidateCacheJob.perform_later(user_id)
# Eventually update analytics
UpdateAnalyticsJob.perform_later(user_id, attributes)
rescue StandardError => e
# Retry logic handles temporary failures
retry_job wait: 5.minutes, queue: :low_priority
end
end
# Controller accepts request immediately
class UsersController < ApplicationController
def update
# Accept update without waiting for full consistency
UserProfileUpdateJob.perform_later(current_user.id, user_params)
render json: { status: 'accepted' }, status: :accepted
end
end
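The accept-now, process-later flow can be sketched without any framework using Ruby's built-in thread-safe `Queue`. This is a single-process stand-in for a job system, not how Sidekiq or ActiveJob work internally; `primary`, `search_index`, and the `:stop` sentinel are assumptions made for the example.

```ruby
# In-process stand-in for a job queue (Queue is thread-safe and built in).
jobs = Queue.new
primary = {}      # source of truth, updated on the request path
search_index = {} # derived store, updated eventually by the worker

worker = Thread.new do
  # Drain jobs until the :stop sentinel arrives.
  while (job = jobs.pop) != :stop
    user_id, name = job
    search_index[user_id] = name
  end
end

# "Request path": write the primary, enqueue the rest, return immediately.
primary[42] = "Ada"
jobs << [42, "Ada"]

# Shut the worker down so the script can exit; a real request would not wait.
jobs << :stop
worker.join

puts "primary: #{primary[42]}, search index: #{search_index[42]}"
```

Between the enqueue and the moment the worker runs, `search_index` lags behind `primary`, which is the soft state the application has to tolerate.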
Caching Strategies implement soft state through Redis or Memcached. Cached data may become stale, representing soft state that eventually synchronizes with the source of truth.
class EventuallyConsistentCache
def initialize
@redis = Redis.new
@local_cache = {}
end
def write(key, value, ttl: 3600)
# Write to local cache immediately
@local_cache[key] = {
value: value,
timestamp: Time.now.to_i,
ttl: ttl
}
# Asynchronously write to Redis
Thread.new do
@redis.setex(key, ttl, Marshal.dump(value))
end
end
def read(key)
# Check local cache first (fastest, possibly stale)
if local_value = @local_cache[key]
return local_value[:value] if fresh?(local_value)
end
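# Fall back to Redis (slower, more consistent)
begin
if (redis_value = @redis.get(key))
# Marshal.load is only safe for data this application wrote itself
value = Marshal.load(redis_value)
remaining = @redis.ttl(key)
@local_cache[key] = {
value: value,
timestamp: Time.now.to_i,
# TTL is negative when the key has no expiry or has just expired
ttl: remaining.positive? ? remaining : 0
}
return value
end
rescue Redis::BaseConnectionError
# Redis is unreachable; fall through to the stale local copy
end
# Return stale data if available rather than failing
@local_cache[key]&.fetch(:value)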
end
private
def fresh?(cached_value)
Time.now.to_i - cached_value[:timestamp] < cached_value[:ttl]
end
end
Database Replication in Ruby applications often exhibits BASE properties. Primary-replica setups accept writes on the primary while reads may occur on replicas with replication lag.
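class ApplicationRecord < ActiveRecord::Base
self.abstract_class = true
# Configure read replicas (connects_to must be called on an abstract class)
connects_to database: { writing: :primary, reading: :replica }
end
class Post < ApplicationRecord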
def self.eventually_consistent_find(id)
# Read from replica, accepting potential staleness.
# connected_to is only allowed on ActiveRecord::Base or an abstract class.
ActiveRecord::Base.connected_to(role: :reading) do
find(id)
end
end
def self.create_with_eventual_read(attributes)
# Write to primary
post = ActiveRecord::Base.connected_to(role: :writing) do
create(attributes)
end
# Immediate read might not find it on replica
# Return the created object rather than querying
post
end
end
class PostsController < ApplicationController
def show
# Accept potentially stale data from replica
@post = Post.eventually_consistent_find(params[:id])
rescue ActiveRecord::RecordNotFound
# Post might exist on primary but not yet replicated
retry_with_primary
end
private
def retry_with_primary
@post = ActiveRecord::Base.connected_to(role: :writing) do
Post.find(params[:id])
end
end
end
Event Sourcing implements eventual consistency by storing events and asynchronously updating projections.
class EventStore
def append_event(stream_id, event_type, data)
event = Event.create!(
stream_id: stream_id,
event_type: event_type,
data: data,
timestamp: Time.now
)
# Immediately return, projections updated asynchronously
ProjectionWorker.perform_async(event.id)
event
end
end
class ProjectionWorker
include Sidekiq::Worker
def perform(event_id)
event = Event.find(event_id)
# Update multiple projections eventually
update_read_model(event)
update_search_index(event)
update_analytics(event)
rescue StandardError
# Re-raise so Sidekiq's built-in retry reschedules the job.
# (retry_job is an ActiveJob method and is not available in a Sidekiq worker.)
raise
end
private
def update_read_model(event)
case event.event_type
when 'user_registered'
User.create!(event.data)
when 'profile_updated'
User.find(event.stream_id).update!(event.data)
end
end
end
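Because a retried job replays the same event, projections should tolerate redelivery. A minimal sketch, assuming events carry a monotonically increasing sequence number and arrive in order; the `Event` struct, `UserProjection`, and the sample data are hypothetical and independent of the ActiveRecord models above.

```ruby
Event = Struct.new(:sequence, :type, :data)

class UserProjection
  attr_reader :users, :last_applied

  def initialize
    @users = {}
    @last_applied = 0
  end

  # Applying an event that was already applied is a no-op, so retries are safe.
  def apply(event)
    return if event.sequence <= @last_applied

    case event.type
    when :user_registered
      @users[event.data[:id]] = { name: event.data[:name] }
    when :profile_updated
      @users[event.data[:id]][:name] = event.data[:name]
    end
    @last_applied = event.sequence
  end
end

log = [
  Event.new(1, :user_registered, { id: 7, name: "Ada" }),
  Event.new(2, :profile_updated, { id: 7, name: "Ada L." })
]

projection = UserProjection.new
log.each { |event| projection.apply(event) }
log.each { |event| projection.apply(event) } # simulated redelivery

puts projection.users.inspect
```

Tracking only the last applied sequence works for a single ordered stream; out-of-order delivery would need a per-event record of what has been processed.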
Conflict Resolution requires explicit implementation when multiple nodes accept concurrent writes.
class ConflictResolver
def resolve_user_profile(local_version, remote_version)
# Last-write-wins based on timestamp
return remote_version if remote_version[:updated_at] > local_version[:updated_at]
return local_version if local_version[:updated_at] > remote_version[:updated_at]
# Timestamps identical - merge fields
merge_profiles(local_version, remote_version)
end
private
def merge_profiles(local, remote)
merged = local.dup
# Field-level resolution
remote.each do |field, value|
# Prefer non-nil values
if merged[field].nil? && value.present?
merged[field] = value
elsif field_timestamp(remote, field) > field_timestamp(local, field)
merged[field] = value
end
end
merged
end
def field_timestamp(record, field)
record.dig(:field_timestamps, field) || record[:updated_at]
end
end
Distributed Counters handle concurrent increments without coordination.
require 'redis'
require 'socket' # for Socket.gethostname
class DistributedCounter
def initialize(key)
@key = key
@redis = Redis.new
@node_id = Socket.gethostname
end
def increment(amount = 1)
# Each node maintains its own counter
@redis.hincrby(@key, @node_id, amount)
end
def value
# Sum all node counters for eventual total
counts = @redis.hgetall(@key)
counts.values.map(&:to_i).sum
end
def merge_with_remote(remote_counts)
# Reconcile counts from other nodes
remote_counts.each do |node_id, count|
local_count = @redis.hget(@key, node_id).to_i
# Keep maximum (handles duplicate increments)
@redis.hset(@key, node_id, [local_count, count.to_i].max)
end
end
end
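The per-node slots and max-based merge above correspond to a grow-only counter (G-Counter). A Redis-free sketch of the same idea, with hypothetical node names, shows why merging with `max` makes repeated merges harmless:

```ruby
# Grow-only counter: one slot per node, merged with a per-slot maximum.
class GCounter
  attr_reader :counts

  def initialize(node_id)
    @node_id = node_id
    @counts = Hash.new(0)
  end

  # A node only ever increments its own slot.
  def increment(amount = 1)
    @counts[@node_id] += amount
  end

  def value
    @counts.values.sum
  end

  # Taking the maximum per slot means a merge can be repeated safely.
  def merge(other)
    other.counts.each do |node_id, count|
      @counts[node_id] = [@counts[node_id], count].max
    end
    self
  end
end

a = GCounter.new("node-a")
b = GCounter.new("node-b")
a.increment(3)
b.increment(2)

a.merge(b)
a.merge(b) # applying the same merge again changes nothing
b.merge(a)

puts "a=#{a.value} b=#{b.value}"
```

This only works for increments. Supporting decrements needs a second set of slots (a PN-Counter), which is outside the scope of this sketch.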
Practical Examples
Social Media Feed demonstrates BASE properties in a high-traffic scenario where immediate consistency is unnecessary.
class FeedService
def publish_post(user_id, content)
# Create post immediately
post = Post.create!(
user_id: user_id,
content: content,
published_at: Time.now
)
# Accept request without waiting for follower feeds
FanoutWorker.perform_async(post.id)
post
end
end
class FanoutWorker
include Sidekiq::Worker
def perform(post_id)
post = Post.find(post_id)
followers = User.find(post.user_id).followers
# Asynchronously add to each follower's feed
followers.find_each do |follower|
add_to_feed(follower.id, post.id)
end
end
private
def add_to_feed(user_id, post_id)
# Add to Redis sorted set (eventually consistent)
Redis.current.zadd(
"feed:#{user_id}",
Time.now.to_i,
post_id
)
rescue Redis::ConnectionError
# Re-raise so Sidekiq retries the job with backoff; a bare `retry`
# here would spin in a tight loop for as long as Redis is down
raise
end
end
class FeedController < ApplicationController
def show
# Read potentially stale feed from Redis
post_ids = Redis.current.zrevrange(
"feed:#{current_user.id}",
0,
49
)
# Posts may not all be propagated yet
@posts = Post.where(id: post_ids).order(published_at: :desc)
end
end
E-commerce Inventory shows BASE properties with compensation for overselling.
class InventoryService
def reserve_item(product_id, quantity)
# Optimistically decrement cached inventory
cached_inventory = decrement_cache(product_id, quantity)
if cached_inventory >= 0
# Accept reservation immediately
ReservationWorker.perform_async(product_id, quantity)
{ status: :reserved, inventory: cached_inventory }
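else
# Undo the optimistic decrement before rejecting, otherwise the
# cached count drifts further below zero with every failed attempt
restored = Redis.current.incrby("inventory:#{product_id}", quantity)
{ status: :insufficient, inventory: restored }
end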
end
private
def decrement_cache(product_id, quantity)
Redis.current.decrby("inventory:#{product_id}", quantity)
end
end
class ReservationWorker
include Sidekiq::Worker
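def perform(product_id, quantity)
# Eventually verify against the database. The conditional UPDATE checks
# and decrements in a single statement, so two concurrent workers
# cannot both claim the last units.
updated = Product.where(id: product_id)
.where('inventory >= ?', quantity)
.update_all(['inventory = inventory - ?', quantity])
# No row updated means the item was oversold - compensate
handle_oversell(product_id, quantity) if updated.zero?
end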
private
def handle_oversell(product_id, quantity)
# Restore cache
Redis.current.incrby("inventory:#{product_id}", quantity)
# Notify customer of cancellation
OrderCancellationWorker.perform_async(product_id, quantity)
end
end
Analytics Pipeline implements BASE for high-volume data processing.
class EventTracker
def track_event(user_id, event_type, properties)
# Write to local buffer immediately
buffer = EventBuffer.current
buffer.append(
user_id: user_id,
event_type: event_type,
properties: properties,
timestamp: Time.now
)
# Flush once the buffer is full or the flush interval has elapsed
buffer.flush if buffer.should_flush?
true
end
end
class EventBuffer
BUFFER_SIZE = 1000
FLUSH_INTERVAL = 10.seconds
# Process-wide buffer used by EventTracker
def self.current
@current ||= new
end
def initialize
@events = []
@last_flush = Time.now
@mutex = Mutex.new
end
def append(event)
@mutex.synchronize do
@events << event
end
end
def should_flush?
@events.size >= BUFFER_SIZE ||
Time.now - @last_flush > FLUSH_INTERVAL
end
def flush
events_to_flush = nil
@mutex.synchronize do
events_to_flush = @events.dup
@events.clear
@last_flush = Time.now
end
# Batch write to analytics store
AnalyticsStore.bulk_insert(events_to_flush)
end
end
class AnalyticsStore
def self.bulk_insert(events)
# Write to eventually consistent store
clickhouse_client.insert(
'events',
events.map { |e| format_event(e) }
)
rescue StandardError => e
# Retry ensures eventual consistency
RetryWorker.perform_in(1.minute, events)
end
end
Collaborative Editing uses BASE for concurrent document updates.
class DocumentService
def update_document(doc_id, user_id, changes)
# Apply changes locally immediately
doc = Document.find(doc_id)
version = doc.current_version + 1
# Record the operation; it is transformed against concurrent operations before broadcast
operation = Operation.create!(
document_id: doc_id,
user_id: user_id,
version: version,
changes: changes,
timestamp: Time.now
)
# Broadcast to other clients asynchronously
BroadcastWorker.perform_async(operation.id)
operation
end
end
class BroadcastWorker
include Sidekiq::Worker
def perform(operation_id)
operation = Operation.find(operation_id)
# Transform against concurrent operations
concurrent_ops = find_concurrent_operations(operation)
transformed = transform_operation(operation, concurrent_ops)
# Broadcast transformed operation
ActionCable.server.broadcast(
"document_#{operation.document_id}",
transformed
)
end
private
def transform_operation(operation, concurrent_ops)
# Operational transformation for conflict resolution
concurrent_ops.each do |concurrent_op|
operation = apply_transformation(operation, concurrent_op)
end
operation
end
end
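The `apply_transformation` step is left undefined above. As a rough illustration of what such a step does, the following sketch transforms one text insertion against a concurrent one so that two sites converge. It covers insert-versus-insert only; the `Insert` struct, the site-id tie-break, and the sample document are assumptions, and production operational transformation also has to handle deletions and longer operation histories.

```ruby
Insert = Struct.new(:position, :text, :site)

def apply_insert(document, op)
  document.dup.insert(op.position, op.text)
end

# Adjust `op` so it can be applied after `other` has already been applied.
# Ties at the same position are broken by site id so both sites agree.
def transform(op, other)
  shift = other.position < op.position ||
          (other.position == op.position && other.site < op.site)
  shift ? Insert.new(op.position + other.text.length, op.text, op.site) : op
end

base = "abc"
op_a = Insert.new(1, "X", 1) # site 1 inserts "X" at index 1
op_b = Insert.new(2, "Y", 2) # site 2 inserts "Y" at index 2

# Each site applies its own operation first, then the transformed remote one.
site_1 = apply_insert(apply_insert(base, op_a), transform(op_b, op_a))
site_2 = apply_insert(apply_insert(base, op_b), transform(op_a, op_b))

puts site_1
puts site_2
```

Both sites end with the same text even though they applied the operations in different orders, which is the convergence property the broadcast worker relies on.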
Common Pitfalls
Read-After-Write Inconsistency occurs when a client writes data and immediately reads it back, receiving stale data because replication has not completed.
# Problematic code
def update_and_display
user.update(name: 'New Name')
# Read might hit replica that hasn't received update yet
updated_user = User.find(user.id)
updated_user.name # May still be 'Old Name'
end
# Solution: read from primary after write
def update_and_display
user.update(name: 'New Name')
# Force read from primary
updated_user = ActiveRecord::Base.connected_to(role: :writing) do
User.find(user.id)
end
updated_user.name # Guaranteed to be 'New Name'
end
# Alternative: return written object
def update_and_display
user.update(name: 'New Name')
user.name # Use the object that was just updated
end
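Another common approach is to pin a session's reads to the primary for a short window after that session writes. Rails offers a comparable mechanism through its automatic role-switching middleware; the sketch below is a framework-free illustration in which `ReadRouter`, the 2-second window, and the injected clock are all assumptions.

```ruby
# Routes reads to the primary for a short window after a session's last write.
class ReadRouter
  def initialize(pin_seconds: 2.0,
                 clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @pin_seconds = pin_seconds
    @clock = clock
    @last_write_at = {}
  end

  def record_write(session_id)
    @last_write_at[session_id] = @clock.call
  end

  def role_for_read(session_id)
    last = @last_write_at[session_id]
    return :replica if last.nil?

    (@clock.call - last) < @pin_seconds ? :primary : :replica
  end
end

# A controllable clock keeps the example deterministic.
now = 100.0
router = ReadRouter.new(pin_seconds: 2.0, clock: -> { now })

results = []
results << router.role_for_read("alice") # no writes yet
router.record_write("alice")
now = 101.0
results << router.role_for_read("alice") # inside the pin window
now = 103.5
results << router.role_for_read("alice") # window has passed

puts results.inspect
```

The window should be at least as long as typical replication lag; if lag exceeds it, stale reads become possible again.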
Lost Updates happen when concurrent modifications overwrite each other without conflict detection.
# Problematic code
def increment_counter(key)
current = redis.get(key).to_i
redis.set(key, current + 1) # Lost updates in concurrent scenarios
end
# Solution: use atomic operations
def increment_counter(key)
redis.incr(key) # Atomic increment
end
# For complex updates, use optimistic locking
class Document < ApplicationRecord
self.locking_column = :lock_version
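def safe_update(attributes, max_attempts: 3)
attempts = 0
begin
attempts += 1
update!(attributes)
rescue ActiveRecord::StaleObjectError
# Conflict detected: reload and retry, but give up after a few attempts.
# Note that retrying blindly reapplies the same attributes on top of the
# other writer's change; recompute them after reload if they depend on it.
raise if attempts >= max_attempts
reload
retry
end
end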
end
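The version check behind optimistic locking can be shown without a database. In this sketch an in-memory store rejects writes that carry an outdated version, and the caller re-reads and recomputes before trying again. `VersionedStore`, `StaleVersionError`, and the attempt limit are hypothetical names chosen for the example.

```ruby
class StaleVersionError < StandardError; end

# In-memory record store that checks the expected version on every write.
class VersionedStore
  def initialize
    @records = {}
    @mutex = Mutex.new
  end

  def read(key)
    @mutex.synchronize { @records.fetch(key, { value: 0, version: 0 }).dup }
  end

  def write(key, value, expected_version)
    @mutex.synchronize do
      current = @records.fetch(key, { value: 0, version: 0 })
      raise StaleVersionError if current[:version] != expected_version

      @records[key] = { value: value, version: expected_version + 1 }
    end
  end
end

# Read, compute, and write; on conflict, re-read and recompute.
def increment(store, key, max_attempts: 100)
  max_attempts.times do
    record = store.read(key)
    begin
      store.write(key, record[:value] + 1, record[:version])
      return true
    rescue StaleVersionError
      next # another writer got there first
    end
  end
  false
end

store = VersionedStore.new
threads = 4.times.map { Thread.new { 50.times { increment(store, "hits") } } }
threads.each(&:join)

puts store.read("hits")[:value]
```

The important detail is that the new value is recomputed from a fresh read on every attempt, so no concurrent increment is overwritten.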
Unbounded Replication Lag occurs when asynchronous replication falls behind faster than it catches up, leading to indefinite inconsistency.
# Monitor replication lag
class ReplicationMonitor
MAX_LAG_SECONDS = 60
def check_lag
lag = calculate_replication_lag
if lag > MAX_LAG_SECONDS
alert_operations(lag)
# Temporarily direct reads to primary
redirect_reads_to_primary
end
end
private
def calculate_replication_lag
# MAX_LAG_SECONDS is a time threshold, so measure lag in seconds: ask the
# replica how long ago it replayed its last transaction. This overstates
# lag while the primary is idle. (WAL positions such as
# pg_current_wal_lsn() measure lag in bytes instead.)
ActiveRecord::Base.connected_to(role: :reading) do
ActiveRecord::Base.connection.select_value(
"SELECT COALESCE(EXTRACT(EPOCH FROM now() - pg_last_xact_replay_timestamp()), 0)"
).to_f
end
end
end
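When lag is tracked by WAL position rather than by time, the two LSN strings have to be turned into numbers before they can be subtracted. PostgreSQL prints an LSN as two hexadecimal numbers separated by a slash, the high and low 32 bits of a 64-bit byte position. A small sketch with hypothetical helper names and sample values:

```ruby
# Convert a PostgreSQL LSN such as "16/B374D848" to an integer byte position.
def lsn_to_bytes(lsn)
  high, low = lsn.split("/").map { |part| part.to_i(16) }
  (high << 32) + low
end

# Difference between the primary's current position and the replica's
# replayed position, in bytes of WAL.
def lag_bytes(primary_lsn, replica_lsn)
  lsn_to_bytes(primary_lsn) - lsn_to_bytes(replica_lsn)
end

lag = lag_bytes("0/3000060", "0/3000000")
puts "replica is #{lag} bytes behind"
```

The result is a byte count, so it needs its own alert threshold and cannot be compared against a limit expressed in seconds. PostgreSQL can also compute the same difference server-side with `pg_wal_lsn_diff`.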
Conflicting Updates arise when different nodes accept writes for the same data without coordination.
# Implement vector clocks for conflict detection
class VectorClock
def initialize
@clocks = {}
end
def increment(node_id)
@clocks[node_id] = (@clocks[node_id] || 0) + 1
end
def happens_before?(other)
# Check if this clock happened before other
@clocks.all? do |node_id, count|
other.clocks[node_id].to_i >= count
end && @clocks != other.clocks
end
def concurrent?(other)
!happens_before?(other) && !other.happens_before?(self)
end
def merge(other)
merged = VectorClock.new
all_nodes = (@clocks.keys + other.clocks.keys).uniq
all_nodes.each do |node_id|
merged.clocks[node_id] = [
@clocks[node_id].to_i,
other.clocks[node_id].to_i
].max
end
merged
end
attr_reader :clocks
end
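class VersionedDocument
attr_accessor :content, :vector_clock
def initialize(content = nil)
@content = content
# Start with an empty clock so update and merge_with never see nil
@vector_clock = VectorClock.new
end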
def update(new_content, node_id)
@content = new_content
@vector_clock.increment(node_id)
end
def merge_with(other)
if @vector_clock.happens_before?(other.vector_clock)
# Other version is newer
@content = other.content
@vector_clock = other.vector_clock
elsif other.vector_clock.happens_before?(@vector_clock)
# This version is newer, keep it
elsif @vector_clock.concurrent?(other.vector_clock)
# Conflict - requires resolution
@content = resolve_conflict(@content, other.content)
@vector_clock = @vector_clock.merge(other.vector_clock)
end
end
end
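To make the ordering rules concrete, the comparison can be written over plain hashes and exercised with three sample clocks. The `compare_clocks` helper and the clock values are illustrative and separate from the `VectorClock` class above.

```ruby
# Compare two vector clocks represented as { node_id => counter } hashes.
# Missing entries count as zero.
def compare_clocks(a, b)
  nodes = a.keys | b.keys
  a_behind = nodes.any? { |n| a.fetch(n, 0) < b.fetch(n, 0) }
  b_behind = nodes.any? { |n| b.fetch(n, 0) < a.fetch(n, 0) }

  if a_behind && b_behind
    :concurrent
  elsif a_behind
    :before
  elsif b_behind
    :after
  else
    :equal
  end
end

v1 = { "a" => 1 }
v2 = { "a" => 2 }           # node a wrote again after seeing v1
v3 = { "a" => 1, "b" => 1 } # node b wrote after seeing v1, unaware of v2

puts compare_clocks(v1, v2)
puts compare_clocks(v2, v3)
```

`v2` and `v3` each contain an update the other has not seen, so neither can be discarded automatically; this is the case that `merge_with` hands to conflict resolution.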
Cascade Failures occur when eventual consistency delays propagate across dependent systems.
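# Implement circuit breakers
# Raised when the circuit is open and calls are being rejected
class CircuitOpenError < StandardError; end
class CircuitBreaker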
def initialize(failure_threshold: 5, timeout: 60)
@failure_threshold = failure_threshold
@timeout = timeout
@failure_count = 0
@last_failure_time = nil
@state = :closed
end
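def call(&block)
case @state
when :open
if Time.now - @last_failure_time > @timeout
# Timeout elapsed: allow one trial call
@state = :half_open
attempt_call(&block)
else
raise CircuitOpenError
end
when :half_open, :closed
attempt_call(&block)
end
end
private
def attempt_call
# The caller's block must be passed down explicitly for yield to work
result = yield
reset_failures
# Return the block's result, not the value of reset_failures
result
rescue StandardError => e
record_failure
raise e
end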
def record_failure
@failure_count += 1
@last_failure_time = Time.now
if @failure_count >= @failure_threshold
@state = :open
end
end
def reset_failures
@failure_count = 0
@state = :closed
end
end
# Use circuit breaker for dependent services
class DependentService
def initialize
@circuit_breaker = CircuitBreaker.new
end
def call_external_service
@circuit_breaker.call do
HTTP.get(external_url)
end
rescue CircuitOpenError
# Return cached or default value
cached_response
end
end
Tools & Ecosystem
Redis provides data structures supporting eventual consistency with tunable durability.
class RedisBasedCache
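def initialize
# Persistence is configured on the Redis server, not through client
# options. For example, in redis.conf:
#   save 900 1
#   save 300 10
#   save 60 10000
#   appendonly yes
#   appendfsync everysec   # fsync the append-only file once per second
@redis = Redis.new
end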
def eventually_consistent_set(key, value, ttl: 3600)
# Write returns before persistence completes
@redis.setex(key, ttl, value)
end
def replicated_increment(key)
# Replicated to followers asynchronously
@redis.incr(key)
end
end
Apache Cassandra provides tunable consistency levels for BASE semantics.
require 'cassandra'
class CassandraStore
def initialize
@cluster = Cassandra.cluster(
consistency: :one # Write to one node, eventual replication
)
@session = @cluster.connect('keyspace_name')
end
def write_eventual(key, value)
# Write succeeds when one node acknowledges
@session.execute(
'INSERT INTO data (key, value) VALUES (?, ?)',
arguments: [key, value],
consistency: :one
)
end
def read_eventual(key)
# Read from one node, may be stale
result = @session.execute(
'SELECT value FROM data WHERE key = ?',
arguments: [key],
consistency: :one
)
result.first['value']
end
def write_quorum(key, value)
# Trade availability for stronger consistency
@session.execute(
'INSERT INTO data (key, value) VALUES (?, ?)',
arguments: [key, value],
consistency: :quorum
)
end
end
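The trade-off between the `:one` and `:quorum` levels above comes down to simple arithmetic: with N replicas, a read that contacts R nodes is guaranteed to overlap a write acknowledged by W nodes only when R + W > N. A short sketch with hypothetical helper names:

```ruby
# Smallest majority of a replica set.
def quorum(replicas)
  replicas / 2 + 1
end

# True when every read set must intersect every acknowledged write set.
def overlapping?(replicas:, read_nodes:, write_nodes:)
  read_nodes + write_nodes > replicas
end

n = 3 # assumed replication factor

one_one = overlapping?(replicas: n, read_nodes: 1, write_nodes: 1)
quorum_quorum = overlapping?(replicas: n,
                             read_nodes: quorum(n),
                             write_nodes: quorum(n))

puts "ONE/ONE overlaps: #{one_one}"
puts "QUORUM/QUORUM overlaps: #{quorum_quorum}"
```

With a replication factor of 3, reading and writing at `:one` can miss the latest write, while quorum reads combined with quorum writes always include at least one replica that has it, at the cost of needing two replicas to be reachable.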
Amazon DynamoDB implements eventual consistency with optional strong reads.
require 'aws-sdk-dynamodb'
class DynamoDBStore
def initialize
@client = Aws::DynamoDB::Client.new
@table_name = 'base_table'
end
def put_item_eventual(key, attributes)
# Standard write; the eventual vs. strong choice is made on reads
@client.put_item(
table_name: @table_name,
item: attributes.merge({ id: key })
)
end
def get_item_eventual(key)
# Eventually consistent read (default)
@client.get_item(
table_name: @table_name,
key: { id: key }
).item
end
def get_item_strong(key)
# Optionally request strong consistency
@client.get_item(
table_name: @table_name,
key: { id: key },
consistent_read: true
).item
end
end
PostgreSQL Logical Replication supports BASE through asynchronous replication.
class PostgresReplicated
def configure_replication
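# Primary database configuration. ALTER SYSTEM cannot run inside a
# transaction block, so each statement is sent separately. A change to
# wal_level only takes effect after the server is restarted.
primary_conn.execute("ALTER SYSTEM SET wal_level = logical")
primary_conn.execute("ALTER SYSTEM SET max_replication_slots = 4")
primary_conn.execute("ALTER SYSTEM SET max_wal_senders = 4")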
# Create publication for logical replication
primary_conn.execute(<<-SQL)
CREATE PUBLICATION base_publication
FOR ALL TABLES;
SQL
# On replica, create subscription
replica_conn.execute(<<-SQL)
CREATE SUBSCRIPTION base_subscription
CONNECTION 'host=primary_host dbname=mydb'
PUBLICATION base_publication;
SQL
end
def monitor_replication_status
primary_conn.execute(<<-SQL)
SELECT
slot_name,
plugin,
database,
active,
pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn) AS lag_bytes
FROM pg_replication_slots;
SQL
end
end
Sidekiq handles asynchronous job processing for eventual consistency.
class EventualConsistencyWorker
include Sidekiq::Worker
sidekiq_options retry: 25, dead: false
def perform(data_id, operation)
case operation
when 'replicate'
replicate_data(data_id)
when 'reconcile'
reconcile_conflicts(data_id)
when 'propagate'
propagate_changes(data_id)
end
rescue StandardError => e
# Retry with exponential backoff
# Ensures eventual consistency
raise e
end
private
def replicate_data(data_id)
data = PrimaryStore.find(data_id)
ReplicaStore.upsert(data)
end
end
# Configure Sidekiq. Retries are built into Sidekiq 5 and later, so no
# retry middleware needs to be registered; tune them with sidekiq_options.
Sidekiq.configure_server do |config|
config.redis = { url: ENV.fetch('REDIS_URL', 'redis://localhost:6379/0') }
end
EventMachine enables asynchronous operations for BASE implementations.
require 'eventmachine'
class AsyncReplicator
def replicate_to_nodes(key, value, nodes)
EM.run do
nodes.each do |node|
EM.defer(
-> { write_to_node(node, key, value) },
-> (result) { handle_write_result(node, result) }
)
end
# Don't wait for all writes to complete
EM.add_timer(0.1) { EM.stop }
end
end
private
def write_to_node(node, key, value)
HTTP.post(
"http://#{node}/data",
json: { key: key, value: value }
)
rescue StandardError => e
{ error: e.message, node: node }
end
def handle_write_result(node, result)
if result.is_a?(Hash) && result[:error]
# Queue for retry
RetryQueue.push(node: node, result: result)
end
end
end
Reference
BASE Properties Comparison
| Property | Description | System Behavior |
|---|---|---|
| Basically Available | System remains responsive during failures | Partial data or stale reads acceptable |
| Soft State | State may change without new input as replicas synchronize | Data propagates asynchronously across nodes |
| Eventually Consistent | All replicas converge given no new updates | Temporary divergence with eventual agreement |
ACID vs BASE Trade-offs
| Aspect | ACID | BASE |
|---|---|---|
| Consistency | Immediate, strong | Eventual, weak |
| Availability | Lower during partitions | Higher during partitions |
| Partition Tolerance | Blocks on partition | Continues on partition |
| Latency | Higher due to coordination | Lower, minimal coordination on the request path |
| Scalability | Limited by coordination | Highly scalable |
| Complexity | Simpler reasoning | Complex conflict resolution |
| Use Cases | Financial systems, inventory | Social media, analytics, caching |
Consistency Levels
| Level | Guarantees | Performance | Availability |
|---|---|---|---|
| Strong | All nodes agree before response | Slowest | Lowest |
| Bounded Staleness | Maximum staleness defined | Medium | Medium |
| Session | Consistency within session | Fast | High |
| Eventual | No immediate guarantee | Fastest | Highest |
| Consistent Prefix | Reads see prefix of writes | Fast | High |
Conflict Resolution Strategies
| Strategy | Description | Use Case |
|---|---|---|
| Last Write Wins | Timestamp determines winner | Simple data, acceptable data loss |
| Version Vectors | Track causality for detection | Complex data requiring conflict awareness |
| CRDT | Mathematically mergeable types | Counters, sets, collaborative editing |
| Application Logic | Custom merge function | Business-specific requirements |
| Multi-Version | Keep all versions | User-driven resolution needed |
Ruby Async Processing Gems
| Gem | Purpose | Consistency Support |
|---|---|---|
| Sidekiq | Background job processing | Retry mechanisms for eventual consistency |
| Resque | Redis-backed job queue | Manual retry logic |
| DelayedJob | Database-backed jobs | Transaction-based consistency |
| Sneakers | RabbitMQ job processing | Message acknowledgment patterns |
| GoodJob | Postgres-backed jobs | ACID within database |
| Que | Postgres queue | Transactional job creation |
Monitoring Metrics
| Metric | What to Monitor | Threshold Example |
|---|---|---|
| Replication Lag | Time delay between primary and replica | Alert if greater than 60 seconds |
| Conflict Rate | Number of conflicts per minute | Alert if greater than 10/min |
| Convergence Time | Time to reach consistency | Alert if greater than 5 minutes |
| Write Success Rate | Percentage of successful writes | Alert if less than 99.9% |
| Stale Read Rate | Percentage of stale reads served | Alert if greater than 5% |
| Queue Depth | Pending async operations | Alert if greater than 10000 |
Common Patterns Code Reference
# Pattern: Read-Your-Writes Consistency
class ReadYourWritesCache
def write_and_read(key, value)
write(key, value)
read_from_primary(key) # Ensure reading own writes
end
end
# Pattern: Optimistic Locking
class OptimisticDocument < ApplicationRecord
self.locking_column = :version
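def safe_update(attrs, max_attempts: 3)
attempts = 0
begin
attempts += 1
update!(attrs)
rescue ActiveRecord::StaleObjectError
# Give up after a few attempts instead of retrying forever
raise if attempts >= max_attempts
reload
retry
end
end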
end
# Pattern: Idempotent Operations
class IdempotentWorker
def perform(operation_id, data)
return if already_processed?(operation_id)
process_data(data)
mark_processed(operation_id)
end
end
# Pattern: Compensating Transactions
class CompensatingTransaction
def execute_with_compensation
result = execute_operation
if invalid?(result)
compensate(result)
end
end
end
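The idempotent operations pattern above leaves `already_processed?` and `mark_processed` undefined, and checking and marking in two separate steps lets two concurrent deliveries both pass the check. A minimal in-memory sketch that combines the check and the mark into one step; `IdempotentProcessor` and the operation ids are hypothetical, and a real deployment would keep the processed ids in a shared store rather than in process memory.

```ruby
require "set"

class IdempotentProcessor
  attr_reader :balance

  def initialize
    @processed = Set.new
    @balance = 0
    @mutex = Mutex.new
  end

  # Returns true if the operation was applied, false if it was a duplicate.
  def apply(operation_id, amount)
    @mutex.synchronize do
      # Set#add? returns nil when the id is already present, so the
      # check and the mark happen in a single step.
      return false unless @processed.add?(operation_id)

      @balance += amount
      true
    end
  end
end

processor = IdempotentProcessor.new
first = processor.apply("op-1", 50)
second = processor.apply("op-1", 50) # redelivered message

puts "applied: #{first}, duplicate applied: #{second}, balance: #{processor.balance}"
```

With this in place, at-least-once delivery from a retrying job queue does not apply the same operation twice.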