Spark: Distributed Computing on a Cluster #

Cluster environment #

  • Main idea: distributed computing, programming with 10k-100k cares
    • How to ensure no data loss in case of failure of some system component?
    • Programming model: data-parallel operations (e.g. Map and Reduce)
  • Goal: make data-parallel operations
    • Scalable (100ks of cores)
    • Fault-tolerant (ensure no data loss in case of failure)
    • Efficient (optimize system perf. with efficient use of memory)

Why use a cluster? #

  • e.g. want to process 100TB of log data (e.g. 1 day at Facebook)
    • On a single node: scanning at 50 MB/s = 23 days
    • On 1000 nodes: scanning at 50 MB/s = 33 min
  • However: hard to use that many nodes
    • Hard to program that many cores
    • Potential failures at that scale
    • Need framework to handle this

Warehouse-scale computing (WSC) #

  • Standard architecture:
    • Cluster of commodity Linux nodes (e.g. multicore x86)
      • Usually 16-32 core CPUs, 128 GB-1 TB of DRAM, 10-30 TB of SSD storage
      • RAM bandwidth: 100 GB/s
      • SSD bandwidth: 1-4 GB/s
    • Private memory: separate address space and separate OS
    • Ethernet network: >10GB today
      • “top-of-rack” switch connects all the nodes in a rack (1-2 GB/s) with nodes in other racks (0.1-2 GB/s)
  • Cheap:
    • Build from commodity hardware
    • Thousands of nodes for <$10M
  • Goal: use supercomputer networking ideas to provide high bandwidth across datacenter
    • However: need to mask issues such as load balancing and failures

Storage systems #

  • First order problem: if nodes can fail, how to store data persistently?
  • Distributed file systems:
    • Google GFS
    • Hadoop HDFS (open-source)
  • Typical usage pattern:
    • Huge files (100s of GBs to TBs)
    • Data rarely updated in place
    • Reads and appends common (e.g. log files)

Architecture of a distributed file system #

  • Chunk servers or HDFS DataNode
    • File split into contiguous chunks (usually 64-256 MB)
    • Each chunk replicated 2-3x
    • Try to keep replicas in different racks
  • Master node or HDFS NameNode
    • Stores metadata, usually replicated
  • Client library for file access
    • Talks to master node to find chunk servers
    • Connects directly to chunk servers to access data

MapReduce programming model #

// called once per block of input by runtime
void mapper(string inp, multimap<string, string>& results);

// called once per unique key in results
// values is a list of values associiated with the given key
void reducer(string key, list<string> values, int& result);

Writer output("hdfs://");

runMapReduceJob(mapper, reducer, input, output);

MapReduce steps #

  1. Run mapper function on all lines of file
    • Question: how to assign work to nodes?
    • Solution: Data-distribution based assignment: each node processes lines in blocks of input file that are stored locally
  2. Prepare intermediate data for reducer
  3. Run reducer function on all keys
    • Question: how to get all data for key onto the correct reduce worker node?
    • Solution: directive from master, assign each type of key to a node

Job scheduler responsibilities #

  • Exploit data locality: “move computation to the data”
    • Run mapper jobs on nodes that contain input files
    • Run reducer jobs on nodes that already have most data for a certain key
  • Handling node failures
    • Scheduler detects job failures and reruns them on new machines
    • Possible since inputs reside in persistent storage (distributed file system)
    • Scheduler duplicates jobs on multiple machines (reduce overall processing latency incurred by node failures)
  • Handling slow machines
    • Scheduler duplicates jobs on multiple machines

MapReduce limitations #

  • Permits only simple program structure: must be map, followed by reduce by key
    • Generalization to DAGs: DryadLINQ, however support is not easy
  • Iterative algorithms must load from disk each iteration
  • This limits more complex, multi-stage applications (e.g. iterative ML and graph processing)

