Spark: Distributed Computing on a Cluster #
Cluster environment #
- Main idea: distributed computing, i.e. programming with 10k-100k cores
- How to ensure no data loss in case of failure of some system component?
- Programming model: data-parallel operations (e.g. Map and Reduce)
- Goal: make data-parallel operations
- Scalable (100ks of cores)
- Fault-tolerant (ensure no data loss in case of failure)
- Efficient (optimize system perf. with efficient use of memory)
Why use a cluster? #
- e.g. want to process 100TB of log data (e.g. 1 day at Facebook)
- On a single node: scanning at 50 MB/s = 23 days
- On 1000 nodes: scanning at 50 MB/s per node = 33 min (see the quick check after this list)
- However: hard to use that many nodes
- Hard to program that many cores
- Potential failures at that scale
- Need framework to handle this
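A quick back-of-the-envelope check of the scan-time figures above (a sketch; assumes 50 MB/s sequential scan per node and perfect linear scaling):

object ScanTimeCheck {
  def main(args: Array[String]): Unit = {
    val dataBytes = 100e12                      // 100 TB of log data
    val perNodeBytesPerSec = 50e6               // 50 MB/s scan rate per node
    val oneNodeSeconds = dataBytes / perNodeBytesPerSec
    val clusterSeconds = oneNodeSeconds / 1000  // 1000 nodes scanning in parallel
    println(f"1 node:     ${oneNodeSeconds / 86400}%.1f days")  // ~23.1 days
    println(f"1000 nodes: ${clusterSeconds / 60}%.1f minutes")  // ~33.3 minutes
  }
}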
Warehouse-scale computing (WSC) #
- Standard architecture:
- Cluster of commodity Linux nodes (e.g. multicore x86)
- Usually 16-32 core CPUs, 128 GB-1 TB of DRAM, 10-30 TB of SSD storage
- RAM bandwidth: 100 GB/s
- SSD bandwidth: 1-4 GB/s
- Private memory: separate address space and separate OS
- Ethernet network: 10 Gbit/s or more today
- A “top-of-rack” switch connects the nodes within a rack (1-2 GB/s per node) and links them to nodes in other racks (0.1-2 GB/s per node)
- Cheap:
- Build from commodity hardware
- Thousands of nodes for <$10M
- Goal: use supercomputer networking ideas to provide high bandwidth across datacenter
- However: need to mask issues such as load balancing and failures
Storage systems #
- First order problem: if nodes can fail, how to store data persistently?
- Distributed file systems:
- Google GFS
- Hadoop HDFS (open-source)
- Typical usage pattern:
- Huge files (100s of GBs to TBs)
- Data rarely updated in place
- Reads and appends common (e.g. log files)
Architecture of a distributed file system #
- Chunk servers or HDFS DataNode
- File split into contiguous chunks (usually 64-256 MB)
- Each chunk replicated 2-3x
- Try to keep replicas in different racks
- Master node or HDFS NameNode
- Stores metadata, usually replicated
- Client library for file access
- Talks to master node to find chunk servers
- Connects directly to chunk servers to access data
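A minimal sketch of the client read path just described (NameNodeClient, DataNodeClient, and readChunk are hypothetical names, not the real GFS/HDFS APIs): metadata lookups go to the master, bulk data transfer goes directly to the chunk servers.

// Hypothetical sketch of a DFS client read path (not the real GFS/HDFS client API)
case class ChunkLocation(chunkId: Long, replicas: Seq[String])   // replicas, ideally in different racks

trait NameNodeClient {
  // metadata only: which chunks make up the file, and where do the replicas live?
  def locateChunks(path: String): Seq[ChunkLocation]
}

trait DataNodeClient {
  // bulk data is read directly from a chunk server, not through the master
  def readChunk(node: String, chunkId: Long): Array[Byte]
}

def readFile(path: String, master: NameNodeClient, data: DataNodeClient): Iterator[Array[Byte]] =
  master.locateChunks(path).iterator.map { loc =>
    data.readChunk(loc.replicas.head, loc.chunkId)   // could fall back to another replica on failure
  }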
MapReduce programming model #
// called once per block of input by runtime
void mapper(string inp, multimap<string, string>& results);
// called once per unique key in results
// values is a list of values associated with the given key
void reducer(string key, list<string> values, int& result);
// job definition: input and output files live in the distributed FS
Reader input("hdfs://");
Writer output("hdfs://");
runMapReduceJob(mapper, reducer, input, output);
MapReduce steps #
- Run mapper function on all lines of file
- Question: how to assign work to nodes?
- Solution: data-distribution-based assignment: each node processes the lines of input-file blocks stored locally on that node
- Prepare intermediate data for reducer
- Run reducer function on all keys
- Question: how to get all data for key onto the correct reduce worker node?
- Solution: the master assigns each key to a reducer node (e.g. by hashing the key), as sketched below
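One common way to realize that assignment (a sketch; not necessarily what any particular runtime does) is to hash each key to a reducer index, so every mapper independently agrees on which reduce node owns a key:

// Sketch: deterministic key-to-reducer assignment by hashing
// All mappers compute the same index for a given key, so every value
// for that key ends up on the same reduce worker.
def reducerFor(key: String, numReducers: Int): Int =
  math.abs(key.hashCode % numReducers)

// e.g. with 4 reduce workers, every occurrence of "Safari" lands on the same node
val target = reducerFor("Safari", 4)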
Job scheduler responsibilities #
- Exploit data locality: “move computation to the data”
- Run mapper jobs on nodes that contain input files
- Run reducer jobs on nodes that already have most data for a certain key
- Handling node failures
- Scheduler detects job failures and reruns them on new machines
- Possible since inputs reside in persistent storage (distributed file system)
- Scheduler may also duplicate jobs on multiple machines (reduces the overall processing latency incurred by node failures)
- Handling slow machines
- Scheduler duplicates jobs on multiple machines
MapReduce limitations #
- Permits only simple program structure: must be map, followed by reduce by key
- Generalization to DAGs of operations: e.g. DryadLINQ; however, supporting this efficiently is not easy
- Iterative algorithms must reload data from disk on each iteration (see the sketch after this list)
- This limits more complex, multi-stage applications (e.g. iterative ML and graph processing)
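A sketch of why iteration is painful in this model (runMapReduceJob here is a placeholder standing in for the job-submission call shown earlier; the paths are hypothetical): each iteration is a separate job whose state round-trips through the distributed FS.

// Sketch: an iterative algorithm expressed as a chain of MapReduce jobs
def runMapReduceJob(inputPath: String, outputPath: String): Unit = ()   // placeholder for the runtime call

def iterate(numIterations: Int): Unit = {
  var state = "hdfs://state_0"                 // hypothetical initial state file
  for (i <- 1 to numIterations) {
    val next = s"hdfs://state_$i"
    // every iteration reads its input from disk and writes its output back
    // to the distributed FS; nothing stays in memory between iterations
    runMapReduceJob(state, next)
    state = next
  }
}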
Canonical example: Word count #
- Input: a set of documents, each containing some words
- e.g.
inp = [ "the quick brown fox", "the fox ate the mouse", "how now brown cow" ]
- Desired outputs: how many times is each word used?
- e.g.
{ "brown": 2, "fox": 2, "how": 1, "now": 1, "the": 3, "ate": 1, "cow": 1, "mouse": 1, "quick": 1 }
- Example computation: map a function that produces partial word counts onto each document, then reduce by key to aggregate the counts (see the sketch below)
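A minimal local Scala sketch of that computation (not a cluster runtime, just the same map-then-reduce-by-key structure):

// Local sketch of word count with the map / reduce-by-key structure
val inp = List("the quick brown fox", "the fox ate the mouse", "how now brown cow")

// "map" phase: emit a (word, 1) pair for every word in every document
val pairs = inp.flatMap(doc => doc.split(" ").map(word => (word, 1)))

// "reduce" phase: group the pairs by word and sum the counts
val counts: Map[String, Int] =
  pairs.groupBy(_._1).map { case (word, ones) => (word, ones.map(_._2).sum) }

// counts == Map("brown" -> 2, "fox" -> 2, "how" -> 1, "now" -> 1,
//               "the" -> 3, "ate" -> 1, "cow" -> 1, "mouse" -> 1, "quick" -> 1)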
Example: Massive CS149 #
- Assume cs149log.txt is a large file containing a log of web requests to the CS149 site
- Stored in a distributed FS like HDFS
- Blocks of the log are stored across a cluster of 4 nodes
- Now: we want to query the demographics of students visiting the CS149 site (e.g. what type of mobile phone they use)
void mapper(string line, multimap<string, string>& results) {
    string user_agent = parse_requester_user_agent(line);
    if (is_mobile_client(user_agent))
        results.add(user_agent, "1");          // emit (user agent, count of 1)
}

void reducer(string key, list<string> values, int& result) {
    int sum = 0;
    for (string v : values)
        sum += atoi(v.c_str());                // values are counts encoded as strings
    result = sum;
}

Reader input("hdfs://cs149log.txt");
Writer output("hdfs://");
runMapReduceJob(mapper, reducer, input, output);
Apache Spark #
- Motivating idea: despite huge amounts of data, many working sets in big data clusters fit in memory (Ananthanarayanan et al. 2011)
- Spark: Zaharia et al. 2012
- Goals:
- Programming model for cluster-scale computations with significant intermediate dataset reuse
- Don’t want to incur inefficiency of writing intermediates to persistent distributed FS
- Keep data in memory!
- Challenge: efficiently implementing fault-tolerance for in-memory calculations at scale
Fault-tolerance for in-memory calculations #
- Naive: replicate all computations, decreases peak throughput
- Another idea: checkpoint and rollback
- Save state of program to persistent storage
- Restart from last checkpoint on node failure
- Another idea: maintain log of updates (commands and data)
- Naive: high maintenance overhead
- However, use MapReduce to cut overhead down!
- Checkpoints after each map/reduce step by writing results to FS
- Scheduler’s list of outstanding (but not-yet-complete) jobs is a log
- Functional structure of programs allows for restart at granularity of single map/reduce invocation (rather than restarting entire program)
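A conceptual sketch of that idea (hypothetical types; not a real scheduler): the outstanding-task list is the log, and because each task is a pure function of data in the distributed FS, a failed or slow task can simply be rerun.

// Conceptual sketch: the scheduler's outstanding-task list acts as the log
case class Task(id: Int, inputBlock: String)   // hypothetical task descriptor

def driveToCompletion(tasks: List[Task], tryRun: Task => Boolean): Unit = {
  var outstanding = tasks
  while (outstanding.nonEmpty) {
    val task = outstanding.head
    if (tryRun(task))
      outstanding = outstanding.tail           // completed: remove from the log
    // else: the node failed or is slow; the task stays in the log and is rerun,
    // which is safe because its inputs still reside in the distributed FS
  }
}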
Resilient Distributed Dataset (RDD): Spark’s key abstraction #
- Read-only, ordered, immutable collection of records
- RDDs can only be created by deterministic transformations on data in persistent storage or on existing RDDs
- Actions on RDDs return data to application
- e.g. CS149 mobile counting
// create RDD from FS data
val lines = spark.textFile("hdfs://cs149log.txt");
// create RDD using filter() transformation on lines
val mobileViews = lines.filter((x: String) => isMobileClient(x));
// another filter() transformation
val safariViews = mobileViews.filter((x: String) => x.contains("Safari"));
// then count number of elements in RDD via count() action
val numViews = safariViews.count();

// one-liner for aggregating view counts across different user agents
// at each step, "lineage": sequence of RDD ops needed to compute output
// allows checkpointing for failure resistance
val perAgentCounts = spark.textFile("hdfs://cs149log.txt")
                          .filter(x => isMobileClient(x))
                          .map(x => (parseUserAgent(x), 1))
                          .reduceByKey((x, y) => x + y)
                          .collect();

// can also do forks and forked computations for multiple results from a single RDD
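The forking mentioned in the last comment might look like this (a sketch reusing the spark handle and the hypothetical isMobileClient helper from above): one parent RDD feeds several independent downstream computations.

// Sketch: multiple results "forked" from a single parent RDD
val lines = spark.textFile("hdfs://cs149log.txt");
val mobileViews = lines.filter(x => isMobileClient(x));                     // shared parent RDD

val safariCount = mobileViews.filter(x => x.contains("Safari")).count();   // fork 1
val chromeCount = mobileViews.filter(x => x.contains("Chrome")).count();   // fork 2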
RDD constraints and optimization #
- Storage
- Cannot always keep RDDs entirely in memory: a fully materialized representation could be huge, even larger than the original file on disk
- Partitioning and dependencies
- Narrow dependencies: each partition of parent RDD referenced by at most one child RDD partition
- Allows for operator fusion (e.g. map and filter can be applied to each input element in one pass, saving memory and disk traffic)
- Transformations need not communicate between cluster nodes; communication is only required for the final reduce/aggregation step
- Wide dependencies: each partition of parent RDD needs to be referenced by multiple child RDD partitions
- Requires sorting/shuffling data by key across partitions, which may induce communication between nodes
- May trigger significant recomputation of ancestor lineage in failure case
- Choice of partitioning impacts whether narrow dependencies are possible or if wide dependencies are needed, e.g.
// map keys to integers
val partitioner = spark.HashPartitioner(100);
// inform Spark of the partitioning
// .persist(): instructs Spark to try to keep the dataset in memory
// note: .persist(RELIABLE): store contents in durable storage (i.e., checkpoint it)
val mobileViewPartitioned = mobileViews.partitionBy(partitioner).persist();
val clientInfoPartitioned = clientInfo.partitionBy(partitioner).persist();

// due to explicit partitioning, the join creates only narrow dependencies
val joined = mobileViewPartitioned.join(clientInfoPartitioned);
- Node failure case: recompute lost RDD partitions from their lineage (sketched below)
- Must reload subset of data from disk and recompute entire sequence of operations given by lineage to regenerate missing partitions
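A conceptual sketch of that recovery path (not Spark's internal code): the lineage is just the ordered list of deterministic transformations, so a lost partition can be regenerated by re-reading the corresponding input split from disk and replaying them.

// Conceptual sketch of lineage-based recovery (not Spark internals)
object LineageRecovery {
  type Partition = Seq[String]
  type Transformation = Partition => Partition

  // re-read the input split from durable storage, then replay every
  // transformation in the lineage, in order, to rebuild the lost partition
  def recompute(readInputSplit: () => Partition, lineage: List[Transformation]): Partition =
    lineage.foldLeft(readInputSplit())((partition, transform) => transform(partition))
}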
Modern Spark ecosystem #
- Compelling feature: enables integration/composition of multiple domain-specific frameworks, all implemented with RDDs
- e.g. Spark SQL: Interleave computation and data query
- e.g. MLlib: machine learning library built on top of Spark abstractions
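For instance, handing RDD-style results to Spark SQL might look like the following (a sketch assuming a SparkSession named spark and a stand-in for the hypothetical parseUserAgent helper used earlier; exact APIs vary across Spark versions):

// Sketch: composing RDD-style transformations with a Spark SQL query
import spark.implicits._                         // assumes `spark` is a SparkSession

def parseUserAgent(line: String): String = line  // hypothetical stand-in for the earlier helper

val agents = spark.sparkContext
  .textFile("hdfs://cs149log.txt")
  .map(line => parseUserAgent(line))
  .toDF("user_agent")                            // hand the RDD to the SQL engine as a DataFrame

agents.createOrReplaceTempView("views")
val perAgent = spark.sql(
  "SELECT user_agent, COUNT(*) AS n FROM views GROUP BY user_agent ORDER BY n DESC")
perAgent.show()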