Overview
MapReduce is a programming model designed for processing large volumes of data in parallel across distributed computing environments. The paradigm divides computation into two primary phases: a map phase that processes input data into intermediate key-value pairs, and a reduce phase that aggregates these intermediate values to produce final results.
Google introduced MapReduce in 2004 to address the challenges of processing massive datasets across thousands of machines. The model abstracts the complexities of parallelization, fault tolerance, data distribution, and load balancing, allowing developers to focus on the data transformation logic rather than distributed systems infrastructure.
The paradigm takes inspiration from functional programming concepts, particularly the map and reduce operations found in languages like Lisp. However, MapReduce extends these concepts to operate on distributed datasets that exceed the capacity of single machines.
MapReduce operates on data structured as key-value pairs. The framework handles splitting input data, distributing tasks across cluster nodes, managing communication between nodes, and aggregating results. This abstraction makes it accessible to developers without deep distributed systems expertise.
# Conceptual MapReduce flow
input_data = ["hello world", "hello ruby", "world map"]
# Map phase: emit key-value pairs
mapped = input_data.flat_map do |line|
line.split.map { |word| [word, 1] }
end
# => [["hello", 1], ["world", 1], ["hello", 1], ["ruby", 1], ["world", 1], ["map", 1]]
# Shuffle phase: group by key
shuffled = mapped.group_by(&:first).transform_values { |pairs| pairs.map(&:last) }
# => {"hello"=>[1, 1], "world"=>[1, 1], "ruby"=>[1], "map"=>[1]}
# Reduce phase: aggregate values
reduced = shuffled.transform_values(&:sum)
# => {"hello"=>2, "world"=>2, "ruby"=>1, "map"=>1}
The paradigm excels at embarrassingly parallel problems where data can be processed independently without requiring communication between parallel tasks. Common applications include log analysis, inverted index construction, distributed sorting, graph processing, and statistical analysis.
Key Principles
MapReduce decomposes data processing into distinct computational phases, each with specific responsibilities and constraints. Understanding these phases and their interactions is fundamental to applying the paradigm correctly.
The map phase transforms input records into intermediate key-value pairs. Each mapper operates independently on a subset of input data, applying a user-defined function to generate zero or more output pairs. Mappers process data locally without knowledge of other mappers' operations, enabling parallel execution. The map function signature takes an input key-value pair and emits intermediate key-value pairs.
The shuffle phase occurs between map and reduce operations. The framework partitions intermediate key-value pairs by key, ensuring all values for a given key reach the same reducer. This phase involves sorting intermediate data and transferring it across the network to the appropriate reducer nodes. The shuffle phase is a synchronization point: reducers may start copying map output while mappers are still running, but no reduce function can run until every mapper has finished, because any remaining mapper might still emit a value for one of its keys.
The reduce phase aggregates all values associated with each intermediate key to produce final output. Each reducer processes one or more keys, applying a user-defined reduction function to combine values. Reducers operate independently on their assigned keys, enabling parallel reduction.
# Map function signature
def map(input_key, input_value)
# Process input and emit intermediate pairs
emit(intermediate_key, intermediate_value)
end
# Reduce function signature
def reduce(intermediate_key, values_iterator)
# Aggregate values and emit final result
emit(output_key, output_value)
end
Data locality optimization places computation near data storage to minimize network transfer. The framework schedules map tasks on nodes storing the input data blocks. When data locality is impossible, the scheduler prefers nodes in the same network rack to reduce bandwidth consumption.
Fault tolerance mechanisms detect and recover from failures automatically. The framework monitors task execution through heartbeat messages. When a node fails, the framework reschedules incomplete tasks on healthy nodes. Completed map tasks on failed nodes must re-execute because their intermediate output becomes unavailable, while completed reduce tasks persist their output to stable storage.
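The re-execution behavior described above can be sketched in a single process. This is a minimal illustration, not how any real framework implements it: `TaskFailed`, `run_with_retries`, and `max_attempts` are hypothetical names, and a raised exception stands in for a missed heartbeat.

```ruby
# Hypothetical error standing in for a lost node or missed heartbeat.
class TaskFailed < StandardError; end

# Re-run a deterministic task until it succeeds or attempts run out.
# Returns [result, attempts_used].
def run_with_retries(max_attempts: 3)
  attempts = 0
  begin
    attempts += 1
    [yield(attempts), attempts]
  rescue TaskFailed
    retry if attempts < max_attempts
    raise
  end
end

# Simulated map task: the first attempt "loses its node", the second succeeds.
map_task = lambda do |attempt|
  raise TaskFailed, "node lost" if attempt == 1

  "hello world".split.map { |word| [word, 1] }
end

output, attempts = run_with_retries { |attempt| map_task.call(attempt) }
```

Because the task is deterministic, the second attempt yields the same pairs the first attempt would have produced, which is what makes blind re-execution safe.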
Combiners optimize performance by performing partial reduction locally on mapper nodes before the shuffle phase. A combiner function has the same signature as the reduce function and produces intermediate key-value pairs. Combiners reduce network traffic by pre-aggregating data, but the framework provides no guarantee about when or if combiners execute.
# Combiner function (same signature as reduce)
def combine(intermediate_key, values_iterator)
# Perform partial reduction locally
emit(intermediate_key, partial_result)
end
Partitioning determines which reducer processes each intermediate key. The default partitioner uses hash-based distribution to balance load across reducers. Custom partitioners enable specialized distribution strategies, such as range partitioning for sorted output or domain-specific key grouping.
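A hash partitioner can be sketched as follows. One caveat worth illustrating: Ruby seeds `String#hash` randomly in each process, so it is only safe when all partitioning happens inside one process. The sketch below assumes a checksum from the standard `zlib` library as a process-independent alternative; `stable_partition` is a hypothetical helper name.

```ruby
require 'zlib'

# Hypothetical helper: map a key to a reducer index in 0...num_reducers.
# Zlib.crc32 returns the same value in every process, unlike String#hash,
# which Ruby seeds randomly at interpreter startup.
def stable_partition(key, num_reducers)
  Zlib.crc32(key.to_s) % num_reducers
end

keys = %w[hello world ruby map hello]
assignments = keys.map { |key| [key, stable_partition(key, 4)] }
# Both occurrences of "hello" receive the same reducer index.
```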
The paradigm enforces functional purity constraints on map and reduce functions. These functions should produce deterministic output given the same input and avoid side effects. Determinism ensures that re-executing failed tasks produces identical results, maintaining correctness despite failures.
Ruby Implementation
Ruby provides multiple approaches to implement MapReduce concepts, from simple in-memory operations to distributed processing frameworks. The language's enumerable methods and functional programming features align naturally with MapReduce principles.
The core enumerable methods map, group_by, and reduce correspond directly to MapReduce phases. Ruby's blocks provide concise syntax for defining transformation and aggregation logic.
class SimpleMapReduce
def initialize(data)
@data = data
end
def map(&mapper)
@intermediate = @data.flat_map(&mapper)
self
end
def shuffle
@shuffled = @intermediate.group_by(&:first)
.transform_values { |pairs| pairs.map(&:last) }
self
end
def reduce(&reducer)
@shuffled.transform_values { |values| reducer.call(values) }
end
end
# Word count example
data = ["hello world", "hello ruby", "world map"]
result = SimpleMapReduce.new(data)
.map { |line| line.split.map { |word| [word, 1] } }
.shuffle
.reduce { |counts| counts.sum }
# => {"hello"=>2, "world"=>2, "ruby"=>1, "map"=>1}
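For parallel processing on multi-core systems, Ruby's Thread and Ractor classes enable concurrent execution. The parallel gem provides higher-level abstractions for parallel map and reduce operations. Under CRuby, threads (the in_threads option) overlap I/O but do not run Ruby code on several cores at once; for CPU-bound mappers, the gem's in_processes option forks worker processes instead.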
require 'parallel'
class ParallelMapReduce
def initialize(data, workers: Parallel.processor_count)
@data = data
@workers = workers
end
def map(&mapper)
@intermediate = Parallel.flat_map(@data, in_threads: @workers, &mapper)
self
end
def shuffle
@shuffled = @intermediate.group_by(&:first)
.transform_values { |pairs| pairs.map(&:last) }
self
end
def reduce(&reducer)
Parallel.map(@shuffled, in_threads: @workers) do |key, values|
[key, reducer.call(values)]
end.to_h
end
end
# Process large dataset with parallel execution
data = File.readlines('large_file.txt', chomp: true)
result = ParallelMapReduce.new(data, workers: 8)
.map { |line| line.split.map { |word| [word.downcase, 1] } }
.shuffle
.reduce { |counts| counts.sum }
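Distributed MapReduce requires coordination between multiple machines. Hadoop Streaming, a utility that ships with Hadoop, lets any executable that reads from stdin and writes to stdout serve as a mapper or reducer, so plain Ruby scripts like the ones below can run in a Hadoop cluster without additional bindings.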
#!/usr/bin/env ruby
# mapper.rb - Hadoop Streaming mapper
ARGF.each_line do |line|
words = line.strip.split
words.each do |word|
puts "#{word}\t1"
end
end
#!/usr/bin/env ruby
# reducer.rb - Hadoop Streaming reducer
current_word = nil
current_count = 0
ARGF.each_line do |line|
word, count = line.strip.split("\t")
count = count.to_i
if word == current_word
current_count += count
else
puts "#{current_word}\t#{current_count}" if current_word
current_word = word
current_count = count
end
end
puts "#{current_word}\t#{current_count}" if current_word
Custom implementations can optimize for specific use cases. This example implements a MapReduce framework with combiner support and configurable partitioning.
class MapReduceFramework
attr_reader :mappers, :reducers, :combiner, :partitioner
def initialize(mappers: 4, reducers: 4)
@mappers = mappers
@reducers = reducers
@combiner = nil
@partitioner = ->(key) { key.hash % @reducers }
end
def set_combiner(&block)
@combiner = block
self
end
def set_partitioner(&block)
@partitioner = block
self
end
def execute(input_data, mapper_func, reducer_func)
# Split input across mappers (slice size is at least 1 so empty input does not raise)
slice_size = [(input_data.size.to_f / @mappers).ceil, 1].max
chunks = input_data.each_slice(slice_size).to_a
# Map phase with optional combine
map_outputs = Parallel.map(chunks, in_threads: @mappers) do |chunk|
intermediate = chunk.flat_map { |item| mapper_func.call(item) }
if @combiner
intermediate.group_by(&:first)
.flat_map { |k, pairs| [[k, @combiner.call(pairs.map(&:last))]] }
else
intermediate
end
end
# Partition intermediate data
partitions = Array.new(@reducers) { [] }
map_outputs.flatten(1).each do |key, value|
partition_id = @partitioner.call(key)
partitions[partition_id] << [key, value]
end
# Reduce phase
results = Parallel.map(partitions, in_threads: @reducers) do |partition|
partition.group_by(&:first).map do |key, pairs|
values = pairs.map(&:last)
[key, reducer_func.call(key, values)]
end
end
results.flatten(1).to_h
end
end
# Usage with combiner for word count
framework = MapReduceFramework.new(mappers: 8, reducers: 4)
.set_combiner { |counts| counts.sum }
mapper = ->(line) { line.split.map { |word| [word.downcase, 1] } }
reducer = ->(key, values) { values.sum }
result = framework.execute(large_dataset, mapper, reducer)
For distributed Ruby applications, the resque and sidekiq background job processors can orchestrate MapReduce workflows across worker processes. Each map and reduce task executes as an asynchronous job.
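# Assumes Sidekiq and ActiveSupport (for String#constantize) are loaded;
# the fetch_* and store_* helpers are application-defined placeholders
class MapTask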
include Sidekiq::Worker
def perform(input_chunk_id, mapper_class)
input_chunk = fetch_input_chunk(input_chunk_id)
mapper = mapper_class.constantize.new
intermediate_results = input_chunk.flat_map do |record|
mapper.map(record)
end
store_intermediate_results(input_chunk_id, intermediate_results)
end
end
class ReduceTask
include Sidekiq::Worker
def perform(key, reducer_class)
intermediate_values = fetch_intermediate_values(key)
reducer = reducer_class.constantize.new
result = reducer.reduce(key, intermediate_values)
store_final_result(key, result)
end
end
Practical Examples
Concrete examples show how the paradigm applies to real-world data processing problems. Each example follows the flow from input data through the map, shuffle, and reduce phases.
Word Frequency Analysis represents the canonical MapReduce example. Given a corpus of text documents, count the occurrence of each word across all documents.
# Input: Collection of text documents
documents = [
"the quick brown fox jumps over the lazy dog",
"the dog barks at the fox",
"quick brown dogs run fast"
]
# Map phase: Extract words and emit (word, 1) pairs
def map_words(document)
document.downcase.scan(/\w+/).map { |word| [word, 1] }
end
intermediate = documents.flat_map { |doc| map_words(doc) }
# => [["the", 1], ["quick", 1], ["brown", 1], ["fox", 1], ...]
# Shuffle phase: Group by word
shuffled = intermediate.group_by(&:first)
.transform_values { |pairs| pairs.map(&:last) }
# => {"the"=>[1, 1, 1, 1], "quick"=>[1, 1], "brown"=>[1, 1], ...}
# Reduce phase: Sum counts for each word
word_counts = shuffled.transform_values(&:sum)
# => {"the"=>4, "quick"=>2, "brown"=>2, "fox"=>2, "dog"=>2, ...}
Log Analysis processes server logs to identify error patterns. This example counts requests by status code and by endpoint, and counts failing requests (status 400 and above) per endpoint.
# Input: Server access logs
logs = [
"2025-01-15 10:23:45 GET /api/users 200 45ms",
"2025-01-15 10:23:46 POST /api/login 401 12ms",
"2025-01-15 10:23:47 GET /api/users 500 102ms",
"2025-01-15 10:23:48 GET /api/products 200 34ms",
"2025-01-15 10:23:49 POST /api/orders 500 98ms"
]
# Map phase: Extract status codes and endpoints
def map_log_entry(log_line)
parts = log_line.split
method, endpoint, status = parts[2], parts[3], parts[4]
[
["status:#{status}", 1],
["endpoint:#{method} #{endpoint}", 1],
# Parentheses are required: a bare modifier-if is a syntax error inside an array literal
(["error:#{endpoint}", 1] if status.to_i >= 400)
].compact
end
intermediate = logs.flat_map { |log| map_log_entry(log) }
# Shuffle and reduce
results = intermediate.group_by(&:first)
.transform_values { |pairs| pairs.map(&:last).sum }
# => {
# "status:200"=>2,
# "endpoint:GET /api/users"=>2,
# "status:401"=>1,
# "endpoint:POST /api/login"=>1,
# "error:/api/login"=>1,
# "status:500"=>2,
# "error:/api/users"=>1,
# "endpoint:GET /api/products"=>1,
# "endpoint:POST /api/orders"=>1,
# "error:/api/orders"=>1
# }
Inverted Index Construction builds a search index mapping terms to documents containing those terms. This forms the foundation of search engines.
# Input: Documents with IDs
documents = {
"doc1" => "ruby programming language",
"doc2" => "python programming language",
"doc3" => "ruby on rails framework",
"doc4" => "ruby gems and bundler"
}
# Map phase: Emit (term, document_id) pairs
def map_document(doc_id, content)
content.split.uniq.map { |term| [term, doc_id] }
end
intermediate = documents.flat_map { |doc_id, content|
map_document(doc_id, content)
}
# Reduce phase: Collect all document IDs per term
inverted_index = intermediate.group_by(&:first)
.transform_values { |pairs| pairs.map(&:last).uniq }
# => {
# "ruby"=>["doc1", "doc3", "doc4"],
# "programming"=>["doc1", "doc2"],
# "language"=>["doc1", "doc2"],
# "python"=>["doc2"],
# "on"=>["doc3"],
# "rails"=>["doc3"],
# ...
# }
Distributed Grep searches for pattern matches across large datasets, emitting matching lines with their locations.
# Input: Files and search pattern
files = {
"app/models/user.rb" => ["class User < ApplicationRecord", " validates :email"],
"app/models/order.rb" => ["class Order < ApplicationRecord", " belongs_to :user"],
"spec/models/user_spec.rb" => ["describe User do", " it validates email"]
}
pattern = /email/
# Map phase: Search for pattern in each file
# (pattern is a parameter because a def body cannot see top-level local variables)
def map_file(filename, lines, pattern)
lines.each_with_index.filter_map do |line, index|
[filename, "#{index + 1}: #{line.strip}"] if line.match?(pattern)
end
end
matches = files.flat_map { |file, lines| map_file(file, lines, pattern) }
.group_by(&:first)
.transform_values { |pairs| pairs.map(&:last) }
# => {
# "app/models/user.rb"=>["2: validates :email"],
# "spec/models/user_spec.rb"=>["2: it validates email"]
# }
Join Operation combines two datasets on a common key, similar to database joins. This example joins user data with order data.
# Input: Two datasets with common keys
users = [
["user1", "Alice"],
["user2", "Bob"],
["user3", "Carol"]
]
orders = [
["order1", "user1", 100],
["order2", "user2", 200],
["order3", "user1", 150]
]
# Map phase: Tag records with source and emit by join key
def map_user(user_id, name)
[[user_id, [:user, name]]]
end
def map_order(order_id, user_id, amount)
[[user_id, [:order, order_id, amount]]]
end
user_pairs = users.flat_map { |uid, name| map_user(uid, name) }
order_pairs = orders.flat_map { |oid, uid, amt| map_order(oid, uid, amt) }
intermediate = user_pairs + order_pairs
# Reduce phase: Perform join on co-located records
joined = intermediate.group_by(&:first).transform_values do |pairs|
values = pairs.map(&:last)
user_data = values.find { |v| v[0] == :user }
order_data = values.select { |v| v[0] == :order }
order_data.map do |order|
{
user_name: user_data[1],
order_id: order[1],
amount: order[2]
}
end
end.values.flatten
# => [
# {user_name: "Alice", order_id: "order1", amount: 100},
# {user_name: "Alice", order_id: "order3", amount: 150},
# {user_name: "Bob", order_id: "order2", amount: 200}
# ]
Implementation Approaches
Different implementation strategies suit different scales and requirements. Selecting the appropriate approach depends on data volume, processing complexity, infrastructure constraints, and latency requirements.
In-Memory Processing handles datasets that fit within a single machine's memory. This approach uses language-native data structures and iteration patterns without external dependencies. In-memory processing offers the simplest implementation with minimal overhead but limits dataset size to available memory.
class InMemoryMapReduce
def self.execute(data, mapper, reducer)
intermediate = data.flat_map(&mapper)
grouped = intermediate.group_by(&:first)
grouped.transform_values { |pairs| reducer.call(pairs.map(&:last)) }
end
end
In-memory processing suits development, testing, and small-scale production workloads. The approach provides fast iteration cycles and straightforward debugging. Memory constraints become the primary limitation as data volume grows.
Multi-threaded Processing distributes computation across CPU cores within a single machine. This approach increases throughput for CPU-bound operations without requiring distributed infrastructure. Thread pools manage worker threads that process data chunks concurrently.
Thread synchronization overhead and CRuby's Global VM Lock (GVL, commonly called the GIL) limit performance: only one thread executes Ruby code at a time. Blocking I/O and native extensions that explicitly release the lock can still overlap, so threads mainly help I/O-bound workloads. Ractors in Ruby 3+ run Ruby code in parallel but restrict which objects can be shared between them.
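A thread pool for the map phase can be sketched with the core `Thread` and `Queue` classes. This is a minimal illustration under the assumption that each chunk is an array of lines; `threaded_map_phase` and `pool_size` are hypothetical names, and on CRuby the threads only overlap when the mapper blocks on I/O.

```ruby
# Minimal thread-pool map phase: workers pull chunks from a shared queue.
def threaded_map_phase(chunks, pool_size: 4, &mapper)
  queue = Queue.new
  chunks.each { |chunk| queue << chunk }
  queue.close # pop returns nil once the queue is closed and drained

  results = Queue.new # thread-safe collection of per-chunk output
  workers = Array.new(pool_size) do
    Thread.new do
      while (chunk = queue.pop)
        results << chunk.flat_map(&mapper)
      end
    end
  end
  workers.each(&:join)

  output = []
  output.concat(results.pop) until results.empty?
  output
end

chunks = [["hello world"], ["hello ruby"], ["world map"]]
pairs = threaded_map_phase(chunks, pool_size: 2) do |line|
  line.split.map { |word| [word, 1] }
end
counts = pairs.group_by(&:first).transform_values { |ps| ps.sum(&:last) }
```

The order of `pairs` depends on thread scheduling, but the grouped counts do not, which mirrors why the shuffle phase groups by key rather than relying on arrival order.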
Distributed Processing partitions data and computation across multiple machines in a cluster. This approach handles datasets exceeding single-machine capacity and scales horizontally by adding nodes. Distributed frameworks manage task scheduling, data distribution, fault tolerance, and result aggregation.
Hadoop represents the dominant open-source MapReduce implementation. The framework stores data in HDFS (Hadoop Distributed File System) and executes MapReduce jobs using YARN (Yet Another Resource Negotiator). Hadoop provides fault tolerance through data replication and task re-execution.
# Hadoop Streaming enables Ruby mappers and reducers
# mapper.rb
STDIN.each_line do |line|
# Map logic
key, value = process(line)
puts "#{key}\t#{value}"
end
# reducer.rb
current_key = nil
accumulated = []
STDIN.each_line do |line|
key, value = line.chomp.split("\t")
if key != current_key
process_group(current_key, accumulated) if current_key
current_key = key
accumulated = []
end
accumulated << value
end
process_group(current_key, accumulated) if current_key
Streaming Processing handles continuous data streams rather than static datasets. Streaming frameworks process records as they arrive, maintaining running aggregations and windowed computations. This approach provides lower latency than batch processing but requires different programming models.
Apache Storm, Apache Flink, and Kafka Streams implement streaming MapReduce variants. These systems maintain state across records and provide exactly-once or at-least-once processing guarantees.
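The idea of running aggregations over windows can be sketched without any streaming framework. The example below is a simplified in-memory illustration, not the API of Storm, Flink, or Kafka Streams; `WindowedCounter` and `window_seconds` are hypothetical names, and timestamps are plain integers in seconds.

```ruby
# Running per-key counts over fixed (tumbling) time windows.
class WindowedCounter
  def initialize(window_seconds)
    @window_seconds = window_seconds
    @counts = Hash.new { |hash, window| hash[window] = Hash.new(0) }
  end

  # Process one record as it arrives and update its window's running count.
  def add(timestamp, key)
    window_start = timestamp - (timestamp % @window_seconds)
    @counts[window_start][key] += 1
  end

  # Counts for the window beginning at window_start (empty if none seen).
  def counts_for(window_start)
    @counts.fetch(window_start, {})
  end
end

counter = WindowedCounter.new(60)
events = [[0, "GET /api/users"], [30, "GET /api/users"],
          [59, "POST /api/login"], [61, "GET /api/users"]]
events.each { |timestamp, key| counter.add(timestamp, key) }

first_window = counter.counts_for(0)
second_window = counter.counts_for(60)
```

Unlike a batch job, the counts are usable after every `add` call; there is no point at which all input is known to have arrived.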
Hybrid Approaches combine multiple strategies to optimize for specific requirements. Lambda architecture maintains both batch and streaming processing pipelines, using batch processing for comprehensive analysis and streaming for real-time updates. Kappa architecture simplifies this by using only streaming infrastructure for both real-time and historical processing.
Spark bridges batch and streaming processing with a unified API. Spark's resilient distributed datasets (RDDs) support MapReduce operations alongside more complex transformations. Spark executes in-memory where possible, reducing disk I/O compared to Hadoop.
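# Sketch of a Spark word count from Ruby via a binding such as spark-ruby.
# Illustrative only: the class and method names mirror Spark's Scala/Python API
# and are assumptions, so check the binding's documentation before relying on them.
require 'spark'
conf = Spark::Conf.new.set_app_name("ruby_mapreduce")
sc = Spark::Context.new(conf: conf)
# Load the input, count words, then write the result
counts = sc.text_file("hdfs://path/to/data")
.flat_map { |line| line.split.map { |word| [word, 1] } }
.reduce_by_key { |a, b| a + b }
counts.save_as_text_file("hdfs://path/to/output")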
Serverless Processing executes MapReduce workloads using cloud functions triggered by events. This approach eliminates infrastructure management and scales automatically based on load. Cloud providers' function services execute mapper and reducer functions in response to data availability.
AWS Lambda, Google Cloud Functions, and Azure Functions support this model. The approach suits sporadic workloads with variable data volumes but introduces cold start latency and limits execution duration.
Implementation selection requires evaluating trade-offs between simplicity, performance, scalability, and cost. Start with the simplest approach that meets requirements, then scale complexity as needed.
Performance Considerations
MapReduce performance depends on multiple factors including data volume, computation complexity, network bandwidth, disk I/O, and cluster resources. Understanding these factors enables optimization strategies that improve throughput and reduce latency.
Data Locality critically impacts performance by minimizing network data transfer. The framework schedules map tasks on nodes storing input data blocks. When input data resides locally on the mapper node, the task reads from local disk rather than transferring over the network. Network transfers consume bandwidth and increase latency, particularly in large clusters.
Data locality optimization works at multiple levels. Node-local placement reads data from the same machine's storage. Rack-local placement reads from machines in the same network switch, reducing but not eliminating network hops. Off-rack placement requires inter-rack network transfer, incurring the highest latency.
HDFS typically replicates each data block three times across different nodes and racks. The scheduler prefers nodes with local replicas, falling back to rack-local and then off-rack placement when local nodes are busy or unavailable.
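The fallback order described above can be sketched as a small selection function. This is an illustration of the preference order only, not a real scheduler; `Node`, `choose_node`, and the sample topology are hypothetical.

```ruby
# Pick a node for a map task: node-local, then rack-local, then any free node.
Node = Struct.new(:name, :rack, :free)

def choose_node(replica_nodes, cluster)
  free = cluster.select(&:free)
  replica_names = replica_nodes.map(&:name)
  replica_racks = replica_nodes.map(&:rack)

  node_local = free.find { |node| replica_names.include?(node.name) }
  return [node_local, :node_local] if node_local

  rack_local = free.find { |node| replica_racks.include?(node.rack) }
  return [rack_local, :rack_local] if rack_local

  any = free.first
  [any, any ? :off_rack : :none]
end

cluster = [
  Node.new("n1", "rack-a", false), # holds a replica but is busy
  Node.new("n2", "rack-a", true),
  Node.new("n3", "rack-b", true)
]
replicas = [cluster[0]]
node, level = choose_node(replicas, cluster)
```

Here the only replica holder is busy, so the task falls back to a free node on the same rack.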
Combiner Functions reduce network traffic by performing partial aggregation before the shuffle phase. Combiners execute on mapper nodes, reducing the volume of intermediate data transferred to reducers. Effective combiner use can reduce shuffle data by orders of magnitude.
# Without combiner: 1 million records → 1 million pairs transferred
# Map output
1_000_000.times.map { |i| ["common_key", 1] }
# => 1 million pairs sent over network
# With combiner: 1 million records → 1 pair per mapper transferred
# Map output after combiner
num_mappers = 100
num_mappers.times.map { ["common_key", 10_000] }
# => 100 pairs sent over network
Combiners must be associative and commutative to produce correct results. The framework may invoke combiners zero, one, or multiple times per key, so combiner logic must handle partial aggregation correctly. Not all reduce operations support combiners.
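Averaging is a common case where the reduce function cannot be reused as a combiner. The sketch below, with made-up values for a single key split across two mappers, shows the mean of partial means going wrong and one conventional fix: have the combiner carry `[sum, count]` pairs instead.

```ruby
# Values for one key, split across two mappers (hypothetical data).
mapper_outputs = [[10, 20], [30]]

# Incorrect: each mapper emits a partial mean, the reducer averages the means.
partial_means = mapper_outputs.map { |vals| vals.sum.to_f / vals.size }
wrong_mean = partial_means.sum / partial_means.size

# Correct: the combiner emits [sum, count]; the reducer adds the pairs
# component-wise and divides once at the end.
combine = ->(vals) { [vals.sum, vals.size] }
partials = mapper_outputs.map(&combine)
total, count = partials.transpose.map(&:sum)
right_mean = total.to_f / count
```

Component-wise addition of `[sum, count]` pairs is associative and commutative, so the result is the same however many times the framework applies the combiner.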
Partitioning Strategy affects load balance across reducers. The default hash partitioner distributes keys uniformly but may create skew for non-uniform key distributions. Some keys produce more intermediate values than others, causing certain reducers to process disproportionate data volumes.
Custom partitioners address skew by implementing domain-specific distribution logic. Range partitioning assigns contiguous key ranges to reducers, producing sorted output. Composite keys enable multi-level partitioning for complex data structures.
# Custom partitioner for skewed data
require 'set'
class CustomPartitioner
def initialize(num_reducers)
# Needs at least two reducers so that each half is non-empty
@num_reducers = num_reducers
@heavy_keys = Set.new(['popular_key_1', 'popular_key_2'])
end
def get_partition(key)
if @heavy_keys.include?(key)
# Isolate known heavy keys on the first half of the reducers
# (each heavy key still maps to exactly one reducer)
key.hash % (@num_reducers / 2)
else
# Normal keys share the second half of the reducers
(@num_reducers / 2) + (key.hash % (@num_reducers / 2))
end
end
end
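The partitioner above still sends every record for a given heavy key to a single reducer. A different technique, key salting, is often used when one key alone is too large; it is not part of the partitioner shown here. The sketch below assumes a sum reduction and a two-stage job; `SALT_BUCKETS`, `salt`, and `unsalt` are hypothetical names.

```ruby
# Key salting: split one hot key into several sub-keys so that different
# reducers can share its values, then merge the partial results.
SALT_BUCKETS = 4 # hypothetical number of sub-keys per hot key

# Deterministic salt derived from the record index, so re-execution
# of a failed task produces the same salted keys.
def salt(key, record_index)
  "#{key}:#{record_index % SALT_BUCKETS}"
end

def unsalt(salted_key)
  salted_key.rpartition(":").first
end

pairs = Array.new(1_000) { |i| [salt("popular_key", i), 1] }

# Stage 1: reduce each salted key independently (up to SALT_BUCKETS reducers).
stage1 = pairs.group_by(&:first).transform_values { |ps| ps.sum(&:last) }

# Stage 2: strip the salt and combine the partial sums.
stage2 = stage1.group_by { |salted, _| unsalt(salted) }
               .transform_values { |entries| entries.sum { |_, partial| partial } }
```

The cost is a second aggregation stage, and the approach only works when the reduction can be applied to partial results.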
Speculation mitigates slow tasks by launching duplicate executions of stragglers. The framework monitors task progress and identifies tasks running significantly slower than the median. When detected, the scheduler launches speculative copies of slow tasks on different nodes. The first copy to complete provides the result, and remaining copies are killed.
Speculation improves job completion time when individual tasks slow due to hardware issues, resource contention, or data skew. However, speculation consumes additional cluster resources by running duplicate work. Aggressive speculation settings can waste resources on false positives.
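Straggler detection can be sketched as a comparison against the median. Real schedulers typically use richer signals such as estimated completion time; the version below is a simplified illustration in which `stragglers`, the progress fractions, and the 0.5 threshold are all hypothetical.

```ruby
# Median of a list of numbers.
def median(values)
  sorted = values.sort
  mid = sorted.size / 2
  sorted.size.odd? ? sorted[mid] : (sorted[mid - 1] + sorted[mid]) / 2.0
end

# Flag tasks whose progress is below threshold * median progress.
def stragglers(progress_by_task, threshold: 0.5)
  cutoff = median(progress_by_task.values) * threshold
  progress_by_task.select { |_, progress| progress < cutoff }.keys
end

progress = { "map-1" => 0.90, "map-2" => 0.85, "map-3" => 0.20, "map-4" => 0.95 }
candidates = stragglers(progress)
```

Raising the threshold makes speculation more aggressive: with `threshold: 1.0`, every task below the median would be duplicated.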
Compression reduces storage and network transfer volumes at the cost of CPU cycles. Compressed input files reduce disk reads during the map phase. Compressed intermediate data reduces shuffle network transfer. Compressed output files reduce storage requirements.
Different compression codecs offer trade-offs between compression ratio and speed. Snappy provides fast compression with moderate ratios, while gzip achieves higher compression at lower speed. Splittable compression codecs like bzip2 enable parallel processing of compressed files.
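The ratio-versus-speed trade-off can be explored with Ruby's standard `zlib` library. Snappy and bzip2 are not in the standard library, so this sketch assumes DEFLATE (the algorithm behind gzip) at two compression levels as a stand-in; the synthetic pairs are made up for illustration.

```ruby
require 'zlib'

# Serialize intermediate pairs as tab-separated lines, as a shuffle might.
pairs = Array.new(10_000) { |i| ["key#{i % 100}", 1] }
raw = pairs.map { |key, value| "#{key}\t#{value}\n" }.join

# Same codec at two levels: favor speed, or favor size.
fast = Zlib::Deflate.deflate(raw, Zlib::BEST_SPEED)
small = Zlib::Deflate.deflate(raw, Zlib::BEST_COMPRESSION)

restored = Zlib::Inflate.inflate(small)
ratio = raw.bytesize.to_f / small.bytesize
```

Highly repetitive intermediate data like this compresses well; the achievable ratio on real data depends on its content.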
Memory Configuration affects performance by controlling buffer sizes, sort memory, and JVM heap allocations. Larger buffers reduce disk spills during the shuffle phase but consume more memory. The framework spills intermediate data to disk when memory buffers fill, incurring I/O overhead.
Optimal memory settings depend on data characteristics and cluster resources. Increase sort buffer sizes for jobs producing large intermediate outputs. Configure JVM heap sizes to avoid garbage collection overhead while leaving memory for operating system caches.
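The relationship between buffer size and spill count can be sketched with a toy buffer. This is a simplified model, not Hadoop's implementation: `SpillingBuffer` is a hypothetical class, and `limit` counts pairs rather than bytes.

```ruby
require 'tempfile'

# In-memory buffer that sorts and spills to a temp file when it fills up.
class SpillingBuffer
  attr_reader :spill_files

  def initialize(limit)
    @limit = limit
    @buffer = []
    @spill_files = []
  end

  def emit(key, value)
    @buffer << [key, value]
    spill if @buffer.size >= @limit
  end

  # Write the buffered pairs to disk in key order and clear the buffer.
  def spill
    return if @buffer.empty?

    file = Tempfile.new("spill")
    @buffer.sort.each { |key, value| file.puts("#{key}\t#{value}") }
    file.flush
    @spill_files << file
    @buffer = []
  end
end

small = SpillingBuffer.new(4)
large = SpillingBuffer.new(16)
%w[d c b a h g f e j i].each do |word|
  small.emit(word, 1)
  large.emit(word, 1)
end
small.spill # final flush at end of the map task
large.spill
```

With ten pairs, the four-pair buffer produces three spill files that must later be merged, while the sixteen-pair buffer produces one.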
Task Granularity balances parallelism against overhead. More tasks enable finer-grained parallelism and better load distribution but increase scheduling and management overhead. The framework launches tasks, tracks progress, and handles failures, with per-task costs.
Generally, aim for 10-100 times more tasks than cluster nodes to enable load balancing and speculation. Each task should process enough data to amortize startup overhead, typically tens of megabytes to a few gigabytes.
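The guideline above reduces to simple arithmetic. The figures in this sketch (1 TiB of input, 128 MiB splits, 100 nodes) are hypothetical and chosen only to show the calculation.

```ruby
MIB = 1024 * 1024

input_bytes = 1024 * 1024 * MIB # 1 TiB of input (hypothetical)
split_bytes = 128 * MIB         # data processed per map task (hypothetical)
nodes = 100                     # cluster size (hypothetical)

# One map task per split, rounding up for a final partial split.
map_tasks = (input_bytes + split_bytes - 1) / split_bytes
tasks_per_node = map_tasks.to_f / nodes
```

This gives 8,192 map tasks, or roughly 82 per node, which falls inside the 10-100x range; halving the split size would double the task count and push it above that range.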
Tools & Ecosystem
Multiple frameworks and tools implement MapReduce concepts, each with different design philosophies, performance characteristics, and operational requirements. Understanding the ecosystem helps select appropriate tools for specific use cases.
Apache Hadoop represents the canonical open-source MapReduce implementation. Hadoop consists of HDFS for distributed storage, YARN for resource management, and MapReduce for computation. The framework provides reliability through replication and supports massive datasets across commodity hardware clusters.
Hadoop's architecture separates storage from computation, enabling independent scaling. HDFS stores data in blocks across cluster nodes with configurable replication. YARN manages cluster resources and schedules tasks. The MapReduce runtime executes map and reduce tasks, handling failures and retries.
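# Submit a Hadoop Streaming job (the streaming jar's path varies by installation)
hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-*.jar \
-files mapper.rb,reducer.rb \
-mapper mapper.rb \
-reducer reducer.rb \
-input /input/path \
-output /output/path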
Ruby integration with Hadoop primarily uses Hadoop Streaming, which treats mapper and reducer scripts as executables. Scripts read from stdin and write to stdout, with the framework handling data routing. The hadoop-streaming gem provides Ruby-friendly wrappers.
Apache Spark provides faster MapReduce execution through in-memory computation and DAG (directed acyclic graph) optimization. Spark maintains intermediate data in memory when possible, avoiding disk I/O between stages. The framework supports interactive queries and iterative algorithms more efficiently than Hadoop.
Spark's RDD abstraction supports map and reduce operations alongside transformations like filter, join, and aggregate. DataFrames and Datasets provide higher-level APIs with query optimization. Spark SQL enables SQL queries on distributed data.
Amazon EMR (Elastic MapReduce) provides managed Hadoop and Spark clusters on AWS infrastructure. EMR handles cluster provisioning, configuration, and scaling, reducing operational overhead. The service integrates with S3 for storage, IAM for access control, and CloudWatch for monitoring.
Google Cloud Dataproc offers similar managed cluster services on Google Cloud Platform. Dataproc clusters start quickly and integrate with BigQuery, Cloud Storage, and other GCP services. Per-second billing and autoscaling reduce costs for variable workloads.
Apache Pig provides a higher-level language (Pig Latin) that compiles to MapReduce jobs. Pig Latin scripts express data transformations through declarative operations like FOREACH, JOIN, and GROUP. The compiler optimizes query plans and generates efficient MapReduce jobs.
-- Pig Latin script for word count
lines = LOAD 'input' AS (line:chararray);
words = FOREACH lines GENERATE FLATTEN(TOKENIZE(line)) AS word;
grouped = GROUP words BY word;
counts = FOREACH grouped GENERATE group, COUNT(words);
STORE counts INTO 'output';
Apache Hive implements SQL queries on Hadoop through the HiveQL query language. Hive translates SQL to MapReduce jobs, enabling SQL analysts to work with Hadoop data. The metastore tracks table schemas and partitions, mapping logical tables to HDFS files.
Ruby-specific Tools include several gems for MapReduce processing. The skynet gem provides distributed Ruby MapReduce using message queues. The hadoop-ruby gem wraps Hadoop Streaming with Ruby idioms. Note that mrjob, often mentioned in this context, is a Python library for writing Hadoop Streaming jobs in Python rather than a Ruby gem.
# Wrapper-style job definition (illustrative; check the gem's documentation for its actual API)
require 'hadoop'
class WordCountJob < Hadoop::Job
def map(key, value)
value.split.each do |word|
yield word, 1
end
end
def reduce(key, values)
yield key, values.sum
end
end
WordCountJob.run(ARGV)
Workflow Orchestration tools coordinate complex MapReduce pipelines. Apache Oozie schedules and manages Hadoop jobs, handling dependencies and failures. Apache Airflow provides programmatic workflow definition with extensive monitoring and retry logic.
Monitoring and Debugging tools help diagnose performance issues and failures. The Hadoop ResourceManager UI shows job progress, resource utilization, and task statistics. Spark's Web UI displays stage execution, memory usage, and query plans. Metrics collection through Ganglia or Prometheus enables historical analysis.
Distributed tracing tools like Zipkin and Jaeger track request flows across MapReduce stages, identifying bottlenecks and latency sources. These tools integrate with distributed systems to provide end-to-end visibility.
Reference
MapReduce Phases
| Phase | Input | Output | Parallelization | Purpose |
|---|---|---|---|---|
| Map | Input key-value pairs | Intermediate key-value pairs | Fully parallel across input splits | Transform and filter input data |
| Shuffle | Intermediate pairs from all mappers | Grouped pairs by key | Parallel across reducer partitions | Partition and sort intermediate data |
| Reduce | Key and iterator of values | Output key-value pairs | Fully parallel across keys | Aggregate values per key |
| Combine | Intermediate key-value pairs | Reduced intermediate pairs | Local to mapper | Pre-aggregate data before shuffle |
Function Signatures
| Function | Parameters | Returns | Constraints |
|---|---|---|---|
| map | input_key, input_value | Zero or more intermediate key-value pairs | Deterministic, no side effects |
| reduce | intermediate_key, value_iterator | Zero or more output key-value pairs | Deterministic, no side effects |
| combine | intermediate_key, value_iterator | Single intermediate key-value pair | Must be associative and commutative |
| partition | intermediate_key, num_reducers | Reducer index (0 to num_reducers-1) | Deterministic, balanced distribution |
Performance Optimization Techniques
| Technique | Impact | When to Use | Trade-offs |
|---|---|---|---|
| Combiner | Reduces shuffle data by 10-100x | Associative/commutative reductions | Additional CPU for combiner execution |
| Compression | Reduces I/O by 2-10x | Network or storage bottlenecked | CPU overhead for compression/decompression |
| Data locality | Reduces network transfer to near zero | Large datasets on HDFS | Requires data replication and smart scheduling |
| Custom partitioner | Eliminates reducer skew | Non-uniform key distributions | Increased implementation complexity |
| Speculation | Reduces tail latency by 20-50% | Heterogeneous clusters | Wastes resources on duplicate work |
| In-memory combine | Reduces spills by 50-90% | Map tasks with high key repetition | Increased memory requirements |
Common MapReduce Patterns
| Pattern | Map Output | Reduce Logic | Use Cases |
|---|---|---|---|
| Counting | (item, 1) | Sum values | Word count, event counting, histogram generation |
| Filtering | (key, record) if condition | First value | Data cleaning, subset extraction |
| Aggregation | (group_key, metric) | Sum, avg, min, max | Analytics, reporting, summarization |
| Join | (join_key, tagged_record) | Combine tagged records | Combining datasets, denormalization |
| Inverted index | (term, document_id) | Collect document IDs | Search indexing, document clustering |
| Sorting | (sort_key, record) | Identity | Distributed sorting, top-N selection |
| Graph processing | (node_id, edges) | Aggregate messages | PageRank, connected components, shortest paths |
Ruby Enumerable Methods for MapReduce
| Operation | Ruby Method | MapReduce Phase | Example |
|---|---|---|---|
| Transform elements | map, collect | Map | array.map { \|x\| ... } |
| Flatten nested arrays | flat_map | Map with multiple outputs | array.flat_map { \|x\| ... } |
| Group by key | group_by | Shuffle | array.group_by(&:first) |
| Aggregate values | reduce, inject | Reduce | values.reduce(:+) |
| Combine values | transform_values | Reduce on hash | hash.transform_values(&:sum) |
| Filter elements | select, filter | Map with filtering | array.select { \|x\| ... } |
Hadoop Streaming Command Options
| Option | Purpose | Example Value |
|---|---|---|
| -input | Input data location | hdfs://data/input |
| -output | Output data location | hdfs://data/output |
| -mapper | Mapper executable | mapper.rb |
| -reducer | Reducer executable | reducer.rb |
| -combiner | Combiner executable | combiner.rb |
| -files | Ship files with the job (generic option; replaces the deprecated -file) | helper.rb |
| -numReduceTasks | Number of reduce tasks | 10 |
| -inputformat | Input format class | TextInputFormat |
| -outputformat | Output format class | TextOutputFormat |
Fault Tolerance Mechanisms
| Failure Type | Detection Method | Recovery Strategy | Impact |
|---|---|---|---|
| Task failure | Task timeout or error code | Re-execute task on different node | Minimal if speculative execution enabled |
| Node failure | Heartbeat timeout | Re-execute all tasks from failed node | Some work loss, automatic recovery |
| Rack failure | Multiple node failures | Re-execute tasks using replicated data | Higher recovery time, requires data replication |
| Data corruption | Checksum verification | Read from replica | Transparent to job, slight latency increase |
| Slow task | Progress monitoring | Launch speculative copy | Additional resource usage |
Configuration Parameters
| Parameter | Default | Purpose | Tuning Guidance |
|---|---|---|---|
| mapreduce.job.maps | Hint only; actual count follows input splits | Number of map tasks | Adjust split size to target 10-100x the number of nodes |
| mapreduce.job.reduces | 1 | Number of reduce tasks | Set based on output size and cluster size |
| mapreduce.task.io.sort.mb | 100 MB | Sort buffer size | Increase for large intermediate outputs |
| mapreduce.map.memory.mb | 1024 MB | Mapper memory | Increase for memory-intensive operations |
| mapreduce.reduce.memory.mb | 1024 MB | Reducer memory | Increase for large value groups |
| mapreduce.job.reduce.slowstart.completedmaps | 0.05 | When to start reducers | Lower for shuffle-heavy jobs |