Canonical example: Word count #

  • Input: documents containing some amount of words
    • e.g.
      inp = [
          "the quick brown fox",
          "the fox ate the mouse",
          "how now brown cow"
  • Desired outputs: how many times is each word used?
    • e.g.
          "brown": 2,
          "fox": 2,
          "how": 1,
          "now": 1,
          "the": 3,
          "ate": 1,
          "cow": 1,
          "mouse": 1,
          "quick": 1
  • Example computation: map a function that does partial word count onto each document, then reduce jobs to aggregate

Example: Massive CS149 #

  • Assume cs149log.txt is a large file containing a log of web requests to CS149 site
    • Stored in a distributed FS like HDFS
    • Blocks of the log are stored across a cluster of 4 nodes
  • Now: attempt to query about demographics of students visiting CS149 site (e.g. type of mobile phone)
    void mapper(string line, multimap<string, string>& results) {
        string user_agent = parse_requester_user_agent(line);
        if (is_mobile_client(user_agent))
            results.add(user_agent, 1);
    void reducer(string key, list<string> values, int& result) {
        int sum = 0;
        for (string v : values)
            sum += v;
        result = sum;
    Reader input("hdfs://cs149log.txt");
    Writer output("hdfs://");
    runMapReduceJob(mapper, reducer, input, output);

Apache Spark #

  • Motivating idea: despite huge amounts of data, many working sets in big data clusters fit in memory (Ananthanarayanan et al. 2011)
    • Spark: Zaharia et al. 2012
  • Goals:
    • Programming model for cluster-scale computations with significant intermediate dataset reuse
    • Don’t want to incur inefficiency of writing intermediates to persistent distributed FS
      • Keep data in memory!
      • Challenge: efficiently implementing fault-tolerance for in-memory calculations at scale

Fault-tolerance for in-memory calculations #

  • Naive: replicate all computations, decreases peak throughput
  • Another idea: checkpoint and rollback
    • Save state of program to persistent storage
    • Restart from last checkpoint on node failure
  • Another idea: maintain log of updates (commands and data)
    • Naive: high maintenance overhead
    • However, use MapReduce to cut overhead down!
      • Checkpoints after each map/reduce step by writing results to FS
      • Scheduler’s list of outstanding (but not-yet-complete) jobs is a log
      • Functional structure of programs allows for restart at granularity of single map/reduce invocation (rather than restarting entire program)

Resilient Distributed Dataset (RDD): Spark’s key abstraction #

  • Read-only, ordered, immutable collection of records
  • RDDs can only be created by deterministic transformations on data in persistant storage or on existing RDDs
  • Actions on RDDs return data to application
  • e.g. CS149 mobile counting
    // create RDD from FS data
    val lines = spark.textFile("hdfs://cs149log.txt");
    // create RDD using filter() transformation on lines
    val mobileViews = lines.filter((x : String) => isMobileClient(x));
    // another filter() transformation
    val safariViews = mobileViews.filter((x: String) => x.contains("Safari"));
    // then count number of elements in RDD via count() action
    val numViews = safariViews.count();
    // one-liner for aggregating view counts across different user agents
    // at each step, "lineage": sequence of RDD ops needed to compute output
    // allows checkpointing for failure resistance
    val perAgentCounts = spark
        .filter(x => isMobileClient(x))
        .map(x => (parseUserAgent(x), 1))
        .reduceByKey((x, y) => x+y)
    // can also do forks and forked computations for multiple results from a single RDD

RDD constraints and optimization #

  • Storage
    • Cannot keep entirely in memory: representation would be huge, larger than original file in disk
  • Partitioning and dependencies
    • Narrow dependencies: each partition of parent RDD referenced by at most one child RDD partition
      • Allows for op fusing (e.g. can apply map, filter all at once on input element, saving on memory and disk usage)
      • Not necessary in all cases to communicate between nodes of cluster for transformation, only for reduce step at end
    • Wide dependencies: each partition of parent RDD needs to be referenced by multiple child RDD partitions
      • Requires dependency sorting that may induce communication
      • May trigger significant recomputation of ancestor lineage in failure case
    • Choice of partitioning impacts whether narrow dependencies are possible or if wide dependencies are needed, e.g.
      // map keys to integers
      val partitioner = spark.HashPartitioner(100);
      // inform Spark of partition
      // .persist(): instructs Spark to try to keep dataset in memory
      // note: .persist(RELIABLE): store contents in durable storage (i.e., checkpoint it)
      val mobileViewPartitioned = mobileViews.partitionBy(partitioner).persist();
      val clientInfoPartitioned = clientInfo.partitionBy(partitioner).persist();
      // due to explicit partitioning, only creates narrow dependencies
      void joined = mobileViewPartitioned.join(clientInfoPartitioned);
  • Node failure case: recomputing lost RDD partitions from lineage
    • Must reload subset of data from disk and recompute entire sequence of operations given by lineage to regenerate missing partitions

Modern Spark ecosystem #

  • Compelling feature: enables integration/composition of multiple domain-specific frameworks, all implemented with RDDs
    • e.g. Spark SQL: Interleave computation and data query
    • e.g. MLib: ML library on top of spark abstractions